aws s3 upload v0

This commit is contained in:
phact 2025-08-18 23:46:32 -04:00
parent 0aeaa93aba
commit 6704ea89fc
10 changed files with 445 additions and 23 deletions

View file

@ -12,4 +12,8 @@ MICROSOFT_GRAPH_OAUTH_CLIENT_ID=
MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET=
# Optional dns routable from google (etc.) to handle continuous ingest (ngrok works)
WEBHOOK_BASE_URL=
OPENAI_API_KEY=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

View file

@ -1,22 +1,45 @@
"use client"
import { useState } from "react"
import { useState, useEffect } from "react"
import { Button } from "@/components/ui/button"
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
import { Input } from "@/components/ui/input"
import { Label } from "@/components/ui/label"
import { Upload, FolderOpen, Loader2 } from "lucide-react"
import { Upload, FolderOpen, Loader2, Cloud } from "lucide-react"
import { ProtectedRoute } from "@/components/protected-route"
import { useTask } from "@/contexts/task-context"
function AdminPage() {
console.log("AdminPage component rendered!")
const [fileUploadLoading, setFileUploadLoading] = useState(false)
const [pathUploadLoading, setPathUploadLoading] = useState(false)
const [selectedFile, setSelectedFile] = useState<File | null>(null)
const [folderPath, setFolderPath] = useState("/app/documents/")
const [bucketUploadLoading, setBucketUploadLoading] = useState(false)
const [bucketUrl, setBucketUrl] = useState("s3://")
const [uploadStatus, setUploadStatus] = useState<string>("")
const [awsEnabled, setAwsEnabled] = useState(false)
const { addTask } = useTask()
useEffect(() => {
console.log("AdminPage useEffect running - checking AWS availability")
const checkAws = async () => {
try {
console.log("Making request to /api/upload_options")
const res = await fetch("/api/upload_options")
console.log("Response status:", res.status, "OK:", res.ok)
if (res.ok) {
const data = await res.json()
console.log("Response data:", data)
setAwsEnabled(Boolean(data.aws))
}
} catch (err) {
console.error("Failed to check AWS availability", err)
}
}
checkAws()
}, [])
const handleFileUpload = async (e: React.FormEvent) => {
e.preventDefault()
if (!selectedFile) return
@ -51,6 +74,47 @@ function AdminPage() {
}
}
const handleBucketUpload = async (e: React.FormEvent) => {
e.preventDefault()
if (!bucketUrl.trim()) return
setBucketUploadLoading(true)
setUploadStatus("")
try {
const response = await fetch("/api/upload_bucket", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ s3_url: bucketUrl }),
})
const result = await response.json()
if (response.status === 201) {
const taskId = result.task_id || result.id
const totalFiles = result.total_files || 0
if (!taskId) {
throw new Error("No task ID received from server")
}
addTask(taskId)
setUploadStatus(`🔄 Processing started for ${totalFiles} files. Check the task notification panel for real-time progress. (Task ID: ${taskId})`)
setBucketUrl("")
} else {
setUploadStatus(`Error: ${result.error || "Bucket processing failed"}`)
}
} catch (error) {
setUploadStatus(
`Error: ${error instanceof Error ? error.message : "Bucket processing failed"}`,
)
} finally {
setBucketUploadLoading(false)
}
}
const handlePathUpload = async (e: React.FormEvent) => {
e.preventDefault()
if (!folderPath.trim()) return
@ -121,7 +185,7 @@ function AdminPage() {
</Card>
)}
<div className="grid gap-6 md:grid-cols-2">
<div className="grid gap-6 md:grid-cols-3">
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
@ -207,6 +271,50 @@ function AdminPage() {
</form>
</CardContent>
</Card>
{awsEnabled && (
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
<Cloud className="h-5 w-5" />
Process Bucket
</CardTitle>
<CardDescription>
Process all documents from an S3 bucket. AWS credentials must be set as environment variables.
</CardDescription>
</CardHeader>
<CardContent>
<form onSubmit={handleBucketUpload} className="space-y-4">
<div className="space-y-2">
<Label htmlFor="bucket-url">S3 URL</Label>
<Input
id="bucket-url"
type="text"
placeholder="s3://bucket/path"
value={bucketUrl}
onChange={(e) => setBucketUrl(e.target.value)}
/>
</div>
<Button
type="submit"
disabled={!bucketUrl.trim() || bucketUploadLoading}
className="w-full"
>
{bucketUploadLoading ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Processing...
</>
) : (
<>
<Cloud className="mr-2 h-4 w-4" />
Process Bucket
</>
)}
</Button>
</form>
</CardContent>
</Card>
)}
</div>
</div>
)

