diff --git a/wavefront/server/apps/floware/floware/config.ini b/wavefront/server/apps/floware/floware/config.ini index 82e35721..8b44df4c 100644 --- a/wavefront/server/apps/floware/floware/config.ini +++ b/wavefront/server/apps/floware/floware/config.ini @@ -103,6 +103,7 @@ gcp_location = ${GCP_LOCATION} gcp_key_ring = ${GCP_KMS_KEY_RING} gcp_crypto_key = ${GCP_KMS_CRYPTO_KEY} gcp_crypto_key_version = ${GCP_KMS_CRYPTO_KEY_VERSION} +gcp_enc_crypto_key = ${GCP_KMS_ENC_CRYPTO_KEY} [floware] asset_storage_bucket=${ASSET_STORAGE_BUCKET} @@ -177,3 +178,12 @@ url=${HERMES_URL:http://localhost:8080/flo-hermes} [workflow] worker_topic=${WORKFLOW_WORKER_TOPIC} + +[triggers_gmail] +client_id=${GOOGLE_OAUTH_CLIENT_ID} +client_secret=${GOOGLE_OAUTH_CLIENT_SECRET} +redirect_uri=${GOOGLE_OAUTH_REDIRECT_URI} +pubsub_project_id=${GCP_PROJECT_ID} +pubsub_topic_prefix=${GMAIL_PUBSUB_TOPIC_PREFIX:agentic-trigger} +push_endpoint_template=${GMAIL_PUSH_ENDPOINT_TEMPLATE} +oidc_service_account_email=${GMAIL_PUBSUB_OIDC_SA_EMAIL} diff --git a/wavefront/server/apps/floware/floware/server.py b/wavefront/server/apps/floware/floware/server.py index 9f99134f..55374a0d 100644 --- a/wavefront/server/apps/floware/floware/server.py +++ b/wavefront/server/apps/floware/floware/server.py @@ -74,6 +74,8 @@ AsyncAgenticExecutionResultConsumer, ) from agents_module.agents_container import AgentsContainer +from triggers_module.controllers.trigger_controller import trigger_router +from triggers_module.triggers_container import TriggersContainer from inference_module.inference_container import InferenceContainer from inference_module.controllers.inference_controller import inference_router @@ -202,6 +204,17 @@ cache_manager=db_repo_container.cache_manager, cloud_storage_manager=common_container.cloud_storage_manager, ) + +triggers_container = TriggersContainer( + trigger_repository=db_repo_container.agentic_trigger_repository, + credential_repository=db_repo_container.agentic_trigger_credential_repository, + event_repository=db_repo_container.agentic_trigger_event_repository, + agent_repository=db_repo_container.agent_repository, + workflow_repository=db_repo_container.workflow_repository, + async_agentic_execution_service=agents_container.async_agentic_execution_service, + cache_manager=db_repo_container.cache_manager, +) + scheduler_manager = SchedulerManager() @@ -234,6 +247,18 @@ async def lifespan(app: FastAPI): scheduler_manager.register_stale_lock_recovery( callback=scheduled_job_service.recover_stale_locks_sync ) + + trigger_subscription_renewer = triggers_container.trigger_subscription_renewer() + + def _run_trigger_renewer_sync() -> None: + try: + asyncio.run(trigger_subscription_renewer.run_once()) + except Exception as exc: + logger.warning(f'Trigger subscription renewer failed: {exc}') + + scheduler_manager.register_trigger_subscription_renewer( + callback=_run_trigger_renewer_sync + ) logger.info('Database connection established.') # Load API services from database into registry @@ -418,6 +443,7 @@ async def metrics(request: Request): app.include_router(tool_router, prefix='/floware') app.include_router(message_processor_router, prefix='/floware') app.include_router(cloud_storage_router, prefix='/floware') +app.include_router(trigger_router, prefix='/floware') @app.exception_handler(Exception) @@ -506,6 +532,7 @@ async def global_exception_handler(request: Request, exc: Exception): 'llm_inference_config_module.controllers', 'tools_module.controllers', 'voice_agents_module.controllers', + 'triggers_module.controllers', ], ) @@ -538,6 +565,14 @@ async def global_exception_handler(request: Request, exc: Exception): ], ) +triggers_container.wire( + modules=[__name__], + packages=[ + 'triggers_module.controllers', + 'triggers_module.services', + ], +) + inference_container.wire( modules=[__name__], packages=['inference_module.controllers'], diff --git a/wavefront/server/apps/floware/floware/services/scheduler_manager.py b/wavefront/server/apps/floware/floware/services/scheduler_manager.py index 48f7fecf..0cedc2fe 100644 --- a/wavefront/server/apps/floware/floware/services/scheduler_manager.py +++ b/wavefront/server/apps/floware/floware/services/scheduler_manager.py @@ -59,6 +59,17 @@ def register_stale_lock_recovery(self, callback: Callable): replace_existing=True, ) + def register_trigger_subscription_renewer(self, callback: Callable): + """Runs every 6 hours to renew provider subscriptions about to expire.""" + if self.scheduler is None: + raise RuntimeError('Scheduler must be started before registering jobs') + self.scheduler.add_job( + callback, + trigger=CronTrigger(hour='*/6'), + id='trigger-subscription-renewer', + replace_existing=True, + ) + def shutdown(self): if self.scheduler and self.scheduler.running: # wait=True ensures in-flight jobs finish before shutdown, diff --git a/wavefront/server/apps/floware/pyproject.toml b/wavefront/server/apps/floware/pyproject.toml index 143f3995..b71c6ce9 100644 --- a/wavefront/server/apps/floware/pyproject.toml +++ b/wavefront/server/apps/floware/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ "tools-module", "api-services-module", "voice-agents-module", + "triggers-module", "fastapi>=0.115.2,<1.0.0", @@ -48,6 +49,7 @@ llm-inference-config-module = {workspace = true} tools-module = {workspace = true} api-services-module = {workspace = true} voice-agents-module = {workspace = true} +triggers-module = {workspace = true} [build-system] requires = ["hatchling"] diff --git a/wavefront/server/background_jobs/celery_worker/celery_worker/celery_app.py b/wavefront/server/background_jobs/celery_worker/celery_worker/celery_app.py index d3583666..01a125cc 100644 --- a/wavefront/server/background_jobs/celery_worker/celery_worker/celery_app.py +++ b/wavefront/server/background_jobs/celery_worker/celery_worker/celery_app.py @@ -9,6 +9,7 @@ include=[ 'celery_worker.tasks.agent_task', 'celery_worker.tasks.workflow_task', + 'celery_worker.tasks.trigger_event_task', ], task_serializer='json', accept_content=['json'], diff --git a/wavefront/server/background_jobs/celery_worker/celery_worker/env.py b/wavefront/server/background_jobs/celery_worker/celery_worker/env.py index a415888e..291131f3 100644 --- a/wavefront/server/background_jobs/celery_worker/celery_worker/env.py +++ b/wavefront/server/background_jobs/celery_worker/celery_worker/env.py @@ -37,3 +37,22 @@ DB_HOST: str = os.environ['DB_HOST'] DB_PORT: str = os.environ['DB_PORT'] DB_NAME: str = os.environ['DB_NAME'] + + +def _required_env(name: str) -> str: + value = os.getenv(name) + if not value: + raise RuntimeError(f'Missing required environment variable: {name}') + return value + + +# Triggers — Gmail OAuth + Pub/Sub +GOOGLE_OAUTH_CLIENT_ID: str = _required_env('GOOGLE_OAUTH_CLIENT_ID') +GOOGLE_OAUTH_CLIENT_SECRET: str = _required_env('GOOGLE_OAUTH_CLIENT_SECRET') +GOOGLE_OAUTH_REDIRECT_URI: str = _required_env('GOOGLE_OAUTH_REDIRECT_URI') +GCP_PROJECT_ID: str = _required_env('GCP_PROJECT_ID') +GMAIL_PUBSUB_TOPIC_PREFIX: str = os.getenv( + 'GMAIL_PUBSUB_TOPIC_PREFIX', 'agentic-trigger' +) +GMAIL_PUSH_ENDPOINT_TEMPLATE: str = _required_env('GMAIL_PUSH_ENDPOINT_TEMPLATE') +GMAIL_PUBSUB_OIDC_SA_EMAIL: str = _required_env('GMAIL_PUBSUB_OIDC_SA_EMAIL') diff --git a/wavefront/server/background_jobs/celery_worker/celery_worker/tasks/trigger_event_task.py b/wavefront/server/background_jobs/celery_worker/celery_worker/tasks/trigger_event_task.py new file mode 100644 index 00000000..d107d883 --- /dev/null +++ b/wavefront/server/background_jobs/celery_worker/celery_worker/tasks/trigger_event_task.py @@ -0,0 +1,50 @@ +import asyncio +from typing import Any, Dict +from uuid import UUID + +from common_module.log.logger import logger + +from celery_worker.celery_app import app +from celery_worker.env import MAX_RETRIES, RETRY_DELAY +from celery_worker.worker_setup import get_services + + +@app.task( + name='celery_worker.tasks.trigger_event_task.process_trigger_event_task', + bind=True, + max_retries=MAX_RETRIES, + default_retry_delay=RETRY_DELAY, +) +def process_trigger_event_task( + self, trigger_id: str, raw_payload: Dict[str, Any], push_message_id: str +) -> Dict[str, Any]: + services = get_services() + processor = services.trigger_event_processor + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + parsed_trigger_id = UUID(trigger_id) + return loop.run_until_complete( + processor.process(trigger_id=parsed_trigger_id, raw_payload=raw_payload) + ) + except ValueError: + logger.error(f'Invalid trigger_id for process_trigger_event_task: {trigger_id}') + raise + except Exception as exc: + logger.exception( + f'process_trigger_event_task failed for trigger {trigger_id} ' + f'(push_message_id={push_message_id}): {exc}' + ) + if self.request.retries < self.max_retries: + raise self.retry(exc=exc) + raise + finally: + pending = asyncio.all_tasks(loop) + for task in pending: + task.cancel() + try: + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + except Exception: + pass + loop.close() diff --git a/wavefront/server/background_jobs/celery_worker/celery_worker/worker_setup.py b/wavefront/server/background_jobs/celery_worker/celery_worker/worker_setup.py index 53794e94..6fd19d66 100644 --- a/wavefront/server/background_jobs/celery_worker/celery_worker/worker_setup.py +++ b/wavefront/server/background_jobs/celery_worker/celery_worker/worker_setup.py @@ -24,6 +24,8 @@ from flo_cloud.cloud_storage import CloudStorageManager from plugins_module.plugins_container import PluginsContainer from tools_module.tools_container import ToolsContainer +from triggers_module.services.trigger_event_processor import TriggerEventProcessor +from triggers_module.triggers_container import TriggersContainer from celery_worker.env import ( AGENT_YAML_BUCKET, @@ -35,6 +37,13 @@ DB_PASSWORD, DB_PORT, DB_USERNAME, + GCP_PROJECT_ID, + GMAIL_PUBSUB_OIDC_SA_EMAIL, + GMAIL_PUBSUB_TOPIC_PREFIX, + GMAIL_PUSH_ENDPOINT_TEMPLATE, + GOOGLE_OAUTH_CLIENT_ID, + GOOGLE_OAUTH_CLIENT_SECRET, + GOOGLE_OAUTH_REDIRECT_URI, WORKFLOW_WORKER_TOPIC, ) @@ -46,6 +55,7 @@ class WorkerServices: cloud_storage: CloudStorageManager cache: CacheManager execution_bucket: str + trigger_event_processor: TriggerEventProcessor _lock = threading.Lock() @@ -145,12 +155,36 @@ def get_services() -> WorkerServices: # floware's CacheManager uses config.env_config.app_name as its namespace cache = CacheManager(namespace=APP_NAME) + triggers_container = TriggersContainer( + trigger_repository=db_repo_container.agentic_trigger_repository, + credential_repository=db_repo_container.agentic_trigger_credential_repository, + event_repository=db_repo_container.agentic_trigger_event_repository, + agent_repository=db_repo_container.agent_repository, + workflow_repository=db_repo_container.workflow_repository, + async_agentic_execution_service=agents_container.async_agentic_execution_service, + ) + triggers_container.config.from_dict( + { + 'cloud_config': {'cloud_provider': CLOUD_PROVIDER}, + 'triggers_gmail': { + 'client_id': GOOGLE_OAUTH_CLIENT_ID, + 'client_secret': GOOGLE_OAUTH_CLIENT_SECRET, + 'redirect_uri': GOOGLE_OAUTH_REDIRECT_URI, + 'pubsub_project_id': GCP_PROJECT_ID, + 'pubsub_topic_prefix': GMAIL_PUBSUB_TOPIC_PREFIX, + 'push_endpoint_template': GMAIL_PUSH_ENDPOINT_TEMPLATE, + 'oidc_service_account_email': GMAIL_PUBSUB_OIDC_SA_EMAIL or None, + }, + } + ) + _services = WorkerServices( agent_inference=agents_container.agent_inference_service(), workflow_inference=agents_container.workflow_inference_service(), cloud_storage=common_container.cloud_storage_manager(), cache=cache, execution_bucket=AGENTIC_EXECUTIONS_BUCKET, + trigger_event_processor=triggers_container.trigger_event_processor(), ) return _services diff --git a/wavefront/server/background_jobs/celery_worker/pyproject.toml b/wavefront/server/background_jobs/celery_worker/pyproject.toml index 5ef8d08d..8915f899 100644 --- a/wavefront/server/background_jobs/celery_worker/pyproject.toml +++ b/wavefront/server/background_jobs/celery_worker/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "tools-module", "api-services-module", "common-module", + "triggers-module", "celery[redis]>=5.4.0,<6.0.0", "python-dotenv>=1.1.0,<2.0.0", ] @@ -27,6 +28,7 @@ flo-utils = { workspace = true } tools-module = { workspace = true } api-services-module = { workspace = true } common-module = { workspace = true } +triggers-module = { workspace = true } [tool.uv] package = true diff --git a/wavefront/server/docker/celery_worker.Dockerfile b/wavefront/server/docker/celery_worker.Dockerfile new file mode 100644 index 00000000..d6249779 --- /dev/null +++ b/wavefront/server/docker/celery_worker.Dockerfile @@ -0,0 +1,43 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY --from=ghcr.io/astral-sh/uv:0.8.6 /uv /uvx /bin/ + +RUN apt-get update && apt-get install -y \ + libpq-dev \ + gcc \ + libgl1 \ + libglib2.0-0 \ + && rm -rf /var/lib/apt/lists/* + +COPY wavefront/server/pyproject.toml wavefront/server/uv.lock ./ + +COPY wavefront/server/modules/common_module /app/modules/common_module +COPY wavefront/server/modules/db_repo_module /app/modules/db_repo_module +COPY wavefront/server/modules/knowledge_base_module /app/modules/knowledge_base_module +COPY wavefront/server/modules/llm_inference_config_module /app/modules/llm_inference_config_module +COPY wavefront/server/modules/agents_module /app/modules/agents_module +COPY wavefront/server/modules/plugins_module /app/modules/plugins_module +COPY wavefront/server/modules/tools_module /app/modules/tools_module +COPY wavefront/server/modules/api_services_module /app/modules/api_services_module +COPY wavefront/server/modules/triggers_module /app/modules/triggers_module + +COPY wavefront/server/packages/flo_cloud /app/packages/flo_cloud +COPY wavefront/server/packages/flo_utils /app/packages/flo_utils + +COPY wavefront/server/plugins/datasource /app/plugins/datasource +COPY wavefront/server/plugins/authenticator /app/plugins/authenticator + +COPY wavefront/server/background_jobs/celery_worker /app/background_jobs/celery_worker + +RUN uv sync --package celery-worker --frozen --no-dev + +RUN useradd -m -u 1000 celery && \ + chown -R celery:celery /app + +USER celery + +WORKDIR /app/background_jobs/celery_worker + +CMD ["uv", "run", "celery", "-A", "celery_worker.celery_app", "worker", "--loglevel=info", "--pool=solo"] diff --git a/wavefront/server/docker/floware.Dockerfile b/wavefront/server/docker/floware.Dockerfile index 15d1fe1c..d0253815 100644 --- a/wavefront/server/docker/floware.Dockerfile +++ b/wavefront/server/docker/floware.Dockerfile @@ -27,6 +27,7 @@ COPY wavefront/server/modules/inference_module /app/modules/inference_module COPY wavefront/server/modules/tools_module /app/modules/tools_module COPY wavefront/server/modules/voice_agents_module /app/modules/voice_agents_module COPY wavefront/server/modules/api_services_module /app/modules/api_services_module +COPY wavefront/server/modules/triggers_module /app/modules/triggers_module COPY wavefront/server/packages/flo_cloud /app/packages/flo_cloud COPY wavefront/server/packages/flo_utils /app/packages/flo_utils diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/env.py b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/env.py index 676e98e1..c7ccbb8d 100644 --- a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/env.py +++ b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/env.py @@ -35,6 +35,10 @@ from db_repo_module.models.message_processors import MessageProcessors from db_repo_module.models.scheduled_job import ScheduledJob from db_repo_module.models.scheduled_job_execution import ScheduledJobExecution +from db_repo_module.models.async_agentic_execution import AsyncAgenticExecution +from db_repo_module.models.agentic_trigger_credential import AgenticTriggerCredential +from db_repo_module.models.agentic_trigger import AgenticTrigger +from db_repo_module.models.agentic_trigger_event import AgenticTriggerEvent from dotenv import load_dotenv from sqlalchemy import engine_from_config from sqlalchemy import pool @@ -80,6 +84,10 @@ MessageProcessors, ScheduledJob, ScheduledJobExecution, + AsyncAgenticExecution, + AgenticTriggerCredential, + AgenticTrigger, + AgenticTriggerEvent, ] target_metadata = Base.metadata diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_05_16_1401-74c837a023f3_add_agentic_triggers.py b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_05_16_1401-74c837a023f3_add_agentic_triggers.py new file mode 100644 index 00000000..2defcb8f --- /dev/null +++ b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_05_16_1401-74c837a023f3_add_agentic_triggers.py @@ -0,0 +1,256 @@ +"""add agentic triggers + +Revision ID: 74c837a023f3 +Revises: 3b5b1bf90e6c +Create Date: 2026-05-16 14:01:11.712250 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + + +# revision identifiers, used by Alembic. +revision: str = '74c837a023f3' +down_revision: Union[str, None] = '3b5b1bf90e6c' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + 'agentic_trigger_credentials', + sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column( + 'provider', + sa.String(length=32), + nullable=False, + comment='possible values: gmail', + ), + sa.Column('external_account_id', sa.String(length=320), nullable=False), + sa.Column('encrypted_refresh_token', sa.Text(), nullable=False), + sa.Column('encrypted_access_token', sa.Text(), nullable=True), + sa.Column('token_expires_at', sa.DateTime(timezone=True), nullable=True), + sa.Column('scopes', sa.Text(), nullable=True), + sa.Column( + 'created_at', + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text('now()'), + ), + sa.Column( + 'updated_at', + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text('now()'), + ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint( + 'provider', + 'external_account_id', + name='uq_trigger_credential_account', + ), + ) + op.create_index( + op.f('ix_agentic_trigger_credentials_id'), + 'agentic_trigger_credentials', + ['id'], + unique=False, + ) + op.create_index( + 'ix_agentic_trigger_credentials_provider', + 'agentic_trigger_credentials', + ['provider'], + unique=False, + ) + op.create_index( + 'ix_agentic_trigger_credentials_external_account_id', + 'agentic_trigger_credentials', + ['external_account_id'], + unique=False, + ) + + op.create_table( + 'agentic_triggers', + sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False), + sa.Column( + 'provider', + sa.String(length=32), + nullable=False, + comment='possible values: gmail', + ), + sa.Column( + 'entity_type', + sa.String(length=32), + nullable=False, + comment='possible values: agent, workflow', + ), + sa.Column('entity_id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('namespace', sa.String(length=255), nullable=True), + sa.Column( + 'status', + sa.String(length=32), + nullable=False, + server_default=sa.text("'pending_auth'"), + comment=('possible values: pending_auth, active, paused, error, deleted'), + ), + sa.Column( + 'filter_config', postgresql.JSONB(astext_type=sa.Text()), nullable=True + ), + sa.Column( + 'provider_config', + postgresql.JSONB(astext_type=sa.Text()), + nullable=True, + ), + sa.Column('credential_id', postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('last_error', sa.Text(), nullable=True), + sa.Column( + 'created_at', + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text('now()'), + ), + sa.Column( + 'updated_at', + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text('now()'), + ), + sa.ForeignKeyConstraint( + ['credential_id'], + ['agentic_trigger_credentials.id'], + ondelete='SET NULL', + ), + sa.PrimaryKeyConstraint('id'), + ) + op.create_index( + op.f('ix_agentic_triggers_id'), + 'agentic_triggers', + ['id'], + unique=False, + ) + op.create_index( + 'ix_agentic_triggers_provider', + 'agentic_triggers', + ['provider'], + unique=False, + ) + op.create_index( + 'ix_agentic_triggers_entity_id', + 'agentic_triggers', + ['entity_id'], + unique=False, + ) + op.create_index( + 'ix_agentic_triggers_status', + 'agentic_triggers', + ['status'], + unique=False, + ) + op.create_index( + 'ix_agentic_triggers_credential_id', + 'agentic_triggers', + ['credential_id'], + unique=False, + ) + + op.create_table( + 'agentic_trigger_events', + sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('trigger_id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('provider_event_id', sa.String(length=255), nullable=False), + sa.Column( + 'status', + sa.String(length=32), + nullable=False, + comment=('possible values: received, filtered_out, dispatched, failed'), + ), + sa.Column('execution_id', postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('subject', sa.String(length=1024), nullable=True), + sa.Column('error', sa.Text(), nullable=True), + sa.Column( + 'received_at', + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text('now()'), + ), + sa.Column('processed_at', sa.DateTime(timezone=True), nullable=True), + sa.ForeignKeyConstraint( + ['trigger_id'], ['agentic_triggers.id'], ondelete='CASCADE' + ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint( + 'trigger_id', + 'provider_event_id', + name='uq_trigger_event_provider_id', + ), + ) + op.create_index( + op.f('ix_agentic_trigger_events_id'), + 'agentic_trigger_events', + ['id'], + unique=False, + ) + op.create_index( + 'ix_agentic_trigger_events_trigger_id', + 'agentic_trigger_events', + ['trigger_id'], + unique=False, + ) + op.create_index( + 'ix_agentic_trigger_events_status', + 'agentic_trigger_events', + ['status'], + unique=False, + ) + op.create_index( + 'ix_agentic_trigger_events_execution_id', + 'agentic_trigger_events', + ['execution_id'], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index( + 'ix_agentic_trigger_events_execution_id', + table_name='agentic_trigger_events', + ) + op.drop_index( + 'ix_agentic_trigger_events_status', + table_name='agentic_trigger_events', + ) + op.drop_index( + 'ix_agentic_trigger_events_trigger_id', + table_name='agentic_trigger_events', + ) + op.drop_index( + op.f('ix_agentic_trigger_events_id'), + table_name='agentic_trigger_events', + ) + op.drop_table('agentic_trigger_events') + + op.drop_index('ix_agentic_triggers_credential_id', table_name='agentic_triggers') + op.drop_index('ix_agentic_triggers_status', table_name='agentic_triggers') + op.drop_index('ix_agentic_triggers_entity_id', table_name='agentic_triggers') + op.drop_index('ix_agentic_triggers_provider', table_name='agentic_triggers') + op.drop_index(op.f('ix_agentic_triggers_id'), table_name='agentic_triggers') + op.drop_table('agentic_triggers') + + op.drop_index( + 'ix_agentic_trigger_credentials_external_account_id', + table_name='agentic_trigger_credentials', + ) + op.drop_index( + 'ix_agentic_trigger_credentials_provider', + table_name='agentic_trigger_credentials', + ) + op.drop_index( + op.f('ix_agentic_trigger_credentials_id'), + table_name='agentic_trigger_credentials', + ) + op.drop_table('agentic_trigger_credentials') diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/db_repo_container.py b/wavefront/server/modules/db_repo_module/db_repo_module/db_repo_container.py index 3a984ece..d547edac 100644 --- a/wavefront/server/modules/db_repo_module/db_repo_module/db_repo_container.py +++ b/wavefront/server/modules/db_repo_module/db_repo_module/db_repo_container.py @@ -36,6 +36,9 @@ from db_repo_module.models.workflow import Workflow from db_repo_module.models.api_services import ApiServices from db_repo_module.models.async_agentic_execution import AsyncAgenticExecution +from db_repo_module.models.agentic_trigger_credential import AgenticTriggerCredential +from db_repo_module.models.agentic_trigger import AgenticTrigger +from db_repo_module.models.agentic_trigger_event import AgenticTriggerEvent from dependency_injector import containers from dependency_injector import providers @@ -240,3 +243,21 @@ class DatabaseModuleContainer(containers.DeclarativeContainer): model=AsyncAgenticExecution, db_client=db_client, ) + + agentic_trigger_credential_repository = providers.Singleton( + SQLAlchemyRepository[AgenticTriggerCredential], + model=AgenticTriggerCredential, + db_client=db_client, + ) + + agentic_trigger_repository = providers.Singleton( + SQLAlchemyRepository[AgenticTrigger], + model=AgenticTrigger, + db_client=db_client, + ) + + agentic_trigger_event_repository = providers.Singleton( + SQLAlchemyRepository[AgenticTriggerEvent], + model=AgenticTriggerEvent, + db_client=db_client, + ) diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger.py b/wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger.py new file mode 100644 index 00000000..55c3686a --- /dev/null +++ b/wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger.py @@ -0,0 +1,69 @@ +import uuid +from datetime import datetime + +from sqlalchemy import ForeignKey, String, Text, func +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.orm import Mapped, mapped_column + +from ..database.base import Base + + +class AgenticTrigger(Base): + __tablename__ = 'agentic_triggers' + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True + ) + name: Mapped[str] = mapped_column(String(length=255), nullable=False) + provider: Mapped[str] = mapped_column( + String(length=32), + nullable=False, + index=True, + comment='possible values: gmail', + ) + entity_type: Mapped[str] = mapped_column( + String(length=32), + nullable=False, + comment='possible values: agent, workflow', + ) + entity_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), nullable=False, index=True + ) + namespace: Mapped[str] = mapped_column(String(length=255), nullable=True) + status: Mapped[str] = mapped_column( + String(length=32), + nullable=False, + index=True, + server_default='pending_auth', + comment='possible values: pending_auth, active, paused, error, deleted', + ) + filter_config: Mapped[dict] = mapped_column(JSONB, nullable=True) + provider_config: Mapped[dict] = mapped_column(JSONB, nullable=True) + credential_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey('agentic_trigger_credentials.id', ondelete='SET NULL'), + nullable=True, + index=True, + ) + last_error: Mapped[str] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column(nullable=False, default=func.now()) + updated_at: Mapped[datetime] = mapped_column( + nullable=False, default=func.now(), onupdate=func.now() + ) + + def to_dict(self): + return { + 'id': str(self.id), + 'name': self.name, + 'provider': self.provider, + 'entity_type': self.entity_type, + 'entity_id': str(self.entity_id), + 'namespace': self.namespace, + 'status': self.status, + 'filter_config': self.filter_config, + 'provider_config': self.provider_config, + 'credential_id': str(self.credential_id) if self.credential_id else None, + 'last_error': self.last_error, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'updated_at': self.updated_at.isoformat() if self.updated_at else None, + } diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger_credential.py b/wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger_credential.py new file mode 100644 index 00000000..af6f1c3b --- /dev/null +++ b/wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger_credential.py @@ -0,0 +1,52 @@ +import uuid +from datetime import datetime + +from sqlalchemy import String, Text, UniqueConstraint, func +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import Mapped, mapped_column + +from ..database.base import Base + + +class AgenticTriggerCredential(Base): + __tablename__ = 'agentic_trigger_credentials' + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True + ) + provider: Mapped[str] = mapped_column( + String(length=32), + nullable=False, + index=True, + comment='possible values: gmail', + ) + external_account_id: Mapped[str] = mapped_column( + String(length=320), nullable=False, index=True + ) + encrypted_refresh_token: Mapped[str] = mapped_column(Text, nullable=False) + encrypted_access_token: Mapped[str] = mapped_column(Text, nullable=True) + token_expires_at: Mapped[datetime] = mapped_column(nullable=True) + scopes: Mapped[str] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column(nullable=False, default=func.now()) + updated_at: Mapped[datetime] = mapped_column( + nullable=False, default=func.now(), onupdate=func.now() + ) + + __table_args__ = ( + UniqueConstraint( + 'provider', 'external_account_id', name='uq_trigger_credential_account' + ), + ) + + def to_dict(self): + return { + 'id': str(self.id), + 'provider': self.provider, + 'external_account_id': self.external_account_id, + 'token_expires_at': self.token_expires_at.isoformat() + if self.token_expires_at + else None, + 'scopes': self.scopes, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'updated_at': self.updated_at.isoformat() if self.updated_at else None, + } diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger_event.py b/wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger_event.py new file mode 100644 index 00000000..d840c11e --- /dev/null +++ b/wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger_event.py @@ -0,0 +1,57 @@ +import uuid +from datetime import datetime + +from sqlalchemy import ForeignKey, String, Text, UniqueConstraint, func +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import Mapped, mapped_column + +from ..database.base import Base + + +class AgenticTriggerEvent(Base): + __tablename__ = 'agentic_trigger_events' + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True + ) + trigger_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey('agentic_triggers.id', ondelete='CASCADE'), + nullable=False, + index=True, + ) + provider_event_id: Mapped[str] = mapped_column(String(length=255), nullable=False) + status: Mapped[str] = mapped_column( + String(length=32), + nullable=False, + index=True, + comment='possible values: received, filtered_out, dispatched, failed', + ) + execution_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), nullable=True, index=True + ) + subject: Mapped[str] = mapped_column(String(length=1024), nullable=True) + error: Mapped[str] = mapped_column(Text, nullable=True) + received_at: Mapped[datetime] = mapped_column(nullable=False, default=func.now()) + processed_at: Mapped[datetime] = mapped_column(nullable=True) + + __table_args__ = ( + UniqueConstraint( + 'trigger_id', 'provider_event_id', name='uq_trigger_event_provider_id' + ), + ) + + def to_dict(self): + return { + 'id': str(self.id), + 'trigger_id': str(self.trigger_id), + 'provider_event_id': self.provider_event_id, + 'status': self.status, + 'execution_id': str(self.execution_id) if self.execution_id else None, + 'subject': self.subject, + 'error': self.error, + 'received_at': self.received_at.isoformat() if self.received_at else None, + 'processed_at': self.processed_at.isoformat() + if self.processed_at + else None, + } diff --git a/wavefront/server/modules/triggers_module/pyproject.toml b/wavefront/server/modules/triggers_module/pyproject.toml new file mode 100644 index 00000000..fa558a73 --- /dev/null +++ b/wavefront/server/modules/triggers_module/pyproject.toml @@ -0,0 +1,44 @@ +[project] +name = "triggers-module" +version = "0.1.0" +description = "Agentic triggers module — external events fire v3 inference" +authors = [ + { name = "rootflo engineering", email = "engineering@rootflo.ai" } +] +requires-python = ">=3.11" + +dependencies = [ + "common-module", + "db-repo-module", + "agents-module", + "flo-cloud", + "flo-utils", + "google-auth>=2.30.0", + "google-auth-oauthlib>=1.2.0", + "google-api-python-client>=2.130.0", + "google-cloud-pubsub>=2.21.0", + "beautifulsoup4>=4.8.0,<4.9.dev0", + "fastapi>=0.110.0", + "pydantic>=2.0.0", + "dependency-injector>=4.41.0", +] + +[tool.uv.sources] +common-module = { workspace = true } +db-repo-module = { workspace = true } +agents-module = { workspace = true } +flo-cloud = { workspace = true } +flo-utils = { workspace = true } + +[tool.pytest.ini_options] +asyncio_mode = "auto" + +[tool.uv] +package = true + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["triggers_module"] diff --git a/wavefront/server/modules/triggers_module/triggers_module/controllers/trigger_controller.py b/wavefront/server/modules/triggers_module/triggers_module/controllers/trigger_controller.py new file mode 100644 index 00000000..c1693bce --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/controllers/trigger_controller.py @@ -0,0 +1,309 @@ +from json import JSONDecodeError +from typing import Optional +from urllib.parse import urlparse +from uuid import UUID + +from common_module.common_container import CommonContainer +from common_module.log.logger import logger +from common_module.response_formatter import ResponseFormatter +from dependency_injector.wiring import Provide, inject +from fastapi import APIRouter, Depends, Header, Query, Request, Response, status +from fastapi.responses import JSONResponse, RedirectResponse + +from triggers_module.models.trigger_schemas import CreateTriggerRequest +from triggers_module.services.trigger_crud_service import ( + EntityNotFound, + InvalidTriggerState, + TriggerCrudService, + TriggerNotFound, +) +from triggers_module.services.trigger_push_receiver import ( + TriggerMismatch, + TriggerPushReceiver, +) +from triggers_module.triggers_container import TriggersContainer + + +trigger_router = APIRouter(prefix='/v1/triggers', tags=['triggers']) + + +def _is_safe_redirect(url: str) -> bool: + parsed = urlparse(url) + # Allow only relative URLs (no scheme, no host) to prevent open redirects. + return not parsed.scheme and not parsed.netloc + + +@trigger_router.post('', status_code=status.HTTP_201_CREATED) +@inject +async def create_trigger( + payload: CreateTriggerRequest, + trigger_crud_service: TriggerCrudService = Depends( + Provide[TriggersContainer.trigger_crud_service] + ), + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), +): + try: + result = await trigger_crud_service.create_trigger(payload) + except EntityNotFound as exc: + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content=response_formatter.buildErrorResponse(str(exc)), + ) + except InvalidTriggerState as exc: + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content=response_formatter.buildErrorResponse(str(exc)), + ) + + return JSONResponse( + status_code=status.HTTP_201_CREATED, + content=response_formatter.buildSuccessResponse( + { + 'message': 'Trigger created', + 'data': result.model_dump(mode='json'), + } + ), + ) + + +@trigger_router.get('/oauth/google/callback') +@inject +async def gmail_oauth_callback( + state: str = Query(...), + code: str = Query(...), + success_redirect_url: Optional[str] = Query(default=None), + failure_redirect_url: Optional[str] = Query(default=None), + trigger_crud_service: TriggerCrudService = Depends( + Provide[TriggersContainer.trigger_crud_service] + ), + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), +): + try: + result = await trigger_crud_service.complete_oauth(state=state, code=code) + except TriggerNotFound as exc: + if failure_redirect_url and _is_safe_redirect(failure_redirect_url): + return RedirectResponse(url=failure_redirect_url) + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content=response_formatter.buildErrorResponse(str(exc)), + ) + except InvalidTriggerState as exc: + if failure_redirect_url and _is_safe_redirect(failure_redirect_url): + return RedirectResponse(url=failure_redirect_url) + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content=response_formatter.buildErrorResponse(str(exc)), + ) + + if success_redirect_url and _is_safe_redirect(success_redirect_url): + return RedirectResponse(url=success_redirect_url) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse( + { + 'message': 'Trigger activated', + 'data': result.model_dump(mode='json'), + } + ), + ) + + +@trigger_router.get('') +@inject +async def list_triggers( + provider: Optional[str] = Query(default=None), + namespace: Optional[str] = Query(default=None), + status_filter: Optional[str] = Query(default=None, alias='status'), + limit: int = Query(default=100, ge=1, le=500), + trigger_crud_service: TriggerCrudService = Depends( + Provide[TriggersContainer.trigger_crud_service] + ), + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), +): + triggers = await trigger_crud_service.list_triggers( + provider=provider, namespace=namespace, status=status_filter, limit=limit + ) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse( + {'data': [t.model_dump(mode='json') for t in triggers]} + ), + ) + + +@trigger_router.get('/{trigger_id}') +@inject +async def get_trigger( + trigger_id: UUID, + trigger_crud_service: TriggerCrudService = Depends( + Provide[TriggersContainer.trigger_crud_service] + ), + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), +): + try: + result = await trigger_crud_service.get_trigger(trigger_id) + except TriggerNotFound as exc: + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content=response_formatter.buildErrorResponse(str(exc)), + ) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse( + {'data': result.model_dump(mode='json')} + ), + ) + + +@trigger_router.post('/{trigger_id}/pause') +@inject +async def pause_trigger( + trigger_id: UUID, + trigger_crud_service: TriggerCrudService = Depends( + Provide[TriggersContainer.trigger_crud_service] + ), + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), +): + try: + result = await trigger_crud_service.pause_trigger(trigger_id) + except TriggerNotFound as exc: + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content=response_formatter.buildErrorResponse(str(exc)), + ) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse( + {'data': result.model_dump(mode='json')} + ), + ) + + +@trigger_router.post('/{trigger_id}/resume') +@inject +async def resume_trigger( + trigger_id: UUID, + trigger_crud_service: TriggerCrudService = Depends( + Provide[TriggersContainer.trigger_crud_service] + ), + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), +): + try: + result = await trigger_crud_service.resume_trigger(trigger_id) + except TriggerNotFound as exc: + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content=response_formatter.buildErrorResponse(str(exc)), + ) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse( + {'data': result.model_dump(mode='json')} + ), + ) + + +@trigger_router.post('/{trigger_id}/retry') +@inject +async def retry_trigger( + trigger_id: UUID, + trigger_crud_service: TriggerCrudService = Depends( + Provide[TriggersContainer.trigger_crud_service] + ), + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), +): + try: + result = await trigger_crud_service.retry_trigger(trigger_id) + except TriggerNotFound as exc: + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content=response_formatter.buildErrorResponse(str(exc)), + ) + except InvalidTriggerState as exc: + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content=response_formatter.buildErrorResponse(str(exc)), + ) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse( + {'data': result.model_dump(mode='json')} + ), + ) + + +@trigger_router.delete('/{trigger_id}', status_code=status.HTTP_204_NO_CONTENT) +@inject +async def delete_trigger( + trigger_id: UUID, + trigger_crud_service: TriggerCrudService = Depends( + Provide[TriggersContainer.trigger_crud_service] + ), +): + try: + await trigger_crud_service.delete_trigger(trigger_id) + except TriggerNotFound: + pass + return Response(status_code=status.HTTP_204_NO_CONTENT) + + +@trigger_router.post('/{trigger_id}/{agentic_id}/invoke') +@inject +async def invoke_trigger( + request: Request, + trigger_id: UUID, + agentic_id: UUID, + authorization: Optional[str] = Header(default=None), + push_receiver: TriggerPushReceiver = Depends( + Provide[TriggersContainer.trigger_push_receiver] + ), + response_formatter: ResponseFormatter = Depends( + Provide[CommonContainer.response_formatter] + ), +): + try: + raw_payload = await request.json() + except (JSONDecodeError, ValueError) as exc: + logger.warning(f'Trigger invoke received invalid JSON for {trigger_id}: {exc}') + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content=response_formatter.buildErrorResponse('invalid_json_payload'), + ) + + try: + result = await push_receiver.handle_push( + trigger_id=trigger_id, + agentic_id=agentic_id, + raw_payload=raw_payload, + authorization_header=authorization, + ) + except TriggerMismatch as exc: + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content=response_formatter.buildErrorResponse(str(exc)), + ) + except Exception as exc: + logger.exception(f'Trigger invoke failed for {trigger_id}: {exc}') + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content=response_formatter.buildErrorResponse('internal_error'), + ) + + return JSONResponse( + status_code=status.HTTP_200_OK, + content=response_formatter.buildSuccessResponse(result), + ) diff --git a/wavefront/server/modules/triggers_module/triggers_module/models/trigger_schemas.py b/wavefront/server/modules/triggers_module/triggers_module/models/trigger_schemas.py new file mode 100644 index 00000000..0864ad90 --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/models/trigger_schemas.py @@ -0,0 +1,88 @@ +from datetime import datetime +from typing import Any, Dict, List, Literal, Optional +from uuid import UUID + +from pydantic import BaseModel, Field + + +TriggerProviderLiteral = Literal['gmail'] +TriggerEntityTypeLiteral = Literal['agent', 'workflow'] +TriggerStatusLiteral = Literal['pending_auth', 'active', 'paused', 'error', 'deleted'] +TriggerEventStatusLiteral = Literal['received', 'filtered_out', 'dispatched', 'failed'] + + +class TriggerFilterConfig(BaseModel): + subject_regex: Optional[str] = Field( + default=None, + description='Python regex matched against the email subject. If None, no subject filter.', + ) + allowed_mime_types: Optional[List[str]] = Field( + default=None, + description='Whitelist of attachment MIME types. If None, the server default applies.', + ) + + +class CreateTriggerRequest(BaseModel): + name: str + provider: TriggerProviderLiteral + entity_type: TriggerEntityTypeLiteral + entity_id: UUID + namespace: Optional[str] = None + filter_config: TriggerFilterConfig = Field(default_factory=TriggerFilterConfig) + provider_config: Optional[Dict[str, Any]] = None + + +class CreateTriggerResponse(BaseModel): + trigger_id: UUID + status: TriggerStatusLiteral + consent_url: Optional[str] = Field( + default=None, + description='OAuth consent URL when the provider requires user authorization.', + ) + + +class TriggerResponse(BaseModel): + id: UUID + name: str + provider: TriggerProviderLiteral + entity_type: TriggerEntityTypeLiteral + entity_id: UUID + namespace: Optional[str] + status: TriggerStatusLiteral + filter_config: Optional[Dict[str, Any]] = None + provider_config: Optional[Dict[str, Any]] = None + credential_id: Optional[UUID] = None + last_error: Optional[str] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + +class TriggerEventResponse(BaseModel): + id: UUID + trigger_id: UUID + provider_event_id: str + status: TriggerEventStatusLiteral + execution_id: Optional[UUID] = None + subject: Optional[str] = None + error: Optional[str] = None + received_at: Optional[datetime] = None + processed_at: Optional[datetime] = None + + +class GmailPubSubMessage(BaseModel): + """The base64-decoded JSON inside `message.data` from a Pub/Sub push.""" + + emailAddress: str + historyId: int + + +class GmailPubSubPushPayload(BaseModel): + """Raw Pub/Sub push wrapper. `message.data` is base64-encoded JSON.""" + + class _Message(BaseModel): + data: str + messageId: Optional[str] = None + publishTime: Optional[str] = None + + message: _Message + subscription: Optional[str] = None diff --git a/wavefront/server/modules/triggers_module/triggers_module/providers/base.py b/wavefront/server/modules/triggers_module/triggers_module/providers/base.py new file mode 100644 index 00000000..486530ab --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/providers/base.py @@ -0,0 +1,100 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Dict, List, Optional + + +@dataclass +class TokenBundle: + refresh_token: str + access_token: Optional[str] + expires_at: Optional[datetime] + scopes: Optional[str] + external_account_id: str + + +@dataclass +class Attachment: + file_name: str + mime_type: str + content_bytes: bytes + + +@dataclass +class NormalizedEmailEvent: + provider_event_id: str + subject: str + sender: Optional[str] + body_text: str + attachments: List[Attachment] = field(default_factory=list) + + +class TriggerProvider(ABC): + """Strategy interface for an external event source that can fire an agentic + inference. Implementations cover the lifecycle: OAuth (optional), webhook + subscription (start/renew/stop), and event normalisation.""" + + provider_type: str = '' + requires_oauth: bool = False + + def build_consent_url(self, trigger_id: str, scopes: List[str]) -> Optional[str]: + if not self.requires_oauth: + return None + raise NotImplementedError + + async def exchange_oauth_code(self, code: str, trigger_id: str) -> TokenBundle: + raise NotImplementedError + + async def refresh_access_token(self, refresh_token: str) -> TokenBundle: + raise NotImplementedError + + @abstractmethod + async def start_subscription( + self, + trigger_id: str, + access_token: str, + external_account_id: str, + agentic_id: Optional[str] = None, + ) -> Dict[str, Any]: + """Register the upstream subscription/watch. Returns provider_config to persist. + + `agentic_id` is the target entity (agent/workflow) id; some providers + bake it into the push endpoint URL when registering the subscription. + """ + + @abstractmethod + async def stop_subscription( + self, + provider_config: Dict[str, Any], + access_token: str, + external_account_id: str, + ) -> None: + """Tear down the upstream subscription/watch.""" + + @abstractmethod + async def renew_subscription( + self, + provider_config: Dict[str, Any], + access_token: str, + external_account_id: str, + ) -> Dict[str, Any]: + """Renew the upstream subscription/watch. Returns updated provider_config.""" + + @abstractmethod + async def fetch_events( + self, + access_token: str, + provider_config: Dict[str, Any], + raw_push_payload: Dict[str, Any], + ) -> List[NormalizedEmailEvent]: + """Decode the push payload, fetch the underlying messages, normalize.""" + + def extract_push_cursor(self, raw_push_payload: Dict[str, Any]) -> Optional[int]: + """Return the provider's monotonically-increasing cursor from a push + payload (Gmail: historyId). Used by the webhook receiver to short-circuit + stale redeliveries before doing any DB / upstream-API work. + + Return None if the cursor can't be derived; the receiver will then fall + through to enqueueing normally. + """ + return None diff --git a/wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_oauth.py b/wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_oauth.py new file mode 100644 index 00000000..a216e7fd --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_oauth.py @@ -0,0 +1,121 @@ +from datetime import datetime, timedelta, timezone +from typing import List, Optional +from urllib.parse import urlencode + +import requests + +from common_module.log.logger import logger +from triggers_module.providers.base import TokenBundle + + +GOOGLE_AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' +GOOGLE_TOKEN_URL = 'https://oauth2.googleapis.com/token' +GOOGLE_USERINFO_URL = 'https://www.googleapis.com/oauth2/v2/userinfo' + + +class GmailOAuthClient: + def __init__( + self, + client_id: str, + client_secret: str, + redirect_uri: str, + ): + self.client_id = client_id + self.client_secret = client_secret + self.redirect_uri = redirect_uri + + def build_consent_url(self, state: str, scopes: List[str]) -> str: + params = { + 'client_id': self.client_id, + 'redirect_uri': self.redirect_uri, + 'response_type': 'code', + 'scope': ' '.join(scopes), + 'access_type': 'offline', + 'prompt': 'consent', + # 'include_granted_scopes': 'true', + 'state': state, + } + return f'{GOOGLE_AUTH_URL}?{urlencode(params)}' + + def exchange_code(self, code: str) -> TokenBundle: + response = requests.post( + GOOGLE_TOKEN_URL, + data={ + 'code': code, + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'redirect_uri': self.redirect_uri, + 'grant_type': 'authorization_code', + }, + timeout=20, + ) + if not response.ok: + logger.error( + f'Google token exchange failed: status={response.status_code} ' + f'body={response.text} redirect_uri={self.redirect_uri!r}' + ) + response.raise_for_status() + payload = response.json() + + refresh_token = payload.get('refresh_token') + if not refresh_token: + raise RuntimeError( + 'Google did not return a refresh_token. Ensure the consent URL ' + 'requests access_type=offline and prompt=consent.' + ) + access_token = payload.get('access_token') + expires_in = payload.get('expires_in') + expires_at = ( + datetime.now(timezone.utc) + timedelta(seconds=int(expires_in)) + if expires_in + else None + ) + + email = self._fetch_user_email(access_token) if access_token else None + if not email: + raise RuntimeError('Failed to resolve Google account email.') + + return TokenBundle( + refresh_token=refresh_token, + access_token=access_token, + expires_at=expires_at, + scopes=payload.get('scope'), + external_account_id=email, + ) + + def refresh_access_token(self, refresh_token: str) -> TokenBundle: + response = requests.post( + GOOGLE_TOKEN_URL, + data={ + 'refresh_token': refresh_token, + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'grant_type': 'refresh_token', + }, + timeout=20, + ) + response.raise_for_status() + payload = response.json() + access_token = payload['access_token'] + expires_in = payload.get('expires_in') + expires_at = ( + datetime.now(timezone.utc) + timedelta(seconds=int(expires_in)) + if expires_in + else None + ) + return TokenBundle( + refresh_token=refresh_token, + access_token=access_token, + expires_at=expires_at, + scopes=payload.get('scope'), + external_account_id='', + ) + + def _fetch_user_email(self, access_token: str) -> Optional[str]: + response = requests.get( + GOOGLE_USERINFO_URL, + headers={'Authorization': f'Bearer {access_token}'}, + timeout=20, + ) + response.raise_for_status() + return response.json().get('email') diff --git a/wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_provider.py b/wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_provider.py new file mode 100644 index 00000000..29ec1b9c --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_provider.py @@ -0,0 +1,484 @@ +import asyncio +import base64 +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional, Tuple + +from bs4 import BeautifulSoup +from common_module.log.logger import logger +from google.api_core import exceptions as google_exceptions +from google.cloud import pubsub_v1 +from google.oauth2.credentials import Credentials +from googleapiclient.discovery import build + +from triggers_module.providers.base import ( + Attachment, + NormalizedEmailEvent, + TokenBundle, + TriggerProvider, +) +from triggers_module.providers.gmail.gmail_oauth import GmailOAuthClient + + +DEFAULT_GMAIL_SCOPES: List[str] = [ + 'https://www.googleapis.com/auth/gmail.readonly', + 'https://www.googleapis.com/auth/userinfo.email', +] + + +class GmailProvider(TriggerProvider): + provider_type = 'gmail' + requires_oauth = True + + def __init__( + self, + oauth_client: GmailOAuthClient, + pubsub_project_id: str, + pubsub_topic_prefix: str = 'agentic-trigger', + push_endpoint_template: Optional[str] = None, + oidc_service_account_email: Optional[str] = None, + ): + self._oauth = oauth_client + self._pubsub_project_id = pubsub_project_id + self._pubsub_topic_prefix = pubsub_topic_prefix + self._push_endpoint_template = push_endpoint_template + self._oidc_service_account_email = ( + oidc_service_account_email.strip() if oidc_service_account_email else None + ) or None + self._publisher_client: Optional[pubsub_v1.PublisherClient] = None + self._subscriber_client: Optional[pubsub_v1.SubscriberClient] = None + + def _publisher(self) -> pubsub_v1.PublisherClient: + if self._publisher_client is None: + self._publisher_client = pubsub_v1.PublisherClient() + return self._publisher_client + + def _subscriber(self) -> pubsub_v1.SubscriberClient: + if self._subscriber_client is None: + self._subscriber_client = pubsub_v1.SubscriberClient() + return self._subscriber_client + + # ---- OAuth ---------------------------------------------------------- + + def build_consent_url( + self, trigger_id: str, scopes: Optional[List[str]] = None + ) -> str: + return self._oauth.build_consent_url( + state=trigger_id, scopes=scopes or DEFAULT_GMAIL_SCOPES + ) + + async def exchange_oauth_code(self, code: str, trigger_id: str) -> TokenBundle: + return await asyncio.to_thread(self._oauth.exchange_code, code) + + async def refresh_access_token(self, refresh_token: str) -> TokenBundle: + return await asyncio.to_thread(self._oauth.refresh_access_token, refresh_token) + + # ---- Subscription lifecycle ---------------------------------------- + + def _topic_name(self, trigger_id: str) -> str: + return f'{self._pubsub_topic_prefix}-{trigger_id}' + + def _topic_path(self, trigger_id: str) -> str: + return ( + f'projects/{self._pubsub_project_id}/topics/{self._topic_name(trigger_id)}' + ) + + def _subscription_path(self, trigger_id: str) -> str: + sub_name = f'{self._pubsub_topic_prefix}-sub-{trigger_id}' + return f'projects/{self._pubsub_project_id}/subscriptions/{sub_name}' + + def _push_endpoint(self, trigger_id: str, agentic_id: str) -> Optional[str]: + if not self._push_endpoint_template: + return None + return self._push_endpoint_template.format( + trigger_id=trigger_id, agentic_id=agentic_id + ) + + async def start_subscription( + self, + trigger_id: str, + access_token: str, + external_account_id: str, + agentic_id: Optional[str] = None, + ) -> Dict[str, Any]: + topic_path = self._topic_path(trigger_id) + subscription_path = self._subscription_path(trigger_id) + push_endpoint = ( + self._push_endpoint(trigger_id, agentic_id) if agentic_id else None + ) + + await asyncio.to_thread( + self._ensure_topic_and_subscription, + topic_path, + subscription_path, + push_endpoint, + ) + + history_id, watch_expiration = await asyncio.to_thread( + self._call_users_watch, + access_token, + external_account_id, + topic_path, + ) + + return { + 'email_address': external_account_id, + 'pubsub_topic': topic_path, + 'pubsub_subscription': subscription_path, + 'push_endpoint': push_endpoint, + 'oidc_audience': push_endpoint + if self._oidc_service_account_email + else None, + 'history_id': history_id, + 'watch_expiration': watch_expiration.isoformat() + if watch_expiration + else None, + } + + async def stop_subscription( + self, + provider_config: Dict[str, Any], + access_token: str, + external_account_id: str, + ) -> None: + await asyncio.to_thread( + self._call_users_stop, access_token, external_account_id + ) + + topic_path = provider_config.get('pubsub_topic') + subscription_path = provider_config.get('pubsub_subscription') + if subscription_path: + await asyncio.to_thread(self._delete_subscription, subscription_path) + if topic_path: + await asyncio.to_thread(self._delete_topic, topic_path) + + async def renew_subscription( + self, + provider_config: Dict[str, Any], + access_token: str, + external_account_id: str, + ) -> Dict[str, Any]: + topic_path = provider_config['pubsub_topic'] + history_id, watch_expiration = await asyncio.to_thread( + self._call_users_watch, + access_token, + external_account_id, + topic_path, + ) + updated = dict(provider_config) + updated['history_id'] = history_id + updated['watch_expiration'] = ( + watch_expiration.isoformat() if watch_expiration else None + ) + return updated + + # ---- Event fetch / normalize --------------------------------------- + + def extract_push_cursor(self, raw_push_payload): + try: + _, history_id = self._decode_push_payload(raw_push_payload) + return history_id + except Exception: + return None + + async def fetch_events( + self, + access_token: str, + provider_config: Dict[str, Any], + raw_push_payload: Dict[str, Any], + ) -> List[NormalizedEmailEvent]: + email_address, push_history_id = self._decode_push_payload(raw_push_payload) + if email_address != provider_config.get('email_address'): + logger.warning( + f'Pub/Sub push email {email_address!r} does not match trigger ' + f'config {provider_config.get("email_address")!r}; ignoring' + ) + return [] + + start_history_id = int(provider_config.get('history_id') or push_history_id) + return await asyncio.to_thread( + self._fetch_and_normalize_messages, + access_token, + email_address, + start_history_id, + ) + + # ---- Internals: Pub/Sub -------------------------------------------- + + def _ensure_topic_and_subscription( + self, + topic_path: str, + subscription_path: str, + push_endpoint: Optional[str], + ) -> None: + publisher = self._publisher() + + # Create-or-ignore beats get-then-create: one round-trip in the steady + # state, and avoids gRPC retries on NotFound. + try: + publisher.create_topic(request={'name': topic_path}, timeout=30) + except google_exceptions.AlreadyExists: + pass + + gmail_service_account = 'gmail-api-push@system.gserviceaccount.com' + wants_role = 'roles/pubsub.publisher' + wants_member = f'serviceAccount:{gmail_service_account}' + try: + policy = publisher.get_iam_policy( + request={'resource': topic_path}, timeout=30 + ) + binding = next((b for b in policy.bindings if b.role == wants_role), None) + mutated = False + if binding is None: + policy.bindings.add(role=wants_role, members=[wants_member]) + mutated = True + elif wants_member not in binding.members: + binding.members.append(wants_member) + mutated = True + if mutated: + publisher.set_iam_policy( + request={'resource': topic_path, 'policy': policy}, + timeout=30, + ) + except google_exceptions.PermissionDenied as exc: + logger.error( + f'Permission denied setting IAM policy on {topic_path} ' + f'(role={wants_role}, member={wants_member}). ' + f'Gmail will not be able to publish to this topic: {exc}' + ) + raise + except google_exceptions.GoogleAPIError as exc: + logger.error( + f'Failed to set IAM policy on {topic_path} ' + f'(role={wants_role}, member={wants_member}): {exc}' + ) + raise + + subscriber = self._subscriber() + request: Dict[str, Any] = { + 'name': subscription_path, + 'topic': topic_path, + 'ack_deadline_seconds': 60, + 'message_retention_duration': {'seconds': 86400}, + 'retry_policy': pubsub_v1.types.RetryPolicy( + minimum_backoff={'seconds': 10}, + maximum_backoff={'seconds': 600}, + ), + } + if push_endpoint: + push_config_kwargs: Dict[str, Any] = {'push_endpoint': push_endpoint} + if self._oidc_service_account_email: + push_config_kwargs['oidc_token'] = pubsub_v1.types.PushConfig.OidcToken( + service_account_email=self._oidc_service_account_email, + audience=push_endpoint, + ) + request['push_config'] = pubsub_v1.types.PushConfig(**push_config_kwargs) + try: + subscriber.create_subscription(request=request, timeout=30) + except google_exceptions.AlreadyExists: + pass + + def _delete_subscription(self, subscription_path: str) -> None: + try: + self._subscriber().delete_subscription( + request={'subscription': subscription_path}, timeout=30 + ) + except Exception as exc: + logger.warning(f'Failed to delete subscription {subscription_path}: {exc}') + + def _delete_topic(self, topic_path: str) -> None: + try: + self._publisher().delete_topic(request={'topic': topic_path}, timeout=30) + except Exception as exc: + logger.warning(f'Failed to delete topic {topic_path}: {exc}') + + # ---- Internals: Gmail API ------------------------------------------ + + def _gmail_service(self, access_token: str): + credentials = Credentials(token=access_token) + return build('gmail', 'v1', credentials=credentials, cache_discovery=False) + + def _call_users_watch( + self, + access_token: str, + email_address: str, + topic_path: str, + ) -> Tuple[int, Optional[datetime]]: + service = self._gmail_service(access_token) + body = { + 'topicName': topic_path, + 'labelIds': ['INBOX'], + 'labelFilterAction': 'include', + } + response = service.users().watch(userId=email_address, body=body).execute() + history_id = int(response['historyId']) + expiration_ms = response.get('expiration') + expiration = ( + datetime.fromtimestamp(int(expiration_ms) / 1000, tz=timezone.utc) + if expiration_ms + else datetime.now(timezone.utc) + timedelta(days=7) + ) + return history_id, expiration + + def _call_users_stop(self, access_token: str, email_address: str) -> None: + service = self._gmail_service(access_token) + try: + service.users().stop(userId=email_address).execute() + except Exception as exc: + logger.warning(f'users.stop for {email_address} failed: {exc}') + + def _decode_push_payload(self, raw_push_payload: Dict[str, Any]) -> Tuple[str, int]: + message = raw_push_payload.get('message') or {} + data_b64 = message.get('data') + if not data_b64: + raise ValueError('Pub/Sub push has no message.data') + decoded = json.loads(base64.b64decode(data_b64).decode('utf-8')) + return decoded['emailAddress'], int(decoded['historyId']) + + def _fetch_and_normalize_messages( + self, + access_token: str, + email_address: str, + start_history_id: int, + ) -> List[NormalizedEmailEvent]: + service = self._gmail_service(access_token) + message_ids = self._list_new_message_ids( + service, email_address, start_history_id + ) + + events: List[NormalizedEmailEvent] = [] + for message_id in message_ids: + try: + events.append( + self._fetch_single_message(service, email_address, message_id) + ) + except Exception as exc: + logger.warning(f'Failed to fetch Gmail message {message_id}: {exc}') + + return events + + def _list_new_message_ids( + self, service, email_address: str, start_history_id: int + ) -> List[str]: + message_ids: List[str] = [] + page_token: Optional[str] = None + while True: + request_kwargs: Dict[str, Any] = { + 'userId': email_address, + 'startHistoryId': str(start_history_id), + 'historyTypes': ['messageAdded'], + 'labelId': 'INBOX', + } + if page_token: + request_kwargs['pageToken'] = page_token + response = service.users().history().list(**request_kwargs).execute() + + for history_entry in response.get('history', []): + for added in history_entry.get('messagesAdded', []): + msg = added.get('message') or {} + msg_id = msg.get('id') + if msg_id and msg_id not in message_ids: + message_ids.append(msg_id) + + page_token = response.get('nextPageToken') + if not page_token: + break + + return message_ids + + def _fetch_single_message( + self, service, email_address: str, message_id: str + ) -> NormalizedEmailEvent: + msg = ( + service.users() + .messages() + .get(userId=email_address, id=message_id, format='full') + .execute() + ) + headers = { + h['name'].lower(): h['value'] + for h in msg.get('payload', {}).get('headers', []) + } + subject = headers.get('subject', '') + sender = headers.get('from') + + body_text, attachments_meta = self._walk_parts(msg.get('payload', {})) + attachments = [ + Attachment( + file_name=meta['filename'], + mime_type=meta['mime_type'], + content_bytes=self._download_attachment( + service, email_address, message_id, meta['attachment_id'] + ), + ) + for meta in attachments_meta + ] + + return NormalizedEmailEvent( + provider_event_id=message_id, + subject=subject, + sender=sender, + body_text=body_text, + attachments=attachments, + ) + + def _walk_parts(self, payload: Dict[str, Any]) -> Tuple[str, List[Dict[str, Any]]]: + text_parts: List[str] = [] + html_parts: List[str] = [] + attachments: List[Dict[str, Any]] = [] + + def _walk(part: Dict[str, Any]) -> None: + mime_type = part.get('mimeType', '') + filename = part.get('filename') or '' + body = part.get('body') or {} + + if part.get('parts'): + for child in part['parts']: + _walk(child) + return + + if filename and body.get('attachmentId'): + attachments.append( + { + 'filename': filename, + 'mime_type': mime_type, + 'attachment_id': body['attachmentId'], + } + ) + return + + data_b64 = body.get('data') + if not data_b64: + return + decoded = base64.urlsafe_b64decode(data_b64.encode('utf-8')) + try: + text = decoded.decode('utf-8', errors='replace') + except Exception: + return + + if mime_type == 'text/plain': + text_parts.append(text) + elif mime_type == 'text/html': + html_parts.append(text) + + _walk(payload) + + if text_parts: + return '\n'.join(text_parts).strip(), attachments + if html_parts: + soup = BeautifulSoup('\n'.join(html_parts), 'html.parser') + return soup.get_text(separator='\n').strip(), attachments + return '', attachments + + def _download_attachment( + self, service, email_address: str, message_id: str, attachment_id: str + ) -> bytes: + response = ( + service.users() + .messages() + .attachments() + .get(userId=email_address, messageId=message_id, id=attachment_id) + .execute() + ) + data_b64 = response.get('data') or '' + return base64.urlsafe_b64decode(data_b64.encode('utf-8')) diff --git a/wavefront/server/modules/triggers_module/triggers_module/providers/gmail/pubsub_signature.py b/wavefront/server/modules/triggers_module/triggers_module/providers/gmail/pubsub_signature.py new file mode 100644 index 00000000..8df7171e --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/providers/gmail/pubsub_signature.py @@ -0,0 +1,52 @@ +from typing import Optional + +from common_module.log.logger import logger +from google.auth.transport import requests as google_requests +from google.oauth2 import id_token + + +class PubSubSignatureError(Exception): + pass + + +class PubSubPushVerifier: + """Verifies the OIDC JWT that Google Pub/Sub attaches to push requests. + + Pub/Sub only signs pushes when the subscription is created with an + `oidc_token` PushConfig. If a subscription was created without that, no + Authorization header is sent and this verifier should be skipped at the + caller (TriggerPushReceiver only calls verify when the trigger's + provider_config records that OIDC was configured). + """ + + def __init__(self): + self._request = google_requests.Request() + + def verify( + self, + authorization_header: Optional[str], + expected_audience: Optional[str] = None, + ) -> dict: + if not authorization_header or not authorization_header.lower().startswith( + 'bearer ' + ): + raise PubSubSignatureError( + 'Missing or malformed Authorization header on Pub/Sub push' + ) + + token = authorization_header.split(' ', 1)[1].strip() + try: + claims = id_token.verify_oauth2_token( + token, + self._request, + audience=expected_audience, + ) + except Exception as exc: + logger.warning(f'Pub/Sub push signature verification failed: {exc}') + raise PubSubSignatureError(str(exc)) from exc + + issuer = claims.get('iss') + if issuer not in ('https://accounts.google.com', 'accounts.google.com'): + raise PubSubSignatureError(f'Unexpected JWT issuer: {issuer}') + + return claims diff --git a/wavefront/server/modules/triggers_module/triggers_module/providers/registry.py b/wavefront/server/modules/triggers_module/triggers_module/providers/registry.py new file mode 100644 index 00000000..0cda4c02 --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/providers/registry.py @@ -0,0 +1,28 @@ +from typing import Dict + +from .base import TriggerProvider + + +class UnsupportedTriggerProvider(Exception): + pass + + +class TriggerProviderRegistry: + def __init__(self) -> None: + self._providers: Dict[str, TriggerProvider] = {} + + def register(self, provider: TriggerProvider) -> None: + if not provider.provider_type: + raise ValueError('provider.provider_type must be set') + self._providers[provider.provider_type] = provider + + def get(self, provider_type: str) -> TriggerProvider: + try: + return self._providers[provider_type] + except KeyError: + raise UnsupportedTriggerProvider( + f'No trigger provider registered for type: {provider_type}' + ) + + def has(self, provider_type: str) -> bool: + return provider_type in self._providers diff --git a/wavefront/server/modules/triggers_module/triggers_module/services/trigger_crud_service.py b/wavefront/server/modules/triggers_module/triggers_module/services/trigger_crud_service.py new file mode 100644 index 00000000..f6726658 --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/services/trigger_crud_service.py @@ -0,0 +1,331 @@ +from datetime import datetime, timezone +from typing import List, Optional +from uuid import UUID + +from common_module.log.logger import logger +from db_repo_module.models.agent import Agent +from db_repo_module.models.agentic_trigger import AgenticTrigger +from db_repo_module.models.agentic_trigger_credential import AgenticTriggerCredential +from db_repo_module.models.workflow import Workflow +from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository + +from triggers_module.models.trigger_schemas import ( + CreateTriggerRequest, + CreateTriggerResponse, + TriggerResponse, +) +from triggers_module.providers.base import TokenBundle, TriggerProvider +from triggers_module.providers.registry import TriggerProviderRegistry +from triggers_module.utils.token_crypto import TokenCrypto + + +class TriggerNotFound(Exception): + pass + + +class InvalidTriggerState(Exception): + pass + + +class EntityNotFound(Exception): + pass + + +class TriggerCrudService: + def __init__( + self, + trigger_repository: SQLAlchemyRepository[AgenticTrigger], + credential_repository: SQLAlchemyRepository[AgenticTriggerCredential], + agent_repository: SQLAlchemyRepository[Agent], + workflow_repository: SQLAlchemyRepository[Workflow], + provider_registry: TriggerProviderRegistry, + token_crypto: TokenCrypto, + ): + self._triggers = trigger_repository + self._credentials = credential_repository + self._agents = agent_repository + self._workflows = workflow_repository + self._registry = provider_registry + self._crypto = token_crypto + + async def create_trigger( + self, request: CreateTriggerRequest + ) -> CreateTriggerResponse: + await self._validate_entity(request.entity_type, request.entity_id) + provider = self._registry.get(request.provider) + + trigger = await self._triggers.create( + name=request.name, + provider=request.provider, + entity_type=request.entity_type, + entity_id=request.entity_id, + namespace=request.namespace, + status='pending_auth' if provider.requires_oauth else 'active', + filter_config=request.filter_config.model_dump(exclude_none=True), + provider_config=request.provider_config, + ) + + consent_url: Optional[str] = None + if provider.requires_oauth: + consent_url = provider.build_consent_url(trigger_id=str(trigger.id)) + + return CreateTriggerResponse( + trigger_id=trigger.id, + status=trigger.status, + consent_url=consent_url, + ) + + async def complete_oauth(self, state: str, code: str) -> TriggerResponse: + try: + trigger_id = UUID(state) + except ValueError as exc: + raise InvalidTriggerState(f'Invalid OAuth state: {state}') from exc + + trigger = await self._triggers.find_one(id=trigger_id) + if not trigger: + raise TriggerNotFound(f'Trigger {trigger_id} not found') + if trigger.status != 'pending_auth': + raise InvalidTriggerState( + f'Trigger {trigger_id} is in status {trigger.status!r}, ' + 'cannot complete OAuth' + ) + + provider = self._registry.get(trigger.provider) + token_bundle = await provider.exchange_oauth_code( + code=code, trigger_id=str(trigger_id) + ) + + credential = await self._upsert_credential(trigger.provider, token_bundle) + + # Link the credential to the trigger immediately so a later + # `start_subscription` failure leaves a retry-able state instead of an + # orphaned credential. + await self._triggers.find_one_and_update( + {'id': trigger_id}, credential_id=credential.id + ) + + try: + provider_config = await provider.start_subscription( + trigger_id=str(trigger_id), + access_token=token_bundle.access_token or '', + external_account_id=token_bundle.external_account_id, + agentic_id=str(trigger.entity_id), + ) + except Exception as exc: + await self._triggers.find_one_and_update( + {'id': trigger_id}, + status='error', + last_error=f'start_subscription failed: {exc}', + ) + logger.exception(f'start_subscription failed for trigger {trigger_id}') + raise + + updated = await self._triggers.find_one_and_update( + {'id': trigger_id}, + status='active', + provider_config=provider_config, + last_error=None, + refresh=True, + ) + return self._to_response(updated) + + async def _upsert_credential( + self, provider: str, token_bundle: TokenBundle + ) -> AgenticTriggerCredential: + existing = await self._credentials.find_one( + provider=provider, + external_account_id=token_bundle.external_account_id, + ) + + encrypted_refresh = self._crypto.encrypt(token_bundle.refresh_token) + encrypted_access = self._crypto.encrypt(token_bundle.access_token) + expires_at = token_bundle.expires_at + + if existing: + return await self._credentials.find_one_and_update( + {'id': existing.id}, + encrypted_refresh_token=encrypted_refresh, + encrypted_access_token=encrypted_access, + token_expires_at=expires_at, + scopes=token_bundle.scopes, + refresh=True, + ) + + return await self._credentials.create( + provider=provider, + external_account_id=token_bundle.external_account_id, + encrypted_refresh_token=encrypted_refresh, + encrypted_access_token=encrypted_access, + token_expires_at=expires_at, + scopes=token_bundle.scopes, + ) + + async def list_triggers( + self, + provider: Optional[str] = None, + namespace: Optional[str] = None, + status: Optional[str] = None, + limit: int = 100, + ) -> List[TriggerResponse]: + filters = {} + if provider: + filters['provider'] = provider + if namespace: + filters['namespace'] = namespace + if status: + filters['status'] = status + rows = await self._triggers.find(limit=limit, **filters) + return [self._to_response(row) for row in rows] + + async def get_trigger(self, trigger_id: UUID) -> TriggerResponse: + trigger = await self._triggers.find_one(id=trigger_id) + if not trigger: + raise TriggerNotFound(f'Trigger {trigger_id} not found') + return self._to_response(trigger) + + async def pause_trigger(self, trigger_id: UUID) -> TriggerResponse: + updated = await self._triggers.find_one_and_update( + {'id': trigger_id}, status='paused', refresh=True + ) + if not updated: + raise TriggerNotFound(f'Trigger {trigger_id} not found') + return self._to_response(updated) + + async def resume_trigger(self, trigger_id: UUID) -> TriggerResponse: + updated = await self._triggers.find_one_and_update( + {'id': trigger_id}, status='active', refresh=True + ) + if not updated: + raise TriggerNotFound(f'Trigger {trigger_id} not found') + return self._to_response(updated) + + async def retry_trigger(self, trigger_id: UUID) -> TriggerResponse: + """Re-runs `start_subscription` for an `error` trigger using its already- + stored credential. Use when the original OAuth completed but the upstream + subscription setup failed (e.g. transient Pub/Sub IAM issue).""" + trigger = await self._triggers.find_one(id=trigger_id) + if not trigger: + raise TriggerNotFound(f'Trigger {trigger_id} not found') + if trigger.status != 'error': + raise InvalidTriggerState( + f'Trigger {trigger_id} is in status {trigger.status!r}; ' + 'retry only applies to triggers in error.' + ) + if not trigger.credential_id: + raise InvalidTriggerState( + f'Trigger {trigger_id} has no credential; cannot retry without OAuth.' + ) + + provider = self._registry.get(trigger.provider) + access_token, external_account_id = await self._fresh_access_token( + trigger.credential_id, provider + ) + + try: + provider_config = await provider.start_subscription( + trigger_id=str(trigger_id), + access_token=access_token, + external_account_id=external_account_id, + agentic_id=str(trigger.entity_id), + ) + except Exception as exc: + await self._triggers.find_one_and_update( + {'id': trigger_id}, + last_error=f'start_subscription failed: {exc}', + ) + logger.exception( + f'retry_trigger: start_subscription failed for {trigger_id}' + ) + raise + + updated = await self._triggers.find_one_and_update( + {'id': trigger_id}, + status='active', + provider_config=provider_config, + last_error=None, + refresh=True, + ) + return self._to_response(updated) + + async def delete_trigger(self, trigger_id: UUID) -> None: + trigger = await self._triggers.find_one(id=trigger_id) + if not trigger: + raise TriggerNotFound(f'Trigger {trigger_id} not found') + + provider = self._registry.get(trigger.provider) + if trigger.credential_id and trigger.provider_config: + try: + access_token, external_account_id = await self._fresh_access_token( + trigger.credential_id, provider + ) + await provider.stop_subscription( + provider_config=trigger.provider_config, + access_token=access_token, + external_account_id=external_account_id, + ) + except Exception as exc: + logger.warning( + f'stop_subscription failed for trigger {trigger_id}; ' + f'soft-deleting anyway: {exc}' + ) + + await self._triggers.find_one_and_update({'id': trigger_id}, status='deleted') + + if trigger.credential_id: + other_refs = await self._triggers.count(credential_id=trigger.credential_id) + if other_refs <= 1: + await self._credentials.delete_all(id=trigger.credential_id) + + async def _fresh_access_token( + self, credential_id: UUID, provider: TriggerProvider + ) -> tuple[str, str]: + credential = await self._credentials.find_one(id=credential_id) + if not credential: + raise InvalidTriggerState(f'Credential {credential_id} not found') + + now = datetime.now(timezone.utc) + if ( + credential.encrypted_access_token + and credential.token_expires_at + and credential.token_expires_at > now + ): + return ( + self._crypto.decrypt(credential.encrypted_access_token), + credential.external_account_id, + ) + + refresh_token = self._crypto.decrypt(credential.encrypted_refresh_token) + bundle = await provider.refresh_access_token(refresh_token) + await self._credentials.find_one_and_update( + {'id': credential_id}, + encrypted_access_token=self._crypto.encrypt(bundle.access_token), + token_expires_at=bundle.expires_at, + ) + return bundle.access_token or '', credential.external_account_id + + async def _validate_entity(self, entity_type: str, entity_id: UUID) -> None: + if entity_type == 'agent': + row = await self._agents.find_one(id=entity_id) + elif entity_type == 'workflow': + row = await self._workflows.find_one(id=entity_id) + else: + raise InvalidTriggerState(f'Unknown entity_type: {entity_type}') + if not row: + raise EntityNotFound(f'{entity_type} {entity_id} not found') + + def _to_response(self, trigger: AgenticTrigger) -> TriggerResponse: + return TriggerResponse( + id=trigger.id, + name=trigger.name, + provider=trigger.provider, + entity_type=trigger.entity_type, + entity_id=trigger.entity_id, + namespace=trigger.namespace, + status=trigger.status, + filter_config=trigger.filter_config, + provider_config=trigger.provider_config, + credential_id=trigger.credential_id, + last_error=trigger.last_error, + created_at=trigger.created_at, + updated_at=trigger.updated_at, + ) diff --git a/wavefront/server/modules/triggers_module/triggers_module/services/trigger_event_processor.py b/wavefront/server/modules/triggers_module/triggers_module/services/trigger_event_processor.py new file mode 100644 index 00000000..0c8b7d2b --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/services/trigger_event_processor.py @@ -0,0 +1,256 @@ +import re +from datetime import datetime, timezone +from typing import Any, Dict, List +from uuid import UUID + +from agents_module.services.async_agentic_execution_service import ( + AsyncAgenticExecutionService, +) +from common_module.log.logger import logger +from db_repo_module.models.agentic_trigger import AgenticTrigger +from db_repo_module.models.agentic_trigger_credential import AgenticTriggerCredential +from db_repo_module.models.agentic_trigger_event import AgenticTriggerEvent +from db_repo_module.models.workflow import Workflow +from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository + +from triggers_module.providers.base import ( + NormalizedEmailEvent, + TriggerProvider, +) +from triggers_module.providers.registry import TriggerProviderRegistry +from triggers_module.utils.input_builder import ( + EmailTooLargeError, + build_inference_inputs, +) +from triggers_module.utils.token_crypto import TokenCrypto + + +class TriggerEventProcessor: + """Worker-side processor: fetches Gmail messages, filters them, and feeds + matching ones into the existing v3 async-execution pipeline.""" + + def __init__( + self, + trigger_repository: SQLAlchemyRepository[AgenticTrigger], + credential_repository: SQLAlchemyRepository[AgenticTriggerCredential], + event_repository: SQLAlchemyRepository[AgenticTriggerEvent], + workflow_repository: SQLAlchemyRepository[Workflow], + provider_registry: TriggerProviderRegistry, + token_crypto: TokenCrypto, + async_execution_service: AsyncAgenticExecutionService, + ): + self._triggers = trigger_repository + self._credentials = credential_repository + self._events = event_repository + self._workflows = workflow_repository + self._registry = provider_registry + self._crypto = token_crypto + self._async_exec = async_execution_service + + async def process( + self, trigger_id: UUID, raw_payload: Dict[str, Any] + ) -> Dict[str, Any]: + trigger = await self._triggers.find_one(id=trigger_id) + if not trigger: + return {'status': 'ignored', 'reason': 'trigger_not_found'} + if trigger.status != 'active': + return {'status': 'ignored', 'reason': f'trigger_status_{trigger.status}'} + if not trigger.credential_id: + return {'status': 'ignored', 'reason': 'no_credential'} + + provider = self._registry.get(trigger.provider) + access_token, _ = await self._fresh_access_token( + trigger.credential_id, provider + ) + + events = await provider.fetch_events( + access_token=access_token, + provider_config=trigger.provider_config or {}, + raw_push_payload=raw_payload, + ) + + results: List[Dict[str, Any]] = [] + for event in events: + results.append(await self._handle_single_event(trigger, event)) + + # Layer-3: advance the stored cursor so future pushes only fetch the + # range past this one. Only move forward, never backward. + await self._advance_cursor(trigger, provider, raw_payload) + + return {'status': 'ok', 'events': results} + + async def _advance_cursor( + self, + trigger: AgenticTrigger, + provider: TriggerProvider, + raw_payload: Dict[str, Any], + ) -> None: + incoming_cursor = provider.extract_push_cursor(raw_payload) + if incoming_cursor is None: + return + current = await self._triggers.find_one(id=trigger.id) + if not current: + return + config = dict(current.provider_config or {}) + stored = config.get('history_id') + if stored is not None and int(incoming_cursor) <= int(stored): + return + config['history_id'] = int(incoming_cursor) + await self._triggers.find_one_and_update( + {'id': trigger.id}, provider_config=config + ) + + async def _handle_single_event( + self, trigger: AgenticTrigger, event: NormalizedEmailEvent + ) -> Dict[str, Any]: + existing = await self._events.find_one( + trigger_id=trigger.id, provider_event_id=event.provider_event_id + ) + if existing: + return { + 'provider_event_id': event.provider_event_id, + 'status': existing.status, + 'duplicate': True, + } + + row = await self._events.create( + trigger_id=trigger.id, + provider_event_id=event.provider_event_id, + status='received', + subject=event.subject[:1024] if event.subject else None, + ) + + filter_config = trigger.filter_config or {} + subject_regex = filter_config.get('subject_regex') + if subject_regex and not re.search(subject_regex, event.subject or ''): + await self._events.find_one_and_update( + {'id': row.id}, + status='filtered_out', + processed_at=datetime.now(timezone.utc), + ) + return { + 'provider_event_id': event.provider_event_id, + 'status': 'filtered_out', + } + + try: + inputs = build_inference_inputs( + event, + allowed_mime_types=filter_config.get('allowed_mime_types'), + ) + execution = await self._dispatch_inference(trigger, event, inputs) + await self._events.find_one_and_update( + {'id': row.id}, + status='dispatched', + execution_id=execution.execution_id, + processed_at=datetime.now(timezone.utc), + ) + return { + 'provider_event_id': event.provider_event_id, + 'status': 'dispatched', + 'execution_id': str(execution.execution_id), + } + except EmailTooLargeError as exc: + await self._events.find_one_and_update( + {'id': row.id}, + status='failed', + error=f'email_too_large: {exc}', + processed_at=datetime.now(timezone.utc), + ) + return { + 'provider_event_id': event.provider_event_id, + 'status': 'failed', + 'error': str(exc), + } + except Exception as exc: + logger.exception( + f'Failed to dispatch trigger event {event.provider_event_id}' + ) + await self._events.find_one_and_update( + {'id': row.id}, + status='failed', + error=str(exc)[:2000], + processed_at=datetime.now(timezone.utc), + ) + return { + 'provider_event_id': event.provider_event_id, + 'status': 'failed', + 'error': str(exc), + } + + async def _dispatch_inference( + self, + trigger: AgenticTrigger, + event: NormalizedEmailEvent, + inputs: List[Dict[str, Any]], + ): + variables = { + 'trigger_id': str(trigger.id), + 'trigger_name': trigger.name, + 'email_subject': event.subject or '', + 'email_from': event.sender or '', + } + + if trigger.entity_type == 'agent': + return await self._async_exec.create_and_enqueue_agent( + agent_id=trigger.entity_id, + inputs=inputs, + variables=variables, + output_json_enabled=True, + access_token=None, + app_key=None, + ) + + workflow = await self._workflows.find_one(id=trigger.entity_id) + if not workflow: + raise RuntimeError(f'Workflow {trigger.entity_id} not found') + return await self._async_exec.create_and_enqueue_workflow( + workflow_id=workflow.id, + workflow_name=workflow.name, + namespace=workflow.namespace, + inputs=inputs, + variables=variables, + output_json_enabled=False, + access_token=None, + app_key=None, + ) + + async def _fresh_access_token( + self, credential_id: UUID, provider: TriggerProvider + ) -> tuple[str, str]: + credential = await self._credentials.find_one(id=credential_id) + if not credential: + raise RuntimeError(f'Credential {credential_id} not found') + + now = datetime.now(timezone.utc) + if ( + credential.encrypted_access_token + and credential.token_expires_at + and credential.token_expires_at > now + ): + access_token = self._crypto.decrypt(credential.encrypted_access_token) + if not access_token: + raise RuntimeError( + f'Credential {credential_id} (account={credential.external_account_id}) ' + f'has invalid/undecryptable access_token' + ) + return (access_token, credential.external_account_id) + + refresh_token = self._crypto.decrypt(credential.encrypted_refresh_token) + if not refresh_token: + raise RuntimeError( + f'Credential {credential_id} (account={credential.external_account_id}) ' + f'has invalid/undecryptable refresh_token' + ) + bundle = await provider.refresh_access_token(refresh_token) + if not bundle.access_token: + raise RuntimeError( + f'Credential {credential_id} (account={credential.external_account_id}) ' + f'refresh returned empty access_token' + ) + await self._credentials.find_one_and_update( + {'id': credential_id}, + encrypted_access_token=self._crypto.encrypt(bundle.access_token), + token_expires_at=bundle.expires_at, + ) + return bundle.access_token, credential.external_account_id diff --git a/wavefront/server/modules/triggers_module/triggers_module/services/trigger_push_receiver.py b/wavefront/server/modules/triggers_module/triggers_module/services/trigger_push_receiver.py new file mode 100644 index 00000000..4e82419d --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/services/trigger_push_receiver.py @@ -0,0 +1,113 @@ +import uuid +from typing import Any, Dict, Optional +from uuid import UUID + +from common_module.log.logger import logger +from db_repo_module.models.agentic_trigger import AgenticTrigger +from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository + +from agents_module.utils.celery_client import get_celery_client +from triggers_module.providers.gmail.pubsub_signature import ( + PubSubPushVerifier, + PubSubSignatureError, +) +from triggers_module.providers.registry import TriggerProviderRegistry + + +_TRIGGER_EVENT_TASK_NAME = ( + 'celery_worker.tasks.trigger_event_task.process_trigger_event_task' +) + + +class TriggerMismatch(Exception): + pass + + +class TriggerPushReceiver: + """Floware-side handler for `POST /triggers/{trigger_id}/{agentic_id}/invoke`. + + Verifies the upstream provider's signature, short-circuits stale pushes + using the provider's cursor, then enqueues a Celery task to do the heavy + lifting (Gmail history list, message fetch, attachment download, regex + filter, v3 dispatch). Returns fast; no Gmail I/O happens here. Per-message + idempotency lives in the Celery task via the unique + `(trigger_id, provider_event_id)` constraint on `agentic_trigger_events`. + """ + + def __init__( + self, + trigger_repository: SQLAlchemyRepository[AgenticTrigger], + pubsub_verifier: PubSubPushVerifier, + provider_registry: TriggerProviderRegistry, + ): + self._triggers = trigger_repository + self._verifier = pubsub_verifier + self._registry = provider_registry + + async def handle_push( + self, + trigger_id: UUID, + agentic_id: UUID, + raw_payload: Dict[str, Any], + authorization_header: Optional[str], + ) -> Dict[str, Any]: + trigger = await self._triggers.find_one(id=trigger_id) + if not trigger: + return {'status': 'ignored', 'reason': 'trigger_not_found'} + + if trigger.status != 'active': + return {'status': 'ignored', 'reason': f'trigger_status_{trigger.status}'} + + if trigger.entity_id != agentic_id: + raise TriggerMismatch( + f'Path agentic_id {agentic_id} does not match trigger entity_id ' + f'{trigger.entity_id}' + ) + + if trigger.provider == 'gmail': + oidc_audience = (trigger.provider_config or {}).get('oidc_audience') + if not oidc_audience: + logger.warning( + f'Missing oidc_audience for gmail trigger {trigger_id}; refusing push' + ) + return {'status': 'ignored', 'reason': 'missing_oidc_audience'} + try: + self._verifier.verify( + authorization_header, expected_audience=oidc_audience + ) + except PubSubSignatureError as exc: + logger.warning( + f'Pub/Sub signature verification failed for trigger {trigger_id}: {exc}' + ) + return {'status': 'ignored', 'reason': 'invalid_signature'} + + # Layer-2 dedup: skip pushes whose cursor we've already processed. + provider = self._registry.get(trigger.provider) + incoming_cursor = provider.extract_push_cursor(raw_payload) + stored_cursor = (trigger.provider_config or {}).get('history_id') + if ( + incoming_cursor is not None + and stored_cursor is not None + and int(incoming_cursor) <= int(stored_cursor) + ): + return { + 'status': 'ignored', + 'reason': 'stale_cursor', + 'incoming_cursor': int(incoming_cursor), + 'stored_cursor': int(stored_cursor), + } + + push_message_id = (raw_payload.get('message') or {}).get('messageId') or str( + uuid.uuid4() + ) + + get_celery_client().send_task( + _TRIGGER_EVENT_TASK_NAME, + kwargs={ + 'trigger_id': str(trigger_id), + 'raw_payload': raw_payload, + 'push_message_id': push_message_id, + }, + ) + + return {'status': 'queued', 'push_message_id': push_message_id} diff --git a/wavefront/server/modules/triggers_module/triggers_module/services/trigger_subscription_renewer.py b/wavefront/server/modules/triggers_module/triggers_module/services/trigger_subscription_renewer.py new file mode 100644 index 00000000..f5f08191 --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/services/trigger_subscription_renewer.py @@ -0,0 +1,134 @@ +from datetime import datetime, timedelta, timezone +from typing import Optional +from uuid import UUID + +from common_module.common_cache import CommonCache +from common_module.log.logger import logger +from db_repo_module.models.agentic_trigger import AgenticTrigger +from db_repo_module.models.agentic_trigger_credential import AgenticTriggerCredential +from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository + +from triggers_module.providers.base import TriggerProvider +from triggers_module.providers.registry import TriggerProviderRegistry +from triggers_module.utils.token_crypto import TokenCrypto + + +class TriggerSubscriptionRenewer: + """Periodically renews provider subscriptions/watches that are about to + expire. Designed to be called from the floware APScheduler poller.""" + + def __init__( + self, + trigger_repository: SQLAlchemyRepository[AgenticTrigger], + credential_repository: SQLAlchemyRepository[AgenticTriggerCredential], + provider_registry: TriggerProviderRegistry, + token_crypto: TokenCrypto, + cache_manager: CommonCache, + renew_window_hours: int = 24, + ): + self._triggers = trigger_repository + self._credentials = credential_repository + self._registry = provider_registry + self._crypto = token_crypto + self._cache = cache_manager + self._renew_window = timedelta(hours=renew_window_hours) + + async def run_once(self) -> int: + lock_key = 'lock:trigger_subscription_renewer' + # Try to acquire lock with a 30-minute expiry (1800 seconds) + # using the atomic Set-if-Not-Exists (nx=True) flag + acquired = self._cache.add(lock_key, 'locked', expiry=1800, nx=True) + if not acquired: + logger.info( + 'TriggerSubscriptionRenewer: lock already held in Redis. Skipping run.' + ) + return 0 + + # Note: we intentionally do NOT release the lock on completion. + # Letting the TTL expire avoids a compare-and-delete race where a pod + # whose work outran the TTL would otherwise delete another pod's + # freshly-acquired lock. The next cron fire is 6h away, well past the + # 30-min TTL. + logger.info( + 'TriggerSubscriptionRenewer: successfully acquired Redis lock. Starting watches renewal.' + ) + renewed = 0 + active = await self._triggers.find(status='active', limit=1000) + cutoff = datetime.now(timezone.utc) + self._renew_window + + for trigger in active: + try: + expiration = self._extract_expiration(trigger.provider_config) + if expiration is None or expiration > cutoff: + continue + await self._renew_one(trigger) + renewed += 1 + except Exception as exc: + logger.warning( + f'Failed to renew subscription for trigger {trigger.id}: {exc}' + ) + await self._triggers.find_one_and_update( + {'id': trigger.id}, + last_error=f'renew failed: {exc}', + ) + return renewed + + async def _renew_one(self, trigger: AgenticTrigger) -> None: + if not trigger.credential_id or not trigger.provider_config: + return + provider = self._registry.get(trigger.provider) + access_token, external_account_id = await self._fresh_access_token( + trigger.credential_id, provider + ) + updated_config = await provider.renew_subscription( + provider_config=trigger.provider_config, + access_token=access_token, + external_account_id=external_account_id, + ) + await self._triggers.find_one_and_update( + {'id': trigger.id}, + provider_config=updated_config, + last_error=None, + ) + + @staticmethod + def _extract_expiration(provider_config: Optional[dict]) -> Optional[datetime]: + if not provider_config: + return None + raw = provider_config.get('watch_expiration') + if not raw: + return None + try: + dt = datetime.fromisoformat(raw) + except Exception: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + async def _fresh_access_token( + self, credential_id: UUID, provider: TriggerProvider + ) -> tuple[str, str]: + credential = await self._credentials.find_one(id=credential_id) + if not credential: + raise RuntimeError(f'Credential {credential_id} not found') + + now = datetime.now(timezone.utc) + if ( + credential.encrypted_access_token + and credential.token_expires_at + and credential.token_expires_at > now + ): + return ( + self._crypto.decrypt(credential.encrypted_access_token) or '', + credential.external_account_id, + ) + + refresh_token = self._crypto.decrypt(credential.encrypted_refresh_token) + bundle = await provider.refresh_access_token(refresh_token or '') + await self._credentials.find_one_and_update( + {'id': credential_id}, + encrypted_access_token=self._crypto.encrypt(bundle.access_token), + token_expires_at=bundle.expires_at, + ) + return bundle.access_token or '', credential.external_account_id diff --git a/wavefront/server/modules/triggers_module/triggers_module/triggers_container.py b/wavefront/server/modules/triggers_module/triggers_module/triggers_container.py new file mode 100644 index 00000000..b2763896 --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/triggers_container.py @@ -0,0 +1,99 @@ +from dependency_injector import containers, providers +from flo_cloud.kms import FloKmsService + +from triggers_module.providers.gmail.gmail_oauth import GmailOAuthClient +from triggers_module.providers.gmail.gmail_provider import GmailProvider +from triggers_module.providers.gmail.pubsub_signature import PubSubPushVerifier +from triggers_module.providers.registry import TriggerProviderRegistry +from triggers_module.services.trigger_crud_service import TriggerCrudService +from triggers_module.services.trigger_event_processor import TriggerEventProcessor +from triggers_module.services.trigger_push_receiver import TriggerPushReceiver +from triggers_module.services.trigger_subscription_renewer import ( + TriggerSubscriptionRenewer, +) +from triggers_module.utils.token_crypto import TokenCrypto + + +def _build_registry(gmail_provider: GmailProvider) -> TriggerProviderRegistry: + registry = TriggerProviderRegistry() + registry.register(gmail_provider) + return registry + + +class TriggersContainer(containers.DeclarativeContainer): + config = providers.Configuration(ini_files=['config.ini']) + + trigger_repository = providers.Dependency() + credential_repository = providers.Dependency() + event_repository = providers.Dependency() + agent_repository = providers.Dependency() + workflow_repository = providers.Dependency() + + async_agentic_execution_service = providers.Dependency() + cache_manager = providers.Dependency() + + kms_service = providers.Singleton( + FloKmsService, + cloud_provider=config.cloud_config.cloud_provider, + ) + + token_crypto = providers.Singleton(TokenCrypto, kms_service=kms_service) + + gmail_oauth_client = providers.Singleton( + GmailOAuthClient, + client_id=config.triggers_gmail.client_id, + client_secret=config.triggers_gmail.client_secret, + redirect_uri=config.triggers_gmail.redirect_uri, + ) + + gmail_pubsub_verifier = providers.Singleton(PubSubPushVerifier) + + gmail_provider = providers.Singleton( + GmailProvider, + oauth_client=gmail_oauth_client, + pubsub_project_id=config.triggers_gmail.pubsub_project_id, + pubsub_topic_prefix=config.triggers_gmail.pubsub_topic_prefix, + push_endpoint_template=config.triggers_gmail.push_endpoint_template, + oidc_service_account_email=config.triggers_gmail.oidc_service_account_email, + ) + + trigger_provider_registry = providers.Singleton( + _build_registry, gmail_provider=gmail_provider + ) + + trigger_crud_service = providers.Singleton( + TriggerCrudService, + trigger_repository=trigger_repository, + credential_repository=credential_repository, + agent_repository=agent_repository, + workflow_repository=workflow_repository, + provider_registry=trigger_provider_registry, + token_crypto=token_crypto, + ) + + trigger_push_receiver = providers.Singleton( + TriggerPushReceiver, + trigger_repository=trigger_repository, + pubsub_verifier=gmail_pubsub_verifier, + provider_registry=trigger_provider_registry, + ) + + trigger_event_processor = providers.Singleton( + TriggerEventProcessor, + trigger_repository=trigger_repository, + credential_repository=credential_repository, + event_repository=event_repository, + workflow_repository=workflow_repository, + provider_registry=trigger_provider_registry, + token_crypto=token_crypto, + async_execution_service=async_agentic_execution_service, + ) + + trigger_subscription_renewer = providers.Singleton( + TriggerSubscriptionRenewer, + trigger_repository=trigger_repository, + credential_repository=credential_repository, + provider_registry=trigger_provider_registry, + token_crypto=token_crypto, + cache_manager=cache_manager, + ) diff --git a/wavefront/server/modules/triggers_module/triggers_module/utils/input_builder.py b/wavefront/server/modules/triggers_module/triggers_module/utils/input_builder.py new file mode 100644 index 00000000..f4f3b8a1 --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/utils/input_builder.py @@ -0,0 +1,68 @@ +import base64 +from typing import Any, Dict, List, Optional, Sequence + +from common_module.log.logger import logger + +from triggers_module.providers.base import NormalizedEmailEvent + + +DEFAULT_ALLOWED_MIME_TYPES = ( + 'application/pdf', + 'text/plain', + 'text/csv', + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', + 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', + 'image/png', + 'image/jpeg', + 'image/jpg', +) + + +class EmailTooLargeError(Exception): + pass + + +def build_inference_inputs( + event: NormalizedEmailEvent, + allowed_mime_types: Optional[Sequence[str]] = None, + max_total_bytes: int = 25 * 1024 * 1024, +) -> List[Dict[str, Any]]: + """Produce the v3-inference `inputs` list for a normalized email. + + Order: each accepted attachment (base64-encoded) first, body text last. + Drops attachments outside `allowed_mime_types`. Raises EmailTooLargeError + if total attachment bytes exceed `max_total_bytes`. + """ + allowed = set(allowed_mime_types or DEFAULT_ALLOWED_MIME_TYPES) + + total_bytes = 0 + inputs: List[Dict[str, Any]] = [] + + for attachment in event.attachments: + if attachment.mime_type not in allowed: + logger.debug( + f'Skipping attachment {attachment.file_name} ' + f'(mime_type={attachment.mime_type} not in allowlist)' + ) + continue + + total_bytes += len(attachment.content_bytes) + if total_bytes > max_total_bytes: + raise EmailTooLargeError( + f'Email attachments exceed {max_total_bytes} bytes total' + ) + + encoded = base64.b64encode(attachment.content_bytes).decode('utf-8') + inputs.append( + { + 'role': 'user', + 'content': { + 'document_base64': encoded, + 'mime_type': attachment.mime_type, + 'file_name': attachment.file_name, + }, + } + ) + + inputs.append({'role': 'user', 'content': event.body_text or ''}) + return inputs diff --git a/wavefront/server/modules/triggers_module/triggers_module/utils/token_crypto.py b/wavefront/server/modules/triggers_module/triggers_module/utils/token_crypto.py new file mode 100644 index 00000000..0f7bd46a --- /dev/null +++ b/wavefront/server/modules/triggers_module/triggers_module/utils/token_crypto.py @@ -0,0 +1,42 @@ +import base64 +from typing import Optional + +from flo_cloud.kms import FloKmsService + + +class TokenCrypto: + """Thin wrapper around `FloKmsService` for OAuth-token-at-rest encryption. + + Stores ciphertext as base64-encoded UTF-8 strings so it slots into a `Text` + DB column. Returns plaintext as a UTF-8 string. + """ + + def __init__(self, kms_service: FloKmsService): + self._kms = kms_service + + def encrypt(self, plaintext: Optional[str]) -> Optional[str]: + if plaintext is None: + return None + # return plaintext #disabling use of kms for now + + ciphertext_bytes = self._kms.encrypt(plaintext) + if not isinstance(ciphertext_bytes, (bytes, bytearray)): + raise RuntimeError( + f'KMS encrypt returned {type(ciphertext_bytes).__name__}, ' + 'expected bytes' + ) + return base64.b64encode(ciphertext_bytes).decode('utf-8') + + def decrypt(self, stored: Optional[str]) -> Optional[str]: + if stored is None: + return None + # return stored #disabling use of kms for now + + ciphertext_bytes = base64.b64decode(stored.encode('utf-8')) + plaintext_bytes = self._kms.decrypt(ciphertext_bytes) # type: ignore[arg-type] + if not isinstance(plaintext_bytes, (bytes, bytearray)): + raise RuntimeError( + f'KMS decrypt returned {type(plaintext_bytes).__name__}, ' + 'expected bytes' + ) + return plaintext_bytes.decode('utf-8') diff --git a/wavefront/server/modules/user_management_module/user_management_module/authorization/require_auth.py b/wavefront/server/modules/user_management_module/user_management_module/authorization/require_auth.py index ff7629d3..f2769733 100644 --- a/wavefront/server/modules/user_management_module/user_management_module/authorization/require_auth.py +++ b/wavefront/server/modules/user_management_module/user_management_module/authorization/require_auth.py @@ -47,6 +47,8 @@ '/floware/v1/oauth/microsoft/callback', '/floware/v1/plugin-auth/oauth/init', '/floware/v1/settings/config', + '/floware/v1/triggers/oauth/google/callback', + '/floware/v1/triggers/{trigger_id}/{agentic_id}/invoke', ] hmac_routes = os.getenv('HMAC_AUTH_ROUTES', '').split(',') @@ -381,8 +383,13 @@ async def dispatch( if authorization and authorization.startswith('Bearer '): token = authorization.split(' ')[1] - # Skip authentication for optional APIs - if request.url.path in optional_auth_apis: + # Skip authentication for optional APIs (supports {param} placeholders) + if any( + request.url.path == pattern + if '{' not in pattern + else matches_dynamic_route(request.url.path, pattern) + for pattern in optional_auth_apis + ): return await call_next(request) # For non-production environments: Check passthrough authentication globally diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py index 8dc99684..3396ab60 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py @@ -152,7 +152,6 @@ def generate_presigned_url( if hasattr(self.credentials, 'service_account_email'): service_account_email = self.credentials.service_account_email - print(f'service_account_email: {service_account_email}') if hasattr(self.credentials, 'token'): token = self.credentials.token diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/kms.py b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/kms.py index c51dc731..7ba2bc26 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/gcp/kms.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/gcp/kms.py @@ -16,6 +16,8 @@ gcp_crypto_key = os.getenv('GCP_KMS_CRYPTO_KEY') gcp_crypto_key_version = os.getenv('GCP_KMS_CRYPTO_KEY_VERSION') +gcp_enc_crypto_key = os.getenv('GCP_KMS_ENC_CRYPTO_KEY') + class GcpKMS(FloKMS): def __init__(self): @@ -41,17 +43,34 @@ def __init__(self): crypto_key_version=gcp_crypto_key_version, ) - def encrypt(self, plaintext: str) -> bytes: + # Separate symmetric key path for encryption/decryption. + # Symmetric encrypt/decrypt operates on a CryptoKey (not a version); + # GCP picks the primary version on encrypt and reads the embedded + # version metadata on decrypt. + enc_key = gcp_enc_crypto_key or gcp_crypto_key + + self.enc_key_name = self.kms_client.crypto_key_path( + project=gcp_project_id, + location=gcp_location, + key_ring=gcp_key_ring, + crypto_key=enc_key, + ) + + def encrypt(self, plaintext: bytes | str) -> bytes: + if isinstance(plaintext, str): + plaintext = plaintext.encode('utf-8') request = kms_v1.EncryptRequest( - name=self.key_name, + name=self.enc_key_name, plaintext=plaintext, ) response = self.kms_client.encrypt(request=request) return response.ciphertext - def decrypt(self, ciphertext: str) -> bytes: + def decrypt(self, ciphertext: bytes | str) -> bytes: + if isinstance(ciphertext, str): + ciphertext = ciphertext.encode('utf-8') request = kms_v1.DecryptRequest( - name=self.key_name, + name=self.enc_key_name, ciphertext=ciphertext, ) response = self.kms_client.decrypt(request=request) diff --git a/wavefront/server/uv.lock b/wavefront/server/uv.lock index e4714500..6f8ef988 100644 --- a/wavefront/server/uv.lock +++ b/wavefront/server/uv.lock @@ -40,6 +40,7 @@ members = [ "product-analysis-module", "rag-ingestion", "tools-module", + "triggers-module", "user-management-module", "voice-agents-module", "wavefront", @@ -784,6 +785,7 @@ dependencies = [ { name = "flo-utils" }, { name = "python-dotenv" }, { name = "tools-module" }, + { name = "triggers-module" }, ] [package.metadata] @@ -797,6 +799,7 @@ requires-dist = [ { name = "flo-utils", editable = "packages/flo_utils" }, { name = "python-dotenv", specifier = ">=1.1.0,<2.0.0" }, { name = "tools-module", editable = "modules/tools_module" }, + { name = "triggers-module", editable = "modules/triggers_module" }, ] [[package]] @@ -1654,6 +1657,7 @@ dependencies = [ { name = "python-dotenv" }, { name = "python-multipart" }, { name = "tools-module" }, + { name = "triggers-module" }, { name = "user-management-module" }, { name = "uvicorn" }, { name = "voice-agents-module" }, @@ -1679,6 +1683,7 @@ requires-dist = [ { name = "python-dotenv", specifier = ">=1.1.0,<2.0.0" }, { name = "python-multipart", specifier = "==0.0.9" }, { name = "tools-module", editable = "modules/tools_module" }, + { name = "triggers-module", editable = "modules/triggers_module" }, { name = "user-management-module", editable = "modules/user_management_module" }, { name = "uvicorn", specifier = ">=0.30.1,<1.0.0" }, { name = "voice-agents-module", editable = "modules/voice_agents_module" }, @@ -1876,6 +1881,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/be/8a/fe34d2f3f9470a27b01c9e76226965863f153d5fbe276f83608562e49c04/google_auth_httplib2-0.2.0-py2.py3-none-any.whl", hash = "sha256:b65a0a2123300dd71281a7bf6e64d65a0759287df52729bdd1ae2e47dc311a3d", size = 9253, upload-time = "2023-12-12T17:40:13.055Z" }, ] +[[package]] +name = "google-auth-oauthlib" +version = "1.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-auth" }, + { name = "requests-oauthlib" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/70/18/90c7fac516e63cf2058166fce0c88c353647c677b51cc036c09c49bb5cbb/google_auth_oauthlib-1.4.0.tar.gz", hash = "sha256:18b5e28880eb8eba9065c436becdc0ee8e4b59117a73a510679c82f70cd363d2", size = 21675, upload-time = "2026-05-07T08:03:47.816Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/37/d3/d7dff0d58a9e9244b48044bfb6a898bfcc8ecc42e0031d1bebc695344725/google_auth_oauthlib-1.4.0-py3-none-any.whl", hash = "sha256:251314f213a9ee46a5ae73988e84fd7cca8bb68e7ecf4bfd45940f9e7f51d070", size = 19261, upload-time = "2026-05-07T08:02:13.798Z" }, +] + [[package]] name = "google-cloud-aiplatform" version = "1.117.0" @@ -3573,6 +3591,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a2/eb/86626c1bbc2edb86323022371c39aa48df6fd8b0a1647bc274577f72e90b/nvidia_nvtx_cu12-12.8.90-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:5b17e2001cc0d751a5bc2c6ec6d26ad95913324a4adb86788c944f8ce9ba441f", size = 89954, upload-time = "2025-03-07T01:42:44.131Z" }, ] +[[package]] +name = "oauthlib" +version = "3.3.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/0b/5f/19930f824ffeb0ad4372da4812c50edbd1434f678c90c2733e1188edfc63/oauthlib-3.3.1.tar.gz", hash = "sha256:0f0f8aa759826a193cf66c12ea1af1637f87b9b4622d46e866952bb022e538c9", size = 185918, upload-time = "2025-06-19T22:48:08.269Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/be/9c/92789c596b8df838baa98fa71844d84283302f7604ed565dafe5a6b5041a/oauthlib-3.3.1-py3-none-any.whl", hash = "sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1", size = 160065, upload-time = "2025-06-19T22:48:06.508Z" }, +] + [[package]] name = "olefile" version = "0.47" @@ -5069,6 +5096,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" }, ] +[[package]] +name = "requests-oauthlib" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "oauthlib" }, + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/42/f2/05f29bc3913aea15eb670be136045bf5c5bbf4b99ecb839da9b422bb2c85/requests-oauthlib-2.0.0.tar.gz", hash = "sha256:b3dffaebd884d8cd778494369603a9e7b58d29111bf6b41bdc2dcd87203af4e9", size = 55650, upload-time = "2024-03-22T20:32:29.939Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3b/5d/63d4ae3b9daea098d5d6f5da83984853c1bbacd5dc826764b249fe119d24/requests_oauthlib-2.0.0-py2.py3-none-any.whl", hash = "sha256:7dd8a5c40426b779b0868c404bdef9768deccf22749cde15852df527e6269b36", size = 24179, upload-time = "2024-03-22T20:32:28.055Z" }, +] + [[package]] name = "resampy" version = "0.4.3" @@ -5810,6 +5850,43 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/70/26/2591b48412bde75e33bfd292034103ffe41743cacd03120e3242516cd143/transformers-4.56.2-py3-none-any.whl", hash = "sha256:79c03d0e85b26cb573c109ff9eafa96f3c8d4febfd8a0774e8bba32702dd6dde", size = 11608055, upload-time = "2025-09-19T15:16:23.736Z" }, ] +[[package]] +name = "triggers-module" +version = "0.1.0" +source = { editable = "modules/triggers_module" } +dependencies = [ + { name = "agents-module" }, + { name = "beautifulsoup4" }, + { name = "common-module" }, + { name = "db-repo-module" }, + { name = "dependency-injector" }, + { name = "fastapi" }, + { name = "flo-cloud" }, + { name = "flo-utils" }, + { name = "google-api-python-client" }, + { name = "google-auth" }, + { name = "google-auth-oauthlib" }, + { name = "google-cloud-pubsub" }, + { name = "pydantic" }, +] + +[package.metadata] +requires-dist = [ + { name = "agents-module", editable = "modules/agents_module" }, + { name = "beautifulsoup4", specifier = ">=4.8.0,<4.9.dev0" }, + { name = "common-module", editable = "modules/common_module" }, + { name = "db-repo-module", editable = "modules/db_repo_module" }, + { name = "dependency-injector", specifier = ">=4.41.0" }, + { name = "fastapi", specifier = ">=0.110.0" }, + { name = "flo-cloud", editable = "packages/flo_cloud" }, + { name = "flo-utils", editable = "packages/flo_utils" }, + { name = "google-api-python-client", specifier = ">=2.130.0" }, + { name = "google-auth", specifier = ">=2.30.0" }, + { name = "google-auth-oauthlib", specifier = ">=1.2.0" }, + { name = "google-cloud-pubsub", specifier = ">=2.21.0" }, + { name = "pydantic", specifier = ">=2.0.0" }, +] + [[package]] name = "triton" version = "3.4.0"