diff --git a/frontend/app/api/mutations/useCancelTaskMutation.ts b/frontend/app/api/mutations/useCancelTaskMutation.ts index 88ea7e0c5..c18bb130c 100644 --- a/frontend/app/api/mutations/useCancelTaskMutation.ts +++ b/frontend/app/api/mutations/useCancelTaskMutation.ts @@ -3,6 +3,8 @@ import { useMutation, useQueryClient, } from "@tanstack/react-query"; +import { taskDetailQueryKey } from "@/app/api/queries/useGetTaskQuery"; +import { TASKS_QUERY_KEY } from "@/app/api/queries/useGetTasksQuery"; export interface CancelTaskRequest { taskId: string; @@ -36,12 +38,19 @@ export const useCancelTaskMutation = ( return response.json(); } + const { onSuccess, onError, onSettled, ...restOptions } = options ?? {}; + return useMutation({ mutationFn: cancelTask, - onSuccess: () => { - // Invalidate tasks query to refresh the list - queryClient.invalidateQueries({ queryKey: ["tasks"] }); + ...restOptions, + onSuccess: (data, variables, context) => { + queryClient.invalidateQueries({ queryKey: [...TASKS_QUERY_KEY] }); + queryClient.invalidateQueries({ + queryKey: taskDetailQueryKey(variables.taskId), + }); + onSuccess?.(data, variables, context); }, - ...options, + onError, + onSettled, }); }; diff --git a/frontend/app/api/mutations/useDeleteDocument.ts b/frontend/app/api/mutations/useDeleteDocument.ts index f4fd9ce41..20039ad07 100644 --- a/frontend/app/api/mutations/useDeleteDocument.ts +++ b/frontend/app/api/mutations/useDeleteDocument.ts @@ -13,15 +13,15 @@ interface DeleteDocumentResponse { message: string; } -const deleteDocument = async ( - data: DeleteDocumentRequest, -): Promise => { +export async function deleteDocumentByFilename( + filename: string, +): Promise { const response = await fetch("/api/documents/delete-by-filename", { method: "POST", headers: { "Content-Type": "application/json", }, - body: JSON.stringify(data), + body: JSON.stringify({ filename } satisfies DeleteDocumentRequest), }); if (!response.ok) { @@ -30,13 +30,14 @@ const deleteDocument = async ( } return response.json(); -}; +} export const useDeleteDocument = () => { const queryClient = useQueryClient(); return useMutation({ - mutationFn: deleteDocument, + mutationFn: ({ filename }: DeleteDocumentRequest) => + deleteDocumentByFilename(filename), onSettled: () => { // Invalidate and refetch search queries to update the UI setTimeout(() => { diff --git a/frontend/app/api/mutations/useRetryTaskMutation.ts b/frontend/app/api/mutations/useRetryTaskMutation.ts new file mode 100644 index 000000000..04f3b831f --- /dev/null +++ b/frontend/app/api/mutations/useRetryTaskMutation.ts @@ -0,0 +1,84 @@ +import { + type UseMutationOptions, + useMutation, + useQueryClient, +} from "@tanstack/react-query"; +import { taskDetailQueryKey } from "@/app/api/queries/useGetTaskQuery"; +import { TASKS_QUERY_KEY } from "@/app/api/queries/useGetTasksQuery"; + +export interface RetryTaskRequest { + taskId: string; + /** When set, only these task file paths are retried. Omit to retry all failed RETRYABLE files. */ + filePaths?: string[]; +} + +export interface RetryTaskSkippedFile { + file_path: string; + filename?: string; + reason: + | "not_retryable" + | "source_file_missing" + | "file_not_in_task" + | "not_failed" + | string; +} + +export interface RetryTaskResponse { + task_id: string; + retried: number; + skipped: RetryTaskSkippedFile[]; + status: string; + message?: string; + error?: string; +} + +export const useRetryTaskMutation = ( + options?: Omit< + UseMutationOptions, + "mutationFn" + >, +) => { + const queryClient = useQueryClient(); + + async function retryTask( + variables: RetryTaskRequest, + ): Promise { + const body = JSON.stringify( + variables.filePaths != null ? { file_paths: variables.filePaths } : {}, + ); + + const response = await fetch(`/api/tasks/${variables.taskId}/retry`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body, + }); + + const payload = (await response + .json() + .catch(() => ({}))) as RetryTaskResponse; + + if (!response.ok) { + throw new Error( + payload.message || payload.error || "Failed to retry task files", + ); + } + + return payload; + } + + const { onSuccess, onError, onSettled, ...restOptions } = options ?? {}; + + return useMutation({ + mutationFn: retryTask, + ...restOptions, + onSuccess: (data, variables, context) => { + queryClient.invalidateQueries({ queryKey: [...TASKS_QUERY_KEY] }); + queryClient.invalidateQueries({ + queryKey: taskDetailQueryKey(variables.taskId), + }); + onSuccess?.(data, variables, context); + }, + onError, + onSettled, + }); +}; diff --git a/frontend/app/api/queries/useGetTaskQuery.ts b/frontend/app/api/queries/useGetTaskQuery.ts new file mode 100644 index 000000000..e5cd2948a --- /dev/null +++ b/frontend/app/api/queries/useGetTaskQuery.ts @@ -0,0 +1,33 @@ +import { type UseQueryOptions, useQuery } from "@tanstack/react-query"; +import type { Task } from "@/app/api/queries/useGetTasksQuery"; +export const TASK_DETAIL_QUERY_KEY = ["tasks", "detail"] as const; + +export function taskDetailQueryKey(taskId: string) { + return [...TASK_DETAIL_QUERY_KEY, taskId] as const; +} + +export function useGetTaskQuery( + taskId: string | null, + options?: Omit, "queryKey" | "queryFn">, +) { + return useQuery({ + queryKey: taskId + ? taskDetailQueryKey(taskId) + : [...TASK_DETAIL_QUERY_KEY, "idle"], + queryFn: async (): Promise => { + if (!taskId) { + return null; + } + const response = await fetch(`/api/tasks/${taskId}/enhanced`); + if (response.status === 404) { + return null; + } + if (!response.ok) { + throw new Error("Failed to fetch task"); + } + return response.json() as Promise; + }, + ...options, + enabled: options?.enabled ?? !!taskId, + }); +} diff --git a/frontend/app/api/queries/useGetTasksQuery.ts b/frontend/app/api/queries/useGetTasksQuery.ts index bcdfa9d14..d871007e6 100644 --- a/frontend/app/api/queries/useGetTasksQuery.ts +++ b/frontend/app/api/queries/useGetTasksQuery.ts @@ -4,6 +4,25 @@ import { useQueryClient, } from "@tanstack/react-query"; +/** Component that failed, from GET /tasks/enhanced file metadata. */ +export type TaskFailureComponent = + | "docling" + | "openrag" + | "langflow" + | "opensearch"; + +/** Pipeline or validation step where failure occurred. */ +export type TaskFailurePhase = + | "parsing" + | "chunking" + | "embedding" + | "indexing" + | "file_validation" + | "unknown"; + +/** Who can resolve the failure (enhanced API). */ +export type TaskActionableBy = "USER_ACTIONABLE" | "RETRYABLE"; + export interface TaskFileEntry { status?: | "pending" @@ -21,7 +40,14 @@ export interface TaskFileEntry { filename?: string; embedding_model?: string; embedding_dimensions?: number; - [key: string]: unknown; + phase?: "docling" | "langflow" | "complete" | string; + docling_status?: string; + docling_task_id?: string; + /** Present on failed files when the enhanced API can classify the failure. */ + component?: TaskFailureComponent; + failure_phase?: TaskFailurePhase; + user_facing_message?: string; + actionable_by?: TaskActionableBy; } export interface Task { @@ -51,13 +77,15 @@ export interface TasksResponse { tasks: Task[]; } +export const TASKS_QUERY_KEY = ["tasks", "enhanced"] as const; + export const useGetTasksQuery = ( options?: Omit, "queryKey" | "queryFn">, ) => { const queryClient = useQueryClient(); async function getTasks(): Promise { - const response = await fetch("/api/tasks"); + const response = await fetch("/api/tasks/enhanced"); if (!response.ok) { throw new Error("Failed to fetch tasks"); @@ -69,13 +97,12 @@ export const useGetTasksQuery = ( const queryResult = useQuery( { - queryKey: ["tasks"], + queryKey: [...TASKS_QUERY_KEY], queryFn: getTasks, refetchInterval: (query) => { - // Only poll if there are tasks with pending or running status const data = query.state.data; if (!data || data.length === 0) { - return false; // Stop polling if no tasks + return false; } const hasActiveTasks = data.some( @@ -85,11 +112,11 @@ export const useGetTasksQuery = ( task.status === "processing", ); - return hasActiveTasks ? 3000 : false; // Poll every 3 seconds if active tasks exist + return hasActiveTasks ? 3000 : false; }, refetchIntervalInBackground: true, - staleTime: 0, // Always consider data stale to ensure fresh updates - gcTime: 5 * 60 * 1000, // Keep in cache for 5 minutes + staleTime: 0, + gcTime: 5 * 60 * 1000, ...options, }, queryClient, diff --git a/frontend/app/globals.css b/frontend/app/globals.css index 770721c4a..afdeda928 100644 --- a/frontend/app/globals.css +++ b/frontend/app/globals.css @@ -31,6 +31,17 @@ --placeholder-foreground: 240 5% 65%; /* App-level UI tokens (safe defaults, override per brand/theme) */ + --canvas: 0 0% 100%; + --border-subtle-background-contextual: var(--border); + /* Task dialog layout (file row indent = px + checkbox + gaps + chevron) */ + --task-dialog-width: 40rem; + --task-dialog-max-height: 90vh; + --task-dialog-file-type-width: 10rem; + --task-dialog-error-indent: 4.5rem; + --task-dialog-error-indent-cloud: 4.75rem; + --task-dialog-oss-bg: var(--background); + --task-dialog-oss-selected: var(--muted); + --z-task-dialog-menu: 100; --layered-select-bg: hsl(var(--muted) / 0.5); --chat-surface-gradient: rgba(69, 137, 255, 0.16); --chat-input-border: hsl(var(--input)); @@ -118,6 +129,10 @@ --placeholder-foreground: 240 4% 46%; /* App-level UI tokens (safe defaults, override per brand/theme) */ + --canvas: 0 0% 3.9%; /* #0A0A0A */ + --border-subtle-background-contextual: var(--border); + --task-dialog-oss-bg: 0 0% 9.02%; /* #171717 */ + --task-dialog-oss-selected: 0 0% 13.73%; /* #232323 */ --layered-select-bg: hsl(var(--muted) / 0.5); --chat-surface-gradient: rgba(69, 137, 255, 0.16); --chat-input-border: hsl(var(--input)); @@ -236,6 +251,7 @@ --layered-select-bg: rgba(131, 131, 131, 0.24); --chat-input-border: #f4f4f4; --icon-disabled: #525252; + --border-subtle-background-contextual: 0 0% 82%; /* Contextual layer (~Gray 100); use hsl(var(--layer-contextual)) */ /* Layer / contextual surfaces (IBM Gray 10 light, Gray 100 dark) */ --layer-contextual: 0 0% 96%; @@ -336,6 +352,7 @@ --icon-primary: 0 0% 96%; /* #f4f4f4*/ --placeholder: 0 0% 44%; /* #6F6F6F */ + --border-subtle-background-contextual: 0 0% 22.4%; /* #393939 */ --layer-contextual: 0 0% 15%; --layer-contextual-foreground: 0 0% 96%; --text-text-01: 0 0% 96%; diff --git a/frontend/app/knowledge/page.tsx b/frontend/app/knowledge/page.tsx index ea5f9af27..b03e2f492 100644 --- a/frontend/app/knowledge/page.tsx +++ b/frontend/app/knowledge/page.tsx @@ -12,7 +12,7 @@ import { import { AgGridReact, type CustomCellRendererProps } from "ag-grid-react"; import { AlertTriangle, Cloud, FileIcon, Globe, RefreshCw } from "lucide-react"; import { useRouter } from "next/navigation"; -import { useCallback, useEffect, useRef, useState } from "react"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { KnowledgeDropdown } from "@/components/knowledge-dropdown"; import { ProtectedRoute } from "@/components/protected-route"; import { Banner, BannerIcon, BannerTitle } from "@/components/ui/banner"; @@ -64,6 +64,42 @@ import { useSyncAllConnectorsPreview, } from "../api/mutations/useSyncConnector"; +function sameFileSelection(a: File[], b: File[]): boolean { + if (a.length !== b.length) { + return false; + } + const identities = new Set(b.map((row) => getKnowledgeFileIdentity(row))); + return a.every((row) => identities.has(getKnowledgeFileIdentity(row))); +} + +/** Failed overlays can stay selected after they lose their checkbox (processing → failed). */ +function syncGridSelectionToDeletableRows( + api: NonNullable["api"]>, + isDeletable: (file?: File) => boolean, +): File[] { + api.forEachNode((node) => { + if (node.isSelected() && !isDeletable(node.data)) { + node.setSelected(false); + } + }); + return api.getSelectedRows().filter(isDeletable); +} + +/** Deselect non-deletable rows in the grid only; returns whether anything changed. */ +function pruneNonDeletableGridSelection( + api: NonNullable["api"]>, + isDeletable: (file?: File) => boolean, +): boolean { + let pruned = false; + api.forEachNode((node) => { + if (node.isSelected() && !isDeletable(node.data)) { + node.setSelected(false); + pruned = true; + } + }); + return pruned; +} + /** List-files uses term filters; "*" means "any" in the UI — do not send it literally. */ function listFilesFilterParam(values?: string[]): string | undefined { const raw = values?.[0]?.trim(); @@ -327,6 +363,21 @@ function SearchPage() { return getKnowledgeFileIdentity(file); }, []); + const isDeletableKnowledgeRow = useCallback((file?: File) => { + return (file?.status || "active") === "active"; + }, []); + + const resolveDeleteFilename = useCallback( + (row: File) => { + const identity = getKnowledgeFileIdentity(row); + const indexed = effectiveData.find( + (file) => getKnowledgeFileIdentity(file) === identity, + ); + return indexed?.filename ?? row.filename; + }, + [effectiveData], + ); + const getOwnerLabel = useCallback((file?: File): string => { return file?.owner_name?.trim() || file?.owner_email?.trim() || "—"; }, []); @@ -417,6 +468,29 @@ function SearchPage() { const gridRows = fileResults; const gridRef = useRef(null); + // Re-run only when row identity/status changes, not on every list poll reference. + const gridRowsSelectionKey = useMemo( + () => + gridRows + .map( + (row) => `${getKnowledgeFileIdentity(row)}:${row.status ?? "active"}`, + ) + .join("\0"), + [gridRows], + ); + + useEffect(() => { + const api = gridRef.current?.api; + if (!api) { + return; + } + pruneNonDeletableGridSelection(api, isDeletableKnowledgeRow); + const nextSelected = api.getSelectedRows().filter(isDeletableKnowledgeRow); + setSelectedRows((current) => + sameFileSelection(current, nextSelected) ? current : nextSelected, + ); + }, [gridRowsSelectionKey, isDeletableKnowledgeRow]); + const columnDefs: ColDef[] = [ { field: "filename", @@ -436,7 +510,7 @@ function SearchPage() { return sourceA < sourceB ? -1 : 1; }, checkboxSelection: (params: CheckboxSelectionCallbackParams) => - (params?.data?.status || "active") === "active", + isDeletableKnowledgeRow(params?.data), headerCheckboxSelection: true, ...(isCloudBrand ? { flex: 2.2, minWidth: 260 } @@ -685,51 +759,81 @@ function SearchPage() { }; const onSelectionChanged = useCallback(() => { - if (gridRef.current) { - const selectedNodes = gridRef.current.api.getSelectedRows(); - setSelectedRows(selectedNodes); + if (!gridRef.current) { + return; } - }, []); + const nextSelected = syncGridSelectionToDeletableRows( + gridRef.current.api, + isDeletableKnowledgeRow, + ); + setSelectedRows((current) => + sameFileSelection(current, nextSelected) ? current : nextSelected, + ); + }, [isDeletableKnowledgeRow]); const handleBulkDelete = async () => { - if (selectedRows.length === 0) return; + const rowsToDelete = selectedRows.filter(isDeletableKnowledgeRow); + if (rowsToDelete.length === 0) return; try { - // Delete each file individually since the API expects one filename at a time - const deletePromises = selectedRows.map((row) => - deleteDocumentMutation.mutateAsync({ filename: row.filename }), + const deleteResults = await Promise.allSettled( + rowsToDelete.map((row) => + deleteDocumentMutation.mutateAsync({ + filename: resolveDeleteFilename(row), + }), + ), ); - const deleteResults = await Promise.all(deletePromises); await refreshTasks(); await queryClient.invalidateQueries({ queryKey: ["search"] }); + await queryClient.invalidateQueries({ queryKey: ["listFiles"] }); await queryClient.refetchQueries({ queryKey: ["search"] }); + await queryClient.refetchQueries({ queryKey: ["listFiles"] }); - const totalDeletedChunks = deleteResults.reduce( - (sum, result) => sum + (result.deleted_chunks || 0), - 0, + const deleted = deleteResults.filter( + ( + result, + ): result is PromiseFulfilledResult< + Awaited> + > => + result.status === "fulfilled" && + (result.value.deleted_chunks || 0) > 0, ); - const filesWithNoDeletion = deleteResults.filter( - (result) => (result.deleted_chunks || 0) === 0, + const noChunks = deleteResults.filter( + (result) => + result.status === "fulfilled" && + (result.value.deleted_chunks || 0) === 0, + ); + const failed = deleteResults.filter( + (result): result is PromiseRejectedResult => + result.status === "rejected", ); - if (totalDeletedChunks > 0) { + if (deleted.length > 0) { toast.success( - `Successfully deleted ${selectedRows.length} document${ - selectedRows.length > 1 ? "s" : "" - }`, + `Deleted ${deleted.length} document${deleted.length > 1 ? "s" : ""}`, ); - } else { + } else if (failed.length === 0) { toast.warning( - "No document chunks were deleted. Files may be owned by another context or already removed.", + "No document chunks were deleted. Files may be missing or not deletable in your current context.", ); } - if (filesWithNoDeletion.length > 0 && totalDeletedChunks > 0) { + if (noChunks.length > 0 && deleted.length > 0) { toast.warning( - `${filesWithNoDeletion.length} selected file${ - filesWithNoDeletion.length > 1 ? "s were" : " was" - } not deleted (0 chunks matched).`, + `${noChunks.length} selected file${noChunks.length > 1 ? "s had" : " had"} no matching chunks.`, + ); + } + + if (failed.length > 0) { + toast.error( + `${failed.length} document${failed.length > 1 ? "s" : ""} could not be deleted`, + { + description: + failed[0].reason instanceof Error + ? failed[0].reason.message + : undefined, + }, ); } setSelectedRows([]); @@ -919,6 +1023,7 @@ function SearchPage() { getRowId={(params: GetRowIdParams) => getFileIdentity(params.data) } + isRowSelectable={(params) => isDeletableKnowledgeRow(params.data)} domLayout="normal" onSelectionChanged={onSelectionChanged} pagination={pagination} @@ -952,6 +1057,7 @@ function SearchPage() { getRowId={(params: GetRowIdParams) => getFileIdentity(params.data) } + isRowSelectable={(params) => isDeletableKnowledgeRow(params.data)} domLayout="normal" onSelectionChanged={onSelectionChanged} pagination={pagination} diff --git a/frontend/components/task-dialog/category-chips.tsx b/frontend/components/task-dialog/category-chips.tsx new file mode 100644 index 000000000..d009c5f97 --- /dev/null +++ b/frontend/components/task-dialog/category-chips.tsx @@ -0,0 +1,69 @@ +"use client"; + +import { + ALL_TASK_STATUS_CATEGORIES, + type TaskFileStatusCategory, +} from "@/lib/task-utils"; +import { cn } from "@/lib/utils"; +import { CATEGORY_CHIPS } from "./constants"; + +interface TaskDialogCategoryChipsProps { + isCloudBrand: boolean; + counts: Record | null; + statusCategory: string; + onStatusCategoryChange: (value: string) => void; +} + +export function TaskDialogCategoryChips({ + isCloudBrand, + counts, + statusCategory, + onStatusCategoryChange, +}: TaskDialogCategoryChipsProps) { + if (!counts) return null; + + return ( +
+ {CATEGORY_CHIPS.map((chip) => { + const Icon = chip.icon; + const isActive = statusCategory === chip.id; + const count = counts[chip.id]; + + return ( + + ); + })} +
+ ); +} diff --git a/frontend/components/task-dialog/constants.ts b/frontend/components/task-dialog/constants.ts new file mode 100644 index 000000000..881f08f5d --- /dev/null +++ b/frontend/components/task-dialog/constants.ts @@ -0,0 +1,28 @@ +import { AlertCircle, CheckCircle, Clock, type LucideIcon } from "lucide-react"; +import type { TaskFileStatusCategory } from "@/lib/task-utils"; + +export const CATEGORY_CHIPS: Array<{ + id: TaskFileStatusCategory; + label: string; + icon: LucideIcon; + iconClassName: string; +}> = [ + { + id: "completed", + label: "Completed", + icon: CheckCircle, + iconClassName: "text-emerald-500", + }, + { + id: "system_error", + label: "System error", + icon: AlertCircle, + iconClassName: "text-destructive", + }, + { + id: "indexing", + label: "Indexing", + icon: Clock, + iconClassName: "text-muted-foreground", + }, +]; diff --git a/frontend/components/task-dialog/file-error-details.tsx b/frontend/components/task-dialog/file-error-details.tsx new file mode 100644 index 000000000..1416d0be2 --- /dev/null +++ b/frontend/components/task-dialog/file-error-details.tsx @@ -0,0 +1,127 @@ +"use client"; + +import { Ban, Check, Flag } from "lucide-react"; +import type { TaskFileEntry } from "@/app/api/queries/useGetTasksQuery"; +import { + analyzeTaskFileIngestionFailure, + type TaskFileIngestionFailureAnalysis, +} from "@/lib/task-error-display"; +import { cn } from "@/lib/utils"; + +interface TaskDialogFileErrorDetailsProps { + isCloudBrand: boolean; + fileInfo: TaskFileEntry; + taskError?: string; + analysis?: TaskFileIngestionFailureAnalysis; +} + +export function TaskDialogFileErrorDetails({ + isCloudBrand, + fileInfo, + taskError, + analysis: analysisProp, +}: TaskDialogFileErrorDetailsProps) { + const analysis = + analysisProp ?? analyzeTaskFileIngestionFailure(fileInfo, taskError); + + return ( +
+
+ {analysis.pipelineSteps.map((step, index) => { + const isFailed = step.status === "failed"; + const isLast = index === analysis.pipelineSteps.length - 1; + const showComponentTags = + isFailed && analysis.componentTags.length > 0; + const contentRowCount = + 1 + (isFailed ? 2 + (showComponentTags ? 1 : 0) : 0); + + return ( +
+
+ {step.status === "completed" ? ( + + ) : ( + + )} + {!isLast && ( + + )} +
+ +

+ {step.label} +

+ + {isFailed && ( + <> +

+ {analysis.resolvedError} +

+

+ {analysis.failureSummary} +

+ {showComponentTags && ( +
+ + {analysis.componentTags.map((tag) => ( + + {tag} + + ))} +
+ )} + + )} +
+ ); + })} +
+
+ ); +} diff --git a/frontend/components/task-dialog/file-list.tsx b/frontend/components/task-dialog/file-list.tsx new file mode 100644 index 000000000..d709cebe5 --- /dev/null +++ b/frontend/components/task-dialog/file-list.tsx @@ -0,0 +1,402 @@ +"use client"; + +import { ArrowUpAZ, ChevronDown, FileText } from "lucide-react"; +import { useEffect, useMemo, useState } from "react"; +import type { TaskFileEntry } from "@/app/api/queries/useGetTasksQuery"; +import type { Task } from "@/contexts/task-context"; +import { analyzeTaskFileIngestionFailure } from "@/lib/task-error-display"; +import { + getTaskFileDialogStatusLabel, + getTaskFileName, + isTaskFileFailed, + isTaskFileRetryable, + type TaskFileNameSort, +} from "@/lib/task-utils"; +import { cn } from "@/lib/utils"; +import { TaskDialogFileErrorDetails } from "./file-error-details"; + +type TaskDialogFileListTab = "task-ingestions" | "retry-ingestions"; + +interface TaskDialogFileListProps { + isCloudBrand: boolean; + task: Task; + entries: Array<[string, TaskFileEntry]>; + retryIngestionEntries: Array<[string, TaskFileEntry]>; + totalSourceCount: number; + totalSourceCountAll?: number; + nameSort: TaskFileNameSort; + onToggleNameSort: () => void; + expandedPath: string | null; + onExpandedPathChange: (path: string | null) => void; + retryIngestionCount?: number; + selectablePaths: string[]; + selectedPaths: Set; + allSelectableSelected: boolean; + onToggleSelectedPath: (filePath: string) => void; + onToggleSelectAllVisible: () => void; + allRetryIngestionsSelected: boolean; + onToggleSelectAllRetryIngestions: () => void; + selectedCount: number; + retryIngestionSelectedCount: number; + retryingTarget?: "all" | "selected" | string | null; +} + +export function TaskDialogFileList({ + isCloudBrand, + task, + entries, + retryIngestionEntries, + totalSourceCount, + totalSourceCountAll, + nameSort, + onToggleNameSort, + expandedPath, + onExpandedPathChange, + retryIngestionCount = 0, + selectablePaths, + selectedPaths, + allSelectableSelected, + onToggleSelectedPath, + onToggleSelectAllVisible, + allRetryIngestionsSelected, + onToggleSelectAllRetryIngestions, + selectedCount, + retryIngestionSelectedCount, + retryingTarget = null, +}: TaskDialogFileListProps) { + const [activeTab, setActiveTab] = + useState("task-ingestions"); + + const showRetryIngestionsTab = retryIngestionCount > 0; + + useEffect(() => { + if (!showRetryIngestionsTab && activeTab === "retry-ingestions") { + setActiveTab("task-ingestions"); + } + }, [showRetryIngestionsTab, activeTab]); + + const analysisByPath = useMemo(() => { + const map = new Map< + string, + ReturnType + >(); + const allEntries = [...entries, ...retryIngestionEntries]; + const seen = new Set(); + for (const [filePath, fileInfo] of allEntries) { + if (seen.has(filePath)) continue; + seen.add(filePath); + if (isTaskFileFailed(fileInfo)) { + map.set( + filePath, + analyzeTaskFileIngestionFailure(fileInfo, task.error), + ); + } + } + return map; + }, [entries, retryIngestionEntries, task.error]); + + const containerClass = cn( + "flex min-h-0 flex-1 flex-col overflow-hidden", + isCloudBrand + ? "rounded-md border-x border-b bg-layer-contextual" + : "border-b border-muted bg-task-dialog-oss", + ); + + const taskIngestionsTabCount = + totalSourceCountAll != null && totalSourceCountAll > totalSourceCount + ? `${totalSourceCount} of ${totalSourceCountAll}` + : String(totalSourceCount); + + const isTabActive = (tab: TaskDialogFileListTab) => activeTab === tab; + + const tabTriggerClass = (tab: TaskDialogFileListTab) => { + const isActive = isTabActive(tab); + return cn( + "inline-flex w-fit max-w-fit min-h-10 shrink-0 items-center px-4 text-sm font-medium transition-colors", + isCloudBrand + ? cn( + "rounded-none border-0 border-b-2", + isActive + ? "border-[var(--border-border-interactive)] bg-muted text-foreground" + : "border-transparent bg-transparent text-muted-foreground hover:border-[var(--border-border-interactive)]", + ) + : cn( + "border-0", + isActive + ? "rounded-none rounded-t-lg bg-task-dialog-oss-selected text-foreground" + : "rounded-none bg-transparent text-muted-foreground hover:text-foreground", + ), + ); + }; + + const listScrollClass = "min-h-0 flex-1 overflow-y-auto overscroll-contain"; + + const rowGridClass = + "grid min-h-10 grid-cols-[auto_auto_1fr_auto] items-center gap-3"; + + const renderFileRows = (listEntries: Array<[string, TaskFileEntry]>) => + listEntries.map(([filePath, fileInfo]) => { + const fileName = getTaskFileName(filePath, fileInfo); + const failed = isTaskFileFailed(fileInfo); + const analysis = analysisByPath.get(filePath); + const rowStatusLabel = failed + ? (analysis?.rowStatusLabel ?? "Failed") + : getTaskFileDialogStatusLabel(fileInfo, task.error); + const isExpanded = expandedPath === filePath; + const retryable = isTaskFileRetryable(fileInfo); + const isSelected = selectedPaths.has(filePath); + const isRowRetrying = + retryingTarget === "all" || + retryingTarget === filePath || + (retryingTarget === "selected" && isSelected); + const retryAttempts = fileInfo.retry_count ?? 0; + const statusLabel = + retryable && retryAttempts > 0 + ? `${rowStatusLabel} · Retry ${retryAttempts}` + : rowStatusLabel; + + return ( +
+
+ {retryable ? ( + onToggleSelectedPath(filePath)} + /> + ) : ( + + )} + {failed ? ( + + ) : ( + + )} + + + {statusLabel} + +
+ + {failed && isExpanded && analysis && ( + + )} +
+ ); + }); + + const renderListHeader = ({ + showSelectAll, + allSelected, + onToggleSelectAll, + selectedLabel, + }: { + showSelectAll: boolean; + allSelected: boolean; + onToggleSelectAll: () => void; + selectedLabel?: string; + }) => ( +
+ {showSelectAll ? ( + + ) : null} + + {selectedLabel ? ( + {selectedLabel} + ) : null} +
+ ); + + const renderEmptyPanel = (message: string) => ( +

+ {message} +

+ ); + + return ( +
+
+ + {showRetryIngestionsTab && ( + + )} +
+ + {isTabActive("task-ingestions") ? ( +
+ {entries.length === 0 ? ( + renderEmptyPanel("No files match your filters.") + ) : ( + <> + {renderListHeader({ + showSelectAll: selectablePaths.length > 0, + allSelected: allSelectableSelected, + onToggleSelectAll: onToggleSelectAllVisible, + selectedLabel: + selectedCount > 0 ? `${selectedCount} selected` : undefined, + })} +
+ {renderFileRows(entries)} +
+ + )} +
+ ) : ( +
+ {retryIngestionEntries.length === 0 ? ( + renderEmptyPanel("No retryable files in this task.") + ) : ( + <> + {renderListHeader({ + showSelectAll: true, + allSelected: allRetryIngestionsSelected, + onToggleSelectAll: onToggleSelectAllRetryIngestions, + selectedLabel: + retryIngestionSelectedCount > 0 + ? `${retryIngestionSelectedCount} selected` + : undefined, + })} +
+ {renderFileRows(retryIngestionEntries)} +
+ + )} +
+ )} +
+ ); +} diff --git a/frontend/components/task-dialog/filters.tsx b/frontend/components/task-dialog/filters.tsx new file mode 100644 index 000000000..a5d3b72b0 --- /dev/null +++ b/frontend/components/task-dialog/filters.tsx @@ -0,0 +1,204 @@ +"use client"; + +import { ChevronDown, ChevronUp, Search } from "lucide-react"; +import { useState } from "react"; +import { Button } from "@/components/ui/button"; +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuTrigger, +} from "@/components/ui/dropdown-menu"; +import { Input } from "@/components/ui/input"; +import { ALL_TASK_FILE_TYPES, formatTaskFileTypeLabel } from "@/lib/task-utils"; +import { cn } from "@/lib/utils"; + +interface TaskDialogFiltersProps { + isCloudBrand: boolean; + search: string; + onSearchChange: (value: string) => void; + fileType: string; + onFileTypeChange: (value: string) => void; + fileTypes: string[]; + fileTypeLabel: string; + searchDisabled: boolean; + fileTypeDisabled: boolean; +} + +function FileTypeMenu({ + isCloudBrand, + fileType, + onFileTypeChange, + fileTypes, + allTypesLabel, + fileTypeLabel, + disabled, +}: { + isCloudBrand: boolean; + fileType: string; + onFileTypeChange: (value: string) => void; + fileTypes: string[]; + allTypesLabel: string; + fileTypeLabel: string; + disabled: boolean; +}) { + const [open, setOpen] = useState(false); + const options = [ + { value: ALL_TASK_FILE_TYPES, label: allTypesLabel }, + ...fileTypes.map((type) => ({ + value: type, + label: formatTaskFileTypeLabel(type), + })), + ]; + + return ( + + + {isCloudBrand ? ( + + ) : ( + + )} + + event.preventDefault()} + > + {options.map(({ value, label }) => ( + onFileTypeChange(value)} + className={cn( + "px-2", + isCloudBrand && + "bg-layer-contextual text-layer-contextual-foreground hover:bg-muted focus:bg-muted data-[highlighted]:bg-muted", + fileType === value && + "bg-muted text-foreground focus:bg-muted data-[highlighted]:bg-muted", + )} + > + {label} + + ))} + + + ); +} + +function TaskDialogSearchField({ + isCloudBrand, + search, + onSearchChange, + disabled, +}: { + isCloudBrand: boolean; + search: string; + onSearchChange: (value: string) => void; + disabled: boolean; +}) { + const input = ( + onSearchChange(e.target.value)} + disabled={disabled} + icon={} + inputClassName={ + isCloudBrand + ? "h-10 min-w-0 !rounded-none !border-0 bg-layer-contextual text-layer-contextual-foreground placeholder:text-muted-foreground focus:outline-none disabled:cursor-not-allowed disabled:opacity-50" + : "h-10 rounded-md !bg-canvas" + } + /> + ); + + return ( +
+ {input} +
+ ); +} + +export function TaskDialogFilters({ + isCloudBrand, + search, + onSearchChange, + fileType, + onFileTypeChange, + fileTypes, + fileTypeLabel, + searchDisabled, + fileTypeDisabled, +}: TaskDialogFiltersProps) { + const fileTypeMenuProps = { + isCloudBrand, + fileType, + onFileTypeChange, + fileTypes, + allTypesLabel: isCloudBrand ? "All categories" : "All file types", + fileTypeLabel, + disabled: fileTypeDisabled, + }; + + return ( +
+ + +
+ ); +} diff --git a/frontend/components/task-dialog/header.tsx b/frontend/components/task-dialog/header.tsx new file mode 100644 index 000000000..92bed7da0 --- /dev/null +++ b/frontend/components/task-dialog/header.tsx @@ -0,0 +1,101 @@ +"use client"; + +import { DialogHeader, DialogTitle } from "@/components/ui/dialog"; +import { + ALL_TASK_FILE_TYPES, + formatTaskFileTypeLabel, + type TaskFileStatusCategory, +} from "@/lib/task-utils"; +import { cn } from "@/lib/utils"; +import { TaskDialogCategoryChips } from "./category-chips"; +import { TaskDialogFilters } from "./filters"; + +interface TaskDialogHeaderProps { + isCloudBrand: boolean; + taskId: string; + search: string; + onSearchChange: (value: string) => void; + fileType: string; + onFileTypeChange: (value: string) => void; + fileTypes: string[]; + statusCategory: string; + onStatusCategoryChange: (value: string) => void; + categoryCounts: Record | null; + filtersDisabled: boolean; + fileTypeDisabled: boolean; +} + +export function TaskDialogHeader({ + isCloudBrand, + taskId, + search, + onSearchChange, + fileType, + onFileTypeChange, + fileTypes, + statusCategory, + onStatusCategoryChange, + categoryCounts, + filtersDisabled, + fileTypeDisabled, +}: TaskDialogHeaderProps) { + const allTypesLabel = isCloudBrand ? "All categories" : "All file types"; + const fileTypeLabel = + fileType === ALL_TASK_FILE_TYPES + ? allTypesLabel + : formatTaskFileTypeLabel(fileType); + + return ( +
+ + + Task {taskId} + + + +
+ + +
+
+ ); +} diff --git a/frontend/components/task-dialog/index.ts b/frontend/components/task-dialog/index.ts new file mode 100644 index 000000000..5d29db243 --- /dev/null +++ b/frontend/components/task-dialog/index.ts @@ -0,0 +1 @@ +export { default } from "./task-dialog"; diff --git a/frontend/components/task-dialog/task-dialog.tsx b/frontend/components/task-dialog/task-dialog.tsx new file mode 100644 index 000000000..9163f8926 --- /dev/null +++ b/frontend/components/task-dialog/task-dialog.tsx @@ -0,0 +1,209 @@ +"use client"; + +import { Button } from "@/components/ui/button"; +import { Dialog, DialogContent, DialogFooter } from "@/components/ui/dialog"; +import { useIsCloudBrand } from "@/contexts/brand-context"; +import { cn } from "@/lib/utils"; +import { TaskDialogFileList } from "./file-list"; +import { TaskDialogHeader } from "./header"; +import { useTaskDialog } from "./use-task-dialog"; + +interface TaskDialogProps { + open: boolean; + task_id: string; + onOpenChange: (open: boolean) => void; + onClose: () => void; +} + +function TaskDialogContent({ + open, + task_id, + onClose, +}: Pick) { + const isCloudBrand = useIsCloudBrand(); + const { + task, + isLoading, + isError, + fileEntries, + fileTypes, + categoryCounts, + sortedEntries, + retryIngestionEntries, + retryIngestionSelectedCount, + allRetryIngestionsSelected, + toggleSelectAllRetryIngestions, + search, + setSearch, + fileType, + setFileType, + statusCategory, + setStatusCategory, + expandedPath, + setExpandedPath, + nameSort, + toggleNameSort, + retryableCount, + selectedCount, + selectedPaths, + selectablePaths, + allSelectableSelected, + toggleSelectedPath, + toggleSelectAllVisible, + isRetrying, + retryingTarget, + handleRetryAll, + handleRetrySelected, + } = useTaskDialog(open, task_id); + + const filtersDisabled = !task; + // Always offer "All file types"; only disable while task data is loading. + const fileTypeDisabled = !task; + const showRetryActions = retryableCount > 0; + + return ( +
+ + +
+ {isLoading ? ( +

Loading task…

+ ) : isError || !task ? ( +

Task not found.

+ ) : fileEntries.length === 0 && retryableCount === 0 ? ( +

+ No files in this task. +

+ ) : ( + + )} +
+ + + + {showRetryActions && selectedCount > 0 ? ( + + ) : null} + {showRetryActions && selectedCount === 0 ? ( + + ) : null} + +
+ ); +} + +export default function TaskDialog({ + open, + onOpenChange, + task_id, + onClose, +}: TaskDialogProps) { + const isCloudBrand = useIsCloudBrand(); + + return ( + + + + + + ); +} diff --git a/frontend/components/task-dialog/use-task-dialog.ts b/frontend/components/task-dialog/use-task-dialog.ts new file mode 100644 index 000000000..2ee714a00 --- /dev/null +++ b/frontend/components/task-dialog/use-task-dialog.ts @@ -0,0 +1,366 @@ +"use client"; + +import { useCallback, useEffect, useMemo, useState } from "react"; +import { toast } from "sonner"; +import { + type RetryTaskResponse, + useRetryTaskMutation, +} from "@/app/api/mutations/useRetryTaskMutation"; +import { useGetTaskQuery } from "@/app/api/queries/useGetTaskQuery"; +import { useTask } from "@/contexts/task-context"; +import { + ALL_TASK_FILE_TYPES, + ALL_TASK_STATUS_CATEGORIES, + countRetryIngestionFiles, + countTaskFilesByCategory, + filterTaskFileEntries, + getRetryableFileEntries, + getRetryableFilePaths, + getTaskFileEntries, + getTaskFileTypes, + isTaskInProgressStatus, + sortTaskFileEntries, + type TaskFileNameSort, + TaskFileStatusCategory, +} from "@/lib/task-utils"; + +function showRetryResultToast(result: RetryTaskResponse) { + if (result.skipped.length > 0) { + const missingSources = result.skipped.filter( + (entry) => entry.reason === "source_file_missing", + ).length; + if (missingSources > 0) { + toast.warning("Some files could not be retried", { + description: `${result.retried} file(s) queued. ${missingSources} need to be uploaded again.`, + }); + return; + } + } + + if (result.retried > 0) { + toast.success("Retry started", { + description: `${result.retried} file(s) queued for ingestion`, + }); + return; + } + + toast.warning("No files were retried", { + description: result.message ?? "Selected files could not be retried", + }); +} + +export function useTaskDialog(open: boolean, taskId: string) { + const { markTaskFilesProcessing, refreshTasks } = useTask(); + + const [selectedPaths, setSelectedPaths] = useState>( + () => new Set(), + ); + + const [retryingTarget, setRetryingTarget] = useState< + "all" | "selected" | string | null + >(null); + + const retryMutation = useRetryTaskMutation(); + + const { + data: task, + isLoading, + isError, + refetch: refetchTask, + } = useGetTaskQuery(taskId, { + enabled: open && !!taskId, + refetchOnMount: "always", + refetchInterval: (query) => { + if (!open) { + return false; + } + if (retryingTarget != null) { + return 2000; + } + const data = query.state.data; + if (!data) { + return false; + } + if (isTaskInProgressStatus(data.status)) { + return 2000; + } + return getTaskFileEntries(data).some(([, fileInfo]) => { + const status = fileInfo.status ?? "pending"; + return ( + status === "pending" || + status === "running" || + status === "processing" + ); + }) + ? 2000 + : false; + }, + }); + + const retryableCount = useMemo( + () => (task ? countRetryIngestionFiles(task) : 0), + [task], + ); + + const [search, setSearch] = useState(""); + const [fileType, setFileType] = useState(ALL_TASK_FILE_TYPES); + const [statusCategory, setStatusCategory] = useState( + ALL_TASK_STATUS_CATEGORIES, + ); + const [expandedPath, setExpandedPath] = useState(null); + const [nameSort, setNameSort] = useState("asc"); + + useEffect(() => { + if (!open) { + setSelectedPaths(new Set()); + } + }, [open]); + + const fileEntries = useMemo( + () => (task ? getTaskFileEntries(task) : []), + [task], + ); + + const fileTypes = useMemo(() => (task ? getTaskFileTypes(task) : []), [task]); + + const categoryCounts = useMemo( + () => (task ? countTaskFilesByCategory(task) : null), + [task], + ); + + const activeFileType = + fileType === ALL_TASK_FILE_TYPES || fileTypes.includes(fileType) + ? fileType + : ALL_TASK_FILE_TYPES; + + const filteredEntries = useMemo( + () => + filterTaskFileEntries(fileEntries, { + search, + fileType: activeFileType, + statusCategory: statusCategory as TaskFileStatusCategory, + task: task ?? undefined, + }), + [fileEntries, search, activeFileType, statusCategory, task], + ); + + const sortedEntries = useMemo( + () => sortTaskFileEntries(filteredEntries, nameSort), + [filteredEntries, nameSort], + ); + + const retryIngestionEntries = useMemo( + () => + task ? sortTaskFileEntries(getRetryableFileEntries(task), nameSort) : [], + [task, nameSort], + ); + + const retryIngestionPaths = useMemo( + () => retryIngestionEntries.map(([filePath]) => filePath), + [retryIngestionEntries], + ); + + const selectablePaths = useMemo( + () => getRetryableFilePaths(sortedEntries), + [sortedEntries], + ); + + const allRetryablePaths = useMemo( + () => (task ? getRetryableFilePaths(getRetryableFileEntries(task)) : []), + [task], + ); + + const selectedCount = useMemo(() => { + let count = 0; + for (const path of selectablePaths) { + if (selectedPaths.has(path)) { + count += 1; + } + } + return count; + }, [selectablePaths, selectedPaths]); + + const allSelectableSelected = + selectablePaths.length > 0 && selectedCount === selectablePaths.length; + + const retryIngestionSelectedCount = useMemo(() => { + let count = 0; + for (const path of retryIngestionPaths) { + if (selectedPaths.has(path)) { + count += 1; + } + } + return count; + }, [retryIngestionPaths, selectedPaths]); + + const allRetryIngestionsSelected = + retryIngestionPaths.length > 0 && + retryIngestionSelectedCount === retryIngestionPaths.length; + + const selectedRetryablePaths = useMemo( + () => allRetryablePaths.filter((path) => selectedPaths.has(path)), + [allRetryablePaths, selectedPaths], + ); + + const runRetry = useCallback( + async (filePaths?: string[]) => { + if (!taskId) { + return; + } + if (!filePaths && retryableCount === 0) { + return; + } + if (filePaths && filePaths.length === 0) { + return; + } + + const pathsToRetry = + filePaths ?? + (task ? getRetryableFilePaths(getRetryableFileEntries(task)) : []); + + setRetryingTarget( + filePaths + ? filePaths.length === 1 + ? filePaths[0] + : "selected" + : "all", + ); + + if (pathsToRetry.length > 0) { + markTaskFilesProcessing(taskId, pathsToRetry); + } + + try { + const result = await retryMutation.mutateAsync({ + taskId, + ...(filePaths ? { filePaths } : {}), + }); + await refetchTask(); + showRetryResultToast(result); + if (result.retried > 0) { + setSelectedPaths(new Set()); + } + } catch (error) { + await refreshTasks(); + toast.error("Retry failed", { + description: + error instanceof Error ? error.message : "Could not retry files", + }); + } finally { + setRetryingTarget(null); + } + }, + [ + task, + taskId, + retryableCount, + retryMutation, + refetchTask, + markTaskFilesProcessing, + refreshTasks, + ], + ); + + const handleRetryAll = useCallback(() => runRetry(), [runRetry]); + + const handleRetrySelected = useCallback( + () => runRetry(selectedRetryablePaths), + [runRetry, selectedRetryablePaths], + ); + + const toggleSelectedPath = useCallback((filePath: string) => { + setSelectedPaths((current) => { + const next = new Set(current); + if (next.has(filePath)) { + next.delete(filePath); + } else { + next.add(filePath); + } + return next; + }); + }, []); + + const toggleSelectAllVisible = useCallback(() => { + setSelectedPaths((current) => { + const visible = new Set(selectablePaths); + const allSelected = + selectablePaths.length > 0 && + selectablePaths.every((path) => current.has(path)); + + if (allSelected) { + const next = new Set(current); + for (const path of visible) { + next.delete(path); + } + return next; + } + + const next = new Set(current); + for (const path of visible) { + next.add(path); + } + return next; + }); + }, [selectablePaths]); + + const toggleSelectAllRetryIngestions = useCallback(() => { + setSelectedPaths((current) => { + const allSelected = + retryIngestionPaths.length > 0 && + retryIngestionPaths.every((path) => current.has(path)); + + if (allSelected) { + const next = new Set(current); + for (const path of retryIngestionPaths) { + next.delete(path); + } + return next; + } + + const next = new Set(current); + for (const path of retryIngestionPaths) { + next.add(path); + } + return next; + }); + }, [retryIngestionPaths]); + + const toggleNameSort = () => { + setNameSort((current) => (current === "asc" ? "desc" : "asc")); + }; + + return { + task: task ?? undefined, + isLoading, + isError, + retryableCount, + selectedCount, + selectedPaths, + selectablePaths, + allSelectableSelected, + toggleSelectedPath, + toggleSelectAllVisible, + isRetrying: retryMutation.isPending, + retryingTarget, + handleRetryAll, + handleRetrySelected, + fileEntries, + fileTypes, + categoryCounts, + sortedEntries, + retryIngestionEntries, + retryIngestionSelectedCount, + allRetryIngestionsSelected, + toggleSelectAllRetryIngestions, + search, + setSearch, + fileType: activeFileType, + setFileType, + statusCategory, + setStatusCategory, + expandedPath, + setExpandedPath, + nameSort, + toggleNameSort, + }; +} diff --git a/frontend/components/task-error-content.tsx b/frontend/components/task-error-content.tsx index fbd291403..c03d9fe44 100644 --- a/frontend/components/task-error-content.tsx +++ b/frontend/components/task-error-content.tsx @@ -1,21 +1,26 @@ "use client"; +import * as AccordionPrimitive from "@radix-ui/react-accordion"; import { AlertCircle, ChevronDown, Flag, XCircle } from "lucide-react"; import { useMemo, useState } from "react"; import { IncidentReporterIcon } from "@/components/icons/incident-reporter-icon"; +import TaskDialog from "@/components/task-dialog"; import { Accordion, AccordionContent, AccordionItem, - AccordionTrigger, } from "@/components/ui/accordion"; import { useIsCloudBrand } from "@/contexts/brand-context"; import { type Task } from "@/contexts/task-context"; -import { displayFileTaskError } from "@/lib/task-error-display"; +import { + formatApiComponent, + resolveTaskFileError, +} from "@/lib/task-error-display"; import { getFailedFileCount, getFailedFileEntries, getSuccessfulFileCount, + getTaskFileName, isCompletedTotalFailure, isTerminalFailedTask, } from "@/lib/task-utils"; @@ -41,6 +46,7 @@ export function TaskErrorContent({ const [accordionValue, setAccordionValue] = useState( defaultExpanded ? "failed-files" : "", ); + const [isTaskDialogOpen, setIsTaskDialogOpen] = useState(false); const isExpanded = accordionValue === "failed-files"; const failedEntries = useMemo(() => getFailedFileEntries(task), [task]); @@ -70,29 +76,45 @@ export function TaskErrorContent({ const ossIconColumn = showHeader && !isCloudBrand; - const accordionTrigger = ( -
-
- - {successCount} success · {failedCount} failed - - -
- + const accordionSummary = ( +
+ + {successCount} success · {failedCount} failed + +
); + const openTaskDialogButton = ( + + ); + + const accordionHeader = ( + + {ossIconColumn ?
: null} + + {accordionSummary} + + {openTaskDialogButton} + + ); + return (
- - {ossIconColumn ? ( -
-
-
{accordionTrigger}
-
- ) : ( - accordionTrigger - )} - + {accordionHeader}
{failedEntries.map(([filePath, fileInfo], index) => { - const fileName = - fileInfo.filename || filePath.split("/").pop() || filePath; - const rawError = - typeof fileInfo.error === "string" && fileInfo.error.trim() - ? fileInfo.error.trim() - : task.error; - const { line, componentCause } = - displayFileTaskError(rawError); + const fileName = getTaskFileName(filePath, fileInfo); + const line = resolveTaskFileError(fileInfo, task.error); + const componentCause = formatApiComponent(fileInfo.component); return (
{line}

@@ -229,6 +237,13 @@ export function TaskErrorContent({
+ + setIsTaskDialogOpen(false)} + />
); } diff --git a/frontend/components/tasks_details.tsx b/frontend/components/tasks_details.tsx index f6f923453..8a9d2f3ae 100644 --- a/frontend/components/tasks_details.tsx +++ b/frontend/components/tasks_details.tsx @@ -2,18 +2,15 @@ import { useEffect, useMemo, useState } from "react"; import { TaskCollapsibleSection } from "@/components/task-collapsible-section"; import { TaskErrorContent } from "@/components/task-error-content"; import { TaskPanelHeader } from "@/components/task-panel-header"; -import { useIsCloudBrand } from "@/contexts/brand-context"; import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context"; import { type Task } from "@/contexts/task-context"; import { parseTimestampMs } from "@/lib/time-utils"; -import { cn } from "@/lib/utils"; interface FailedTasksInfoProps { failedTasks: Task[]; } export const FailedTasksInfo = ({ failedTasks }: FailedTasksInfoProps) => { - const isCloudBrand = useIsCloudBrand(); const [openSections, setOpenSections] = useState< Record<"recent" | "past", boolean> >({ @@ -79,12 +76,7 @@ export const FailedTasksInfo = ({ failedTasks }: FailedTasksInfoProps) => { ); return ( -
+
{failedTasks.length === 0 ? ( @@ -106,12 +98,6 @@ export const FailedTasksInfo = ({ failedTasks }: FailedTasksInfoProps) => { })) } emptyText={section.emptyText} - contentClassName={cn( - "flex flex-col", - isCloudBrand - ? "p-0 [&>*:last-child]:border-b [&>*:last-child]:border-muted" - : "gap-2 p-4", - )} renderItem={(task) => ( void; addFiles: (files: Partial[], taskId: string) => void; + /** Mark knowledge-table overlays as processing when a retry starts. */ + markTaskFilesProcessing: (taskId: string, sourceUrls: string[]) => void; refreshTasks: () => Promise; cancelTask: (taskId: string) => Promise; isPolling: boolean; @@ -102,10 +113,13 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { const cancelTaskMutation = useCancelTaskMutation({ onSuccess: (_data, variables) => { // Immediately remove from React Query cache - queryClient.setQueryData(["tasks"], (oldTasks: Task[] | undefined) => { - if (!oldTasks) return []; - return oldTasks.filter((task) => task.task_id !== variables.taskId); - }); + queryClient.setQueryData( + [...TASKS_QUERY_KEY], + (oldTasks: Task[] | undefined) => { + if (!oldTasks) return []; + return oldTasks.filter((task) => task.task_id !== variables.taskId); + }, + ); // Update file to display as cancelled setFiles((prevFiles) => @@ -141,6 +155,33 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { }); }, [queryClient]); + const markTaskFilesProcessing = useCallback( + (taskId: string, sourceUrls: string[]) => { + const paths = new Set(sourceUrls); + if (paths.size === 0) { + return; + } + const now = new Date().toISOString(); + setFiles((prevFiles) => { + let changed = false; + const updated = prevFiles.map((file) => { + if (file.task_id !== taskId || !paths.has(file.source_url)) { + return file; + } + changed = true; + return { + ...file, + status: "processing" as const, + error: undefined, + updated_at: now, + }; + }); + return changed ? updated : prevFiles; + }); + }, + [], + ); + const addFiles = useCallback( (newFiles: Partial[], taskId: string) => { const now = new Date().toISOString(); @@ -178,11 +219,7 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { (prev) => prev.task_id === currentTask.task_id, ); - // Check if task is in progress - const isTaskInProgress = - currentTask.status === "pending" || - currentTask.status === "running" || - currentTask.status === "processing"; + const isTaskInProgress = isTaskInProgressStatus(currentTask.status); // On initial load, previousTasksRef is empty, so we need to process all in-progress tasks const isInitialLoad = previousTasksRef.current.length === 0; @@ -249,8 +286,11 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { })(); setFiles((prevFiles) => { - const existingFileIndex = prevFiles.findIndex( - (f) => f.source_url === filePath, + const existingFileIndex = findTaskFileOverlayIndex( + prevFiles, + currentTask.task_id, + filePath, + fileName, ); // Detect connector type based on file path or other indicators @@ -302,28 +342,28 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { }); } - if (isTaskInProgress && previousTask?.files) { - const currentFileKeys = new Set(Object.keys(currentTask.files ?? {})); - const disappearedFilePaths = Object.keys(previousTask.files).filter( - (fp) => !currentFileKeys.has(fp), + if (previousTask?.files) { + const disappearedFilePaths = getEnhancedListDisappearedFilePaths( + currentTask, + previousTask, ); - - if (disappearedFilePaths.length > 0) { - setFiles((prevFiles) => { - let changed = false; - const updated = prevFiles.map((f) => { - if ( - f.task_id === currentTask.task_id && - f.status === "processing" && - disappearedFilePaths.includes(f.source_url) - ) { - changed = true; - return { ...f, status: "active" as TaskFile["status"] }; - } - return f; - }); - return changed ? updated : prevFiles; - }); + const taskJustCompleted = didTaskReachCompleted( + previousTask, + currentTask, + ); + const shouldFinalizeDisappeared = + disappearedFilePaths.length > 0 && + (isTaskInProgress || taskJustCompleted); + const shouldFinalizeAllProcessing = taskJustCompleted; + + if (shouldFinalizeDisappeared || shouldFinalizeAllProcessing) { + setFiles((prevFiles) => + finalizeProcessingOverlaysForEnhancedTask( + prevFiles, + currentTask, + shouldFinalizeAllProcessing ? undefined : disappearedFilePaths, + ), + ); refetchSearch(); } } @@ -417,12 +457,57 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { e, ); } finally { + const indexedIdentities = new Set(); + const indexedFilenames = new Set(); + for (const [ + , + data, + ] of queryClient.getQueriesData({ + queryKey: ["listFiles"], + })) { + for (const indexed of data?.files ?? []) { + indexedIdentities.add(getKnowledgeFileIdentity(indexed)); + if (indexed.filename?.trim()) { + indexedFilenames.add(indexed.filename.trim()); + } + } + } + for (const [, data] of queryClient.getQueriesData({ + queryKey: ["search"], + })) { + for (const indexed of data?.files ?? []) { + indexedIdentities.add(getKnowledgeFileIdentity(indexed)); + if (indexed.filename?.trim()) { + indexedFilenames.add(indexed.filename.trim()); + } + } + } + setFiles((prevFiles) => - prevFiles.filter( - (file) => - file.task_id !== currentTask.task_id || - (completedHasFailures && file.status === "failed"), - ), + prevFiles.filter((file) => { + if (file.task_id !== currentTask.task_id) { + return true; + } + if (file.status === "failed") { + return completedHasFailures; + } + if (file.status === "processing") { + return false; + } + if (file.status === "active") { + const filename = file.filename?.trim(); + if (filename && indexedFilenames.has(filename)) { + return false; + } + return !indexedIdentities.has( + getKnowledgeFileIdentity({ + filename: file.filename, + source_url: file.source_url, + }), + ); + } + return false; + }), ); } } @@ -531,6 +616,7 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { files, addTask, addFiles, + markTaskFilesProcessing, refreshTasks, cancelTask, isPolling, diff --git a/frontend/lib/task-error-display.ts b/frontend/lib/task-error-display.ts index 782b40502..bbe2e8277 100644 --- a/frontend/lib/task-error-display.ts +++ b/frontend/lib/task-error-display.ts @@ -1,33 +1,65 @@ -// Will update when backend is ready (error_summary, failing_step, component_cause). +import type { + TaskFailureComponent, + TaskFailurePhase, + TaskFileEntry, +} from "@/app/api/queries/useGetTasksQuery"; export const FILE_ERROR_MAX_LINE_LENGTH = 80; -export type TaskErrorComponentCause = "OpenSearch" | "Docling" | "Langflow"; - -const COMPONENT_CAUSES: ReadonlyArray<{ - keyword: string; - label: TaskErrorComponentCause; -}> = [ - { keyword: "opensearch", label: "OpenSearch" }, - { keyword: "docling", label: "Docling" }, - { keyword: "langflow", label: "Langflow" }, -]; +export type TaskErrorComponentCause = + | "OpenSearch" + | "Docling" + | "Langflow" + | "OpenRAG"; + +export type TaskPipelineStepId = + | "parsing" + | "chunking" + | "embedding" + | "indexing" + | "file_validation" + | "unknown"; + +export interface IngestionPipelineStep { + id: TaskPipelineStepId; + label: string; + status: "completed" | "failed"; +} -export interface FileTaskErrorDisplay { - line: string; +export interface TaskFileIngestionFailureAnalysis { + resolvedError: string; + failedStep: TaskPipelineStepId; + pipelineSteps: IngestionPipelineStep[]; + rowStatusLabel: string; + failureSummary: string; componentCause?: TaskErrorComponentCause; + componentTags: string[]; + summaryLine: string; } -function normalizeErrorText(raw: string): string { - return raw.replace(/\s+/g, " ").trim(); -} +const PIPELINE_STEP_ORDER: TaskPipelineStepId[] = [ + "parsing", + "chunking", + "embedding", + "indexing", +]; -function stripNoisePrefixes(text: string): string { - return text - .replace(/^Error running graph:\s*/i, "") - .replace(/^Error building Component [^:]+:\s*/i, "") - .trim(); -} +const PIPELINE_STEP_LABELS: Record = { + parsing: "Parsing", + chunking: "Chunking", + embedding: "Embedding", + indexing: "Indexing", + file_validation: "File validation", + unknown: "Ingestion", +}; + +const COMPONENT_LABELS: Record = + { + docling: "Docling", + openrag: "OpenRAG", + langflow: "Langflow", + opensearch: "OpenSearch", + }; function truncateLine( text: string, @@ -39,55 +71,105 @@ function truncateLine( return `${text.slice(0, maxLength - 1).trimEnd()}…`; } -/** Prefer a short clause from long, nested error strings. */ -function extractReadableLine(text: string): string { - const beforeCausedBy = text.split(/\s+caused by:/i)[0]?.trim() ?? text; - - if (beforeCausedBy.length <= FILE_ERROR_MAX_LINE_LENGTH) { - return beforeCausedBy; +export function formatApiComponent( + component?: TaskFailureComponent, +): TaskErrorComponentCause | undefined { + if (!component) { + return undefined; } + return COMPONENT_LABELS[component]; +} - const colonParts = beforeCausedBy.split(":"); - const lastClause = colonParts[colonParts.length - 1]?.trim(); - if ( - lastClause && - lastClause.length >= 10 && - lastClause.length <= FILE_ERROR_MAX_LINE_LENGTH - ) { - return lastClause; +export function normalizeFailurePhase( + phase?: string, +): TaskPipelineStepId | undefined { + if (!phase) { + return undefined; } - - return beforeCausedBy; + if (phase in PIPELINE_STEP_LABELS) { + return phase as TaskPipelineStepId; + } + return undefined; } -export function detectComponentCause( - raw: string, -): TaskErrorComponentCause | undefined { - const lower = raw.toLowerCase(); - for (const { keyword, label } of COMPONENT_CAUSES) { - if (lower.includes(keyword)) { - return label; - } +export function buildRowStatusLabel(failedStep: TaskPipelineStepId): string { + if (failedStep === "file_validation") { + return "File validation issue"; } - return undefined; + if (failedStep === "unknown") { + return "Failed"; + } + return `${PIPELINE_STEP_LABELS[failedStep]} issue`; } -export function displayFileTaskError( - raw: string | undefined | null, -): FileTaskErrorDisplay { - if (!raw?.trim()) { - return { line: "Unknown error" }; +export function buildFailureSummary(failedStep: TaskPipelineStepId): string { + return `Failed at ${PIPELINE_STEP_LABELS[failedStep].toLowerCase()}`; +} + +export function buildPipelineStepsFromFailurePhase( + failurePhase: TaskPipelineStepId, +): IngestionPipelineStep[] { + if (failurePhase === "file_validation" || failurePhase === "unknown") { + return [ + { + id: failurePhase, + label: PIPELINE_STEP_LABELS[failurePhase], + status: "failed", + }, + ]; } - const normalized = normalizeErrorText(raw); - const componentCause = detectComponentCause(normalized); + const failedIndex = PIPELINE_STEP_ORDER.indexOf(failurePhase); + if (failedIndex < 0) { + return [ + { id: "unknown", label: PIPELINE_STEP_LABELS.unknown, status: "failed" }, + ]; + } - let line = stripNoisePrefixes(normalized); - line = truncateLine(extractReadableLine(line)); + return PIPELINE_STEP_ORDER.slice(0, failedIndex + 1).map((id, index) => ({ + id, + label: PIPELINE_STEP_LABELS[id], + status: index < failedIndex ? "completed" : "failed", + })); +} - if (!line) { - line = "Unknown error"; +export function resolveTaskFileError( + fileInfo: TaskFileEntry, + taskError?: string, +): string { + if (typeof fileInfo.user_facing_message === "string") { + const message = fileInfo.user_facing_message.trim(); + if (message) { + return message; + } } + if (typeof fileInfo.error === "string" && fileInfo.error.trim()) { + return fileInfo.error.trim(); + } + if (typeof taskError === "string" && taskError.trim()) { + return taskError.trim(); + } + return "Unknown error"; +} - return componentCause ? { line, componentCause } : { line }; +export function analyzeTaskFileIngestionFailure( + fileInfo: TaskFileEntry, + taskError?: string, +): TaskFileIngestionFailureAnalysis { + const resolvedError = resolveTaskFileError(fileInfo, taskError); + const failedStep = normalizeFailurePhase(fileInfo.failure_phase) ?? "unknown"; + const pipelineSteps = buildPipelineStepsFromFailurePhase(failedStep); + const componentCause = formatApiComponent(fileInfo.component); + const componentTags = componentCause ? [componentCause] : []; + + return { + resolvedError, + failedStep, + pipelineSteps, + rowStatusLabel: buildRowStatusLabel(failedStep), + failureSummary: buildFailureSummary(failedStep), + componentCause, + componentTags, + summaryLine: truncateLine(resolvedError), + }; } diff --git a/frontend/lib/task-utils.ts b/frontend/lib/task-utils.ts index 8f22468d5..e41958507 100644 --- a/frontend/lib/task-utils.ts +++ b/frontend/lib/task-utils.ts @@ -1,11 +1,214 @@ import type { Task, TaskFileEntry } from "@/app/api/queries/useGetTasksQuery"; +import { + buildRowStatusLabel, + normalizeFailurePhase, +} from "@/lib/task-error-display"; + +export const ALL_TASK_FILE_TYPES = "__all__"; +export const ALL_TASK_STATUS_CATEGORIES = "__all__"; + +export type TaskFileStatusCategory = "completed" | "system_error" | "indexing"; + +export type TaskFileNameSort = "asc" | "desc"; + +export type TaskFileFilterOptions = { + search?: string; + fileType?: string | typeof ALL_TASK_FILE_TYPES; + statusCategory?: TaskFileStatusCategory | typeof ALL_TASK_STATUS_CATEGORIES; + task?: Task; +}; + +export function isTaskFileCompleted(fileInfo: TaskFileEntry): boolean { + return fileInfo.status === "completed"; +} + +export function isTaskFileFailed(fileInfo: TaskFileEntry): boolean { + return fileInfo.status === "failed" || fileInfo.status === "error"; +} + +export function getTaskFileDialogStatusLabel( + fileInfo: TaskFileEntry, + taskError?: string, +): string { + if (isTaskFileFailed(fileInfo)) { + const failurePhase = normalizeFailurePhase(fileInfo.failure_phase); + if (failurePhase) { + return buildRowStatusLabel(failurePhase); + } + if (fileInfo.user_facing_message?.trim()) { + return "Failed"; + } + return buildRowStatusLabel("unknown"); + } + if (isTaskFileCompleted(fileInfo)) { + return "Complete"; + } + return "Processing"; +} + +export function getTaskFileName( + filePath: string, + fileInfo: TaskFileEntry, +): string { + return fileInfo.filename || filePath.split("/").pop() || filePath; +} + +/** Lowercase extension without dot, or empty string when none. */ +export function getFileExtensionFromName(filename: string): string { + const trimmed = filename.trim(); + const dotIndex = trimmed.lastIndexOf("."); + if (dotIndex <= 0 || dotIndex === trimmed.length - 1) { + return ""; + } + return trimmed.slice(dotIndex + 1).toLowerCase(); +} + +export function getTaskFileTypeKey( + filePath: string, + fileInfo: TaskFileEntry, +): string { + const extension = getFileExtensionFromName( + getTaskFileName(filePath, fileInfo), + ); + return extension || "unknown"; +} + +export function getTaskFileEntries(task: Task): Array<[string, TaskFileEntry]> { + return Object.entries(task.files || {}); +} + +export function getTaskFileTypes(task: Task): string[] { + const types = new Set( + getTaskFileEntries(task).map(([path, entry]) => + getTaskFileTypeKey(path, entry), + ), + ); + return Array.from(types).sort((a, b) => a.localeCompare(b)); +} + +export function formatTaskFileTypeLabel(fileType: string): string { + if (fileType === "unknown") { + return "Unknown"; + } + return fileType.toUpperCase(); +} + +export function isTaskFileRetryable(fileInfo: TaskFileEntry): boolean { + return fileInfo.actionable_by === "RETRYABLE"; +} + +export function getRetryableFileEntries( + task: Task, +): Array<[string, TaskFileEntry]> { + return getTaskFileEntries(task).filter(([, fileInfo]) => + isTaskFileRetryable(fileInfo), + ); +} + +export function getRetryableFilePaths( + entries: Array<[string, TaskFileEntry]>, +): string[] { + return entries + .filter(([, fileInfo]) => isTaskFileRetryable(fileInfo)) + .map(([filePath]) => filePath); +} + +export function countRetryIngestionFiles(task: Task): number { + return getTaskFileEntries(task).filter(([, fileInfo]) => + isTaskFileRetryable(fileInfo), + ).length; +} + +/** Maps a file to a dialog filter chip bucket (aligned with per-file status labels). */ +export function getTaskFileStatusCategory( + fileInfo: TaskFileEntry, +): TaskFileStatusCategory { + if (isTaskFileFailed(fileInfo)) { + return "system_error"; + } + + const status = fileInfo.status ?? "pending"; + if (status === "pending" || status === "running" || status === "processing") { + return "indexing"; + } + + if (status === "completed") { + return "completed"; + } + + return "indexing"; +} + +export function countTaskFilesByCategory( + task: Task, +): Record { + const counts: Record = { + completed: 0, + system_error: 0, + indexing: 0, + }; + + for (const [, fileInfo] of getTaskFileEntries(task)) { + const category = getTaskFileStatusCategory(fileInfo); + counts[category] += 1; + } + + return counts; +} + +export function sortTaskFileEntries( + entries: Array<[string, TaskFileEntry]>, + direction: TaskFileNameSort = "asc", +): Array<[string, TaskFileEntry]> { + const sorted = [...entries].sort(([pathA, infoA], [pathB, infoB]) => + getTaskFileName(pathA, infoA).localeCompare( + getTaskFileName(pathB, infoB), + undefined, + { sensitivity: "base" }, + ), + ); + return direction === "asc" ? sorted : sorted.reverse(); +} + +export function filterTaskFileEntries( + entries: Array<[string, TaskFileEntry]>, + options: TaskFileFilterOptions, +): Array<[string, TaskFileEntry]> { + const query = options.search?.trim().toLowerCase() ?? ""; + const fileType = options.fileType ?? ALL_TASK_FILE_TYPES; + const statusCategory = options.statusCategory ?? ALL_TASK_STATUS_CATEGORIES; + + return entries.filter(([filePath, fileInfo]) => { + if (fileType !== ALL_TASK_FILE_TYPES) { + const typeKey = getTaskFileTypeKey(filePath, fileInfo); + if (typeKey !== fileType) { + return false; + } + } + + if ( + statusCategory !== ALL_TASK_STATUS_CATEGORIES && + getTaskFileStatusCategory(fileInfo) !== statusCategory + ) { + return false; + } + + if (query) { + const name = getTaskFileName(filePath, fileInfo); + if (!name.toLowerCase().includes(query)) { + return false; + } + } + + return true; + }); +} export function getFailedFileEntries( task: Task, ): Array<[string, TaskFileEntry]> { - return Object.entries(task.files || {}).filter( - ([, fileInfo]) => - fileInfo?.status === "failed" || fileInfo?.status === "error", + return Object.entries(task.files || {}).filter(([, fileInfo]) => + isTaskFileFailed(fileInfo), ); } @@ -48,3 +251,121 @@ export function isCompletedTotalFailure(task: Task): boolean { export function isFailureLikeTask(task: Task): boolean { return isTerminalFailedTask(task) || isCompletedWithFailures(task); } + +export function isTaskInProgressStatus(status: Task["status"]): boolean { + return ( + status === "pending" || status === "running" || status === "processing" + ); +} + +/** True when a task has just transitioned to completed. */ +export function didTaskReachCompleted( + previousTask: Task | undefined, + currentTask: Task, +): boolean { + return ( + !!previousTask && + previousTask.status !== "completed" && + currentTask.status === "completed" + ); +} + +/** + * File paths present on the previous enhanced-list payload but omitted now. + * The enhanced list drops completed files from `files` while a task is still running. + */ +/** Stable overlay key before the backend temp path is known. */ +export function pendingTaskFileSourceUrl( + taskId: string, + filename: string, +): string { + return `pending:${taskId}:${filename}`; +} + +export function isPendingTaskFileSourceUrl(sourceUrl: string): boolean { + return sourceUrl.startsWith("pending:"); +} + +export function findTaskFileOverlayIndex( + files: Array<{ task_id: string; source_url: string; filename: string }>, + taskId: string, + filePath: string, + fileName: string, +): number { + const pendingUrl = pendingTaskFileSourceUrl(taskId, fileName); + return files.findIndex( + (f) => + f.task_id === taskId && + (f.source_url === filePath || + f.source_url === pendingUrl || + (f.filename === fileName && isPendingTaskFileSourceUrl(f.source_url))), + ); +} + +export function getEnhancedListDisappearedFilePaths( + currentTask: Task, + previousTask: Task, +): string[] { + const currentKeys = new Set(Object.keys(currentTask.files ?? {})); + return Object.keys(previousTask.files ?? {}).filter( + (filePath) => !currentKeys.has(filePath), + ); +} + +interface ProcessingFileOverlay { + task_id: string; + source_url: string; + status: "active" | "failed" | "processing"; + error?: string; +} + +/** + * Promote local processing overlays when the enhanced list omits completed files. + * Pass `disappearedPaths` while the task is in progress; omit it when the task completes + * to finalize every remaining processing file for that task. + */ +export function finalizeProcessingOverlaysForEnhancedTask< + T extends ProcessingFileOverlay, +>(prevFiles: T[], currentTask: Task, disappearedPaths?: string[]): T[] { + const pathsFilter = + disappearedPaths === undefined ? null : new Set(disappearedPaths); + let changed = false; + + const updated = prevFiles.map((file) => { + if (file.task_id !== currentTask.task_id) { + return file; + } + if (pathsFilter !== null && !pathsFilter.has(file.source_url)) { + return file; + } + // Overlays can still be "failed" until the list poll sees a retry as running. + if (file.status !== "processing" && file.status !== "failed") { + return file; + } + + const entry = currentTask.files?.[file.source_url]; + if (entry && isTaskFileFailed(entry)) { + if (file.status === "failed") { + const error = + typeof entry.error === "string" && entry.error.trim().length > 0 + ? entry.error.trim() + : file.error; + if (error === file.error) { + return file; + } + } + changed = true; + const error = + typeof entry.error === "string" && entry.error.trim().length > 0 + ? entry.error.trim() + : file.error; + return { ...file, status: "failed" as const, error }; + } + + // Left the enhanced list (completed files are omitted) or task finished. + changed = true; + return { ...file, status: "active" as const, error: undefined }; + }); + + return changed ? (updated as T[]) : prevFiles; +} diff --git a/frontend/tailwind.config.ts b/frontend/tailwind.config.ts index b50d0e638..8ac93ccf4 100644 --- a/frontend/tailwind.config.ts +++ b/frontend/tailwind.config.ts @@ -24,6 +24,17 @@ const config = { extend: { maxWidth: { content: "960px", + "task-dialog": "var(--task-dialog-width)", + }, + maxHeight: { + "task-dialog": "var(--task-dialog-max-height)", + }, + width: { + "task-dialog": "var(--task-dialog-width)", + "task-dialog-file-type": "var(--task-dialog-file-type-width)", + }, + zIndex: { + "task-dialog-menu": "var(--z-task-dialog-menu)", }, screens: { xl: "1200px", @@ -157,9 +168,14 @@ const config = { DEFAULT: "hsl(var(--card))", foreground: "hsl(var(--card-foreground))", }, + canvas: "hsl(var(--canvas))", "layer-contextual": "hsl(var(--layer-contextual))", + "task-dialog-oss": "hsl(var(--task-dialog-oss-bg))", + "task-dialog-oss-selected": "hsl(var(--task-dialog-oss-selected))", "layer-contextual-foreground": "hsl(var(--layer-contextual-foreground))", + "border-subtle-contextual": + "hsl(var(--border-subtle-background-contextual))", "text-text-01": "hsl(var(--text-text-01))", "link-primary": "hsl(var(--link-primary))", "button-tertiary": "hsl(var(--button-tertiary))", @@ -215,6 +231,9 @@ const config = { }, spacing: { mmd: "13px", + "task-dialog-error-indent": "var(--task-dialog-error-indent)", + "task-dialog-error-indent-cloud": + "var(--task-dialog-error-indent-cloud)", }, borderRadius: { lg: "var(--radius)", diff --git a/frontend/tests/core/tasks-unified-panel.spec.ts b/frontend/tests/core/tasks-unified-panel.spec.ts index 7bb57146a..3d6797a29 100644 --- a/frontend/tests/core/tasks-unified-panel.spec.ts +++ b/frontend/tests/core/tasks-unified-panel.spec.ts @@ -64,7 +64,11 @@ const buildTask = ( const wireTasksState = async (page: Page, initialTasks: MockTask[]) => { let currentTasks = initialTasks; - await page.route("**/api/tasks", async (route: Route) => { + await page.route("**/api/tasks**", async (route: Route) => { + if (route.request().method() !== "GET") { + await route.continue(); + return; + } await route.fulfill({ status: 200, contentType: "application/json", diff --git a/src/api/schemas/tasks.py b/src/api/schemas/tasks.py new file mode 100644 index 000000000..de2d45138 --- /dev/null +++ b/src/api/schemas/tasks.py @@ -0,0 +1,22 @@ +from pydantic import BaseModel, Field + + +class ErrorResponse(BaseModel): + error: str + message: str | None = None + task_id: str | None = None + + +class TaskRetrySkippedFile(BaseModel): + file_path: str + filename: str | None = None + reason: str | None = None + + +class TaskRetryResponse(BaseModel): + task_id: str + retried: int = 0 + skipped: list[TaskRetrySkippedFile] = Field(default_factory=list) + status: str + message: str | None = None + error: str | None = None diff --git a/src/api/tasks.py b/src/api/tasks.py index c5c779ce7..8e695c8d2 100644 --- a/src/api/tasks.py +++ b/src/api/tasks.py @@ -1,5 +1,6 @@ -from fastapi import Depends +from fastapi import Body, Depends from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field from dependencies import get_current_user, get_task_service from session_manager import User @@ -50,6 +51,41 @@ async def all_tasks_enhanced( return JSONResponse({"tasks": tasks}) +class RetryTaskBody(BaseModel): + file_paths: list[str] | None = Field( + default=None, + description=( + "Optional subset of task file paths to retry. Omit to retry all failed RETRYABLE files." + ), + ) + + +async def retry_task( + task_id: str, + body: RetryTaskBody = Body(default_factory=RetryTaskBody), + task_service=Depends(get_task_service), + user: User = Depends(get_current_user), +): + """Re-run ingestion for failed RETRYABLE files in an existing task.""" + result = await task_service.retry_failed_files( + user.user_id, task_id, file_paths=body.file_paths + ) + if result is None: + return JSONResponse({"error": "Task not found"}, status_code=404) + + if result.get("error") == "task_in_progress": + return JSONResponse(result, status_code=409) + + if result.get("error") == "no_processor": + return JSONResponse(result, status_code=400) + + if result.get("status") == "no_op": + return JSONResponse(result, status_code=400) + + await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORB_TASK_RETRIED) + return JSONResponse(result, status_code=202) + + async def cancel_task( task_id: str, task_service=Depends(get_task_service), diff --git a/src/app/routes/internal.py b/src/app/routes/internal.py index 1b0a64f35..299f05016 100644 --- a/src/app/routes/internal.py +++ b/src/app/routes/internal.py @@ -6,7 +6,7 @@ Depends, not here. This module only wires URL → handler. """ -from fastapi import FastAPI +from fastapi import FastAPI, status from api import ( auth, @@ -30,6 +30,7 @@ ) from api import keys as api_keys from api.health import health_check, opensearch_health_ready +from api.schemas.tasks import ErrorResponse, TaskRetryResponse from connectors.aws_s3.api import ( s3_bucket_status, s3_configure, @@ -91,6 +92,20 @@ def register_internal_routes(app: FastAPI): tags=["internal"], ) app.add_api_route("/tasks", tasks.all_tasks, methods=["GET"], tags=["internal"]) + app.add_api_route( + "/tasks/{task_id}/retry", + tasks.retry_task, + methods=["POST"], + tags=["internal"], + status_code=status.HTTP_202_ACCEPTED, + response_model=TaskRetryResponse, + responses={ + 400: {"model": ErrorResponse}, + 404: {"model": ErrorResponse}, + 409: {"model": ErrorResponse}, + 500: {"model": ErrorResponse}, + }, + ) app.add_api_route( "/tasks/{task_id}/cancel", tasks.cancel_task, diff --git a/src/config/settings.py b/src/config/settings.py index e8f3aabee..c599ab59a 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -243,6 +243,8 @@ def _resolve_skip_os_security_default() -> str: # Default: 40 minutes total, 40 minutes read timeout LANGFLOW_TIMEOUT = get_env_float("LANGFLOW_TIMEOUT", 2400.0) # 40 minutes LANGFLOW_CONNECT_TIMEOUT = get_env_float("LANGFLOW_CONNECT_TIMEOUT", 30.0) # 30 seconds +# Retries for transient Langflow HTTP failures (disconnects, 502/503/504). +LANGFLOW_REQUEST_RETRIES = get_env_int("LANGFLOW_REQUEST_RETRIES", 2) # Per-file processing timeout for document ingestion tasks (in seconds) # Should be >= LANGFLOW_TIMEOUT to allow long-running ingestion to complete @@ -844,42 +846,113 @@ async def close(self): async def langflow_request(self, method: str, endpoint: str, **kwargs): """Central method for all Langflow API requests. - Retries once with a fresh API key on auth failures (401/403). + Retries transient transport/server errors and once with a fresh API key on + auth failures (401/403). """ - api_key = await get_langflow_api_key() - if not api_key: - raise ValueError("No Langflow API key available") + _TRANSIENT_STATUS_CODES = {408, 429, 500, 502, 503, 504} + max_attempts = max(1, LANGFLOW_REQUEST_RETRIES + 1) + last_error: Exception | None = None + auth_retry_attempted = False + request_kwargs = dict(kwargs) + passed_headers = request_kwargs.pop("headers", {}) or {} - # Merge headers properly - passed headers take precedence over defaults - default_headers = {"x-api-key": api_key, "Content-Type": "application/json"} - existing_headers = kwargs.pop("headers", {}) - headers = {**default_headers, **existing_headers} + for attempt in range(max_attempts): + api_key = await get_langflow_api_key() + if not api_key: + raise ValueError("No Langflow API key available") - # Remove Content-Type if explicitly set to None (for file uploads) - if headers.get("Content-Type") is None: - headers.pop("Content-Type", None) + # Merge headers properly - passed headers take precedence over defaults + default_headers = {"x-api-key": api_key, "Content-Type": "application/json"} + headers = {**default_headers, **passed_headers} - url = f"{LANGFLOW_URL}{endpoint}" + # Remove Content-Type if explicitly set to None (for file uploads) + if headers.get("Content-Type") is None: + headers.pop("Content-Type", None) - response = await self.langflow_http_client.request( - method=method, url=url, headers=headers, **kwargs - ) + url = f"{LANGFLOW_URL}{endpoint}" - # Retry once with a fresh API key on auth failure - if response.status_code in (401, 403): - logger.warning( - "Langflow request auth failed, regenerating API key and retrying", - status_code=response.status_code, - endpoint=endpoint, - ) - api_key = await get_langflow_api_key(force_regenerate=True) - if api_key: - headers["x-api-key"] = api_key + try: response = await self.langflow_http_client.request( - method=method, url=url, headers=headers, **kwargs + method=method, url=url, headers=headers, **request_kwargs + ) + except httpx.RequestError as exc: + last_error = exc + if attempt + 1 < max_attempts: + delay = min(2**attempt, 4) + logger.warning( + "Langflow request transport error, retrying", + endpoint=endpoint, + attempt=attempt + 1, + max_attempts=max_attempts, + error=str(exc), + retry_in_seconds=delay, + ) + await asyncio.sleep(delay) + continue + raise + + # Retry once with a fresh API key on auth failure + if response.status_code in (401, 403) and not auth_retry_attempted: + logger.warning( + "Langflow request auth failed, regenerating API key and retrying", + status_code=response.status_code, + endpoint=endpoint, + ) + auth_retry_attempted = True + api_key = await get_langflow_api_key(force_regenerate=True) + if api_key: + headers["x-api-key"] = api_key + try: + response = await self.langflow_http_client.request( + method=method, url=url, headers=headers, **request_kwargs + ) + except httpx.RequestError as exc: + last_error = exc + if attempt + 1 < max_attempts: + delay = min(2**attempt, 4) + logger.warning( + "Langflow auth retry transport error, retrying", + endpoint=endpoint, + attempt=attempt + 1, + max_attempts=max_attempts, + error=str(exc), + retry_in_seconds=delay, + ) + await asyncio.sleep(delay) + continue + raise + + if response.status_code in (401, 403) and attempt + 1 < max_attempts: + delay = min(2**attempt, 4) + logger.warning( + "Langflow auth failure persists, retrying request", + endpoint=endpoint, + status_code=response.status_code, + attempt=attempt + 1, + max_attempts=max_attempts, + retry_in_seconds=delay, ) + await asyncio.sleep(delay) + continue + + if response.status_code in _TRANSIENT_STATUS_CODES and attempt + 1 < max_attempts: + delay = min(2**attempt, 4) + logger.warning( + "Langflow request returned transient status, retrying", + endpoint=endpoint, + status_code=response.status_code, + attempt=attempt + 1, + max_attempts=max_attempts, + retry_in_seconds=delay, + ) + await asyncio.sleep(delay) + continue + + return response - return response + if last_error is not None: + raise last_error + raise RuntimeError("Langflow request failed without a response") async def _create_langflow_global_variable(self, name: str, value: str, modify: bool = False): """Create a global variable in Langflow via API""" diff --git a/src/models/processors.py b/src/models/processors.py index 84f4d647a..fbfdd3620 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -798,6 +798,9 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File return # Process using Langflow pipeline + from models.tasks import IngestionPhase + + file_task.phase = IngestionPhase.LANGFLOW result = await self.langflow_connector_service.process_connector_document( document, self.user_id, diff --git a/src/services/task_service.py b/src/services/task_service.py index 20604ff84..48a590a87 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -17,6 +17,107 @@ logger = get_logger(__name__) +# Substrings that indicate a permanent file/content problem (not worth retrying). +_NON_RETRYABLE_FILE_ERROR_MARKERS = ( + "corrupt", + "corrupted", + "corruption", + "invalid file", + "invalid document", + "unsupported format", + "unsupported file", + "malformed", + "damaged", + "not a zip file", + "bad zipfile", + "failed to parse", + "could not be parsed", + "cannot parse", + "no text content could be extracted", + "empty or unreadable", + "unreadable", + "validationerror", +) + +# Docling polling errors that are transient (service/timeout), not bad file content. +_DOCLING_TRANSIENT_ERROR_MARKERS = ( + "(timeout)", + "timed out", + "polling exceeded", + "polling timed out", +) + +# Connector/download/network failures seen while file is still in Docling phase. +# These are infrastructure/transient and should be retryable. +_TRANSIENT_CONNECTIVITY_ERROR_MARKERS = ( + "all connection attempts failed", + "connection refused", + "connection reset", + "temporary failure in name resolution", + "name or service not known", + "service unavailable", + "network is unreachable", + "server disconnected", + "disconnected without sending a response", + "remote protocol error", + "broken pipe", +) + + +def _is_non_retryable_file_error(error: str) -> bool: + lowered = error.lower() + return any(marker in lowered for marker in _NON_RETRYABLE_FILE_ERROR_MARKERS) + + +def _is_transient_connectivity_error(error: str) -> bool: + lowered = error.lower() + return any(marker in lowered for marker in _TRANSIENT_CONNECTIVITY_ERROR_MARKERS) + + +def _is_docling_transient_error(error: str) -> bool: + lowered = error.lower() + # Legacy timeout messages emitted by Docling polling path. + if "docling conversion did not complete" in lowered: + return any(marker in lowered for marker in _DOCLING_TRANSIENT_ERROR_MARKERS) + + # Connector-originated transient network failures can still surface while + # file phase is DOCLING (before conversion can complete). + return _is_transient_connectivity_error(error) + + +def _transient_connectivity_user_message(error: str) -> str: + lowered = error.lower() + if any( + marker in lowered + for marker in ( + "disconnect", + "without sending a response", + "connection refused", + "connection reset", + "broken pipe", + "remote protocol", + ) + ): + return "Ingestion service connection was lost. Please retry ingestion." + if any(marker in lowered for marker in ("timeout", "timed out")): + return "Document processing timed out. Please retry ingestion." + return "Ingestion service was temporarily unavailable. Please retry ingestion." + + +def _is_langflow_transport_failure(error: str) -> bool: + """HTTP client / Langflow transport failures on the connector ingest path.""" + lowered = error.lower() + return any( + marker in lowered + for marker in ( + "server disconnected", + "disconnected without sending a response", + "remote protocol error", + "read error", + "write error", + ) + ) + class IngestionTimeoutError(Exception): """Raised when file processing exceeds the configured timeout""" @@ -590,6 +691,171 @@ def _resolve_upload_task(self, user_id: str, task_id: str) -> UploadTask | None: return self.task_store[candidate_user_id][task_id] return None + def _resolve_upload_task_store( + self, user_id: str, task_id: str + ) -> tuple[str, UploadTask] | None: + """Return (store_user_id, upload_task) for a task visible to this user.""" + if not task_id: + return None + for candidate_user_id in [user_id, AnonymousUser().user_id]: + if ( + candidate_user_id in self.task_store + and task_id in self.task_store[candidate_user_id] + ): + return candidate_user_id, self.task_store[candidate_user_id][task_id] + return None + + async def retry_failed_files( + self, + user_id: str, + task_id: str, + *, + file_paths: list[str] | None = None, + retryable_only: bool = True, + ) -> dict | None: + """Re-queue failed files for ingestion when their source paths still exist. + + Only files classified as RETRYABLE are retried when *retryable_only* is + True (the default). When *file_paths* is set, only those task paths are + considered; paths missing from the task or not in a failed state are + reported in *skipped*. This reuses the task's original processor — it + does not accept new uploads from the client. + + Connector tasks often use provider IDs (not local absolute paths) as + ``file_path`` keys. For those non-absolute identifiers, skip local + filesystem existence checks and let the connector processor re-fetch. + """ + resolved = self._resolve_upload_task_store(user_id, task_id) + if resolved is None: + return None + + store_user_id, upload_task = resolved + + paths_to_retry: list[str] = [] + skipped: list[dict] = [] + requested_paths = set(file_paths) if file_paths is not None else None + + # Keep status checks, candidate selection, and state transitions inside + # one lock to avoid concurrent retry requests enqueueing duplicates. + async with self._get_task_lock(task_id): + processor = upload_task.processor + if upload_task.status == TaskStatus.RUNNING: + return { + "error": "task_in_progress", + "message": "Task is still running", + "task_id": task_id, + } + + if processor is None: + return { + "error": "no_processor", + "message": "Cannot retry: task processor is no longer available", + "task_id": task_id, + } + + if requested_paths is not None: + for path in requested_paths: + file_task = upload_task.file_tasks.get(path) + if file_task is None: + skipped.append({"file_path": path, "reason": "file_not_in_task"}) + elif file_task.status != TaskStatus.FAILED: + skipped.append( + { + "file_path": path, + "filename": file_task.filename, + "reason": "not_failed", + } + ) + + # Build retry candidates before mutating shared task/file state. + retry_candidates: list[tuple[str, FileTask]] = [] + for file_path, file_task in list(upload_task.file_tasks.items()): + if requested_paths is not None and file_path not in requested_paths: + continue + if file_task.status != TaskStatus.FAILED: + continue + + if retryable_only: + metadata = self._infer_failure_metadata(file_task) + if not metadata or metadata.get("actionable_by") != "RETRYABLE": + skipped.append( + { + "file_path": file_path, + "filename": file_task.filename, + "reason": "not_retryable", + } + ) + continue + + # Only enforce local source-file existence for absolute paths. + # Connector-backed tasks typically store remote IDs as file_path. + if os.path.isabs(file_path) and not os.path.isfile(file_path): + skipped.append( + { + "file_path": file_path, + "filename": file_task.filename, + "reason": "source_file_missing", + } + ) + continue + + retry_candidates.append((file_path, file_task)) + + now = time.time() + for file_path, file_task in retry_candidates: + if upload_task.failed_files > 0: + upload_task.failed_files -= 1 + if upload_task.processed_files > 0: + upload_task.processed_files -= 1 + + file_task.status = TaskStatus.PENDING + file_task.error = None + file_task.result = None + file_task.retry_count += 1 + file_task.docling_task_id = None + # Connector tasks use remote IDs as file_path and ingest via Langflow. + if os.path.isabs(file_path): + file_task.docling_status = DoclingPhaseStatus.PENDING + file_task.phase = IngestionPhase.DOCLING + else: + file_task.docling_status = DoclingPhaseStatus.PENDING + file_task.phase = IngestionPhase.LANGFLOW + file_task.updated_at = now + paths_to_retry.append(file_path) + + if paths_to_retry: + upload_task.status = TaskStatus.RUNNING + upload_task.updated_at = now + + if not paths_to_retry: + return { + "task_id": task_id, + "retried": 0, + "skipped": skipped, + "status": "no_op", + "message": "No retryable files with available source data", + } + + background_task = asyncio.create_task( + self.background_custom_processor(store_user_id, task_id, paths_to_retry, processor) + ) + upload_task.background_task = background_task + self.background_tasks.add(background_task) + + def _clear_retry_background_task(done_task: asyncio.Task) -> None: + self.background_tasks.discard(done_task) + if upload_task.background_task is done_task: + upload_task.background_task = None + + background_task.add_done_callback(_clear_retry_background_task) + + return { + "task_id": task_id, + "retried": len(paths_to_retry), + "skipped": skipped, + "status": "accepted", + } + def _serialize_file_task(self, file_task: FileTask) -> dict: """Serialize a FileTask to the standard dict shape.""" return { @@ -613,35 +879,51 @@ def _infer_failure_metadata(self, file_task: FileTask) -> dict | None: actionable_by when the failure can be classified, or None when the cause is unknown and no fields should be emitted. - Priority order: docling_status enum first (stable), error string patterns - second (fallback for edge cases like polling timeout). + Priority order: transient / retryable docling outcomes (expired, polling + timeout) before generic docling FAILED, which indicates conversion failure. """ docling_status = file_task.docling_status phase = file_task.phase error = file_task.error or "" - if docling_status == DoclingPhaseStatus.FAILED: + if docling_status == DoclingPhaseStatus.EXPIRED: return { "component": "docling", "failure_phase": "parsing", "user_facing_message": ( - "The file could not be processed into readable document content." + "The document processing result could not be found. " + "The task may have expired. Please retry ingestion." ), - "actionable_by": "USER_ACTIONABLE", + "actionable_by": "RETRYABLE", } - if docling_status == DoclingPhaseStatus.EXPIRED: + if _is_non_retryable_file_error(error): + if phase == IngestionPhase.LANGFLOW: + component = "langflow" + failure_phase = "unknown" + else: + component = "docling" + failure_phase = "parsing" return { - "component": "docling", - "failure_phase": "parsing", + "component": component, + "failure_phase": failure_phase, "user_facing_message": ( - "The document processing result could not be found. " - "The task may have expired. Please retry ingestion." + "The file appears corrupted or invalid and cannot be processed. " + "Upload a valid file." ), + "actionable_by": "USER_ACTIONABLE", + } + + if phase == IngestionPhase.DOCLING and _is_docling_transient_error(error): + langflow_transport = _is_langflow_transport_failure(error) + return { + "component": "langflow" if langflow_transport else "docling", + "failure_phase": "unknown" if langflow_transport else "parsing", + "user_facing_message": _transient_connectivity_user_message(error), "actionable_by": "RETRYABLE", } - if phase == IngestionPhase.DOCLING and "Docling conversion did not complete" in error: + if phase == IngestionPhase.DOCLING and docling_status == DoclingPhaseStatus.PROCESSING: return { "component": "docling", "failure_phase": "parsing", @@ -649,12 +931,14 @@ def _infer_failure_metadata(self, file_task: FileTask) -> dict | None: "actionable_by": "RETRYABLE", } - if phase == IngestionPhase.DOCLING and docling_status == DoclingPhaseStatus.PROCESSING: + if docling_status == DoclingPhaseStatus.FAILED: return { "component": "docling", "failure_phase": "parsing", - "user_facing_message": "Document processing timed out. Please retry ingestion.", - "actionable_by": "RETRYABLE", + "user_facing_message": ( + "The file could not be processed into readable document content." + ), + "actionable_by": "USER_ACTIONABLE", } if "already exists" in error: @@ -665,7 +949,28 @@ def _infer_failure_metadata(self, file_task: FileTask) -> dict | None: "actionable_by": "USER_ACTIONABLE", } + if _is_transient_connectivity_error(error) or _is_langflow_transport_failure(error): + return { + "component": "langflow", + "failure_phase": "unknown", + "user_facing_message": _transient_connectivity_user_message(error), + "actionable_by": "RETRYABLE", + } + if phase == IngestionPhase.LANGFLOW: + error_lower = error.lower() + if any( + marker in error_lower + for marker in ("timeout", "timed out", "unavailable", "connection refused") + ): + return { + "component": "langflow", + "failure_phase": "unknown", + "user_facing_message": ( + "Ingestion timed out or the service was unavailable. Please retry." + ), + "actionable_by": "RETRYABLE", + } return { "component": "langflow", "failure_phase": "unknown", diff --git a/src/utils/telemetry/message_id.py b/src/utils/telemetry/message_id.py index 7b0f17d45..07121ad9a 100644 --- a/src/utils/telemetry/message_id.py +++ b/src/utils/telemetry/message_id.py @@ -7,18 +7,18 @@ class MessageId: """Telemetry message IDs.""" - + # Category: APPLICATION_STARTUP -------------------------------------------> - + # Message: Application started successfully ORB_APP_STARTED = "ORB_APP_STARTED" # Message: Application startup initiated ORB_APP_START_INIT = "ORB_APP_START_INIT" # Message: Application shutdown initiated ORB_APP_SHUTDOWN = "ORB_APP_SHUTDOWN" - + # Category: SERVICE_INITIALIZATION -----------------------------------------> - + # Message: Services initialized successfully ORB_SVC_INIT_SUCCESS = "ORB_SVC_INIT_SUCCESS" # Message: Service initialization started @@ -29,9 +29,9 @@ class MessageId: ORB_SVC_OS_CLIENT_FAIL = "ORB_SVC_OS_CLIENT_FAIL" # Message: Failed to generate JWT keys ORB_SVC_JWT_KEY_FAIL = "ORB_SVC_JWT_KEY_FAIL" - + # Category: OPENSEARCH_SETUP ----------------------------------------------> - + # Message: OpenSearch connection established ORB_OS_CONN_ESTABLISHED = "ORB_OS_CONN_ESTABLISHED" # Message: OpenSearch connection failed @@ -40,9 +40,9 @@ class MessageId: ORB_OS_WAITING = "ORB_OS_WAITING" # Message: OpenSearch ready check timeout ORB_OS_TIMEOUT = "ORB_OS_TIMEOUT" - + # Category: OPENSEARCH_INDEX ----------------------------------------------> - + # Message: OpenSearch index created successfully ORB_OS_INDEX_CREATED = "ORB_OS_INDEX_CREATED" # Message: OpenSearch index already exists @@ -55,9 +55,9 @@ class MessageId: ORB_OS_KF_INDEX_CREATED = "ORB_OS_KF_INDEX_CREATED" # Message: Failed to create knowledge filters index ORB_OS_KF_INDEX_FAIL = "ORB_OS_KF_INDEX_FAIL" - + # Category: DOCUMENT_INGESTION --------------------------------------------> - + # Message: Document ingestion started ORB_DOC_INGEST_START = "ORB_DOC_INGEST_START" # Message: Document ingestion completed successfully @@ -84,9 +84,9 @@ class MessageId: ORB_DOC_REFRESH_COMPLETE = "ORB_DOC_REFRESH_COMPLETE" # Message: Default docs refresh failed ORB_DOC_REFRESH_FAILED = "ORB_DOC_REFRESH_FAILED" - + # Category: DOCUMENT_PROCESSING --------------------------------------------> - + # Message: Document processing started ORB_DOC_PROCESS_START = "ORB_DOC_PROCESS_START" # Message: Document processing completed @@ -95,9 +95,9 @@ class MessageId: ORB_DOC_PROCESS_FAILED = "ORB_DOC_PROCESS_FAILED" # Message: Process pool recreation attempted ORB_DOC_POOL_RECREATE = "ORB_DOC_POOL_RECREATE" - + # Category: AUTHENTICATION ------------------------------------------------> - + # Message: Authentication successful ORB_AUTH_SUCCESS = "ORB_AUTH_SUCCESS" # Message: Authentication failed @@ -108,9 +108,9 @@ class MessageId: ORB_AUTH_OAUTH_CALLBACK = "ORB_AUTH_OAUTH_CALLBACK" # Message: OAuth callback failed ORB_AUTH_OAUTH_FAILED = "ORB_AUTH_OAUTH_FAILED" - + # Category: CONNECTOR_OPERATIONS -------------------------------------------> - + # Message: Connector connection established ORB_CONN_CONNECTED = "ORB_CONN_CONNECTED" # Message: Connector connection failed @@ -127,9 +127,9 @@ class MessageId: ORB_CONN_WEBHOOK_FAILED = "ORB_CONN_WEBHOOK_FAILED" # Message: Failed to load persisted connections ORB_CONN_LOAD_FAILED = "ORB_CONN_LOAD_FAILED" - + # Category: FLOW_OPERATIONS ------------------------------------------------> - + # Message: Flow backup completed ORB_FLOW_BACKUP_COMPLETE = "ORB_FLOW_BACKUP_COMPLETE" # Message: Flow backup failed @@ -140,38 +140,40 @@ class MessageId: ORB_FLOW_RESET_CHECK_FAIL = "ORB_FLOW_RESET_CHECK_FAIL" # Message: Settings reapplied after flow reset ORB_FLOW_SETTINGS_REAPPLIED = "ORB_FLOW_SETTINGS_REAPPLIED" - + # Category: TASK_OPERATIONS ------------------------------------------------> - + # Message: Task created successfully ORB_TASK_CREATED = "ORB_TASK_CREATED" # Message: Task completed successfully ORB_TASK_COMPLETE = "ORB_TASK_COMPLETE" # Message: Task failed ORB_TASK_FAILED = "ORB_TASK_FAILED" + # Message: Task retry requested + ORB_TASK_RETRIED = "ORB_TASK_RETRIED" # Message: Task cancelled ORB_TASK_CANCELLED = "ORB_TASK_CANCELLED" # Message: Task cancellation failed ORB_TASK_CANCEL_FAILED = "ORB_TASK_CANCEL_FAILED" - + # Category: CHAT_OPERATIONS ------------------------------------------------> - + # Message: Chat request received ORB_CHAT_REQUEST_RECV = "ORB_CHAT_REQUEST_RECV" # Message: Chat request completed ORB_CHAT_REQUEST_COMPLETE = "ORB_CHAT_REQUEST_COMPLETE" # Message: Chat request failed ORB_CHAT_REQUEST_FAILED = "ORB_CHAT_REQUEST_FAILED" - + # Category: ERROR_CONDITIONS -----------------------------------------------> - + # Message: Critical error occurred ORB_ERROR_CRITICAL = "ORB_ERROR_CRITICAL" # Message: Warning condition ORB_ERROR_WARNING = "ORB_ERROR_WARNING" - + # Category: SETTINGS_OPERATIONS --------------------------------------------> - + # Message: Settings updated successfully ORB_SETTINGS_UPDATED = "ORB_SETTINGS_UPDATED" # Message: Settings update failed @@ -194,9 +196,9 @@ class MessageId: ORB_SETTINGS_INDEX_NAME_UPDATED = "ORB_SETTINGS_INDEX_NAME_UPDATED" # Message: Provider credentials updated ORB_SETTINGS_PROVIDER_CREDS = "ORB_SETTINGS_PROVIDER_CREDS" - + # Category: ONBOARDING -----------------------------------------------------> - + # Message: Onboarding started ORB_ONBOARD_START = "ORB_ONBOARD_START" # Message: Onboarding completed successfully diff --git a/tests/unit/test_task_service_get_task_status2.py b/tests/unit/test_task_service_get_task_status2.py index 01b54321b..8d7a4c082 100644 --- a/tests/unit/test_task_service_get_task_status2.py +++ b/tests/unit/test_task_service_get_task_status2.py @@ -87,6 +87,52 @@ def test_docling_timeout_via_error_string(self, task_service): assert meta["failure_phase"] == "parsing" assert meta["actionable_by"] == "RETRYABLE" + def test_docling_polling_timeout_sets_failed_status(self, task_service): + """Backend polling marks TIMEOUT as docling_status=FAILED; still retryable.""" + ft = _make_file_task( + phase=IngestionPhase.DOCLING, + docling_status=DoclingPhaseStatus.FAILED, + error=( + "Docling conversion did not complete (timeout): Docling polling timed out after 10s" + ), + ) + meta = task_service._infer_failure_metadata(ft) + assert meta is not None + assert meta["actionable_by"] == "RETRYABLE" + + def test_docling_conversion_failed_not_retryable(self, task_service): + """Permanent conversion failure (e.g. corrupt file) is not retryable.""" + ft = _make_file_task( + phase=IngestionPhase.DOCLING, + docling_status=DoclingPhaseStatus.FAILED, + error=("Docling conversion did not complete (failed): Docling reported failure"), + ) + meta = task_service._infer_failure_metadata(ft) + assert meta is not None + assert meta["actionable_by"] == "USER_ACTIONABLE" + + def test_corrupt_docx_bad_zip_not_retryable(self, task_service): + ft = _make_file_task( + phase=IngestionPhase.DOCLING, + docling_status=DoclingPhaseStatus.FAILED, + error=( + "Docling conversion did not complete (failed): BadZipFile: File is not a zip file" + ), + ) + meta = task_service._infer_failure_metadata(ft) + assert meta is not None + assert meta["actionable_by"] == "USER_ACTIONABLE" + + def test_langflow_empty_content_not_retryable(self, task_service): + ft = _make_file_task( + phase=IngestionPhase.LANGFLOW, + docling_status=DoclingPhaseStatus.SUCCESS, + error="No text content could be extracted from document", + ) + meta = task_service._infer_failure_metadata(ft) + assert meta is not None + assert meta["actionable_by"] == "USER_ACTIONABLE" + def test_docling_phase_still_processing(self, task_service): ft = _make_file_task( phase=IngestionPhase.DOCLING, diff --git a/tests/unit/test_task_service_retry_failed_files.py b/tests/unit/test_task_service_retry_failed_files.py new file mode 100644 index 000000000..811374efb --- /dev/null +++ b/tests/unit/test_task_service_retry_failed_files.py @@ -0,0 +1,247 @@ +"""Unit tests for TaskService.retry_failed_files.""" + +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from models.tasks import DoclingPhaseStatus, FileTask, IngestionPhase, TaskStatus, UploadTask +from services.task_service import TaskService + + +@pytest.fixture +def task_service(): + return TaskService(document_service=Mock(), ingestion_timeout=2) + + +def _retryable_failed_file( + file_path: str = "/data/doc.pdf", + *, + error: str = "Docling conversion did not complete (timeout)", +) -> FileTask: + ft = FileTask(file_path=file_path, filename="doc.pdf") + ft.status = TaskStatus.FAILED + ft.phase = IngestionPhase.DOCLING + ft.docling_status = DoclingPhaseStatus.FAILED + ft.error = error + return ft + + +def _store_task( + task_service: TaskService, + user_id: str, + upload_task: UploadTask, +) -> None: + task_service.task_store.setdefault(user_id, {})[upload_task.task_id] = upload_task + + +@pytest.mark.asyncio +async def test_retry_all_failed_retryable_files(task_service): + processor = Mock() + ft_a = _retryable_failed_file("/data/a.pdf") + ft_b = _retryable_failed_file("/data/b.pdf") + task = UploadTask( + task_id="task-1", + total_files=2, + file_tasks={"/data/a.pdf": ft_a, "/data/b.pdf": ft_b}, + status=TaskStatus.FAILED, + failed_files=2, + processor=processor, + ) + _store_task(task_service, "user1", task) + + with ( + patch("os.path.isfile", return_value=True), + patch.object( + task_service, + "background_custom_processor", + new_callable=AsyncMock, + ) as mock_bg, + patch("asyncio.create_task", return_value=Mock()), + ): + result = await task_service.retry_failed_files("user1", "task-1") + + assert result is not None + assert result["retried"] == 2 + assert result["status"] == "accepted" + assert ft_a.status == TaskStatus.PENDING + assert ft_b.status == TaskStatus.PENDING + mock_bg.assert_called_once() + assert set(mock_bg.call_args[0][2]) == {"/data/a.pdf", "/data/b.pdf"} + + +@pytest.mark.asyncio +async def test_retry_subset_by_file_paths(task_service): + processor = Mock() + ft_a = _retryable_failed_file("/data/a.pdf") + ft_b = _retryable_failed_file("/data/b.pdf") + task = UploadTask( + task_id="task-1", + total_files=2, + file_tasks={"/data/a.pdf": ft_a, "/data/b.pdf": ft_b}, + status=TaskStatus.FAILED, + failed_files=2, + processor=processor, + ) + _store_task(task_service, "user1", task) + + with ( + patch("os.path.isfile", return_value=True), + patch.object( + task_service, + "background_custom_processor", + new_callable=AsyncMock, + ) as mock_bg, + patch("asyncio.create_task", return_value=Mock()), + ): + result = await task_service.retry_failed_files( + "user1", + "task-1", + file_paths=["/data/a.pdf"], + ) + + assert result is not None + assert result["retried"] == 1 + assert ft_a.status == TaskStatus.PENDING + assert ft_b.status == TaskStatus.FAILED + mock_bg.assert_called_once() + assert mock_bg.call_args[0][2] == ["/data/a.pdf"] + + +@pytest.mark.asyncio +async def test_retry_unknown_path_is_skipped(task_service): + processor = Mock() + ft_a = _retryable_failed_file("/data/a.pdf") + task = UploadTask( + task_id="task-1", + total_files=1, + file_tasks={"/data/a.pdf": ft_a}, + status=TaskStatus.FAILED, + failed_files=1, + processor=processor, + ) + _store_task(task_service, "user1", task) + + with patch("os.path.isfile", return_value=True): + result = await task_service.retry_failed_files( + "user1", + "task-1", + file_paths=["/data/missing.pdf"], + ) + + assert result is not None + assert result["retried"] == 0 + assert result["status"] == "no_op" + assert result["skipped"] == [{"file_path": "/data/missing.pdf", "reason": "file_not_in_task"}] + assert ft_a.status == TaskStatus.FAILED + + +@pytest.mark.asyncio +async def test_retry_connector_connectivity_error_is_retryable(task_service): + processor = Mock() + ft = FileTask(file_path="/data/connector.pdf", filename="connector.pdf") + ft.status = TaskStatus.FAILED + ft.phase = IngestionPhase.DOCLING + ft.docling_status = DoclingPhaseStatus.PENDING + ft.error = "All connection attempts failed" + task = UploadTask( + task_id="task-connector", + total_files=1, + file_tasks={"/data/connector.pdf": ft}, + status=TaskStatus.FAILED, + failed_files=1, + processor=processor, + ) + _store_task(task_service, "user1", task) + + with ( + patch("os.path.isfile", return_value=True), + patch.object( + task_service, + "background_custom_processor", + new_callable=AsyncMock, + ) as mock_bg, + patch("asyncio.create_task", return_value=Mock()), + ): + result = await task_service.retry_failed_files("user1", "task-connector") + + assert result is not None + assert result["retried"] == 1 + assert result["status"] == "accepted" + assert ft.status == TaskStatus.PENDING + mock_bg.assert_called_once() + + +@pytest.mark.asyncio +async def test_langflow_disconnect_error_is_retryable(task_service): + processor = Mock() + ft = FileTask(file_path="google-drive-file-id", filename="statement.pdf") + ft.status = TaskStatus.FAILED + ft.phase = IngestionPhase.DOCLING + ft.docling_status = DoclingPhaseStatus.PENDING + ft.error = "Server disconnected without sending a response." + task = UploadTask( + task_id="task-langflow-disconnect", + total_files=1, + file_tasks={"google-drive-file-id": ft}, + status=TaskStatus.FAILED, + failed_files=1, + processor=processor, + ) + _store_task(task_service, "user1", task) + + metadata = task_service._infer_failure_metadata(ft) + assert metadata is not None + assert metadata["actionable_by"] == "RETRYABLE" + assert metadata["component"] == "langflow" + + with ( + patch("os.path.isfile", return_value=False), + patch.object( + task_service, + "background_custom_processor", + new_callable=AsyncMock, + ) as mock_bg, + patch("asyncio.create_task", return_value=Mock()), + ): + result = await task_service.retry_failed_files("user1", "task-langflow-disconnect") + + assert result is not None + assert result["retried"] == 1 + assert ft.phase == IngestionPhase.LANGFLOW + mock_bg.assert_called_once() + + +@pytest.mark.asyncio +async def test_retry_connector_id_path_does_not_require_local_file(task_service): + processor = Mock() + ft = FileTask(file_path="remote-file-id-123", filename="connector.pdf") + ft.status = TaskStatus.FAILED + ft.phase = IngestionPhase.DOCLING + ft.docling_status = DoclingPhaseStatus.PENDING + ft.error = "All connection attempts failed" + task = UploadTask( + task_id="task-connector-id", + total_files=1, + file_tasks={"remote-file-id-123": ft}, + status=TaskStatus.FAILED, + failed_files=1, + processor=processor, + ) + _store_task(task_service, "user1", task) + + with ( + patch("os.path.isfile", return_value=False), + patch.object( + task_service, + "background_custom_processor", + new_callable=AsyncMock, + ) as mock_bg, + patch("asyncio.create_task", return_value=Mock()), + ): + result = await task_service.retry_failed_files("user1", "task-connector-id") + + assert result is not None + assert result["retried"] == 1 + assert result["status"] == "accepted" + assert ft.status == TaskStatus.PENDING + mock_bg.assert_called_once()