diff --git a/.github/workflows/build-multiarch.yml b/.github/workflows/build-multiarch.yml index 360d29d8..939a80f1 100644 --- a/.github/workflows/build-multiarch.yml +++ b/.github/workflows/build-multiarch.yml @@ -21,7 +21,7 @@ jobs: tag: phact/openrag-backend platform: linux/arm64 arch: arm64 - runs-on: self-hosted + runs-on: [self-hosted, linux, ARM64, langflow-ai-arm64-2] # frontend - image: frontend @@ -35,7 +35,7 @@ jobs: tag: phact/openrag-frontend platform: linux/arm64 arch: arm64 - runs-on: self-hosted + runs-on: [self-hosted, linux, ARM64, langflow-ai-arm64-2] # opensearch - image: opensearch @@ -49,7 +49,7 @@ jobs: tag: phact/openrag-opensearch platform: linux/arm64 arch: arm64 - runs-on: self-hosted + runs-on: [self-hosted, linux, ARM64, langflow-ai-arm64-2] runs-on: ${{ matrix.runs-on }} diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index 31cdea31..19211d5f 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -1,82 +1,128 @@ -"use client" +"use client"; -import { useState, useEffect, useRef } from "react" -import { ChevronDown, Upload, FolderOpen, Cloud, PlugZap, Plus } from "lucide-react" -import { Button } from "@/components/ui/button" -import { Dialog, DialogContent, DialogDescription, DialogHeader, DialogTitle } from "@/components/ui/dialog" -import { Input } from "@/components/ui/input" -import { Label } from "@/components/ui/label" -import { cn } from "@/lib/utils" -import { useTask } from "@/contexts/task-context" -import { useRouter } from "next/navigation" +import { useQueryClient } from "@tanstack/react-query"; +import { + ChevronDown, + Cloud, + FolderOpen, + PlugZap, + Plus, + Upload, +} from "lucide-react"; +import { useRouter } from "next/navigation"; +import { useEffect, useRef, useState } from "react"; +import { Button } from "@/components/ui/button"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { useTask } from "@/contexts/task-context"; +import { cn } from "@/lib/utils"; interface KnowledgeDropdownProps { - active?: boolean - variant?: 'navigation' | 'button' + active?: boolean; + variant?: "navigation" | "button"; } -export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeDropdownProps) { - const { addTask } = useTask() - const router = useRouter() - const [isOpen, setIsOpen] = useState(false) - const [showFolderDialog, setShowFolderDialog] = useState(false) - const [showS3Dialog, setShowS3Dialog] = useState(false) - const [awsEnabled, setAwsEnabled] = useState(false) - const [folderPath, setFolderPath] = useState("/app/documents/") - const [bucketUrl, setBucketUrl] = useState("s3://") - const [folderLoading, setFolderLoading] = useState(false) - const [s3Loading, setS3Loading] = useState(false) - const [fileUploading, setFileUploading] = useState(false) - const [cloudConnectors, setCloudConnectors] = useState<{[key: string]: {name: string, available: boolean, connected: boolean, hasToken: boolean}}>({}) - const fileInputRef = useRef(null) - const dropdownRef = useRef(null) +export function KnowledgeDropdown({ + active, + variant = "navigation", +}: KnowledgeDropdownProps) { + const { addTask } = useTask(); + const router = useRouter(); + const [isOpen, setIsOpen] = useState(false); + const [showFolderDialog, setShowFolderDialog] = useState(false); 
+ const [showS3Dialog, setShowS3Dialog] = useState(false); + const [awsEnabled, setAwsEnabled] = useState(false); + const [folderPath, setFolderPath] = useState("/app/documents/"); + const [bucketUrl, setBucketUrl] = useState("s3://"); + const [folderLoading, setFolderLoading] = useState(false); + const [s3Loading, setS3Loading] = useState(false); + const [fileUploading, setFileUploading] = useState(false); + const [cloudConnectors, setCloudConnectors] = useState<{ + [key: string]: { + name: string; + available: boolean; + connected: boolean; + hasToken: boolean; + }; + }>({}); + const fileInputRef = useRef(null); + const dropdownRef = useRef(null); + + const queryClient = useQueryClient(); + + const refetchSearch = () => { + queryClient.invalidateQueries({ queryKey: ["search"] }); + }; // Check AWS availability and cloud connectors on mount useEffect(() => { const checkAvailability = async () => { try { // Check AWS - const awsRes = await fetch("/api/upload_options") + const awsRes = await fetch("/api/upload_options"); if (awsRes.ok) { - const awsData = await awsRes.json() - setAwsEnabled(Boolean(awsData.aws)) + const awsData = await awsRes.json(); + setAwsEnabled(Boolean(awsData.aws)); } // Check cloud connectors - const connectorsRes = await fetch('/api/connectors') + const connectorsRes = await fetch("/api/connectors"); if (connectorsRes.ok) { - const connectorsResult = await connectorsRes.json() - const cloudConnectorTypes = ['google_drive', 'onedrive', 'sharepoint'] - const connectorInfo: {[key: string]: {name: string, available: boolean, connected: boolean, hasToken: boolean}} = {} - + const connectorsResult = await connectorsRes.json(); + const cloudConnectorTypes = [ + "google_drive", + "onedrive", + "sharepoint", + ]; + const connectorInfo: { + [key: string]: { + name: string; + available: boolean; + connected: boolean; + hasToken: boolean; + }; + } = {}; + for (const type of cloudConnectorTypes) { if (connectorsResult.connectors[type]) { connectorInfo[type] = { name: connectorsResult.connectors[type].name, available: connectorsResult.connectors[type].available, connected: false, - hasToken: false - } + hasToken: false, + }; // Check connection status try { - const statusRes = await fetch(`/api/connectors/${type}/status`) + const statusRes = await fetch(`/api/connectors/${type}/status`); if (statusRes.ok) { - const statusData = await statusRes.json() - const connections = statusData.connections || [] - const activeConnection = connections.find((conn: {is_active: boolean, connection_id: string}) => conn.is_active) - const isConnected = activeConnection !== undefined + const statusData = await statusRes.json(); + const connections = statusData.connections || []; + const activeConnection = connections.find( + (conn: { is_active: boolean; connection_id: string }) => + conn.is_active, + ); + const isConnected = activeConnection !== undefined; if (isConnected && activeConnection) { - connectorInfo[type].connected = true - + connectorInfo[type].connected = true; + // Check token availability try { - const tokenRes = await fetch(`/api/connectors/${type}/token?connection_id=${activeConnection.connection_id}`) + const tokenRes = await fetch( + `/api/connectors/${type}/token?connection_id=${activeConnection.connection_id}`, + ); if (tokenRes.ok) { - const tokenData = await tokenRes.json() + const tokenData = await tokenRes.json(); if (tokenData.access_token) { - connectorInfo[type].hasToken = true + connectorInfo[type].hasToken = true; } } } catch { @@ -90,114 +136,134 @@ export function 
KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD } } - setCloudConnectors(connectorInfo) + setCloudConnectors(connectorInfo); } } catch (err) { - console.error("Failed to check availability", err) + console.error("Failed to check availability", err); } - } - checkAvailability() - }, []) + }; + checkAvailability(); + }, []); // Handle click outside to close dropdown useEffect(() => { const handleClickOutside = (event: MouseEvent) => { - if (dropdownRef.current && !dropdownRef.current.contains(event.target as Node)) { - setIsOpen(false) + if ( + dropdownRef.current && + !dropdownRef.current.contains(event.target as Node) + ) { + setIsOpen(false); } - } + }; if (isOpen) { - document.addEventListener("mousedown", handleClickOutside) - return () => document.removeEventListener("mousedown", handleClickOutside) + document.addEventListener("mousedown", handleClickOutside); + return () => + document.removeEventListener("mousedown", handleClickOutside); } - }, [isOpen]) + }, [isOpen]); const handleFileUpload = () => { - fileInputRef.current?.click() - } + fileInputRef.current?.click(); + }; const handleFileChange = async (e: React.ChangeEvent) => { - const files = e.target.files + const files = e.target.files; if (files && files.length > 0) { // Close dropdown and disable button immediately after file selection - setIsOpen(false) - setFileUploading(true) - + setIsOpen(false); + setFileUploading(true); + // Trigger the same file upload event as the chat page - window.dispatchEvent(new CustomEvent('fileUploadStart', { - detail: { filename: files[0].name } - })) - + window.dispatchEvent( + new CustomEvent("fileUploadStart", { + detail: { filename: files[0].name }, + }), + ); + try { - const formData = new FormData() - formData.append('file', files[0]) - - // Use router upload and ingest endpoint (automatically routes based on configuration) + const formData = new FormData(); + formData.append("file", files[0]); + const uploadIngestRes = await fetch('/api/upload', { - method: 'POST', body: formData, - }) - const uploadIngestJson = await uploadIngestRes.json() + }); + const uploadIngestJson = await uploadIngestRes.json(); if (!uploadIngestRes.ok) { - throw new Error(uploadIngestJson?.error || 'Upload and ingest failed') + throw new Error( + uploadIngestJson?.error || "Upload and ingest failed", + ); } // Extract results from the unified response - const fileId = uploadIngestJson?.upload?.id - const filePath = uploadIngestJson?.upload?.path - const runJson = uploadIngestJson?.ingestion - const deleteResult = uploadIngestJson?.deletion - + const fileId = uploadIngestJson?.upload?.id; + const filePath = uploadIngestJson?.upload?.path; + const runJson = uploadIngestJson?.ingestion; + const deleteResult = uploadIngestJson?.deletion; + if (!fileId || !filePath) { - throw new Error('Upload successful but no file id/path returned') + throw new Error("Upload successful but no file id/path returned"); } // Log deletion status if provided if (deleteResult) { - if (deleteResult.status === 'deleted') { - console.log('File successfully cleaned up from Langflow:', deleteResult.file_id) - } else if (deleteResult.status === 'delete_failed') { - console.warn('Failed to cleanup file from Langflow:', deleteResult.error) + if (deleteResult.status === "deleted") { + console.log( + "File successfully cleaned up from Langflow:", + deleteResult.file_id, + ); + } else if (deleteResult.status === "delete_failed") { + console.warn( + "Failed to cleanup file from Langflow:", + deleteResult.error, + ); } } // Notify 
UI - window.dispatchEvent(new CustomEvent('fileUploaded', { - detail: { - file: files[0], - result: { - file_id: fileId, - file_path: filePath, - run: runJson, - deletion: deleteResult, - unified: true - } - } - })) + window.dispatchEvent( + new CustomEvent("fileUploaded", { + detail: { + file: files[0], + result: { + file_id: fileId, + file_path: filePath, + run: runJson, + deletion: deleteResult, + unified: true, + }, + }, + }), + ); // Trigger search refresh after successful ingestion - window.dispatchEvent(new CustomEvent('knowledgeUpdated')) + window.dispatchEvent(new CustomEvent("knowledgeUpdated")); } catch (error) { - window.dispatchEvent(new CustomEvent('fileUploadError', { - detail: { filename: files[0].name, error: error instanceof Error ? error.message : 'Upload failed' } - })) + window.dispatchEvent( + new CustomEvent("fileUploadError", { + detail: { + filename: files[0].name, + error: error instanceof Error ? error.message : "Upload failed", + }, + }), + ); } finally { - window.dispatchEvent(new CustomEvent('fileUploadComplete')) - setFileUploading(false) + window.dispatchEvent(new CustomEvent("fileUploadComplete")); + setFileUploading(false); + refetchSearch(); } } - + // Reset file input if (fileInputRef.current) { - fileInputRef.current.value = '' + fileInputRef.current.value = ""; } - } + }; const handleFolderUpload = async () => { - if (!folderPath.trim()) return + if (!folderPath.trim()) return; - setFolderLoading(true) - setShowFolderDialog(false) + setFolderLoading(true); + setShowFolderDialog(false); try { const response = await fetch("/api/upload_path", { @@ -206,40 +272,40 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD "Content-Type": "application/json", }, body: JSON.stringify({ path: folderPath }), - }) + }); + + const result = await response.json(); - const result = await response.json() - if (response.status === 201) { - const taskId = result.task_id || result.id - + const taskId = result.task_id || result.id; + if (!taskId) { - throw new Error("No task ID received from server") + throw new Error("No task ID received from server"); } - - addTask(taskId) - setFolderPath("") + + addTask(taskId); + setFolderPath(""); // Trigger search refresh after successful folder processing starts - window.dispatchEvent(new CustomEvent('knowledgeUpdated')) - + window.dispatchEvent(new CustomEvent("knowledgeUpdated")); } else if (response.ok) { - setFolderPath("") - window.dispatchEvent(new CustomEvent('knowledgeUpdated')) + setFolderPath(""); + window.dispatchEvent(new CustomEvent("knowledgeUpdated")); } else { - console.error("Folder upload failed:", result.error) + console.error("Folder upload failed:", result.error); } } catch (error) { - console.error("Folder upload error:", error) + console.error("Folder upload error:", error); } finally { - setFolderLoading(false) + setFolderLoading(false); + refetchSearch(); } - } + }; const handleS3Upload = async () => { - if (!bucketUrl.trim()) return + if (!bucketUrl.trim()) return; - setS3Loading(true) - setShowS3Dialog(false) + setS3Loading(true); + setShowS3Dialog(false); try { const response = await fetch("/api/upload_bucket", { @@ -248,30 +314,31 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD "Content-Type": "application/json", }, body: JSON.stringify({ s3_url: bucketUrl }), - }) + }); - const result = await response.json() + const result = await response.json(); if (response.status === 201) { - const taskId = result.task_id || result.id + const taskId = 
result.task_id || result.id; if (!taskId) { - throw new Error("No task ID received from server") + throw new Error("No task ID received from server"); } - addTask(taskId) - setBucketUrl("s3://") + addTask(taskId); + setBucketUrl("s3://"); // Trigger search refresh after successful S3 processing starts - window.dispatchEvent(new CustomEvent('knowledgeUpdated')) + window.dispatchEvent(new CustomEvent("knowledgeUpdated")); } else { - console.error("S3 upload failed:", result.error) + console.error("S3 upload failed:", result.error); } } catch (error) { - console.error("S3 upload error:", error) + console.error("S3 upload error:", error); } finally { - setS3Loading(false) + setS3Loading(false); + refetchSearch(); } - } + }; const cloudConnectorItems = Object.entries(cloudConnectors) .filter(([, info]) => info.available) @@ -279,72 +346,99 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD label: info.name, icon: PlugZap, onClick: () => { - setIsOpen(false) + setIsOpen(false); if (info.connected && info.hasToken) { - router.push(`/upload/${type}`) + router.push(`/upload/${type}`); } else { - router.push('/settings') + router.push("/settings"); } }, disabled: !info.connected || !info.hasToken, - tooltip: !info.connected ? `Connect ${info.name} in Settings first` : - !info.hasToken ? `Reconnect ${info.name} - access token required` : - undefined - })) + tooltip: !info.connected + ? `Connect ${info.name} in Settings first` + : !info.hasToken + ? `Reconnect ${info.name} - access token required` + : undefined, + })); const menuItems = [ { label: "Add File", icon: Upload, - onClick: handleFileUpload + onClick: handleFileUpload, }, { - label: "Process Folder", + label: "Process Folder", icon: FolderOpen, onClick: () => { - setIsOpen(false) - setShowFolderDialog(true) - } + setIsOpen(false); + setShowFolderDialog(true); + }, }, - ...(awsEnabled ? [{ - label: "Process S3 Bucket", - icon: Cloud, - onClick: () => { - setIsOpen(false) - setShowS3Dialog(true) - } - }] : []), - ...cloudConnectorItems - ] + ...(awsEnabled + ? [ + { + label: "Process S3 Bucket", + icon: Cloud, + onClick: () => { + setIsOpen(false); + setShowS3Dialog(true); + }, + }, + ] + : []), + ...cloudConnectorItems, + ]; return ( <>
@@ -356,11 +450,13 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD + + Chunks from {selectedFile} + +
+ {fileResults + .filter((file) => file.filename === selectedFile) + .flatMap((file) => file.chunks) + .map((chunk, index) => ( +
- ← Back to files - - - Chunks from {selectedFile} - -
- {chunkResults - .filter(chunk => chunk.filename === selectedFile) - .map((chunk, index) => ( -
- {chunk.filename} + + {chunk.filename} +
{chunk.score.toFixed(2)} @@ -389,67 +184,87 @@ function SearchPage() {

))} - - ) : ( - // Show files table -
- - - - - - - - - - - - - {fileResults.map((file, index) => ( - setSelectedFile(file.filename)} - > - - - - - + + + + + + + ))} + +
SourceTypeSizeMatching chunksAverage scoreOwner
-
- {getSourceIcon(file.connector_type)} - - {file.filename} - -
-
- {file.mimetype} - - {file.size ? `${Math.round(file.size / 1024)} KB` : '—'} - - {file.chunkCount} - - - {file.avgScore.toFixed(2)} + + ) : ( + // Show files table +
+ + + + + + + + + + + + + {fileResults.map((file) => ( + setSelectedFile(file.filename)} + > + - - - ))} - -
+ Source + + Type + + Size + + Matching chunks + + Average score + + Owner +
+
+ {getSourceIcon(file.connector_type)} + + {file.filename} -
- {file.owner_name || file.owner || '—'} -
-
- )} + +
+ {file.mimetype} + + {file.size + ? `${Math.round(file.size / 1024)} KB` + : "—"} + + {file.chunkCount} + + + {file.avgScore.toFixed(2)} + + + {file.owner_name || file.owner || "—"} +
+
+ )}
- + )} - ) + ); } export default function ProtectedSearchPage() { @@ -457,5 +272,5 @@ export default function ProtectedSearchPage() { - ) + ); } diff --git a/frontend/src/contexts/knowledge-filter-context.tsx b/frontend/src/contexts/knowledge-filter-context.tsx index 66e8d44b..c08cb543 100644 --- a/frontend/src/contexts/knowledge-filter-context.tsx +++ b/frontend/src/contexts/knowledge-filter-context.tsx @@ -1,95 +1,107 @@ -"use client" +"use client"; -import React, { createContext, useContext, useState, ReactNode } from 'react' +import React, { + createContext, + type ReactNode, + useContext, + useState, +} from "react"; interface KnowledgeFilter { - id: string - name: string - description: string - query_data: string - owner: string - created_at: string - updated_at: string + id: string; + name: string; + description: string; + query_data: string; + owner: string; + created_at: string; + updated_at: string; } -interface ParsedQueryData { - query: string +export interface ParsedQueryData { + query: string; filters: { - data_sources: string[] - document_types: string[] - owners: string[] - connector_types: string[] - } - limit: number - scoreThreshold: number + data_sources: string[]; + document_types: string[]; + owners: string[]; + connector_types: string[]; + }; + limit: number; + scoreThreshold: number; } interface KnowledgeFilterContextType { - selectedFilter: KnowledgeFilter | null - parsedFilterData: ParsedQueryData | null - setSelectedFilter: (filter: KnowledgeFilter | null) => void - clearFilter: () => void - isPanelOpen: boolean - openPanel: () => void - closePanel: () => void - closePanelOnly: () => void + selectedFilter: KnowledgeFilter | null; + parsedFilterData: ParsedQueryData | null; + setSelectedFilter: (filter: KnowledgeFilter | null) => void; + clearFilter: () => void; + isPanelOpen: boolean; + openPanel: () => void; + closePanel: () => void; + closePanelOnly: () => void; } -const KnowledgeFilterContext = createContext(undefined) +const KnowledgeFilterContext = createContext< + KnowledgeFilterContextType | undefined +>(undefined); export function useKnowledgeFilter() { - const context = useContext(KnowledgeFilterContext) + const context = useContext(KnowledgeFilterContext); if (context === undefined) { - throw new Error('useKnowledgeFilter must be used within a KnowledgeFilterProvider') + throw new Error( + "useKnowledgeFilter must be used within a KnowledgeFilterProvider", + ); } - return context + return context; } interface KnowledgeFilterProviderProps { - children: ReactNode + children: ReactNode; } -export function KnowledgeFilterProvider({ children }: KnowledgeFilterProviderProps) { - const [selectedFilter, setSelectedFilterState] = useState(null) - const [parsedFilterData, setParsedFilterData] = useState(null) - const [isPanelOpen, setIsPanelOpen] = useState(false) +export function KnowledgeFilterProvider({ + children, +}: KnowledgeFilterProviderProps) { + const [selectedFilter, setSelectedFilterState] = + useState(null); + const [parsedFilterData, setParsedFilterData] = + useState(null); + const [isPanelOpen, setIsPanelOpen] = useState(false); const setSelectedFilter = (filter: KnowledgeFilter | null) => { - setSelectedFilterState(filter) - + setSelectedFilterState(filter); + if (filter) { try { - const parsed = JSON.parse(filter.query_data) as ParsedQueryData - setParsedFilterData(parsed) - + const parsed = JSON.parse(filter.query_data) as ParsedQueryData; + setParsedFilterData(parsed); + // Auto-open panel when filter is selected - setIsPanelOpen(true) + 
setIsPanelOpen(true); } catch (error) { - console.error('Error parsing filter data:', error) - setParsedFilterData(null) + console.error("Error parsing filter data:", error); + setParsedFilterData(null); } } else { - setParsedFilterData(null) - setIsPanelOpen(false) + setParsedFilterData(null); + setIsPanelOpen(false); } - } + }; const clearFilter = () => { - setSelectedFilter(null) - } + setSelectedFilter(null); + }; const openPanel = () => { - setIsPanelOpen(true) - } + setIsPanelOpen(true); + }; const closePanel = () => { - setSelectedFilter(null) // This will also close the panel - } + setSelectedFilter(null); // This will also close the panel + }; const closePanelOnly = () => { - setIsPanelOpen(false) // Close panel but keep filter selected - } - + setIsPanelOpen(false); // Close panel but keep filter selected + }; const value: KnowledgeFilterContextType = { selectedFilter, @@ -100,11 +112,11 @@ export function KnowledgeFilterProvider({ children }: KnowledgeFilterProviderPro openPanel, closePanel, closePanelOnly, - } + }; return ( {children} - ) -} \ No newline at end of file + ); +} diff --git a/frontend/src/contexts/task-context.tsx b/frontend/src/contexts/task-context.tsx index c132f39b..f84b0f95 100644 --- a/frontend/src/contexts/task-context.tsx +++ b/frontend/src/contexts/task-context.tsx @@ -1,62 +1,88 @@ -"use client" +"use client"; -import React, { createContext, useContext, useState, useEffect, useCallback } from 'react' -import { toast } from 'sonner' -import { useAuth } from '@/contexts/auth-context' +import { useQueryClient } from "@tanstack/react-query"; +import type React from "react"; +import { + createContext, + useCallback, + useContext, + useEffect, + useState, +} from "react"; +import { toast } from "sonner"; +import { useAuth } from "@/contexts/auth-context"; export interface Task { - task_id: string - status: 'pending' | 'running' | 'processing' | 'completed' | 'failed' | 'error' - total_files?: number - processed_files?: number - successful_files?: number - failed_files?: number - created_at: string - updated_at: string - duration_seconds?: number - result?: Record - error?: string - files?: Record> + task_id: string; + status: + | "pending" + | "running" + | "processing" + | "completed" + | "failed" + | "error"; + total_files?: number; + processed_files?: number; + successful_files?: number; + failed_files?: number; + created_at: string; + updated_at: string; + duration_seconds?: number; + result?: Record; + error?: string; + files?: Record>; } interface TaskContextType { - tasks: Task[] - addTask: (taskId: string) => void - removeTask: (taskId: string) => void - refreshTasks: () => Promise - cancelTask: (taskId: string) => Promise - isPolling: boolean - isFetching: boolean - isMenuOpen: boolean - toggleMenu: () => void + tasks: Task[]; + addTask: (taskId: string) => void; + removeTask: (taskId: string) => void; + refreshTasks: () => Promise; + cancelTask: (taskId: string) => Promise; + isPolling: boolean; + isFetching: boolean; + isMenuOpen: boolean; + toggleMenu: () => void; } -const TaskContext = createContext(undefined) +const TaskContext = createContext(undefined); export function TaskProvider({ children }: { children: React.ReactNode }) { - const [tasks, setTasks] = useState([]) - const [isPolling, setIsPolling] = useState(false) - const [isFetching, setIsFetching] = useState(false) - const [isMenuOpen, setIsMenuOpen] = useState(false) - const { isAuthenticated, isNoAuthMode } = useAuth() + const [tasks, setTasks] = useState([]); + const [isPolling, 
setIsPolling] = useState(false); + const [isFetching, setIsFetching] = useState(false); + const [isMenuOpen, setIsMenuOpen] = useState(false); + const { isAuthenticated, isNoAuthMode } = useAuth(); + + const queryClient = useQueryClient(); + + const refetchSearch = () => { + queryClient.invalidateQueries({ queryKey: ["search"] }); + }; const fetchTasks = useCallback(async () => { - if (!isAuthenticated && !isNoAuthMode) return + if (!isAuthenticated && !isNoAuthMode) return; - setIsFetching(true) + setIsFetching(true); try { - const response = await fetch('/api/tasks') + const response = await fetch("/api/tasks"); if (response.ok) { - const data = await response.json() - const newTasks = data.tasks || [] - + const data = await response.json(); + const newTasks = data.tasks || []; + // Update tasks and check for status changes in the same state update - setTasks(prevTasks => { + setTasks((prevTasks) => { // Check for newly completed tasks to show toasts if (prevTasks.length > 0) { newTasks.forEach((newTask: Task) => { - const oldTask = prevTasks.find(t => t.task_id === newTask.task_id) - if (oldTask && oldTask.status !== 'completed' && newTask.status === 'completed') { + const oldTask = prevTasks.find( + (t) => t.task_id === newTask.task_id, + ); + if ( + oldTask && + oldTask.status !== "completed" && + newTask.status === "completed" + ) { // Task just completed - show success toast toast.success("Task completed successfully!", { description: `Task ${newTask.task_id} has finished processing.`, @@ -64,121 +90,136 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { label: "View", onClick: () => console.log("View task", newTask.task_id), }, - }) - } else if (oldTask && oldTask.status !== 'failed' && oldTask.status !== 'error' && (newTask.status === 'failed' || newTask.status === 'error')) { + }); + refetchSearch(); + } else if ( + oldTask && + oldTask.status !== "failed" && + oldTask.status !== "error" && + (newTask.status === "failed" || newTask.status === "error") + ) { // Task just failed - show error toast toast.error("Task failed", { - description: `Task ${newTask.task_id} failed: ${newTask.error || 'Unknown error'}`, - }) + description: `Task ${newTask.task_id} failed: ${ + newTask.error || "Unknown error" + }`, + }); } - }) + }); } - - return newTasks - }) + + return newTasks; + }); } } catch (error) { - console.error('Failed to fetch tasks:', error) + console.error("Failed to fetch tasks:", error); } finally { - setIsFetching(false) + setIsFetching(false); } - }, [isAuthenticated, isNoAuthMode]) // Removed 'tasks' from dependencies to prevent infinite loop! + }, [isAuthenticated, isNoAuthMode]); // Removed 'tasks' from dependencies to prevent infinite loop! const addTask = useCallback((taskId: string) => { // Immediately start aggressive polling for the new task - let pollAttempts = 0 - const maxPollAttempts = 30 // Poll for up to 30 seconds - + let pollAttempts = 0; + const maxPollAttempts = 30; // Poll for up to 30 seconds + const aggressivePoll = async () => { try { - const response = await fetch('/api/tasks') + const response = await fetch("/api/tasks"); if (response.ok) { - const data = await response.json() - const newTasks = data.tasks || [] - const foundTask = newTasks.find((task: Task) => task.task_id === taskId) - + const data = await response.json(); + const newTasks = data.tasks || []; + const foundTask = newTasks.find( + (task: Task) => task.task_id === taskId, + ); + if (foundTask) { // Task found! 
Update the tasks state - setTasks(prevTasks => { + setTasks((prevTasks) => { // Check if task is already in the list - const exists = prevTasks.some(t => t.task_id === taskId) + const exists = prevTasks.some((t) => t.task_id === taskId); if (!exists) { - return [...prevTasks, foundTask] + return [...prevTasks, foundTask]; } // Update existing task - return prevTasks.map(t => t.task_id === taskId ? foundTask : t) - }) - return // Stop polling, we found it + return prevTasks.map((t) => + t.task_id === taskId ? foundTask : t, + ); + }); + return; // Stop polling, we found it } } } catch (error) { - console.error('Aggressive polling failed:', error) + console.error("Aggressive polling failed:", error); } - - pollAttempts++ + + pollAttempts++; if (pollAttempts < maxPollAttempts) { // Continue polling every 1 second for new tasks - setTimeout(aggressivePoll, 1000) + setTimeout(aggressivePoll, 1000); } - } - + }; + // Start aggressive polling after a short delay to allow backend to process - setTimeout(aggressivePoll, 500) - }, []) + setTimeout(aggressivePoll, 500); + }, []); const refreshTasks = useCallback(async () => { - await fetchTasks() - }, [fetchTasks]) + await fetchTasks(); + }, [fetchTasks]); const removeTask = useCallback((taskId: string) => { - setTasks(prev => prev.filter(task => task.task_id !== taskId)) - }, []) + setTasks((prev) => prev.filter((task) => task.task_id !== taskId)); + }, []); - const cancelTask = useCallback(async (taskId: string) => { - try { - const response = await fetch(`/api/tasks/${taskId}/cancel`, { - method: 'POST', - }) - - if (response.ok) { - // Immediately refresh tasks to show the updated status - await fetchTasks() - toast.success("Task cancelled", { - description: `Task ${taskId.substring(0, 8)}... has been cancelled` - }) - } else { - const errorData = await response.json().catch(() => ({})) - throw new Error(errorData.error || 'Failed to cancel task') + const cancelTask = useCallback( + async (taskId: string) => { + try { + const response = await fetch(`/api/tasks/${taskId}/cancel`, { + method: "POST", + }); + + if (response.ok) { + // Immediately refresh tasks to show the updated status + await fetchTasks(); + toast.success("Task cancelled", { + description: `Task ${taskId.substring(0, 8)}... has been cancelled`, + }); + } else { + const errorData = await response.json().catch(() => ({})); + throw new Error(errorData.error || "Failed to cancel task"); + } + } catch (error) { + console.error("Failed to cancel task:", error); + toast.error("Failed to cancel task", { + description: error instanceof Error ? error.message : "Unknown error", + }); } - } catch (error) { - console.error('Failed to cancel task:', error) - toast.error("Failed to cancel task", { - description: error instanceof Error ? 
error.message : 'Unknown error'
-      })
-    }
-  }, [fetchTasks])
+      },
+      [fetchTasks],
+    );
 
   const toggleMenu = useCallback(() => {
-    setIsMenuOpen(prev => !prev)
-  }, [])
+    setIsMenuOpen((prev) => !prev);
+  }, []);
 
   // Periodic polling for task updates
   useEffect(() => {
-    if (!isAuthenticated && !isNoAuthMode) return
+    if (!isAuthenticated && !isNoAuthMode) return;
+
+    setIsPolling(true);
 
-    setIsPolling(true)
-
     // Initial fetch
-    fetchTasks()
-
+    fetchTasks();
+
     // Set up polling interval - every 3 seconds (more responsive for active tasks)
-    const interval = setInterval(fetchTasks, 3000)
-
+    const interval = setInterval(fetchTasks, 3000);
+
     return () => {
-      clearInterval(interval)
-      setIsPolling(false)
-    }
-  }, [isAuthenticated, isNoAuthMode, fetchTasks])
+      clearInterval(interval);
+      setIsPolling(false);
+    };
+  }, [isAuthenticated, isNoAuthMode, fetchTasks]);
 
   const value: TaskContextType = {
     tasks,
@@ -190,19 +231,15 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
     isFetching,
     isMenuOpen,
     toggleMenu,
-  }
+  };
 
