From 7207598fc4fc1f3e19ad1f62d877ff80488414cf Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 30 Jul 2025 03:06:20 +0800
Subject: [PATCH] Fix track_id bugs and add track_id to scanning response

---
 lightrag/api/routers/document_routes.py | 98 +++++++++++++++----------
 lightrag/lightrag.py                     |  5 ++
 2 files changed, 63 insertions(+), 40 deletions(-)

diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 9cd421bf..1203f866 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -24,6 +24,7 @@ from pydantic import BaseModel, Field, field_validator
 
 from lightrag import LightRAG
 from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
+from lightrag.utils import generate_track_id
 from lightrag.api.utils_api import get_combined_auth_dependency
 from ..config import global_args
 
@@ -113,6 +114,7 @@ class ScanResponse(BaseModel):
     Attributes:
         status: Status of the scanning operation
         message: Optional message with additional details
+        track_id: Tracking ID for monitoring scanning progress
     """
 
     status: Literal["scanning_started"] = Field(
@@ -121,12 +123,14 @@ class ScanResponse(BaseModel):
     message: Optional[str] = Field(
         default=None, description="Additional details about the scanning operation"
     )
+    track_id: str = Field(description="Tracking ID for monitoring scanning progress")
 
     class Config:
         json_schema_extra = {
             "example": {
                 "status": "scanning_started",
                 "message": "Scanning process has been initiated in the background",
+                "track_id": "scan_20250729_170612_abc123",
             }
         }
 
@@ -791,15 +795,14 @@ async def pipeline_enqueue_file(
 
             # Generate track_id if not provided
             if track_id is None:
-                from lightrag.utils import generate_track_id
+                track_id = generate_track_id("unknown")
 
-                track_id = generate_track_id("upload")
-
-            returned_track_id = await rag.ainsert(
+            await rag.apipeline_enqueue_documents(
                 content, file_paths=file_path.name, track_id=track_id
             )
+
             logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
-            return True, returned_track_id
+            return True, track_id
         else:
             logger.error(f"No content could be extracted from file: {file_path.name}")
@@ -835,12 +838,15 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
         logger.error(traceback.format_exc())
 
 
-async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
+async def pipeline_index_files(
+    rag: LightRAG, file_paths: List[Path], track_id: str = None
+):
     """Index multiple files sequentially to avoid high CPU load
 
     Args:
         rag: LightRAG instance
         file_paths: Paths to the files to index
+        track_id: Optional tracking ID to pass to all files
     """
     if not file_paths:
         return
@@ -851,9 +857,10 @@ async def pipeline_index_files(
     collator = Collator()
     sorted_file_paths = sorted(file_paths, key=lambda p: collator.sort_key(str(p)))
 
-    # Process files sequentially
+    # Process files sequentially with track_id
     for file_path in sorted_file_paths:
-        if await pipeline_enqueue_file(rag, file_path):
+        success, _ = await pipeline_enqueue_file(rag, file_path, track_id)
+        if success:
             enqueued = True
 
     # Process the queue only if at least one file was successfully enqueued
@@ -916,8 +923,16 @@ async def save_temp_file(input_dir: Path, file: UploadFile = File(...)) -> Path:
     return temp_path
 
 
-async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
-    """Background task to scan and index documents"""
+async def run_scanning_process(
+    rag: LightRAG, doc_manager: DocumentManager, track_id: str = None
+):
+    """Background task to scan and index documents
+
+    Args:
+        rag: LightRAG instance
+        doc_manager: DocumentManager instance
+        track_id: Optional tracking ID to pass to all scanned files
+    """
     try:
         new_files = doc_manager.scan_directory_for_new_files()
         total_files = len(new_files)
@@ -926,8 +941,8 @@
         if not new_files:
             return
 
-        # Process all files at once
-        await pipeline_index_files(rag, new_files)
+        # Process all files at once with track_id
+        await pipeline_index_files(rag, new_files, track_id)
 
         logger.info(f"Scanning process completed: {total_files} files Processed.")
     except Exception as e:
@@ -1108,13 +1123,17 @@
             that fact.
 
         Returns:
-            ScanResponse: A response object containing the scanning status
+            ScanResponse: A response object containing the scanning status and track_id
         """
