openrag/sdks/python/openrag_sdk/documents.py

146 lines
4.4 KiB
Python

"""OpenRAG SDK documents client."""
import asyncio
from pathlib import Path
from typing import TYPE_CHECKING, BinaryIO
from .models import DeleteDocumentResponse, IngestResponse, IngestTaskStatus
if TYPE_CHECKING:
from .client import OpenRAGClient
class DocumentsClient:
    """Client for document operations.

    Wraps the document ingest, task-status and document delete endpoints
    under ``/api/v1`` and sends every request through the owning client's
    ``_request`` coroutine.
    """

    # Task status values after which ``wait_for_task`` stops polling.
    _TERMINAL_STATUSES = ("completed", "failed")

    def __init__(self, client: "OpenRAGClient"):
        """Store the owning client used to issue HTTP requests."""
        self._client = client

    async def _upload(self, filename: str, fileobj: BinaryIO):
        """POST one file to the ingest endpoint and return the raw response."""
        files = {"file": (filename, fileobj)}
        return await self._client._request(
            "POST",
            "/api/v1/documents/ingest",
            files=files,
        )

    async def ingest(
        self,
        file_path: str | Path | None = None,
        *,
        file: BinaryIO | None = None,
        filename: str | None = None,
        wait: bool = True,
        poll_interval: float = 1.0,
        timeout: float = 300.0,
    ) -> IngestResponse | IngestTaskStatus:
        """
        Ingest a document into the knowledge base.

        Args:
            file_path: Path to the file to ingest. If both ``file_path`` and
                ``file`` are given, ``file_path`` is used and ``file`` is
                ignored.
            file: File-like object to ingest (alternative to file_path).
            filename: Filename to use when providing file object; required
                together with ``file``.
            wait: If True, poll until ingestion completes. If False, return immediately.
            poll_interval: Seconds between status checks when waiting.
            timeout: Maximum seconds to wait for completion.

        Returns:
            IngestTaskStatus with final status if wait=True.
            IngestResponse with task_id if wait=False.

        Raises:
            ValueError: If neither file_path nor file is provided, or if file
                is provided without filename.
            OSError: If file_path cannot be opened for reading.
            TimeoutError: If ingestion doesn't complete within timeout.
        """
        if file_path is not None:
            path = Path(file_path)
            # The file only needs to stay open for the duration of the upload.
            with open(path, "rb") as f:
                response = await self._upload(path.name, f)
        elif file is not None:
            if filename is None:
                raise ValueError("filename is required when providing file object")
            response = await self._upload(filename, file)
        else:
            raise ValueError("Either file_path or file must be provided")

        ingest_response = IngestResponse(**response.json())
        if not wait:
            return ingest_response

        # Poll for completion
        return await self.wait_for_task(
            ingest_response.task_id,
            poll_interval=poll_interval,
            timeout=timeout,
        )

    async def get_task_status(self, task_id: str) -> IngestTaskStatus:
        """
        Get the status of an ingestion task.

        Args:
            task_id: The task ID returned from ingest().

        Returns:
            IngestTaskStatus with current task status.
        """
        response = await self._client._request(
            "GET",
            f"/api/v1/tasks/{task_id}",
        )
        return IngestTaskStatus(**response.json())

    async def wait_for_task(
        self,
        task_id: str,
        poll_interval: float = 1.0,
        timeout: float = 300.0,
    ) -> IngestTaskStatus:
        """
        Wait for an ingestion task to complete.

        The status is fetched at least once. Polling stops as soon as the
        task reports "completed" or "failed".

        Args:
            task_id: The task ID to wait for.
            poll_interval: Seconds between status checks. Non-positive values
                poll without sleeping (still bounded by ``timeout``).
            timeout: Maximum seconds to wait, measured in wall-clock time and
                including the time spent in status requests.

        Returns:
            IngestTaskStatus with final status.

        Raises:
            TimeoutError: If task doesn't complete within timeout.
        """
        loop = asyncio.get_running_loop()
        # Use the loop's monotonic clock rather than summing sleep intervals,
        # so slow status requests count toward the timeout.
        deadline = loop.time() + timeout
        while True:
            status = await self.get_task_status(task_id)
            if status.status in self._TERMINAL_STATUSES:
                return status
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            # Never sleep past the deadline; clamp negative intervals to 0.
            await asyncio.sleep(max(0.0, min(poll_interval, remaining)))
        raise TimeoutError(f"Ingestion task {task_id} did not complete within {timeout}s")

    async def delete(self, filename: str) -> DeleteDocumentResponse:
        """
        Delete a document from the knowledge base.

        Args:
            filename: Name of the file to delete.

        Returns:
            DeleteDocumentResponse with deleted chunk count.
        """
        response = await self._client._request(
            "DELETE",
            "/api/v1/documents",
            json={"filename": filename},
        )
        return DeleteDocumentResponse(**response.json())