View file

@ -7,7 +7,7 @@ import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/com
import { Badge } from "@/components/ui/badge"
import { Input } from "@/components/ui/input"
import { Label } from "@/components/ui/label"
import { Upload, FolderOpen, Loader2, PlugZap, RefreshCw, Download } from "lucide-react"
import { Upload, FolderOpen, Loader2, PlugZap, RefreshCw, Download, Cloud } from "lucide-react"
import { ProtectedRoute } from "@/components/protected-route"
import { useTask } from "@/contexts/task-context"
import { useAuth } from "@/contexts/auth-context"
@ -50,7 +50,10 @@ function KnowledgeSourcesPage() {
const [fileUploadLoading, setFileUploadLoading] = useState(false)
const [pathUploadLoading, setPathUploadLoading] = useState(false)
const [folderPath, setFolderPath] = useState("/app/documents/")
const [bucketUploadLoading, setBucketUploadLoading] = useState(false)
const [bucketUrl, setBucketUrl] = useState("s3://")
const [uploadStatus, setUploadStatus] = useState<string>("")
const [awsEnabled, setAwsEnabled] = useState(false)
// Connectors state
const [connectors, setConnectors] = useState<Connector[]>([])
@ -144,6 +147,50 @@ function KnowledgeSourcesPage() {
}
}
const handleBucketUpload = async (e: React.FormEvent) => {
e.preventDefault()
if (!bucketUrl.trim()) return
setBucketUploadLoading(true)
setUploadStatus("")
try {
const response = await fetch("/api/upload_bucket", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ s3_url: bucketUrl }),
})
const result = await response.json()
if (response.status === 201) {
const taskId = result.task_id || result.id
const totalFiles = result.total_files || 0
if (!taskId) {
throw new Error("No task ID received from server")
}
addTask(taskId)
setUploadStatus(`🔄 Processing started for ${totalFiles} files. Check the task notification panel for real-time progress. (Task ID: ${taskId})`)
setBucketUrl("s3://")
// Refresh stats after successful bucket upload
fetchStats()
} else {
setUploadStatus(`Error: ${result.error || "Bucket processing failed"}`)
}
} catch (error) {
setUploadStatus(
`Error: ${error instanceof Error ? error.message : "Bucket processing failed"}`,
)
} finally {
setBucketUploadLoading(false)
}
}
// Helper function to get connector icon
const getConnectorIcon = (iconName: string) => {
const iconMap: { [key: string]: React.ReactElement } = {
@ -374,6 +421,22 @@ function KnowledgeSourcesPage() {
}
}
// Check AWS availability
useEffect(() => {
const checkAws = async () => {
try {
const res = await fetch("/api/upload_options")
if (res.ok) {
const data = await res.json()
setAwsEnabled(Boolean(data.aws))
}
} catch (err) {
console.error("Failed to check AWS availability", err)
}
}
checkAws()
}, [])
// Initial stats fetch
useEffect(() => {
fetchStats()
@ -499,7 +562,7 @@ function KnowledgeSourcesPage() {
</p>
</div>
<div className="grid gap-6 md:grid-cols-2">
<div className={`grid gap-6 ${awsEnabled ? 'md:grid-cols-3' : 'md:grid-cols-2'}`}>
{/* File Upload Card */}
<Card className="flex flex-col">
<CardHeader>
@ -562,6 +625,52 @@ function KnowledgeSourcesPage() {
</form>
</CardContent>
</Card>
{/* S3 Bucket Upload Card - only show if AWS is enabled */}
{awsEnabled && (
<Card className="flex flex-col">
<CardHeader>
<CardTitle className="flex items-center gap-2">
<Cloud className="h-5 w-5" />
Process S3 Bucket
</CardTitle>
<CardDescription>
Process all documents from an S3 bucket. AWS credentials must be configured.
</CardDescription>
</CardHeader>
<CardContent className="flex-1 flex flex-col justify-end">
<form onSubmit={handleBucketUpload} className="space-y-4">
<div className="space-y-2">
<Label htmlFor="bucket-url">S3 URL</Label>
<Input
id="bucket-url"
type="text"
placeholder="s3://bucket/path"
value={bucketUrl}
onChange={(e) => setBucketUrl(e.target.value)}
/>
</div>
<Button
type="submit"
disabled={!bucketUrl.trim() || bucketUploadLoading}
className="w-full"
>
{bucketUploadLoading ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Processing...
</>
) : (
<>
<Cloud className="mr-2 h-4 w-4" />
Process Bucket
</>
)}
</Button>
</form>
</CardContent>
</Card>
)}
</div>
{/* Upload Status */}
@ -710,13 +819,6 @@ function KnowledgeSourcesPage() {
<div className="text-sm text-muted-foreground">File storage</div>
</div>
</div>
<div className="flex items-center gap-3 p-3 rounded-lg border border-dashed">
<div className="w-8 h-8 bg-purple-600 rounded flex items-center justify-center text-white font-bold leading-none">O</div>
<div>
<div className="font-medium">OneDrive</div>
<div className="text-sm text-muted-foreground">Microsoft cloud storage</div>
</div>
</div>
<div className="flex items-center gap-3 p-3 rounded-lg border border-dashed">
<div className="w-8 h-8 bg-orange-600 rounded flex items-center justify-center text-white font-bold leading-none">B</div>
<div>

View file

@ -13,6 +13,8 @@ export function ProtectedRoute({ children }: ProtectedRouteProps) {
const { isLoading, isAuthenticated } = useAuth()
const router = useRouter()
const pathname = usePathname()
console.log("ProtectedRoute - isLoading:", isLoading, "isAuthenticated:", isAuthenticated, "pathname:", pathname)
useEffect(() => {
if (!isLoading && !isAuthenticated) {

View file

@ -11,7 +11,7 @@ import {
DropdownMenuTrigger,
} from "@/components/ui/dropdown-menu"
import { useAuth } from "@/contexts/auth-context"
import { LogIn, LogOut, User, Moon, Sun, Settings, ChevronsUpDown } from "lucide-react"
import { LogIn, LogOut, User, Moon, Sun, ChevronsUpDown } from "lucide-react"
import { useTheme } from "next-themes"
export function UserNav() {
@ -61,14 +61,6 @@ export function UserNav() {
</div>
</DropdownMenuLabel>
<DropdownMenuSeparator />
<DropdownMenuItem>
<User className="mr-2 h-4 w-4" />
Profile
</DropdownMenuItem>
<DropdownMenuItem>
<Settings className="mr-2 h-4 w-4" />
<span>Settings</span>
</DropdownMenuItem>
<DropdownMenuItem onClick={() => setTheme(theme === "light" ? "dark" : "light")}>
{theme === "light" ? (
<Moon className="mr-2 h-4 w-4" />

View file

@ -20,6 +20,7 @@ dependencies = [
"starlette>=0.47.1",
"torch>=2.7.1",
"uvicorn>=0.35.0",
"boto3>=1.35.0",
]
[tool.uv.sources]

View file

@ -1,4 +1,6 @@
import os
from urllib.parse import urlparse
import boto3
from starlette.requests import Request
from starlette.responses import JSONResponse
@ -84,3 +86,58 @@ async def upload_context(request: Request, document_service, chat_service, sessi
return JSONResponse(response_data)
async def upload_options(request: Request, session_manager):
    """Report which optional upload features are available to the client.

    AWS bucket ingestion is considered enabled only when both credential
    environment variables are present and non-empty.
    """
    has_aws = all(
        os.getenv(var) for var in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    )
    return JSONResponse({"aws": has_aws})
async def upload_bucket(request: Request, task_service, session_manager):
    """Process all files from an S3 bucket URL.

    Expects a JSON body of the form ``{"s3_url": "s3://bucket/optional/prefix"}``.
    Lists every non-directory key under the prefix and queues a background
    task that downloads and indexes each object.

    Returns 201 with ``task_id``/``total_files`` on success, or 400 with an
    ``error`` message on bad input, missing credentials, or S3 failure.
    """
    if not os.getenv("AWS_ACCESS_KEY_ID") or not os.getenv("AWS_SECRET_ACCESS_KEY"):
        return JSONResponse({"error": "AWS credentials not configured"}, status_code=400)

    # Malformed JSON or a non-object body would otherwise surface as a 500.
    try:
        payload = await request.json()
    except Exception:
        payload = None
    if not isinstance(payload, dict):
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)

    s3_url = payload.get("s3_url")
    if not isinstance(s3_url, str) or not s3_url.startswith("s3://"):
        return JSONResponse({"error": "Invalid S3 URL"}, status_code=400)

    parsed = urlparse(s3_url)
    bucket = parsed.netloc
    prefix = parsed.path.lstrip("/")
    if not bucket:
        # Bare "s3://" has no bucket name; fail fast instead of letting boto3 raise.
        return JSONResponse({"error": "Invalid S3 URL"}, status_code=400)

    s3_client = boto3.client("s3")

    # Collect every object key under the prefix, skipping "directory"
    # placeholder keys that end with a slash.
    keys = []
    try:
        paginator = s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                key = obj["Key"]
                if not key.endswith("/"):
                    keys.append(key)
    except s3_client.exceptions.ClientError as exc:
        # Missing bucket, access denied, bad credentials, etc.
        return JSONResponse({"error": f"Failed to list bucket: {exc}"}, status_code=400)

    if not keys:
        return JSONResponse({"error": "No files found in bucket"}, status_code=400)

    user = request.state.user
    jwt_token = request.cookies.get("auth_token")

    from models.processors import S3FileProcessor
    processor = S3FileProcessor(
        task_service.document_service,
        bucket,
        s3_client=s3_client,
        owner_user_id=user.user_id,
        jwt_token=jwt_token,
    )
    task_id = await task_service.create_custom_task(user.user_id, keys, processor)
    return JSONResponse(
        {"task_id": task_id, "total_files": len(keys), "status": "accepted"},
        status_code=201,
    )

View file

@ -162,12 +162,25 @@ def create_app():
session_manager=services['session_manager'])
), methods=["POST"]),
Route("/upload_path",
Route("/upload_path",
require_auth(services['session_manager'])(
partial(upload.upload_path,
task_service=services['task_service'],
session_manager=services['session_manager'])
), methods=["POST"]),
Route("/upload_options",
require_auth(services['session_manager'])(
partial(upload.upload_options,
session_manager=services['session_manager'])
), methods=["GET"]),
Route("/upload_bucket",
require_auth(services['session_manager'])(
partial(upload.upload_bucket,
task_service=services['task_service'],
session_manager=services['session_manager'])
), methods=["POST"]),
Route("/tasks/{task_id}",
require_auth(services['session_manager'])(

View file

@ -81,4 +81,96 @@ class ConnectorFileProcessor(TaskProcessor):
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
upload_task.successful_files += 1
class S3FileProcessor(TaskProcessor):
    """Processor for files stored in S3 buckets.

    Downloads each object to a temporary file, parses/chunks it via the
    document-processing pipeline, embeds the chunks, and indexes them into
    the owner's OpenSearch index. One instance handles one bucket.
    """

    def __init__(
        self,
        document_service,
        bucket: str,
        s3_client=None,
        owner_user_id: str = None,
        jwt_token: str = None,
    ):
        # Local import keeps boto3 optional for deployments without AWS.
        import boto3
        self.document_service = document_service
        self.bucket = bucket
        self.s3_client = s3_client or boto3.client("s3")
        self.owner_user_id = owner_user_id
        self.jwt_token = jwt_token

    async def process_item(self, upload_task: UploadTask, item: str, file_task: FileTask) -> None:
        """Download the S3 object ``item`` and process it using DocumentService.

        Updates ``file_task`` status/result in place and bumps the parent
        ``upload_task`` success/failure counters; errors are recorded on
        ``file_task`` rather than raised.
        """
        from models.tasks import TaskStatus
        import tempfile
        import os
        import time
        import asyncio
        import datetime
        from config.settings import INDEX_NAME, EMBED_MODEL, clients
        from services.document_service import chunk_texts_for_embeddings
        from utils.document_processing import process_document_sync

        file_task.status = TaskStatus.RUNNING
        file_task.updated_at = time.time()

        # delete=False so the file can be reopened by name in the process pool.
        tmp = tempfile.NamedTemporaryFile(delete=False)
        try:
            # get_running_loop() instead of the deprecated get_event_loop()
            # inside a coroutine.
            loop = asyncio.get_running_loop()

            # boto3 is synchronous; run the download in a worker thread so the
            # event loop is not blocked while the object streams down.
            await loop.run_in_executor(
                None, self.s3_client.download_fileobj, self.bucket, item, tmp
            )
            tmp.flush()

            # CPU-heavy parsing runs in the shared process pool.
            slim_doc = await loop.run_in_executor(
                self.document_service.process_pool, process_document_sync, tmp.name
            )

            opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
                self.owner_user_id, self.jwt_token
            )

            exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
            if exists:
                # Document id already present; skip re-embedding and re-indexing.
                result = {"status": "unchanged", "id": slim_doc["id"]}
            else:
                texts = [c["text"] for c in slim_doc["chunks"]]
                # Batch texts so each embeddings request stays under the
                # model's token limit.
                text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
                embeddings = []
                for batch in text_batches:
                    resp = await clients.patched_async_client.embeddings.create(
                        model=EMBED_MODEL, input=batch
                    )
                    embeddings.extend([d.embedding for d in resp.data])

                for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
                    chunk_doc = {
                        "document_id": slim_doc["id"],
                        "filename": slim_doc["filename"],
                        "mimetype": slim_doc["mimetype"],
                        "page": chunk["page"],
                        "text": chunk["text"],
                        "chunk_embedding": vect,
                        "owner": self.owner_user_id,
                        "indexed_time": datetime.datetime.now().isoformat(),
                    }
                    chunk_id = f"{slim_doc['id']}_{i}"
                    await opensearch_client.index(index=INDEX_NAME, id=chunk_id, body=chunk_doc)
                result = {"status": "indexed", "id": slim_doc["id"]}

            result["path"] = f"s3://{self.bucket}/{item}"
            file_task.status = TaskStatus.COMPLETED
            file_task.result = result
            upload_task.successful_files += 1
        except Exception as e:
            file_task.status = TaskStatus.FAILED
            file_task.error = str(e)
            upload_task.failed_files += 1
        finally:
            tmp.close()
            os.remove(tmp.name)
            file_task.updated_at = time.time()

51
uv.lock generated
View file

@ -131,6 +131,34 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/50/cd/30110dc0ffcf3b131156077b90e9f60ed75711223f306da4db08eff8403b/beautifulsoup4-4.13.4-py3-none-any.whl", hash = "sha256:9bbbb14bfde9d79f38b8cd5f8c7c85f4b8f2523190ebed90e950a8dea4cb1c4b", size = 187285, upload-time = "2025-04-15T17:05:12.221Z" },
]
[[package]]
name = "boto3"
version = "1.40.12"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "botocore" },
{ name = "jmespath" },
{ name = "s3transfer" },
]
sdist = { url = "https://files.pythonhosted.org/packages/41/19/2c4d140a7f99b5903b21b9ccd7253c71f147c346c3c632b2117444cf2d65/boto3-1.40.12.tar.gz", hash = "sha256:c6b32aee193fbd2eb84696d2b5b2410dcda9fb4a385e1926cff908377d222247", size = 111959, upload-time = "2025-08-18T19:30:23.827Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/16/6e/5a9dcf38ad87838fb99742c4a3ab1b7507ad3a02c8c27a9ccda7a0bb5709/boto3-1.40.12-py3-none-any.whl", hash = "sha256:3c3d6731390b5b11f5e489d5d9daa57f0c3e171efb63ac8f47203df9c71812b3", size = 140075, upload-time = "2025-08-18T19:30:22.494Z" },
]
[[package]]
name = "botocore"
version = "1.40.12"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jmespath" },
{ name = "python-dateutil" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/7d/b2/7933590fc5bca1980801b71e09db1a95581afff177cbf3c8a031d922885c/botocore-1.40.12.tar.gz", hash = "sha256:c6560578e799b47b762b7e555bd9c5dd5c29c5d23bd778a8a72e98c979b3c727", size = 14349930, upload-time = "2025-08-18T19:30:13.794Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/b6/65fd6e718c9538ba1462c9b71e9262bc723202ff203fe64ff66ff676d823/botocore-1.40.12-py3-none-any.whl", hash = "sha256:84e96004a8b426c5508f6b5600312d6271364269466a3a957dc377ad8effc438", size = 14018004, upload-time = "2025-08-18T19:30:09.054Z" },
]
[[package]]
name = "cachetools"
version = "5.5.2"
@ -753,6 +781,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/4a/4175a563579e884192ba6e81725fc0448b042024419be8d83aa8a80a3f44/jiter-0.10.0-cp314-cp314t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3aa96f2abba33dc77f79b4cf791840230375f9534e5fac927ccceb58c5e604a5", size = 354213, upload-time = "2025-05-18T19:04:41.894Z" },
]
[[package]]
name = "jmespath"
version = "1.0.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/00/2a/e867e8531cf3e36b41201936b7fa7ba7b5702dbef42922193f05c8976cd6/jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe", size = 25843, upload-time = "2022-06-17T18:00:12.224Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256, upload-time = "2022-06-17T18:00:10.251Z" },
]
[[package]]
name = "jsonlines"
version = "3.1.0"
@ -1336,6 +1373,7 @@ source = { virtual = "." }
dependencies = [
{ name = "agentd" },
{ name = "aiofiles" },
{ name = "boto3" },
{ name = "cryptography" },
{ name = "docling" },
{ name = "google-api-python-client" },
@ -1355,6 +1393,7 @@ dependencies = [
requires-dist = [
{ name = "agentd", specifier = ">=0.2.2" },
{ name = "aiofiles", specifier = ">=24.1.0" },
{ name = "boto3", specifier = ">=1.35.0" },
{ name = "cryptography", specifier = ">=45.0.6" },
{ name = "docling", specifier = ">=2.41.0" },
{ name = "google-api-python-client", specifier = ">=2.143.0" },
@ -2001,6 +2040,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ce/c2/362f2cc36a7a57b47380061c23fc109c7222c1a544ffd24cda289ba19673/rtree-1.4.0-py3-none-win_amd64.whl", hash = "sha256:ba83efc7b7563905b1bfdfc14490c4bfb59e92e5e6156bdeb6ec5df5117252f4", size = 385221, upload-time = "2025-03-05T23:31:44.537Z" },
]
[[package]]
name = "s3transfer"
version = "0.13.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "botocore" },
]
sdist = { url = "https://files.pythonhosted.org/packages/6d/05/d52bf1e65044b4e5e27d4e63e8d1579dbdec54fce685908ae09bc3720030/s3transfer-0.13.1.tar.gz", hash = "sha256:c3fdba22ba1bd367922f27ec8032d6a1cf5f10c934fb5d68cf60fd5a23d936cf", size = 150589, upload-time = "2025-07-18T19:22:42.31Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/6d/4f/d073e09df851cfa251ef7840007d04db3293a0482ce607d2b993926089be/s3transfer-0.13.1-py3-none-any.whl", hash = "sha256:a981aa7429be23fe6dfc13e80e4020057cbab622b08c0315288758d67cabc724", size = 85308, upload-time = "2025-07-18T19:22:40.947Z" },
]
[[package]]
name = "safetensors"
version = "0.5.3"