feat: add processing time tracking to document status with metadata field

- Add metadata field to DocProcessingStatus with start_time and end_time tracking
- Record processing timestamps using Unix time format (seconds precision)
- Update all storage backends (JSON, MongoDB, Redis, PostgreSQL) for new field support
- Maintain backward compatibility with default values for existing data
- Add error_msg field for better error tracking during document processing
This commit is contained in:
yangdx 2025-07-29 23:42:33 +08:00
parent 7206c07468
commit 93afa7d8a7
5 changed files with 175 additions and 6 deletions

View file

@ -98,6 +98,11 @@ class JsonDocStatusStorage(DocStatusStorage):
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
result[k] = DocProcessingStatus(**data)
except KeyError as e:
logger.error(f"Missing required field for document {k}: {e}")
@ -118,6 +123,11 @@ class JsonDocStatusStorage(DocStatusStorage):
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
result[k] = DocProcessingStatus(**data)
except KeyError as e:
logger.error(f"Missing required field for document {k}: {e}")

View file

@ -384,6 +384,11 @@ class MongoDocStatusStorage(DocStatusStorage):
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
processed_result[doc["_id"]] = DocProcessingStatus(**data)
except KeyError as e:
logger.error(f"Missing required field for document {doc['_id']}: {e}")
@ -404,6 +409,11 @@ class MongoDocStatusStorage(DocStatusStorage):
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
processed_result[doc["_id"]] = DocProcessingStatus(**data)
except KeyError as e:
logger.error(f"Missing required field for document {doc['_id']}: {e}")

View file

