diff --git a/.env.example b/.env.example
index cbb994e4..2d163a52 100644
--- a/.env.example
+++ b/.env.example
@@ -12,4 +12,8 @@ MICROSOFT_GRAPH_OAUTH_CLIENT_ID=
 MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET=
 # Optional DNS routable from Google (etc.) to handle continuous ingest (ngrok works)
 WEBHOOK_BASE_URL=
+OPENAI_API_KEY=
+
+AWS_ACCESS_KEY_ID=
+AWS_SECRET_ACCESS_KEY=
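Neither key needs to appear anywhere in code: boto3 and the OpenAI SDK both resolve credentials from the environment. A minimal sketch, assuming the `.env` values have been exported into the process environment (e.g. via docker-compose or a dotenv loader):

```python
# Minimal sketch: both SDKs fall back to environment variables, so the
# services below can construct clients without passing keys explicitly.
import boto3
from openai import AsyncOpenAI

# boto3 reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY automatically.
s3 = boto3.client("s3")

# The OpenAI client likewise defaults to OPENAI_API_KEY.
openai_client = AsyncOpenAI()
```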
diff --git a/frontend/src/app/admin/page.tsx b/frontend/src/app/admin/page.tsx
index d3847701..121a460b 100644
--- a/frontend/src/app/admin/page.tsx
+++ b/frontend/src/app/admin/page.tsx
@@ -1,22 +1,45 @@
 "use client"
 
-import { useState } from "react"
+import { useState, useEffect } from "react"
 import { Button } from "@/components/ui/button"
 import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
 import { Input } from "@/components/ui/input"
 import { Label } from "@/components/ui/label"
-import { Upload, FolderOpen, Loader2 } from "lucide-react"
+import { Upload, FolderOpen, Loader2, Cloud } from "lucide-react"
 import { ProtectedRoute } from "@/components/protected-route"
 import { useTask } from "@/contexts/task-context"
 
 function AdminPage() {
+  console.log("AdminPage component rendered!")
   const [fileUploadLoading, setFileUploadLoading] = useState(false)
   const [pathUploadLoading, setPathUploadLoading] = useState(false)
   const [selectedFile, setSelectedFile] = useState<File | null>(null)
   const [folderPath, setFolderPath] = useState("/app/documents/")
+  const [bucketUploadLoading, setBucketUploadLoading] = useState(false)
+  const [bucketUrl, setBucketUrl] = useState("s3://")
   const [uploadStatus, setUploadStatus] = useState("")
+  const [awsEnabled, setAwsEnabled] = useState(false)
   const { addTask } = useTask()
 
+  useEffect(() => {
+    console.log("AdminPage useEffect running - checking AWS availability")
+    const checkAws = async () => {
+      try {
+        console.log("Making request to /api/upload_options")
+        const res = await fetch("/api/upload_options")
+        console.log("Response status:", res.status, "OK:", res.ok)
+        if (res.ok) {
+          const data = await res.json()
+          console.log("Response data:", data)
+          setAwsEnabled(Boolean(data.aws))
+        }
+      } catch (err) {
+        console.error("Failed to check AWS availability", err)
+      }
+    }
+    checkAws()
+  }, [])
+
   const handleFileUpload = async (e: React.FormEvent) => {
     e.preventDefault()
     if (!selectedFile) return
@@ -51,6 +74,47 @@ function AdminPage() {
     }
   }
 
+  const handleBucketUpload = async (e: React.FormEvent) => {
+    e.preventDefault()
+    if (!bucketUrl.trim()) return
+
+    setBucketUploadLoading(true)
+    setUploadStatus("")
+
+    try {
+      const response = await fetch("/api/upload_bucket", {
+        method: "POST",
+        headers: {
+          "Content-Type": "application/json",
+        },
+        body: JSON.stringify({ s3_url: bucketUrl }),
+      })
+
+      const result = await response.json()
+
+      if (response.status === 201) {
+        const taskId = result.task_id || result.id
+        const totalFiles = result.total_files || 0
+
+        if (!taskId) {
+          throw new Error("No task ID received from server")
+        }
+
+        addTask(taskId)
+        setUploadStatus(`🔄 Processing started for ${totalFiles} files. Check the task notification panel for real-time progress. (Task ID: ${taskId})`)
+        setBucketUrl("")
+      } else {
+        setUploadStatus(`Error: ${result.error || "Bucket processing failed"}`)
+      }
+    } catch (error) {
+      setUploadStatus(
+        `Error: ${error instanceof Error ? error.message : "Bucket processing failed"}`,
+      )
+    } finally {
+      setBucketUploadLoading(false)
+    }
+  }
+
   const handlePathUpload = async (e: React.FormEvent) => {
     e.preventDefault()
     if (!folderPath.trim()) return
@@ -121,7 +185,7 @@ function AdminPage() {
         )}
 
-        <div className="grid gap-6 md:grid-cols-2">
+        <div className="grid gap-6 md:grid-cols-3">
           <Card>
             <CardHeader>
               <CardTitle className="flex items-center gap-2">
                 <Upload className="h-5 w-5" />
@@ -207,6 +271,50 @@ function AdminPage() {
           </Card>
 
+          {awsEnabled && (
+            <Card>
+              <CardHeader>
+                <CardTitle className="flex items-center gap-2">
+                  <Cloud className="h-5 w-5" />
+                  Process Bucket
+                </CardTitle>
+                <CardDescription>
+                  Process all documents from an S3 bucket. AWS credentials must be set as environment variables.
+                </CardDescription>
+              </CardHeader>
+              <CardContent>
+                <form onSubmit={handleBucketUpload} className="space-y-4">
+                  <div className="space-y-2">
+                    <Label htmlFor="bucket-url">S3 Bucket URL</Label>
+                    <Input
+                      id="bucket-url"
+                      type="text"
+                      placeholder="s3://my-bucket/prefix/"
+                      value={bucketUrl}
+                      onChange={(e) => setBucketUrl(e.target.value)}
+                    />
+                  </div>
+                  <Button type="submit" disabled={bucketUploadLoading} className="w-full">
+                    {bucketUploadLoading ? (
+                      <>
+                        <Loader2 className="mr-2 h-4 w-4 animate-spin" />
+                        Processing...
+                      </>
+                    ) : (
+                      "Process Bucket"
+                    )}
+                  </Button>
+                </form>
+              </CardContent>
+            </Card>
+          )}
         </div>
       </div>
     </ProtectedRoute>
   )
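The new card is gated on a small capability probe. A hedged sketch of the round trip the `useEffect` above performs, written against the backend route directly (the base URL and cookie value are assumptions for local testing; the endpoint path and `{"aws": ...}` field come from src/api/upload.py later in this patch):

```python
# Sketch of the capability probe behind the `awsEnabled` flag above.
# Base URL and cookie value are assumptions; the response shape is
# defined by upload_options() in src/api/upload.py below.
import requests

s = requests.Session()
s.cookies.set("auth_token", "<jwt-from-login>")  # hypothetical session token

resp = s.get("http://localhost:8000/upload_options")
resp.raise_for_status()
print(resp.json())  # {"aws": true} only when both AWS env vars are set
```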
diff --git a/frontend/src/app/knowledge-sources/page.tsx b/frontend/src/app/knowledge-sources/page.tsx
index 89c79b5e..460fa244 100644
--- a/frontend/src/app/knowledge-sources/page.tsx
+++ b/frontend/src/app/knowledge-sources/page.tsx
@@ -7,7 +7,7 @@ import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/com
 import { Badge } from "@/components/ui/badge"
 import { Input } from "@/components/ui/input"
 import { Label } from "@/components/ui/label"
-import { Upload, FolderOpen, Loader2, PlugZap, RefreshCw, Download } from "lucide-react"
+import { Upload, FolderOpen, Loader2, PlugZap, RefreshCw, Download, Cloud } from "lucide-react"
 import { ProtectedRoute } from "@/components/protected-route"
 import { useTask } from "@/contexts/task-context"
 import { useAuth } from "@/contexts/auth-context"
@@ -50,7 +50,10 @@ function KnowledgeSourcesPage() {
   const [fileUploadLoading, setFileUploadLoading] = useState(false)
   const [pathUploadLoading, setPathUploadLoading] = useState(false)
   const [folderPath, setFolderPath] = useState("/app/documents/")
+  const [bucketUploadLoading, setBucketUploadLoading] = useState(false)
+  const [bucketUrl, setBucketUrl] = useState("s3://")
   const [uploadStatus, setUploadStatus] = useState("")
+  const [awsEnabled, setAwsEnabled] = useState(false)
 
   // Connectors state
   const [connectors, setConnectors] = useState([])
@@ -144,6 +147,50 @@ function KnowledgeSourcesPage() {
     }
   }
 
+  const handleBucketUpload = async (e: React.FormEvent) => {
+    e.preventDefault()
+    if (!bucketUrl.trim()) return
+
+    setBucketUploadLoading(true)
+    setUploadStatus("")
+
+    try {
+      const response = await fetch("/api/upload_bucket", {
+        method: "POST",
+        headers: {
+          "Content-Type": "application/json",
+        },
+        body: JSON.stringify({ s3_url: bucketUrl }),
+      })
+
+      const result = await response.json()
+
+      if (response.status === 201) {
+        const taskId = result.task_id || result.id
+        const totalFiles = result.total_files || 0
+
+        if (!taskId) {
+          throw new Error("No task ID received from server")
+        }
+
+        addTask(taskId)
+        setUploadStatus(`🔄 Processing started for ${totalFiles} files. Check the task notification panel for real-time progress. (Task ID: ${taskId})`)
+        setBucketUrl("s3://")
+
+        // Refresh stats after successful bucket upload
+        fetchStats()
+      } else {
+        setUploadStatus(`Error: ${result.error || "Bucket processing failed"}`)
+      }
+    } catch (error) {
+      setUploadStatus(
+        `Error: ${error instanceof Error ? error.message : "Bucket processing failed"}`,
+      )
+    } finally {
+      setBucketUploadLoading(false)
+    }
+  }
+
   // Helper function to get connector icon
   const getConnectorIcon = (iconName: string) => {
     const iconMap: { [key: string]: React.ReactElement } = {
@@ -374,6 +421,22 @@ function KnowledgeSourcesPage() {
     }
   }
 
+  // Check AWS availability
+  useEffect(() => {
+    const checkAws = async () => {
+      try {
+        const res = await fetch("/api/upload_options")
+        if (res.ok) {
+          const data = await res.json()
+          setAwsEnabled(Boolean(data.aws))
+        }
+      } catch (err) {
+        console.error("Failed to check AWS availability", err)
+      }
+    }
+    checkAws()
+  }, [])
+
   // Initial stats fetch
   useEffect(() => {
     fetchStats()
@@ -499,7 +562,7 @@ function KnowledgeSourcesPage() {
           </div>
 
-        <div className="grid gap-6 md:grid-cols-2">
+        <div className="grid gap-6 md:grid-cols-3">
           {/* File Upload Card */}
           <Card>
             <CardHeader>
@@ -562,6 +625,52 @@ function KnowledgeSourcesPage() {
           </Card>
 
+          {/* S3 Bucket Upload Card - only show if AWS is enabled */}
+          {awsEnabled && (
+            <Card>
+              <CardHeader>
+                <CardTitle className="flex items-center gap-2">
+                  <Cloud className="h-5 w-5" />
+                  Process S3 Bucket
+                </CardTitle>
+                <CardDescription>
+                  Process all documents from an S3 bucket. AWS credentials must be configured.
+                </CardDescription>
+              </CardHeader>
+              <CardContent>
+                <form onSubmit={handleBucketUpload} className="space-y-4">
+                  <div className="space-y-2">
+                    <Label htmlFor="bucket-url">S3 Bucket URL</Label>
+                    <Input
+                      id="bucket-url"
+                      type="text"
+                      placeholder="s3://my-bucket/prefix/"
+                      value={bucketUrl}
+                      onChange={(e) => setBucketUrl(e.target.value)}
+                    />
+                  </div>
+                  <Button type="submit" disabled={bucketUploadLoading} className="w-full">
+                    {bucketUploadLoading ? (
+                      <>
+                        <Loader2 className="mr-2 h-4 w-4 animate-spin" />
+                        Processing...
+                      </>
+                    ) : (
+                      "Process S3 Bucket"
+                    )}
+                  </Button>
+                </form>
+              </CardContent>
+            </Card>
+          )}
 
           {/* Upload Status */}
@@ -710,13 +819,6 @@ function KnowledgeSourcesPage() {
                     <div className="text-sm text-muted-foreground">File storage</div>
                   </div>
                 </div>
-                <div className="flex items-center gap-3 rounded-lg border p-3">
-                  <div className="flex h-9 w-9 items-center justify-center rounded-md bg-muted font-semibold">O</div>
-                  <div>
-                    <div className="font-medium">OneDrive</div>
-                    <div className="text-sm text-muted-foreground">Microsoft cloud storage</div>
-                  </div>
-                </div>
                 <div className="flex items-center gap-3 rounded-lg border p-3">
                   <div className="flex h-9 w-9 items-center justify-center rounded-md bg-muted font-semibold">B</div>
                   <div>
diff --git a/frontend/src/components/protected-route.tsx b/frontend/src/components/protected-route.tsx
index 04719462..775dae7d 100644
--- a/frontend/src/components/protected-route.tsx
+++ b/frontend/src/components/protected-route.tsx
@@ -13,6 +13,8 @@ export function ProtectedRoute({ children }: ProtectedRouteProps) {
   const { isLoading, isAuthenticated } = useAuth()
   const router = useRouter()
   const pathname = usePathname()
+
+  console.log("ProtectedRoute - isLoading:", isLoading, "isAuthenticated:", isAuthenticated, "pathname:", pathname)
 
   useEffect(() => {
     if (!isLoading && !isAuthenticated) {
diff --git a/frontend/src/components/user-nav.tsx b/frontend/src/components/user-nav.tsx
index b818f06c..85eb578a 100644
--- a/frontend/src/components/user-nav.tsx
+++ b/frontend/src/components/user-nav.tsx
@@ -11,7 +11,7 @@ import {
   DropdownMenuTrigger,
 } from "@/components/ui/dropdown-menu"
 import { useAuth } from "@/contexts/auth-context"
-import { LogIn, LogOut, User, Moon, Sun, Settings, ChevronsUpDown } from "lucide-react"
+import { LogIn, LogOut, User, Moon, Sun, ChevronsUpDown } from "lucide-react"
 import { useTheme } from "next-themes"
 
 export function UserNav() {
@@ -61,14 +61,6 @@ export function UserNav() {
-        <DropdownMenuItem>
-          <User />
-          Profile
-        </DropdownMenuItem>
-        <DropdownMenuItem>
-          <Settings />
-          Settings
-        </DropdownMenuItem>
         <DropdownMenuItem onClick={() => setTheme(theme === "light" ? "dark" : "light")}>
           {theme === "light" ? (
             <>
               <Moon />
               Dark mode
             </>
diff --git a/pyproject.toml b/pyproject.toml
index 437e96f0..a8511901 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,6 +20,7 @@ dependencies = [
     "starlette>=0.47.1",
     "torch>=2.7.1",
     "uvicorn>=0.35.0",
+    "boto3>=1.35.0",
 ]
 
 [tool.uv.sources]
diff --git a/src/api/upload.py b/src/api/upload.py
index e15ad48a..fa3d470a 100644
--- a/src/api/upload.py
+++ b/src/api/upload.py
@@ -1,4 +1,6 @@
 import os
+from urllib.parse import urlparse
+import boto3
 
 from starlette.requests import Request
 from starlette.responses import JSONResponse
@@ -84,3 +86,58 @@ async def upload_context(request: Request, document_service, chat_service, sessi
 
     return JSONResponse(response_data)
 
+
+async def upload_options(request: Request, session_manager):
+    """Return availability of upload features"""
+    aws_enabled = bool(
+        os.getenv("AWS_ACCESS_KEY_ID") and os.getenv("AWS_SECRET_ACCESS_KEY")
+    )
+    return JSONResponse({"aws": aws_enabled})
+
+
+async def upload_bucket(request: Request, task_service, session_manager):
+    """Process all files from an S3 bucket URL"""
+    if not os.getenv("AWS_ACCESS_KEY_ID") or not os.getenv("AWS_SECRET_ACCESS_KEY"):
+        return JSONResponse({"error": "AWS credentials not configured"}, status_code=400)
+
+    payload = await request.json()
+    s3_url = payload.get("s3_url")
+    if not s3_url or not s3_url.startswith("s3://"):
+        return JSONResponse({"error": "Invalid S3 URL"}, status_code=400)
+
+    parsed = urlparse(s3_url)
+    bucket = parsed.netloc
+    prefix = parsed.path.lstrip("/")
+
+    s3_client = boto3.client("s3")
+    keys = []
+    paginator = s3_client.get_paginator("list_objects_v2")
+    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
+        for obj in page.get("Contents", []):
+            key = obj["Key"]
+            if not key.endswith("/"):
+                keys.append(key)
+
+    if not keys:
+        return JSONResponse({"error": "No files found in bucket"}, status_code=400)
+
+    user = request.state.user
+    jwt_token = request.cookies.get("auth_token")
+
+    from models.processors import S3FileProcessor
+
+    processor = S3FileProcessor(
+        task_service.document_service,
+        bucket,
+        s3_client=s3_client,
+        owner_user_id=user.user_id,
+        jwt_token=jwt_token,
+    )
+
+    task_id = await task_service.create_custom_task(user.user_id, keys, processor)
+
+    return JSONResponse(
+        {"task_id": task_id, "total_files": len(keys), "status": "accepted"},
+        status_code=201,
+    )
diff --git a/src/main.py b/src/main.py
index 3d9c5524..bd29bbee 100644
--- a/src/main.py
+++ b/src/main.py
@@ -162,12 +162,25 @@ def create_app():
                        session_manager=services['session_manager'])
             ), methods=["POST"]),
 
-        Route("/upload_path", 
+        Route("/upload_path",
             require_auth(services['session_manager'])(
                 partial(upload.upload_path,
                        task_service=services['task_service'],
                        session_manager=services['session_manager'])
             ), methods=["POST"]),
+
+        Route("/upload_options",
+            require_auth(services['session_manager'])(
+                partial(upload.upload_options,
+                       session_manager=services['session_manager'])
+            ), methods=["GET"]),
+
+        Route("/upload_bucket",
+            require_auth(services['session_manager'])(
+                partial(upload.upload_bucket,
+                       task_service=services['task_service'],
+                       session_manager=services['session_manager'])
+            ), methods=["POST"]),
 
         Route("/tasks/{task_id}",
             require_auth(services['session_manager'])(
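With the two routes registered, the flow can be driven from a script. A hedged sketch (base URL, cookie value, and bucket URL are placeholders; the payload and response shapes come from src/api/upload.py above):

```python
# Hedged sketch: submit a bucket for ingestion against the routes above.
# Base URL, auth cookie, and bucket URL are placeholders.
import requests

s = requests.Session()
s.cookies.set("auth_token", "<jwt-from-login>")  # hypothetical session token

r = s.post(
    "http://localhost:8000/upload_bucket",
    json={"s3_url": "s3://my-bucket/docs/"},  # hypothetical bucket
)
r.raise_for_status()  # 201 on success; 400 for a bad URL, missing creds, or an empty bucket
body = r.json()
print(body["task_id"], body["total_files"], body["status"])
# Progress can then be polled via GET /tasks/{task_id} (route shown above).
```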
diff --git a/src/models/processors.py b/src/models/processors.py
index ef33984f..69a5429d 100644
--- a/src/models/processors.py
+++ b/src/models/processors.py
@@ -81,4 +81,96 @@ class ConnectorFileProcessor(TaskProcessor):
 
                 file_task.status = TaskStatus.COMPLETED
                 file_task.result = result
-                upload_task.successful_files += 1
\ No newline at end of file
+                upload_task.successful_files += 1
+
+
+class S3FileProcessor(TaskProcessor):
+    """Processor for files stored in S3 buckets"""
+
+    def __init__(
+        self,
+        document_service,
+        bucket: str,
+        s3_client=None,
+        owner_user_id: str = None,
+        jwt_token: str = None,
+    ):
+        import boto3
+
+        self.document_service = document_service
+        self.bucket = bucket
+        self.s3_client = s3_client or boto3.client("s3")
+        self.owner_user_id = owner_user_id
+        self.jwt_token = jwt_token
+
+    async def process_item(self, upload_task: UploadTask, item: str, file_task: FileTask) -> None:
+        """Download an S3 object and process it using DocumentService"""
+        from models.tasks import TaskStatus
+        import tempfile
+        import os
+        import time
+        import asyncio
+        import datetime
+        from config.settings import INDEX_NAME, EMBED_MODEL, clients
+        from services.document_service import chunk_texts_for_embeddings
+        from utils.document_processing import process_document_sync
+
+        file_task.status = TaskStatus.RUNNING
+        file_task.updated_at = time.time()
+
+        tmp = tempfile.NamedTemporaryFile(delete=False)
+        try:
+            # Download object to temporary file
+            self.s3_client.download_fileobj(self.bucket, item, tmp)
+            tmp.flush()
+
+            loop = asyncio.get_event_loop()
+            slim_doc = await loop.run_in_executor(
+                self.document_service.process_pool, process_document_sync, tmp.name
+            )
+
+            opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
+                self.owner_user_id, self.jwt_token
+            )
+            exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
+            if exists:
+                result = {"status": "unchanged", "id": slim_doc["id"]}
+            else:
+                texts = [c["text"] for c in slim_doc["chunks"]]
+                text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
+                embeddings = []
+                for batch in text_batches:
+                    resp = await clients.patched_async_client.embeddings.create(
+                        model=EMBED_MODEL, input=batch
+                    )
+                    embeddings.extend([d.embedding for d in resp.data])
+
+                for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
+                    chunk_doc = {
+                        "document_id": slim_doc["id"],
+                        "filename": slim_doc["filename"],
+                        "mimetype": slim_doc["mimetype"],
+                        "page": chunk["page"],
+                        "text": chunk["text"],
+                        "chunk_embedding": vect,
+                        "owner": self.owner_user_id,
+                        "indexed_time": datetime.datetime.now().isoformat(),
+                    }
+                    chunk_id = f"{slim_doc['id']}_{i}"
+                    await opensearch_client.index(index=INDEX_NAME, id=chunk_id, body=chunk_doc)
+
+                result = {"status": "indexed", "id": slim_doc["id"]}
+
+            result["path"] = f"s3://{self.bucket}/{item}"
+            file_task.status = TaskStatus.COMPLETED
+            file_task.result = result
+            upload_task.successful_files += 1
+
+        except Exception as e:
+            file_task.status = TaskStatus.FAILED
+            file_task.error = str(e)
+            upload_task.failed_files += 1
+        finally:
+            tmp.close()
+            os.remove(tmp.name)
+            file_task.updated_at = time.time()
diff --git a/uv.lock b/uv.lock
index 275fd012..85ba9b9b 100644
--- a/uv.lock
+++ b/uv.lock
@@ -131,6 +131,34 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/50/cd/30110dc0ffcf3b131156077b90e9f60ed75711223f306da4db08eff8403b/beautifulsoup4-4.13.4-py3-none-any.whl", hash = "sha256:9bbbb14bfde9d79f38b8cd5f8c7c85f4b8f2523190ebed90e950a8dea4cb1c4b", size = 187285, upload-time = "2025-04-15T17:05:12.221Z" },
 ]
 
+[[package]]
+name = "boto3"
+version = "1.40.12"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [ + { name = "botocore" }, + { name = "jmespath" }, + { name = "s3transfer" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/41/19/2c4d140a7f99b5903b21b9ccd7253c71f147c346c3c632b2117444cf2d65/boto3-1.40.12.tar.gz", hash = "sha256:c6b32aee193fbd2eb84696d2b5b2410dcda9fb4a385e1926cff908377d222247", size = 111959, upload-time = "2025-08-18T19:30:23.827Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/16/6e/5a9dcf38ad87838fb99742c4a3ab1b7507ad3a02c8c27a9ccda7a0bb5709/boto3-1.40.12-py3-none-any.whl", hash = "sha256:3c3d6731390b5b11f5e489d5d9daa57f0c3e171efb63ac8f47203df9c71812b3", size = 140075, upload-time = "2025-08-18T19:30:22.494Z" }, +] + +[[package]] +name = "botocore" +version = "1.40.12" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "jmespath" }, + { name = "python-dateutil" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/7d/b2/7933590fc5bca1980801b71e09db1a95581afff177cbf3c8a031d922885c/botocore-1.40.12.tar.gz", hash = "sha256:c6560578e799b47b762b7e555bd9c5dd5c29c5d23bd778a8a72e98c979b3c727", size = 14349930, upload-time = "2025-08-18T19:30:13.794Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1e/b6/65fd6e718c9538ba1462c9b71e9262bc723202ff203fe64ff66ff676d823/botocore-1.40.12-py3-none-any.whl", hash = "sha256:84e96004a8b426c5508f6b5600312d6271364269466a3a957dc377ad8effc438", size = 14018004, upload-time = "2025-08-18T19:30:09.054Z" }, +] + [[package]] name = "cachetools" version = "5.5.2" @@ -753,6 +781,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b3/4a/4175a563579e884192ba6e81725fc0448b042024419be8d83aa8a80a3f44/jiter-0.10.0-cp314-cp314t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3aa96f2abba33dc77f79b4cf791840230375f9534e5fac927ccceb58c5e604a5", size = 354213, upload-time = "2025-05-18T19:04:41.894Z" }, ] +[[package]] +name = "jmespath" +version = "1.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/00/2a/e867e8531cf3e36b41201936b7fa7ba7b5702dbef42922193f05c8976cd6/jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe", size = 25843, upload-time = "2022-06-17T18:00:12.224Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256, upload-time = "2022-06-17T18:00:10.251Z" }, +] + [[package]] name = "jsonlines" version = "3.1.0" @@ -1336,6 +1373,7 @@ source = { virtual = "." 
} dependencies = [ { name = "agentd" }, { name = "aiofiles" }, + { name = "boto3" }, { name = "cryptography" }, { name = "docling" }, { name = "google-api-python-client" }, @@ -1355,6 +1393,7 @@ dependencies = [ requires-dist = [ { name = "agentd", specifier = ">=0.2.2" }, { name = "aiofiles", specifier = ">=24.1.0" }, + { name = "boto3", specifier = ">=1.35.0" }, { name = "cryptography", specifier = ">=45.0.6" }, { name = "docling", specifier = ">=2.41.0" }, { name = "google-api-python-client", specifier = ">=2.143.0" }, @@ -2001,6 +2040,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ce/c2/362f2cc36a7a57b47380061c23fc109c7222c1a544ffd24cda289ba19673/rtree-1.4.0-py3-none-win_amd64.whl", hash = "sha256:ba83efc7b7563905b1bfdfc14490c4bfb59e92e5e6156bdeb6ec5df5117252f4", size = 385221, upload-time = "2025-03-05T23:31:44.537Z" }, ] +[[package]] +name = "s3transfer" +version = "0.13.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "botocore" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6d/05/d52bf1e65044b4e5e27d4e63e8d1579dbdec54fce685908ae09bc3720030/s3transfer-0.13.1.tar.gz", hash = "sha256:c3fdba22ba1bd367922f27ec8032d6a1cf5f10c934fb5d68cf60fd5a23d936cf", size = 150589, upload-time = "2025-07-18T19:22:42.31Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/6d/4f/d073e09df851cfa251ef7840007d04db3293a0482ce607d2b993926089be/s3transfer-0.13.1-py3-none-any.whl", hash = "sha256:a981aa7429be23fe6dfc13e80e4020057cbab622b08c0315288758d67cabc724", size = 85308, upload-time = "2025-07-18T19:22:40.947Z" }, +] + [[package]] name = "safetensors" version = "0.5.3"