from urllib.parse import quote

_VALID_SSL_CERT_REQS = ("CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED")


def cache_broker_url(
    provider: str,
    host: str,
    port: int,
    password: str = "",
    db: int = 0,
    ssl_cert_reqs: str = "CERT_NONE",
) -> str:
    """Return a Redis URL (``redis://`` or ``rediss://``) for URL-configured clients.

    Intended for consumers that require URL-based configuration -- most
    notably Celery, which cannot consume a :class:`CacheBackend` directly.

    Args:
        provider: ``"redis"`` (self-hosted), ``"elasticache"`` (AWS), or
            ``"azure_redis"``.
        host: Redis host. An IPv6 literal (e.g. ``::1``) is wrapped in
            square brackets so the ``host:port`` pair stays unambiguous;
            an already-bracketed literal is left untouched.
        port: Redis port (6379 for plain, 6380 for TLS, 10000 for some Azure
            tiers -- pass the value the cluster actually listens on).
        password: Optional. Omit (or pass empty string) for unauthenticated
            self-hosted Redis. For ``elasticache`` / ``azure_redis`` this is
            the AUTH token / access key.
        db: Redis database index (non-negative integer).
        ssl_cert_reqs: TLS verification mode for cloud providers. One of
            ``CERT_NONE`` / ``CERT_OPTIONAL`` / ``CERT_REQUIRED``.
            Defaults to ``CERT_NONE``, which asks the client not to verify
            the server certificate; pass ``CERT_REQUIRED`` to request
            verification. Ignored when ``provider == "redis"``.
            NOTE(review): the default is the least strict mode -- confirm
            that is intended before relying on it in production.

    Returns:
        ``redis://[default:<password>@]<host>:<port>/<db>`` for ``"redis"``;
        ``rediss://...?ssl_cert_reqs=<mode>`` for the cloud providers.

    Raises:
        ValueError: If ``ssl_cert_reqs`` is not one of the accepted names,
            ``db`` is not a non-negative integer, or ``provider`` is not
            supported.

    Notes:
        Token-based auth (ElastiCache IAM, Azure Managed Identity / Service
        Principal) cannot be expressed in a static URL -- for those, configure
        the consumer (e.g. Celery) with a CredentialProvider instead of a URL.

        Celery's Redis transport does not forward the ``ssl_cert_reqs`` query
        parameter to redis-py; when used as a Celery broker URL the value is
        silently ignored. To enforce non-default cert verification with Celery,
        set ``broker_use_ssl`` (e.g. ``{"ssl_cert_reqs": ssl.CERT_REQUIRED}``)
        on the Celery app config in addition to passing this URL.
    """
    # Validate eagerly so a bad value fails at the call site rather than at
    # connection time -- applies to every provider, even where it's unused.
    if ssl_cert_reqs not in _VALID_SSL_CERT_REQS:
        raise ValueError(
            f"Invalid ssl_cert_reqs: {ssl_cert_reqs!r}. "
            f"Must be one of: {', '.join(_VALID_SSL_CERT_REQS)}."
        )
    # ``bool`` is a subclass of ``int``; reject it explicitly so ``True`` is
    # not silently treated as database 1.
    if not isinstance(db, int) or isinstance(db, bool) or db < 0:
        raise ValueError(f"Invalid db: {db!r}. Must be a non-negative integer.")

    # When a password is present, include the ``default`` username so the URL is
    # valid against Redis 6+ ACL deployments (``redis://default:pw@host``).
    # Without it, ``redis://:pw@host`` can silently fail to authenticate. The
    # password is percent-encoded so special characters (@ : / ? # %) don't
    # corrupt the URL.
    auth = f"default:{quote(password, safe='')}@" if password else ""

    # A bare IPv6 literal contains ':' and would be indistinguishable from
    # the port separator, so bracket it as URL syntax requires.
    needs_brackets = (
        isinstance(host, str) and ":" in host and not host.startswith("[")
    )
    location = f"[{host}]" if needs_brackets else host

    if provider == "redis":
        return f"redis://{auth}{location}:{port}/{db}"
    if provider in ("elasticache", "azure_redis"):
        return (
            f"rediss://{auth}{location}:{port}/{db}"
            f"?ssl_cert_reqs={ssl_cert_reqs}"
        )
    raise ValueError(
        f"Unsupported cache provider for broker URL: {provider!r}. "
        "Must be one of: 'redis', 'elasticache', 'azure_redis'."
    )
Requires Redis ≥ 6.2.""" + @abstractmethod async def hget(self, key: str, field: str) -> bytes | None: """Return the value of *field* in the hash stored at *key*.""" @@ -304,6 +322,29 @@ async def ttl(self, key: str) -> int: except RedisError as e: raise CacheError(str(e)) from e + async def scan( + self, + cursor: int = 0, + match: str | None = None, + count: int | None = None, + ) -> tuple[int, list[bytes]]: + kwargs: dict = {} + if match is not None: + kwargs["match"] = match + if count is not None: + kwargs["count"] = count + try: + next_cursor, keys = await self._client.scan(cursor=cursor, **kwargs) + return int(next_cursor), keys + except RedisError as e: + raise CacheError(str(e)) from e + + async def getdel(self, key: str) -> bytes | None: + try: + return await self._client.getdel(key) + except RedisError as e: + raise CacheError(str(e)) from e + async def keys(self, pattern: str = "*") -> list[str]: try: result = await self._client.keys(pattern) diff --git a/pyproject.toml b/pyproject.toml index 1e53a25..210b065 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "lyzr-cloudrift" -version = "0.2.3" +version = "0.2.4" description = "Cloud-agnostic abstraction for storage, messaging, document databases, cache, secrets, pub/sub, and email" readme = "README.md" requires-python = ">=3.11"