Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
7c2b9e0
Add hook getter and setter to the class
jimjshields Jun 9, 2020
0ce9364
Add hook context and call hooks
jimjshields Jun 9, 2020
750fb45
Change API to be a global event registry that can handle multiple hoo…
jimjshields Jun 15, 2020
f557af1
Clean up events API & add unit tests
jimjshields Jun 16, 2020
d2d19a7
Refactor running hooks & add functional tests
jimjshields Jun 16, 2020
0d456c0
Fix style & Python 2 issues
jimjshields Jun 25, 2020
b0415a2
Merge pull request #71 from jimjshields/feature/add-pre-and-post-proc…
spulec Jun 25, 2020
5df7bbc
0.1.5
spulec Jun 25, 2020
d6c88f3
Fix rst.
spulec Jun 25, 2020
b1eeed9
Refactor connection creation.
spulec Jan 18, 2021
defb07f
Merge pull request #75 from spulec/cleanup-boto-conns
spulec Jan 18, 2021
5be3712
0.1.6
spulec Jan 18, 2021
d0c8e3d
Cleanup make publish.
spulec Jan 18, 2021
4889961
Add initial SimpleProcessWorker implementation
psarma89 Apr 7, 2021
9bf631a
Add SimpleManagerWorker intial implementation
psarma89 Apr 7, 2021
c9bb542
Add a simple-worker flag to use the SimpleManagerWorker class
psarma89 Apr 7, 2021
f3af494
Fix manager worker tests
psarma89 Apr 7, 2021
394246d
Remove visibility timeout check from SimpleProcessWorkerbut keep the …
psarma89 Apr 9, 2021
c2041a1
Default batchsize to be 1 if simple-worker flag is passed
psarma89 Apr 9, 2021
e02acd2
Rename SimpleManagerWorker to BaseManager
psarma89 Apr 9, 2021
1ef4c9f
Subclass SimpleManagerWorker and ManagerWorker from BaseManager
psarma89 Apr 12, 2021
ab0d385
Update BaseManager to SimpleManagerWorker for main
psarma89 Apr 12, 2021
fdd9d40
Move SimpleProcessWorker methods to SimpleManagerWorker sub class
psarma89 Apr 12, 2021
f0e98b3
Add methods to BaseManager that sub classes should override and imple…
psarma89 Apr 12, 2021
f3b9d66
Move worker_children state down to the subclasses
psarma89 Apr 12, 2021
e587c13
Make replace_workers something that subclasses have to implement
psarma89 Apr 12, 2021
0c49f79
Move up _run_hooks to the parent class BaseWorker
psarma89 Apr 14, 2021
4bafcfa
Move common pre_process_context code from child classes to BaseWorker…
psarma89 Apr 14, 2021
fbc716e
Move messages_processed into instance state for ProcessWorker
psarma89 Apr 15, 2021
891150c
Add interval back to Simple* classes
psarma89 Apr 15, 2021
4194526
Refactor ProcessWorker and SimpleProcessWorker to better align proces…
psarma89 Apr 15, 2021
ad710a9
Move _process_task into parent BaseWorker
psarma89 Apr 15, 2021
0a06269
Created a new base class BaseProcessWorker so that we don't give Read…
psarma89 Apr 15, 2021
c5411e3
Fix test_master_shuts_down_busy_process_workers to look at worker pro…
psarma89 Apr 15, 2021
e81e2de
Add tests for SimpleManagerWorker
psarma89 Apr 15, 2021
407dcbe
Add tests for different default batchsize for simple worker vs normal…
psarma89 Apr 15, 2021
f62bafe
Fix pre-commit issue
psarma89 Apr 19, 2021
b8d63ee
Revert test test_master_shuts_down_busy_process_workers to original
psarma89 May 13, 2021
bcfdcc1
Don't run python 2 travis workflow
psarma89 May 14, 2021
44da356
Merge pull request #76 from psarma89/feature/simple-process-worker
spulec May 14, 2021
01f43cd
1.0.0
spulec May 14, 2021
e8a564e
Update README for simple process worker
psarma89 May 17, 2021
cf5f25a
Merge pull request #77 from psarma89/simple-process-worker-update-readme
spulec May 17, 2021
229ebd2
Make worker_children class configurable in manager subclasses
januszm Aug 30, 2021
b8d5ec1
Add Github Action for linter and tests
januszm Aug 30, 2021
d0f0dac
Properly ignore flake8 with noqa
januszm Aug 30, 2021
958f9d6
Ignore VIRTUAL_ENV check in CI
januszm Aug 30, 2021
0488e07
Merge pull request #78 from januszm/fix/configurable_worker_classes
spulec Sep 2, 2021
85a711f
Add message_id to pre_process_context
psarma89 Oct 5, 2021
1b22c9d
Merge pull request #80 from psarma89/feature/add-message-id-to-context
spulec Oct 8, 2021
1aeef3b
1.0.1
spulec Oct 8, 2021
05c7487
add --endpint-url params
opapy Nov 29, 2021
a983ce1
Merge pull request #81 from opapy/feature/add-endpoint-url-params
spulec Nov 29, 2021
3f2b2f0
Fix typos in README file
filipemcg Dec 20, 2021
7fce9af
Merge pull request #82 from filipemcg/master
spulec Dec 20, 2021
8852017
As a starting point fix tests that were broken by PR #81 for endpoint…
psarma89 Jul 12, 2022
272fd18
Removed introduced formatting
psarma89 Jul 12, 2022
b26e9f5
Merge pull request #85 from psarma89/bug/fix-tests-broken-by-endpoint…
spulec Jul 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: CI

