Merge branch 'main' into integration-tests

This commit is contained in:
Sebastián Estévez 2025-09-11 16:17:43 -04:00 committed by GitHub
commit a608f4e65c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 955 additions and 804 deletions

View file

@ -21,7 +21,7 @@ jobs:
tag: phact/openrag-backend tag: phact/openrag-backend
platform: linux/arm64 platform: linux/arm64
arch: arm64 arch: arm64
runs-on: self-hosted runs-on: [self-hosted, linux, ARM64, langflow-ai-arm64-2]
# frontend # frontend
- image: frontend - image: frontend
@ -35,7 +35,7 @@ jobs:
tag: phact/openrag-frontend tag: phact/openrag-frontend
platform: linux/arm64 platform: linux/arm64
arch: arm64 arch: arm64
runs-on: self-hosted runs-on: [self-hosted, linux, ARM64, langflow-ai-arm64-2]
# opensearch # opensearch
- image: opensearch - image: opensearch
@ -49,7 +49,7 @@ jobs:
tag: phact/openrag-opensearch tag: phact/openrag-opensearch
platform: linux/arm64 platform: linux/arm64
arch: arm64 arch: arm64
runs-on: self-hosted runs-on: [self-hosted, linux, ARM64, langflow-ai-arm64-2]
runs-on: ${{ matrix.runs-on }} runs-on: ${{ matrix.runs-on }}

View file