@ -624,6 +624,62 @@ class PostgreSQLDB:
f"Failed to add track_id column or index to LIGHTRAG_DOC_STATUS: {e}"
)
async def _migrate_doc_status_add_metadata_error_msg(self):
"""Add metadata and error_msg columns to LIGHTRAG_DOC_STATUS table if they don't exist"""
try:
# Check if metadata column exists
check_metadata_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_doc_status'
AND column_name = 'metadata'
"""
metadata_info = await self.query(check_metadata_sql)
if not metadata_info:
logger.info("Adding metadata column to LIGHTRAG_DOC_STATUS table")
add_metadata_sql = """
ALTER TABLE LIGHTRAG_DOC_STATUS
ADD COLUMN metadata JSONB NULL DEFAULT '{}'::jsonb
"""
await self.execute(add_metadata_sql)
logger.info(
"Successfully added metadata column to LIGHTRAG_DOC_STATUS table"
)
else:
logger.info(
"metadata column already exists in LIGHTRAG_DOC_STATUS table"
)
# Check if error_msg column exists
check_error_msg_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_doc_status'
AND column_name = 'error_msg'
"""
error_msg_info = await self.query(check_error_msg_sql)
if not error_msg_info:
logger.info("Adding error_msg column to LIGHTRAG_DOC_STATUS table")
add_error_msg_sql = """
ALTER TABLE LIGHTRAG_DOC_STATUS
ADD COLUMN error_msg TEXT NULL
"""
await self.execute(add_error_msg_sql)
logger.info(
"Successfully added error_msg column to LIGHTRAG_DOC_STATUS table"
)
else:
logger.info(
"error_msg column already exists in LIGHTRAG_DOC_STATUS table"
)
except Exception as e:
logger.warning(
f"Failed to add metadata/error_msg columns to LIGHTRAG_DOC_STATUS: {e}"
)
async def _migrate_field_lengths(self):
"""Migrate database field lengths: entity_name, source_id, target_id, and file_path"""
# Define the field changes needed
@ -850,6 +906,14 @@ class PostgreSQLDB:
f"PostgreSQL, Failed to migrate doc status track_id field: {e}"
)
# Migrate doc status to add metadata and error_msg fields if needed
try:
await self._migrate_doc_status_add_metadata_error_msg()
except Exception as e:
logger.error(
f"PostgreSQL, Failed to migrate doc status metadata/error_msg fields: {e}"
)
async def query(
self,
sql: str,
@ -1733,6 +1797,14 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError:
chunks_list = []
# Parse metadata JSON string back to dict
metadata = result[0].get("metadata", {})
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(result[0]["created_at"])
updated_at = self._format_datetime_with_timezone(result[0]["updated_at"])
@ -1746,6 +1818,8 @@ class PGDocStatusStorage(DocStatusStorage):
updated_at=updated_at,
file_path=result[0]["file_path"],
chunks_list=chunks_list,
metadata=metadata,
error_msg=result[0].get("error_msg"),
)
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
@ -1771,6 +1845,14 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError:
chunks_list = []
# Parse metadata JSON string back to dict
metadata = row.get("metadata", {})
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(row["created_at"])
updated_at = self._format_datetime_with_timezone(row["updated_at"])
@ -1785,6 +1867,8 @@ class PGDocStatusStorage(DocStatusStorage):
"updated_at": updated_at,
"file_path": row["file_path"],
"chunks_list": chunks_list,
"metadata": metadata,
"error_msg": row.get("error_msg"),
}
)
@ -1820,6 +1904,14 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError:
chunks_list = []
# Parse metadata JSON string back to dict
metadata = element.get("metadata", {})
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(element["created_at"])
updated_at = self._format_datetime_with_timezone(element["updated_at"])
@ -1833,6 +1925,8 @@ class PGDocStatusStorage(DocStatusStorage):
chunks_count=element["chunks_count"],
file_path=element["file_path"],
chunks_list=chunks_list,
metadata=metadata,
error_msg=element.get("error_msg"),
)
return docs_by_status
@ -1855,6 +1949,14 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError:
chunks_list = []
# Parse metadata JSON string back to dict
metadata = element.get("metadata", {})
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(element["created_at"])
updated_at = self._format_datetime_with_timezone(element["updated_at"])
@ -1869,6 +1971,8 @@ class PGDocStatusStorage(DocStatusStorage):
file_path=element["file_path"],
chunks_list=chunks_list,
track_id=element.get("track_id"),
metadata=metadata,
error_msg=element.get("error_msg"),
)
return docs_by_track_id
@ -1941,10 +2045,10 @@ class PGDocStatusStorage(DocStatusStorage):
logger.warning(f"Unable to parse datetime string: {dt_str}")
return None
# Modified SQL to include created_at, updated_at, chunks_list, and track_id in both INSERT and UPDATE operations
# Modified SQL to include created_at, updated_at, chunks_list, track_id, metadata, and error_msg in both INSERT and UPDATE operations
# All fields are updated from the input data in both INSERT and UPDATE cases
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,track_id,created_at,updated_at)
values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,track_id,metadata,error_msg,created_at,updated_at)
values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
on conflict(id,workspace) do update set
content_summary = EXCLUDED.content_summary,
content_length = EXCLUDED.content_length,
@ -1953,6 +2057,8 @@ class PGDocStatusStorage(DocStatusStorage):
file_path = EXCLUDED.file_path,
chunks_list = EXCLUDED.chunks_list,
track_id = EXCLUDED.track_id,
metadata = EXCLUDED.metadata,
error_msg = EXCLUDED.error_msg,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at"""
for k, v in data.items():
@ -1960,7 +2066,7 @@ class PGDocStatusStorage(DocStatusStorage):
created_at = parse_datetime(v.get("created_at"))
updated_at = parse_datetime(v.get("updated_at"))
# chunks_count, chunks_list, and track_id are optional
# chunks_count, chunks_list, track_id, metadata, and error_msg are optional
await self.db.execute(
sql,
{
@ -1973,6 +2079,10 @@ class PGDocStatusStorage(DocStatusStorage):
"file_path": v["file_path"],
"chunks_list": json.dumps(v.get("chunks_list", [])),
"track_id": v.get("track_id"), # Add track_id support
"metadata": json.dumps(
v.get("metadata", {})
), # Add metadata support
"error_msg": v.get("error_msg"), # Add error_msg support
"created_at": created_at, # Use the converted datetime object
"updated_at": updated_at, # Use the converted datetime object
},
@ -3473,6 +3583,8 @@ TABLES = {
file_path TEXT NULL,
chunks_list JSONB NULL DEFAULT '[]'::jsonb,
track_id varchar(255) NULL,
metadata JSONB NULL DEFAULT '{}'::jsonb,
error_msg TEXT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)

View file

@ -789,6 +789,11 @@ class RedisDocStatusStorage(DocStatusStorage):
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
result[doc_id] = DocProcessingStatus(**data)
except (json.JSONDecodeError, KeyError) as e:
@ -838,6 +843,11 @@ class RedisDocStatusStorage(DocStatusStorage):
# If file_path is not in data, use document id as file path
if "file_path" not in data:
data["file_path"] = "no-file-path"
# Ensure new fields exist with default values
if "metadata" not in data:
data["metadata"] = {}
if "error_msg" not in data:
data["error_msg"] = None
result[doc_id] = DocProcessingStatus(**data)
except (json.JSONDecodeError, KeyError) as e:

View file

@ -1109,6 +1109,9 @@ class LightRAG:
if not chunks:
logger.warning("No document chunks to process")
# Record processing start time
processing_start_time = int(time.time())
# Process document in two stages
# Stage 1: Process text chunks and docs (parallel execution)
doc_status_task = asyncio.create_task(
@ -1127,6 +1130,9 @@ class LightRAG:
timezone.utc
).isoformat(),
"file_path": file_path,
"metadata": {
"processing_start_time": processing_start_time
},
}
}
)
@ -1184,12 +1190,15 @@ class LightRAG:
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
# Record processing end time for failed case
processing_end_time = int(time.time())
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"error_msg": str(e),
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
@ -1197,6 +1206,10 @@ class LightRAG:
timezone.utc
).isoformat(),
"file_path": file_path,
"metadata": {
"processing_start_time": processing_start_time,
"processing_end_time": processing_end_time,
},
}
}
)
@ -1220,6 +1233,9 @@ class LightRAG:
file_path=file_path,
)
# Record processing end time
processing_end_time = int(time.time())
await self.doc_status.upsert(
{
doc_id: {
@ -1235,6 +1251,10 @@ class LightRAG:
timezone.utc
).isoformat(),
"file_path": file_path,
"metadata": {
"processing_start_time": processing_start_time,
"processing_end_time": processing_end_time,
},
}
}
)
@ -1268,17 +1288,24 @@ class LightRAG:
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
# Record processing end time for failed case
processing_end_time = int(time.time())
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"error_msg": str(e),
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"file_path": file_path,
"metadata": {
"processing_start_time": processing_start_time,
"processing_end_time": processing_end_time,
},
}
}
)