-  return (
-    <TaskContext.Provider value={value}>
-      {children}
-    </TaskContext.Provider>
-  )
+  return <TaskContext.Provider value={value}>{children}</TaskContext.Provider>;
 }
 
 export function useTask() {
-  const context = useContext(TaskContext)
+  const context = useContext(TaskContext);
   if (context === undefined) {
-    throw new Error('useTask must be used within a TaskProvider')
+    throw new Error("useTask must be used within a TaskProvider");
   }
-  return context
-}
\ No newline at end of file
+  return context;
+}
diff --git a/src/agent.py b/src/agent.py
index 749eb299..5eba9762 100644
--- a/src/agent.py
+++ b/src/agent.py
@@ -8,6 +8,7 @@ from services.conversation_persistence_service import conversation_persistence
 # In-memory storage for active conversation threads (preserves function calls)
 active_conversations = {}
 
+
 def get_user_conversations(user_id: str):
     """Get conversation metadata for a user from persistent storage"""
     return conversation_persistence.get_user_conversations(user_id)
@@ -23,7 +24,9 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
 
     # If we have a previous_response_id, try to get the existing conversation
     if previous_response_id and previous_response_id in active_conversations[user_id]:
-        logger.debug(f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}")
+        logger.debug(
+            f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}"
+        )
         return active_conversations[user_id][previous_response_id]
 
     # Create new conversation thread
