Merge pull request #1883 from danielaskdd/add-metadata

feat: add processing time tracking to document status with metadata field
This commit is contained in:
Daniel.y 2025-07-30 09:15:36 +08:00 committed by GitHub
commit 6d0a644844
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 304 additions and 73 deletions

View file

@ -1 +1 @@
__api_version__ = "0192"
__api_version__ = "0193"

View file

@ -24,6 +24,7 @@ from pydantic import BaseModel, Field, field_validator
from lightrag import LightRAG
from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
from lightrag.utils import generate_track_id
from lightrag.api.utils_api import get_combined_auth_dependency
from ..config import global_args
@ -113,6 +114,7 @@ class ScanResponse(BaseModel):
Attributes:
status: Status of the scanning operation
message: Optional message with additional details
track_id: Tracking ID for monitoring scanning progress
"""
status: Literal["scanning_started"] = Field(
@ -121,12 +123,14 @@ class ScanResponse(BaseModel):
message: Optional[str] = Field(
default=None, description="Additional details about the scanning operation"
)
track_id: str = Field(description="Tracking ID for monitoring scanning progress")
class Config:
json_schema_extra = {
"example": {
"status": "scanning_started",
"message": "Scanning process has been initiated in the background",
"track_id": "scan_20250729_170612_abc123",
}
}
@ -369,7 +373,7 @@ class DocStatusResponse(BaseModel):
chunks_count: Optional[int] = Field(
default=None, description="Number of chunks the document was split into"
)
error: Optional[str] = Field(
error_msg: Optional[str] = Field(
default=None, description="Error message if processing failed"
)
metadata: Optional[dict[str, Any]] = Field(
@ -791,15 +795,14 @@ async def pipeline_enqueue_file(
# Generate track_id if not provided
if track_id is None:
from lightrag.utils import generate_track_id
track_id = generate_track_id("unkown")
track_id = generate_track_id("upload")
returned_track_id = await rag.ainsert(
await rag.apipeline_enqueue_documents(
content, file_paths=file_path.name, track_id=track_id
)
logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
return True, returned_track_id
return True, track_id
else:
logger.error(f"No content could be extracted from file: {file_path.name}")
@ -835,12 +838,15 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = No
logger.error(traceback.format_exc())
async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
async def pipeline_index_files(
rag: LightRAG, file_paths: List[Path], track_id: str = None
):
"""Index multiple files sequentially to avoid high CPU load
Args:
rag: LightRAG instance
file_paths: Paths to the files to index
track_id: Optional tracking ID to pass to all files
"""
if not file_paths:
return
@ -851,9 +857,10 @@ async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
collator = Collator()
sorted_file_paths = sorted(file_paths, key=lambda p: collator.sort_key(str(p)))
# Process files sequentially
# Process files sequentially with track_id
for file_path in sorted_file_paths:
if await pipeline_enqueue_file(rag, file_path):
success, _ = await pipeline_enqueue_file(rag, file_path, track_id)
if success:
enqueued = True
# Process the queue only if at least one file was successfully enqueued
@ -916,8 +923,16 @@ async def save_temp_file(input_dir: Path, file: UploadFile = File(...)) -> Path:
return temp_path
async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
"""Background task to scan and index documents"""
async def run_scanning_process(
rag: LightRAG, doc_manager: DocumentManager, track_id: str = None
):
"""Background task to scan and index documents
Args:
rag: LightRAG instance
doc_manager: DocumentManager instance
track_id: Optional tracking ID to pass to all scanned files
"""
try:
new_files = doc_manager.scan_directory_for_new_files()
total_files = len(new_files)
@ -926,8 +941,8 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
if not new_files:
return
# Process all files at once
await pipeline_index_files(rag, new_files)
# Process all files at once with track_id
await pipeline_index_files(rag, new_files, track_id)
logger.info(f"Scanning process completed: {total_files} files Processed.")
except Exception as e:
@ -1108,13 +1123,17 @@ def create_document_routes(
that fact.
Returns:
ScanResponse: A response object containing the scanning status
ScanResponse: A response object containing the scanning status and track_id
"""
# Start the scanning process in the background
background_tasks.add_task(run_scanning_process, rag, doc_manager)
# Generate track_id with "scan" prefix for scanning operation
track_id = generate_track_id("scan")
# Start the scanning process in the background with track_id
background_tasks.add_task(run_scanning_process, rag, doc_manager, track_id)
return ScanResponse(
status="scanning_started",
message="Scanning process has been initiated in the background",
track_id=track_id,
)
@router.post(
@ -1163,20 +1182,17 @@ def create_document_routes(
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
track_id = generate_track_id("upload")
# Add to background tasks and get track_id
success, track_id = await pipeline_enqueue_file(rag, file_path)
if success:
background_tasks.add_task(rag.apipeline_process_enqueue_documents)
return InsertResponse(
status="success",
message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
track_id=track_id,
)
else:
raise HTTPException(
status_code=500,
detail=f"Failed to enqueue file '{safe_filename}' for processing.",
)
background_tasks.add_task(pipeline_index_file, rag, file_path, track_id)
return InsertResponse(
status="success",
message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
track_id=track_id,
)
except Exception as e:
logger.error(f"Error /documents/upload: {file.filename}: {str(e)}")
logger.error(traceback.format_exc())
@ -1205,20 +1221,21 @@ def create_document_routes(
HTTPException: If an error occurs during text processing (500).
"""
try:
from lightrag.utils import generate_track_id
# Generate track_id for text insertion
track_id = generate_track_id("insert")
# Insert text and get track_id
returned_track_id = await rag.ainsert(
request.text, file_paths=request.file_source, track_id=track_id
background_tasks.add_task(
pipeline_index_texts,
rag,
[request.text],
file_sources=[request.file_source],
track_id=track_id,
)
return InsertResponse(
status="success",
message="Text successfully received. Processing will continue in background.",
track_id=returned_track_id,
track_id=track_id,
)
except Exception as e:
logger.error(f"Error /documents/text: {str(e)}")
@ -1250,20 +1267,21 @@ def create_document_routes(
HTTPException: If an error occurs during text processing (500).
"""
try:
from lightrag.utils import generate_track_id
# Generate track_id for texts insertion
track_id = generate_track_id("insert")
# Insert texts and get track_id
returned_track_id = await rag.ainsert(
request.texts, file_paths=request.file_sources, track_id=track_id
background_tasks.add_task(
pipeline_index_texts,
rag,
request.texts,
file_sources=request.file_sources,
track_id=track_id,
)
return InsertResponse(
status="success",
message="Texts successfully received. Processing will continue in background.",
track_id=returned_track_id,
track_id=track_id,
)
except Exception as e:
logger.error(f"Error /documents/texts: {str(e)}")
@ -1570,7 +1588,7 @@ def create_document_routes(
updated_at=format_datetime(doc_status.updated_at),
track_id=doc_status.track_id,
chunks_count=doc_status.chunks_count,
error=doc_status.error,
error_msg=doc_status.error_msg,
metadata=doc_status.metadata,
file_path=doc_status.file_path,
)
@ -1844,14 +1862,15 @@ def create_document_routes(
updated_at=format_datetime(doc_status.updated_at),
track_id=doc_status.track_id,
chunks_count=doc_status.chunks_count,
error=doc_status.error,
error_msg=doc_status.error_msg,
metadata=doc_status.metadata,
file_path=doc_status.file_path,
)
)
# Build status summary
status_key = doc_status.status.value
# Handle both DocStatus enum and string cases for robust deserialization
status_key = str(doc_status.status)
status_summary[status_key] = status_summary.get(status_key, 0) + 1
return TrackStatusResponse(

View file

@ -1,4 +1,4 @@
import{j as o,Y as td,O as fg,k as dg,u as ad,Z as mg,c as hg,l as gg,g as pg,S as yg,T as vg,n as bg,m as nd,o as Sg,p as Tg,$ as ud,a0 as id,a1 as cd,a2 as xg}from"./ui-vendor-CeCm8EER.js";import{d as Ag,h as Dg,r as N,u as sd,H as Eg,i as Ng,j as kf}from"./react-vendor-DEwriMA6.js";import{z as we,c as Ve,a8 as od,u as Ll,y as Gt,a9 as rd,aa as fd,I as us,B as Cn,D as Mg,i as zg,j as Cg,k as Og,l as _g,ab as jg,ac as Rg,ad as Ug,ae as Hg,af as Bl,ag as dd,ah as ss,ai as is,W as Bg,Y as Lg,Z as qg,_ as Gg,aj as Yg,ak as Xg,al as md,am as wg,an as Vg,ao as hd,ap as Qg,aq as gd,C as Kg,J as Zg,K as kg,d as Nn,ar as Jg,as as Fg,at as Pg}from"./feature-graph-ChwGSzXI.js";import{S as Jf,a as Ff,b as Pf,c as $f,d as ot,R as $g}from"./feature-retrieval-BebE__YQ.js";import{D as Wg}from"./feature-documents-D1A38Isy.js";import{i as cs}from"./utils-vendor-BysuhMZA.js";import"./graph-vendor-B-X5JegA.js";import"./mermaid-vendor-Bi3TzHdn.js";import"./markdown-vendor-DmIvJdn7.js";(function(){const y=document.createElement("link").relList;if(y&&y.supports&&y.supports("modulepreload"))return;for(const E of document.querySelectorAll('link[rel="modulepreload"]'))d(E);new MutationObserver(E=>{for(const _ of E)if(_.type==="childList")for(const H of _.addedNodes)H.tagName==="LINK"&&H.rel==="modulepreload"&&d(H)}).observe(document,{childList:!0,subtree:!0});function x(E){const _={};return E.integrity&&(_.integrity=E.integrity),E.referrerPolicy&&(_.referrerPolicy=E.referrerPolicy),E.crossOrigin==="use-credentials"?_.credentials="include":E.crossOrigin==="anonymous"?_.credentials="omit":_.credentials="same-origin",_}function d(E){if(E.ep)return;E.ep=!0;const _=x(E);fetch(E.href,_)}})();var ts={exports:{}},Mn={},as={exports:{}},ns={};/**
import{j as o,Y as td,O as fg,k as dg,u as ad,Z as mg,c as hg,l as gg,g as pg,S as yg,T as vg,n as bg,m as nd,o as Sg,p as Tg,$ as ud,a0 as id,a1 as cd,a2 as xg}from"./ui-vendor-CeCm8EER.js";import{d as Ag,h as Dg,r as N,u as sd,H as Eg,i as Ng,j as kf}from"./react-vendor-DEwriMA6.js";import{z as we,c as Ve,a8 as od,u as Ll,y as Gt,a9 as rd,aa as fd,I as us,B as Cn,D as Mg,i as zg,j as Cg,k as Og,l as _g,ab as jg,ac as Rg,ad as Ug,ae as Hg,af as Bl,ag as dd,ah as ss,ai as is,W as Bg,Y as Lg,Z as qg,_ as Gg,aj as Yg,ak as Xg,al as md,am as wg,an as Vg,ao as hd,ap as Qg,aq as gd,C as Kg,J as Zg,K as kg,d as Nn,ar as Jg,as as Fg,at as Pg}from"./feature-graph-ChwGSzXI.js";import{S as Jf,a as Ff,b as Pf,c as $f,d as ot,R as $g}from"./feature-retrieval-BebE__YQ.js";import{D as Wg}from"./feature-documents-DFri5mmM.js";import{i as cs}from"./utils-vendor-BysuhMZA.js";import"./graph-vendor-B-X5JegA.js";import"./mermaid-vendor-Bi3TzHdn.js";import"./markdown-vendor-DmIvJdn7.js";(function(){const y=document.createElement("link").relList;if(y&&y.supports&&y.supports("modulepreload"))return;for(const E of document.querySelectorAll('link[rel="modulepreload"]'))d(E);new MutationObserver(E=>{for(const _ of E)if(_.type==="childList")for(const H of _.addedNodes)H.tagName==="LINK"&&H.rel==="modulepreload"&&d(H)}).observe(document,{childList:!0,subtree:!0});function x(E){const _={};return E.integrity&&(_.integrity=E.integrity),E.referrerPolicy&&(_.referrerPolicy=E.referrerPolicy),E.crossOrigin==="use-credentials"?_.credentials="include":E.crossOrigin==="anonymous"?_.credentials="omit":_.credentials="same-origin",_}function d(E){if(E.ep)return;E.ep=!0;const _=x(E);fetch(E.href,_)}})();var ts={exports:{}},Mn={},as={exports:{}},ns={};/**
* @license React
* scheduler.production.js
*

View file

@ -8,7 +8,7 @@
<link rel="icon" type="image/png" href="favicon.png" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Lightrag</title>
<script type="module" crossorigin src="/webui/assets/index-B4xx55fn.js"></script>
<script type="module" crossorigin src="/webui/assets/index-BSksJLxv.js"></script>
<link rel="modulepreload" crossorigin href="/webui/assets/react-vendor-DEwriMA6.js">
<link rel="modulepreload" crossorigin href="/webui/assets/ui-vendor-CeCm8EER.js">
<link rel="modulepreload" crossorigin href="/webui/assets/graph-vendor-B-X5JegA.js">
@ -17,7 +17,7 @@
<link rel="modulepreload" crossorigin href="/webui/assets/mermaid-vendor-Bi3TzHdn.js">
<link rel="modulepreload" crossorigin href="/webui/assets/markdown-vendor-DmIvJdn7.js">
<link rel="modulepreload" crossorigin href="/webui/assets/feature-retrieval-BebE__YQ.js">
<link rel="modulepreload" crossorigin href="/webui/assets/feature-documents-D1A38Isy.js">
<link rel="modulepreload" crossorigin href="/webui/assets/feature-documents-DFri5mmM.js">
<link rel="stylesheet" crossorigin href="/webui/assets/feature-graph-BipNuM18.css">
<link rel="stylesheet" crossorigin href="/webui/assets/index-DwO2XWaU.css">
</head>

View file

@ -647,7 +647,7 @@ class DocProcessingStatus:
"""Number of chunks after splitting, used for processing"""
chunks_list: list[str] | None = field(default_factory=list)
"""List of chunk IDs associated with this document, used for deletion"""
error: str | None = None
error_msg: str | None = None
"""Error message if failed"""
metadata: dict[str, Any] = field(default_factory=dict)
"""Additional metadata"""

View file

@ -95,9 +95,16 @@ class JsonDocStatusStorage(DocStatusStorage):
try:
# Make a copy of the data to avoid modifying the original
data = v.copy()
# Remove deprecated content field if it exists
data.pop("content", None)
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
result[k] = DocProcessingStatus(**data)
except KeyError as e:
logger.error(f"Missing required field for document {k}: {e}")
@ -115,9 +122,16 @@ class JsonDocStatusStorage(DocStatusStorage):
try:
# Make a copy of the data to avoid modifying the original
data = v.copy()
# Remove deprecated content field if it exists
data.pop("content", None)
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
result[k] = DocProcessingStatus(**data)
except KeyError as e:
logger.error(f"Missing required field for document {k}: {e}")

View file

@ -381,9 +381,18 @@ class MongoDocStatusStorage(DocStatusStorage):
try:
# Make a copy of the data to avoid modifying the original
data = doc.copy()
# Remove deprecated content field if it exists
data.pop("content", None)
# Remove MongoDB _id field if it exists
data.pop("_id", None)
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
processed_result[doc["_id"]] = DocProcessingStatus(**data)
except KeyError as e:
logger.error(f"Missing required field for document {doc['_id']}: {e}")
@ -401,9 +410,18 @@ class MongoDocStatusStorage(DocStatusStorage):
try:
# Make a copy of the data to avoid modifying the original
data = doc.copy()
# Remove deprecated content field if it exists
data.pop("content", None)
# Remove MongoDB _id field if it exists
data.pop("_id", None)
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
processed_result[doc["_id"]] = DocProcessingStatus(**data)
except KeyError as e:
logger.error(f"Missing required field for document {doc['_id']}: {e}")
@ -442,7 +460,8 @@ class MongoDocStatusStorage(DocStatusStorage):
"""Create track_id index for better query performance"""
try:
# Check if index already exists
existing_indexes = await self._data.list_indexes().to_list(length=None)
indexes_cursor = await self._data.list_indexes()
existing_indexes = await indexes_cursor.to_list(length=None)
track_id_index_exists = any(
"track_id" in idx.get("key", {}) for idx in existing_indexes
)

View file

@ -624,6 +624,62 @@ class PostgreSQLDB:
f"Failed to add track_id column or index to LIGHTRAG_DOC_STATUS: {e}"
)
async def _migrate_doc_status_add_metadata_error_msg(self):
"""Add metadata and error_msg columns to LIGHTRAG_DOC_STATUS table if they don't exist"""
try:
# Check if metadata column exists
check_metadata_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_doc_status'
AND column_name = 'metadata'
"""
metadata_info = await self.query(check_metadata_sql)
if not metadata_info:
logger.info("Adding metadata column to LIGHTRAG_DOC_STATUS table")
add_metadata_sql = """
ALTER TABLE LIGHTRAG_DOC_STATUS
ADD COLUMN metadata JSONB NULL DEFAULT '{}'::jsonb
"""
await self.execute(add_metadata_sql)
logger.info(
"Successfully added metadata column to LIGHTRAG_DOC_STATUS table"
)
else:
logger.info(
"metadata column already exists in LIGHTRAG_DOC_STATUS table"
)
# Check if error_msg column exists
check_error_msg_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_doc_status'
AND column_name = 'error_msg'
"""
error_msg_info = await self.query(check_error_msg_sql)
if not error_msg_info:
logger.info("Adding error_msg column to LIGHTRAG_DOC_STATUS table")
add_error_msg_sql = """
ALTER TABLE LIGHTRAG_DOC_STATUS
ADD COLUMN error_msg TEXT NULL
"""
await self.execute(add_error_msg_sql)
logger.info(
"Successfully added error_msg column to LIGHTRAG_DOC_STATUS table"
)
else:
logger.info(
"error_msg column already exists in LIGHTRAG_DOC_STATUS table"
)
except Exception as e:
logger.warning(
f"Failed to add metadata/error_msg columns to LIGHTRAG_DOC_STATUS: {e}"
)
async def _migrate_field_lengths(self):
"""Migrate database field lengths: entity_name, source_id, target_id, and file_path"""
# Define the field changes needed
@ -850,6 +906,14 @@ class PostgreSQLDB:
f"PostgreSQL, Failed to migrate doc status track_id field: {e}"
)
# Migrate doc status to add metadata and error_msg fields if needed
try:
await self._migrate_doc_status_add_metadata_error_msg()
except Exception as e:
logger.error(
f"PostgreSQL, Failed to migrate doc status metadata/error_msg fields: {e}"
)
async def query(
self,
sql: str,
@ -1733,12 +1797,19 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError:
chunks_list = []
# Parse metadata JSON string back to dict
metadata = result[0].get("metadata", {})
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(result[0]["created_at"])
updated_at = self._format_datetime_with_timezone(result[0]["updated_at"])
return dict(
# content=result[0]["content"],
content_length=result[0]["content_length"],
content_summary=result[0]["content_summary"],
status=result[0]["status"],
@ -1747,6 +1818,9 @@ class PGDocStatusStorage(DocStatusStorage):
updated_at=updated_at,
file_path=result[0]["file_path"],
chunks_list=chunks_list,
metadata=metadata,
error_msg=result[0].get("error_msg"),
track_id=result[0].get("track_id"),
)
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
@ -1772,13 +1846,20 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError:
chunks_list = []
# Parse metadata JSON string back to dict
metadata = row.get("metadata", {})
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(row["created_at"])
updated_at = self._format_datetime_with_timezone(row["updated_at"])
processed_results.append(
{
# "content": row["content"],
"content_length": row["content_length"],
"content_summary": row["content_summary"],
"status": row["status"],
@ -1787,6 +1868,9 @@ class PGDocStatusStorage(DocStatusStorage):
"updated_at": updated_at,
"file_path": row["file_path"],
"chunks_list": chunks_list,
"metadata": metadata,
"error_msg": row.get("error_msg"),
"track_id": row.get("track_id"),
}
)
@ -1822,12 +1906,19 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError:
chunks_list = []
# Parse metadata JSON string back to dict
metadata = element.get("metadata", {})
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(element["created_at"])
updated_at = self._format_datetime_with_timezone(element["updated_at"])
docs_by_status[element["id"]] = DocProcessingStatus(
# content=element["content"],
content_summary=element["content_summary"],
content_length=element["content_length"],
status=element["status"],
@ -1836,6 +1927,9 @@ class PGDocStatusStorage(DocStatusStorage):
chunks_count=element["chunks_count"],
file_path=element["file_path"],
chunks_list=chunks_list,
metadata=metadata,
error_msg=element.get("error_msg"),
track_id=element.get("track_id"),
)
return docs_by_status
@ -1858,12 +1952,19 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError:
chunks_list = []
# Parse metadata JSON string back to dict
metadata = element.get("metadata", {})
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(element["created_at"])
updated_at = self._format_datetime_with_timezone(element["updated_at"])
docs_by_track_id[element["id"]] = DocProcessingStatus(
# content=element["content"],
content_summary=element["content_summary"],
content_length=element["content_length"],
status=element["status"],
@ -1873,6 +1974,8 @@ class PGDocStatusStorage(DocStatusStorage):
file_path=element["file_path"],
chunks_list=chunks_list,
track_id=element.get("track_id"),
metadata=metadata,
error_msg=element.get("error_msg"),
)
return docs_by_track_id
@ -1945,10 +2048,10 @@ class PGDocStatusStorage(DocStatusStorage):
logger.warning(f"Unable to parse datetime string: {dt_str}")
return None
# Modified SQL to include created_at, updated_at, chunks_list, and track_id in both INSERT and UPDATE operations
# Modified SQL to include created_at, updated_at, chunks_list, track_id, metadata, and error_msg in both INSERT and UPDATE operations
# All fields are updated from the input data in both INSERT and UPDATE cases
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,track_id,created_at,updated_at)
values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,track_id,metadata,error_msg,created_at,updated_at)
values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
on conflict(id,workspace) do update set
content_summary = EXCLUDED.content_summary,
content_length = EXCLUDED.content_length,
@ -1957,6 +2060,8 @@ class PGDocStatusStorage(DocStatusStorage):
file_path = EXCLUDED.file_path,
chunks_list = EXCLUDED.chunks_list,
track_id = EXCLUDED.track_id,
metadata = EXCLUDED.metadata,
error_msg = EXCLUDED.error_msg,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at"""
for k, v in data.items():
@ -1964,13 +2069,12 @@ class PGDocStatusStorage(DocStatusStorage):
created_at = parse_datetime(v.get("created_at"))
updated_at = parse_datetime(v.get("updated_at"))
# chunks_count, chunks_list, and track_id are optional
# chunks_count, chunks_list, track_id, metadata, and error_msg are optional
await self.db.execute(
sql,
{
"workspace": self.db.workspace,
"id": k,
# "content": v["content"],
"content_summary": v["content_summary"],
"content_length": v["content_length"],
"chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
@ -1978,6 +2082,10 @@ class PGDocStatusStorage(DocStatusStorage):
"file_path": v["file_path"],
"chunks_list": json.dumps(v.get("chunks_list", [])),
"track_id": v.get("track_id"), # Add track_id support
"metadata": json.dumps(
v.get("metadata", {})
), # Add metadata support
"error_msg": v.get("error_msg"), # Add error_msg support
"created_at": created_at, # Use the converted datetime object
"updated_at": updated_at, # Use the converted datetime object
},
@ -3467,12 +3575,10 @@ TABLES = {
CONSTRAINT LIGHTRAG_LLM_CACHE_PK PRIMARY KEY (workspace, mode, id)
)"""
},
# content column in LIGHTRAG_DOC_STATUS is deprecated, use the same column in LIGHTRAG_DOC_FULL instead
"LIGHTRAG_DOC_STATUS": {
"ddl": """CREATE TABLE LIGHTRAG_DOC_STATUS (
workspace varchar(255) NOT NULL,
id varchar(255) NOT NULL,
content TEXT NULL,
content_summary varchar(255) NULL,
content_length int4 NULL,
chunks_count int4 NULL,
@ -3480,6 +3586,8 @@ TABLES = {
file_path TEXT NULL,
chunks_list JSONB NULL DEFAULT '[]'::jsonb,
track_id varchar(255) NULL,
metadata JSONB NULL DEFAULT '{}'::jsonb,
error_msg TEXT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)

View file

@ -786,9 +786,16 @@ class RedisDocStatusStorage(DocStatusStorage):
# Make a copy of the data to avoid modifying the original
data = doc_data.copy()
# Remove deprecated content field if it exists
data.pop("content", None)
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
result[doc_id] = DocProcessingStatus(**data)
except (json.JSONDecodeError, KeyError) as e:
@ -835,9 +842,16 @@ class RedisDocStatusStorage(DocStatusStorage):
# Make a copy of the data to avoid modifying the original
data = doc_data.copy()
# Remove deprecated content field if it exists
data.pop("content", None)
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
result[doc_id] = DocProcessingStatus(**data)
except (json.JSONDecodeError, KeyError) as e:

View file

@ -1109,6 +1109,9 @@ class LightRAG:
if not chunks:
logger.warning("No document chunks to process")
# Record processing start time
processing_start_time = int(time.time())
# Process document in two stages
# Stage 1: Process text chunks and docs (parallel execution)
doc_status_task = asyncio.create_task(
@ -1127,6 +1130,10 @@ class LightRAG:
timezone.utc
).isoformat(),
"file_path": file_path,
"track_id": status_doc.track_id, # Preserve existing track_id
"metadata": {
"processing_start_time": processing_start_time
},
}
}
)
@ -1184,12 +1191,15 @@ class LightRAG:
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
# Record processing end time for failed case
processing_end_time = int(time.time())
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"error_msg": str(e),
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
@ -1197,6 +1207,11 @@ class LightRAG:
timezone.utc
).isoformat(),
"file_path": file_path,
"track_id": status_doc.track_id, # Preserve existing track_id
"metadata": {
"processing_start_time": processing_start_time,
"processing_end_time": processing_end_time,
},
}
}
)
@ -1220,6 +1235,9 @@ class LightRAG:
file_path=file_path,
)
# Record processing end time
processing_end_time = int(time.time())
await self.doc_status.upsert(
{
doc_id: {
@ -1235,6 +1253,11 @@ class LightRAG:
timezone.utc
).isoformat(),
"file_path": file_path,
"track_id": status_doc.track_id, # Preserve existing track_id
"metadata": {
"processing_start_time": processing_start_time,
"processing_end_time": processing_end_time,
},
}
}
)
@ -1268,17 +1291,25 @@ class LightRAG:
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
# Record processing end time for failed case
processing_end_time = int(time.time())
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"error_msg": str(e),
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"file_path": file_path,
"track_id": status_doc.track_id, # Preserve existing track_id
"metadata": {
"processing_start_time": processing_start_time,
"processing_end_time": processing_end_time,
},
}
}
)

View file

@ -143,6 +143,13 @@ export type QueryResponse = {
export type DocActionResponse = {
status: 'success' | 'partial_success' | 'failure' | 'duplicated'
message: string
track_id?: string
}
export type ScanResponse = {
status: 'scanning_started'
message: string
track_id: string
}
export type DeleteDocResponse = {
@ -160,8 +167,9 @@ export type DocStatusResponse = {
status: DocStatus
created_at: string
updated_at: string
track_id?: string
chunks_count?: number
error?: string
error_msg?: string
metadata?: Record<string, any>
file_path: string
}
@ -170,6 +178,13 @@ export type DocsStatusesResponse = {
statuses: Record<DocStatus, DocStatusResponse[]>
}
export type TrackStatusResponse = {
track_id: string
documents: DocStatusResponse[]
total_count: number
status_summary: Record<string, number>
}
export type AuthStatusResponse = {
auth_configured: boolean
access_token?: string
@ -293,7 +308,7 @@ export const getDocuments = async (): Promise<DocsStatusesResponse> => {
return response.data
}
export const scanNewDocuments = async (): Promise<{ status: string }> => {
export const scanNewDocuments = async (): Promise<ScanResponse> => {
const response = await axiosInstance.post('/documents/scan')
return response.data
}
@ -689,3 +704,13 @@ export const checkEntityNameExists = async (entityName: string): Promise<boolean
return false
}
}
/**
* Get the processing status of documents by tracking ID
* @param trackId The tracking ID returned from upload, text, or texts endpoints
* @returns Promise with the track status response containing documents and summary
*/
export const getTrackStatus = async (trackId: string): Promise<TrackStatusResponse> => {
const response = await axiosInstance.get(`/documents/track_status/${encodeURIComponent(trackId)}`)
return response.data
}

View file

@ -420,12 +420,13 @@ export default function DocumentManager() {
// Check if component is still mounted before starting the request
if (!isMountedRef.current) return;
const { status } = await scanNewDocuments();
const { status, message, track_id: _track_id } = await scanNewDocuments(); // eslint-disable-line @typescript-eslint/no-unused-vars
// Check again if component is still mounted after the request completes
if (!isMountedRef.current) return;
toast.message(status);
// Note: _track_id is available for future use (e.g., progress tracking)
toast.message(message || status);
} catch (err) {
// Only show error if component is still mounted
if (isMountedRef.current) {
@ -749,8 +750,8 @@ export default function DocumentManager() {
{doc.status === 'failed' && (
<span className="text-red-600">{t('documentPanel.documentManager.status.failed')}</span>
)}
{doc.error && (
<span className="ml-2 text-red-500" title={doc.error}>
{doc.error_msg && (
<span className="ml-2 text-red-500" title={doc.error_msg}>
</span>
)}