-        # Start the scanning process in the background
-        background_tasks.add_task(run_scanning_process, rag, doc_manager)
+        # Generate track_id with "scan" prefix for scanning operation
+        track_id = generate_track_id("scan")
+
+        # Start the scanning process in the background with track_id
+        background_tasks.add_task(run_scanning_process, rag, doc_manager, track_id)
         return ScanResponse(
             status="scanning_started",
             message="Scanning process has been initiated in the background",
+            track_id=track_id,
         )
 
     @router.post(
@@ -1163,20 +1182,17 @@
             with open(file_path, "wb") as buffer:
                 shutil.copyfileobj(file.file, buffer)
 
+            track_id = generate_track_id("upload")
+
             # Add to background tasks and get track_id
-            success, track_id = await pipeline_enqueue_file(rag, file_path)
-            if success:
-                background_tasks.add_task(rag.apipeline_process_enqueue_documents)
-                return InsertResponse(
-                    status="success",
-                    message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
-                    track_id=track_id,
-                )
-            else:
-                raise HTTPException(
-                    status_code=500,
-                    detail=f"Failed to enqueue file '{safe_filename}' for processing.",
-                )
+            background_tasks.add_task(pipeline_index_file, rag, file_path, track_id)
+
+            return InsertResponse(
+                status="success",
+                message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
+                track_id=track_id,
+            )
+
         except Exception as e:
             logger.error(f"Error /documents/upload: {file.filename}: {str(e)}")
             logger.error(traceback.format_exc())
@@ -1205,20 +1221,21 @@
             HTTPException: If an error occurs during text processing (500).
         """
         try:
-            from lightrag.utils import generate_track_id
-
             # Generate track_id for text insertion
             track_id = generate_track_id("insert")
 
-            # Insert text and get track_id
-            returned_track_id = await rag.ainsert(
-                request.text, file_paths=request.file_source, track_id=track_id
+            background_tasks.add_task(
+                pipeline_index_texts,
+                rag,
+                [request.text],
+                file_sources=[request.file_source],
+                track_id=track_id,
             )
 
             return InsertResponse(
                 status="success",
                 message="Text successfully received. Processing will continue in background.",
-                track_id=returned_track_id,
+                track_id=track_id,
            )
         except Exception as e:
             logger.error(f"Error /documents/text: {str(e)}")
@@ -1250,20 +1267,21 @@
             HTTPException: If an error occurs during text processing (500).
""" try: - from lightrag.utils import generate_track_id - # Generate track_id for texts insertion track_id = generate_track_id("insert") - # Insert texts and get track_id - returned_track_id = await rag.ainsert( - request.texts, file_paths=request.file_sources, track_id=track_id + background_tasks.add_task( + pipeline_index_texts, + rag, + request.texts, + file_sources=request.file_sources, + track_id=track_id, ) return InsertResponse( status="success", message="Texts successfully received. Processing will continue in background.", - track_id=returned_track_id, + track_id=track_id, ) except Exception as e: logger.error(f"Error /documents/texts: {str(e)}") diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index f610c56c..aa2db374 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -940,6 +940,7 @@ class LightRAG: # Store document status (without content) await self.doc_status.upsert(new_docs) + logger.info(f"New documents: {new_docs}") logger.info(f"Stored {len(new_docs)} new unique documents") async def apipeline_process_enqueue_documents( @@ -1130,6 +1131,7 @@ class LightRAG: timezone.utc ).isoformat(), "file_path": file_path, + "track_id": status_doc.track_id, # Preserve existing track_id "metadata": { "processing_start_time": processing_start_time }, @@ -1206,6 +1208,7 @@ class LightRAG: timezone.utc ).isoformat(), "file_path": file_path, + "track_id": status_doc.track_id, # Preserve existing track_id "metadata": { "processing_start_time": processing_start_time, "processing_end_time": processing_end_time, @@ -1251,6 +1254,7 @@ class LightRAG: timezone.utc ).isoformat(), "file_path": file_path, + "track_id": status_doc.track_id, # Preserve existing track_id "metadata": { "processing_start_time": processing_start_time, "processing_end_time": processing_end_time, @@ -1302,6 +1306,7 @@ class LightRAG: "created_at": status_doc.created_at, "updated_at": datetime.now().isoformat(), "file_path": file_path, + "track_id": status_doc.track_id, # Preserve existing track_id "metadata": { "processing_start_time": processing_start_time, "processing_end_time": processing_end_time,