@@ -48,7 +51,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
     if user_id not in active_conversations:
         active_conversations[user_id] = {}
     active_conversations[user_id][response_id] = conversation_state
-    
+
     # 2. Store only essential metadata to disk (simplified JSON)
     messages = conversation_state.get("messages", [])
     first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None)
@@ -56,7 +59,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
 
     if first_user_msg:
         content = first_user_msg.get("content", "")
         title = content[:50] + "..." if len(content) > 50 else content
-        
+
         metadata_only = {
             "response_id": response_id,
             "title": title,
@@ -64,11 +67,15 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
             "created_at": conversation_state.get("created_at"),
             "last_activity": conversation_state.get("last_activity"),
             "previous_response_id": conversation_state.get("previous_response_id"),
-            "total_messages": len([msg for msg in messages if msg.get("role") in ["user", "assistant"]]),
+            "total_messages": len(
+                [msg for msg in messages if msg.get("role") in ["user", "assistant"]]
+            ),
             # Don't store actual messages - Langflow has them
         }
-        
-        conversation_persistence.store_conversation_thread(user_id, response_id, metadata_only)
+
+        conversation_persistence.store_conversation_thread(
+            user_id, response_id, metadata_only
+        )
 
 
 # Legacy function for backward compatibility
@@ -76,10 +83,12 @@ def get_user_conversation(user_id: str):
     """Get the most recent conversation for a user (for backward compatibility)"""
     # Check in-memory conversations first (with function calls)
     if user_id in active_conversations and active_conversations[user_id]:
