-
Notifications
You must be signed in to change notification settings - Fork 23
Auto-refresh zoom tokens via celery task + implement task locks #1652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f1897ca
f6d15f8
864162a
24f9764
a0fbdb5
91dbaec
5fa4335
1b257c4
3a0928a
0b8ab55
1b1756c
07b4205
67225ba
890c63c
064949f
002ae95
45633ed
694b96f
5be2b18
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| """Renew expiring Zoom auth tokens. | ||
|
|
||
| Run periodically (e.g., weekly) to ensure tokens don't expire. | ||
| Zoom tokens typically last ~90 days, so weekly renewal keeps a buffer. | ||
| """ | ||
|
|
||
| import json | ||
| import logging | ||
|
|
||
| from ..database import repo, models | ||
| from ..dependencies.database import get_engine_and_session | ||
| from ..dependencies.zoom import get_zoom_client | ||
| from ..main import _common_setup | ||
|
|
||
|
|
||
| def run(): | ||
| _common_setup() | ||
|
|
||
| _, SessionLocal = get_engine_and_session() | ||
| db = SessionLocal() | ||
| zoom_connections = [] | ||
| refreshed = 0 | ||
| failed = 0 | ||
| cleaned = 0 | ||
|
|
||
| try: | ||
| zoom_connections = repo.external_connection.get_zoom(db) | ||
|
|
||
| for connection in zoom_connections: | ||
| if not connection or not connection.token: | ||
| continue | ||
|
|
||
| owner_id = connection.owner_id | ||
| try: | ||
| subscriber = repo.subscriber.get(db, owner_id) | ||
| if not subscriber: | ||
| # Orphaned external connection so let's just remove it. | ||
| repo.external_connection.delete(db, connection) | ||
| cleaned += 1 | ||
| continue | ||
|
|
||
| zoom_client = get_zoom_client(subscriber) | ||
| token = json.loads(connection.token) | ||
|
|
||
| # Force the OAuth session to attempt a refresh by marking the token as expired. | ||
| token['expires_in'] = -100 | ||
| token['expires_at'] = 0 | ||
| zoom_client.setup(subscriber.id, token) | ||
| zoom_client.get_me() | ||
|
|
||
| new_token = zoom_client.client.token | ||
| repo.external_connection.update_token_by_connection( | ||
| db, | ||
| json.dumps(new_token), | ||
| connection, | ||
| ) | ||
| repo.external_connection.update_status( | ||
| db, | ||
| connection, | ||
| models.ExternalConnectionStatus.ok, | ||
| ) | ||
| refreshed += 1 | ||
| except Exception: | ||
| # Mark the external connection as error so the user can see | ||
| # it in the Settings UI and fix it manually when they log in | ||
| repo.external_connection.update_status( | ||
| db, | ||
| connection, | ||
| models.ExternalConnectionStatus.error, | ||
| ) | ||
|
|
||
| # TODO: Send an email to the Subscriber | ||
| # https://github.com/thunderbird/appointment/issues/1662 | ||
| failed += 1 | ||
| finally: | ||
| db.close() | ||
|
|
||
| logging.info( | ||
| f'[refresh_zoom_tokens] Zoom token refresh complete: ' | ||
| f'{refreshed} refreshed, {failed} failed, {cleaned} cleaned, {len(zoom_connections)} total processed' | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,3 @@ | ||
| from appointment.tasks.health import * # noqa: F401,F403 | ||
| from appointment.tasks.google import * # noqa: F401,F403 | ||
| from appointment.tasks.zoom import * # noqa: F401,F403 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| import logging | ||
| import os | ||
| import uuid | ||
| from contextlib import contextmanager | ||
| from appointment.dependencies.database import get_redis | ||
|
|
||
| DEFAULT_LOCK_TTL_SECONDS = 60 * 60 | ||
|
|
||
|
|
||
| class TaskLockFailed(Exception): | ||
| """Raised when a task lock cannot be acquired.""" | ||
|
|
||
|
|
||
| def _task_lock_key(task_name: str) -> str: | ||
| return f'lock:task:{task_name}' | ||
|
|
||
|
|
||
| def acquire_task_lock(redis_instance, task_name: str, ttl_seconds: int = DEFAULT_LOCK_TTL_SECONDS) -> str | None: | ||
| lock_key = _task_lock_key(task_name) | ||
| lock_token = str(uuid.uuid4()) | ||
| lock_acquired = bool(redis_instance.set(lock_key, lock_token, nx=True, ex=ttl_seconds)) | ||
| if not lock_acquired: | ||
| return None | ||
| return lock_token | ||
|
|
||
|
|
||
| def release_task_lock(redis_instance, task_name: str, lock_token: str): | ||
| """Only delete the lock if still owned by this task instance.""" | ||
| lock_key = _task_lock_key(task_name) | ||
|
|
||
| with redis_instance.pipeline() as pipe: | ||
| pipe.watch(lock_key) | ||
| if pipe.get(lock_key) == lock_token: | ||
| pipe.multi() | ||
| pipe.delete(lock_key) | ||
| pipe.execute() | ||
| else: | ||
| pipe.unwatch() | ||
|
|
||
|
|
||
| @contextmanager | ||
| def task_lock(task_name: str, ttl_seconds: int = DEFAULT_LOCK_TTL_SECONDS): | ||
| """Context manager that acquires and releases a distributed task lock. | ||
|
|
||
| Raises TaskLockFailed if Redis is available but the lock is already held. | ||
| When Redis is unavailable, execution proceeds without a lock. | ||
| """ | ||
| import sentry_sdk | ||
|
|
||
| redis_instance = get_redis(os.getenv('REDIS_CELERY_DB')) | ||
| lock_token = None | ||
|
|
||
| if redis_instance is not None: | ||
| lock_token = acquire_task_lock(redis_instance, task_name, ttl_seconds) | ||
| if lock_token is None: | ||
| raise TaskLockFailed(f'Failed to acquire lock for {task_name}') | ||
| else: | ||
| logging.warning(f'Redis unavailable; running {task_name} without distributed lock.') | ||
|
|
||
| try: | ||
| yield | ||
| except Exception as e: | ||
| logging.warning(f'{task_name} failed: {e}') | ||
| sentry_sdk.capture_exception(e) | ||
| raise | ||
| finally: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might want to log all uncaught errors as well.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yes, added an |
||
| if lock_token is not None: | ||
| try: | ||
| release_task_lock(redis_instance, task_name, lock_token) | ||
| except Exception as e: | ||
| logging.warning(f'Failed to release {task_name} lock: {e}') | ||
| sentry_sdk.capture_exception(e) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I noticed sentry creates "issues" for logging.error as well as capture_exception. I'm not sure what would be the best solution here, maybe drop the log down to warning?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmmm I dropped the logging down to warning in this case and in the uncaught errors from the comment above too! Thanks! |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| import logging | ||
| import sentry_sdk | ||
|
|
||
| from appointment.celery_app import celery | ||
| from appointment.tasks.locks import TaskLockFailed, task_lock | ||
|
|
||
|
|
||
| @celery.task | ||
| def refresh_zoom_tokens(): | ||
| from appointment.commands.refresh_zoom_tokens import run | ||
|
|
||
| try: | ||
| with task_lock(refresh_zoom_tokens.__name__): | ||
| logging.info('Starting Zoom token check') | ||
| run() | ||
| logging.info('Zoom token check complete') | ||
| except TaskLockFailed: | ||
| logging.info('Zoom token check is already running, skipping.') | ||
| except Exception as e: | ||
| logging.error(f'Zoom token check failed: {e}') | ||
| sentry_sdk.capture_exception(e) | ||
| raise |
Uh oh!
There was an error while loading. Please reload this page.