on: [push]

jobs:
test:
runs-on: ubuntu-latest
env:
VIRTUAL_ENV: ignore
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r development.txt
- name: Lint
run: |
pip install flake8
python -m flake8 pyqs tests
- name: Test
run: |
make test
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dist/*
*.egg-info/*
build/*
htmlcov/*
.idea/*
*~
*#

1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: python

python:
- "2.7"
- "3.6"
- "3.7"
- "3.8"
Expand Down
19 changes: 19 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
Changelog
---------

1.0.1
~~~~~
- Add MessageId for tracking task executions

1.0.0
~~~~~
- Drop Py2 support
- Add new SimpleProcessWorker (https://github.com/spulec/PyQS/pull/76)

0.1.6
~~~~~

- Fix broken pickle of botocore clients.

0.1.5
~~~~~

- Add events hooks for pre and post processors.

0.1.4
~~~~~

Expand Down
27 changes: 4 additions & 23 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,7 @@ clean:
@find . -name __pycache__ -delete
@rm -rf .coverage *.egg-info *.log build dist MANIFEST yc

publish: clean tag
@if [ -e "$$HOME/.pypirc" ]; then \
echo "Uploading to '$(CUSTOM_PIP_INDEX)'"; \
python setup.py register -r "$(CUSTOM_PIP_INDEX)"; \
python setup.py sdist upload -r "$(CUSTOM_PIP_INDEX)"; \
else \
echo "You should create a file called '.pypirc' under your home dir.\n"; \
echo "That's the right place to configure 'pypi' repos.\n"; \
exit 1; \
fi

tag:
@if [ $$(git rev-list $$(git describe --abbrev=0 --tags)..HEAD --count) -gt 0 ]; then \
if [ $$(git log -n 1 --oneline $$(git describe --abbrev=0 --tags)..HEAD CHANGELOG.rst | wc -l) -gt 0 ]; then \
git tag $$(python setup.py --version) && git push --tags || echo 'Version already released, update your version!'; \
else \
echo "CHANGELOG not updated since last release!"; \
exit 1; \
fi; \
else \
echo "No commits since last release!"; \
exit 1;\
fi
publish: clean
rm -rf dist
python -m pep517.build --source --binary .
twine upload dist/*
41 changes: 40 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ To read tasks we need to run PyQS. If the task is already in your

$ pyqs email.tasks.send_email

If we want want to run all tasks with a certain prefix. This is based on
If we want to run all tasks with a certain prefix. This is based on
Python's `fnmatch <http://docs.python.org/2/library/fnmatch.html>`__.

.. code:: bash
Expand All @@ -100,6 +100,45 @@ messages.

$ pyqs send_email --concurrency 10

Simple Process Worker
~~~~~~~~~~~~~~~~~~~~~

To use a simpler version of PyQS that deals with some of the edge cases in the original implementation, pass the ``simple-worker`` flag.

.. code:: bash

$ pyqs send_email --simple-worker

The Simple Process Worker differs in the following way from the original implementation.

* Does not use an internal queue and removes support for the ``prefetch-multiplier`` flag. This helps simply the mental model required, as messages are not on both the SQS queue and an internal queue.
* When the ``simple-worker`` flag is passed, the default ``batchsize`` is 1 instead of 10. This is configurable.
* Does not check the visibility timeout when reading or processing a message from SQS.
* Allowing the worker to process the message even past its visibility timeout means we solve the problem of never processing a message if ``max_receives=1`` and we incorrectly set a shorter visibility timeout and exceed the visibility timeout. Previously, this message would have ended up in the DLQ, if one was configured, and never actually processed.
* It increases the probability that we process a message more than once, especially if ``batchsize > 1``, but this can be solved by the developer checking if the message has already been processed.

Hooks
~~~~~

PyQS has an event registry which can be used to run a function before or after every tasks runs.

.. code:: python

from pyqs import task, events

def print_pre_process(context):
print({"pre_process": context})

def print_post_process(context):
print({"post_process": context})

events.register_event("pre_process", print_pre_process)
events.register_event("post_process", print_post_process)

@task(queue="my_queue")
def send_email(subject):
pass

Operational Notes
~~~~~~~~~~~~~~~~~

Expand Down
2 changes: 2 additions & 0 deletions development.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ coverage==4.4.1
mock==1.0.1
moto==1.3.13
nose==1.3.0
pep517==0.9.1
pre-commit==0.7.6
sure==1.2.2
twine==3.3.0
functools32;python_version=='2.7'
pycodestyle==2.4.0
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
2 changes: 1 addition & 1 deletion pyqs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .decorator import task # noqa

__title__ = 'pyqs'
__version__ = '0.1.4'
__version__ = '1.0.1'
43 changes: 43 additions & 0 deletions pyqs/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""
pyqs events registry: register callback functions on pyqs events

Usage:
from pyqs.events import register_event

register_event("pre_process", lambda context: print(context))
"""


class Events:
def __init__(self):
self.pre_process = []
self.post_process = []

def clear(self):
self.pre_process = []
self.post_process = []


# Global singleton
_EVENTS = Events()


class NoEventException(Exception):
pass


def register_event(name, callback):
if hasattr(_EVENTS, name):
getattr(_EVENTS, name).append(callback)
else:
raise NoEventException(
"{name} is not a valid pyqs event.".format(name=name)
)


def get_events():
return _EVENTS


def clear_events():
_EVENTS.clear()
67 changes: 57 additions & 10 deletions pyqs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,28 @@
import sys
from argparse import ArgumentParser

from .worker import ManagerWorker
from .worker import ManagerWorker, SimpleManagerWorker
from . import __version__

logger = logging.getLogger("pyqs")

SIMPLE_WORKER_DEFAULT_BATCH_SIZE = 1
DEFAULT_BATCH_SIZE = 10


def _set_batchsize(args):
batchsize = args.batchsize
if batchsize:
return batchsize

simple_worker = args.simple_worker
if simple_worker:
# Default batchsize for SimpleProcessWorker
return SIMPLE_WORKER_DEFAULT_BATCH_SIZE

# Default batchsize for ProcessWorker
return DEFAULT_BATCH_SIZE


def main():
parser = ArgumentParser(description="""
Expand Down Expand Up @@ -77,6 +94,15 @@ def main():
action="store",
)

parser.add_argument(
"--endpoint-url",
dest="endpoint_url",
type=str,
default=None,
help="AWS SQS endpoint url",
action="store",
)

parser.add_argument(
"--interval",
dest="interval",
Expand All @@ -90,7 +116,7 @@ def main():
"--batchsize",
dest="batchsize",
type=int,
default=10,
default=None,
help='How many messages to download at a time from SQS.',
action="store",
)
Expand All @@ -107,6 +133,13 @@ def main():
action="store",
)

parser.add_argument(
'--simple-worker',
dest='simple_worker',
default=False,
action='store_true'
)

args = parser.parse_args()

_main(
Expand All @@ -117,8 +150,10 @@ def main():
access_key_id=args.access_key_id,
secret_access_key=args.secret_access_key,
interval=args.interval,
batchsize=args.batchsize,
prefetch_multiplier=args.prefetch_multiplier
batchsize=_set_batchsize(args),
prefetch_multiplier=args.prefetch_multiplier,
simple_worker=args.simple_worker,
endpoint_url=args.endpoint_url,
)


Expand All @@ -130,17 +165,29 @@ def _add_cwd_to_path():

def _main(queue_prefixes, concurrency=5, logging_level="WARN",
region=None, access_key_id=None, secret_access_key=None,
interval=1, batchsize=10, prefetch_multiplier=2):
interval=1, batchsize=DEFAULT_BATCH_SIZE, prefetch_multiplier=2,
simple_worker=False, endpoint_url=None):
logging.basicConfig(
format="[%(levelname)s]: %(message)s",
level=getattr(logging, logging_level),
)
logger.info("Starting PyQS version {}".format(__version__))
manager = ManagerWorker(
queue_prefixes, concurrency, interval, batchsize,
prefetch_multiplier=prefetch_multiplier, region=region,
access_key_id=access_key_id, secret_access_key=secret_access_key,
)

if simple_worker:
manager = SimpleManagerWorker(
queue_prefixes, concurrency, interval, batchsize,
region=region, access_key_id=access_key_id,
secret_access_key=secret_access_key,
endpoint_url=endpoint_url,
)
else:
manager = ManagerWorker(
queue_prefixes, concurrency, interval, batchsize,
prefetch_multiplier=prefetch_multiplier, region=region,
access_key_id=access_key_id, secret_access_key=secret_access_key,
endpoint_url=endpoint_url,
)

_add_cwd_to_path()
manager.start()
manager.sleep()
Loading