diff --git a/backend/.env.example b/backend/.env.example index a6423959d..72174eb38 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -104,6 +104,7 @@ ZOOM_API_ENABLED=False ZOOM_AUTH_CLIENT_ID= ZOOM_AUTH_SECRET= ZOOM_AUTH_CALLBACK=http://localhost:5000/zoom/callback +ZOOM_TOKEN_TTL_IN_SECONDS=604800 # 7 days # -- SIGNED URL SECRET -- # Shared secret for url signing (e.g. create it by running `openssl rand -hex 32`) @@ -154,4 +155,4 @@ OIDC_TOKEN_INTROSPECTION_URL= OIDC_FALLBACK_MATCH_BY_EMAIL= # Required for Appointment's CalDAV auto-setup, needs to match the one in thunderbird-accounts -APPOINTMENT_CALDAV_SECRET= \ No newline at end of file +APPOINTMENT_CALDAV_SECRET= diff --git a/backend/.env.test b/backend/.env.test index fb0d364e6..343f7e440 100644 --- a/backend/.env.test +++ b/backend/.env.test @@ -63,6 +63,7 @@ ZOOM_API_ENABLED=False ZOOM_AUTH_CLIENT_ID= ZOOM_AUTH_SECRET= ZOOM_AUTH_CALLBACK=http://localhost:8090/zoom/callback +ZOOM_TOKEN_TTL_IN_SECONDS=604800 # 7 days # -- SIGNED URL SECRET -- # Shared secret for url signing (e.g. create it by running `openssl rand -hex 32`) @@ -117,4 +118,4 @@ OIDC_EXP_GRACE_PERIOD=300 OIDC_FALLBACK_MATCH_BY_EMAIL= # Required for Appointment's CalDAV auto-setup, needs to match the one in thunderbird-accounts -APPOINTMENT_CALDAV_SECRET= \ No newline at end of file +APPOINTMENT_CALDAV_SECRET= diff --git a/backend/src/appointment/celery_app.py b/backend/src/appointment/celery_app.py index e16747b82..4d594befd 100644 --- a/backend/src/appointment/celery_app.py +++ b/backend/src/appointment/celery_app.py @@ -44,6 +44,9 @@ def create_celery_app() -> Celery: google_channel_ttl = float(os.getenv('GOOGLE_CHANNEL_TTL_IN_SECONDS', SEVEN_DAYS_IN_SECONDS)) google_channel_renew_interval = google_channel_ttl - ONE_DAY_IN_SECONDS + zoom_token_ttl = float(os.getenv('ZOOM_TOKEN_TTL_IN_SECONDS', SEVEN_DAYS_IN_SECONDS)) + zoom_token_renew_interval = zoom_token_ttl - ONE_DAY_IN_SECONDS + app = Celery('appointment') app.config_from_object({ @@ -68,6 +71,10 @@ def create_celery_app() -> Celery: 'task': 'appointment.tasks.google.renew_google_channels', 'schedule': google_channel_renew_interval, }, + 'refresh-zoom-tokens': { + 'task': 'appointment.tasks.zoom.refresh_zoom_tokens', + 'schedule': zoom_token_renew_interval, + }, }, }) diff --git a/backend/src/appointment/commands/refresh_zoom_tokens.py b/backend/src/appointment/commands/refresh_zoom_tokens.py new file mode 100644 index 000000000..09533f935 --- /dev/null +++ b/backend/src/appointment/commands/refresh_zoom_tokens.py @@ -0,0 +1,81 @@ +"""Renew expiring Zoom auth tokens. + +Run periodically (e.g., weekly) to ensure tokens don't expire. +Zoom tokens typically last ~90 days, so weekly renewal keeps a buffer. +""" + +import json +import logging + +from ..database import repo, models +from ..dependencies.database import get_engine_and_session +from ..dependencies.zoom import get_zoom_client +from ..main import _common_setup + + +def run(): + _common_setup() + + _, SessionLocal = get_engine_and_session() + db = SessionLocal() + zoom_connections = [] + refreshed = 0 + failed = 0 + cleaned = 0 + + try: + zoom_connections = repo.external_connection.get_zoom(db) + + for connection in zoom_connections: + if not connection or not connection.token: + continue + + owner_id = connection.owner_id + try: + subscriber = repo.subscriber.get(db, owner_id) + if not subscriber: + # Orphaned external connection so let's just remove it. + repo.external_connection.delete(db, connection) + cleaned += 1 + continue + + zoom_client = get_zoom_client(subscriber) + token = json.loads(connection.token) + + # Force the OAuth session to attempt a refresh by marking the token as expired. + token['expires_in'] = -100 + token['expires_at'] = 0 + zoom_client.setup(subscriber.id, token) + zoom_client.get_me() + + new_token = zoom_client.client.token + repo.external_connection.update_token_by_connection( + db, + json.dumps(new_token), + connection, + ) + repo.external_connection.update_status( + db, + connection, + models.ExternalConnectionStatus.ok, + ) + refreshed += 1 + except Exception: + # Mark the external connection as error so the user can see + # it in the Settings UI and fix it manually when they log in + repo.external_connection.update_status( + db, + connection, + models.ExternalConnectionStatus.error, + ) + + # TODO: Send an email to the Subscriber + # https://github.com/thunderbird/appointment/issues/1662 + failed += 1 + finally: + db.close() + + logging.info( + f'[refresh_zoom_tokens] Zoom token refresh complete: ' + f'{refreshed} refreshed, {failed} failed, {cleaned} cleaned, {len(zoom_connections)} total processed' + ) diff --git a/backend/src/appointment/controller/apis/zoom_client.py b/backend/src/appointment/controller/apis/zoom_client.py index fed08f7cb..f776d005f 100644 --- a/backend/src/appointment/controller/apis/zoom_client.py +++ b/backend/src/appointment/controller/apis/zoom_client.py @@ -39,13 +39,15 @@ def scopes(self): """Returns the appropriate scopes""" return self.SCOPES - def check_expiry(self, token: dict | None): - """Checks expires_at and if expired sets expires_in to a negative number to trigger refresh""" + def check_expiry(self, token: dict | None, threshold: float): + """Checks expires_at and sets expires_in to a negative number to trigger refresh + if already expired or within the given renewal threshold + """ if not token: return token expires_at = token.get('expires_at') - if expires_at and expires_at <= time.time(): + if expires_at and expires_at <= (time.time() + threshold): token['expires_in'] = -100 elif not expires_at: # We shouldn't have to handle this but just in case alert us! @@ -53,12 +55,12 @@ def check_expiry(self, token: dict | None): return token - def setup(self, subscriber_id=None, token=None): - """Setup our oAuth session""" + def setup(self, subscriber_id=None, token=None, threshold=0.0): + """Setup our OAuth session""" if isinstance(token, str): token = json.loads(token) - token = self.check_expiry(token) + token = self.check_expiry(token, threshold) self.subscriber_id = subscriber_id self.client = OAuth2Session( diff --git a/backend/src/appointment/database/repo/external_connection.py b/backend/src/appointment/database/repo/external_connection.py index 43b8a404b..adc9e22c3 100644 --- a/backend/src/appointment/database/repo/external_connection.py +++ b/backend/src/appointment/database/repo/external_connection.py @@ -5,6 +5,7 @@ import logging import os +from datetime import UTC, datetime import sentry_sdk from sqlalchemy.orm import Session from .. import models @@ -36,7 +37,14 @@ def update_token( if db_results is None or len(db_results) == 0: return None - db_external_connection = db_results[0] + return update_token_by_connection(db, token, db_results[0]) + + +def update_token_by_connection( + db: Session, + token: str, + db_external_connection: models.ExternalConnections, +): db_external_connection.token = token db.commit() db.refresh(db_external_connection) @@ -54,6 +62,12 @@ def delete_by_type(db: Session, subscriber_id: int, type: models.ExternalConnect return True +def delete(db: Session, db_external_connection: models.ExternalConnections): + db.delete(db_external_connection) + db.commit() + return True + + def get_by_type( db: Session, subscriber_id: int, type: models.ExternalConnectionType, type_id: str | None = None ) -> list[models.ExternalConnections] | None: @@ -144,6 +158,18 @@ def update_name(db: Session, db_external_connection: models.ExternalConnections, return db_external_connection +def update_status( + db: Session, + db_external_connection: models.ExternalConnections, + status: models.ExternalConnectionStatus, +): + db_external_connection.status = status + db_external_connection.status_checked_at = datetime.now(UTC) + db.commit() + db.refresh(db_external_connection) + return db_external_connection + + def get_subscriber_without_oidc_by_email(db: Session, email: str): """Return a subscriber by the OIDC recovery email address without an OIDC connection""" # Subquery to check if a subscriber has an OIDC external connection @@ -158,3 +184,15 @@ def get_subscriber_without_oidc_by_email(db: Session, email: str): query = db.query(models.Subscriber).filter(~oidc_exists).filter(models.Subscriber.email == email) return query.first() + + +def get_zoom(db: Session) -> list[models.ExternalConnections] | None: + """Return all external connections by Zoom type""" + query = ( + db.query(models.ExternalConnections) + .filter(models.ExternalConnections.type == models.ExternalConnectionType.zoom) + ) + + result = query.all() + + return result diff --git a/backend/src/appointment/routes/commands.py b/backend/src/appointment/routes/commands.py index 0f3d376ba..3b33c81c7 100644 --- a/backend/src/appointment/routes/commands.py +++ b/backend/src/appointment/routes/commands.py @@ -69,3 +69,10 @@ def backfill_channels(): backfill_google_channels.run() except FileExistsError: print('backfill-google-channels is already running, skipping.') + +@router.command('refresh-zoom-tokens') +def refresh_tokens(): + from ..tasks.zoom import refresh_zoom_tokens as refresh_zoom_tokens_task + + refresh_zoom_tokens_task.delay() + print('refresh-zoom-tokens task queued.') diff --git a/backend/src/appointment/tasks/__init__.py b/backend/src/appointment/tasks/__init__.py index aeb62618f..4abe57b8d 100644 --- a/backend/src/appointment/tasks/__init__.py +++ b/backend/src/appointment/tasks/__init__.py @@ -1,2 +1,3 @@ from appointment.tasks.health import * # noqa: F401,F403 from appointment.tasks.google import * # noqa: F401,F403 +from appointment.tasks.zoom import * # noqa: F401,F403 diff --git a/backend/src/appointment/tasks/locks.py b/backend/src/appointment/tasks/locks.py new file mode 100644 index 000000000..3cb714d8f --- /dev/null +++ b/backend/src/appointment/tasks/locks.py @@ -0,0 +1,72 @@ +import logging +import os +import uuid +from contextlib import contextmanager +from appointment.dependencies.database import get_redis + +DEFAULT_LOCK_TTL_SECONDS = 60 * 60 + + +class TaskLockFailed(Exception): + """Raised when a task lock cannot be acquired.""" + + +def _task_lock_key(task_name: str) -> str: + return f'lock:task:{task_name}' + + +def acquire_task_lock(redis_instance, task_name: str, ttl_seconds: int = DEFAULT_LOCK_TTL_SECONDS) -> str | None: + lock_key = _task_lock_key(task_name) + lock_token = str(uuid.uuid4()) + lock_acquired = bool(redis_instance.set(lock_key, lock_token, nx=True, ex=ttl_seconds)) + if not lock_acquired: + return None + return lock_token + + +def release_task_lock(redis_instance, task_name: str, lock_token: str): + """Only delete the lock if still owned by this task instance.""" + lock_key = _task_lock_key(task_name) + + with redis_instance.pipeline() as pipe: + pipe.watch(lock_key) + if pipe.get(lock_key) == lock_token: + pipe.multi() + pipe.delete(lock_key) + pipe.execute() + else: + pipe.unwatch() + + +@contextmanager +def task_lock(task_name: str, ttl_seconds: int = DEFAULT_LOCK_TTL_SECONDS): + """Context manager that acquires and releases a distributed task lock. + + Raises TaskLockFailed if Redis is available but the lock is already held. + When Redis is unavailable, execution proceeds without a lock. + """ + import sentry_sdk + + redis_instance = get_redis(os.getenv('REDIS_CELERY_DB')) + lock_token = None + + if redis_instance is not None: + lock_token = acquire_task_lock(redis_instance, task_name, ttl_seconds) + if lock_token is None: + raise TaskLockFailed(f'Failed to acquire lock for {task_name}') + else: + logging.warning(f'Redis unavailable; running {task_name} without distributed lock.') + + try: + yield + except Exception as e: + logging.warning(f'{task_name} failed: {e}') + sentry_sdk.capture_exception(e) + raise + finally: + if lock_token is not None: + try: + release_task_lock(redis_instance, task_name, lock_token) + except Exception as e: + logging.warning(f'Failed to release {task_name} lock: {e}') + sentry_sdk.capture_exception(e) diff --git a/backend/src/appointment/tasks/zoom.py b/backend/src/appointment/tasks/zoom.py new file mode 100644 index 000000000..2fa19118b --- /dev/null +++ b/backend/src/appointment/tasks/zoom.py @@ -0,0 +1,22 @@ +import logging +import sentry_sdk + +from appointment.celery_app import celery +from appointment.tasks.locks import TaskLockFailed, task_lock + + +@celery.task +def refresh_zoom_tokens(): + from appointment.commands.refresh_zoom_tokens import run + + try: + with task_lock(refresh_zoom_tokens.__name__): + logging.info('Starting Zoom token check') + run() + logging.info('Zoom token check complete') + except TaskLockFailed: + logging.info('Zoom token check is already running, skipping.') + except Exception as e: + logging.error(f'Zoom token check failed: {e}') + sentry_sdk.capture_exception(e) + raise diff --git a/backend/test/unit/test_commands.py b/backend/test/unit/test_commands.py index 0343c910c..3aa7adfa7 100644 --- a/backend/test/unit/test_commands.py +++ b/backend/test/unit/test_commands.py @@ -1,12 +1,16 @@ import json import os +import time import pytest +from contextlib import contextmanager from datetime import datetime, timedelta, timezone from unittest.mock import Mock, patch from appointment.database import models, repo -from appointment.routes.commands import cron_lock +from appointment.routes.commands import cron_lock, refresh_tokens +from appointment.tasks.locks import acquire_task_lock, release_task_lock, task_lock, TaskLockFailed +from appointment.tasks.zoom import refresh_zoom_tokens as refresh_zoom_tokens_task def test_cron_lock(): @@ -35,6 +39,183 @@ def test_cron_lock(): os.remove(test_lock_file_name) +def test_refresh_zoom_tokens_command_queues_celery_task(): + """CLI refresh command should enqueue task instead of running inline.""" + with patch('appointment.tasks.zoom.refresh_zoom_tokens.delay') as mock_delay: + refresh_tokens() + + mock_delay.assert_called_once() + + +def test_acquire_task_lock_returns_token_when_acquired(): + """Lock helper should return a lock token when Redis acquires lock.""" + + mock_redis = Mock() + mock_redis.set.return_value = True + + with patch('appointment.tasks.locks.uuid.uuid4', return_value='abc-123'): + lock_token = acquire_task_lock(mock_redis, 'refresh_zoom_tokens', ttl_seconds=123) + + assert lock_token == 'abc-123' + mock_redis.set.assert_called_once_with( + 'lock:task:refresh_zoom_tokens', + 'abc-123', + nx=True, + ex=123, + ) + + +def test_acquire_task_lock_returns_none_when_not_acquired(): + """Lock helper should return None when lock is already held.""" + + mock_redis = Mock() + mock_redis.set.return_value = False + + lock_token = acquire_task_lock(mock_redis, 'refresh_zoom_tokens') + + assert lock_token is None + + +def test_release_task_lock_deletes_when_token_matches(): + """Lock helper should delete the key when the stored token matches.""" + mock_pipe = Mock() + mock_pipe.get.return_value = 'abc-123' + mock_pipe.__enter__ = Mock(return_value=mock_pipe) + mock_pipe.__exit__ = Mock(return_value=False) + + mock_redis = Mock() + mock_redis.pipeline.return_value = mock_pipe + + release_task_lock(mock_redis, 'refresh_zoom_tokens', 'abc-123') + + mock_pipe.watch.assert_called_once_with('lock:task:refresh_zoom_tokens') + mock_pipe.multi.assert_called_once() + mock_pipe.delete.assert_called_once_with('lock:task:refresh_zoom_tokens') + mock_pipe.execute.assert_called_once() + + +def test_release_task_lock_skips_when_token_differs(): + """Lock helper should not delete the key when a different token owns it.""" + mock_pipe = Mock() + mock_pipe.get.return_value = 'other-token' + mock_pipe.__enter__ = Mock(return_value=mock_pipe) + mock_pipe.__exit__ = Mock(return_value=False) + + mock_redis = Mock() + mock_redis.pipeline.return_value = mock_pipe + + release_task_lock(mock_redis, 'refresh_zoom_tokens', 'abc-123') + + mock_pipe.watch.assert_called_once_with('lock:task:refresh_zoom_tokens') + mock_pipe.multi.assert_not_called() + mock_pipe.delete.assert_not_called() + mock_pipe.unwatch.assert_called_once() + + +def test_task_lock_context_manager_acquires_and_releases(): + """Context manager should acquire lock on entry and release on exit.""" + + mock_redis = Mock() + mock_redis.set.return_value = True + + with patch('appointment.tasks.locks.get_redis', return_value=mock_redis): + with patch('appointment.tasks.locks.uuid.uuid4', return_value='ctx-token'): + with patch('appointment.tasks.locks.release_task_lock') as mock_release: + with task_lock('my_task'): + mock_redis.set.assert_called_once() + + mock_release.assert_called_once_with(mock_redis, 'my_task', 'ctx-token') + + +def test_task_lock_context_manager_raises_when_lock_held(): + """Context manager should raise TaskLockFailed when lock is already held.""" + + mock_redis = Mock() + mock_redis.set.return_value = False + + with patch('appointment.tasks.locks.get_redis', return_value=mock_redis): + with pytest.raises(TaskLockFailed): + with task_lock('my_task'): + pass + + +def test_task_lock_context_manager_releases_on_exception(): + """Context manager should still release the lock when the body raises.""" + + mock_redis = Mock() + mock_redis.set.return_value = True + + with patch('appointment.tasks.locks.get_redis', return_value=mock_redis): + with patch('appointment.tasks.locks.uuid.uuid4', return_value='err-token'): + with patch('appointment.tasks.locks.release_task_lock') as mock_release: + with pytest.raises(RuntimeError): + with task_lock('my_task'): + raise RuntimeError('boom') + + mock_release.assert_called_once_with(mock_redis, 'my_task', 'err-token') + + +def test_task_lock_context_manager_proceeds_without_redis(): + """Context manager should proceed without locking when Redis is unavailable.""" + + ran = False + with patch('appointment.tasks.locks.get_redis', return_value=None): + with task_lock('my_task'): + ran = True + + assert ran + + +def test_refresh_zoom_tokens_task_skips_when_lock_is_held(): + """Celery task should skip execution when lock cannot be acquired.""" + + @contextmanager + def _failing_lock(task_name, ttl_seconds=None): + raise TaskLockFailed('already locked') + + with patch('appointment.tasks.zoom.task_lock', side_effect=_failing_lock): + with patch('appointment.commands.refresh_zoom_tokens.run') as mock_run: + refresh_zoom_tokens_task() + + mock_run.assert_not_called() + + +def test_refresh_zoom_tokens_task_acquires_lock_for_run(): + """Celery task should run inside the task_lock context manager.""" + + lock_entered = False + + @contextmanager + def _tracking_lock(task_name, ttl_seconds=None): + nonlocal lock_entered + lock_entered = True + yield + + with patch('appointment.tasks.zoom.task_lock', side_effect=_tracking_lock): + with patch('appointment.commands.refresh_zoom_tokens.run') as mock_run: + refresh_zoom_tokens_task() + + assert lock_entered + mock_run.assert_called_once() + + +def test_refresh_zoom_tokens_task_uses_function_name_for_lock(): + """Task should use its own function name as lock key scope.""" + + captured_name = None + + @contextmanager + def _capture_lock(task_name, ttl_seconds=None): + nonlocal captured_name + captured_name = task_name + raise TaskLockFailed('test') + + with patch('appointment.tasks.zoom.task_lock', side_effect=_capture_lock): + refresh_zoom_tokens_task() + + assert captured_name == 'refresh_zoom_tokens' + + def _make_google_token(): return json.dumps( { @@ -389,3 +570,239 @@ def test_stop_channel_is_async_and_does_not_block_renewal( assert updated.channel_id == 'new-channel-id' assert updated.state is not None assert updated.state != 'old-state' + +class TestRefreshZoomTokens: + """Tests that the refresh command correctly renews Zoom OAuth tokens.""" + + MODULE = 'appointment.commands.refresh_zoom_tokens' + + @staticmethod + def _make_zoom_token(**overrides): + token = { + 'access_token': 'old-access-token', + 'refresh_token': 'old-refresh-token', + 'token_type': 'bearer', + 'expires_in': 3600, + 'scope': 'meeting:read meeting:write user:read', + } + token.update(overrides) + return json.dumps(token) + + def _run_refresh(self, with_db, mock_zoom_client): + with patch(f'{self.MODULE}._common_setup'): + with patch(f'{self.MODULE}.get_zoom_client', return_value=mock_zoom_client): + with patch(f'{self.MODULE}.get_engine_and_session', return_value=(None, with_db)): + from appointment.commands.refresh_zoom_tokens import run + + run() + + def test_token_is_refreshed(self, with_db, make_external_connections): + """Running the command should trigger a token refresh and persist the new token in the DB.""" + old_token = self._make_zoom_token() + make_external_connections( + subscriber_id=1, + type=models.ExternalConnectionType.zoom, + token=old_token, + ) + + refreshed_token = { + 'access_token': 'new-access-token', + 'refresh_token': 'new-refresh-token', + 'token_type': 'bearer', + 'expires_in': 3600, + 'expires_at': time.time() + 3600, + 'scope': 'meeting:read meeting:write user:read', + } + + mock_zoom_client = Mock() + + def fake_setup(subscriber_id, token, threshold=0.0): + mock_zoom_client.client = Mock() + mock_zoom_client.client.token = refreshed_token + + mock_zoom_client.setup.side_effect = fake_setup + mock_zoom_client.get_me.return_value = {'id': 'user123'} + + self._run_refresh(with_db, mock_zoom_client) + + mock_zoom_client.get_me.assert_called_once() + setup_subscriber_id, setup_token = mock_zoom_client.setup.call_args.args + assert setup_subscriber_id == 1 + assert setup_token['expires_in'] == -100 + assert setup_token['expires_at'] == 0 + + with with_db() as db: + connections = repo.external_connection.get_by_type( + db, 1, models.ExternalConnectionType.zoom + ) + assert len(connections) == 1 + stored_token = json.loads(connections[0].token) + assert stored_token['access_token'] == 'new-access-token' + assert stored_token['refresh_token'] == 'new-refresh-token' + assert 'expires_at' in stored_token + + def test_skips_connection_without_token(self, with_db, make_external_connections): + """Connections with no token should be skipped.""" + make_external_connections( + subscriber_id=1, + type=models.ExternalConnectionType.zoom, + token='', + ) + + mock_zoom_client = Mock() + self._run_refresh(with_db, mock_zoom_client) + + mock_zoom_client.setup.assert_not_called() + + def test_failed_refresh_does_not_stop_others(self, with_db, make_external_connections, make_pro_subscriber): + """If one token fails to refresh, the command should continue with the rest.""" + make_external_connections( + subscriber_id=1, + type=models.ExternalConnectionType.zoom, + token=self._make_zoom_token(), + ) + + subscriber_2 = make_pro_subscriber() + make_external_connections( + subscriber_id=subscriber_2.id, + type=models.ExternalConnectionType.zoom, + token=self._make_zoom_token(), + ) + + call_count = 0 + + def fail_first_setup(subscriber_id, token, threshold=0.0): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise Exception('Zoom API error') + + mock_zoom_client = Mock() + mock_zoom_client.setup.side_effect = fail_first_setup + + self._run_refresh(with_db, mock_zoom_client) + + assert mock_zoom_client.setup.call_count == 2 + + def test_failed_refresh_marks_external_connection_error(self, with_db, make_external_connections): + """If token refresh fails, external connection status should be marked as error.""" + make_external_connections( + subscriber_id=1, + type=models.ExternalConnectionType.zoom, + token=self._make_zoom_token(), + ) + + mock_zoom_client = Mock() + mock_zoom_client.setup.side_effect = Exception('Zoom API error') + + self._run_refresh(with_db, mock_zoom_client) + + with with_db() as db: + connection = repo.external_connection.get_by_type(db, 1, models.ExternalConnectionType.zoom)[0] + assert connection.status == models.ExternalConnectionStatus.error + assert connection.status_checked_at is not None + + def test_successful_refresh_resets_external_connection_status(self, with_db, make_external_connections): + """A successful refresh should mark previously-failed connections back to ok.""" + make_external_connections( + subscriber_id=1, + type=models.ExternalConnectionType.zoom, + token=self._make_zoom_token(), + ) + + with with_db() as db: + connection = repo.external_connection.get_by_type(db, 1, models.ExternalConnectionType.zoom)[0] + repo.external_connection.update_status( + db, + connection, + models.ExternalConnectionStatus.error, + ) + + refreshed_token = { + 'access_token': 'new-access-token', + 'refresh_token': 'new-refresh-token', + 'token_type': 'bearer', + 'expires_in': 3600, + 'expires_at': time.time() + 3600, + 'scope': 'meeting:read meeting:write user:read', + } + + mock_zoom_client = Mock() + + def fake_setup(subscriber_id, token, threshold=0.0): + mock_zoom_client.client = Mock() + mock_zoom_client.client.token = refreshed_token + + mock_zoom_client.setup.side_effect = fake_setup + mock_zoom_client.get_me.return_value = {'id': 'user123'} + + self._run_refresh(with_db, mock_zoom_client) + + with with_db() as db: + connection = repo.external_connection.get_by_type(db, 1, models.ExternalConnectionType.zoom)[0] + assert connection.status == models.ExternalConnectionStatus.ok + assert connection.status_checked_at is not None + + def test_orphaned_zoom_connection_is_cleaned_up(self, with_db, make_external_connections): + """If an external connection has no subscriber, it should be deleted as orphaned.""" + make_external_connections( + subscriber_id=99999, + type=models.ExternalConnectionType.zoom, + token=self._make_zoom_token(), + ) + + mock_zoom_client = Mock() + self._run_refresh(with_db, mock_zoom_client) + + mock_zoom_client.setup.assert_not_called() + + with with_db() as db: + assert repo.external_connection.get_zoom(db) == [] + + def test_invalid_token_payload_does_not_stop_others( + self, with_db, make_external_connections, make_pro_subscriber + ): + """If one stored token is invalid JSON, the command should still refresh other users.""" + make_external_connections( + subscriber_id=1, + type=models.ExternalConnectionType.zoom, + token='not-json', + ) + + subscriber_2 = make_pro_subscriber() + make_external_connections( + subscriber_id=subscriber_2.id, + type=models.ExternalConnectionType.zoom, + token=self._make_zoom_token(), + ) + + refreshed_token = { + 'access_token': 'new-access-token', + 'refresh_token': 'new-refresh-token', + 'token_type': 'bearer', + 'expires_in': 3600, + 'expires_at': time.time() + 3600, + 'scope': 'meeting:read meeting:write user:read', + } + + mock_zoom_client = Mock() + + def fake_setup(subscriber_id, token, threshold=0.0): + assert subscriber_id == subscriber_2.id + mock_zoom_client.client = Mock() + mock_zoom_client.client.token = refreshed_token + + mock_zoom_client.setup.side_effect = fake_setup + mock_zoom_client.get_me.return_value = {'id': 'user456'} + + self._run_refresh(with_db, mock_zoom_client) + + mock_zoom_client.setup.assert_called_once() + mock_zoom_client.get_me.assert_called_once() + + with with_db() as db: + valid_connection = repo.external_connection.get_by_type( + db, subscriber_2.id, models.ExternalConnectionType.zoom + )[0] + assert json.loads(valid_connection.token)['refresh_token'] == 'new-refresh-token' +