@@ -838,11 +880,15 @@ function KnowledgeSourcesPage() {
-
diff --git a/frontend/src/components/ui/animated-processing-icon.tsx b/frontend/src/components/ui/animated-processing-icon.tsx
new file mode 100644
index 00000000..eb36b2ab
--- /dev/null
+++ b/frontend/src/components/ui/animated-processing-icon.tsx
@@ -0,0 +1,49 @@
+interface AnimatedProcessingIconProps {
+ className?: string;
+ size?: number;
+}
+
+export const AnimatedProcessingIcon = ({
+ className = "",
+ size = 10,
+}: AnimatedProcessingIconProps) => {
+ const width = Math.round((size * 6) / 10);
+ const height = size;
+
+  return (
+    // NOTE: the original SVG body is not visible in this patch; the pulsing
+    // rect below is a placeholder sketch, not the shipped markup.
+    <svg
+      width={width}
+      height={height}
+      viewBox={`0 0 ${width} ${height}`}
+      className={`animate-pulse ${className}`}
+      aria-hidden="true"
+    >
+      <rect width={width} height={height} rx={1} fill="currentColor" />
+    </svg>
+  );
+};
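
Usage sketch (not part of the patch): the component keeps a fixed 6:10 width-to-height ratio, so `size` sets the height and the width follows from `Math.round((size * 6) / 10)`.

```tsx
// Hypothetical consumer; the import path is an assumption.
import { AnimatedProcessingIcon } from "@/components/ui/animated-processing-icon";

// size={20} renders a 12x20 icon: Math.round((20 * 6) / 10) = 12.
export const InlineSpinner = () => (
  <AnimatedProcessingIcon size={20} className="text-muted-foreground" />
);
```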
diff --git a/frontend/src/components/ui/status-badge.tsx b/frontend/src/components/ui/status-badge.tsx
new file mode 100644
index 00000000..f0f63241
--- /dev/null
+++ b/frontend/src/components/ui/status-badge.tsx
@@ -0,0 +1,58 @@
+import { AnimatedProcessingIcon } from "./animated-processing-icon";
+
+export type Status =
+ | "processing"
+ | "active"
+ | "unavailable"
+ | "hidden"
+ | "sync"
+ | "failed";
+
+interface StatusBadgeProps {
+ status: Status;
+ className?: string;
+}
+
+const statusConfig: Record<Status, { label: string; className: string }> = {
+  processing: {
+    label: "Processing",
+    className: "text-muted-foreground",
+  },
+  active: {
+    label: "Active",
+    className: "text-emerald-600 dark:text-emerald-400",
+  },
+  unavailable: {
+    label: "Unavailable",
+    className: "text-red-600 dark:text-red-400",
+  },
+  failed: {
+    label: "Failed",
+    className: "text-red-600 dark:text-red-400",
+  },
+  hidden: {
+    label: "Hidden",
+    className: "text-zinc-400 dark:text-zinc-500",
+  },
+  sync: {
+    label: "Sync",
+    className: "text-amber-700 dark:text-amber-300 underline",
+  },
+};
+
+export const StatusBadge = ({ status, className }: StatusBadgeProps) => {
+ const config = statusConfig[status];
+
+  return (
+    // NOTE: the wrapper element was stripped from this patch; an inline-flex
+    // span carrying the per-status classes is a reconstruction.
+    <span
+      className={`inline-flex items-center gap-1.5 ${config.className} ${className ?? ""}`}
+    >
+      {status === "processing" && <AnimatedProcessingIcon />}
+      {config.label}
+    </span>
+  );
+};
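
Usage sketch (not part of the patch): only the "processing" status renders the animated icon; every other status is a static colored label.

```tsx
// Hypothetical consumer; the import path is an assumption.
import { StatusBadge, type Status } from "@/components/ui/status-badge";

export const SourceStatusCell = ({ status }: { status: Status }) => (
  <StatusBadge status={status} className="text-xs" />
);
```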
diff --git a/frontend/src/contexts/task-context.tsx b/frontend/src/contexts/task-context.tsx
index f15e9cc1..4b6c18c2 100644
--- a/frontend/src/contexts/task-context.tsx
+++ b/frontend/src/contexts/task-context.tsx
@@ -35,9 +35,22 @@ export interface Task {
   files?: Record<
     string,
     { status?: unknown; created_at?: unknown; updated_at?: unknown }
   >;
}
+export interface TaskFile {
+ filename: string;
+ mimetype: string;
+ source_url: string;
+ size: number;
+ connector_type: string;
+ status: "active" | "failed" | "processing";
+ task_id: string;
+ created_at: string;
+ updated_at: string;
+}
 interface TaskContextType {
   tasks: Task[];
+  files: TaskFile[];
   addTask: (taskId: string) => void;
+  addFiles: (files: Partial<TaskFile>[], taskId: string) => void;
   removeTask: (taskId: string) => void;
   refreshTasks: () => Promise<void>;
   cancelTask: (taskId: string) => Promise<void>;
@@ -51,6 +64,7 @@ const TaskContext = createContext<TaskContextType | undefined>(undefined);
export function TaskProvider({ children }: { children: React.ReactNode }) {
   const [tasks, setTasks] = useState<Task[]>([]);
+  const [files, setFiles] = useState<TaskFile[]>([]);
const [isPolling, setIsPolling] = useState(false);
const [isFetching, setIsFetching] = useState(false);
const [isMenuOpen, setIsMenuOpen] = useState(false);
@@ -58,12 +72,32 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
const queryClient = useQueryClient();
- const refetchSearch = () => {
+ const refetchSearch = useCallback(() => {
queryClient.invalidateQueries({
queryKey: ["search"],
exact: false,
});
- };
+ }, [queryClient]);
+
+ const addFiles = useCallback(
+    (newFiles: Partial<TaskFile>[], taskId: string) => {
+ const now = new Date().toISOString();
+ const filesToAdd: TaskFile[] = newFiles.map((file) => ({
+ filename: file.filename || "",
+ mimetype: file.mimetype || "",
+ source_url: file.source_url || "",
+ size: file.size || 0,
+ connector_type: file.connector_type || "local",
+ status: "processing",
+ task_id: taskId,
+ created_at: now,
+ updated_at: now,
+ }));
+
+ setFiles((prevFiles) => [...prevFiles, ...filesToAdd]);
+ },
+ [],
+ );
const fetchTasks = useCallback(async () => {
if (!isAuthenticated && !isNoAuthMode) return;
@@ -76,13 +110,87 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
const newTasks = data.tasks || [];
// Update tasks and check for status changes in the same state update
- setTasks(prevTasks => {
+ setTasks((prevTasks) => {
// Check for newly completed tasks to show toasts
if (prevTasks.length > 0) {
newTasks.forEach((newTask: Task) => {
const oldTask = prevTasks.find(
- t => t.task_id === newTask.task_id
+ (t) => t.task_id === newTask.task_id,
);
+
+ // Update or add files from task.files if available
+ if (newTask.files && typeof newTask.files === "object") {
+ const taskFileEntries = Object.entries(newTask.files);
+ const now = new Date().toISOString();
+
+ taskFileEntries.forEach(([filePath, fileInfo]) => {
+ if (typeof fileInfo === "object" && fileInfo) {
+ const fileName = filePath.split("/").pop() || filePath;
+ const fileStatus = fileInfo.status as string;
+
+ // Map backend file status to our TaskFile status
+ let mappedStatus: TaskFile["status"];
+ switch (fileStatus) {
+ case "pending":
+ case "running":
+ mappedStatus = "processing";
+ break;
+ case "completed":
+ mappedStatus = "active";
+ break;
+ case "failed":
+ mappedStatus = "failed";
+ break;
+ default:
+ mappedStatus = "processing";
+ }
+
+ setFiles((prevFiles) => {
+ const existingFileIndex = prevFiles.findIndex(
+ (f) =>
+ f.source_url === filePath &&
+ f.task_id === newTask.task_id,
+ );
+
+ // Detect connector type based on file path or other indicators
+ let connectorType = "local";
+ if (filePath.includes("/") && !filePath.startsWith("/")) {
+ // Likely S3 key format (bucket/path/file.ext)
+ connectorType = "s3";
+ }
+
+ const fileEntry: TaskFile = {
+ filename: fileName,
+ mimetype: "", // We don't have this info from the task
+ source_url: filePath,
+ size: 0, // We don't have this info from the task
+ connector_type: connectorType,
+ status: mappedStatus,
+ task_id: newTask.task_id,
+ created_at:
+ typeof fileInfo.created_at === "string"
+ ? fileInfo.created_at
+ : now,
+ updated_at:
+ typeof fileInfo.updated_at === "string"
+ ? fileInfo.updated_at
+ : now,
+ };
+
+ if (existingFileIndex >= 0) {
+ // Update existing file
+ const updatedFiles = [...prevFiles];
+ updatedFiles[existingFileIndex] = fileEntry;
+ return updatedFiles;
+ } else {
+ // Add new file
+ return [...prevFiles, fileEntry];
+ }
+ });
+ }
+ });
+ }
+
if (
oldTask &&
oldTask.status !== "completed" &&
@@ -99,9 +207,14 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
refetchSearch();
// Dispatch knowledge updated event for all knowledge-related pages
console.log(
- "Task completed successfully, dispatching knowledgeUpdated event"
+ "Task completed successfully, dispatching knowledgeUpdated event",
);
window.dispatchEvent(new CustomEvent("knowledgeUpdated"));
+
+ // Remove files for this completed task from the files list
+ setFiles((prevFiles) =>
+ prevFiles.filter((file) => file.task_id !== newTask.task_id),
+ );
} else if (
oldTask &&
oldTask.status !== "failed" &&
@@ -114,6 +227,8 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
newTask.error || "Unknown error"
}`,
});
+
+ // Files will be updated to failed status by the file parsing logic above
}
});
}
@@ -126,7 +241,7 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
} finally {
setIsFetching(false);
}
- }, [isAuthenticated, isNoAuthMode]); // Removed 'tasks' from dependencies to prevent infinite loop!
+ }, [isAuthenticated, isNoAuthMode, refetchSearch]); // Removed 'tasks' from dependencies to prevent infinite loop!
const addTask = useCallback((taskId: string) => {
// Immediately start aggressive polling for the new task
@@ -140,19 +255,21 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
const data = await response.json();
const newTasks = data.tasks || [];
const foundTask = newTasks.find(
- (task: Task) => task.task_id === taskId
+ (task: Task) => task.task_id === taskId,
);
if (foundTask) {
// Task found! Update the tasks state
- setTasks(prevTasks => {
+ setTasks((prevTasks) => {
// Check if task is already in the list
- const exists = prevTasks.some(t => t.task_id === taskId);
+ const exists = prevTasks.some((t) => t.task_id === taskId);
if (!exists) {
return [...prevTasks, foundTask];
}
// Update existing task
- return prevTasks.map(t => (t.task_id === taskId ? foundTask : t));
+ return prevTasks.map((t) =>
+ t.task_id === taskId ? foundTask : t,
+ );
});
return; // Stop polling, we found it
}
@@ -177,7 +294,7 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
}, [fetchTasks]);
const removeTask = useCallback((taskId: string) => {
- setTasks(prev => prev.filter(task => task.task_id !== taskId));
+ setTasks((prev) => prev.filter((task) => task.task_id !== taskId));
}, []);
const cancelTask = useCallback(
@@ -204,11 +321,11 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
});
}
},
- [fetchTasks]
+ [fetchTasks],
);
const toggleMenu = useCallback(() => {
- setIsMenuOpen(prev => !prev);
+ setIsMenuOpen((prev) => !prev);
}, []);
// Periodic polling for task updates
@@ -231,7 +348,9 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
const value: TaskContextType = {
tasks,
+ files,
addTask,
+ addFiles,
removeTask,
refreshTasks,
cancelTask,
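
For reference, the backend-to-frontend file status mapping added to fetchTasks above distills to this pure function (a sketch; the cases mirror the switch verbatim):

```ts
type FileStatus = "active" | "failed" | "processing";

// In-flight backend states collapse to "processing"; unknown states are
// treated the same way so new files never disappear from the UI.
function mapBackendFileStatus(backendStatus: string): FileStatus {
  switch (backendStatus) {
    case "pending":
    case "running":
      return "processing";
    case "completed":
      return "active";
    case "failed":
      return "failed";
    default:
      return "processing";
  }
}
```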
diff --git a/src/agent.py b/src/agent.py
index ab99f597..1eb362bd 100644
--- a/src/agent.py
+++ b/src/agent.py
@@ -106,7 +106,6 @@ async def async_response_stream(
model: str,
extra_headers: dict = None,
previous_response_id: str = None,
- tweaks: dict = None,
log_prefix: str = "response",
):
logger.info("User prompt received", prompt=prompt)
@@ -121,8 +120,6 @@ async def async_response_stream(
}
if previous_response_id is not None:
request_params["previous_response_id"] = previous_response_id
- if tweaks:
- request_params["tweaks"] = tweaks
if "x-api-key" not in client.default_headers:
if hasattr(client, "api_key") and extra_headers is not None:
@@ -199,7 +196,6 @@ async def async_response(
model: str,
extra_headers: dict = None,
previous_response_id: str = None,
- tweaks: dict = None,
log_prefix: str = "response",
):
try:
@@ -214,8 +210,6 @@ async def async_response(
}
if previous_response_id is not None:
request_params["previous_response_id"] = previous_response_id
- if tweaks:
- request_params["tweaks"] = tweaks
if extra_headers:
request_params["extra_headers"] = extra_headers
@@ -249,7 +243,6 @@ async def async_stream(
model: str,
extra_headers: dict = None,
previous_response_id: str = None,
- tweaks: dict = None,
log_prefix: str = "response",
):
async for chunk in async_response_stream(
@@ -258,7 +251,6 @@ async def async_stream(
model,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
- tweaks=tweaks,
log_prefix=log_prefix,
):
yield chunk
@@ -271,7 +263,6 @@ async def async_langflow(
prompt: str,
extra_headers: dict = None,
previous_response_id: str = None,
- tweaks: dict = None,
):
response_text, response_id, response_obj = await async_response(
langflow_client,
@@ -279,7 +270,6 @@ async def async_langflow(
flow_id,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
- tweaks=tweaks,
log_prefix="langflow",
)
return response_text, response_id
@@ -292,7 +282,6 @@ async def async_langflow_stream(
prompt: str,
extra_headers: dict = None,
previous_response_id: str = None,
- tweaks: dict = None,
):
logger.debug("Starting langflow stream", prompt=prompt)
try:
@@ -302,8 +291,7 @@ async def async_langflow_stream(
flow_id,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
- tweaks=tweaks,
- log_prefix="langflow",
+ log_prefix="langflow",
):
logger.debug(
"Yielding chunk from langflow stream",
@@ -463,7 +451,6 @@ async def async_langflow_chat(
user_id: str,
extra_headers: dict = None,
previous_response_id: str = None,
- tweaks: dict = None,
store_conversation: bool = True,
):
logger.debug(
@@ -497,7 +484,6 @@ async def async_langflow_chat(
flow_id,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
- tweaks=tweaks,
log_prefix="langflow",
)
logger.debug(
@@ -576,7 +562,6 @@ async def async_langflow_chat_stream(
user_id: str,
extra_headers: dict = None,
previous_response_id: str = None,
- tweaks: dict = None,
):
logger.debug(
"async_langflow_chat_stream called",
@@ -603,7 +588,6 @@ async def async_langflow_chat_stream(
flow_id,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
- tweaks=tweaks,
log_prefix="langflow",
):
# Extract text content to build full response for history
diff --git a/src/api/models.py b/src/api/models.py
index d79d1b23..fa7269f3 100644
--- a/src/api/models.py
+++ b/src/api/models.py
@@ -17,14 +17,18 @@ async def get_openai_models(request, models_service, session_manager):
try:
config = get_openrag_config()
api_key = config.provider.api_key
- logger.info(f"Retrieved API key from config: {'yes' if api_key else 'no'}")
+ logger.info(
+ f"Retrieved API key from config: {'yes' if api_key else 'no'}"
+ )
except Exception as e:
logger.error(f"Failed to get config: {e}")
if not api_key:
return JSONResponse(
- {"error": "OpenAI API key is required either as query parameter or in configuration"},
- status_code=400
+ {
+ "error": "OpenAI API key is required either as query parameter or in configuration"
+ },
+ status_code=400,
)
models = await models_service.get_openai_models(api_key=api_key)
@@ -32,8 +36,7 @@ async def get_openai_models(request, models_service, session_manager):
except Exception as e:
logger.error(f"Failed to get OpenAI models: {str(e)}")
return JSONResponse(
- {"error": f"Failed to retrieve OpenAI models: {str(e)}"},
- status_code=500
+ {"error": f"Failed to retrieve OpenAI models: {str(e)}"}, status_code=500
)
@@ -44,13 +47,31 @@ async def get_ollama_models(request, models_service, session_manager):
query_params = dict(request.query_params)
endpoint = query_params.get("endpoint")
+    # If no endpoint provided, try to get it from stored configuration
+ if not endpoint:
+ try:
+ config = get_openrag_config()
+ endpoint = config.provider.endpoint
+ logger.info(
+ f"Retrieved endpoint from config: {'yes' if endpoint else 'no'}"
+ )
+ except Exception as e:
+ logger.error(f"Failed to get config: {e}")
+
+ if not endpoint:
+ return JSONResponse(
+ {
+ "error": "Endpoint is required either as query parameter or in configuration"
+ },
+ status_code=400,
+ )
+
models = await models_service.get_ollama_models(endpoint=endpoint)
return JSONResponse(models)
except Exception as e:
logger.error(f"Failed to get Ollama models: {str(e)}")
return JSONResponse(
- {"error": f"Failed to retrieve Ollama models: {str(e)}"},
- status_code=500
+ {"error": f"Failed to retrieve Ollama models: {str(e)}"}, status_code=500
)
@@ -63,15 +84,65 @@ async def get_ibm_models(request, models_service, session_manager):
api_key = query_params.get("api_key")
project_id = query_params.get("project_id")
+ config = get_openrag_config()
+ # If no API key provided, try to get it from stored configuration
+ if not api_key:
+ try:
+ api_key = config.provider.api_key
+ logger.info(
+ f"Retrieved API key from config: {'yes' if api_key else 'no'}"
+ )
+ except Exception as e:
+ logger.error(f"Failed to get config: {e}")
+
+ if not api_key:
+ return JSONResponse(
+ {
+                "error": "IBM API key is required either as query parameter or in configuration"
+ },
+ status_code=400,
+ )
+
+ if not endpoint:
+ try:
+ endpoint = config.provider.endpoint
+ logger.info(
+ f"Retrieved endpoint from config: {'yes' if endpoint else 'no'}"
+ )
+ except Exception as e:
+ logger.error(f"Failed to get config: {e}")
+
+ if not endpoint:
+ return JSONResponse(
+ {
+ "error": "Endpoint is required either as query parameter or in configuration"
+ },
+ status_code=400,
+ )
+
+ if not project_id:
+ try:
+ project_id = config.provider.project_id
+ logger.info(
+ f"Retrieved project ID from config: {'yes' if project_id else 'no'}"
+ )
+ except Exception as e:
+ logger.error(f"Failed to get config: {e}")
+
+ if not project_id:
+ return JSONResponse(
+ {
+ "error": "Project ID is required either as query parameter or in configuration"
+ },
+ status_code=400,
+ )
+
models = await models_service.get_ibm_models(
- endpoint=endpoint,
- api_key=api_key,
- project_id=project_id
+ endpoint=endpoint, api_key=api_key, project_id=project_id
)
return JSONResponse(models)
except Exception as e:
logger.error(f"Failed to get IBM models: {str(e)}")
return JSONResponse(
- {"error": f"Failed to retrieve IBM models: {str(e)}"},
- status_code=500
- )
\ No newline at end of file
+ {"error": f"Failed to retrieve IBM models: {str(e)}"}, status_code=500
+ )
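
Each models endpoint above now follows the same fallback: use the query parameter when present, otherwise read the stored configuration, and return 400 only when both are empty. A client-side sketch (the URL path is an assumption; the query parameter names come from the handlers):

```ts
// Hypothetical helper; omitted fields rely on the server-side config fallback.
async function fetchIbmModels(
  params: { endpoint?: string; api_key?: string; project_id?: string } = {},
) {
  const qs = new URLSearchParams();
  for (const [key, value] of Object.entries(params)) {
    if (value) qs.set(key, value);
  }
  const res = await fetch(`/models/ibm?${qs.toString()}`);
  if (!res.ok) {
    // 400/500 responses carry an { error } payload, as in the handlers above.
    throw new Error((await res.json()).error);
  }
  return res.json();
}
```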
diff --git a/src/api/settings.py b/src/api/settings.py
index c169b263..560eb400 100644
--- a/src/api/settings.py
+++ b/src/api/settings.py
@@ -7,6 +7,7 @@ from config.settings import (
LANGFLOW_CHAT_FLOW_ID,
LANGFLOW_INGEST_FLOW_ID,
LANGFLOW_PUBLIC_URL,
+ DOCLING_COMPONENT_ID,
clients,
get_openrag_config,
config_manager,
@@ -46,22 +47,7 @@ def get_docling_preset_configs():
}
-def get_docling_tweaks(docling_preset: str = None) -> dict:
- """Get Langflow tweaks for docling component based on preset"""
- if not docling_preset:
- # Get current preset from config
- openrag_config = get_openrag_config()
- docling_preset = openrag_config.knowledge.doclingPresets
- preset_configs = get_docling_preset_configs()
-
- if docling_preset not in preset_configs:
- docling_preset = "standard" # fallback
-
- preset_config = preset_configs[docling_preset]
- docling_serve_opts = json.dumps(preset_config)
-
- return {"DoclingRemote-ayRdw": {"docling_serve_opts": docling_serve_opts}}
async def get_settings(request, session_manager):
@@ -234,6 +220,15 @@ async def update_settings(request, session_manager):
current_config.knowledge.doclingPresets = body["doclingPresets"]
config_updated = True
+ # Also update the flow with the new docling preset
+ try:
+                await _update_flow_docling_preset(
+                    body["doclingPresets"], preset_configs[body["doclingPresets"]]
+                )
+                logger.info(
+                    f"Successfully updated docling preset in flow to '{body['doclingPresets']}'"
+                )
+ except Exception as e:
+ logger.error(f"Failed to update docling preset in flow: {str(e)}")
+ # Don't fail the entire settings update if flow update fails
+ # The config will still be saved
+
if "chunk_size" in body:
if not isinstance(body["chunk_size"], int) or body["chunk_size"] <= 0:
return JSONResponse(
@@ -527,3 +522,93 @@ async def onboarding(request, flows_service):
{"error": f"Failed to update onboarding settings: {str(e)}"},
status_code=500,
)
+
+
+async def _update_flow_docling_preset(preset: str, preset_config: dict):
+ """Helper function to update docling preset in the ingest flow"""
+ if not LANGFLOW_INGEST_FLOW_ID:
+ raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
+
+ # Get the current flow data from Langflow
+ response = await clients.langflow_request(
+ "GET", f"/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}"
+ )
+
+ if response.status_code != 200:
+        raise Exception(
+            f"Failed to get ingest flow: HTTP {response.status_code} - {response.text}"
+        )
+
+ flow_data = response.json()
+
+ # Find the target node in the flow using environment variable
+ nodes = flow_data.get("data", {}).get("nodes", [])
+ target_node = None
+ target_node_index = None
+
+ for i, node in enumerate(nodes):
+ if node.get("id") == DOCLING_COMPONENT_ID:
+ target_node = node
+ target_node_index = i
+ break
+
+ if target_node is None:
+        raise Exception(
+            f"Docling component '{DOCLING_COMPONENT_ID}' not found in ingest flow"
+        )
+
+    # Update the docling_serve_opts value directly in the existing node
+    template = target_node.get("data", {}).get("node", {}).get("template", {})
+    if template.get("docling_serve_opts"):
+        flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][
+            "docling_serve_opts"
+        ]["value"] = preset_config
+    else:
+        raise Exception(
+            f"docling_serve_opts field not found in node '{DOCLING_COMPONENT_ID}'"
+        )
+
+ # Update the flow via PATCH request
+ patch_response = await clients.langflow_request(
+ "PATCH", f"/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}", json=flow_data
+ )
+
+ if patch_response.status_code != 200:
+        raise Exception(
+            f"Failed to update ingest flow: HTTP {patch_response.status_code} - {patch_response.text}"
+        )
+
+
+async def update_docling_preset(request, session_manager):
+ """Update docling preset in the ingest flow"""
+ try:
+ # Parse request body
+ body = await request.json()
+
+ # Validate preset parameter
+ if "preset" not in body:
+ return JSONResponse(
+ {"error": "preset parameter is required"},
+ status_code=400
+ )
+
+ preset = body["preset"]
+ preset_configs = get_docling_preset_configs()
+
+ if preset not in preset_configs:
+ valid_presets = list(preset_configs.keys())
+ return JSONResponse(
+ {"error": f"Invalid preset '{preset}'. Valid presets: {', '.join(valid_presets)}"},
+ status_code=400
+ )
+
+ # Get the preset configuration
+ preset_config = preset_configs[preset]
+
+ # Use the helper function to update the flow
+ await _update_flow_docling_preset(preset, preset_config)
+
+ logger.info(f"Successfully updated docling preset to '{preset}' in ingest flow")
+
+ return JSONResponse({
+ "message": f"Successfully updated docling preset to '{preset}'",
+ "preset": preset,
+ "preset_config": preset_config
+ })
+
+ except Exception as e:
+ logger.error("Failed to update docling preset", error=str(e))
+ return JSONResponse(
+ {"error": f"Failed to update docling preset: {str(e)}"},
+ status_code=500
+ )
+
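
The helper and handler above are wired to PATCH /settings/docling-preset in src/main.py below. A minimal frontend call might look like this (a sketch; the route, request body, and response keys come from the handler, the rest is assumed):

```ts
async function updateDoclingPreset(preset: string) {
  const res = await fetch("/settings/docling-preset", {
    method: "PATCH",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ preset }),
  });
  if (!res.ok) {
    // 400 for a missing/invalid preset, 500 if the flow update fails.
    throw new Error((await res.json()).error);
  }
  return res.json(); // { message, preset, preset_config }
}
```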
diff --git a/src/config/settings.py b/src/config/settings.py
index 6a4afc05..5f9b189d 100644
--- a/src/config/settings.py
+++ b/src/config/settings.py
@@ -544,6 +544,9 @@ OLLAMA_LLM_TEXT_COMPONENT_ID = os.getenv(
"OLLAMA_LLM_TEXT_COMPONENT_ID", "OllamaModel-XDGqZ"
)
+# Docling component ID for ingest flow
+DOCLING_COMPONENT_ID = os.getenv("DOCLING_COMPONENT_ID", "DoclingRemote-78KoX")
+
# Global clients instance
clients = AppClients()
diff --git a/src/main.py b/src/main.py
index c29ddf8b..3546cc91 100644
--- a/src/main.py
+++ b/src/main.py
@@ -971,12 +971,23 @@ async def create_app():
"/onboarding",
require_auth(services["session_manager"])(
partial(
- settings.onboarding,
+ settings.onboarding,
flows_service=services["flows_service"]
)
),
methods=["POST"],
),
+ # Docling preset update endpoint
+ Route(
+ "/settings/docling-preset",
+ require_auth(services["session_manager"])(
+ partial(
+ settings.update_docling_preset,
+ session_manager=services["session_manager"]
+ )
+ ),
+ methods=["PATCH"],
+ ),
Route(
"/nudges",
require_auth(services["session_manager"])(
diff --git a/src/services/chat_service.py b/src/services/chat_service.py
index 4b3c9d26..5ffe30f9 100644
--- a/src/services/chat_service.py
+++ b/src/services/chat_service.py
@@ -2,7 +2,6 @@ import json
from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID
from agent import async_chat, async_langflow, async_chat_stream
from auth_context import set_auth_context
-from api.settings import get_docling_tweaks
from utils.logging_config import get_logger
logger = get_logger(__name__)
@@ -127,8 +126,6 @@ class ChatService:
"Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
)
- # Get docling tweaks based on current configuration
- docling_tweaks = get_docling_tweaks()
if stream:
from agent import async_langflow_chat_stream
@@ -140,7 +137,6 @@ class ChatService:
user_id,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
- tweaks=docling_tweaks,
)
else:
from agent import async_langflow_chat
@@ -152,7 +148,6 @@ class ChatService:
user_id,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
- tweaks=docling_tweaks,
)
response_data = {"response": response_text}
if response_id:
@@ -202,8 +197,6 @@ class ChatService:
from agent import async_langflow_chat
- # Get docling tweaks (might not be used by nudges flow, but keeping consistent)
- docling_tweaks = get_docling_tweaks()
response_text, response_id = await async_langflow_chat(
langflow_client,
@@ -211,7 +204,6 @@ class ChatService:
prompt,
user_id,
extra_headers=extra_headers,
- tweaks=docling_tweaks,
store_conversation=False,
)
response_data = {"response": response_text}
@@ -242,8 +234,6 @@ class ChatService:
raise ValueError(
"Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
)
- # Get docling tweaks based on current configuration
- docling_tweaks = get_docling_tweaks()
response_text, response_id = await async_langflow(
langflow_client=langflow_client,
@@ -251,7 +241,6 @@ class ChatService:
prompt=document_prompt,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
- tweaks=docling_tweaks,
)
else: # chat
# Set auth context for chat tools and provide user_id
diff --git a/src/services/task_service.py b/src/services/task_service.py
index c9328b90..de297dff 100644
--- a/src/services/task_service.py
+++ b/src/services/task_service.py
@@ -17,7 +17,9 @@ class TaskService:
def __init__(self, document_service=None, process_pool=None):
self.document_service = document_service
self.process_pool = process_pool
- self.task_store: dict[str, dict[str, UploadTask]] = {} # user_id -> {task_id -> UploadTask}
+ self.task_store: dict[
+ str, dict[str, UploadTask]
+ ] = {} # user_id -> {task_id -> UploadTask}
self.background_tasks = set()
if self.process_pool is None:
@@ -122,18 +124,27 @@ class TaskService:
# Process files with limited concurrency to avoid overwhelming the system
max_workers = get_worker_count()
- semaphore = asyncio.Semaphore(max_workers * 2) # Allow 2x process pool size for async I/O
+ semaphore = asyncio.Semaphore(
+ max_workers * 2
+ ) # Allow 2x process pool size for async I/O
async def process_with_semaphore(file_path: str):
async with semaphore:
- await self.document_service.process_single_file_task(upload_task, file_path)
+ await self.document_service.process_single_file_task(
+ upload_task, file_path
+ )
- tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()]
+ tasks = [
+ process_with_semaphore(file_path)
+ for file_path in upload_task.file_tasks.keys()
+ ]
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
- logger.error("Background upload processor failed", task_id=task_id, error=str(e))
+ logger.error(
+ "Background upload processor failed", task_id=task_id, error=str(e)
+ )
import traceback
traceback.print_exc()
@@ -141,7 +152,9 @@ class TaskService:
self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time()
- async def background_custom_processor(self, user_id: str, task_id: str, items: list) -> None:
+ async def background_custom_processor(
+ self, user_id: str, task_id: str, items: list
+ ) -> None:
"""Background task to process items using custom processor"""
try:
upload_task = self.task_store[user_id][task_id]
@@ -163,7 +176,9 @@ class TaskService:
try:
await processor.process_item(upload_task, item, file_task)
except Exception as e:
- logger.error("Failed to process item", item=str(item), error=str(e))
+ logger.error(
+ "Failed to process item", item=str(item), error=str(e)
+ )
import traceback
traceback.print_exc()
@@ -190,7 +205,9 @@ class TaskService:
pass
raise # Re-raise to properly handle cancellation
except Exception as e:
- logger.error("Background custom processor failed", task_id=task_id, error=str(e))
+ logger.error(
+ "Background custom processor failed", task_id=task_id, error=str(e)
+ )
import traceback
traceback.print_exc()
@@ -212,7 +229,10 @@ class TaskService:
upload_task = None
for candidate_user_id in candidate_user_ids:
- if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
+ if (
+ candidate_user_id in self.task_store
+ and task_id in self.task_store[candidate_user_id]
+ ):
upload_task = self.task_store[candidate_user_id][task_id]
break
@@ -271,10 +291,23 @@ class TaskService:
if task_id in tasks_by_id:
continue
- # Calculate running and pending counts
+ # Calculate running and pending counts and build file statuses
running_files_count = 0
pending_files_count = 0
- for file_task in upload_task.file_tasks.values():
+ file_statuses = {}
+
+ for file_path, file_task in upload_task.file_tasks.items():
+ if file_task.status.value != "completed":
+ file_statuses[file_path] = {
+ "status": file_task.status.value,
+ "result": file_task.result,
+ "error": file_task.error,
+ "retry_count": file_task.retry_count,
+ "created_at": file_task.created_at,
+ "updated_at": file_task.updated_at,
+ "duration_seconds": file_task.duration_seconds,
+ }
+
if file_task.status.value == "running":
running_files_count += 1
elif file_task.status.value == "pending":
@@ -292,6 +325,7 @@ class TaskService:
"created_at": upload_task.created_at,
"updated_at": upload_task.updated_at,
"duration_seconds": upload_task.duration_seconds,
+ "files": file_statuses,
}
# First, add user-owned tasks; then shared anonymous;
@@ -312,7 +346,10 @@ class TaskService:
store_user_id = None
for candidate_user_id in candidate_user_ids:
- if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
+ if (
+ candidate_user_id in self.task_store
+ and task_id in self.task_store[candidate_user_id]
+ ):
store_user_id = candidate_user_id
break
@@ -326,7 +363,10 @@ class TaskService:
return False
# Cancel the background task to stop scheduling new work
- if hasattr(upload_task, "background_task") and not upload_task.background_task.done():
+ if (
+ hasattr(upload_task, "background_task")
+ and not upload_task.background_task.done()
+ ):
upload_task.background_task.cancel()
# Mark task as failed (cancelled)
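
Taken together with the task-context changes, the task payload now carries per-file progress. The shape implied by the file_statuses dict above (a sketch; these TypeScript types are inferred from the Python code, not declared anywhere in the patch):

```ts
// Completed files are filtered out server-side, so only in-flight or failed
// entries appear here, keyed by file path.
interface TaskFileStatus {
  status: "pending" | "running" | "failed" | string;
  result: unknown;
  error: string | null;
  retry_count: number;
  created_at: number; // epoch seconds, per time.time() in task_service.py
  updated_at: number;
  duration_seconds: number | null;
}

type TaskFiles = Record<string, TaskFileStatus>;
```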