diff --git a/sdks/python/openrag_sdk/documents.py b/sdks/python/openrag_sdk/documents.py index f4d0ac3a..867fbc9c 100644 --- a/sdks/python/openrag_sdk/documents.py +++ b/sdks/python/openrag_sdk/documents.py @@ -1,11 +1,10 @@ """OpenRAG SDK documents client.""" +import asyncio from pathlib import Path from typing import TYPE_CHECKING, BinaryIO -import httpx - -from .models import DeleteDocumentResponse, IngestResponse +from .models import DeleteDocumentResponse, IngestResponse, IngestTaskStatus if TYPE_CHECKING: from .client import OpenRAGClient @@ -23,7 +22,10 @@ class DocumentsClient: *, file: BinaryIO | None = None, filename: str | None = None, - ) -> IngestResponse: + wait: bool = True, + poll_interval: float = 1.0, + timeout: float = 300.0, + ) -> IngestResponse | IngestTaskStatus: """ Ingest a document into the knowledge base. @@ -31,12 +33,17 @@ class DocumentsClient: file_path: Path to the file to ingest. file: File-like object to ingest (alternative to file_path). filename: Filename to use when providing file object. + wait: If True, poll until ingestion completes. If False, return immediately. + poll_interval: Seconds between status checks when waiting. + timeout: Maximum seconds to wait for completion. Returns: - IngestResponse with document_id and chunk count. + IngestTaskStatus with final status if wait=True. + IngestResponse with task_id if wait=False. Raises: ValueError: If neither file_path nor file is provided. + TimeoutError: If ingestion doesn't complete within timeout. """ if file_path is not None: path = Path(file_path) @@ -60,7 +67,64 @@ class DocumentsClient: raise ValueError("Either file_path or file must be provided") data = response.json() - return IngestResponse(**data) + ingest_response = IngestResponse(**data) + + if not wait: + return ingest_response + + # Poll for completion + return await self.wait_for_task( + ingest_response.task_id, + poll_interval=poll_interval, + timeout=timeout, + ) + + async def get_task_status(self, task_id: str) -> IngestTaskStatus: + """ + Get the status of an ingestion task. + + Args: + task_id: The task ID returned from ingest(). + + Returns: + IngestTaskStatus with current task status. + """ + response = await self._client._request( + "GET", + f"/api/v1/tasks/{task_id}", + ) + data = response.json() + return IngestTaskStatus(**data) + + async def wait_for_task( + self, + task_id: str, + poll_interval: float = 1.0, + timeout: float = 300.0, + ) -> IngestTaskStatus: + """ + Wait for an ingestion task to complete. + + Args: + task_id: The task ID to wait for. + poll_interval: Seconds between status checks. + timeout: Maximum seconds to wait. + + Returns: + IngestTaskStatus with final status. + + Raises: + TimeoutError: If task doesn't complete within timeout. + """ + elapsed = 0.0 + while elapsed < timeout: + status = await self.get_task_status(task_id) + if status.status in ("completed", "failed"): + return status + await asyncio.sleep(poll_interval) + elapsed += poll_interval + + raise TimeoutError(f"Ingestion task {task_id} did not complete within {timeout}s") async def delete(self, filename: str) -> DeleteDocumentResponse: """ diff --git a/sdks/python/openrag_sdk/models.py b/sdks/python/openrag_sdk/models.py index 576439af..70e69abc 100644 --- a/sdks/python/openrag_sdk/models.py +++ b/sdks/python/openrag_sdk/models.py @@ -71,12 +71,23 @@ class SearchResponse(BaseModel): # Document models class IngestResponse(BaseModel): - """Response from document ingestion.""" + """Response from document ingestion (async task-based).""" - success: bool - document_id: str | None = None + task_id: str + status: str | None = None # Optional - we poll for actual status filename: str | None = None - chunks: int = 0 + + +class IngestTaskStatus(BaseModel): + """Status of an ingestion task.""" + + task_id: str + status: str # "pending", "running", "completed", "failed" + total_files: int = 0 + processed_files: int = 0 + successful_files: int = 0 + failed_files: int = 0 + files: dict = {} # Detailed per-file status class DeleteDocumentResponse(BaseModel): diff --git a/sdks/python/tests/test_integration.py b/sdks/python/tests/test_integration.py index a83af327..62981520 100644 --- a/sdks/python/tests/test_integration.py +++ b/sdks/python/tests/test_integration.py @@ -79,15 +79,28 @@ class TestDocuments: @pytest.mark.asyncio async def test_ingest_document(self, client, test_file: Path): """Test document ingestion.""" + # wait=True (default) polls until completion result = await client.documents.ingest(file_path=str(test_file)) - assert result.success is True - assert result.chunks > 0 + assert result.status == "completed" + assert result.successful_files >= 1 + + @pytest.mark.asyncio + async def test_ingest_document_no_wait(self, client, test_file: Path): + """Test document ingestion without waiting.""" + # wait=False returns immediately with task_id + result = await client.documents.ingest(file_path=str(test_file), wait=False) + + assert result.task_id is not None + + # Can poll manually + final_status = await client.documents.wait_for_task(result.task_id) + assert final_status.status == "completed" @pytest.mark.asyncio async def test_delete_document(self, client, test_file: Path): """Test document deletion.""" - # First ingest + # First ingest (wait for completion) await client.documents.ingest(file_path=str(test_file)) # Then delete diff --git a/sdks/typescript/src/documents.ts b/sdks/typescript/src/documents.ts index 2af9c193..e63c7503 100644 --- a/sdks/typescript/src/documents.ts +++ b/sdks/typescript/src/documents.ts @@ -3,7 +3,11 @@ */ import type { OpenRAGClient } from "./client"; -import type { DeleteDocumentResponse, IngestResponse } from "./types"; +import type { + DeleteDocumentResponse, + IngestResponse, + IngestTaskStatus, +} from "./types"; export interface IngestOptions { /** Path to file (Node.js only). */ @@ -12,6 +16,12 @@ export interface IngestOptions { file?: File | Blob; /** Filename when providing file/blob. */ filename?: string; + /** If true, poll until ingestion completes. Default: true. */ + wait?: boolean; + /** Seconds between status checks when waiting. Default: 1. */ + pollInterval?: number; + /** Maximum seconds to wait for completion. Default: 300. */ + timeout?: number; } export class DocumentsClient { @@ -21,10 +31,15 @@ export class DocumentsClient { * Ingest a document into the knowledge base. * * @param options - Ingest options (filePath or file+filename). - * @returns IngestResponse with document_id and chunk count. + * @returns IngestTaskStatus with final status if wait=true, IngestResponse with task_id if wait=false. */ - async ingest(options: IngestOptions): Promise { + async ingest( + options: IngestOptions + ): Promise { const formData = new FormData(); + const wait = options.wait ?? true; + const pollInterval = options.pollInterval ?? 1; + const timeout = options.timeout ?? 300; if (options.filePath) { // Node.js: read file from path @@ -57,12 +72,74 @@ export class DocumentsClient { ); const data = await response.json(); - return { - success: data.success ?? false, - document_id: data.document_id ?? null, + const ingestResponse: IngestResponse = { + task_id: data.task_id, + status: data.status ?? null, filename: data.filename ?? null, - chunks: data.chunks ?? 0, }; + + if (!wait) { + return ingestResponse; + } + + // Poll for completion + return await this.waitForTask(ingestResponse.task_id, pollInterval, timeout); + } + + /** + * Get the status of an ingestion task. + * + * @param taskId - The task ID returned from ingest(). + * @returns IngestTaskStatus with current task status. + */ + async getTaskStatus(taskId: string): Promise { + const response = await this.client._request( + "GET", + `/api/v1/tasks/${taskId}` + ); + const data = await response.json(); + return { + task_id: data.task_id, + status: data.status, + total_files: data.total_files ?? 0, + processed_files: data.processed_files ?? 0, + successful_files: data.successful_files ?? 0, + failed_files: data.failed_files ?? 0, + files: data.files ?? {}, + }; + } + + /** + * Wait for an ingestion task to complete. + * + * @param taskId - The task ID to wait for. + * @param pollInterval - Seconds between status checks. + * @param timeout - Maximum seconds to wait. + * @returns IngestTaskStatus with final status. + */ + async waitForTask( + taskId: string, + pollInterval: number = 1, + timeout: number = 300 + ): Promise { + const startTime = Date.now(); + const timeoutMs = timeout * 1000; + + while (Date.now() - startTime < timeoutMs) { + const status = await this.getTaskStatus(taskId); + if (status.status === "completed" || status.status === "failed") { + return status; + } + await this.sleep(pollInterval * 1000); + } + + throw new Error( + `Ingestion task ${taskId} did not complete within ${timeout}s` + ); + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); } /** diff --git a/sdks/typescript/src/types.ts b/sdks/typescript/src/types.ts index d9d8923e..9ee40a72 100644 --- a/sdks/typescript/src/types.ts +++ b/sdks/typescript/src/types.ts @@ -56,10 +56,19 @@ export interface SearchFilters { // Document types export interface IngestResponse { - success: boolean; - document_id?: string | null; + task_id: string; + status?: string | null; // Optional - we poll for actual status filename?: string | null; - chunks: number; +} + +export interface IngestTaskStatus { + task_id: string; + status: string; // "pending", "running", "completed", "failed" + total_files: number; + processed_files: number; + successful_files: number; + failed_files: number; + files: Record; } export interface DeleteDocumentResponse { diff --git a/sdks/typescript/tests/integration.test.ts b/sdks/typescript/tests/integration.test.ts index bb481470..2eaedc64 100644 --- a/sdks/typescript/tests/integration.test.ts +++ b/sdks/typescript/tests/integration.test.ts @@ -2,7 +2,7 @@ * Integration tests for OpenRAG TypeScript SDK. * * These tests run against a real OpenRAG instance. - * Requires: OPENRAG_URL environment variable (defaults to http://localhost:3000) + * Requires: OPENRAG_URL environment variable (defaults to http://localhost:8000) * * Run with: npm test */ @@ -15,7 +15,7 @@ import * as os from "os"; // Dynamic import to handle the SDK not being built yet let OpenRAGClient: typeof import("../src").OpenRAGClient; -const BASE_URL = process.env.OPENRAG_URL || "http://localhost:3000"; +const BASE_URL = process.env.OPENRAG_URL || "http://localhost:8000"; const SKIP_TESTS = process.env.SKIP_SDK_INTEGRATION_TESTS === "true"; // Create API key for tests @@ -78,15 +78,32 @@ describe.skipIf(SKIP_TESTS)("OpenRAG TypeScript SDK Integration", () => { }); describe("Documents", () => { - it("should ingest a document", async () => { + it("should ingest a document (wait for completion)", async () => { + // wait=true (default) polls until completion const result = await client.documents.ingest({ filePath: testFilePath }); - expect(result.success).toBe(true); - expect(result.chunks).toBeGreaterThan(0); + expect(result.status).toBe("completed"); + expect((result as any).successful_files).toBeGreaterThanOrEqual(1); + }); + + it("should ingest a document without waiting", async () => { + // wait=false returns immediately with task_id + const result = await client.documents.ingest({ + filePath: testFilePath, + wait: false, + }); + + expect((result as any).task_id).toBeDefined(); + + // Can poll manually + const finalStatus = await client.documents.waitForTask( + (result as any).task_id + ); + expect(finalStatus.status).toBe("completed"); }); it("should delete a document", async () => { - // First ingest + // First ingest (wait for completion) await client.documents.ingest({ filePath: testFilePath }); // Then delete diff --git a/src/api/v1/documents.py b/src/api/v1/documents.py index 22785ffd..7d03c8e2 100644 --- a/src/api/v1/documents.py +++ b/src/api/v1/documents.py @@ -6,12 +6,20 @@ Uses API key authentication. """ from starlette.requests import Request from starlette.responses import JSONResponse + +from api.router import upload_ingest_router from utils.logging_config import get_logger logger = get_logger(__name__) -async def ingest_endpoint(request: Request, document_service, task_service, session_manager): +async def ingest_endpoint( + request: Request, + document_service, + task_service, + session_manager, + langflow_file_service, +): """ Ingest a document into the knowledge base. @@ -19,68 +27,57 @@ async def ingest_endpoint(request: Request, document_service, task_service, sess Request: multipart/form-data with "file" field - Response: + Response (async via Langflow): + { + "task_id": "...", + "status": "processing", + "filename": "doc.pdf" + } + + Response (sync when Langflow disabled): { "success": true, "document_id": "...", "filename": "doc.pdf", "chunks": 10 } + """ + # Delegate to the existing upload_ingest_router which handles both + # Langflow and traditional paths + return await upload_ingest_router( + request, + document_service=document_service, + langflow_file_service=langflow_file_service, + session_manager=session_manager, + task_service=task_service, + ) - For bulk uploads, returns a task ID: + +async def task_status_endpoint(request: Request, task_service, session_manager): + """ + Get the status of an ingestion task. + + GET /api/v1/tasks/{task_id} + + Response: { "task_id": "...", - "status": "processing" + "status": "completed", + "total_files": 1, + "processed_files": 1, + "successful_files": 1, + "failed_files": 0, + "files": {...} } """ - try: - content_type = request.headers.get("content-type", "") + task_id = request.path_params.get("task_id") + user = request.state.user - if "multipart/form-data" in content_type: - # Single file upload - form = await request.form() - upload_file = form.get("file") + task_status = task_service.get_task_status(user.user_id, task_id) + if not task_status: + return JSONResponse({"error": "Task not found"}, status_code=404) - if not upload_file: - return JSONResponse( - {"error": "File is required"}, - status_code=400, - ) - - user = request.state.user - - result = await document_service.process_upload_file( - upload_file, - owner_user_id=user.user_id, - jwt_token=None, # API key auth, no JWT - owner_name=user.name, - owner_email=user.email, - ) - - if result.get("error"): - return JSONResponse(result, status_code=500) - - return JSONResponse({ - "success": True, - "document_id": result.get("id"), # process_upload_file returns "id" - "filename": upload_file.filename, - "chunks": result.get("chunks", 0), - }, status_code=201) - - else: - return JSONResponse( - {"error": "Content-Type must be multipart/form-data"}, - status_code=400, - ) - - except Exception as e: - error_msg = str(e) - logger.error("Document ingestion failed", error=error_msg) - - if "AuthenticationException" in error_msg or "access denied" in error_msg.lower(): - return JSONResponse({"error": error_msg}, status_code=403) - else: - return JSONResponse({"error": error_msg}, status_code=500) + return JSONResponse(task_status) async def delete_document_endpoint(request: Request, document_service, session_manager): diff --git a/src/main.py b/src/main.py index 98a46ddc..a18e7f58 100644 --- a/src/main.py +++ b/src/main.py @@ -1377,10 +1377,22 @@ async def create_app(): document_service=services["document_service"], task_service=services["task_service"], session_manager=services["session_manager"], + langflow_file_service=services["langflow_file_service"], ) ), methods=["POST"], ), + Route( + "/api/v1/tasks/{task_id}", + require_api_key(services["api_key_service"])( + partial( + v1_documents.task_status_endpoint, + task_service=services["task_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), Route( "/api/v1/documents", require_api_key(services["api_key_service"])(