diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 68642909..822b797b 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -210,18 +210,21 @@ class InsertResponse(BaseModel):
     Attributes:
         status: Status of the operation (success, duplicated, partial_success, failure)
         message: Detailed message describing the operation result
+        track_id: Tracking ID for monitoring processing status
     """

     status: Literal["success", "duplicated", "partial_success", "failure"] = Field(
         description="Status of the operation"
     )
     message: str = Field(description="Message describing the operation result")
+    track_id: str = Field(description="Tracking ID for monitoring processing status")

     class Config:
         json_schema_extra = {
             "example": {
                 "status": "success",
                 "message": "File 'document.pdf' uploaded successfully. Processing will continue in background.",
+                "track_id": "upload_20250729_170612_abc123",
             }
         }

@@ -360,6 +363,9 @@ class DocStatusResponse(BaseModel):
     status: DocStatus = Field(description="Current processing status")
     created_at: str = Field(description="Creation timestamp (ISO format string)")
     updated_at: str = Field(description="Last update timestamp (ISO format string)")
+    track_id: Optional[str] = Field(
+        default=None, description="Tracking ID for monitoring progress"
+    )
     chunks_count: Optional[int] = Field(
         default=None, description="Number of chunks the document was split into"
     )
@@ -380,6 +386,7 @@ class DocStatusResponse(BaseModel):
                 "status": "PROCESSED",
                 "created_at": "2025-03-31T12:34:56",
                 "updated_at": "2025-03-31T12:35:30",
+                "track_id": "upload_20250729_170612_abc123",
                 "chunks_count": 12,
                 "error": None,
                 "metadata": {"author": "John Doe", "year": 2025},
@@ -412,6 +419,10 @@ class DocsStatusesResponse(BaseModel):
                         "status": "PENDING",
                         "created_at": "2025-03-31T10:00:00",
                         "updated_at": "2025-03-31T10:00:00",
+                        "track_id": "upload_20250331_100000_abc123",
+                        "chunks_count": None,
+                        "error": None,
+                        "metadata": None,
                         "file_path": "pending_doc.pdf",
                     }
                 ],
@@ -423,7 +434,10 @@ class DocsStatusesResponse(BaseModel):
                         "status": "PROCESSED",
                         "created_at": "2025-03-31T09:00:00",
                         "updated_at": "2025-03-31T09:05:00",
+                        "track_id": "insert_20250331_090000_def456",
                         "chunks_count": 8,
+                        "error": None,
+                        "metadata": {"author": "John Doe"},
                         "file_path": "processed_doc.pdf",
                     }
                 ],
@@ -432,6 +446,48 @@
         }


+class TrackStatusResponse(BaseModel):
+    """Response model for tracking document processing status by track_id
+
+    Attributes:
+        track_id: The tracking ID
+        documents: List of documents associated with this track_id
+        total_count: Total number of documents for this track_id
+        status_summary: Count of documents by status
+    """
+
+    track_id: str = Field(description="The tracking ID")
+    documents: List[DocStatusResponse] = Field(
+        description="List of documents associated with this track_id"
+    )
+    total_count: int = Field(description="Total number of documents for this track_id")
+    status_summary: Dict[str, int] = Field(description="Count of documents by status")
+
+    class Config:
+        json_schema_extra = {
+            "example": {
+                "track_id": "upload_20250729_170612_abc123",
+                "documents": [
+                    {
+                        "id": "doc_123456",
+                        "content_summary": "Research paper on machine learning",
+                        "content_length": 15240,
+                        "status": "PROCESSED",
+                        "created_at": "2025-03-31T12:34:56",
+                        "updated_at": "2025-03-31T12:35:30",
+                        "track_id": "upload_20250729_170612_abc123",
+                        "chunks_count": 12,
+                        "error": None,
+                        "metadata": {"author": "John Doe", "year": 2025},
+                        "file_path": "research_paper.pdf",
+                    }
+                ],
+                "total_count": 1,
+                "status_summary": {"PROCESSED": 1},
+            }
+        }
+
+
 class PipelineStatusResponse(BaseModel):
     """Response model for pipeline status

@@ -549,14 +605,17 @@ class DocumentManager:
         return any(filename.lower().endswith(ext) for ext in self.supported_extensions)


-async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
+async def pipeline_enqueue_file(
+    rag: LightRAG, file_path: Path, track_id: str = None
+) -> tuple[bool, str]:
     """Add a file to the queue for processing

     Args:
         rag: LightRAG instance
         file_path: Path to the saved file
+        track_id: Optional tracking ID, if not provided will be generated
     Returns:
-        bool: True if the file was successfully enqueued, False otherwise
+        tuple: (success: bool, track_id: str)
     """

     try:
@@ -730,9 +789,17 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
                     f"File contains only whitespace characters. file_paths={file_path.name}"
                 )

-            await rag.apipeline_enqueue_documents(content, file_paths=file_path.name)
+            # Generate track_id if not provided
+            if track_id is None:
+                from lightrag.utils import generate_track_id
+
+                track_id = generate_track_id("upload")
+
+            returned_track_id = await rag.ainsert(
+                content, file_paths=file_path.name, track_id=track_id
+            )
             logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
-            return True
+            return True, returned_track_id
         else:
             logger.error(f"No content could be extracted from file: {file_path.name}")

@@ -745,18 +812,22 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
                 file_path.unlink()
             except Exception as e:
                 logger.error(f"Error deleting file {file_path}: {str(e)}")
-        return False
+        return False, ""


-async def pipeline_index_file(rag: LightRAG, file_path: Path):
-    """Index a file
+async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
+    """Index a file with track_id

     Args:
         rag: LightRAG instance
         file_path: Path to the saved file
+        track_id: Optional tracking ID
     """
     try:
-        if await pipeline_enqueue_file(rag, file_path):
+        success, returned_track_id = await pipeline_enqueue_file(
+            rag, file_path, track_id
+        )
+        if success:
             await rag.apipeline_process_enqueue_documents()

     except Exception as e:
@@ -794,14 +865,18 @@ async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):


 async def pipeline_index_texts(
-    rag: LightRAG, texts: List[str], file_sources: List[str] = None
+    rag: LightRAG,
+    texts: List[str],
+    file_sources: List[str] = None,
+    track_id: str = None,
 ):
-    """Index a list of texts
+    """Index a list of texts with track_id

     Args:
         rag: LightRAG instance
         texts: The texts to index
         file_sources: Sources of the texts
+        track_id: Optional tracking ID
     """
     if not texts:
         return
@@ -811,7 +886,9 @@ async def pipeline_index_texts(
             file_sources.append("unknown_source")
             for _ in range(len(file_sources), len(texts))
         ]
-    await rag.apipeline_enqueue_documents(input=texts, file_paths=file_sources)
+    await rag.apipeline_enqueue_documents(
+        input=texts, file_paths=file_sources, track_id=track_id
+    )
     await rag.apipeline_process_enqueue_documents()


@@ -1080,18 +1157,26 @@ def create_document_routes(
                 return InsertResponse(
                     status="duplicated",
                     message=f"File '{safe_filename}' already exists in the input directory.",
+                    track_id="",
                 )

             with open(file_path, "wb") as buffer:
                 shutil.copyfileobj(file.file, buffer)

-            # Add to background tasks
-            background_tasks.add_task(pipeline_index_file, rag, file_path)
-
-            return InsertResponse(
-                status="success",
-                message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
-            )
+            # Add to background tasks and get track_id
+            success, track_id = await pipeline_enqueue_file(rag, file_path)
+            if success:
+                background_tasks.add_task(rag.apipeline_process_enqueue_documents)
+                return InsertResponse(
+                    status="success",
+                    message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
+                    track_id=track_id,
+                )
+            else:
+                raise HTTPException(
+                    status_code=500,
+                    detail=f"Failed to enqueue file '{safe_filename}' for processing.",
+                )
         except Exception as e:
             logger.error(f"Error /documents/upload: {file.filename}: {str(e)}")
             logger.error(traceback.format_exc())
@@ -1120,15 +1205,20 @@ def create_document_routes(
             HTTPException: If an error occurs during text processing (500).
         """
         try:
-            background_tasks.add_task(
-                pipeline_index_texts,
-                rag,
-                [request.text],
-                file_sources=[request.file_source],
+            from lightrag.utils import generate_track_id
+
+            # Generate track_id for text insertion
+            track_id = generate_track_id("insert")
+
+            # Insert text and get track_id
+            returned_track_id = await rag.ainsert(
+                request.text, file_paths=request.file_source, track_id=track_id
             )
+
             return InsertResponse(
                 status="success",
                 message="Text successfully received. Processing will continue in background.",
+                track_id=returned_track_id,
             )
         except Exception as e:
             logger.error(f"Error /documents/text: {str(e)}")
@@ -1160,18 +1250,23 @@ def create_document_routes(
             HTTPException: If an error occurs during text processing (500).
         """
         try:
-            background_tasks.add_task(
-                pipeline_index_texts,
-                rag,
-                request.texts,
-                file_sources=request.file_sources,
+            from lightrag.utils import generate_track_id
+
+            # Generate track_id for texts insertion
+            track_id = generate_track_id("insert")
+
+            # Insert texts and get track_id
+            returned_track_id = await rag.ainsert(
+                request.texts, file_paths=request.file_sources, track_id=track_id
             )
+
             return InsertResponse(
                 status="success",
-                message="Text successfully received. Processing will continue in background.",
+                message="Texts successfully received. Processing will continue in background.",
+                track_id=returned_track_id,
             )
         except Exception as e:
-            logger.error(f"Error /documents/text: {str(e)}")
+            logger.error(f"Error /documents/texts: {str(e)}")
             logger.error(traceback.format_exc())
             raise HTTPException(status_code=500, detail=str(e))

@@ -1473,6 +1568,7 @@ def create_document_routes(
                         status=doc_status.status,
                         created_at=format_datetime(doc_status.created_at),
                         updated_at=format_datetime(doc_status.updated_at),
+                        track_id=doc_status.track_id,
                         chunks_count=doc_status.chunks_count,
                         error=doc_status.error,
                         metadata=doc_status.metadata,
@@ -1699,4 +1795,77 @@ def create_document_routes(
             logger.error(traceback.format_exc())
             raise HTTPException(status_code=500, detail=error_msg)

+    @router.get(
+        "/track_status/{track_id}",
+        response_model=TrackStatusResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def get_track_status(track_id: str) -> TrackStatusResponse:
+        """
+        Get the processing status of documents by tracking ID.
+
+        This endpoint retrieves all documents associated with a specific tracking ID,
+        allowing users to monitor the processing progress of their uploaded files or inserted texts.
+
+        Args:
+            track_id (str): The tracking ID returned from upload, text, or texts endpoints
+
+        Returns:
+            TrackStatusResponse: A response object containing:
+                - track_id: The tracking ID
+                - documents: List of documents associated with this track_id
+                - total_count: Total number of documents for this track_id
+
+        Raises:
+            HTTPException: If track_id is invalid (400) or an error occurs (500).
+        """
+        try:
+            # Validate track_id
+            if not track_id or not track_id.strip():
+                raise HTTPException(status_code=400, detail="Track ID cannot be empty")
+
+            track_id = track_id.strip()
+
+            # Get documents by track_id
+            docs_by_track_id = await rag.aget_docs_by_track_id(track_id)
+
+            # Convert to response format
+            documents = []
+            status_summary = {}
+
+            for doc_id, doc_status in docs_by_track_id.items():
+                documents.append(
+                    DocStatusResponse(
+                        id=doc_id,
+                        content_summary=doc_status.content_summary,
+                        content_length=doc_status.content_length,
+                        status=doc_status.status,
+                        created_at=format_datetime(doc_status.created_at),
+                        updated_at=format_datetime(doc_status.updated_at),
+                        track_id=doc_status.track_id,
+                        chunks_count=doc_status.chunks_count,
+                        error=doc_status.error,
+                        metadata=doc_status.metadata,
+                        file_path=doc_status.file_path,
+                    )
+                )
+
+                # Build status summary
+                status_key = doc_status.status.value
+                status_summary[status_key] = status_summary.get(status_key, 0) + 1
+
+            return TrackStatusResponse(
+                track_id=track_id,
+                documents=documents,
+                total_count=len(documents),
+                status_summary=status_summary,
+            )
+
+        except HTTPException:
+            raise
+        except Exception as e:
+            logger.error(f"Error getting track status for {track_id}: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
     return router
diff --git a/lightrag/base.py b/lightrag/base.py
index 9af3250a..ed50c324 100644
--- a/lightrag/base.py
+++ b/lightrag/base.py
@@ -641,6 +641,8 @@ class DocProcessingStatus:
     """ISO format timestamp when document was created"""
     updated_at: str
     """ISO format timestamp when document was last updated"""
+    track_id: str | None = None
+    """Tracking ID for monitoring progress"""
     chunks_count: int | None = None
     """Number of chunks after splitting, used for processing"""
     chunks_list: list[str] | None = field(default_factory=list)
@@ -665,6 +667,12 @@ class DocStatusStorage(BaseKVStorage, ABC):
     ) -> dict[str, DocProcessingStatus]:
         """Get all documents with a specific status"""

+    @abstractmethod
+    async def get_docs_by_track_id(
+        self, track_id: str
+    ) -> dict[str, DocProcessingStatus]:
+        """Get all documents with a specific track_id"""
+
     async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Drop cache is not supported for Doc Status storage"""
         return False
diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py
index 46673521..a3695891 100644
--- a/lightrag/kg/json_doc_status_impl.py
+++ b/lightrag/kg/json_doc_status_impl.py
@@ -104,6 +104,26 @@ class JsonDocStatusStorage(DocStatusStorage):
                     continue
         return result

+    async def get_docs_by_track_id(
+        self, track_id: str
+    ) -> dict[str, DocProcessingStatus]:
+        """Get all documents with a specific track_id"""
+        result = {}
+        async with self._storage_lock:
+            for k, v in self._data.items():
+                if v.get("track_id") == track_id:
+                    try:
+                        # Make a copy of the data to avoid modifying the original
+                        data = v.copy()
+                        # If file_path is not in data, use document id as file path
+                        if "file_path" not in data:
+                            data["file_path"] = "no-file-path"
+                        result[k] = DocProcessingStatus(**data)
+                    except KeyError as e:
+                        logger.error(f"Missing required field for document {k}: {e}")
+                        continue
+        return result
+
     async def index_done_callback(self) -> None:
         async with self._storage_lock:
             if self.storage_updated.value:
diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py
index 1d6dc04b..84f78f77 100644
--- a/lightrag/kg/mongo_impl.py
+++ b/lightrag/kg/mongo_impl.py
@@ -321,6 +321,10 @@ class MongoDocStatusStorage(DocStatusStorage):
         if self.db is None:
             self.db = await ClientManager.get_client()
             self._data = await get_or_create_collection(self.db, self._collection_name)
+
+        # Create track_id index for better query performance
+        await self.create_track_id_index_if_not_exists()
+
         logger.debug(f"Use MongoDB as DocStatus {self._collection_name}")

     async def finalize(self):
@@ -386,6 +390,26 @@ class MongoDocStatusStorage(DocStatusStorage):
                 continue
         return processed_result

+    async def get_docs_by_track_id(
+        self, track_id: str
+    ) -> dict[str, DocProcessingStatus]:
+        """Get all documents with a specific track_id"""
+        cursor = self._data.find({"track_id": track_id})
+        result = await cursor.to_list()
+        processed_result = {}
+        for doc in result:
+            try:
+                # Make a copy of the data to avoid modifying the original
+                data = doc.copy()
+                # If file_path is not in data, use document id as file path
+                if "file_path" not in data:
+                    data["file_path"] = "no-file-path"
+                processed_result[doc["_id"]] = DocProcessingStatus(**data)
+            except KeyError as e:
+                logger.error(f"Missing required field for document {doc['_id']}: {e}")
+                continue
+        return processed_result
+
     async def index_done_callback(self) -> None:
         # Mongo handles persistence automatically
         pass
@@ -414,6 +438,30 @@ class MongoDocStatusStorage(DocStatusStorage):
     async def delete(self, ids: list[str]) -> None:
         await self._data.delete_many({"_id": {"$in": ids}})

+    async def create_track_id_index_if_not_exists(self):
+        """Create track_id index for better query performance"""
+        try:
+            # Check if index already exists
+            existing_indexes = await self._data.list_indexes().to_list(length=None)
+            track_id_index_exists = any(
+                "track_id" in idx.get("key", {}) for idx in existing_indexes
+            )
+
+            if not track_id_index_exists:
+                await self._data.create_index("track_id")
+                logger.info(
+                    f"Created track_id index for collection {self._collection_name}"
+                )
+            else:
+                logger.debug(
+                    f"track_id index already exists for collection {self._collection_name}"
+                )
+
+        except PyMongoError as e:
+            logger.error(
+                f"Error creating track_id index for {self._collection_name}: {e}"
+            )
+

 @final
 @dataclass
diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index d2b170bf..c08c800b 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -567,6 +567,35 @@ class PostgreSQLDB:
                 f"Failed to add llm_cache_list column to LIGHTRAG_DOC_CHUNKS: {e}"
             )

+    async def _migrate_doc_status_add_track_id(self):
+        """Add track_id column to LIGHTRAG_DOC_STATUS table if it doesn't exist"""
+        try:
+            # Check if track_id column exists
+            check_column_sql = """
+            SELECT column_name
+            FROM information_schema.columns
+            WHERE table_name = 'lightrag_doc_status'
+            AND column_name = 'track_id'
+            """
+
+            column_info = await self.query(check_column_sql)
+            if not column_info:
+                logger.info("Adding track_id column to LIGHTRAG_DOC_STATUS table")
+                add_column_sql = """
+                ALTER TABLE LIGHTRAG_DOC_STATUS
+                ADD COLUMN track_id VARCHAR(255) NULL
+                """
+                await self.execute(add_column_sql)
+                logger.info(
+                    "Successfully added track_id column to LIGHTRAG_DOC_STATUS table"
+                )
+            else:
+                logger.info(
+                    "track_id column already exists in LIGHTRAG_DOC_STATUS table"
+                )
+        except Exception as e:
+            logger.warning(f"Failed to add track_id column to LIGHTRAG_DOC_STATUS: {e}")
+
     async def _migrate_field_lengths(self):
         """Migrate database field lengths: entity_name, source_id, target_id, and file_path"""
         # Define the field changes needed
@@ -785,6 +814,14 @@ class PostgreSQLDB:
         except Exception as e:
             logger.error(f"PostgreSQL, Failed to migrate field lengths: {e}")

+        # Migrate doc status to add track_id field if needed
+        try:
+            await self._migrate_doc_status_add_track_id()
+        except Exception as e:
+            logger.error(
+                f"PostgreSQL, Failed to migrate doc status track_id field: {e}"
+            )
+
     async def query(
         self,
         sql: str,
@@ -1775,6 +1812,43 @@ class PGDocStatusStorage(DocStatusStorage):

         return docs_by_status

+    async def get_docs_by_track_id(
+        self, track_id: str
+    ) -> dict[str, DocProcessingStatus]:
+        """Get all documents with a specific track_id"""
+        sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and track_id=$2"
+        params = {"workspace": self.db.workspace, "track_id": track_id}
+        result = await self.db.query(sql, params, True)
+
+        docs_by_track_id = {}
+        for element in result:
+            # Parse chunks_list JSON string back to list
+            chunks_list = element.get("chunks_list", [])
+            if isinstance(chunks_list, str):
+                try:
+                    chunks_list = json.loads(chunks_list)
+                except json.JSONDecodeError:
+                    chunks_list = []
+
+            # Convert datetime objects to ISO format strings with timezone info
+            created_at = self._format_datetime_with_timezone(element["created_at"])
+            updated_at = self._format_datetime_with_timezone(element["updated_at"])
+
+            docs_by_track_id[element["id"]] = DocProcessingStatus(
+                # content=element["content"],
+                content_summary=element["content_summary"],
+                content_length=element["content_length"],
+                status=element["status"],
+                created_at=created_at,
+                updated_at=updated_at,
+                chunks_count=element["chunks_count"],
+                file_path=element["file_path"],
+                chunks_list=chunks_list,
+                track_id=element.get("track_id"),
+            )
+
+        return docs_by_track_id
+
     async def index_done_callback(self) -> None:
         # PG handles persistence automatically
         pass
@@ -1843,10 +1917,10 @@ class PGDocStatusStorage(DocStatusStorage):
                 logger.warning(f"Unable to parse datetime string: {dt_str}")
                 return None

-        # Modified SQL to include created_at, updated_at, and chunks_list in both INSERT and UPDATE operations
+        # Modified SQL to include created_at, updated_at, chunks_list, and track_id in both INSERT and UPDATE operations
         # All fields are updated from the input data in both INSERT and UPDATE cases
-        sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,created_at,updated_at)
-                 values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
+        sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,track_id,created_at,updated_at)
+                 values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
                  on conflict(id,workspace) do update set
                  content_summary = EXCLUDED.content_summary,
                  content_length = EXCLUDED.content_length,
@@ -1854,6 +1928,7 @@ class PGDocStatusStorage(DocStatusStorage):
                  status = EXCLUDED.status,
                  file_path = EXCLUDED.file_path,
                  chunks_list = EXCLUDED.chunks_list,
+                 track_id = EXCLUDED.track_id,
                  created_at = EXCLUDED.created_at,
                  updated_at = EXCLUDED.updated_at"""
         for k, v in data.items():
             created_at = parse_datetime(v.get("created_at"))
             updated_at = parse_datetime(v.get("updated_at"))

-            # chunks_count and chunks_list are optional
+            # chunks_count, chunks_list, and track_id are optional
             await self.db.execute(
                 sql,
                 {
@@ -1874,6 +1949,7 @@ class PGDocStatusStorage(DocStatusStorage):
                     "status": v["status"],
                     "file_path": v["file_path"],
                     "chunks_list": json.dumps(v.get("chunks_list", [])),
+                    "track_id": v.get("track_id"),  # Add track_id support
                     "created_at": created_at,  # Use the converted datetime object
                     "updated_at": updated_at,  # Use the converted datetime object
                 },
@@ -3375,6 +3451,7 @@ TABLES = {
             status varchar(64) NULL,
             file_path TEXT NULL,
             chunks_list JSONB NULL DEFAULT '[]'::jsonb,
+            track_id varchar(255) NULL,
             created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
             updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
             CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py
index 6518cf04..ca9d51f5 100644
--- a/lightrag/kg/redis_impl.py
+++ b/lightrag/kg/redis_impl.py
@@ -804,6 +804,55 @@ class RedisDocStatusStorage(DocStatusStorage):

         return result

+    async def get_docs_by_track_id(
+        self, track_id: str
+    ) -> dict[str, DocProcessingStatus]:
+        """Get all documents with a specific track_id"""
+        result = {}
+        async with self._get_redis_connection() as redis:
+            try:
+                # Use SCAN to iterate through all keys in the namespace
+                cursor = 0
+                while True:
+                    cursor, keys = await redis.scan(
+                        cursor, match=f"{self.namespace}:*", count=1000
+                    )
+                    if keys:
+                        # Get all values in batch
+                        pipe = redis.pipeline()
+                        for key in keys:
+                            pipe.get(key)
+                        values = await pipe.execute()
+
+                        # Filter by track_id and create DocProcessingStatus objects
+                        for key, value in zip(keys, values):
+                            if value:
+                                try:
+                                    doc_data = json.loads(value)
+                                    if doc_data.get("track_id") == track_id:
+                                        # Extract document ID from key
+                                        doc_id = key.split(":", 1)[1]
+
+                                        # Make a copy of the data to avoid modifying the original
+                                        data = doc_data.copy()
+                                        # If file_path is not in data, use document id as file path
+                                        if "file_path" not in data:
+                                            data["file_path"] = "no-file-path"
+
+                                        result[doc_id] = DocProcessingStatus(**data)
+                                except (json.JSONDecodeError, KeyError) as e:
+                                    logger.error(
+                                        f"Error processing document {key}: {e}"
+                                    )
+                                    continue
+
+                    if cursor == 0:
+                        break
+            except Exception as e:
+                logger.error(f"Error getting docs by track_id: {e}")
+
+        return result
+
     async def index_done_callback(self) -> None:
         """Redis handles persistence automatically"""
         pass
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 336604e1..df4e92c1 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -88,6 +88,7 @@ from .utils import (
     get_content_summary,
     clean_text,
     check_storage_env_vars,
+    generate_track_id,
     logger,
 )
 from .types import KnowledgeGraph
@@ -659,7 +660,8 @@ class LightRAG:
         split_by_character_only: bool = False,
         ids: str | list[str] | None = None,
         file_paths: str | list[str] | None = None,
-    ) -> None:
+        track_id: str | None = None,
+    ) -> str:
         """Sync Insert documents with checkpoint support

         Args:
@@ -670,11 +672,20 @@ class LightRAG:
                 split_by_character is None, this parameter is ignored.
             ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated
             file_paths: single string of the file path or list of file paths, used for citation
+            track_id: tracking ID for monitoring processing status, if not provided, will be generated
+
+        Returns:
+            str: tracking ID for monitoring processing status
         """
         loop = always_get_an_event_loop()
-        loop.run_until_complete(
+        return loop.run_until_complete(
             self.ainsert(
-                input, split_by_character, split_by_character_only, ids, file_paths
+                input,
+                split_by_character,
+                split_by_character_only,
+                ids,
+                file_paths,
+                track_id,
             )
         )

@@ -685,7 +696,8 @@ class LightRAG:
         split_by_character_only: bool = False,
         ids: str | list[str] | None = None,
         file_paths: str | list[str] | None = None,
-    ) -> None:
+        track_id: str | None = None,
+    ) -> str:
         """Async Insert documents with checkpoint support

         Args:
@@ -696,12 +708,22 @@ class LightRAG:
             split_by_character is None, this parameter is ignored.
             ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated
             file_paths: list of file paths corresponding to each document, used for citation
+            track_id: tracking ID for monitoring processing status, if not provided, will be generated
+
+        Returns:
+            str: tracking ID for monitoring processing status
         """
-        await self.apipeline_enqueue_documents(input, ids, file_paths)
+        # Generate track_id if not provided
+        if track_id is None:
+            track_id = generate_track_id("insert")
+
+        await self.apipeline_enqueue_documents(input, ids, file_paths, track_id)
         await self.apipeline_process_enqueue_documents(
             split_by_character, split_by_character_only
         )

+        return track_id
+
     # TODO: deprecated, use insert instead
     def insert_custom_chunks(
         self,
@@ -779,6 +801,7 @@ class LightRAG:
         input: str | list[str],
         ids: list[str] | None = None,
         file_paths: str | list[str] | None = None,
+        track_id: str | None = None,
     ) -> None:
         """
         Pipeline for Processing Documents
@@ -874,6 +897,7 @@ class LightRAG:
                     "file_path": content_data[
                         "file_path"
                     ],  # Store file path in document status
+                    "track_id": track_id,  # Store track_id in document status
                 }
                 for id_, content_data in contents.items()
             }
@@ -2199,6 +2223,19 @@ class LightRAG:
         """
         return await self.doc_status.get_status_counts()

+    async def aget_docs_by_track_id(
+        self, track_id: str
+    ) -> dict[str, DocProcessingStatus]:
+        """Get documents by track_id
+
+        Args:
+            track_id: The tracking ID to search for
+
+        Returns:
+            Dict with document id as keys and document status as values
+        """
+        return await self.doc_status.get_docs_by_track_id(track_id)
+
     async def get_entity_info(
         self, entity_name: str, include_vector_data: bool = False
     ) -> dict[str, str | None | dict[str, str]]:
diff --git a/lightrag/utils.py b/lightrag/utils.py
index 5f4c5a12..06fcbfe9 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -9,7 +9,9 @@ import logging
 import logging.handlers
 import os
 import re
+import uuid
 from dataclasses import dataclass
+from datetime import datetime
 from functools import wraps
 from hashlib import md5
 from typing import Any, Protocol, Callable, TYPE_CHECKING, List
@@ -1979,3 +1981,17 @@ def build_file_path(already_file_paths, data_list, target):
             f"Length of file_path exceeds {target}, ignoring new file: {file_paths_ignore}"
         )
     return file_paths
+
+
+def generate_track_id(prefix: str = "upload") -> str:
+    """Generate a unique tracking ID with timestamp and UUID
+
+    Args:
+        prefix: Prefix for the track ID (e.g., 'upload', 'insert')
+
+    Returns:
+        str: Unique tracking ID in format: {prefix}_{timestamp}_{uuid}
+    """
+    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+    unique_id = str(uuid.uuid4())[:8]  # Use first 8 characters of UUID
+    return f"{prefix}_{timestamp}_{unique_id}"
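
Usage note (illustration only, not part of the patch): a minimal sketch of how the tracking flow introduced above could be exercised from Python, assuming an already configured LightRAG instance. It only calls APIs added in this diff (ainsert returning a track_id and aget_docs_by_track_id); the helper name and sample inputs are hypothetical.

    from lightrag import LightRAG
    from lightrag.base import DocProcessingStatus

    async def show_track_status(rag: LightRAG) -> None:
        # Enqueue and process a document; the returned string is the track_id
        track_id = await rag.ainsert("Some document text", file_paths="example.txt")
        # Later, fetch every document recorded under that track_id
        statuses: dict[str, DocProcessingStatus] = await rag.aget_docs_by_track_id(track_id)
        for doc_id, doc in statuses.items():
            print(doc_id, doc.status, doc.chunks_count)

    # Run with asyncio.run(show_track_status(rag)) once `rag` has been initialized.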