-        latest_response_id = max(active_conversations[user_id].keys(),
-                                key=lambda k: active_conversations[user_id][k]["last_activity"])
+        latest_response_id = max(
+            active_conversations[user_id].keys(),
+            key=lambda k: active_conversations[user_id][k]["last_activity"],
+        )
         return active_conversations[user_id][latest_response_id]
-    
+
     # Fallback to metadata-only conversations
     conversations = get_user_conversations(user_id)
     if not conversations:
@@ -342,7 +351,9 @@ async def async_chat(
                 "content": response_text,
                 "response_id": response_id,
                 "timestamp": datetime.now(),
-                "response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj),  # Store complete response for function calls
+                "response_data": response_obj.model_dump()
+                if hasattr(response_obj, "model_dump")
+                else str(response_obj),  # Store complete response for function calls
             }
             conversation_state["messages"].append(assistant_message)
             logger.debug(
@@ -428,7 +439,7 @@ async def async_chat_stream(
             conversation_state["last_activity"] = datetime.now()
             store_conversation_thread(user_id, response_id, conversation_state)
             logger.debug(
-                "Stored conversation thread", user_id=user_id, response_id=response_id
+                f"Stored conversation thread for user {user_id} with response_id: {response_id}"
             )
 
 
@@ -443,11 +454,6 @@ async def async_langflow_chat(
     ...
     store_conversation: bool = True,
 ):
-    logger.debug(
-        "async_langflow_chat called",
-        user_id=user_id,
-        previous_response_id=previous_response_id,
-    )
@@ -496,7 +504,9 @@ async def async_langflow_chat(
             "content": response_text,
             "response_id": response_id,
             "timestamp": datetime.now(),
-            "response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj),  # Store complete response for function calls
+            "response_data": response_obj.model_dump()
+            if hasattr(response_obj, "model_dump")
+            else str(response_obj),  # Store complete response for function calls
         }
         conversation_state["messages"].append(assistant_message)
         logger.debug(
@@ -511,17 +521,18 @@ async def async_langflow_chat(
     if response_id:
         conversation_state["last_activity"] = datetime.now()
         store_conversation_thread(user_id, response_id, conversation_state)
-        
+
         # Claim session ownership for this user
         try:
             from services.session_ownership_service import session_ownership_service
session {response_id} for user {user_id}") - except Exception as e: - print(f"[WARNING] Failed to claim session ownership: {e}") - print( - f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" + session_ownership_service.claim_session(user_id, response_id) + logger.debug(f"Claimed session {response_id} for user {user_id}") + except Exception as e: + logger.warning(f"Failed to claim session ownership: {e}") + + logger.debug( + f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) logger.debug( "Stored langflow conversation thread", @@ -570,7 +581,7 @@ async def async_langflow_chat_stream( full_response = "" response_id = None collected_chunks = [] # Store all chunks for function call data - + async for chunk in async_stream( langflow_client, prompt, @@ -585,7 +596,7 @@ async def async_langflow_chat_stream( chunk_data = json.loads(chunk.decode("utf-8")) collected_chunks.append(chunk_data) # Collect all chunk data - + if "delta" in chunk_data and "content" in chunk_data["delta"]: full_response += chunk_data["delta"]["content"] # Extract response_id from chunk @@ -612,20 +623,19 @@ async def async_langflow_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - + # Claim session ownership for this user try: from services.session_ownership_service import session_ownership_service + session_ownership_service.claim_session(user_id, response_id) - print(f"[DEBUG] Claimed session {response_id} for user {user_id}") + logger.debug(f"Claimed session {response_id} for user {user_id}") except Exception as e: - print(f"[WARNING] Failed to claim session ownership: {e}") - - print( - f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" + logger.warning(f"Failed to claim session ownership: {e}") + + logger.debug( + f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) logger.debug( - "Stored langflow conversation thread", - user_id=user_id, - response_id=response_id, + f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) diff --git a/src/api/settings.py b/src/api/settings.py index b5ce0b90..0f49f85e 100644 --- a/src/api/settings.py +++ b/src/api/settings.py @@ -1,4 +1,5 @@ from starlette.responses import JSONResponse +from utils.logging_config import get_logger from config.settings import ( LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, @@ -7,6 +8,9 @@ from config.settings import ( clients, ) +logger = get_logger(__name__) + + async def get_settings(request, session_manager): """Get application settings""" @@ -91,7 +95,7 @@ async def get_settings(request, session_manager): settings["ingestion_defaults"] = ingestion_defaults except Exception as e: - print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}") + logger.warning(f"Failed to fetch ingestion flow defaults: {e}") # Continue without ingestion defaults return JSONResponse(settings) diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py index 71eb24e0..0aa4234a 100644 --- a/src/connectors/google_drive/connector.py +++ b/src/connectors/google_drive/connector.py @@ -64,7 +64,6 @@ class GoogleDriveConnector(BaseConnector): Integration points: - `BaseConnector` is your project’s base class; minimum methods used here: * self.emit(doc: ConnectorDocument) -> None (or adapt to your ingestion pipeline) - * 
     - Adjust paths, logging, and error handling to match your project style.
     """
 
@@ -81,8 +80,6 @@ class GoogleDriveConnector(BaseConnector):
     _FILE_ID_ALIASES = ("file_ids", "selected_file_ids", "selected_files")
     _FOLDER_ID_ALIASES = ("folder_ids", "selected_folder_ids", "selected_folders")
 
-    def log(self, message: str) -> None:
-        print(message)
 
     def emit(self, doc: ConnectorDocument) -> None:
         """
@@ -91,7 +88,7 @@ class GoogleDriveConnector(BaseConnector):
         """
         # If BaseConnector has an emit method, call super().emit(doc)
         # Otherwise, implement your custom logic here.
-        print(f"Emitting document: {doc.id} ({doc.filename})")
+        logger.debug(f"Emitting document: {doc.id} ({doc.filename})")
 
     def __init__(self, config: Dict[str, Any]) -> None:
         # Read from config OR env (backend env, not NEXT_PUBLIC_*):
@@ -433,7 +430,7 @@ class GoogleDriveConnector(BaseConnector):
 
         # If still not authenticated, bail (caller should kick off OAuth init)
         if not await self.oauth.is_authenticated():
-            self.log("authenticate: no valid credentials; run OAuth init/callback first.")
+            logger.debug("authenticate: no valid credentials; run OAuth init/callback first.")
             return False
 
         # Build Drive service from OAuth helper
@@ -482,7 +479,7 @@ class GoogleDriveConnector(BaseConnector):
         except Exception as e:
             # Optionally log error with your base class logger
             try:
-                self.log(f"GoogleDriveConnector.list_files failed: {e}")
+                logger.error(f"GoogleDriveConnector.list_files failed: {e}")
             except Exception:
                 pass
             return {"files": [], "next_page_token": None}
@@ -500,7 +497,7 @@ class GoogleDriveConnector(BaseConnector):
         except Exception as e:
             # Use your base class logger if available
             try:
-                self.log(f"Download failed for {file_id}: {e}")
+                logger.error(f"Download failed for {file_id}: {e}")
             except Exception:
                 pass
             raise
@@ -567,7 +564,7 @@ class GoogleDriveConnector(BaseConnector):
         except Exception as e:
             # Optional: use your base logger
             try:
-                self.log(f"Failed to get start page token: {e}")
+                logger.error(f"Failed to get start page token: {e}")
             except Exception:
                 pass
             raise
@@ -634,7 +631,7 @@ class GoogleDriveConnector(BaseConnector):
         ok = await self.authenticate()
         if not ok:
             try:
-                self.log("cleanup_subscription: not authenticated")
+                logger.error("cleanup_subscription: not authenticated")
             except Exception:
                 pass
             return False
@@ -662,7 +659,7 @@ class GoogleDriveConnector(BaseConnector):
 
         if not resource_id:
             try:
-                self.log(
+                logger.error(
                     f"cleanup_subscription: missing resource_id for channel {subscription_id}. "
                     f"Persist (channel_id, resource_id) when creating the subscription."
                 )
             except Exception:
                 pass
@@ -684,7 +681,7 @@ class GoogleDriveConnector(BaseConnector):
 
         except Exception as e:
             try:
-                self.log(f"cleanup_subscription failed for {subscription_id}: {e}")
+                logger.error(f"cleanup_subscription failed for {subscription_id}: {e}")
             except Exception:
                 pass
             return False
@@ -708,7 +705,7 @@ class GoogleDriveConnector(BaseConnector):
         ok = await self.authenticate()
         if not ok:
             try:
-                self.log("handle_webhook: not authenticated")
+                logger.error("handle_webhook: not authenticated")
             except Exception:
                 pass
             return affected
@@ -728,7 +725,7 @@ class GoogleDriveConnector(BaseConnector):
         except Exception as e:
             selected_ids = set()
             try:
-                self.log(f"handle_webhook: scope build failed, proceeding unfiltered: {e}")
+                logger.error(f"handle_webhook: scope build failed, proceeding unfiltered: {e}")
             except Exception:
                 pass
 
@@ -797,7 +794,7 @@ class GoogleDriveConnector(BaseConnector):
 
         except Exception as e:
             try:
-                self.log(f"handle_webhook failed: {e}")
+                logger.error(f"handle_webhook failed: {e}")
             except Exception:
                 pass
             return []
@@ -814,7 +811,7 @@ class GoogleDriveConnector(BaseConnector):
                 blob = self._download_file_bytes(meta)
             except HttpError as e:
                 # Skip/record failures
-                self.log(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}")
+                logger.error(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}")
                 continue
 
             from datetime import datetime
diff --git a/src/services/chat_service.py b/src/services/chat_service.py
index 83e8fc4f..51da4b31 100644
--- a/src/services/chat_service.py
+++ b/src/services/chat_service.py
@@ -391,7 +391,7 @@ class ChatService:
                 local_metadata[response_id] = conversation_metadata
 
             # 2. Get actual conversations from Langflow database (source of truth for messages)
-            print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
+            logger.debug(f"Attempting to fetch Langflow history for user: {user_id}")
             langflow_history = (
                 await langflow_history_service.get_user_conversation_history(
                     user_id, flow_id=LANGFLOW_CHAT_FLOW_ID
@@ -462,24 +462,24 @@ class ChatService:
             )
 
             if langflow_history.get("conversations"):
-                print(
-                    f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow"
+                logger.debug(
+                    f"Added {len(langflow_history['conversations'])} historical conversations from Langflow"
                 )
             elif langflow_history.get("error"):
-                print(
-                    f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}"
+                logger.debug(
+                    f"Could not fetch Langflow history for user {user_id}: {langflow_history['error']}"
                 )
             else:
-                print(f"[DEBUG] No Langflow conversations found for user {user_id}")
+                logger.debug(f"No Langflow conversations found for user {user_id}")
 
         except Exception as e:
-            print(f"[ERROR] Failed to fetch Langflow history: {e}")
+            logger.error(f"Failed to fetch Langflow history: {e}")
             # Continue with just in-memory conversations
 
         # Sort by last activity (most recent first)
         all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
 
-        print(
-            f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)"
+        logger.debug(
+            f"Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)"
         )
diff --git a/src/services/conversation_persistence_service.py b/src/services/conversation_persistence_service.py
index 1b37eb4e..fa5717c1 100644
--- a/src/services/conversation_persistence_service.py
+++ b/src/services/conversation_persistence_service.py
@@ -8,7 +8,9 @@ import os
 from typing import Dict, Any
 from datetime import datetime
 import threading
+from utils.logging_config import get_logger
 
+logger = get_logger(__name__)
 
 class ConversationPersistenceService:
     """Simple service to persist conversations to disk"""
@@ -24,10 +26,10 @@ class ConversationPersistenceService:
         try:
             with open(self.storage_file, 'r', encoding='utf-8') as f:
                 data = json.load(f)
-                print(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}")
+                logger.debug(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}")
                 return data
         except Exception as e:
-            print(f"Error loading conversations from {self.storage_file}: {e}")
+            logger.error(f"Error loading conversations from {self.storage_file}: {e}")
             return {}
         return {}
 
@@ -37,9 +39,9 @@ class ConversationPersistenceService:
             with self.lock:
                 with open(self.storage_file, 'w', encoding='utf-8') as f:
                     json.dump(self._conversations, f, indent=2, ensure_ascii=False, default=str)
-            print(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}")
+            logger.debug(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}")
         except Exception as e:
-            print(f"Error saving conversations to {self.storage_file}: {e}")
+            logger.error(f"Error saving conversations to {self.storage_file}: {e}")
 
     def _count_total_conversations(self, data: Dict[str, Any]) -> int:
         """Count total conversations across all users"""
@@ -89,14 +91,14 @@ class ConversationPersistenceService:
         if user_id in self._conversations and response_id in self._conversations[user_id]:
             del self._conversations[user_id][response_id]
             self._save_conversations()
-            print(f"Deleted conversation {response_id} for user {user_id}")
+            logger.debug(f"Deleted conversation {response_id} for user {user_id}")
 
     def clear_user_conversations(self, user_id: str):
         """Clear all conversations for a user"""
         if user_id in self._conversations:
             del self._conversations[user_id]
             self._save_conversations()
-            print(f"Cleared all conversations for user {user_id}")
+            logger.debug(f"Cleared all conversations for user {user_id}")
 
     def get_storage_stats(self) -> Dict[str, Any]:
         """Get statistics about stored conversations"""
diff --git a/src/services/langflow_history_service.py b/src/services/langflow_history_service.py
index 613c2113..ee3366c1 100644
--- a/src/services/langflow_history_service.py
+++ b/src/services/langflow_history_service.py
@@ -6,7 +6,9 @@ Simplified service that retrieves message history from Langflow using shared cli
 
 from typing import List, Dict, Optional, Any
 from config.settings import clients
+from utils.logging_config import get_logger
 
+logger = get_logger(__name__)
 
 class LangflowHistoryService:
     """Simplified service to retrieve message history from Langflow"""
@@ -29,14 +31,14 @@ class LangflowHistoryService:
 
             if response.status_code == 200:
                 session_ids = response.json()
-                print(f"Found {len(session_ids)} total sessions from Langflow")
+                logger.debug(f"Found {len(session_ids)} total sessions from Langflow")
                 return session_ids
             else:
-                print(f"Failed to get sessions: {response.status_code} - {response.text}")
+                logger.error(f"Failed to get sessions: {response.status_code} - {response.text}")
                 return []
 
         except Exception as e:
-            print(f"Error getting user sessions: {e}")
+            logger.error(f"Error getting user sessions: {e}")
             return []
 
     async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]:
@@ -56,11 +58,11 @@ class LangflowHistoryService:
                 # Convert to OpenRAG format
                 return self._convert_langflow_messages(messages)
             else:
-                print(f"Failed to get messages for session {session_id}: {response.status_code}")
{session_id}: {response.status_code}") + logger.error(f"Failed to get messages for session {session_id}: {response.status_code}") return [] except Exception as e: - print(f"Error getting session messages: {e}") + logger.error(f"Error getting session messages: {e}") return [] def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: @@ -114,7 +116,7 @@ class LangflowHistoryService: converted_messages.append(converted_msg) except Exception as e: - print(f"Error converting message: {e}") + logger.error(f"Error converting message: {e}") continue return converted_messages @@ -159,7 +161,7 @@ class LangflowHistoryService: } except Exception as e: - print(f"Error getting user conversation history: {e}") + logger.error(f"Error getting user conversation history: {e}") return { "error": str(e), "conversations": [] diff --git a/src/services/session_ownership_service.py b/src/services/session_ownership_service.py index 9e3677fd..220a6d96 100644 --- a/src/services/session_ownership_service.py +++ b/src/services/session_ownership_service.py @@ -7,7 +7,9 @@ import json import os from typing import Dict, List, Optional from datetime import datetime +from utils.logging_config import get_logger +logger = get_logger(__name__) class SessionOwnershipService: """Simple service to track which user owns which session""" @@ -23,7 +25,7 @@ class SessionOwnershipService: with open(self.ownership_file, 'r') as f: return json.load(f) except Exception as e: - print(f"Error loading session ownership data: {e}") + logger.error(f"Error loading session ownership data: {e}") return {} return {} @@ -32,9 +34,9 @@ class SessionOwnershipService: try: with open(self.ownership_file, 'w') as f: json.dump(self.ownership_data, f, indent=2) - print(f"Saved session ownership data to {self.ownership_file}") + logger.debug(f"Saved session ownership data to {self.ownership_file}") except Exception as e: - print(f"Error saving session ownership data: {e}") + logger.error(f"Error saving session ownership data: {e}") def claim_session(self, user_id: str, session_id: str): """Claim a session for a user""" @@ -45,7 +47,7 @@ class SessionOwnershipService: "last_accessed": datetime.now().isoformat() } self._save_ownership_data() - print(f"Claimed session {session_id} for user {user_id}") + logger.debug(f"Claimed session {session_id} for user {user_id}") else: # Update last accessed time self.ownership_data[session_id]["last_accessed"] = datetime.now().isoformat()