feat: add track_id support for document processing progress monitoring
- Add get_docs_by_track_id() method to all storage backends (MongoDB, PostgreSQL, Redis, JSON)
- Implement automatic track_id generation with upload_/insert_ prefixes
- Add /track_status/{track_id} API endpoint for frontend progress queries
- Create database indexes for efficient track_id lookups
- Enable real-time document processing status tracking across all storage types
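The bullet points above summarize the new HTTP surface. As a rough end-to-end illustration of how a frontend might use it (the base URL, port, API-key header, and the multipart field name are assumptions about a typical deployment, not part of this commit):

# Hedged client-side sketch: upload a file, keep the returned track_id, and
# poll the new /track_status endpoint until processing finishes.
# Assumptions: server at localhost:9621, optional API-key auth via "X-API-Key",
# and a multipart field named "file" for /documents/upload.
import time
import requests

BASE = "http://localhost:9621"
HEADERS = {"X-API-Key": "changeme"}  # only needed if API-key auth is enabled

with open("document.pdf", "rb") as f:
    resp = requests.post(f"{BASE}/documents/upload", files={"file": f}, headers=HEADERS)
track_id = resp.json()["track_id"]  # e.g. "upload_20250729_170612_abc123"

while True:
    status = requests.get(f"{BASE}/track_status/{track_id}", headers=HEADERS).json()
    summary = status["status_summary"]  # e.g. {"PROCESSED": 1}
    print(summary, "of", status["total_count"], "documents")
    if not any(k.upper() in ("PENDING", "PROCESSING") for k in summary):
        break
    time.sleep(2)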
parent dafdf92715
commit 6014b9bf73
8 changed files with 463 additions and 39 deletions
@@ -210,18 +210,21 @@ class InsertResponse(BaseModel):
    Attributes:
        status: Status of the operation (success, duplicated, partial_success, failure)
        message: Detailed message describing the operation result
        track_id: Tracking ID for monitoring processing status
    """

    status: Literal["success", "duplicated", "partial_success", "failure"] = Field(
        description="Status of the operation"
    )
    message: str = Field(description="Message describing the operation result")
    track_id: str = Field(description="Tracking ID for monitoring processing status")

    class Config:
        json_schema_extra = {
            "example": {
                "status": "success",
                "message": "File 'document.pdf' uploaded successfully. Processing will continue in background.",
                "track_id": "upload_20250729_170612_abc123",
            }
        }

@@ -360,6 +363,9 @@ class DocStatusResponse(BaseModel):
    status: DocStatus = Field(description="Current processing status")
    created_at: str = Field(description="Creation timestamp (ISO format string)")
    updated_at: str = Field(description="Last update timestamp (ISO format string)")
    track_id: Optional[str] = Field(
        default=None, description="Tracking ID for monitoring progress"
    )
    chunks_count: Optional[int] = Field(
        default=None, description="Number of chunks the document was split into"
    )

@@ -380,6 +386,7 @@ class DocStatusResponse(BaseModel):
                "status": "PROCESSED",
                "created_at": "2025-03-31T12:34:56",
                "updated_at": "2025-03-31T12:35:30",
                "track_id": "upload_20250729_170612_abc123",
                "chunks_count": 12,
                "error": None,
                "metadata": {"author": "John Doe", "year": 2025},

@@ -412,6 +419,10 @@ class DocsStatusesResponse(BaseModel):
                        "status": "PENDING",
                        "created_at": "2025-03-31T10:00:00",
                        "updated_at": "2025-03-31T10:00:00",
                        "track_id": "upload_20250331_100000_abc123",
                        "chunks_count": None,
                        "error": None,
                        "metadata": None,
                        "file_path": "pending_doc.pdf",
                    }
                ],

@@ -423,7 +434,10 @@ class DocsStatusesResponse(BaseModel):
                        "status": "PROCESSED",
                        "created_at": "2025-03-31T09:00:00",
                        "updated_at": "2025-03-31T09:05:00",
                        "track_id": "insert_20250331_090000_def456",
                        "chunks_count": 8,
                        "error": None,
                        "metadata": {"author": "John Doe"},
                        "file_path": "processed_doc.pdf",
                    }
                ],

@@ -432,6 +446,48 @@ class DocsStatusesResponse(BaseModel):
        }


class TrackStatusResponse(BaseModel):
    """Response model for tracking document processing status by track_id

    Attributes:
        track_id: The tracking ID
        documents: List of documents associated with this track_id
        total_count: Total number of documents for this track_id
        status_summary: Count of documents by status
    """

    track_id: str = Field(description="The tracking ID")
    documents: List[DocStatusResponse] = Field(
        description="List of documents associated with this track_id"
    )
    total_count: int = Field(description="Total number of documents for this track_id")
    status_summary: Dict[str, int] = Field(description="Count of documents by status")

    class Config:
        json_schema_extra = {
            "example": {
                "track_id": "upload_20250729_170612_abc123",
                "documents": [
                    {
                        "id": "doc_123456",
                        "content_summary": "Research paper on machine learning",
                        "content_length": 15240,
                        "status": "PROCESSED",
                        "created_at": "2025-03-31T12:34:56",
                        "updated_at": "2025-03-31T12:35:30",
                        "track_id": "upload_20250729_170612_abc123",
                        "chunks_count": 12,
                        "error": None,
                        "metadata": {"author": "John Doe", "year": 2025},
                        "file_path": "research_paper.pdf",
                    }
                ],
                "total_count": 1,
                "status_summary": {"PROCESSED": 1},
            }
        }


class PipelineStatusResponse(BaseModel):
    """Response model for pipeline status


@@ -549,14 +605,17 @@ class DocumentManager:
        return any(filename.lower().endswith(ext) for ext in self.supported_extensions)


async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
async def pipeline_enqueue_file(
    rag: LightRAG, file_path: Path, track_id: str = None
) -> tuple[bool, str]:
    """Add a file to the queue for processing

    Args:
        rag: LightRAG instance
        file_path: Path to the saved file
        track_id: Optional tracking ID, if not provided will be generated
    Returns:
        bool: True if the file was successfully enqueued, False otherwise
        tuple: (success: bool, track_id: str)
    """

    try:

@@ -730,9 +789,17 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
                        f"File contains only whitespace characters. file_paths={file_path.name}"
                    )

                await rag.apipeline_enqueue_documents(content, file_paths=file_path.name)
                # Generate track_id if not provided
                if track_id is None:
                    from lightrag.utils import generate_track_id

                    track_id = generate_track_id("upload")

                returned_track_id = await rag.ainsert(
                    content, file_paths=file_path.name, track_id=track_id
                )
                logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
                return True
                return True, returned_track_id
            else:
                logger.error(f"No content could be extracted from file: {file_path.name}")

@@ -745,18 +812,22 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
                file_path.unlink()
            except Exception as e:
                logger.error(f"Error deleting file {file_path}: {str(e)}")
        return False
        return False, ""


async def pipeline_index_file(rag: LightRAG, file_path: Path):
    """Index a file
async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
    """Index a file with track_id

    Args:
        rag: LightRAG instance
        file_path: Path to the saved file
        track_id: Optional tracking ID
    """
    try:
        if await pipeline_enqueue_file(rag, file_path):
        success, returned_track_id = await pipeline_enqueue_file(
            rag, file_path, track_id
        )
        if success:
            await rag.apipeline_process_enqueue_documents()

    except Exception as e:

@@ -794,14 +865,18 @@ async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):


async def pipeline_index_texts(
    rag: LightRAG, texts: List[str], file_sources: List[str] = None
    rag: LightRAG,
    texts: List[str],
    file_sources: List[str] = None,
    track_id: str = None,
):
    """Index a list of texts
    """Index a list of texts with track_id

    Args:
        rag: LightRAG instance
        texts: The texts to index
        file_sources: Sources of the texts
        track_id: Optional tracking ID
    """
    if not texts:
        return

@@ -811,7 +886,9 @@ async def pipeline_index_texts(
            file_sources.append("unknown_source")
            for _ in range(len(file_sources), len(texts))
        ]
    await rag.apipeline_enqueue_documents(input=texts, file_paths=file_sources)
    await rag.apipeline_enqueue_documents(
        input=texts, file_paths=file_sources, track_id=track_id
    )
    await rag.apipeline_process_enqueue_documents()


@@ -1080,18 +1157,26 @@ def create_document_routes(
                return InsertResponse(
                    status="duplicated",
                    message=f"File '{safe_filename}' already exists in the input directory.",
                    track_id="",
                )

            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            # Add to background tasks
            background_tasks.add_task(pipeline_index_file, rag, file_path)

            return InsertResponse(
                status="success",
                message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
            )
            # Add to background tasks and get track_id
            success, track_id = await pipeline_enqueue_file(rag, file_path)
            if success:
                background_tasks.add_task(rag.apipeline_process_enqueue_documents)
                return InsertResponse(
                    status="success",
                    message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
                    track_id=track_id,
                )
            else:
                raise HTTPException(
                    status_code=500,
                    detail=f"Failed to enqueue file '{safe_filename}' for processing.",
                )
        except Exception as e:
            logger.error(f"Error /documents/upload: {file.filename}: {str(e)}")
            logger.error(traceback.format_exc())

@@ -1120,15 +1205,20 @@ def create_document_routes(
            HTTPException: If an error occurs during text processing (500).
        """
        try:
            background_tasks.add_task(
                pipeline_index_texts,
                rag,
                [request.text],
                file_sources=[request.file_source],
            from lightrag.utils import generate_track_id

            # Generate track_id for text insertion
            track_id = generate_track_id("insert")

            # Insert text and get track_id
            returned_track_id = await rag.ainsert(
                request.text, file_paths=request.file_source, track_id=track_id
            )

            return InsertResponse(
                status="success",
                message="Text successfully received. Processing will continue in background.",
                track_id=returned_track_id,
            )
        except Exception as e:
            logger.error(f"Error /documents/text: {str(e)}")

@@ -1160,18 +1250,23 @@ def create_document_routes(
            HTTPException: If an error occurs during text processing (500).
        """
        try:
            background_tasks.add_task(
                pipeline_index_texts,
                rag,
                request.texts,
                file_sources=request.file_sources,
            from lightrag.utils import generate_track_id

            # Generate track_id for texts insertion
            track_id = generate_track_id("insert")

            # Insert texts and get track_id
            returned_track_id = await rag.ainsert(
                request.texts, file_paths=request.file_sources, track_id=track_id
            )

            return InsertResponse(
                status="success",
                message="Text successfully received. Processing will continue in background.",
                message="Texts successfully received. Processing will continue in background.",
                track_id=returned_track_id,
            )
        except Exception as e:
            logger.error(f"Error /documents/text: {str(e)}")
            logger.error(f"Error /documents/texts: {str(e)}")
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=500, detail=str(e))


@@ -1473,6 +1568,7 @@ def create_document_routes(
                    status=doc_status.status,
                    created_at=format_datetime(doc_status.created_at),
                    updated_at=format_datetime(doc_status.updated_at),
                    track_id=doc_status.track_id,
                    chunks_count=doc_status.chunks_count,
                    error=doc_status.error,
                    metadata=doc_status.metadata,

@@ -1699,4 +1795,77 @@ def create_document_routes(
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=500, detail=error_msg)

    @router.get(
        "/track_status/{track_id}",
        response_model=TrackStatusResponse,
        dependencies=[Depends(combined_auth)],
    )
    async def get_track_status(track_id: str) -> TrackStatusResponse:
        """
        Get the processing status of documents by tracking ID.

        This endpoint retrieves all documents associated with a specific tracking ID,
        allowing users to monitor the processing progress of their uploaded files or inserted texts.

        Args:
            track_id (str): The tracking ID returned from upload, text, or texts endpoints

        Returns:
            TrackStatusResponse: A response object containing:
                - track_id: The tracking ID
                - documents: List of documents associated with this track_id
                - total_count: Total number of documents for this track_id

        Raises:
            HTTPException: If track_id is invalid (400) or an error occurs (500).
        """
        try:
            # Validate track_id
            if not track_id or not track_id.strip():
                raise HTTPException(status_code=400, detail="Track ID cannot be empty")

            track_id = track_id.strip()

            # Get documents by track_id
            docs_by_track_id = await rag.aget_docs_by_track_id(track_id)

            # Convert to response format
            documents = []
            status_summary = {}

            for doc_id, doc_status in docs_by_track_id.items():
                documents.append(
                    DocStatusResponse(
                        id=doc_id,
                        content_summary=doc_status.content_summary,
                        content_length=doc_status.content_length,
                        status=doc_status.status,
                        created_at=format_datetime(doc_status.created_at),
                        updated_at=format_datetime(doc_status.updated_at),
                        track_id=doc_status.track_id,
                        chunks_count=doc_status.chunks_count,
                        error=doc_status.error,
                        metadata=doc_status.metadata,
                        file_path=doc_status.file_path,
                    )
                )

                # Build status summary
                status_key = doc_status.status.value
                status_summary[status_key] = status_summary.get(status_key, 0) + 1

            return TrackStatusResponse(
                track_id=track_id,
                documents=documents,
                total_count=len(documents),
                status_summary=status_summary,
            )

        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error getting track status for {track_id}: {str(e)}")
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=500, detail=str(e))

    return router

@@ -641,6 +641,8 @@ class DocProcessingStatus:
    """ISO format timestamp when document was created"""
    updated_at: str
    """ISO format timestamp when document was last updated"""
    track_id: str | None = None
    """Tracking ID for monitoring progress"""
    chunks_count: int | None = None
    """Number of chunks after splitting, used for processing"""
    chunks_list: list[str] | None = field(default_factory=list)

@@ -665,6 +667,12 @@ class DocStatusStorage(BaseKVStorage, ABC):
    ) -> dict[str, DocProcessingStatus]:
        """Get all documents with a specific status"""

    @abstractmethod
    async def get_docs_by_track_id(
        self, track_id: str
    ) -> dict[str, DocProcessingStatus]:
        """Get all documents with a specific track_id"""

    async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
        """Drop cache is not supported for Doc Status storage"""
        return False

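Because get_docs_by_track_id is declared as an @abstractmethod on DocStatusStorage, any custom doc-status backend now has to provide it; the JSON, MongoDB, PostgreSQL, and Redis implementations follow below. A minimal sketch of what a third-party backend might add (the class name, the self._data dict, and the import path are assumptions for illustration only):

# Hypothetical backend fragment showing the new contract; other abstract
# storage methods are omitted, so this class is not instantiable as-is.
from lightrag.base import DocProcessingStatus, DocStatusStorage  # assumed import path


class InMemoryDocStatusStorage(DocStatusStorage):
    async def get_docs_by_track_id(
        self, track_id: str
    ) -> dict[str, DocProcessingStatus]:
        """Return every stored document whose track_id matches."""
        return {
            doc_id: DocProcessingStatus(**doc)  # assumes stored dicts match the dataclass fields
            for doc_id, doc in self._data.items()
            if doc.get("track_id") == track_id
        }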
@@ -104,6 +104,26 @@ class JsonDocStatusStorage(DocStatusStorage):
                    continue
        return result

    async def get_docs_by_track_id(
        self, track_id: str
    ) -> dict[str, DocProcessingStatus]:
        """Get all documents with a specific track_id"""
        result = {}
        async with self._storage_lock:
            for k, v in self._data.items():
                if v.get("track_id") == track_id:
                    try:
                        # Make a copy of the data to avoid modifying the original
                        data = v.copy()
                        # If file_path is not in data, use document id as file path
                        if "file_path" not in data:
                            data["file_path"] = "no-file-path"
                        result[k] = DocProcessingStatus(**data)
                    except KeyError as e:
                        logger.error(f"Missing required field for document {k}: {e}")
                        continue
        return result

    async def index_done_callback(self) -> None:
        async with self._storage_lock:
            if self.storage_updated.value:

@@ -321,6 +321,10 @@ class MongoDocStatusStorage(DocStatusStorage):
        if self.db is None:
            self.db = await ClientManager.get_client()
            self._data = await get_or_create_collection(self.db, self._collection_name)

            # Create track_id index for better query performance
            await self.create_track_id_index_if_not_exists()

            logger.debug(f"Use MongoDB as DocStatus {self._collection_name}")

    async def finalize(self):

@@ -386,6 +390,26 @@ class MongoDocStatusStorage(DocStatusStorage):
                continue
        return processed_result

    async def get_docs_by_track_id(
        self, track_id: str
    ) -> dict[str, DocProcessingStatus]:
        """Get all documents with a specific track_id"""
        cursor = self._data.find({"track_id": track_id})
        result = await cursor.to_list()
        processed_result = {}
        for doc in result:
            try:
                # Make a copy of the data to avoid modifying the original
                data = doc.copy()
                # If file_path is not in data, use document id as file path
                if "file_path" not in data:
                    data["file_path"] = "no-file-path"
                processed_result[doc["_id"]] = DocProcessingStatus(**data)
            except KeyError as e:
                logger.error(f"Missing required field for document {doc['_id']}: {e}")
                continue
        return processed_result

    async def index_done_callback(self) -> None:
        # Mongo handles persistence automatically
        pass

@@ -414,6 +438,30 @@ class MongoDocStatusStorage(DocStatusStorage):
    async def delete(self, ids: list[str]) -> None:
        await self._data.delete_many({"_id": {"$in": ids}})

    async def create_track_id_index_if_not_exists(self):
        """Create track_id index for better query performance"""
        try:
            # Check if index already exists
            existing_indexes = await self._data.list_indexes().to_list(length=None)
            track_id_index_exists = any(
                "track_id" in idx.get("key", {}) for idx in existing_indexes
            )

            if not track_id_index_exists:
                await self._data.create_index("track_id")
                logger.info(
                    f"Created track_id index for collection {self._collection_name}"
                )
            else:
                logger.debug(
                    f"track_id index already exists for collection {self._collection_name}"
                )

        except PyMongoError as e:
            logger.error(
                f"Error creating track_id index for {self._collection_name}: {e}"
            )


@final
@dataclass

@@ -567,6 +567,35 @@ class PostgreSQLDB:
                f"Failed to add llm_cache_list column to LIGHTRAG_DOC_CHUNKS: {e}"
            )

    async def _migrate_doc_status_add_track_id(self):
        """Add track_id column to LIGHTRAG_DOC_STATUS table if it doesn't exist"""
        try:
            # Check if track_id column exists
            check_column_sql = """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = 'lightrag_doc_status'
            AND column_name = 'track_id'
            """

            column_info = await self.query(check_column_sql)
            if not column_info:
                logger.info("Adding track_id column to LIGHTRAG_DOC_STATUS table")
                add_column_sql = """
                ALTER TABLE LIGHTRAG_DOC_STATUS
                ADD COLUMN track_id VARCHAR(255) NULL
                """
                await self.execute(add_column_sql)
                logger.info(
                    "Successfully added track_id column to LIGHTRAG_DOC_STATUS table"
                )
            else:
                logger.info(
                    "track_id column already exists in LIGHTRAG_DOC_STATUS table"
                )
        except Exception as e:
            logger.warning(f"Failed to add track_id column to LIGHTRAG_DOC_STATUS: {e}")

    async def _migrate_field_lengths(self):
        """Migrate database field lengths: entity_name, source_id, target_id, and file_path"""
        # Define the field changes needed

@@ -785,6 +814,14 @@ class PostgreSQLDB:
        except Exception as e:
            logger.error(f"PostgreSQL, Failed to migrate field lengths: {e}")

        # Migrate doc status to add track_id field if needed
        try:
            await self._migrate_doc_status_add_track_id()
        except Exception as e:
            logger.error(
                f"PostgreSQL, Failed to migrate doc status track_id field: {e}"
            )

    async def query(
        self,
        sql: str,

@@ -1775,6 +1812,43 @@ class PGDocStatusStorage(DocStatusStorage):

        return docs_by_status

    async def get_docs_by_track_id(
        self, track_id: str
    ) -> dict[str, DocProcessingStatus]:
        """Get all documents with a specific track_id"""
        sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and track_id=$2"
        params = {"workspace": self.db.workspace, "track_id": track_id}
        result = await self.db.query(sql, params, True)

        docs_by_track_id = {}
        for element in result:
            # Parse chunks_list JSON string back to list
            chunks_list = element.get("chunks_list", [])
            if isinstance(chunks_list, str):
                try:
                    chunks_list = json.loads(chunks_list)
                except json.JSONDecodeError:
                    chunks_list = []

            # Convert datetime objects to ISO format strings with timezone info
            created_at = self._format_datetime_with_timezone(element["created_at"])
            updated_at = self._format_datetime_with_timezone(element["updated_at"])

            docs_by_track_id[element["id"]] = DocProcessingStatus(
                # content=element["content"],
                content_summary=element["content_summary"],
                content_length=element["content_length"],
                status=element["status"],
                created_at=created_at,
                updated_at=updated_at,
                chunks_count=element["chunks_count"],
                file_path=element["file_path"],
                chunks_list=chunks_list,
                track_id=element.get("track_id"),
            )

        return docs_by_track_id

    async def index_done_callback(self) -> None:
        # PG handles persistence automatically
        pass

@@ -1843,10 +1917,10 @@ class PGDocStatusStorage(DocStatusStorage):
                logger.warning(f"Unable to parse datetime string: {dt_str}")
                return None

        # Modified SQL to include created_at, updated_at, and chunks_list in both INSERT and UPDATE operations
        # Modified SQL to include created_at, updated_at, chunks_list, and track_id in both INSERT and UPDATE operations
        # All fields are updated from the input data in both INSERT and UPDATE cases
        sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,created_at,updated_at)
                 values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
        sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,track_id,created_at,updated_at)
                 values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
                 on conflict(id,workspace) do update set
                 content_summary = EXCLUDED.content_summary,
                 content_length = EXCLUDED.content_length,

@@ -1854,6 +1928,7 @@ class PGDocStatusStorage(DocStatusStorage):
                 status = EXCLUDED.status,
                 file_path = EXCLUDED.file_path,
                 chunks_list = EXCLUDED.chunks_list,
                 track_id = EXCLUDED.track_id,
                 created_at = EXCLUDED.created_at,
                 updated_at = EXCLUDED.updated_at"""
        for k, v in data.items():

@@ -1861,7 +1936,7 @@ class PGDocStatusStorage(DocStatusStorage):
            created_at = parse_datetime(v.get("created_at"))
            updated_at = parse_datetime(v.get("updated_at"))

            # chunks_count and chunks_list are optional
            # chunks_count, chunks_list, and track_id are optional
            await self.db.execute(
                sql,
                {

@@ -1874,6 +1949,7 @@ class PGDocStatusStorage(DocStatusStorage):
                    "status": v["status"],
                    "file_path": v["file_path"],
                    "chunks_list": json.dumps(v.get("chunks_list", [])),
                    "track_id": v.get("track_id"),  # Add track_id support
                    "created_at": created_at,  # Use the converted datetime object
                    "updated_at": updated_at,  # Use the converted datetime object
                },

@@ -3375,6 +3451,7 @@ TABLES = {
                    status varchar(64) NULL,
                    file_path TEXT NULL,
                    chunks_list JSONB NULL DEFAULT '[]'::jsonb,
                    track_id varchar(255) NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)

@@ -804,6 +804,55 @@ class RedisDocStatusStorage(DocStatusStorage):

        return result

    async def get_docs_by_track_id(
        self, track_id: str
    ) -> dict[str, DocProcessingStatus]:
        """Get all documents with a specific track_id"""
        result = {}
        async with self._get_redis_connection() as redis:
            try:
                # Use SCAN to iterate through all keys in the namespace
                cursor = 0
                while True:
                    cursor, keys = await redis.scan(
                        cursor, match=f"{self.namespace}:*", count=1000
                    )
                    if keys:
                        # Get all values in batch
                        pipe = redis.pipeline()
                        for key in keys:
                            pipe.get(key)
                        values = await pipe.execute()

                        # Filter by track_id and create DocProcessingStatus objects
                        for key, value in zip(keys, values):
                            if value:
                                try:
                                    doc_data = json.loads(value)
                                    if doc_data.get("track_id") == track_id:
                                        # Extract document ID from key
                                        doc_id = key.split(":", 1)[1]

                                        # Make a copy of the data to avoid modifying the original
                                        data = doc_data.copy()
                                        # If file_path is not in data, use document id as file path
                                        if "file_path" not in data:
                                            data["file_path"] = "no-file-path"

                                        result[doc_id] = DocProcessingStatus(**data)
                                except (json.JSONDecodeError, KeyError) as e:
                                    logger.error(
                                        f"Error processing document {key}: {e}"
                                    )
                                    continue

                    if cursor == 0:
                        break
            except Exception as e:
                logger.error(f"Error getting docs by track_id: {e}")

        return result

    async def index_done_callback(self) -> None:
        """Redis handles persistence automatically"""
        pass

@@ -88,6 +88,7 @@ from .utils import (
    get_content_summary,
    clean_text,
    check_storage_env_vars,
    generate_track_id,
    logger,
)
from .types import KnowledgeGraph

@@ -659,7 +660,8 @@ class LightRAG:
        split_by_character_only: bool = False,
        ids: str | list[str] | None = None,
        file_paths: str | list[str] | None = None,
    ) -> None:
        track_id: str | None = None,
    ) -> str:
        """Sync Insert documents with checkpoint support

        Args:

@@ -670,11 +672,20 @@ class LightRAG:
                split_by_character is None, this parameter is ignored.
            ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated
            file_paths: single string of the file path or list of file paths, used for citation
            track_id: tracking ID for monitoring processing status, if not provided, will be generated

        Returns:
            str: tracking ID for monitoring processing status
        """
        loop = always_get_an_event_loop()
        loop.run_until_complete(
        return loop.run_until_complete(
            self.ainsert(
                input, split_by_character, split_by_character_only, ids, file_paths
                input,
                split_by_character,
                split_by_character_only,
                ids,
                file_paths,
                track_id,
            )
        )

@@ -685,7 +696,8 @@ class LightRAG:
        split_by_character_only: bool = False,
        ids: str | list[str] | None = None,
        file_paths: str | list[str] | None = None,
    ) -> None:
        track_id: str | None = None,
    ) -> str:
        """Async Insert documents with checkpoint support

        Args:

@@ -696,12 +708,22 @@ class LightRAG:
                split_by_character is None, this parameter is ignored.
            ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated
            file_paths: list of file paths corresponding to each document, used for citation
            track_id: tracking ID for monitoring processing status, if not provided, will be generated

        Returns:
            str: tracking ID for monitoring processing status
        """
        await self.apipeline_enqueue_documents(input, ids, file_paths)
        # Generate track_id if not provided
        if track_id is None:
            track_id = generate_track_id("insert")

        await self.apipeline_enqueue_documents(input, ids, file_paths, track_id)
        await self.apipeline_process_enqueue_documents(
            split_by_character, split_by_character_only
        )

        return track_id

    # TODO: deprecated, use insert instead
    def insert_custom_chunks(
        self,

@@ -779,6 +801,7 @@ class LightRAG:
        input: str | list[str],
        ids: list[str] | None = None,
        file_paths: str | list[str] | None = None,
        track_id: str | None = None,
    ) -> None:
        """
        Pipeline for Processing Documents

@@ -874,6 +897,7 @@ class LightRAG:
                "file_path": content_data[
                    "file_path"
                ],  # Store file path in document status
                "track_id": track_id,  # Store track_id in document status
            }
            for id_, content_data in contents.items()
        }

@@ -2199,6 +2223,19 @@ class LightRAG:
        """
        return await self.doc_status.get_status_counts()

    async def aget_docs_by_track_id(
        self, track_id: str
    ) -> dict[str, DocProcessingStatus]:
        """Get documents by track_id

        Args:
            track_id: The tracking ID to search for

        Returns:
            Dict with document id as keys and document status as values
        """
        return await self.doc_status.get_docs_by_track_id(track_id)

    async def get_entity_info(
        self, entity_name: str, include_vector_data: bool = False
    ) -> dict[str, str | None | dict[str, str]]:

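Taken together, the LightRAG changes above mean insert()/ainsert() now return a track_id that can be passed straight to aget_docs_by_track_id(). A short usage sketch, assuming an already configured LightRAG instance (construction and storage setup omitted):

# Hedged library-level sketch; `rag` is assumed to be an initialized LightRAG
# instance with a doc-status storage backend configured.
import asyncio

from lightrag import LightRAG  # assumed top-level import


async def check_progress(rag: LightRAG) -> None:
    # ainsert() now returns the tracking ID (generated with the "insert"
    # prefix when the caller does not supply one).
    track_id = await rag.ainsert("Some document text", file_paths="note.txt")

    # Every document enqueued under that ID can be looked up by the same ID.
    docs = await rag.aget_docs_by_track_id(track_id)
    for doc_id, doc_status in docs.items():
        print(doc_id, doc_status.status, doc_status.chunks_count)


# asyncio.run(check_progress(rag))  # with a configured `rag` instance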
@@ -9,7 +9,9 @@ import logging
import logging.handlers
import os
import re
import uuid
from dataclasses import dataclass
from datetime import datetime
from functools import wraps
from hashlib import md5
from typing import Any, Protocol, Callable, TYPE_CHECKING, List

@@ -1979,3 +1981,17 @@ def build_file_path(already_file_paths, data_list, target):
            f"Length of file_path exceeds {target}, ignoring new file: {file_paths_ignore}"
        )
    return file_paths


def generate_track_id(prefix: str = "upload") -> str:
    """Generate a unique tracking ID with timestamp and UUID

    Args:
        prefix: Prefix for the track ID (e.g., 'upload', 'insert')

    Returns:
        str: Unique tracking ID in format: {prefix}_{timestamp}_{uuid}
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    unique_id = str(uuid.uuid4())[:8]  # Use first 8 characters of UUID
    return f"{prefix}_{timestamp}_{unique_id}"

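For reference, the IDs produced by generate_track_id() have a fixed shape that frontend code could validate before polling. A small illustration (the regular expression is inferred from the implementation above, not shipped with this commit):

# Illustrative check of the track_id shape implied by generate_track_id();
# the regex is an assumption derived from the code above.
import re

from lightrag.utils import generate_track_id

track_id = generate_track_id("upload")  # e.g. "upload_20250729_170612_abc123"
assert re.fullmatch(r"upload_\d{8}_\d{6}_[0-9a-f]{8}", track_id)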