@ -1,82 +1,128 @@
"use client" "use client";
import { useState, useEffect, useRef } from "react" import { useQueryClient } from "@tanstack/react-query";
import { ChevronDown, Upload, FolderOpen, Cloud, PlugZap, Plus } from "lucide-react" import {
import { Button } from "@/components/ui/button" ChevronDown,
import { Dialog, DialogContent, DialogDescription, DialogHeader, DialogTitle } from "@/components/ui/dialog" Cloud,
import { Input } from "@/components/ui/input" FolderOpen,
import { Label } from "@/components/ui/label" PlugZap,
import { cn } from "@/lib/utils" Plus,
import { useTask } from "@/contexts/task-context" Upload,
import { useRouter } from "next/navigation" } from "lucide-react";
import { useRouter } from "next/navigation";
import { useEffect, useRef, useState } from "react";
import { Button } from "@/components/ui/button";
import {
Dialog,
DialogContent,
DialogDescription,
DialogHeader,
DialogTitle,
} from "@/components/ui/dialog";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
import { useTask } from "@/contexts/task-context";
import { cn } from "@/lib/utils";
interface KnowledgeDropdownProps { interface KnowledgeDropdownProps {
active?: boolean active?: boolean;
variant?: 'navigation' | 'button' variant?: "navigation" | "button";
} }
export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeDropdownProps) { export function KnowledgeDropdown({
const { addTask } = useTask() active,
const router = useRouter() variant = "navigation",
const [isOpen, setIsOpen] = useState(false) }: KnowledgeDropdownProps) {
const [showFolderDialog, setShowFolderDialog] = useState(false) const { addTask } = useTask();
const [showS3Dialog, setShowS3Dialog] = useState(false) const router = useRouter();
const [awsEnabled, setAwsEnabled] = useState(false) const [isOpen, setIsOpen] = useState(false);
const [folderPath, setFolderPath] = useState("/app/documents/") const [showFolderDialog, setShowFolderDialog] = useState(false);
const [bucketUrl, setBucketUrl] = useState("s3://") const [showS3Dialog, setShowS3Dialog] = useState(false);
const [folderLoading, setFolderLoading] = useState(false) const [awsEnabled, setAwsEnabled] = useState(false);
const [s3Loading, setS3Loading] = useState(false) const [folderPath, setFolderPath] = useState("/app/documents/");
const [fileUploading, setFileUploading] = useState(false) const [bucketUrl, setBucketUrl] = useState("s3://");
const [cloudConnectors, setCloudConnectors] = useState<{[key: string]: {name: string, available: boolean, connected: boolean, hasToken: boolean}}>({}) const [folderLoading, setFolderLoading] = useState(false);
const fileInputRef = useRef<HTMLInputElement>(null) const [s3Loading, setS3Loading] = useState(false);
const dropdownRef = useRef<HTMLDivElement>(null) const [fileUploading, setFileUploading] = useState(false);
const [cloudConnectors, setCloudConnectors] = useState<{
[key: string]: {
name: string;
available: boolean;
connected: boolean;
hasToken: boolean;
};
}>({});
const fileInputRef = useRef<HTMLInputElement>(null);
const dropdownRef = useRef<HTMLDivElement>(null);
const queryClient = useQueryClient();
const refetchSearch = () => {
queryClient.invalidateQueries({ queryKey: ["search"] });
};
// Check AWS availability and cloud connectors on mount // Check AWS availability and cloud connectors on mount
useEffect(() => { useEffect(() => {
const checkAvailability = async () => { const checkAvailability = async () => {
try { try {
// Check AWS // Check AWS
const awsRes = await fetch("/api/upload_options") const awsRes = await fetch("/api/upload_options");
if (awsRes.ok) { if (awsRes.ok) {
const awsData = await awsRes.json() const awsData = await awsRes.json();
setAwsEnabled(Boolean(awsData.aws)) setAwsEnabled(Boolean(awsData.aws));
} }
// Check cloud connectors // Check cloud connectors
const connectorsRes = await fetch('/api/connectors') const connectorsRes = await fetch("/api/connectors");
if (connectorsRes.ok) { if (connectorsRes.ok) {
const connectorsResult = await connectorsRes.json() const connectorsResult = await connectorsRes.json();
const cloudConnectorTypes = ['google_drive', 'onedrive', 'sharepoint'] const cloudConnectorTypes = [
const connectorInfo: {[key: string]: {name: string, available: boolean, connected: boolean, hasToken: boolean}} = {} "google_drive",
"onedrive",
"sharepoint",
];
const connectorInfo: {
[key: string]: {
name: string;
available: boolean;
connected: boolean;
hasToken: boolean;
};
} = {};
for (const type of cloudConnectorTypes) { for (const type of cloudConnectorTypes) {
if (connectorsResult.connectors[type]) { if (connectorsResult.connectors[type]) {
connectorInfo[type] = { connectorInfo[type] = {
name: connectorsResult.connectors[type].name, name: connectorsResult.connectors[type].name,
available: connectorsResult.connectors[type].available, available: connectorsResult.connectors[type].available,
connected: false, connected: false,
hasToken: false hasToken: false,
} };
// Check connection status // Check connection status
try { try {
const statusRes = await fetch(`/api/connectors/${type}/status`) const statusRes = await fetch(`/api/connectors/${type}/status`);
if (statusRes.ok) { if (statusRes.ok) {
const statusData = await statusRes.json() const statusData = await statusRes.json();
const connections = statusData.connections || [] const connections = statusData.connections || [];
const activeConnection = connections.find((conn: {is_active: boolean, connection_id: string}) => conn.is_active) const activeConnection = connections.find(
const isConnected = activeConnection !== undefined (conn: { is_active: boolean; connection_id: string }) =>
conn.is_active,
);
const isConnected = activeConnection !== undefined;
if (isConnected && activeConnection) { if (isConnected && activeConnection) {
connectorInfo[type].connected = true connectorInfo[type].connected = true;
// Check token availability // Check token availability
try { try {
const tokenRes = await fetch(`/api/connectors/${type}/token?connection_id=${activeConnection.connection_id}`) const tokenRes = await fetch(
`/api/connectors/${type}/token?connection_id=${activeConnection.connection_id}`,
);
if (tokenRes.ok) { if (tokenRes.ok) {
const tokenData = await tokenRes.json() const tokenData = await tokenRes.json();
if (tokenData.access_token) { if (tokenData.access_token) {
connectorInfo[type].hasToken = true connectorInfo[type].hasToken = true;
} }
} }
} catch { } catch {
@ -90,114 +136,134 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
} }
} }
setCloudConnectors(connectorInfo) setCloudConnectors(connectorInfo);
} }
} catch (err) { } catch (err) {
console.error("Failed to check availability", err) console.error("Failed to check availability", err);
} }
} };
checkAvailability() checkAvailability();
}, []) }, []);
// Handle click outside to close dropdown // Handle click outside to close dropdown
useEffect(() => { useEffect(() => {
const handleClickOutside = (event: MouseEvent) => { const handleClickOutside = (event: MouseEvent) => {
if (dropdownRef.current && !dropdownRef.current.contains(event.target as Node)) { if (
setIsOpen(false) dropdownRef.current &&
!dropdownRef.current.contains(event.target as Node)
) {
setIsOpen(false);
} }
} };
if (isOpen) { if (isOpen) {
document.addEventListener("mousedown", handleClickOutside) document.addEventListener("mousedown", handleClickOutside);
return () => document.removeEventListener("mousedown", handleClickOutside) return () =>
document.removeEventListener("mousedown", handleClickOutside);
} }
}, [isOpen]) }, [isOpen]);
const handleFileUpload = () => { const handleFileUpload = () => {
fileInputRef.current?.click() fileInputRef.current?.click();
} };
const handleFileChange = async (e: React.ChangeEvent<HTMLInputElement>) => { const handleFileChange = async (e: React.ChangeEvent<HTMLInputElement>) => {
const files = e.target.files const files = e.target.files;
if (files && files.length > 0) { if (files && files.length > 0) {
// Close dropdown and disable button immediately after file selection // Close dropdown and disable button immediately after file selection
setIsOpen(false) setIsOpen(false);
setFileUploading(true) setFileUploading(true);
// Trigger the same file upload event as the chat page // Trigger the same file upload event as the chat page
window.dispatchEvent(new CustomEvent('fileUploadStart', { window.dispatchEvent(
detail: { filename: files[0].name } new CustomEvent("fileUploadStart", {
})) detail: { filename: files[0].name },
}),
);
try { try {
const formData = new FormData() const formData = new FormData();
formData.append('file', files[0]) formData.append("file", files[0]);
// Use router upload and ingest endpoint (automatically routes based on configuration)
const uploadIngestRes = await fetch('/api/upload', { const uploadIngestRes = await fetch('/api/upload', {
method: 'POST',
body: formData, body: formData,
}) });
const uploadIngestJson = await uploadIngestRes.json() const uploadIngestJson = await uploadIngestRes.json();
if (!uploadIngestRes.ok) { if (!uploadIngestRes.ok) {
throw new Error(uploadIngestJson?.error || 'Upload and ingest failed') throw new Error(
uploadIngestJson?.error || "Upload and ingest failed",
);
} }
// Extract results from the unified response // Extract results from the unified response
const fileId = uploadIngestJson?.upload?.id const fileId = uploadIngestJson?.upload?.id;
const filePath = uploadIngestJson?.upload?.path const filePath = uploadIngestJson?.upload?.path;
const runJson = uploadIngestJson?.ingestion const runJson = uploadIngestJson?.ingestion;
const deleteResult = uploadIngestJson?.deletion const deleteResult = uploadIngestJson?.deletion;
if (!fileId || !filePath) { if (!fileId || !filePath) {
throw new Error('Upload successful but no file id/path returned') throw new Error("Upload successful but no file id/path returned");
} }
// Log deletion status if provided // Log deletion status if provided
if (deleteResult) { if (deleteResult) {
if (deleteResult.status === 'deleted') { if (deleteResult.status === "deleted") {
console.log('File successfully cleaned up from Langflow:', deleteResult.file_id) console.log(
} else if (deleteResult.status === 'delete_failed') { "File successfully cleaned up from Langflow:",
console.warn('Failed to cleanup file from Langflow:', deleteResult.error) deleteResult.file_id,
);
} else if (deleteResult.status === "delete_failed") {
console.warn(
"Failed to cleanup file from Langflow:",
deleteResult.error,
);
} }
} }
// Notify UI // Notify UI
window.dispatchEvent(new CustomEvent('fileUploaded', { window.dispatchEvent(
detail: { new CustomEvent("fileUploaded", {
file: files[0], detail: {
result: { file: files[0],
file_id: fileId, result: {
file_path: filePath, file_id: fileId,
run: runJson, file_path: filePath,
deletion: deleteResult, run: runJson,
unified: true deletion: deleteResult,
} unified: true,
} },
})) },
}),
);
// Trigger search refresh after successful ingestion // Trigger search refresh after successful ingestion
window.dispatchEvent(new CustomEvent('knowledgeUpdated')) window.dispatchEvent(new CustomEvent("knowledgeUpdated"));
} catch (error) { } catch (error) {
window.dispatchEvent(new CustomEvent('fileUploadError', { window.dispatchEvent(
detail: { filename: files[0].name, error: error instanceof Error ? error.message : 'Upload failed' } new CustomEvent("fileUploadError", {
})) detail: {
filename: files[0].name,
error: error instanceof Error ? error.message : "Upload failed",
},
}),
);
} finally { } finally {
window.dispatchEvent(new CustomEvent('fileUploadComplete')) window.dispatchEvent(new CustomEvent("fileUploadComplete"));
setFileUploading(false) setFileUploading(false);
refetchSearch();
} }
} }
// Reset file input // Reset file input
if (fileInputRef.current) { if (fileInputRef.current) {
fileInputRef.current.value = '' fileInputRef.current.value = "";
} }
} };
const handleFolderUpload = async () => { const handleFolderUpload = async () => {
if (!folderPath.trim()) return if (!folderPath.trim()) return;
setFolderLoading(true) setFolderLoading(true);
setShowFolderDialog(false) setShowFolderDialog(false);
try { try {
const response = await fetch("/api/upload_path", { const response = await fetch("/api/upload_path", {
@ -206,40 +272,40 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
body: JSON.stringify({ path: folderPath }), body: JSON.stringify({ path: folderPath }),
}) });
const result = await response.json();
const result = await response.json()
if (response.status === 201) { if (response.status === 201) {
const taskId = result.task_id || result.id const taskId = result.task_id || result.id;
if (!taskId) { if (!taskId) {
throw new Error("No task ID received from server") throw new Error("No task ID received from server");
} }
addTask(taskId) addTask(taskId);
setFolderPath("") setFolderPath("");
// Trigger search refresh after successful folder processing starts // Trigger search refresh after successful folder processing starts
window.dispatchEvent(new CustomEvent('knowledgeUpdated')) window.dispatchEvent(new CustomEvent("knowledgeUpdated"));
} else if (response.ok) { } else if (response.ok) {
setFolderPath("") setFolderPath("");
window.dispatchEvent(new CustomEvent('knowledgeUpdated')) window.dispatchEvent(new CustomEvent("knowledgeUpdated"));
} else { } else {
console.error("Folder upload failed:", result.error) console.error("Folder upload failed:", result.error);
} }
} catch (error) { } catch (error) {
console.error("Folder upload error:", error) console.error("Folder upload error:", error);
} finally { } finally {
setFolderLoading(false) setFolderLoading(false);
refetchSearch();
} }
} };
const handleS3Upload = async () => { const handleS3Upload = async () => {
if (!bucketUrl.trim()) return if (!bucketUrl.trim()) return;
setS3Loading(true) setS3Loading(true);
setShowS3Dialog(false) setShowS3Dialog(false);
try { try {
const response = await fetch("/api/upload_bucket", { const response = await fetch("/api/upload_bucket", {
@ -248,30 +314,31 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
body: JSON.stringify({ s3_url: bucketUrl }), body: JSON.stringify({ s3_url: bucketUrl }),
}) });
const result = await response.json() const result = await response.json();
if (response.status === 201) { if (response.status === 201) {
const taskId = result.task_id || result.id const taskId = result.task_id || result.id;
if (!taskId) { if (!taskId) {
throw new Error("No task ID received from server") throw new Error("No task ID received from server");
} }
addTask(taskId) addTask(taskId);
setBucketUrl("s3://") setBucketUrl("s3://");
// Trigger search refresh after successful S3 processing starts // Trigger search refresh after successful S3 processing starts
window.dispatchEvent(new CustomEvent('knowledgeUpdated')) window.dispatchEvent(new CustomEvent("knowledgeUpdated"));
} else { } else {
console.error("S3 upload failed:", result.error) console.error("S3 upload failed:", result.error);
} }
} catch (error) { } catch (error) {
console.error("S3 upload error:", error) console.error("S3 upload error:", error);
} finally { } finally {
setS3Loading(false) setS3Loading(false);
refetchSearch();
} }
} };
const cloudConnectorItems = Object.entries(cloudConnectors) const cloudConnectorItems = Object.entries(cloudConnectors)
.filter(([, info]) => info.available) .filter(([, info]) => info.available)
@ -279,72 +346,99 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
label: info.name, label: info.name,
icon: PlugZap, icon: PlugZap,
onClick: () => { onClick: () => {
setIsOpen(false) setIsOpen(false);
if (info.connected && info.hasToken) { if (info.connected && info.hasToken) {
router.push(`/upload/${type}`) router.push(`/upload/${type}`);
} else { } else {
router.push('/settings') router.push("/settings");
} }
}, },
disabled: !info.connected || !info.hasToken, disabled: !info.connected || !info.hasToken,
tooltip: !info.connected ? `Connect ${info.name} in Settings first` : tooltip: !info.connected
!info.hasToken ? `Reconnect ${info.name} - access token required` : ? `Connect ${info.name} in Settings first`
undefined : !info.hasToken
})) ? `Reconnect ${info.name} - access token required`
: undefined,
}));
const menuItems = [ const menuItems = [
{ {
label: "Add File", label: "Add File",
icon: Upload, icon: Upload,
onClick: handleFileUpload onClick: handleFileUpload,
}, },
{ {
label: "Process Folder", label: "Process Folder",
icon: FolderOpen, icon: FolderOpen,
onClick: () => { onClick: () => {
setIsOpen(false) setIsOpen(false);
setShowFolderDialog(true) setShowFolderDialog(true);
} },
}, },
...(awsEnabled ? [{ ...(awsEnabled
label: "Process S3 Bucket", ? [
icon: Cloud, {
onClick: () => { label: "Process S3 Bucket",
setIsOpen(false) icon: Cloud,
setShowS3Dialog(true) onClick: () => {
} setIsOpen(false);
}] : []), setShowS3Dialog(true);
...cloudConnectorItems },
] },
]
: []),
...cloudConnectorItems,
];
return ( return (
<> <>
<div ref={dropdownRef} className="relative"> <div ref={dropdownRef} className="relative">
<button <button
onClick={() => !(fileUploading || folderLoading || s3Loading) && setIsOpen(!isOpen)} onClick={() =>
!(fileUploading || folderLoading || s3Loading) && setIsOpen(!isOpen)
}
disabled={fileUploading || folderLoading || s3Loading} disabled={fileUploading || folderLoading || s3Loading}
className={cn( className={cn(
variant === 'button' variant === "button"
? "rounded-lg h-12 px-4 flex items-center gap-2 bg-primary text-primary-foreground hover:bg-primary/90 transition-colors disabled:opacity-50 disabled:cursor-not-allowed" ? "rounded-lg h-12 px-4 flex items-center gap-2 bg-primary text-primary-foreground hover:bg-primary/90 transition-colors disabled:opacity-50 disabled:cursor-not-allowed"
: "text-sm group flex p-3 w-full justify-start font-medium cursor-pointer hover:bg-accent hover:text-accent-foreground rounded-lg transition-all disabled:opacity-50 disabled:cursor-not-allowed", : "text-sm group flex p-3 w-full justify-start font-medium cursor-pointer hover:bg-accent hover:text-accent-foreground rounded-lg transition-all disabled:opacity-50 disabled:cursor-not-allowed",
variant === 'navigation' && active variant === "navigation" && active
? "bg-accent text-accent-foreground shadow-sm" ? "bg-accent text-accent-foreground shadow-sm"
: variant === 'navigation' ? "text-foreground hover:text-accent-foreground" : "", : variant === "navigation"
? "text-foreground hover:text-accent-foreground"
: "",
)} )}
> >
{variant === 'button' ? ( {variant === "button" ? (
<> <>
<Plus className="h-4 w-4" /> <Plus className="h-4 w-4" />
<span>Add Knowledge</span> <span>Add Knowledge</span>
<ChevronDown className={cn("h-4 w-4 transition-transform", isOpen && "rotate-180")} /> <ChevronDown
className={cn(
"h-4 w-4 transition-transform",
isOpen && "rotate-180",
)}
/>
</> </>
) : ( ) : (
<> <>
<div className="flex items-center flex-1"> <div className="flex items-center flex-1">
<Upload className={cn("h-4 w-4 mr-3 shrink-0", active ? "text-accent-foreground" : "text-muted-foreground group-hover:text-foreground")} /> <Upload
className={cn(
"h-4 w-4 mr-3 shrink-0",
active
? "text-accent-foreground"
: "text-muted-foreground group-hover:text-foreground",
)}
/>
Knowledge Knowledge
</div> </div>
<ChevronDown className={cn("h-4 w-4 transition-transform", isOpen && "rotate-180")} /> <ChevronDown
className={cn(
"h-4 w-4 transition-transform",
isOpen && "rotate-180",
)}
/>
</> </>
)} )}
</button> </button>
@ -356,11 +450,13 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
<button <button
key={index} key={index}
onClick={item.onClick} onClick={item.onClick}
disabled={'disabled' in item ? item.disabled : false} disabled={"disabled" in item ? item.disabled : false}
title={'tooltip' in item ? item.tooltip : undefined} title={"tooltip" in item ? item.tooltip : undefined}
className={cn( className={cn(
"w-full px-3 py-2 text-left text-sm hover:bg-accent hover:text-accent-foreground", "w-full px-3 py-2 text-left text-sm hover:bg-accent hover:text-accent-foreground",
'disabled' in item && item.disabled && "opacity-50 cursor-not-allowed hover:bg-transparent hover:text-current" "disabled" in item &&
item.disabled &&
"opacity-50 cursor-not-allowed hover:bg-transparent hover:text-current",
)} )}
> >
{item.label} {item.label}
@ -429,7 +525,8 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
Process S3 Bucket Process S3 Bucket
</DialogTitle> </DialogTitle>
<DialogDescription> <DialogDescription>
Process all documents from an S3 bucket. AWS credentials must be configured. Process all documents from an S3 bucket. AWS credentials must be
configured.
</DialogDescription> </DialogDescription>
</DialogHeader> </DialogHeader>
<div className="space-y-4"> <div className="space-y-4">
@ -444,10 +541,7 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
/> />
</div> </div>
<div className="flex justify-end gap-2"> <div className="flex justify-end gap-2">
<Button <Button variant="outline" onClick={() => setShowS3Dialog(false)}>
variant="outline"
onClick={() => setShowS3Dialog(false)}
>
Cancel Cancel
</Button> </Button>
<Button <Button
@ -460,7 +554,6 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
</div> </div>
</DialogContent> </DialogContent>
</Dialog> </Dialog>
</> </>
) );
} }

View file

@ -0,0 +1,177 @@
import {
type UseQueryOptions,
useQuery,
useQueryClient,
} from "@tanstack/react-query";
import type { ParsedQueryData } from "@/contexts/knowledge-filter-context";
/**
 * Request body for POST /api/search.
 *
 * `filters`, when present, restricts results; a key that is omitted means
 * "match anything" for that dimension (see useGetSearchQuery, which drops
 * wildcard "*" selections before sending).
 */
export interface SearchPayload {
  query: string;
  // Maximum number of chunks the backend should return.
  limit: number;
  // Minimum relevance score a chunk must have to be included.
  scoreThreshold: number;
  filters?: {
    data_sources?: string[];
    document_types?: string[];
    owners?: string[];
    connector_types?: string[];
  };
}
/**
 * A single chunk hit returned by the /api/search backend.
 * Several chunks may belong to the same source document (same `filename`);
 * useGetSearchQuery groups them into `File` entries.
 */
export interface ChunkResult {
  filename: string;
  mimetype: string;
  page: number;
  // The chunk's extracted text content.
  text: string;
  // Relevance score assigned by the search backend for this chunk.
  score: number;
  source_url?: string;
  owner?: string;
  owner_name?: string;
  owner_email?: string;
  file_size?: number;
  // e.g. "google_drive", "onedrive", "sharepoint", "s3"; absent for local files.
  connector_type?: string;
}
/**
 * Per-document aggregation of chunk results, produced by grouping
 * `ChunkResult`s on `filename` in useGetSearchQuery.
 *
 * NOTE(review): this shadows the DOM global `File` — importers must use
 * `import { type File } from ...` deliberately; consider renaming (e.g.
 * `SearchFile`) in a follow-up.
 */
export interface File {
  filename: string;
  mimetype: string;
  // Number of chunks that matched within this document.
  chunkCount: number;
  // Mean of the matching chunks' scores.
  avgScore: number;
  // Empty string when the backend provided no value.
  source_url: string;
  owner: string;
  owner_name: string;
  owner_email: string;
  // Bytes; 0 when unknown.
  size: number;
  // Defaults to "local" when the backend provided no value.
  connector_type: string;
  chunks: ChunkResult[];
}
/**
 * React Query hook that runs a knowledge search against POST /api/search and
 * groups the returned chunks into per-file results.
 *
 * @param query - Free-text search string; an empty string is sent as a
 *   wildcard ("*") listing with an expanded result limit.
 * @param queryData - Optional parsed filter-panel state (limit, score
 *   threshold, and data-source/type/owner/connector filters).
 * @param options - Extra react-query options; `queryKey` and `queryFn` are
 *   owned by this hook and cannot be overridden.
 * @returns The react-query result whose `data` is a `File[]`. On any request
 *   or parse failure the error is logged and an empty array is returned —
 *   components never see a thrown error.
 */
export const useGetSearchQuery = (
  query: string,
  queryData?: ParsedQueryData | null,
  options?: Omit<UseQueryOptions, "queryKey" | "queryFn">,
) => {
  const queryClient = useQueryClient();

  // Translate the filter-panel state into the request's `filters` object.
  // "*" acts as a wildcard, so only arrays that contain no "*" are sent;
  // an all-wildcard selection yields `undefined` (filters omitted entirely).
  const buildFilters = (): SearchPayload["filters"] | undefined => {
    if (!queryData?.filters) return undefined;
    const filters = queryData.filters;

    const processedFilters: SearchPayload["filters"] = {};
    if (!filters.data_sources.includes("*")) {
      processedFilters.data_sources = filters.data_sources;
    }
    if (!filters.document_types.includes("*")) {
      processedFilters.document_types = filters.document_types;
    }
    if (!filters.owners.includes("*")) {
      processedFilters.owners = filters.owners;
    }
    if (filters.connector_types && !filters.connector_types.includes("*")) {
      processedFilters.connector_types = filters.connector_types;
    }

    return Object.keys(processedFilters).length > 0
      ? processedFilters
      : undefined;
  };

  /**
   * Fetch chunks and aggregate them into one entry per filename.
   * Never throws: failures are logged and an empty array is returned.
   */
  async function getFiles(): Promise<File[]> {
    try {
      const searchPayload: SearchPayload = {
        query: query || queryData?.query || "*",
        // Empty/wildcard searches request the maximum allowed limit so the
        // full knowledge base is listed; specific queries default to 10.
        limit: queryData?.limit || (query.trim() === "" ? 10000 : 10),
        scoreThreshold: queryData?.scoreThreshold || 0,
      };

      const filters = buildFilters();
      if (filters) {
        searchPayload.filters = filters;
      }

      const response = await fetch(`/api/search`, {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify(searchPayload),
      });
      // BUG FIX: previously the body was parsed and `data.results` was
      // dereferenced without checking the HTTP status, so backend errors
      // surfaced as incidental TypeErrors. Fail explicitly instead.
      if (!response.ok) {
        throw new Error(`Search request failed with status ${response.status}`);
      }
      const data = await response.json();

      // Group chunks by filename so the UI can render one row per document.
      const fileMap = new Map<
        string,
        {
          filename: string;
          mimetype: string;
          chunks: ChunkResult[];
          totalScore: number;
          source_url?: string;
          owner?: string;
          owner_name?: string;
          owner_email?: string;
          file_size?: number;
          connector_type?: string;
        }
      >();

      for (const chunk of (data.results ?? []) as ChunkResult[]) {
        const existing = fileMap.get(chunk.filename);
        if (existing) {
          existing.chunks.push(chunk);
          existing.totalScore += chunk.score;
        } else {
          fileMap.set(chunk.filename, {
            filename: chunk.filename,
            mimetype: chunk.mimetype,
            chunks: [chunk],
            totalScore: chunk.score,
            source_url: chunk.source_url,
            owner: chunk.owner,
            owner_name: chunk.owner_name,
            owner_email: chunk.owner_email,
            file_size: chunk.file_size,
            connector_type: chunk.connector_type,
          });
        }
      }

      return Array.from(fileMap.values()).map((file) => ({
        filename: file.filename,
        mimetype: file.mimetype,
        chunkCount: file.chunks.length,
        avgScore: file.totalScore / file.chunks.length,
        source_url: file.source_url || "",
        owner: file.owner || "",
        owner_name: file.owner_name || "",
        owner_email: file.owner_email || "",
        size: file.file_size || 0,
        connector_type: file.connector_type || "local",
        chunks: file.chunks,
      }));
    } catch (error) {
      console.error("Error getting files", error);
      return [];
    }
  }

  return useQuery(
    {
      // BUG FIX: queryData (filters/limit/threshold) shapes the request
      // payload, so it must be part of the cache key — the old key
      // ["search", query] served stale cached results whenever only the
      // filters changed. react-query hashes the object structurally.
      queryKey: ["search", query, queryData],
      placeholderData: (prev) => prev,
      queryFn: getFiles,
      ...options,
    },
    queryClient,
  );
};

View file

@ -1,325 +1,109 @@
"use client" "use client";
import { useState, useEffect, useCallback, useRef } from "react" import {
Building2,
import { Button } from "@/components/ui/button" Cloud,
import { Input } from "@/components/ui/input" FileText,
import { Search, Loader2, FileText, HardDrive, Building2, Cloud } from "lucide-react" HardDrive,
import { TbBrandOnedrive } from "react-icons/tb" Loader2,
import { SiGoogledrive } from "react-icons/si" Search,
import { ProtectedRoute } from "@/components/protected-route" } from "lucide-react";
import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context" import { type FormEvent, useCallback, useEffect, useState } from "react";
import { useTask } from "@/contexts/task-context" import { SiGoogledrive } from "react-icons/si";
import { KnowledgeDropdown } from "@/components/knowledge-dropdown" import { TbBrandOnedrive } from "react-icons/tb";
import { KnowledgeDropdown } from "@/components/knowledge-dropdown";
import { ProtectedRoute } from "@/components/protected-route";
interface ChunkResult { import { Button } from "@/components/ui/button";
filename: string import { Input } from "@/components/ui/input";
mimetype: string import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context";
page: number import { useTask } from "@/contexts/task-context";
text: string import { type File, useGetSearchQuery } from "../api/queries/useGetSearchQuery";
score: number
source_url?: string
owner?: string
owner_name?: string
owner_email?: string
file_size?: number
connector_type?: string
}
interface FileResult {
filename: string
mimetype: string
chunkCount: number
avgScore: number
source_url?: string
owner?: string
owner_name?: string
owner_email?: string
lastModified?: string
size?: number
connector_type?: string
}
interface SearchResponse {
results: ChunkResult[]
files?: FileResult[]
error?: string
total?: number
aggregations?: {
data_sources?: { buckets?: Array<{ key: string | number; doc_count: number }> }
document_types?: { buckets?: Array<{ key: string | number; doc_count: number }> }
owners?: { buckets?: Array<{ key: string | number; doc_count: number }> }
connector_types?: { buckets?: Array<{ key: string | number; doc_count: number }> }
}
}
// Function to get the appropriate icon for a connector type // Function to get the appropriate icon for a connector type
function getSourceIcon(connectorType?: string) { function getSourceIcon(connectorType?: string) {
switch (connectorType) { switch (connectorType) {
case 'google_drive': case "google_drive":
return <SiGoogledrive className="h-4 w-4 text-foreground" /> return <SiGoogledrive className="h-4 w-4 text-foreground" />;
case 'onedrive': case "onedrive":
return <TbBrandOnedrive className="h-4 w-4 text-foreground" /> return <TbBrandOnedrive className="h-4 w-4 text-foreground" />;
case 'sharepoint': case "sharepoint":
return <Building2 className="h-4 w-4 text-foreground" /> return <Building2 className="h-4 w-4 text-foreground" />;
case 's3': case "s3":
return <Cloud className="h-4 w-4 text-foreground" /> return <Cloud className="h-4 w-4 text-foreground" />;
case 'local':
default: default:
return <HardDrive className="h-4 w-4 text-muted-foreground" /> return <HardDrive className="h-4 w-4 text-muted-foreground" />;
} }
} }
function SearchPage() { function SearchPage() {
const { isMenuOpen } = useTask() const { isMenuOpen } = useTask();
const { parsedFilterData, isPanelOpen } = useKnowledgeFilter() const { parsedFilterData, isPanelOpen } = useKnowledgeFilter();
const [query, setQuery] = useState("*") const [query, setQuery] = useState("");
const [loading, setLoading] = useState(false) const [queryInputText, setQueryInputText] = useState("");
const [chunkResults, setChunkResults] = useState<ChunkResult[]>([]) const [selectedFile, setSelectedFile] = useState<string | null>(null);
const [fileResults, setFileResults] = useState<FileResult[]>([])
const [selectedFile, setSelectedFile] = useState<string | null>(null)
const [searchPerformed, setSearchPerformed] = useState(false)
const prevFilterDataRef = useRef<string>("")
const handleSearch = useCallback(async (e?: React.FormEvent) => { const {
if (e) e.preventDefault() data = [],
if (!query.trim()) return isFetching,
refetch: refetchSearch,
setLoading(true) } = useGetSearchQuery(query, parsedFilterData);
setSearchPerformed(false)
try {
// Build search payload with global filter data
interface SearchPayload {
query: string;
limit: number;
scoreThreshold: number;
filters?: {
data_sources?: string[];
document_types?: string[];
owners?: string[];
connector_types?: string[];
};
}
const searchPayload: SearchPayload = {
query,
limit: parsedFilterData?.limit || (query.trim() === "*" ? 10000 : 10), // Maximum allowed limit for wildcard searches
scoreThreshold: parsedFilterData?.scoreThreshold || 0
}
// Debug logging for wildcard searches
if (query.trim() === "*") {
console.log("Wildcard search - parsedFilterData:", parsedFilterData)
}
// Add filters from global context if available and not wildcards
if (parsedFilterData?.filters) {
const filters = parsedFilterData.filters
// Only include filters if they're not wildcards (not "*")
const hasSpecificFilters =
!filters.data_sources.includes("*") ||
!filters.document_types.includes("*") ||
!filters.owners.includes("*") ||
(filters.connector_types && !filters.connector_types.includes("*"))
if (hasSpecificFilters) {
const processedFilters: SearchPayload['filters'] = {}
// Only add filter arrays that don't contain wildcards
if (!filters.data_sources.includes("*")) {
processedFilters.data_sources = filters.data_sources
}
if (!filters.document_types.includes("*")) {
processedFilters.document_types = filters.document_types
}
if (!filters.owners.includes("*")) {
processedFilters.owners = filters.owners
}
if (filters.connector_types && !filters.connector_types.includes("*")) {
processedFilters.connector_types = filters.connector_types
}
// Only add filters object if it has any actual filters
if (Object.keys(processedFilters).length > 0) {
searchPayload.filters = processedFilters
}
}
// If all filters are wildcards, omit the filters object entirely
}
const response = await fetch("/api/search", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(searchPayload),
})
const result: SearchResponse = await response.json()
if (response.ok) {
const chunks = result.results || []
// Debug logging for wildcard searches
if (query.trim() === "*") {
console.log("Wildcard search results:", {
chunks: chunks.length,
totalFromBackend: result.total,
searchPayload,
firstChunk: chunks[0]
})
}
setChunkResults(chunks)
// Group chunks by filename to create file results
const fileMap = new Map<string, {
filename: string
mimetype: string
chunks: ChunkResult[]
totalScore: number
source_url?: string
owner?: string
owner_name?: string
owner_email?: string
file_size?: number
connector_type?: string
}>()
chunks.forEach(chunk => {
const existing = fileMap.get(chunk.filename)
if (existing) {
existing.chunks.push(chunk)
existing.totalScore += chunk.score
} else {
fileMap.set(chunk.filename, {
filename: chunk.filename,
mimetype: chunk.mimetype,
chunks: [chunk],
totalScore: chunk.score,
source_url: chunk.source_url,
owner: chunk.owner,
owner_name: chunk.owner_name,
owner_email: chunk.owner_email,
file_size: chunk.file_size,
connector_type: chunk.connector_type
})
}
})
const files: FileResult[] = Array.from(fileMap.values()).map(file => ({
filename: file.filename,
mimetype: file.mimetype,
chunkCount: file.chunks.length,
avgScore: file.totalScore / file.chunks.length,
source_url: file.source_url,
owner: file.owner,
owner_name: file.owner_name,
owner_email: file.owner_email,
size: file.file_size,
connector_type: file.connector_type
}))
setFileResults(files)
setSearchPerformed(true)
} else {
console.error("Search failed:", result.error)
setChunkResults([])
setFileResults([])
setSearchPerformed(true)
}
} catch (error) {
console.error("Search error:", error)
setChunkResults([])
setFileResults([])
setSearchPerformed(true)
} finally {
setLoading(false)
}
}, [query, parsedFilterData])
// Update query when global filter changes // Update query when global filter changes
useEffect(() => { useEffect(() => {
if (parsedFilterData?.query) { if (parsedFilterData?.query) {
setQuery(parsedFilterData.query) setQueryInputText(parsedFilterData.query);
} }
}, [parsedFilterData]) }, [parsedFilterData]);
// Auto-refresh search when filter changes (but only if search was already performed)
useEffect(() => {
if (!parsedFilterData) return
// Create a stable string representation of the filter data for comparison
const currentFilterString = JSON.stringify({
filters: parsedFilterData.filters,
limit: parsedFilterData.limit,
scoreThreshold: parsedFilterData.scoreThreshold
})
// Only trigger search if filter data actually changed and we've done a search before
if (prevFilterDataRef.current !== "" &&
prevFilterDataRef.current !== currentFilterString &&
searchPerformed &&
query.trim()) {
console.log("Filter changed, auto-refreshing search")
handleSearch()
}
// Update the ref with current filter data
prevFilterDataRef.current = currentFilterString
}, [parsedFilterData, searchPerformed, query, handleSearch])
// Auto-search on mount with "*"
useEffect(() => {
// Only trigger initial search on mount when query is "*"
handleSearch()
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []) // Only run once on mount - ignore handleSearch dependency
// Listen for knowledge updates and refresh search
useEffect(() => {
const handleKnowledgeUpdate = () => {
console.log("Knowledge updated, refreshing search")
handleSearch()
}
window.addEventListener('knowledgeUpdated', handleKnowledgeUpdate)
return () => window.removeEventListener('knowledgeUpdated', handleKnowledgeUpdate)
}, [handleSearch])
const handleSearch = useCallback(
(e?: FormEvent<HTMLFormElement>) => {
if (e) e.preventDefault();
if (query.trim() === queryInputText.trim()) {
refetchSearch();
return;
}
setQuery(queryInputText);
},
[queryInputText, refetchSearch, query],
);
const fileResults = data as File[];
return ( return (
<div className={`fixed inset-0 md:left-72 top-[53px] flex flex-col transition-all duration-300 ${ <div
isMenuOpen && isPanelOpen ? 'md:right-[704px]' : // Both open: 384px (menu) + 320px (KF panel) className={`fixed inset-0 md:left-72 top-[53px] flex flex-col transition-all duration-300 ${
isMenuOpen ? 'md:right-96' : // Only menu open: 384px isMenuOpen && isPanelOpen
isPanelOpen ? 'md:right-80' : // Only KF panel open: 320px ? "md:right-[704px]"
'md:right-6' // Neither open: 24px : // Both open: 384px (menu) + 320px (KF panel)
}`}> isMenuOpen
? "md:right-96"
: // Only menu open: 384px
isPanelOpen
? "md:right-80"
: // Only KF panel open: 320px
"md:right-6" // Neither open: 24px
}`}
>
<div className="flex-1 flex flex-col min-h-0 px-6 py-6"> <div className="flex-1 flex flex-col min-h-0 px-6 py-6">
{/* Search Input Area */} {/* Search Input Area */}
<div className="flex-shrink-0 mb-6"> <div className="flex-shrink-0 mb-6">
<form onSubmit={handleSearch} className="flex gap-3"> <form onSubmit={handleSearch} className="flex gap-3">
<Input <Input
name="search-query"
id="search-query" id="search-query"
type="text" type="text"
defaultValue={parsedFilterData?.query}
value={queryInputText}
onChange={(e) => setQueryInputText(e.target.value)}
placeholder="Search your documents..." placeholder="Search your documents..."
value={query}
onChange={(e) => setQuery(e.target.value)}
className="flex-1 bg-muted/20 rounded-lg border border-border/50 px-4 py-3 h-12 focus-visible:ring-1 focus-visible:ring-ring" className="flex-1 bg-muted/20 rounded-lg border border-border/50 px-4 py-3 h-12 focus-visible:ring-1 focus-visible:ring-ring"
/> />
<Button <Button
type="submit" type="submit"
disabled={!query.trim() || loading}
variant="secondary" variant="secondary"
className="rounded-lg h-12 w-12 p-0 flex-shrink-0" className="rounded-lg h-12 w-12 p-0 flex-shrink-0"
> >
{loading ? ( {isFetching ? (
<Loader2 className="h-4 w-4 animate-spin" /> <Loader2 className="h-4 w-4 animate-spin" />
) : ( ) : (
<Search className="h-4 w-4" /> <Search className="h-4 w-4" />
@ -334,48 +118,59 @@ function SearchPage() {
{/* Results Area */} {/* Results Area */}
<div className="flex-1 overflow-y-auto"> <div className="flex-1 overflow-y-auto">
<div className="space-y-4"> <div className="space-y-4">
{fileResults.length === 0 && chunkResults.length === 0 && !loading ? ( {fileResults.length === 0 && !isFetching ? (
<div className="text-center py-12"> <div className="text-center py-12">
<Search className="h-12 w-12 mx-auto mb-4 text-muted-foreground/50" /> <Search className="h-12 w-12 mx-auto mb-4 text-muted-foreground/50" />
<p className="text-lg text-muted-foreground">No documents found</p> <p className="text-lg text-muted-foreground">
No documents found
</p>
<p className="text-sm text-muted-foreground/70 mt-2"> <p className="text-sm text-muted-foreground/70 mt-2">
Try adjusting your search terms Try adjusting your search terms
</p> </p>
</div> </div>
) : ( ) : (
<> <div className="space-y-4">
{/* Results Count */} {/* Results Count */}
<div className="mb-4"> <div className="mb-4">
<div className="text-sm text-muted-foreground"> <div className="text-sm text-muted-foreground">
{fileResults.length} file{fileResults.length !== 1 ? 's' : ''} found {fileResults.length} file
{fileResults.length !== 1 ? "s" : ""} found
</div> </div>
</div> </div>
{/* Results Display */} {/* Results Display */}
<div className="space-y-4"> <div
className={isFetching ? "opacity-50 pointer-events-none" : ""}
>
{selectedFile ? ( {selectedFile ? (
// Show chunks for selected file // Show chunks for selected file
<> <>
<div className="flex items-center gap-2 mb-4"> <div className="flex items-center gap-2 mb-4">
<Button <Button
variant="ghost" variant="ghost"
size="sm" size="sm"
onClick={() => setSelectedFile(null)} onClick={() => setSelectedFile(null)}
>
Back to files
</Button>
<span className="text-sm text-muted-foreground">
Chunks from {selectedFile}
</span>
</div>
{fileResults
.filter((file) => file.filename === selectedFile)
.flatMap((file) => file.chunks)
.map((chunk, index) => (
<div
key={chunk.filename + index}
className="bg-muted/20 rounded-lg p-4 border border-border/50"
> >
Back to files
</Button>
<span className="text-sm text-muted-foreground">
Chunks from {selectedFile}
</span>
</div>
{chunkResults
.filter(chunk => chunk.filename === selectedFile)
.map((chunk, index) => (
<div key={index} className="bg-muted/20 rounded-lg p-4 border border-border/50">
<div className="flex items-center justify-between mb-2"> <div className="flex items-center justify-between mb-2">
<div className="flex items-center gap-2"> <div className="flex items-center gap-2">
<FileText className="h-4 w-4 text-blue-400" /> <FileText className="h-4 w-4 text-blue-400" />
<span className="font-medium truncate">{chunk.filename}</span> <span className="font-medium truncate">
{chunk.filename}
</span>
</div> </div>
<span className="text-xs text-green-400 bg-green-400/20 px-2 py-1 rounded"> <span className="text-xs text-green-400 bg-green-400/20 px-2 py-1 rounded">
{chunk.score.toFixed(2)} {chunk.score.toFixed(2)}
@ -389,67 +184,87 @@ function SearchPage() {
</p> </p>
</div> </div>
))} ))}
</> </>
) : ( ) : (
// Show files table // Show files table
<div className="bg-muted/20 rounded-lg border border-border/50 overflow-hidden"> <div className="bg-muted/20 rounded-lg border border-border/50 overflow-hidden">
<table className="w-full"> <table className="w-full">
<thead> <thead>
<tr className="border-b border-border/50 bg-muted/10"> <tr className="border-b border-border/50 bg-muted/10">
<th className="text-left p-3 text-sm font-medium text-muted-foreground">Source</th> <th className="text-left p-3 text-sm font-medium text-muted-foreground">
<th className="text-left p-3 text-sm font-medium text-muted-foreground">Type</th> Source
<th className="text-left p-3 text-sm font-medium text-muted-foreground">Size</th> </th>
<th className="text-left p-3 text-sm font-medium text-muted-foreground">Matching chunks</th> <th className="text-left p-3 text-sm font-medium text-muted-foreground">
<th className="text-left p-3 text-sm font-medium text-muted-foreground">Average score</th> Type
<th className="text-left p-3 text-sm font-medium text-muted-foreground">Owner</th> </th>
</tr> <th className="text-left p-3 text-sm font-medium text-muted-foreground">
</thead> Size
<tbody> </th>
{fileResults.map((file, index) => ( <th className="text-left p-3 text-sm font-medium text-muted-foreground">
<tr Matching chunks
key={index} </th>
className="border-b border-border/30 hover:bg-muted/20 cursor-pointer transition-colors" <th className="text-left p-3 text-sm font-medium text-muted-foreground">
onClick={() => setSelectedFile(file.filename)} Average score
> </th>
<td className="p-3"> <th className="text-left p-3 text-sm font-medium text-muted-foreground">
<div className="flex items-center gap-2"> Owner
{getSourceIcon(file.connector_type)} </th>
<span className="font-medium truncate" title={file.filename}> </tr>
{file.filename} </thead>
</span> <tbody>
</div> {fileResults.map((file) => (
</td> <tr
<td className="p-3 text-sm text-muted-foreground"> key={file.filename}
{file.mimetype} className="border-b border-border/30 hover:bg-muted/20 cursor-pointer transition-colors"
</td> onClick={() => setSelectedFile(file.filename)}
<td className="p-3 text-sm text-muted-foreground"> >
{file.size ? `${Math.round(file.size / 1024)} KB` : '—'} <td className="p-3">
</td> <div className="flex items-center gap-2">
<td className="p-3 text-sm text-muted-foreground"> {getSourceIcon(file.connector_type)}
{file.chunkCount} <span
</td> className="font-medium truncate"
<td className="p-3"> title={file.filename}
<span className="text-xs text-green-400 bg-green-400/20 px-2 py-1 rounded"> >
{file.avgScore.toFixed(2)} {file.filename}
</span> </span>
</td> </div>
<td className="p-3 text-sm text-muted-foreground" title={file.owner_email}> </td>
{file.owner_name || file.owner || '—'} <td className="p-3 text-sm text-muted-foreground">
</td> {file.mimetype}
</tr> </td>
))} <td className="p-3 text-sm text-muted-foreground">
</tbody> {file.size
</table> ? `${Math.round(file.size / 1024)} KB`
</div> : "—"}
)} </td>
<td className="p-3 text-sm text-muted-foreground">
{file.chunkCount}
</td>
<td className="p-3">
<span className="text-xs text-green-400 bg-green-400/20 px-2 py-1 rounded">
{file.avgScore.toFixed(2)}
</span>
</td>
<td
className="p-3 text-sm text-muted-foreground"
title={file.owner_email}
>
{file.owner_name || file.owner || "—"}
</td>
</tr>
))}
</tbody>
</table>
</div>
)}
</div> </div>
</> </div>
)} )}
</div> </div>
</div> </div>
</div> </div>
</div> </div>
) );
} }
export default function ProtectedSearchPage() { export default function ProtectedSearchPage() {
@ -457,5 +272,5 @@ export default function ProtectedSearchPage() {
<ProtectedRoute> <ProtectedRoute>
<SearchPage /> <SearchPage />
</ProtectedRoute> </ProtectedRoute>
) );
} }

View file

@ -1,95 +1,107 @@
"use client" "use client";
import React, { createContext, useContext, useState, ReactNode } from 'react' import React, {
createContext,
type ReactNode,
useContext,
useState,
} from "react";
interface KnowledgeFilter { interface KnowledgeFilter {
id: string id: string;
name: string name: string;
description: string description: string;
query_data: string query_data: string;
owner: string owner: string;
created_at: string created_at: string;
updated_at: string updated_at: string;
} }
interface ParsedQueryData { export interface ParsedQueryData {
query: string query: string;
filters: { filters: {
data_sources: string[] data_sources: string[];
document_types: string[] document_types: string[];
owners: string[] owners: string[];
connector_types: string[] connector_types: string[];
} };
limit: number limit: number;
scoreThreshold: number scoreThreshold: number;
} }
interface KnowledgeFilterContextType { interface KnowledgeFilterContextType {
selectedFilter: KnowledgeFilter | null selectedFilter: KnowledgeFilter | null;
parsedFilterData: ParsedQueryData | null parsedFilterData: ParsedQueryData | null;
setSelectedFilter: (filter: KnowledgeFilter | null) => void setSelectedFilter: (filter: KnowledgeFilter | null) => void;
clearFilter: () => void clearFilter: () => void;
isPanelOpen: boolean isPanelOpen: boolean;
openPanel: () => void openPanel: () => void;
closePanel: () => void closePanel: () => void;
closePanelOnly: () => void closePanelOnly: () => void;
} }
const KnowledgeFilterContext = createContext<KnowledgeFilterContextType | undefined>(undefined) const KnowledgeFilterContext = createContext<
KnowledgeFilterContextType | undefined
>(undefined);
export function useKnowledgeFilter() { export function useKnowledgeFilter() {
const context = useContext(KnowledgeFilterContext) const context = useContext(KnowledgeFilterContext);
if (context === undefined) { if (context === undefined) {
throw new Error('useKnowledgeFilter must be used within a KnowledgeFilterProvider') throw new Error(
"useKnowledgeFilter must be used within a KnowledgeFilterProvider",
);
} }
return context return context;
} }
interface KnowledgeFilterProviderProps { interface KnowledgeFilterProviderProps {
children: ReactNode children: ReactNode;
} }
export function KnowledgeFilterProvider({ children }: KnowledgeFilterProviderProps) { export function KnowledgeFilterProvider({
const [selectedFilter, setSelectedFilterState] = useState<KnowledgeFilter | null>(null) children,
const [parsedFilterData, setParsedFilterData] = useState<ParsedQueryData | null>(null) }: KnowledgeFilterProviderProps) {
const [isPanelOpen, setIsPanelOpen] = useState(false) const [selectedFilter, setSelectedFilterState] =
useState<KnowledgeFilter | null>(null);
const [parsedFilterData, setParsedFilterData] =
useState<ParsedQueryData | null>(null);
const [isPanelOpen, setIsPanelOpen] = useState(false);
const setSelectedFilter = (filter: KnowledgeFilter | null) => { const setSelectedFilter = (filter: KnowledgeFilter | null) => {
setSelectedFilterState(filter) setSelectedFilterState(filter);
if (filter) { if (filter) {
try { try {
const parsed = JSON.parse(filter.query_data) as ParsedQueryData const parsed = JSON.parse(filter.query_data) as ParsedQueryData;
setParsedFilterData(parsed) setParsedFilterData(parsed);
// Auto-open panel when filter is selected // Auto-open panel when filter is selected
setIsPanelOpen(true) setIsPanelOpen(true);
} catch (error) { } catch (error) {
console.error('Error parsing filter data:', error) console.error("Error parsing filter data:", error);
setParsedFilterData(null) setParsedFilterData(null);
} }
} else { } else {
setParsedFilterData(null) setParsedFilterData(null);
setIsPanelOpen(false) setIsPanelOpen(false);
} }
} };
const clearFilter = () => { const clearFilter = () => {
setSelectedFilter(null) setSelectedFilter(null);
} };
const openPanel = () => { const openPanel = () => {
setIsPanelOpen(true) setIsPanelOpen(true);
} };
const closePanel = () => { const closePanel = () => {
setSelectedFilter(null) // This will also close the panel setSelectedFilter(null); // This will also close the panel
} };
const closePanelOnly = () => { const closePanelOnly = () => {
setIsPanelOpen(false) // Close panel but keep filter selected setIsPanelOpen(false); // Close panel but keep filter selected
} };
const value: KnowledgeFilterContextType = { const value: KnowledgeFilterContextType = {
selectedFilter, selectedFilter,
@ -100,11 +112,11 @@ export function KnowledgeFilterProvider({ children }: KnowledgeFilterProviderPro
openPanel, openPanel,
closePanel, closePanel,
closePanelOnly, closePanelOnly,
} };
return ( return (
<KnowledgeFilterContext.Provider value={value}> <KnowledgeFilterContext.Provider value={value}>
{children} {children}
</KnowledgeFilterContext.Provider> </KnowledgeFilterContext.Provider>
) );
} }

View file

@ -1,62 +1,88 @@
"use client" "use client";
import React, { createContext, useContext, useState, useEffect, useCallback } from 'react' import { useQueryClient } from "@tanstack/react-query";
import { toast } from 'sonner' import type React from "react";
import { useAuth } from '@/contexts/auth-context' import {
createContext,
useCallback,
useContext,
useEffect,
useState,
} from "react";
import { toast } from "sonner";
import { useAuth } from "@/contexts/auth-context";
export interface Task { export interface Task {
task_id: string task_id: string;
status: 'pending' | 'running' | 'processing' | 'completed' | 'failed' | 'error' status:
total_files?: number | "pending"
processed_files?: number | "running"
successful_files?: number | "processing"
failed_files?: number | "completed"
created_at: string | "failed"
updated_at: string | "error";
duration_seconds?: number total_files?: number;
result?: Record<string, unknown> processed_files?: number;
error?: string successful_files?: number;
files?: Record<string, Record<string, unknown>> failed_files?: number;
created_at: string;
updated_at: string;
duration_seconds?: number;
result?: Record<string, unknown>;
error?: string;
files?: Record<string, Record<string, unknown>>;
} }
interface TaskContextType { interface TaskContextType {
tasks: Task[] tasks: Task[];
addTask: (taskId: string) => void addTask: (taskId: string) => void;
removeTask: (taskId: string) => void removeTask: (taskId: string) => void;
refreshTasks: () => Promise<void> refreshTasks: () => Promise<void>;
cancelTask: (taskId: string) => Promise<void> cancelTask: (taskId: string) => Promise<void>;
isPolling: boolean isPolling: boolean;
isFetching: boolean isFetching: boolean;
isMenuOpen: boolean isMenuOpen: boolean;
toggleMenu: () => void toggleMenu: () => void;
} }
const TaskContext = createContext<TaskContextType | undefined>(undefined) const TaskContext = createContext<TaskContextType | undefined>(undefined);
export function TaskProvider({ children }: { children: React.ReactNode }) { export function TaskProvider({ children }: { children: React.ReactNode }) {
const [tasks, setTasks] = useState<Task[]>([]) const [tasks, setTasks] = useState<Task[]>([]);
const [isPolling, setIsPolling] = useState(false) const [isPolling, setIsPolling] = useState(false);
const [isFetching, setIsFetching] = useState(false) const [isFetching, setIsFetching] = useState(false);
const [isMenuOpen, setIsMenuOpen] = useState(false) const [isMenuOpen, setIsMenuOpen] = useState(false);
const { isAuthenticated, isNoAuthMode } = useAuth() const { isAuthenticated, isNoAuthMode } = useAuth();
const queryClient = useQueryClient();
const refetchSearch = () => {
queryClient.invalidateQueries({ queryKey: ["search"] });
};
const fetchTasks = useCallback(async () => { const fetchTasks = useCallback(async () => {
if (!isAuthenticated && !isNoAuthMode) return if (!isAuthenticated && !isNoAuthMode) return;
setIsFetching(true) setIsFetching(true);
try { try {
const response = await fetch('/api/tasks') const response = await fetch("/api/tasks");
if (response.ok) { if (response.ok) {
const data = await response.json() const data = await response.json();
const newTasks = data.tasks || [] const newTasks = data.tasks || [];
// Update tasks and check for status changes in the same state update // Update tasks and check for status changes in the same state update
setTasks(prevTasks => { setTasks((prevTasks) => {
// Check for newly completed tasks to show toasts // Check for newly completed tasks to show toasts
if (prevTasks.length > 0) { if (prevTasks.length > 0) {
newTasks.forEach((newTask: Task) => { newTasks.forEach((newTask: Task) => {
const oldTask = prevTasks.find(t => t.task_id === newTask.task_id) const oldTask = prevTasks.find(
if (oldTask && oldTask.status !== 'completed' && newTask.status === 'completed') { (t) => t.task_id === newTask.task_id,
);
if (
oldTask &&
oldTask.status !== "completed" &&
newTask.status === "completed"
) {
// Task just completed - show success toast // Task just completed - show success toast
toast.success("Task completed successfully!", { toast.success("Task completed successfully!", {
description: `Task ${newTask.task_id} has finished processing.`, description: `Task ${newTask.task_id} has finished processing.`,
@ -64,121 +90,136 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
label: "View", label: "View",
onClick: () => console.log("View task", newTask.task_id), onClick: () => console.log("View task", newTask.task_id),
}, },
}) });
} else if (oldTask && oldTask.status !== 'failed' && oldTask.status !== 'error' && (newTask.status === 'failed' || newTask.status === 'error')) { refetchSearch();
} else if (
oldTask &&
oldTask.status !== "failed" &&
oldTask.status !== "error" &&
(newTask.status === "failed" || newTask.status === "error")
) {
// Task just failed - show error toast // Task just failed - show error toast
toast.error("Task failed", { toast.error("Task failed", {
description: `Task ${newTask.task_id} failed: ${newTask.error || 'Unknown error'}`, description: `Task ${newTask.task_id} failed: ${
}) newTask.error || "Unknown error"
}`,
});
} }
}) });
} }
return newTasks return newTasks;
}) });
} }
} catch (error) { } catch (error) {
console.error('Failed to fetch tasks:', error) console.error("Failed to fetch tasks:", error);
} finally { } finally {
setIsFetching(false) setIsFetching(false);
} }
}, [isAuthenticated, isNoAuthMode]) // Removed 'tasks' from dependencies to prevent infinite loop! }, [isAuthenticated, isNoAuthMode]); // Removed 'tasks' from dependencies to prevent infinite loop!
const addTask = useCallback((taskId: string) => { const addTask = useCallback((taskId: string) => {
// Immediately start aggressive polling for the new task // Immediately start aggressive polling for the new task
let pollAttempts = 0 let pollAttempts = 0;
const maxPollAttempts = 30 // Poll for up to 30 seconds const maxPollAttempts = 30; // Poll for up to 30 seconds
const aggressivePoll = async () => { const aggressivePoll = async () => {
try { try {
const response = await fetch('/api/tasks') const response = await fetch("/api/tasks");
if (response.ok) { if (response.ok) {
const data = await response.json() const data = await response.json();
const newTasks = data.tasks || [] const newTasks = data.tasks || [];
const foundTask = newTasks.find((task: Task) => task.task_id === taskId) const foundTask = newTasks.find(
(task: Task) => task.task_id === taskId,
);
if (foundTask) { if (foundTask) {
// Task found! Update the tasks state // Task found! Update the tasks state
setTasks(prevTasks => { setTasks((prevTasks) => {
// Check if task is already in the list // Check if task is already in the list
const exists = prevTasks.some(t => t.task_id === taskId) const exists = prevTasks.some((t) => t.task_id === taskId);
if (!exists) { if (!exists) {
return [...prevTasks, foundTask] return [...prevTasks, foundTask];
} }
// Update existing task // Update existing task
return prevTasks.map(t => t.task_id === taskId ? foundTask : t) return prevTasks.map((t) =>
}) t.task_id === taskId ? foundTask : t,
return // Stop polling, we found it );
});
return; // Stop polling, we found it
} }
} }
} catch (error) { } catch (error) {
console.error('Aggressive polling failed:', error) console.error("Aggressive polling failed:", error);
} }
pollAttempts++ pollAttempts++;
if (pollAttempts < maxPollAttempts) { if (pollAttempts < maxPollAttempts) {
// Continue polling every 1 second for new tasks // Continue polling every 1 second for new tasks
setTimeout(aggressivePoll, 1000) setTimeout(aggressivePoll, 1000);
} }
} };
// Start aggressive polling after a short delay to allow backend to process // Start aggressive polling after a short delay to allow backend to process
setTimeout(aggressivePoll, 500) setTimeout(aggressivePoll, 500);
}, []) }, []);
const refreshTasks = useCallback(async () => { const refreshTasks = useCallback(async () => {
await fetchTasks() await fetchTasks();
}, [fetchTasks]) }, [fetchTasks]);
const removeTask = useCallback((taskId: string) => { const removeTask = useCallback((taskId: string) => {
setTasks(prev => prev.filter(task => task.task_id !== taskId)) setTasks((prev) => prev.filter((task) => task.task_id !== taskId));
}, []) }, []);
const cancelTask = useCallback(async (taskId: string) => { const cancelTask = useCallback(
try { async (taskId: string) => {
const response = await fetch(`/api/tasks/${taskId}/cancel`, { try {
method: 'POST', const response = await fetch(`/api/tasks/${taskId}/cancel`, {
}) method: "POST",
});
if (response.ok) {
// Immediately refresh tasks to show the updated status if (response.ok) {
await fetchTasks() // Immediately refresh tasks to show the updated status
toast.success("Task cancelled", { await fetchTasks();
description: `Task ${taskId.substring(0, 8)}... has been cancelled` toast.success("Task cancelled", {
}) description: `Task ${taskId.substring(0, 8)}... has been cancelled`,
} else { });
const errorData = await response.json().catch(() => ({})) } else {
throw new Error(errorData.error || 'Failed to cancel task') const errorData = await response.json().catch(() => ({}));
throw new Error(errorData.error || "Failed to cancel task");
}
} catch (error) {
console.error("Failed to cancel task:", error);
toast.error("Failed to cancel task", {
description: error instanceof Error ? error.message : "Unknown error",
});
} }
} catch (error) { },
console.error('Failed to cancel task:', error) [fetchTasks],
toast.error("Failed to cancel task", { );
description: error instanceof Error ? error.message : 'Unknown error'
})
}
}, [fetchTasks])
const toggleMenu = useCallback(() => { const toggleMenu = useCallback(() => {
setIsMenuOpen(prev => !prev) setIsMenuOpen((prev) => !prev);
}, []) }, []);
// Periodic polling for task updates // Periodic polling for task updates
useEffect(() => { useEffect(() => {
if (!isAuthenticated && !isNoAuthMode) return if (!isAuthenticated && !isNoAuthMode) return;
setIsPolling(true);
setIsPolling(true)
// Initial fetch // Initial fetch
fetchTasks() fetchTasks();
// Set up polling interval - every 3 seconds (more responsive for active tasks) // Set up polling interval - every 3 seconds (more responsive for active tasks)
const interval = setInterval(fetchTasks, 3000) const interval = setInterval(fetchTasks, 3000);
return () => { return () => {
clearInterval(interval) clearInterval(interval);
setIsPolling(false) setIsPolling(false);
} };
}, [isAuthenticated, isNoAuthMode, fetchTasks]) }, [isAuthenticated, isNoAuthMode, fetchTasks]);
const value: TaskContextType = { const value: TaskContextType = {
tasks, tasks,
@ -190,19 +231,15 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
isFetching, isFetching,
isMenuOpen, isMenuOpen,
toggleMenu, toggleMenu,
} };
return ( return <TaskContext.Provider value={value}>{children}</TaskContext.Provider>;
<TaskContext.Provider value={value}>
{children}
</TaskContext.Provider>
)
} }
export function useTask() { export function useTask() {
const context = useContext(TaskContext) const context = useContext(TaskContext);
if (context === undefined) { if (context === undefined) {
throw new Error('useTask must be used within a TaskProvider') throw new Error("useTask must be used within a TaskProvider");
} }
return context return context;
} }

View file

@ -8,6 +8,7 @@ from services.conversation_persistence_service import conversation_persistence
# In-memory storage for active conversation threads (preserves function calls) # In-memory storage for active conversation threads (preserves function calls)
active_conversations = {} active_conversations = {}
def get_user_conversations(user_id: str): def get_user_conversations(user_id: str):
"""Get conversation metadata for a user from persistent storage""" """Get conversation metadata for a user from persistent storage"""
return conversation_persistence.get_user_conversations(user_id) return conversation_persistence.get_user_conversations(user_id)
@ -23,7 +24,9 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
# If we have a previous_response_id, try to get the existing conversation # If we have a previous_response_id, try to get the existing conversation
if previous_response_id and previous_response_id in active_conversations[user_id]: if previous_response_id and previous_response_id in active_conversations[user_id]:
logger.debug(f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}") logger.debug(
f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}"
)
return active_conversations[user_id][previous_response_id] return active_conversations[user_id][previous_response_id]
# Create new conversation thread # Create new conversation thread
@ -48,7 +51,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
if user_id not in active_conversations: if user_id not in active_conversations:
active_conversations[user_id] = {} active_conversations[user_id] = {}
active_conversations[user_id][response_id] = conversation_state active_conversations[user_id][response_id] = conversation_state
# 2. Store only essential metadata to disk (simplified JSON) # 2. Store only essential metadata to disk (simplified JSON)
messages = conversation_state.get("messages", []) messages = conversation_state.get("messages", [])
first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None) first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None)
@ -56,7 +59,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
if first_user_msg: if first_user_msg:
content = first_user_msg.get("content", "") content = first_user_msg.get("content", "")
title = content[:50] + "..." if len(content) > 50 else content title = content[:50] + "..." if len(content) > 50 else content
metadata_only = { metadata_only = {
"response_id": response_id, "response_id": response_id,
"title": title, "title": title,
@ -64,11 +67,15 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
"created_at": conversation_state.get("created_at"), "created_at": conversation_state.get("created_at"),
"last_activity": conversation_state.get("last_activity"), "last_activity": conversation_state.get("last_activity"),
"previous_response_id": conversation_state.get("previous_response_id"), "previous_response_id": conversation_state.get("previous_response_id"),
"total_messages": len([msg for msg in messages if msg.get("role") in ["user", "assistant"]]), "total_messages": len(
[msg for msg in messages if msg.get("role") in ["user", "assistant"]]
),
# Don't store actual messages - Langflow has them # Don't store actual messages - Langflow has them
} }
conversation_persistence.store_conversation_thread(user_id, response_id, metadata_only) conversation_persistence.store_conversation_thread(
user_id, response_id, metadata_only
)
# Legacy function for backward compatibility # Legacy function for backward compatibility
@ -76,10 +83,12 @@ def get_user_conversation(user_id: str):
"""Get the most recent conversation for a user (for backward compatibility)""" """Get the most recent conversation for a user (for backward compatibility)"""
# Check in-memory conversations first (with function calls) # Check in-memory conversations first (with function calls)
if user_id in active_conversations and active_conversations[user_id]: if user_id in active_conversations and active_conversations[user_id]:
latest_response_id = max(active_conversations[user_id].keys(), latest_response_id = max(
key=lambda k: active_conversations[user_id][k]["last_activity"]) active_conversations[user_id].keys(),
key=lambda k: active_conversations[user_id][k]["last_activity"],
)
return active_conversations[user_id][latest_response_id] return active_conversations[user_id][latest_response_id]
# Fallback to metadata-only conversations # Fallback to metadata-only conversations
conversations = get_user_conversations(user_id) conversations = get_user_conversations(user_id)
if not conversations: if not conversations:
@ -342,7 +351,9 @@ async def async_chat(
"content": response_text, "content": response_text,
"response_id": response_id, "response_id": response_id,
"timestamp": datetime.now(), "timestamp": datetime.now(),
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls "response_data": response_obj.model_dump()
if hasattr(response_obj, "model_dump")
else str(response_obj), # Store complete response for function calls
} }
conversation_state["messages"].append(assistant_message) conversation_state["messages"].append(assistant_message)
logger.debug( logger.debug(
@ -428,7 +439,7 @@ async def async_chat_stream(
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) store_conversation_thread(user_id, response_id, conversation_state)
logger.debug( logger.debug(
"Stored conversation thread", user_id=user_id, response_id=response_id f"Stored conversation thread for user {user_id} with response_id: {response_id}"
) )
@ -443,11 +454,8 @@ async def async_langflow_chat(
store_conversation: bool = True, store_conversation: bool = True,
): ):
logger.debug( logger.debug(
"async_langflow_chat called", "async_langflow_chat called",
user_id=user_id, user_id=user_id,
previous_response_id=previous_response_id, previous_response_id=previous_response_id,
) )
@ -496,7 +504,9 @@ async def async_langflow_chat(
"content": response_text, "content": response_text,
"response_id": response_id, "response_id": response_id,
"timestamp": datetime.now(), "timestamp": datetime.now(),
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls "response_data": response_obj.model_dump()
if hasattr(response_obj, "model_dump")
else str(response_obj), # Store complete response for function calls
} }
conversation_state["messages"].append(assistant_message) conversation_state["messages"].append(assistant_message)
logger.debug( logger.debug(
@ -511,17 +521,18 @@ async def async_langflow_chat(
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user # Claim session ownership for this user
try: try:
from services.session_ownership_service import session_ownership_service from services.session_ownership_service import session_ownership_service
session_ownership_service.claim_session(user_id, response_id)
print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
except Exception as e:
print(f"[WARNING] Failed to claim session ownership: {e}")
print( session_ownership_service.claim_session(user_id, response_id)
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" logger.debug(f"Claimed session {response_id} for user {user_id}")
except Exception as e:
logger.warning(f"Failed to claim session ownership: {e}")
logger.debug(
f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
) )
logger.debug( logger.debug(
"Stored langflow conversation thread", "Stored langflow conversation thread",
@ -570,7 +581,7 @@ async def async_langflow_chat_stream(
full_response = "" full_response = ""
response_id = None response_id = None
collected_chunks = [] # Store all chunks for function call data collected_chunks = [] # Store all chunks for function call data
async for chunk in async_stream( async for chunk in async_stream(
langflow_client, langflow_client,
prompt, prompt,
@ -585,7 +596,7 @@ async def async_langflow_chat_stream(
chunk_data = json.loads(chunk.decode("utf-8")) chunk_data = json.loads(chunk.decode("utf-8"))
collected_chunks.append(chunk_data) # Collect all chunk data collected_chunks.append(chunk_data) # Collect all chunk data
if "delta" in chunk_data and "content" in chunk_data["delta"]: if "delta" in chunk_data and "content" in chunk_data["delta"]:
full_response += chunk_data["delta"]["content"] full_response += chunk_data["delta"]["content"]
# Extract response_id from chunk # Extract response_id from chunk
@ -612,20 +623,19 @@ async def async_langflow_chat_stream(
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user # Claim session ownership for this user
try: try:
from services.session_ownership_service import session_ownership_service from services.session_ownership_service import session_ownership_service
session_ownership_service.claim_session(user_id, response_id) session_ownership_service.claim_session(user_id, response_id)
print(f"[DEBUG] Claimed session {response_id} for user {user_id}") logger.debug(f"Claimed session {response_id} for user {user_id}")
except Exception as e: except Exception as e:
print(f"[WARNING] Failed to claim session ownership: {e}") logger.warning(f"Failed to claim session ownership: {e}")
print( logger.debug(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
) )
logger.debug( logger.debug(
"Stored langflow conversation thread", f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
user_id=user_id,
response_id=response_id,
) )

View file

@ -1,4 +1,5 @@
from starlette.responses import JSONResponse from starlette.responses import JSONResponse
from utils.logging_config import get_logger
from config.settings import ( from config.settings import (
LANGFLOW_URL, LANGFLOW_URL,
LANGFLOW_CHAT_FLOW_ID, LANGFLOW_CHAT_FLOW_ID,
@ -7,6 +8,9 @@ from config.settings import (
clients, clients,
) )
logger = get_logger(__name__)
async def get_settings(request, session_manager): async def get_settings(request, session_manager):
"""Get application settings""" """Get application settings"""
@ -91,7 +95,7 @@ async def get_settings(request, session_manager):
settings["ingestion_defaults"] = ingestion_defaults settings["ingestion_defaults"] = ingestion_defaults
except Exception as e: except Exception as e:
print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}") logger.warning(f"Failed to fetch ingestion flow defaults: {e}")
# Continue without ingestion defaults # Continue without ingestion defaults
return JSONResponse(settings) return JSONResponse(settings)

View file

@ -64,7 +64,6 @@ class GoogleDriveConnector(BaseConnector):
Integration points: Integration points:
- `BaseConnector` is your projects base class; minimum methods used here: - `BaseConnector` is your projects base class; minimum methods used here:
* self.emit(doc: ConnectorDocument) -> None (or adapt to your ingestion pipeline) * self.emit(doc: ConnectorDocument) -> None (or adapt to your ingestion pipeline)
* self.log/info/warn/error (optional)
- Adjust paths, logging, and error handling to match your project style. - Adjust paths, logging, and error handling to match your project style.
""" """
@ -81,8 +80,6 @@ class GoogleDriveConnector(BaseConnector):
_FILE_ID_ALIASES = ("file_ids", "selected_file_ids", "selected_files") _FILE_ID_ALIASES = ("file_ids", "selected_file_ids", "selected_files")
_FOLDER_ID_ALIASES = ("folder_ids", "selected_folder_ids", "selected_folders") _FOLDER_ID_ALIASES = ("folder_ids", "selected_folder_ids", "selected_folders")
def log(self, message: str) -> None:
print(message)
def emit(self, doc: ConnectorDocument) -> None: def emit(self, doc: ConnectorDocument) -> None:
""" """
@ -91,7 +88,7 @@ class GoogleDriveConnector(BaseConnector):
""" """
# If BaseConnector has an emit method, call super().emit(doc) # If BaseConnector has an emit method, call super().emit(doc)
# Otherwise, implement your custom logic here. # Otherwise, implement your custom logic here.
print(f"Emitting document: {doc.id} ({doc.filename})") logger.debug(f"Emitting document: {doc.id} ({doc.filename})")
def __init__(self, config: Dict[str, Any]) -> None: def __init__(self, config: Dict[str, Any]) -> None:
# Read from config OR env (backend env, not NEXT_PUBLIC_*): # Read from config OR env (backend env, not NEXT_PUBLIC_*):
@ -433,7 +430,7 @@ class GoogleDriveConnector(BaseConnector):
# If still not authenticated, bail (caller should kick off OAuth init) # If still not authenticated, bail (caller should kick off OAuth init)
if not await self.oauth.is_authenticated(): if not await self.oauth.is_authenticated():
self.log("authenticate: no valid credentials; run OAuth init/callback first.") logger.debug("authenticate: no valid credentials; run OAuth init/callback first.")
return False return False
# Build Drive service from OAuth helper # Build Drive service from OAuth helper
@ -482,7 +479,7 @@ class GoogleDriveConnector(BaseConnector):
except Exception as e: except Exception as e:
# Optionally log error with your base class logger # Optionally log error with your base class logger
try: try:
self.log(f"GoogleDriveConnector.list_files failed: {e}") logger.error(f"GoogleDriveConnector.list_files failed: {e}")
except Exception: except Exception:
pass pass
return {"files": [], "next_page_token": None} return {"files": [], "next_page_token": None}
@ -500,7 +497,7 @@ class GoogleDriveConnector(BaseConnector):
except Exception as e: except Exception as e:
# Use your base class logger if available # Use your base class logger if available
try: try:
self.log(f"Download failed for {file_id}: {e}") logger.error(f"Download failed for {file_id}: {e}")
except Exception: except Exception:
pass pass
raise raise
@ -567,7 +564,7 @@ class GoogleDriveConnector(BaseConnector):
except Exception as e: except Exception as e:
# Optional: use your base logger # Optional: use your base logger
try: try:
self.log(f"Failed to get start page token: {e}") logger.error(f"Failed to get start page token: {e}")
except Exception: except Exception:
pass pass
raise raise
@ -634,7 +631,7 @@ class GoogleDriveConnector(BaseConnector):
ok = await self.authenticate() ok = await self.authenticate()
if not ok: if not ok:
try: try:
self.log("cleanup_subscription: not authenticated") logger.error("cleanup_subscription: not authenticated")
except Exception: except Exception:
pass pass
return False return False
@ -662,7 +659,7 @@ class GoogleDriveConnector(BaseConnector):
if not resource_id: if not resource_id:
try: try:
self.log( logger.error(
f"cleanup_subscription: missing resource_id for channel {subscription_id}. " f"cleanup_subscription: missing resource_id for channel {subscription_id}. "
f"Persist (channel_id, resource_id) when creating the subscription." f"Persist (channel_id, resource_id) when creating the subscription."
) )
@ -684,7 +681,7 @@ class GoogleDriveConnector(BaseConnector):
except Exception as e: except Exception as e:
try: try:
self.log(f"cleanup_subscription failed for {subscription_id}: {e}") logger.error(f"cleanup_subscription failed for {subscription_id}: {e}")
except Exception: except Exception:
pass pass
return False return False
@ -708,7 +705,7 @@ class GoogleDriveConnector(BaseConnector):
ok = await self.authenticate() ok = await self.authenticate()
if not ok: if not ok:
try: try:
self.log("handle_webhook: not authenticated") logger.error("handle_webhook: not authenticated")
except Exception: except Exception:
pass pass
return affected return affected
@ -728,7 +725,7 @@ class GoogleDriveConnector(BaseConnector):
except Exception as e: except Exception as e:
selected_ids = set() selected_ids = set()
try: try:
self.log(f"handle_webhook: scope build failed, proceeding unfiltered: {e}") logger.error(f"handle_webhook: scope build failed, proceeding unfiltered: {e}")
except Exception: except Exception:
pass pass
@ -797,7 +794,7 @@ class GoogleDriveConnector(BaseConnector):
except Exception as e: except Exception as e:
try: try:
self.log(f"handle_webhook failed: {e}") logger.error(f"handle_webhook failed: {e}")
except Exception: except Exception:
pass pass
return [] return []
@ -814,7 +811,7 @@ class GoogleDriveConnector(BaseConnector):
blob = self._download_file_bytes(meta) blob = self._download_file_bytes(meta)
except HttpError as e: except HttpError as e:
# Skip/record failures # Skip/record failures
self.log(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}") logger.error(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}")
continue continue
from datetime import datetime from datetime import datetime

View file

@ -391,7 +391,7 @@ class ChatService:
local_metadata[response_id] = conversation_metadata local_metadata[response_id] = conversation_metadata
# 2. Get actual conversations from Langflow database (source of truth for messages) # 2. Get actual conversations from Langflow database (source of truth for messages)
print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}") logger.debug(f"Attempting to fetch Langflow history for user: {user_id}")
langflow_history = ( langflow_history = (
await langflow_history_service.get_user_conversation_history( await langflow_history_service.get_user_conversation_history(
user_id, flow_id=LANGFLOW_CHAT_FLOW_ID user_id, flow_id=LANGFLOW_CHAT_FLOW_ID
@ -462,24 +462,24 @@ class ChatService:
) )
if langflow_history.get("conversations"): if langflow_history.get("conversations"):
print( logger.debug(
f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow" f"Added {len(langflow_history['conversations'])} historical conversations from Langflow"
) )
elif langflow_history.get("error"): elif langflow_history.get("error"):
print( logger.debug(
f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}" f"Could not fetch Langflow history for user {user_id}: {langflow_history['error']}"
) )
else: else:
print(f"[DEBUG] No Langflow conversations found for user {user_id}") logger.debug(f"No Langflow conversations found for user {user_id}")
except Exception as e: except Exception as e:
print(f"[ERROR] Failed to fetch Langflow history: {e}") logger.error(f"Failed to fetch Langflow history: {e}")
# Continue with just in-memory conversations # Continue with just in-memory conversations
# Sort by last activity (most recent first) # Sort by last activity (most recent first)
all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
print( logger.debug(
f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)" f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)"
) )

View file

@ -8,7 +8,9 @@ import os
from typing import Dict, Any from typing import Dict, Any
from datetime import datetime from datetime import datetime
import threading import threading
from utils.logging_config import get_logger
logger = get_logger(__name__)
class ConversationPersistenceService: class ConversationPersistenceService:
"""Simple service to persist conversations to disk""" """Simple service to persist conversations to disk"""
@ -24,10 +26,10 @@ class ConversationPersistenceService:
try: try:
with open(self.storage_file, 'r', encoding='utf-8') as f: with open(self.storage_file, 'r', encoding='utf-8') as f:
data = json.load(f) data = json.load(f)
print(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}") logger.debug(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}")
return data return data
except Exception as e: except Exception as e:
print(f"Error loading conversations from {self.storage_file}: {e}") logger.error(f"Error loading conversations from {self.storage_file}: {e}")
return {} return {}
return {} return {}
@ -37,9 +39,9 @@ class ConversationPersistenceService:
with self.lock: with self.lock:
with open(self.storage_file, 'w', encoding='utf-8') as f: with open(self.storage_file, 'w', encoding='utf-8') as f:
json.dump(self._conversations, f, indent=2, ensure_ascii=False, default=str) json.dump(self._conversations, f, indent=2, ensure_ascii=False, default=str)
print(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}") logger.debug(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}")
except Exception as e: except Exception as e:
print(f"Error saving conversations to {self.storage_file}: {e}") logger.error(f"Error saving conversations to {self.storage_file}: {e}")
def _count_total_conversations(self, data: Dict[str, Any]) -> int: def _count_total_conversations(self, data: Dict[str, Any]) -> int:
"""Count total conversations across all users""" """Count total conversations across all users"""
@ -89,14 +91,14 @@ class ConversationPersistenceService:
if user_id in self._conversations and response_id in self._conversations[user_id]: if user_id in self._conversations and response_id in self._conversations[user_id]:
del self._conversations[user_id][response_id] del self._conversations[user_id][response_id]
self._save_conversations() self._save_conversations()
print(f"Deleted conversation {response_id} for user {user_id}") logger.debug(f"Deleted conversation {response_id} for user {user_id}")
def clear_user_conversations(self, user_id: str): def clear_user_conversations(self, user_id: str):
"""Clear all conversations for a user""" """Clear all conversations for a user"""
if user_id in self._conversations: if user_id in self._conversations:
del self._conversations[user_id] del self._conversations[user_id]
self._save_conversations() self._save_conversations()
print(f"Cleared all conversations for user {user_id}") logger.debug(f"Cleared all conversations for user {user_id}")
def get_storage_stats(self) -> Dict[str, Any]: def get_storage_stats(self) -> Dict[str, Any]:
"""Get statistics about stored conversations""" """Get statistics about stored conversations"""

View file

@ -6,7 +6,9 @@ Simplified service that retrieves message history from Langflow using shared cli
from typing import List, Dict, Optional, Any from typing import List, Dict, Optional, Any
from config.settings import clients from config.settings import clients
from utils.logging_config import get_logger
logger = get_logger(__name__)
class LangflowHistoryService: class LangflowHistoryService:
"""Simplified service to retrieve message history from Langflow""" """Simplified service to retrieve message history from Langflow"""
@ -29,14 +31,14 @@ class LangflowHistoryService:
if response.status_code == 200: if response.status_code == 200:
session_ids = response.json() session_ids = response.json()
print(f"Found {len(session_ids)} total sessions from Langflow") logger.debug(f"Found {len(session_ids)} total sessions from Langflow")
return session_ids return session_ids
else: else:
print(f"Failed to get sessions: {response.status_code} - {response.text}") logger.error(f"Failed to get sessions: {response.status_code} - {response.text}")
return [] return []
except Exception as e: except Exception as e:
print(f"Error getting user sessions: {e}") logger.error(f"Error getting user sessions: {e}")
return [] return []
async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]: async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]:
@ -56,11 +58,11 @@ class LangflowHistoryService:
# Convert to OpenRAG format # Convert to OpenRAG format
return self._convert_langflow_messages(messages) return self._convert_langflow_messages(messages)
else: else:
print(f"Failed to get messages for session {session_id}: {response.status_code}") logger.error(f"Failed to get messages for session {session_id}: {response.status_code}")
return [] return []
except Exception as e: except Exception as e:
print(f"Error getting session messages: {e}") logger.error(f"Error getting session messages: {e}")
return [] return []
def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
@ -114,7 +116,7 @@ class LangflowHistoryService:
converted_messages.append(converted_msg) converted_messages.append(converted_msg)
except Exception as e: except Exception as e:
print(f"Error converting message: {e}") logger.error(f"Error converting message: {e}")
continue continue
return converted_messages return converted_messages
@ -159,7 +161,7 @@ class LangflowHistoryService:
} }
except Exception as e: except Exception as e:
print(f"Error getting user conversation history: {e}") logger.error(f"Error getting user conversation history: {e}")
return { return {
"error": str(e), "error": str(e),
"conversations": [] "conversations": []

View file

@ -7,7 +7,9 @@ import json
import os import os
from typing import Dict, List, Optional from typing import Dict, List, Optional
from datetime import datetime from datetime import datetime
from utils.logging_config import get_logger
logger = get_logger(__name__)
class SessionOwnershipService: class SessionOwnershipService:
"""Simple service to track which user owns which session""" """Simple service to track which user owns which session"""
@ -23,7 +25,7 @@ class SessionOwnershipService:
with open(self.ownership_file, 'r') as f: with open(self.ownership_file, 'r') as f:
return json.load(f) return json.load(f)
except Exception as e: except Exception as e:
print(f"Error loading session ownership data: {e}") logger.error(f"Error loading session ownership data: {e}")
return {} return {}
return {} return {}
@ -32,9 +34,9 @@ class SessionOwnershipService:
try: try:
with open(self.ownership_file, 'w') as f: with open(self.ownership_file, 'w') as f:
json.dump(self.ownership_data, f, indent=2) json.dump(self.ownership_data, f, indent=2)
print(f"Saved session ownership data to {self.ownership_file}") logger.debug(f"Saved session ownership data to {self.ownership_file}")
except Exception as e: except Exception as e:
print(f"Error saving session ownership data: {e}") logger.error(f"Error saving session ownership data: {e}")
def claim_session(self, user_id: str, session_id: str): def claim_session(self, user_id: str, session_id: str):
"""Claim a session for a user""" """Claim a session for a user"""
@ -45,7 +47,7 @@ class SessionOwnershipService:
"last_accessed": datetime.now().isoformat() "last_accessed": datetime.now().isoformat()
} }
self._save_ownership_data() self._save_ownership_data()
print(f"Claimed session {session_id} for user {user_id}") logger.debug(f"Claimed session {session_id} for user {user_id}")
else: else:
# Update last accessed time # Update last accessed time
self.ownership_data[session_id]["last_accessed"] = datetime.now().isoformat() self.ownership_data[session_id]["last_accessed"] = datetime.now().isoformat()