From 40afb0441a423069fcd632bccbfbbdc6ddc3c5ac Mon Sep 17 00:00:00 2001 From: Giulio Grassia Date: Thu, 25 Sep 2025 14:21:31 +0200 Subject: [PATCH] fix (metadata): Corrected metadata management in enqueued documents --- lightrag/api/routers/document_routes.py | 4 +-- lightrag/lightrag.py | 39 ++++++++++++------------- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 259f9b41..e7e068fa 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -1810,7 +1810,7 @@ def create_document_routes( """ # Use the existing pipeline to enqueue the file success, returned_track_id = await pipeline_enqueue_file( - rag, file_path, track_id + rag, file_path, track_id, metadata ) if success: @@ -1819,7 +1819,7 @@ def create_document_routes( logger.error("Failed to enqueue file with metadata") # Trigger the pipeline processing - await rag.apipeline_process_enqueue_documents(metadata=metadata) + await rag.apipeline_process_enqueue_documents() return success, returned_track_id diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index d6945493..a37f4c09 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1107,9 +1107,9 @@ class LightRAG: "file_path" ], # Store file path in document status "track_id": track_id, # Store track_id in document status - "metadata": metadata, # added provided custom metadata + "metadata": metadata[i] if isinstance(metadata, list) and i < len(metadata) else metadata, # added provided custom metadata per document } - for id_, content_data in contents.items() + for i, (id_, content_data) in enumerate(contents.items()) } # 3. 
Filter out already processed documents @@ -1337,7 +1337,7 @@ class LightRAG: "track_id": getattr(status_doc, "track_id", ""), # Clear any error messages and processing metadata "error_msg": "", - "metadata": {}, + "metadata": getattr(status_doc, "metadata", {}), } # Update the status in to_process_docs as well @@ -1360,7 +1360,6 @@ class LightRAG: self, split_by_character: str | None = None, split_by_character_only: bool = False, - metadata: dict | None = None, ) -> None: """ Process pending documents by splitting them into chunks, processing @@ -1482,9 +1481,9 @@ pipeline_status: dict, pipeline_status_lock: asyncio.Lock, semaphore: asyncio.Semaphore, - metadata: dict | None = None, ) -> None: """Process single document""" + doc_metadata = getattr(status_doc, "metadata", None) or {} file_extraction_stage_ok = False async with semaphore: nonlocal processed_count @@ -1496,6 +1495,7 @@ # Get file path from status document file_path = getattr( status_doc, "file_path", "unknown_source" + ) async with pipeline_status_lock: @@ -1538,7 +1538,7 @@ "full_doc_id": doc_id, "file_path": file_path, # Add file path to each chunk "llm_cache_list": [], # Initialize empty LLM cache list for each chunk - "metadata": metadata, + "metadata": doc_metadata, } for dp in self.chunking_func( self.tokenizer, @@ -1558,7 +1558,7 @@ # Process document in two stages # Stage 1: Process text chunks and docs (parallel execution) - metadata["processing_start_time"] = processing_start_time + doc_metadata["processing_start_time"] = processing_start_time doc_status_task = asyncio.create_task( self.doc_status.upsert( @@ -1577,7 +1577,7 @@ ).isoformat(), "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id - "metadata": metadata, + "metadata": doc_metadata, } } ) @@ -1604,7 +1604,7 @@ entity_relation_task = asyncio.create_task( self._process_entity_relation_graph( chunks, - metadata, + 
doc_metadata, pipeline_status, pipeline_status_lock, ) @@ -1640,8 +1640,8 @@ class LightRAG: processing_end_time = int(time.time()) # Update document status to failed - metadata["processing_start_time"] = processing_start_time - metadata["processing_end_time"] = processing_end_time + doc_metadata["processing_start_time"] = processing_start_time + doc_metadata["processing_end_time"] = processing_end_time await self.doc_status.upsert( { doc_id: { @@ -1655,7 +1655,7 @@ class LightRAG: ).isoformat(), "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id - "metadata": metadata, + "metadata": doc_metadata, } } ) @@ -1680,15 +1680,15 @@ class LightRAG: current_file_number=current_file_number, total_files=total_files, file_path=file_path, - metadata=metadata, # NEW: Pass metadata to merge function + metadata=doc_metadata, # NEW: Pass metadata to merge function ) # Record processing end time processing_end_time = int(time.time()) - metadata["processing_start_time"] = ( + doc_metadata["processing_start_time"] = ( processing_start_time ) - metadata["processing_end_time"] = processing_end_time + doc_metadata["processing_end_time"] = processing_end_time await self.doc_status.upsert( { @@ -1704,7 +1704,7 @@ class LightRAG: ).isoformat(), "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id - "metadata": metadata, + "metadata": doc_metadata, } } ) @@ -1742,10 +1742,10 @@ class LightRAG: processing_end_time = int(time.time()) # Update document status to failed - metadata["processing_start_time"] = ( + doc_metadata["processing_start_time"] = ( processing_start_time ) - metadata["processing_end_time"] = processing_end_time + doc_metadata["processing_end_time"] = processing_end_time await self.doc_status.upsert( { @@ -1758,7 +1758,7 @@ class LightRAG: "updated_at": datetime.now().isoformat(), "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id - "metadata": metadata, + "metadata": 
doc_metadata, } } ) @@ -1775,7 +1775,6 @@ class LightRAG: pipeline_status, pipeline_status_lock, semaphore, - metadata, ) )