fix (metadata): Corrected metadata management in enqueued documents

This commit is contained in:
Giulio Grassia 2025-09-25 14:21:31 +02:00
parent 0c721fa7f1
commit 40afb0441a
2 changed files with 21 additions and 22 deletions

View file

@ -1810,7 +1810,7 @@ def create_document_routes(
""" """
# Use the existing pipeline to enqueue the file # Use the existing pipeline to enqueue the file
success, returned_track_id = await pipeline_enqueue_file( success, returned_track_id = await pipeline_enqueue_file(
rag, file_path, track_id rag, file_path, track_id, metadata
) )
if success: if success:
@ -1819,7 +1819,7 @@ def create_document_routes(
logger.error("Failed to enqueue file with metadata") logger.error("Failed to enqueue file with metadata")
# Trigger the pipeline processing # Trigger the pipeline processing
await rag.apipeline_process_enqueue_documents(metadata=metadata) await rag.apipeline_process_enqueue_documents()
return success, returned_track_id return success, returned_track_id

View file

@ -1107,9 +1107,9 @@ class LightRAG:
"file_path" "file_path"
], # Store file path in document status ], # Store file path in document status
"track_id": track_id, # Store track_id in document status "track_id": track_id, # Store track_id in document status
"metadata": metadata, # added provided custom metadata "metadata": metadata[i] if isinstance(metadata, list) and i < len(metadata) else metadata, # added provided custom metadata per document
} }
for id_, content_data in contents.items() for i, (id_, content_data) in enumerate(contents.items())
} }
# 3. Filter out already processed documents # 3. Filter out already processed documents
@ -1337,7 +1337,7 @@ class LightRAG:
"track_id": getattr(status_doc, "track_id", ""), "track_id": getattr(status_doc, "track_id", ""),
# Clear any error messages and processing metadata # Clear any error messages and processing metadata
"error_msg": "", "error_msg": "",
"metadata": {}, "metadata": getattr(status_doc, "metadata", {}),
} }
# Update the status in to_process_docs as well # Update the status in to_process_docs as well
@ -1360,7 +1360,6 @@ class LightRAG:
self, self,
split_by_character: str | None = None, split_by_character: str | None = None,
split_by_character_only: bool = False, split_by_character_only: bool = False,
metadata: dict | None = None,
) -> None: ) -> None:
""" """
Process pending documents by splitting them into chunks, processing Process pending documents by splitting them into chunks, processing
@ -1482,9 +1481,9 @@ class LightRAG:
pipeline_status: dict, pipeline_status: dict,
pipeline_status_lock: asyncio.Lock, pipeline_status_lock: asyncio.Lock,
semaphore: asyncio.Semaphore, semaphore: asyncio.Semaphore,
metadata: dict | None = None,
) -> None: ) -> None:
"""Process single document""" """Process single document"""
doc_metadata = getattr(status_doc, "metadata", None) or {} doc_metadata = getattr(status_doc, "metadata", None) or {}
file_extraction_stage_ok = False file_extraction_stage_ok = False
async with semaphore: async with semaphore:
nonlocal processed_count nonlocal processed_count
@ -1496,6 +1495,7 @@ class LightRAG:
# Get file path from status document # Get file path from status document
file_path = getattr( file_path = getattr(
status_doc, "file_path", "unknown_source" status_doc, "file_path", "unknown_source"
) )
async with pipeline_status_lock: async with pipeline_status_lock:
@ -1538,7 +1538,7 @@ class LightRAG:
"full_doc_id": doc_id, "full_doc_id": doc_id,
"file_path": file_path, # Add file path to each chunk "file_path": file_path, # Add file path to each chunk
"llm_cache_list": [], # Initialize empty LLM cache list for each chunk "llm_cache_list": [], # Initialize empty LLM cache list for each chunk
"metadata": metadata, "metadata": doc_metadata,
} }
for dp in self.chunking_func( for dp in self.chunking_func(
self.tokenizer, self.tokenizer,
@ -1558,7 +1558,7 @@ class LightRAG:
# Process document in two stages # Process document in two stages
# Stage 1: Process text chunks and docs (parallel execution) # Stage 1: Process text chunks and docs (parallel execution)
metadata["processing_start_time"] = processing_start_time doc_metadata["processing_start_time"] = processing_start_time
doc_status_task = asyncio.create_task( doc_status_task = asyncio.create_task(
self.doc_status.upsert( self.doc_status.upsert(
@ -1577,7 +1577,7 @@ class LightRAG:
).isoformat(), ).isoformat(),
"file_path": file_path, "file_path": file_path,
"track_id": status_doc.track_id, # Preserve existing track_id "track_id": status_doc.track_id, # Preserve existing track_id
"metadata": metadata, "metadata": doc_metadata,
} }
} }
) )
@ -1604,7 +1604,7 @@ class LightRAG:
entity_relation_task = asyncio.create_task( entity_relation_task = asyncio.create_task(
self._process_entity_relation_graph( self._process_entity_relation_graph(
chunks, chunks,
metadata, doc_metadata,
pipeline_status, pipeline_status,
pipeline_status_lock, pipeline_status_lock,
) )
@ -1640,8 +1640,8 @@ class LightRAG:
processing_end_time = int(time.time()) processing_end_time = int(time.time())
# Update document status to failed # Update document status to failed
metadata["processing_start_time"] = processing_start_time doc_metadata["processing_start_time"] = processing_start_time
metadata["processing_end_time"] = processing_end_time doc_metadata["processing_end_time"] = processing_end_time
await self.doc_status.upsert( await self.doc_status.upsert(
{ {
doc_id: { doc_id: {
@ -1655,7 +1655,7 @@ class LightRAG:
).isoformat(), ).isoformat(),
"file_path": file_path, "file_path": file_path,
"track_id": status_doc.track_id, # Preserve existing track_id "track_id": status_doc.track_id, # Preserve existing track_id
"metadata": metadata, "metadata": doc_metadata,
} }
} }
) )
@ -1680,15 +1680,15 @@ class LightRAG:
current_file_number=current_file_number, current_file_number=current_file_number,
total_files=total_files, total_files=total_files,
file_path=file_path, file_path=file_path,
metadata=metadata, # NEW: Pass metadata to merge function metadata=doc_metadata, # NEW: Pass metadata to merge function
) )
# Record processing end time # Record processing end time
processing_end_time = int(time.time()) processing_end_time = int(time.time())
metadata["processing_start_time"] = ( doc_metadata["processing_start_time"] = (
processing_start_time processing_start_time
) )
metadata["processing_end_time"] = processing_end_time doc_metadata["processing_end_time"] = processing_end_time
await self.doc_status.upsert( await self.doc_status.upsert(
{ {
@ -1704,7 +1704,7 @@ class LightRAG:
).isoformat(), ).isoformat(),
"file_path": file_path, "file_path": file_path,
"track_id": status_doc.track_id, # Preserve existing track_id "track_id": status_doc.track_id, # Preserve existing track_id
"metadata": metadata, "metadata": doc_metadata,
} }
} }
) )
@ -1742,10 +1742,10 @@ class LightRAG:
processing_end_time = int(time.time()) processing_end_time = int(time.time())
# Update document status to failed # Update document status to failed
metadata["processing_start_time"] = ( doc_metadata["processing_start_time"] = (
processing_start_time processing_start_time
) )
metadata["processing_end_time"] = processing_end_time doc_metadata["processing_end_time"] = processing_end_time
await self.doc_status.upsert( await self.doc_status.upsert(
{ {
@ -1758,7 +1758,7 @@ class LightRAG:
"updated_at": datetime.now().isoformat(), "updated_at": datetime.now().isoformat(),
"file_path": file_path, "file_path": file_path,
"track_id": status_doc.track_id, # Preserve existing track_id "track_id": status_doc.track_id, # Preserve existing track_id
"metadata": metadata, "metadata": doc_metadata,
} }
} }
) )
@ -1775,7 +1775,6 @@ class LightRAG:
pipeline_status, pipeline_status,
pipeline_status_lock, pipeline_status_lock,
semaphore, semaphore,
metadata,
) )
) )