Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cloudrift/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
from cloudrift.storage import get_storage
from cloudrift.messaging import get_queue
from cloudrift.document import get_mongodb, get_mongodb_sync
from cloudrift.cache import get_cache
from cloudrift.cache import get_cache, cache_broker_url
from cloudrift.secrets import get_secrets
from cloudrift.pubsub import get_pubsub
from cloudrift.email import get_email

__version__ = "0.2.3"
__version__ = "0.2.4"
__all__ = [
"get_storage",
"get_queue",
"get_mongodb",
"get_mongodb_sync",
"get_cache",
"cache_broker_url",
"get_secrets",
"get_pubsub",
"get_email",
Expand Down
73 changes: 72 additions & 1 deletion cloudrift/cache/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,76 @@
from urllib.parse import quote

from cloudrift.cache.base import CacheBackend

_VALID_SSL_CERT_REQS = ("CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED")


def cache_broker_url(
provider: str,
host: str,
port: int,
password: str = "",
db: int = 0,
ssl_cert_reqs: str = "CERT_NONE",
) -> str:
"""Return a Redis URL (``redis://`` or ``rediss://``) suitable for clients
that require URL-based configuration — most notably Celery, which cannot
consume a :class:`CacheBackend` directly.

Args:
provider: ``"redis"`` (self-hosted), ``"elasticache"`` (AWS), or
``"azure_redis"``.
host: Redis host.
port: Redis port (6379 for plain, 6380 for TLS, 10000 for some Azure
tiers — pass the value the cluster actually listens on).
password: Optional. Omit (or pass empty string) for unauthenticated
self-hosted Redis. For ``elasticache`` / ``azure_redis`` this is the
AUTH token / access key.
db: Redis database index.
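ssl_cert_reqs: TLS verification mode for cloud providers. One of
``CERT_NONE`` / ``CERT_OPTIONAL`` / ``CERT_REQUIRED`` (defaults to
``CERT_NONE``). The value is always validated, but it is only added to
the URL for ``elasticache`` / ``azure_redis``; it has no effect on the
URL when ``provider == "redis"``.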

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth noting in the docstring that ssl_cert_reqs as a URL query param is not forwarded by Celery's Redis transport — callers still need to configure ssl_options on the Celery app separately when using this URL as a broker.


Notes:
Token-based auth (ElastiCache IAM, Azure Managed Identity / Service
Principal) cannot be expressed in a static URL — for those, configure
the consumer (e.g. Celery) with a CredentialProvider instead of a URL.

Celery's Redis transport does not forward the ``ssl_cert_reqs`` query
parameter to redis-py; when used as a Celery broker URL the value is
silently ignored. To enforce non-default cert verification with Celery,
set ``broker_use_ssl`` (e.g. ``{"ssl_cert_reqs": ssl.CERT_REQUIRED}``)
on the Celery app config in addition to passing this URL.
"""
# Validate eagerly so a bad value fails at the call site rather than at
# connection time — applies to every provider, even where it's unused.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provider == 'redis' branch returns a plain redis:// URL regardless of what ssl_cert_reqs was passed. But self-hosted Redis can also run with TLS (rediss://). There is no way for a caller to get a TLS URL for self-hosted Redis from this function — they would have to fall back to StandaloneRedisBackend.from_url directly. If intentional, worth documenting explicitly. If not, consider accepting an ssl: bool = False param for the 'redis' provider, mirroring what StandaloneRedisBackend.from_credentials does.

if ssl_cert_reqs not in _VALID_SSL_CERT_REQS:
raise ValueError(
f"Invalid ssl_cert_reqs: {ssl_cert_reqs!r}. "
f"Must be one of: {', '.join(_VALID_SSL_CERT_REQS)}."
)
if not isinstance(db, int) or isinstance(db, bool) or db < 0:
raise ValueError(f"Invalid db: {db!r}. Must be a non-negative integer.")

# When a password is present, include the ``default`` username so the URL is
# valid against Redis 6+ ACL deployments (``redis://default:pw@host``).
# Without it, ``redis://:pw@host`` can silently fail to authenticate. The
# password is percent-encoded so special characters (@ : / ? # %) don't
# corrupt the URL.
auth = f"default:{quote(password, safe='')}@" if password else ""

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ssl_cert_reqs query parameter is silently ignored by Celery's Redis transport, but the function still encodes it into the URL for elasticache and azure_redis providers. This creates a false sense of security: a caller that passes ssl_cert_reqs='CERT_NONE' thinking they are relaxing certificate verification for Celery will have no effect at all. The docstring mentions this, but only in a 'Notes' block that is easy to miss. Consider raising a warning (or at minimum making it more prominent inline near the parameter docs) so callers aren't surprised when cert verification is not what they set.

Also worth noting: redis-py itself does respect ssl_cert_reqs as a query param when parsing a rediss:// URL, so for direct redis-py usage the parameter is effective — it is only Celery that ignores it. Clarifying which consumer is affected would help.


if provider == "redis":
return f"redis://{auth}{host}:{port}/{db}"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the plain redis provider, a non-empty password produces redis://:password@host:6379/0 — colon prefix, no username. This is spec-valid and most clients handle it, but it differs from the redis://default:password@... form expected by Redis 6+ ACL-based auth. Worth a brief comment clarifying the intent, or switching to the explicit form to avoid silent auth failures against ACL-enabled clusters.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Password characters that are special in a URI (:, @, /, ?, #, %) will silently corrupt the URL here. Use urllib.parse.quote(password, safe='') to percent-encode before interpolating:

Suggested change
return f"redis://{auth}{host}:{port}/{db}"
from urllib.parse import quote
auth = f"default:{quote(password, safe='')}@" if password else ""

if provider in ("elasticache", "azure_redis"):
return (

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ssl_cert_reqs validation is only reached for elasticache/azure_redis. A caller using provider="redis" with a bad ssl_cert_reqs value gets no error and the param is silently dropped. Move the validation above the if provider == "redis" check so it applies to all code paths.

f"rediss://{auth}{host}:{port}/{db}"
f"?ssl_cert_reqs={ssl_cert_reqs}"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ssl_cert_reqs is a free-form string with no validation. An invalid value (e.g. CERT_INVALID) gets silently embedded in the URL and only surfaces as a confusing error from the Redis client at connection time, far from the call site. Suggest validating eagerly:

Suggested change
f"?ssl_cert_reqs={ssl_cert_reqs}"
_VALID_CERT_REQS = {"CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED"}
if ssl_cert_reqs not in _VALID_CERT_REQS:
raise ValueError(
f"ssl_cert_reqs must be one of {sorted(_VALID_CERT_REQS)}, got {ssl_cert_reqs!r}"
)
return (
f"rediss://:{password}@{host}:{port}/{db}"
f"?ssl_cert_reqs={ssl_cert_reqs}"
)

)
raise ValueError(
f"Unsupported cache provider for broker URL: {provider!r}. "
"Must be one of: 'redis', 'elasticache', 'azure_redis'."
)


def get_cache(provider: str, auth_method: str, **kwargs) -> CacheBackend:
"""Factory to instantiate a cache backend.
Expand Down Expand Up @@ -40,4 +111,4 @@ def get_cache(provider: str, auth_method: str, **kwargs) -> CacheBackend:
return factory(**kwargs)


__all__ = ["CacheBackend", "get_cache"]
__all__ = ["CacheBackend", "get_cache", "cache_broker_url"]
41 changes: 41 additions & 0 deletions cloudrift/cache/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,24 @@ async def ttl(self, key: str) -> int:
async def keys(self, pattern: str = "*") -> list[str]:
"""Return all keys matching *pattern*. Avoid on large keyspaces in production."""

@abstractmethod
async def scan(
self,
cursor: int = 0,
match: str | None = None,
count: int | None = None,
) -> tuple[int, list[bytes]]:
"""Incremental keyspace iteration.

Returns ``(next_cursor, keys)``. Iterate until ``next_cursor == 0``.
Preferred over :meth:`keys` in production: bounded per-call work.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scan abstract method declares the return type as tuple[int, list[bytes]], but the concrete _RedisMixin.scan implementation returns whatever aioredis.Redis.scan() gives back. In recent versions of redis-py, scan() returns tuple[int, list[bytes]] for the default decode mode, but tuple[int, list[str]] when decode_responses=True is set on the client. If any backend is created with decode_responses=True (none appear to be currently, but it is an easy mistake), the return type contract would be silently violated and downstream callers expecting bytes would get str.

Consider adding a note to the abstract method doc, or asserting in _RedisMixin.scan that the client is not in decode_responses mode, or widening the return type annotation to tuple[int, list[bytes | str]].

"""

@abstractmethod
async def getdel(self, key: str) -> bytes | None:
"""Atomically get and delete *key*. Returns the value, or ``None``
if the key did not exist. Requires Redis ≥ 6.2."""

@abstractmethod
async def hget(self, key: str, field: str) -> bytes | None:
"""Return the value of *field* in the hash stored at *key*."""
Expand Down Expand Up @@ -304,6 +322,29 @@ async def ttl(self, key: str) -> int:
except RedisError as e:
raise CacheError(str(e)) from e

async def scan(
    self,
    cursor: int = 0,
    match: str | None = None,
    count: int | None = None,
) -> tuple[int, list[bytes]]:
    """Run one ``scan`` step on the underlying client.

    Args:
        cursor: Cursor to resume from; passed through as ``cursor=``.
        match: Optional pattern; forwarded only when not ``None``.
        count: Optional per-call hint; forwarded only when not ``None``.

    Returns:
        ``(next_cursor, keys)`` where ``next_cursor`` has been coerced to
        ``int`` and ``keys`` is whatever the client returned.

    Raises:
        CacheError: wraps any ``RedisError`` raised by the client call.
    """
    # Leave out filters the caller did not supply so the client applies its
    # own defaults instead of receiving explicit ``None`` values.
    options = {
        name: value
        for name, value in (("match", match), ("count", count))
        if value is not None
    }
    try:
        page = await self._client.scan(cursor=cursor, **options)
    except RedisError as exc:
        raise CacheError(str(exc)) from exc
    next_cursor, found = page
    return int(next_cursor), found

async def getdel(self, key: str) -> bytes | None:
    """Delegate to the client's ``getdel`` for *key* and return its result.

    Raises:
        CacheError: wraps any ``RedisError`` raised by the client call.
    """
    try:
        value = await self._client.getdel(key)
    except RedisError as exc:
        raise CacheError(str(exc)) from exc
    return value

async def keys(self, pattern: str = "*") -> list[str]:
try:
result = await self._client.keys(pattern)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "lyzr-cloudrift"
version = "0.2.3"
version = "0.2.4"
description = "Cloud-agnostic abstraction for storage, messaging, document databases, cache, secrets, pub/sub, and email"
readme = "README.md"
requires-python = ">=3.11"
Expand Down
Loading