Fix track_id bugs and add track_id to scanning response

commit 7207598fc4 (parent 75de799353)
Author: yangdx
Date: 2025-07-30 03:06:20 +08:00

2 changed files with 63 additions and 40 deletions
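The track IDs introduced here (e.g. "scan_20250729_170612_abc123" in the example below) follow a prefix_timestamp_suffix shape. For orientation, a minimal sketch of how such an ID could be built; this is an illustration only, not the actual lightrag.utils.generate_track_id implementation:

import secrets
from datetime import datetime

def make_track_id(prefix: str) -> str:
    # Hypothetical sketch: prefix + timestamp + short random suffix,
    # matching the "scan_20250729_170612_abc123" shape seen in this diff.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{prefix}_{timestamp}_{secrets.token_hex(3)}"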

@@ -24,6 +24,7 @@ from pydantic import BaseModel, Field, field_validator
 from lightrag import LightRAG
 from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
+from lightrag.utils import generate_track_id
 from lightrag.api.utils_api import get_combined_auth_dependency
 from ..config import global_args
@@ -113,6 +114,7 @@ class ScanResponse(BaseModel):
     Attributes:
         status: Status of the scanning operation
         message: Optional message with additional details
+        track_id: Tracking ID for monitoring scanning progress
     """

     status: Literal["scanning_started"] = Field(
@@ -121,12 +123,14 @@ class ScanResponse(BaseModel):
     message: Optional[str] = Field(
         default=None, description="Additional details about the scanning operation"
     )
+    track_id: str = Field(description="Tracking ID for monitoring scanning progress")

     class Config:
         json_schema_extra = {
             "example": {
                 "status": "scanning_started",
                 "message": "Scanning process has been initiated in the background",
+                "track_id": "scan_20250729_170612_abc123",
             }
         }
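Because track_id is declared without a default, pydantic now treats it as a required field; constructing a ScanResponse without it fails validation. For example:

from pydantic import ValidationError

try:
    ScanResponse(status="scanning_started")
except ValidationError as exc:
    # Reports that the "track_id" field is required
    print(exc)

ScanResponse(status="scanning_started", track_id="scan_20250729_170612_abc123")  # ok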
@@ -791,15 +795,14 @@ async def pipeline_enqueue_file(
             # Generate track_id if not provided
             if track_id is None:
-                from lightrag.utils import generate_track_id
-                track_id = generate_track_id("unkown")
+                track_id = generate_track_id("upload")

-            returned_track_id = await rag.ainsert(
+            await rag.apipeline_enqueue_documents(
                 content, file_paths=file_path.name, track_id=track_id
             )

             logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
-            return True, returned_track_id
+            return True, track_id
         else:
             logger.error(f"No content could be extracted from file: {file_path.name}")
@@ -835,12 +838,15 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None
         logger.error(traceback.format_exc())


-async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
+async def pipeline_index_files(
+    rag: LightRAG, file_paths: List[Path], track_id: str = None
+):
     """Index multiple files sequentially to avoid high CPU load

     Args:
         rag: LightRAG instance
         file_paths: Paths to the files to index
+        track_id: Optional tracking ID to pass to all files
     """
     if not file_paths:
         return
@@ -851,9 +857,10 @@ async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
     collator = Collator()
     sorted_file_paths = sorted(file_paths, key=lambda p: collator.sort_key(str(p)))

-    # Process files sequentially
+    # Process files sequentially with track_id
     for file_path in sorted_file_paths:
-        if await pipeline_enqueue_file(rag, file_path):
+        success, _ = await pipeline_enqueue_file(rag, file_path, track_id)
+        if success:
             enqueued = True

     # Process the queue only if at least one file was successfully enqueued
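This is the central track_id bug: once pipeline_enqueue_file started returning a (success, track_id) tuple, the old `if await pipeline_enqueue_file(...)` check was always true, because any non-empty tuple is truthy in Python. Unpacking before testing restores the intended semantics:

# A non-empty tuple is truthy even when it signals failure:
assert bool((False, "upload_20250730_030620_abc123")) is True

# Hence the fix: unpack first, then test the success flag alone.
success, _ = (False, "upload_20250730_030620_abc123")
assert success is False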
@@ -916,8 +923,16 @@ async def save_temp_file(input_dir: Path, file: UploadFile = File(...)) -> Path:
     return temp_path


-async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
-    """Background task to scan and index documents"""
+async def run_scanning_process(
+    rag: LightRAG, doc_manager: DocumentManager, track_id: str = None
+):
+    """Background task to scan and index documents
+
+    Args:
+        rag: LightRAG instance
+        doc_manager: DocumentManager instance
+        track_id: Optional tracking ID to pass to all scanned files
+    """
     try:
         new_files = doc_manager.scan_directory_for_new_files()
         total_files = len(new_files)
@@ -926,8 +941,8 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
         if not new_files:
             return

-        # Process all files at once
-        await pipeline_index_files(rag, new_files)
+        # Process all files at once with track_id
+        await pipeline_index_files(rag, new_files, track_id)

         logger.info(f"Scanning process completed: {total_files} files Processed.")
     except Exception as e:
@@ -1108,13 +1123,17 @@ def create_document_routes(
         that fact.

         Returns:
-            ScanResponse: A response object containing the scanning status
+            ScanResponse: A response object containing the scanning status and track_id
         """
-        # Start the scanning process in the background
-        background_tasks.add_task(run_scanning_process, rag, doc_manager)
+        # Generate track_id with "scan" prefix for scanning operation
+        track_id = generate_track_id("scan")
+
+        # Start the scanning process in the background with track_id
+        background_tasks.add_task(run_scanning_process, rag, doc_manager, track_id)

         return ScanResponse(
             status="scanning_started",
             message="Scanning process has been initiated in the background",
+            track_id=track_id,
         )

     @router.post(
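From the client's perspective, the scan endpoint now returns a handle that can be stored and used to correlate later progress queries. A minimal sketch with httpx, assuming the route is mounted at /documents/scan (consistent with the /documents/upload and /documents/text paths logged in the hunks below):

import httpx

async def start_scan(base_url: str) -> str:
    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{base_url}/documents/scan")
        resp.raise_for_status()
        # New in this commit: the response body carries a track_id
        return resp.json()["track_id"]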
@@ -1163,20 +1182,17 @@ def create_document_routes(
             with open(file_path, "wb") as buffer:
                 shutil.copyfileobj(file.file, buffer)

-            # Add to background tasks and get track_id
-            success, track_id = await pipeline_enqueue_file(rag, file_path)
-            if success:
-                background_tasks.add_task(rag.apipeline_process_enqueue_documents)
-                return InsertResponse(
-                    status="success",
-                    message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
-                    track_id=track_id,
-                )
-            else:
-                raise HTTPException(
-                    status_code=500,
-                    detail=f"Failed to enqueue file '{safe_filename}' for processing.",
-                )
+            track_id = generate_track_id("upload")
+
+            background_tasks.add_task(pipeline_index_file, rag, file_path, track_id)
+
+            return InsertResponse(
+                status="success",
+                message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
+                track_id=track_id,
+            )

         except Exception as e:
             logger.error(f"Error /documents/upload: {file.filename}: {str(e)}")
             logger.error(traceback.format_exc())
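The rewritten upload handler no longer awaits the enqueue on the request path: the track_id is minted first, all work is deferred to pipeline_index_file, and the response returns immediately. One behavioral consequence is that enqueue failures now surface in the background task's logs rather than as a 500 response. The pattern reduced to its core, a sketch using the names from this diff:

from fastapi import BackgroundTasks

async def handle_upload(background_tasks: BackgroundTasks, rag, file_path):
    track_id = generate_track_id("upload")   # known before any work happens
    background_tasks.add_task(pipeline_index_file, rag, file_path, track_id)
    return {"status": "success", "track_id": track_id}  # responds immediately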
@@ -1205,20 +1221,21 @@ def create_document_routes(
             HTTPException: If an error occurs during text processing (500).
         """
         try:
-            from lightrag.utils import generate_track_id
-
             # Generate track_id for text insertion
             track_id = generate_track_id("insert")

-            # Insert text and get track_id
-            returned_track_id = await rag.ainsert(
-                request.text, file_paths=request.file_source, track_id=track_id
-            )
+            background_tasks.add_task(
+                pipeline_index_texts,
+                rag,
+                [request.text],
+                file_sources=[request.file_source],
+                track_id=track_id,
+            )

             return InsertResponse(
                 status="success",
                 message="Text successfully received. Processing will continue in background.",
-                track_id=returned_track_id,
+                track_id=track_id,
             )
@@ -1250,20 +1267,21 @@ def create_document_routes(
             HTTPException: If an error occurs during text processing (500).
         """
         try:
-            from lightrag.utils import generate_track_id
-
             # Generate track_id for texts insertion
             track_id = generate_track_id("insert")

-            # Insert texts and get track_id
-            returned_track_id = await rag.ainsert(
-                request.texts, file_paths=request.file_sources, track_id=track_id
-            )
+            background_tasks.add_task(
+                pipeline_index_texts,
+                rag,
+                request.texts,
+                file_sources=request.file_sources,
+                track_id=track_id,
+            )

             return InsertResponse(
                 status="success",
                 message="Texts successfully received. Processing will continue in background.",
-                track_id=returned_track_id,
+                track_id=track_id,
             )

@@ -940,6 +940,7 @@ class LightRAG:
         # Store document status (without content)
         await self.doc_status.upsert(new_docs)

-        logger.info(f"New documents: {new_docs}")
+        logger.info(f"Stored {len(new_docs)} new unique documents")

     async def apipeline_process_enqueue_documents(
@@ -1130,6 +1131,7 @@ class LightRAG:
                         timezone.utc
                     ).isoformat(),
                     "file_path": file_path,
+                    "track_id": status_doc.track_id,  # Preserve existing track_id
                     "metadata": {
                         "processing_start_time": processing_start_time
                     },
@@ -1206,6 +1208,7 @@ class LightRAG:
                         timezone.utc
                     ).isoformat(),
                     "file_path": file_path,
+                    "track_id": status_doc.track_id,  # Preserve existing track_id
                     "metadata": {
                         "processing_start_time": processing_start_time,
                         "processing_end_time": processing_end_time,
@@ -1251,6 +1254,7 @@ class LightRAG:
                         timezone.utc
                     ).isoformat(),
                     "file_path": file_path,
+                    "track_id": status_doc.track_id,  # Preserve existing track_id
                     "metadata": {
                         "processing_start_time": processing_start_time,
                         "processing_end_time": processing_end_time,
@@ -1302,6 +1306,7 @@ class LightRAG:
                     "created_at": status_doc.created_at,
                     "updated_at": datetime.now().isoformat(),
                     "file_path": file_path,
+                    "track_id": status_doc.track_id,  # Preserve existing track_id
                     "metadata": {
                         "processing_start_time": processing_start_time,
                         "processing_end_time": processing_end_time,