ingest poll and other sdk endpoint fixes

This commit is contained in:
phact 2025-12-16 22:40:39 -05:00
parent 74cba85ae6
commit 9bc6f8b6eb
8 changed files with 279 additions and 79 deletions

View file

@ -1,11 +1,10 @@
"""OpenRAG SDK documents client."""
import asyncio
from pathlib import Path
from typing import TYPE_CHECKING, BinaryIO
import httpx
from .models import DeleteDocumentResponse, IngestResponse
from .models import DeleteDocumentResponse, IngestResponse, IngestTaskStatus
if TYPE_CHECKING:
from .client import OpenRAGClient
@ -23,7 +22,10 @@ class DocumentsClient:
*,
file: BinaryIO | None = None,
filename: str | None = None,
) -> IngestResponse:
wait: bool = True,
poll_interval: float = 1.0,
timeout: float = 300.0,
) -> IngestResponse | IngestTaskStatus:
"""
Ingest a document into the knowledge base.
@ -31,12 +33,17 @@ class DocumentsClient:
file_path: Path to the file to ingest.
file: File-like object to ingest (alternative to file_path).
filename: Filename to use when providing file object.
wait: If True, poll until ingestion completes. If False, return immediately.
poll_interval: Seconds between status checks when waiting.
timeout: Maximum seconds to wait for completion.
Returns:
IngestResponse with document_id and chunk count.
IngestTaskStatus with final status if wait=True.
IngestResponse with task_id if wait=False.
Raises:
ValueError: If neither file_path nor file is provided.
TimeoutError: If ingestion doesn't complete within timeout.
"""
if file_path is not None:
path = Path(file_path)
@ -60,7 +67,64 @@ class DocumentsClient:
raise ValueError("Either file_path or file must be provided")
data = response.json()
return IngestResponse(**data)
ingest_response = IngestResponse(**data)
if not wait:
return ingest_response
# Poll for completion
return await self.wait_for_task(
ingest_response.task_id,
poll_interval=poll_interval,
timeout=timeout,
)
async def get_task_status(self, task_id: str) -> IngestTaskStatus:
"""
Get the status of an ingestion task.
Args:
task_id: The task ID returned from ingest().
Returns:
IngestTaskStatus with current task status.
"""
response = await self._client._request(
"GET",
f"/api/v1/tasks/{task_id}",
)
data = response.json()
return IngestTaskStatus(**data)
async def wait_for_task(
self,
task_id: str,
poll_interval: float = 1.0,
timeout: float = 300.0,
) -> IngestTaskStatus:
"""
Wait for an ingestion task to complete.
Args:
task_id: The task ID to wait for.
poll_interval: Seconds between status checks.
timeout: Maximum seconds to wait.
Returns:
IngestTaskStatus with final status.
Raises:
TimeoutError: If task doesn't complete within timeout.
"""
elapsed = 0.0
while elapsed < timeout:
status = await self.get_task_status(task_id)
if status.status in ("completed", "failed"):
return status
await asyncio.sleep(poll_interval)
elapsed += poll_interval
raise TimeoutError(f"Ingestion task {task_id} did not complete within {timeout}s")
async def delete(self, filename: str) -> DeleteDocumentResponse:
"""

View file

@ -71,12 +71,23 @@ class SearchResponse(BaseModel):
# Document models
class IngestResponse(BaseModel):
"""Response from document ingestion."""
"""Response from document ingestion (async task-based)."""
success: bool
document_id: str | None = None
task_id: str
status: str | None = None # Optional - we poll for actual status
filename: str | None = None
chunks: int = 0
class IngestTaskStatus(BaseModel):
"""Status of an ingestion task."""
task_id: str
status: str # "pending", "running", "completed", "failed"
total_files: int = 0
processed_files: int = 0
successful_files: int = 0
failed_files: int = 0
files: dict = {} # Detailed per-file status
class DeleteDocumentResponse(BaseModel):

View file

@ -79,15 +79,28 @@ class TestDocuments:
@pytest.mark.asyncio
async def test_ingest_document(self, client, test_file: Path):
"""Test document ingestion."""
# wait=True (default) polls until completion
result = await client.documents.ingest(file_path=str(test_file))
assert result.success is True
assert result.chunks > 0
assert result.status == "completed"
assert result.successful_files >= 1
@pytest.mark.asyncio
async def test_ingest_document_no_wait(self, client, test_file: Path):
"""Test document ingestion without waiting."""
# wait=False returns immediately with task_id
result = await client.documents.ingest(file_path=str(test_file), wait=False)
assert result.task_id is not None
# Can poll manually
final_status = await client.documents.wait_for_task(result.task_id)
assert final_status.status == "completed"
@pytest.mark.asyncio
async def test_delete_document(self, client, test_file: Path):
"""Test document deletion."""
# First ingest
# First ingest (wait for completion)
await client.documents.ingest(file_path=str(test_file))
# Then delete

View file

@ -3,7 +3,11 @@
*/
import type { OpenRAGClient } from "./client";
import type { DeleteDocumentResponse, IngestResponse } from "./types";
import type {
DeleteDocumentResponse,
IngestResponse,
IngestTaskStatus,
} from "./types";
export interface IngestOptions {
/** Path to file (Node.js only). */
@ -12,6 +16,12 @@ export interface IngestOptions {
file?: File | Blob;
/** Filename when providing file/blob. */
filename?: string;
/** If true, poll until ingestion completes. Default: true. */
wait?: boolean;
/** Seconds between status checks when waiting. Default: 1. */
pollInterval?: number;
/** Maximum seconds to wait for completion. Default: 300. */
timeout?: number;
}
export class DocumentsClient {
@ -21,10 +31,15 @@ export class DocumentsClient {
* Ingest a document into the knowledge base.
*
* @param options - Ingest options (filePath or file+filename).
* @returns IngestResponse with document_id and chunk count.
* @returns IngestTaskStatus with final status if wait=true, IngestResponse with task_id if wait=false.
*/
async ingest(options: IngestOptions): Promise<IngestResponse> {
async ingest(
options: IngestOptions
): Promise<IngestResponse | IngestTaskStatus> {
const formData = new FormData();
const wait = options.wait ?? true;
const pollInterval = options.pollInterval ?? 1;
const timeout = options.timeout ?? 300;
if (options.filePath) {
// Node.js: read file from path
@ -57,12 +72,74 @@ export class DocumentsClient {
);
const data = await response.json();
return {
success: data.success ?? false,
document_id: data.document_id ?? null,
const ingestResponse: IngestResponse = {
task_id: data.task_id,
status: data.status ?? null,
filename: data.filename ?? null,
chunks: data.chunks ?? 0,
};
if (!wait) {
return ingestResponse;
}
// Poll for completion
return await this.waitForTask(ingestResponse.task_id, pollInterval, timeout);
}
/**
* Get the status of an ingestion task.
*
* @param taskId - The task ID returned from ingest().
* @returns IngestTaskStatus with current task status.
*/
async getTaskStatus(taskId: string): Promise<IngestTaskStatus> {
const response = await this.client._request(
"GET",
`/api/v1/tasks/${taskId}`
);
const data = await response.json();
return {
task_id: data.task_id,
status: data.status,
total_files: data.total_files ?? 0,
processed_files: data.processed_files ?? 0,
successful_files: data.successful_files ?? 0,
failed_files: data.failed_files ?? 0,
files: data.files ?? {},
};
}
/**
* Wait for an ingestion task to complete.
*
* @param taskId - The task ID to wait for.
* @param pollInterval - Seconds between status checks.
* @param timeout - Maximum seconds to wait.
* @returns IngestTaskStatus with final status.
*/
async waitForTask(
taskId: string,
pollInterval: number = 1,
timeout: number = 300
): Promise<IngestTaskStatus> {
const startTime = Date.now();
const timeoutMs = timeout * 1000;
while (Date.now() - startTime < timeoutMs) {
const status = await this.getTaskStatus(taskId);
if (status.status === "completed" || status.status === "failed") {
return status;
}
await this.sleep(pollInterval * 1000);
}
throw new Error(
`Ingestion task ${taskId} did not complete within ${timeout}s`
);
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**

View file

@ -56,10 +56,19 @@ export interface SearchFilters {
// Document types
export interface IngestResponse {
success: boolean;
document_id?: string | null;
task_id: string;
status?: string | null; // Optional - we poll for actual status
filename?: string | null;
chunks: number;
}
export interface IngestTaskStatus {
task_id: string;
status: string; // "pending", "running", "completed", "failed"
total_files: number;
processed_files: number;
successful_files: number;
failed_files: number;
files: Record<string, unknown>;
}
export interface DeleteDocumentResponse {

View file

@ -2,7 +2,7 @@
* Integration tests for OpenRAG TypeScript SDK.
*
* These tests run against a real OpenRAG instance.
* Requires: OPENRAG_URL environment variable (defaults to http://localhost:3000)
* Requires: OPENRAG_URL environment variable (defaults to http://localhost:8000)
*
* Run with: npm test
*/
@ -15,7 +15,7 @@ import * as os from "os";
// Dynamic import to handle the SDK not being built yet
let OpenRAGClient: typeof import("../src").OpenRAGClient;
const BASE_URL = process.env.OPENRAG_URL || "http://localhost:3000";
const BASE_URL = process.env.OPENRAG_URL || "http://localhost:8000";
const SKIP_TESTS = process.env.SKIP_SDK_INTEGRATION_TESTS === "true";
// Create API key for tests
@ -78,15 +78,32 @@ describe.skipIf(SKIP_TESTS)("OpenRAG TypeScript SDK Integration", () => {
});
describe("Documents", () => {
it("should ingest a document", async () => {
it("should ingest a document (wait for completion)", async () => {
// wait=true (default) polls until completion
const result = await client.documents.ingest({ filePath: testFilePath });
expect(result.success).toBe(true);
expect(result.chunks).toBeGreaterThan(0);
expect(result.status).toBe("completed");
expect((result as any).successful_files).toBeGreaterThanOrEqual(1);
});
it("should ingest a document without waiting", async () => {
// wait=false returns immediately with task_id
const result = await client.documents.ingest({
filePath: testFilePath,
wait: false,
});
expect((result as any).task_id).toBeDefined();
// Can poll manually
const finalStatus = await client.documents.waitForTask(
(result as any).task_id
);
expect(finalStatus.status).toBe("completed");
});
it("should delete a document", async () => {
// First ingest
// First ingest (wait for completion)
await client.documents.ingest({ filePath: testFilePath });
// Then delete

View file

@ -6,12 +6,20 @@ Uses API key authentication.
"""
from starlette.requests import Request
from starlette.responses import JSONResponse
from api.router import upload_ingest_router
from utils.logging_config import get_logger
logger = get_logger(__name__)
async def ingest_endpoint(request: Request, document_service, task_service, session_manager):
async def ingest_endpoint(
request: Request,
document_service,
task_service,
session_manager,
langflow_file_service,
):
"""
Ingest a document into the knowledge base.
@ -19,68 +27,57 @@ async def ingest_endpoint(request: Request, document_service, task_service, sess
Request: multipart/form-data with "file" field
Response:
Response (async via Langflow):
{
"task_id": "...",
"status": "processing",
"filename": "doc.pdf"
}
Response (sync when Langflow disabled):
{
"success": true,
"document_id": "...",
"filename": "doc.pdf",
"chunks": 10
}
"""
# Delegate to the existing upload_ingest_router which handles both
# Langflow and traditional paths
return await upload_ingest_router(
request,
document_service=document_service,
langflow_file_service=langflow_file_service,
session_manager=session_manager,
task_service=task_service,
)
For bulk uploads, returns a task ID:
async def task_status_endpoint(request: Request, task_service, session_manager):
"""
Get the status of an ingestion task.
GET /api/v1/tasks/{task_id}
Response:
{
"task_id": "...",
"status": "processing"
"status": "completed",
"total_files": 1,
"processed_files": 1,
"successful_files": 1,
"failed_files": 0,
"files": {...}
}
"""
try:
content_type = request.headers.get("content-type", "")
task_id = request.path_params.get("task_id")
user = request.state.user
if "multipart/form-data" in content_type:
# Single file upload
form = await request.form()
upload_file = form.get("file")
task_status = task_service.get_task_status(user.user_id, task_id)
if not task_status:
return JSONResponse({"error": "Task not found"}, status_code=404)
if not upload_file:
return JSONResponse(
{"error": "File is required"},
status_code=400,
)
user = request.state.user
result = await document_service.process_upload_file(
upload_file,
owner_user_id=user.user_id,
jwt_token=None, # API key auth, no JWT
owner_name=user.name,
owner_email=user.email,
)
if result.get("error"):
return JSONResponse(result, status_code=500)
return JSONResponse({
"success": True,
"document_id": result.get("id"), # process_upload_file returns "id"
"filename": upload_file.filename,
"chunks": result.get("chunks", 0),
}, status_code=201)
else:
return JSONResponse(
{"error": "Content-Type must be multipart/form-data"},
status_code=400,
)
except Exception as e:
error_msg = str(e)
logger.error("Document ingestion failed", error=error_msg)
if "AuthenticationException" in error_msg or "access denied" in error_msg.lower():
return JSONResponse({"error": error_msg}, status_code=403)
else:
return JSONResponse({"error": error_msg}, status_code=500)
return JSONResponse(task_status)
async def delete_document_endpoint(request: Request, document_service, session_manager):

View file

@ -1377,10 +1377,22 @@ async def create_app():
document_service=services["document_service"],
task_service=services["task_service"],
session_manager=services["session_manager"],
langflow_file_service=services["langflow_file_service"],
)
),
methods=["POST"],
),
Route(
"/api/v1/tasks/{task_id}",
require_api_key(services["api_key_service"])(
partial(
v1_documents.task_status_endpoint,
task_service=services["task_service"],
session_manager=services["session_manager"],
)
),
methods=["GET"],
),
Route(
"/api/v1/documents",
require_api_key(services["api_key_service"])(