From df7241004a9065c2de765cf8626d360b8e675685 Mon Sep 17 00:00:00 2001 From: simo <49877847+saimouu@users.noreply.github.com> Date: Mon, 23 Feb 2026 16:01:02 +0200 Subject: [PATCH 01/18] Add celery_task_id to Job model and store the id when job is created --- ...-ec09219baf8d_add_celery_task_id_to_job.py | 32 +++++++++++++++++++ server/src/crud/job_crud.py | 11 ++++++- server/src/db/models/job.py | 1 + server/src/services/job_service.py | 4 ++- 4 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 server/migrations/versions/1771854934-ec09219baf8d_add_celery_task_id_to_job.py diff --git a/server/migrations/versions/1771854934-ec09219baf8d_add_celery_task_id_to_job.py b/server/migrations/versions/1771854934-ec09219baf8d_add_celery_task_id_to_job.py new file mode 100644 index 0000000..7725e10 --- /dev/null +++ b/server/migrations/versions/1771854934-ec09219baf8d_add_celery_task_id_to_job.py @@ -0,0 +1,32 @@ +"""add_celery_task_id_to_job + +Revision ID: ec09219baf8d +Revises: a7d869b16346 +Create Date: 2026-02-23 13:55:34.097658 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'ec09219baf8d' +down_revision: Union[str, None] = 'a7d869b16346' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('job', sa.Column('celery_task_id', sa.UUID(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('job', 'celery_task_id') + # ### end Alembic commands ### diff --git a/server/src/crud/job_crud.py b/server/src/crud/job_crud.py index eeebc36..10eb525 100644 --- a/server/src/crud/job_crud.py +++ b/server/src/crud/job_crud.py @@ -1,7 +1,7 @@ from typing import List from uuid import UUID -from sqlalchemy import select +from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from src.db.models.job import Job @@ -63,6 +63,15 @@ async def fetch_job_by_uuid(self, uuid: UUID) -> JobRead: # TODO: Fix return job # type: ignore + async def update_celery_task_id(self, job_uuid: UUID, celery_task_id: UUID): + stmt = ( + update(Job) + .where(Job.uuid == job_uuid) + .values(celery_task_id=celery_task_id) + ) + await self.db.execute(stmt) + await self.db.flush() + async def create_job(self, job_data: JobCreate): stmt = select(Project).where(Project.uuid == job_data.project_uuid) result = await self.db.execute(stmt) diff --git a/server/src/db/models/job.py b/server/src/db/models/job.py index 37c0efb..83002de 100644 --- a/server/src/db/models/job.py +++ b/server/src/db/models/job.py @@ -22,6 +22,7 @@ class Job(Base, TimestampMixin): uuid: Mapped[PyUUID] = mapped_column( UUID(as_uuid=True), default=uuid.uuid4, unique=True, nullable=False ) + celery_task_id: Mapped[PyUUID] = mapped_column(UUID(as_uuid=True), nullable=True) project_id: Mapped[int] = mapped_column( ForeignKey("project.id", ondelete="CASCADE"), nullable=False ) diff --git a/server/src/services/job_service.py b/server/src/services/job_service.py index e409038..9f0bee3 100644 --- a/server/src/services/job_service.py +++ b/server/src/services/job_service.py @@ -83,7 +83,9 @@ async def create(self, job_data: JobCreate): created_at=new_job.created_at, updated_at=new_job.updated_at, ) - await self.jobtask_service.start_job_tasks(new_job.id, job_read.model_dump()) + task = await self.jobtask_service.start_job_tasks(new_job.id, job_read.model_dump()) + + await self.job_crud.update_celery_task_id(new_job.uuid, UUID(task.id)) return job_read From 196f4c30ca5176643a53e54f9249e9edf4ecce24 Mon Sep 17 00:00:00 2001 From: simo <49877847+saimouu@users.noreply.github.com> Date: Mon, 23 Feb 2026 16:41:22 +0200 Subject: [PATCH 02/18] Add base route and service logic for task cancelling --- server/src/api/controllers/job.py | 18 ++++++++++++++---- server/src/celery/tasks.py | 5 +++++ server/src/crud/job_crud.py | 5 +++++ server/src/services/job_service.py | 15 ++++++++++++++- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/server/src/api/controllers/job.py b/server/src/api/controllers/job.py index 78e2b8a..96f32e8 100644 --- a/server/src/api/controllers/job.py +++ b/server/src/api/controllers/job.py @@ -8,10 +8,7 @@ from src.schemas.job import FewShotPromptingConfig, JobCreate, JobRead from src.schemas.project import FewShotPreferences from src.services.job_service import create_job_service -from src.services.project_service import ( - ProjectPreferences, - create_project_service, -) +from src.services.project_service import ProjectPreferences, create_project_service router = APIRouter() @@ -100,3 +97,16 @@ async def create_job( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Job creation failed: {str(e)}", ) + + +@router.post("/job/{uuid}/cancel", status_code=status.HTTP_200_OK, tags=["Job"]) +async def cancel_job(uuid: UUID, db_ctx: DBContext = Depends(get_db_ctx)): + job_service = create_job_service(db_ctx) + try: + res = await job_service.cancel_job(uuid) + return res + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to cancel job: {str(e)}", + ) from e diff --git a/server/src/celery/tasks.py b/server/src/celery/tasks.py index 83c6298..f9bc2bd 100644 --- a/server/src/celery/tasks.py +++ b/server/src/celery/tasks.py @@ -1,6 +1,7 @@ import asyncio import logging from typing import Dict +from uuid import UUID from httpx import AsyncClient, HTTPStatusError from pydantic_ai.retries import AsyncTenacityTransport, RetryConfig, wait_retry_after @@ -218,3 +219,7 @@ async def process_job( await asyncio.gather(*tasks) return {"result": "all job tasks processed"} + + +def cancel_task(task_id: UUID): + celery_app.control.revoke(str(task_id), terminate=True) diff --git a/server/src/crud/job_crud.py b/server/src/crud/job_crud.py index 10eb525..e25c9ea 100644 --- a/server/src/crud/job_crud.py +++ b/server/src/crud/job_crud.py @@ -63,6 +63,11 @@ async def fetch_job_by_uuid(self, uuid: UUID) -> JobRead: # TODO: Fix return job # type: ignore + async def fetch_celery_task_id(self, job_uuid: UUID) -> UUID | None: + stmt = select(Job.celery_task_id).where(Job.uuid == job_uuid) + result = await self.db.execute(stmt) + return result.scalar_one_or_none() + async def update_celery_task_id(self, job_uuid: UUID, celery_task_id: UUID): stmt = ( update(Job) diff --git a/server/src/services/job_service.py b/server/src/services/job_service.py index 9f0bee3..d7301b8 100644 --- a/server/src/services/job_service.py +++ b/server/src/services/job_service.py @@ -1,6 +1,7 @@ import logging from uuid import UUID +from src.celery.tasks import cancel_task from src.crud.job_crud import JobCrud from src.db.db_context import DBContext from src.schemas.job import JobCreate, JobRead @@ -83,12 +84,24 @@ async def create(self, job_data: JobCreate): created_at=new_job.created_at, updated_at=new_job.updated_at, ) - task = await self.jobtask_service.start_job_tasks(new_job.id, job_read.model_dump()) + task = await self.jobtask_service.start_job_tasks( + new_job.id, job_read.model_dump() + ) await self.job_crud.update_celery_task_id(new_job.uuid, UUID(task.id)) return job_read + async def cancel_job(self, job_uuid: UUID): + task_id = await self.job_crud.fetch_celery_task_id(job_uuid) + + if task_id is None: + raise RuntimeError(f"No task associated with job {job_uuid}") + + cancel_task(task_id) + + return {f"task {task_id} cancelled"} + def create_job_service(db_ctx: DBContext) -> JobService: jobtask_service = create_jobtask_service(db_ctx) From a8228c6a2524dcf613c4beffbc45862a8b6a46b9 Mon Sep 17 00:00:00 2001 From: simo <49877847+saimouu@users.noreply.github.com> Date: Tue, 24 Feb 2026 13:56:03 +0200 Subject: [PATCH 03/18] Add task cancel button to front and fix project delete bug /project/delete was returning 204 when content was expected --- client/src/components/DropDownMenus.tsx | 2 ++ client/src/pages/ProjectPage.tsx | 37 +++++++++++++++++++++++-- client/src/services/jobService.ts | 10 +++++++ server/src/api/controllers/job.py | 4 +-- server/src/api/controllers/project.py | 2 +- 5 files changed, 49 insertions(+), 6 deletions(-) diff --git a/client/src/components/DropDownMenus.tsx b/client/src/components/DropDownMenus.tsx index 0eebcea..f94e7f8 100644 --- a/client/src/components/DropDownMenus.tsx +++ b/client/src/components/DropDownMenus.tsx @@ -4,6 +4,7 @@ import { Ellipsis } from "lucide-react"; type EllipsisItem = { label: React.ElementType; + disabled: boolean; onClick: () => void; }; @@ -47,6 +48,7 @@ export const DropdownMenuEllipsis: React.FC = ({ items }) => { as="button" onClick={item.onClick} className="block w-full px-4 py-2 text-left text-sm text-gray-700 data-focus:bg-gray-100 focus:outline-none cursor-pointer data-disabled:opacity-50" + disabled={item.disabled} >