From 93afa7d8a7aed259ac3b886dda1c519f74d4fb55 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 29 Jul 2025 23:42:33 +0800
Subject: [PATCH] feat: add processing time tracking to document status with
 metadata field

- Add metadata field to DocProcessingStatus, tracking processing_start_time
  and processing_end_time
- Record processing timestamps as Unix time (second precision)
- Update all storage backends (JSON, MongoDB, Redis, PostgreSQL) to support
  the new fields
- Maintain backward compatibility by supplying default values for existing
  data
- Add error_msg field for better error tracking during document processing
---
 lightrag/kg/json_doc_status_impl.py |  10 +++
 lightrag/kg/mongo_impl.py           |  10 +++
 lightrag/kg/postgres_impl.py        | 120 +++++++++++++++++++++++++++-
 lightrag/kg/redis_impl.py           |  10 +++
 lightrag/lightrag.py                |  31 ++++++-
 5 files changed, 175 insertions(+), 6 deletions(-)

diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py
index a3695891..aecb8713 100644
--- a/lightrag/kg/json_doc_status_impl.py
+++ b/lightrag/kg/json_doc_status_impl.py
@@ -98,6 +98,11 @@ class JsonDocStatusStorage(DocStatusStorage):
                 # If file_path is not in data, use document id as file path
                 if "file_path" not in data:
                     data["file_path"] = "no-file-path"
+                # Ensure new fields exist with default values
+                if "metadata" not in data:
+                    data["metadata"] = {}
+                if "error_msg" not in data:
+                    data["error_msg"] = None
                 result[k] = DocProcessingStatus(**data)
             except KeyError as e:
                 logger.error(f"Missing required field for document {k}: {e}")
@@ -118,6 +123,11 @@ class JsonDocStatusStorage(DocStatusStorage):
                 # If file_path is not in data, use document id as file path
                 if "file_path" not in data:
                     data["file_path"] = "no-file-path"
+                # Ensure new fields exist with default values
+                if "metadata" not in data:
+                    data["metadata"] = {}
+                if "error_msg" not in data:
+                    data["error_msg"] = None
                 result[k] = DocProcessingStatus(**data)
             except KeyError as e:
                 logger.error(f"Missing required field for document {k}: {e}")
diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py
index 84f78f77..f15576a9 100644
--- a/lightrag/kg/mongo_impl.py
+++ b/lightrag/kg/mongo_impl.py
@@ -384,6 +384,11 @@ class MongoDocStatusStorage(DocStatusStorage):
                 # If file_path is not in data, use document id as file path
                 if "file_path" not in data:
                     data["file_path"] = "no-file-path"
+                # Ensure new fields exist with default values
+                if "metadata" not in data:
+                    data["metadata"] = {}
+                if "error_msg" not in data:
+                    data["error_msg"] = None
                 processed_result[doc["_id"]] = DocProcessingStatus(**data)
             except KeyError as e:
                 logger.error(f"Missing required field for document {doc['_id']}: {e}")
@@ -404,6 +409,11 @@ class MongoDocStatusStorage(DocStatusStorage):
                 # If file_path is not in data, use document id as file path
                 if "file_path" not in data:
                     data["file_path"] = "no-file-path"
+                # Ensure new fields exist with default values
+                if "metadata" not in data:
+                    data["metadata"] = {}
+                if "error_msg" not in data:
+                    data["error_msg"] = None
                 processed_result[doc["_id"]] = DocProcessingStatus(**data)
             except KeyError as e:
                 logger.error(f"Missing required field for document {doc['_id']}: {e}")
diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 32b5dc5b..0d2e7da3 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -624,6 +624,62 @@ class PostgreSQLDB:
                 f"Failed to add track_id column or index to LIGHTRAG_DOC_STATUS: {e}"
             )
 
+    async def _migrate_doc_status_add_metadata_error_msg(self):
+        """Add metadata and 
error_msg columns to LIGHTRAG_DOC_STATUS table if they don't exist""" + try: + # Check if metadata column exists + check_metadata_sql = """ + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'lightrag_doc_status' + AND column_name = 'metadata' + """ + + metadata_info = await self.query(check_metadata_sql) + if not metadata_info: + logger.info("Adding metadata column to LIGHTRAG_DOC_STATUS table") + add_metadata_sql = """ + ALTER TABLE LIGHTRAG_DOC_STATUS + ADD COLUMN metadata JSONB NULL DEFAULT '{}'::jsonb + """ + await self.execute(add_metadata_sql) + logger.info( + "Successfully added metadata column to LIGHTRAG_DOC_STATUS table" + ) + else: + logger.info( + "metadata column already exists in LIGHTRAG_DOC_STATUS table" + ) + + # Check if error_msg column exists + check_error_msg_sql = """ + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'lightrag_doc_status' + AND column_name = 'error_msg' + """ + + error_msg_info = await self.query(check_error_msg_sql) + if not error_msg_info: + logger.info("Adding error_msg column to LIGHTRAG_DOC_STATUS table") + add_error_msg_sql = """ + ALTER TABLE LIGHTRAG_DOC_STATUS + ADD COLUMN error_msg TEXT NULL + """ + await self.execute(add_error_msg_sql) + logger.info( + "Successfully added error_msg column to LIGHTRAG_DOC_STATUS table" + ) + else: + logger.info( + "error_msg column already exists in LIGHTRAG_DOC_STATUS table" + ) + + except Exception as e: + logger.warning( + f"Failed to add metadata/error_msg columns to LIGHTRAG_DOC_STATUS: {e}" + ) + async def _migrate_field_lengths(self): """Migrate database field lengths: entity_name, source_id, target_id, and file_path""" # Define the field changes needed @@ -850,6 +906,14 @@ class PostgreSQLDB: f"PostgreSQL, Failed to migrate doc status track_id field: {e}" ) + # Migrate doc status to add metadata and error_msg fields if needed + try: + await self._migrate_doc_status_add_metadata_error_msg() + except Exception as e: + logger.error( + f"PostgreSQL, Failed to migrate doc status metadata/error_msg fields: {e}" + ) + async def query( self, sql: str, @@ -1733,6 +1797,14 @@ class PGDocStatusStorage(DocStatusStorage): except json.JSONDecodeError: chunks_list = [] + # Parse metadata JSON string back to dict + metadata = result[0].get("metadata", {}) + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except json.JSONDecodeError: + metadata = {} + # Convert datetime objects to ISO format strings with timezone info created_at = self._format_datetime_with_timezone(result[0]["created_at"]) updated_at = self._format_datetime_with_timezone(result[0]["updated_at"]) @@ -1746,6 +1818,8 @@ class PGDocStatusStorage(DocStatusStorage): updated_at=updated_at, file_path=result[0]["file_path"], chunks_list=chunks_list, + metadata=metadata, + error_msg=result[0].get("error_msg"), ) async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: @@ -1771,6 +1845,14 @@ class PGDocStatusStorage(DocStatusStorage): except json.JSONDecodeError: chunks_list = [] + # Parse metadata JSON string back to dict + metadata = row.get("metadata", {}) + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except json.JSONDecodeError: + metadata = {} + # Convert datetime objects to ISO format strings with timezone info created_at = self._format_datetime_with_timezone(row["created_at"]) updated_at = self._format_datetime_with_timezone(row["updated_at"]) @@ -1785,6 +1867,8 @@ class PGDocStatusStorage(DocStatusStorage): "updated_at": updated_at, 
"file_path": row["file_path"], "chunks_list": chunks_list, + "metadata": metadata, + "error_msg": row.get("error_msg"), } ) @@ -1820,6 +1904,14 @@ class PGDocStatusStorage(DocStatusStorage): except json.JSONDecodeError: chunks_list = [] + # Parse metadata JSON string back to dict + metadata = element.get("metadata", {}) + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except json.JSONDecodeError: + metadata = {} + # Convert datetime objects to ISO format strings with timezone info created_at = self._format_datetime_with_timezone(element["created_at"]) updated_at = self._format_datetime_with_timezone(element["updated_at"]) @@ -1833,6 +1925,8 @@ class PGDocStatusStorage(DocStatusStorage): chunks_count=element["chunks_count"], file_path=element["file_path"], chunks_list=chunks_list, + metadata=metadata, + error_msg=element.get("error_msg"), ) return docs_by_status @@ -1855,6 +1949,14 @@ class PGDocStatusStorage(DocStatusStorage): except json.JSONDecodeError: chunks_list = [] + # Parse metadata JSON string back to dict + metadata = element.get("metadata", {}) + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except json.JSONDecodeError: + metadata = {} + # Convert datetime objects to ISO format strings with timezone info created_at = self._format_datetime_with_timezone(element["created_at"]) updated_at = self._format_datetime_with_timezone(element["updated_at"]) @@ -1869,6 +1971,8 @@ class PGDocStatusStorage(DocStatusStorage): file_path=element["file_path"], chunks_list=chunks_list, track_id=element.get("track_id"), + metadata=metadata, + error_msg=element.get("error_msg"), ) return docs_by_track_id @@ -1941,10 +2045,10 @@ class PGDocStatusStorage(DocStatusStorage): logger.warning(f"Unable to parse datetime string: {dt_str}") return None - # Modified SQL to include created_at, updated_at, chunks_list, and track_id in both INSERT and UPDATE operations + # Modified SQL to include created_at, updated_at, chunks_list, track_id, metadata, and error_msg in both INSERT and UPDATE operations # All fields are updated from the input data in both INSERT and UPDATE cases - sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,track_id,created_at,updated_at) - values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) + sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,track_id,metadata,error_msg,created_at,updated_at) + values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) on conflict(id,workspace) do update set content_summary = EXCLUDED.content_summary, content_length = EXCLUDED.content_length, @@ -1953,6 +2057,8 @@ class PGDocStatusStorage(DocStatusStorage): file_path = EXCLUDED.file_path, chunks_list = EXCLUDED.chunks_list, track_id = EXCLUDED.track_id, + metadata = EXCLUDED.metadata, + error_msg = EXCLUDED.error_msg, created_at = EXCLUDED.created_at, updated_at = EXCLUDED.updated_at""" for k, v in data.items(): @@ -1960,7 +2066,7 @@ class PGDocStatusStorage(DocStatusStorage): created_at = parse_datetime(v.get("created_at")) updated_at = parse_datetime(v.get("updated_at")) - # chunks_count, chunks_list, and track_id are optional + # chunks_count, chunks_list, track_id, metadata, and error_msg are optional await self.db.execute( sql, { @@ -1973,6 +2079,10 @@ class PGDocStatusStorage(DocStatusStorage): "file_path": v["file_path"], "chunks_list": json.dumps(v.get("chunks_list", [])), "track_id": v.get("track_id"), # Add 
track_id support + "metadata": json.dumps( + v.get("metadata", {}) + ), # Add metadata support + "error_msg": v.get("error_msg"), # Add error_msg support "created_at": created_at, # Use the converted datetime object "updated_at": updated_at, # Use the converted datetime object }, @@ -3473,6 +3583,8 @@ TABLES = { file_path TEXT NULL, chunks_list JSONB NULL DEFAULT '[]'::jsonb, track_id varchar(255) NULL, + metadata JSONB NULL DEFAULT '{}'::jsonb, + error_msg TEXT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id) diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py index ca9d51f5..e5a59c80 100644 --- a/lightrag/kg/redis_impl.py +++ b/lightrag/kg/redis_impl.py @@ -789,6 +789,11 @@ class RedisDocStatusStorage(DocStatusStorage): # If file_path is not in data, use document id as file path if "file_path" not in data: data["file_path"] = "no-file-path" + # Ensure new fields exist with default values + if "metadata" not in data: + data["metadata"] = {} + if "error_msg" not in data: + data["error_msg"] = None result[doc_id] = DocProcessingStatus(**data) except (json.JSONDecodeError, KeyError) as e: @@ -838,6 +843,11 @@ class RedisDocStatusStorage(DocStatusStorage): # If file_path is not in data, use document id as file path if "file_path" not in data: data["file_path"] = "no-file-path" + # Ensure new fields exist with default values + if "metadata" not in data: + data["metadata"] = {} + if "error_msg" not in data: + data["error_msg"] = None result[doc_id] = DocProcessingStatus(**data) except (json.JSONDecodeError, KeyError) as e: diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index df4e92c1..f610c56c 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1109,6 +1109,9 @@ class LightRAG: if not chunks: logger.warning("No document chunks to process") + # Record processing start time + processing_start_time = int(time.time()) + # Process document in two stages # Stage 1: Process text chunks and docs (parallel execution) doc_status_task = asyncio.create_task( @@ -1127,6 +1130,9 @@ class LightRAG: timezone.utc ).isoformat(), "file_path": file_path, + "metadata": { + "processing_start_time": processing_start_time + }, } } ) @@ -1184,12 +1190,15 @@ class LightRAG: if self.llm_response_cache: await self.llm_response_cache.index_done_callback() + # Record processing end time for failed case + processing_end_time = int(time.time()) + # Update document status to failed await self.doc_status.upsert( { doc_id: { "status": DocStatus.FAILED, - "error": str(e), + "error_msg": str(e), "content_summary": status_doc.content_summary, "content_length": status_doc.content_length, "created_at": status_doc.created_at, @@ -1197,6 +1206,10 @@ class LightRAG: timezone.utc ).isoformat(), "file_path": file_path, + "metadata": { + "processing_start_time": processing_start_time, + "processing_end_time": processing_end_time, + }, } } ) @@ -1220,6 +1233,9 @@ class LightRAG: file_path=file_path, ) + # Record processing end time + processing_end_time = int(time.time()) + await self.doc_status.upsert( { doc_id: { @@ -1235,6 +1251,10 @@ class LightRAG: timezone.utc ).isoformat(), "file_path": file_path, + "metadata": { + "processing_start_time": processing_start_time, + "processing_end_time": processing_end_time, + }, } } ) @@ -1268,17 +1288,24 @@ class LightRAG: if self.llm_response_cache: await self.llm_response_cache.index_done_callback() + # Record processing end time for failed case + 
processing_end_time = int(time.time()) + # Update document status to failed await self.doc_status.upsert( { doc_id: { "status": DocStatus.FAILED, - "error": str(e), + "error_msg": str(e), "content_summary": status_doc.content_summary, "content_length": status_doc.content_length, "created_at": status_doc.created_at, "updated_at": datetime.now().isoformat(), "file_path": file_path, + "metadata": { + "processing_start_time": processing_start_time, + "processing_end_time": processing_end_time, + }, } } )
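
Note on consuming the new fields: end-to-end processing duration can be derived
directly from the recorded Unix timestamps. Below is a minimal sketch; the
processing_duration_seconds helper is hypothetical (not part of this patch) and
assumes only the metadata keys written by lightrag.py above:

    from typing import Optional


    def processing_duration_seconds(metadata: dict) -> Optional[int]:
        """Elapsed processing time in whole seconds, or None when unavailable.

        Documents stored before this migration carry the backward-compatible
        empty-dict default, and documents still in PROCESSING status have only
        processing_start_time recorded, so both cases return None.
        """
        start = metadata.get("processing_start_time")
        end = metadata.get("processing_end_time")
        if start is None or end is None:
            return None
        return end - start


    # A document that took 42 seconds (second-precision Unix timestamps):
    meta = {"processing_start_time": 1753803753, "processing_end_time": 1753803795}
    assert processing_duration_seconds(meta) == 42

    # Pre-migration documents fall back to the default empty metadata dict:
    assert processing_duration_seconds({}) is None

Since both failure paths in lightrag.py record processing_end_time as well, the
same helper applies to documents in FAILED status, making it possible to report
how long a document ran before it failed.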