Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:

services:
redis:
image: redis:6
image: redis:7.4-bookworm
ports:
- 6379:6379
options: >-
Expand Down Expand Up @@ -47,7 +47,7 @@ jobs:

services:
redis:
image: redis:8
image: redis:7.4-bookworm
ports:
- 6379:6379
options: >-
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
redis:
image: redis:6.2-bookworm
image: redis:7.4-bookworm
ports:
- "6379:6379"
volumes:
Expand Down
58 changes: 58 additions & 0 deletions examples/workflows/fan_in_fan_out.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import asyncio

from sheppy import Queue, RedisBackend, task
from sheppy._workflow import workflow

ADMIN_EMAILS = [
"admin1@example.com",
"admin2@example.com",
"admin3@example.com",
]

@task
async def cleanup_old_data(days: int = 7):
if days > 7: # deterministic "random" failure
raise Exception("some random failure happened")

return "everything ok!"

@task
async def some_cleanup_at_the_end():
return True

@task
async def rollback_changes():
return True

@task
async def send_notification(to: str, subject: str):
print(f"Sending email to {to}, subject {subject}")


@workflow
def daily_cleanup(days_to_clean: int):
result_task = yield cleanup_old_data(days=days_to_clean)

if result_task.error:
yield rollback_changes()
yield [send_notification(email, "Oh no, daily cleanup failed!") for email in ADMIN_EMAILS]

raise Exception("Cleanup failed, notifications were sent") # fail the workflow

if result_task.status == 'completed':
yield some_cleanup_at_the_end()
yield send_notification("devteam@example.com", "Cleanup finished")

return "Daily cleanup finished successfully"

raise Exception("not sure what happened!")


async def main():
queue = Queue(RedisBackend())
# await queue.add_workflow(daily_cleanup(7))
await queue.add_workflow(daily_cleanup(30))


if __name__ == "__main__":
asyncio.run(main())
33 changes: 33 additions & 0 deletions examples/workflows/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio

from sheppy import Queue, task
from sheppy._workflow import workflow

ADMIN_EMAILS = [
"admin1@example.com",
"admin2@example.com",
"admin3@example.com",
]

@task
async def say_hello(name: str) -> str:
return f"Hello, {name}!"


@workflow
def example_workflow(names: list[str]):
t1 = yield say_hello("Alice")
t2 = yield say_hello("Bob")
tx = yield [say_hello(name) for name in names] # fan-out style

return "\n".join([t1.result, t2.result] + [t.result for t in tx])


async def main():
queue = Queue("redis://")
wf = example_workflow(["Alex", "John"])
await queue.add_workflow(wf)


if __name__ == "__main__":
asyncio.run(main())
2 changes: 2 additions & 0 deletions src/sheppy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from ._utils.fastapi import Depends as Depends
from ._workflow import Workflow as Workflow
from ._workflow import workflow as workflow
from .backend import Backend as Backend
from .backend import BackendError as BackendError
from .backend import LocalBackend as LocalBackend
Expand Down
4 changes: 4 additions & 0 deletions src/sheppy/_localkv/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ async def list_len(self, key: str) -> int:
r = await self._call("list_len", key=key)
return r["count"] # type:ignore[no-any-return]

async def list_remove(self, key: str, value: str) -> bool:
r = await self._call("list_remove", key=key, value=value)
return r["removed"] # type:ignore[no-any-return]

# sorted list
async def sorted_push(self, key: str, position: float, value: str) -> None:
await self._call("sorted_push", key=key, position=position, value=value)
Expand Down
19 changes: 18 additions & 1 deletion src/sheppy/_localkv/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@ def handle_command(cmd: str, args: dict[str, Any]) -> dict[str, Any]:
return {"ok": True, "created": True}

case "delete":
count = sum(1 for k in args["keys"] if store.kv.pop(k, None) is not None)
count = 0
for k in args["keys"]:
if store.kv.pop(k, None) is not None:
count += 1
if k in store.lists:
del store.lists[k]
count += 1
if k in store.sorted_list:
del store.sorted_list[k]
count += 1
return {"ok": True, "count": count}

case "keys":
Expand Down Expand Up @@ -98,6 +107,14 @@ def handle_command(cmd: str, args: dict[str, Any]) -> dict[str, Any]:
case "list_len":
return {"ok": True, "count": len(store.lists[args["key"]])}

case "list_remove":
lst = store.lists[args["key"]]
value = args["value"]
if value in lst:
lst.remove(value)
return {"ok": True, "removed": True}
return {"ok": True, "removed": False}

# sorted list
case "sorted_push":
bisect.insort(store.sorted_list[args["key"]], SortedItem(args["position"], args["value"]))
Expand Down
16 changes: 16 additions & 0 deletions src/sheppy/_utils/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,22 @@
cache_main_module: str | None = None


def stringify_function(func: Callable[..., Any]) -> str:
_module = func.__module__
# special case if the task is in the main python file that is executed
if _module == "__main__":
global cache_main_module
if not cache_main_module:
# this handles "python -m app.main" because with "-m" sys.argv[0] is absolute path
_main_path = os.path.relpath(sys.argv[0])[:-3]
# replace handles situations when user runs "python app/main.py"
cache_main_module = _main_path.replace(os.sep, ".")

_module = cache_main_module

return f"{_module}:{func.__name__}"


def resolve_function(func: str, wrapped: bool = True) -> Callable[..., Any]:
module_name = None
function_name = None
Expand Down
Loading