Merge pull request #1881 from danielaskdd/remove-content-in-doc-status

Refact remove content in doc status to improve performance
This commit is contained in:
Daniel.y 2025-07-30 09:15:06 +08:00 committed by GitHub
commit 6fa5a6f634
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 39 additions and 46 deletions

View file

@ -629,8 +629,6 @@ class DocStatus(str, Enum):
class DocProcessingStatus: class DocProcessingStatus:
"""Document processing status data structure""" """Document processing status data structure"""
content: str
"""Original content of the document"""
content_summary: str content_summary: str
"""First 100 chars of document content, used for preview""" """First 100 chars of document content, used for preview"""
content_length: int content_length: int

View file

@ -95,9 +95,6 @@ class JsonDocStatusStorage(DocStatusStorage):
try: try:
# Make a copy of the data to avoid modifying the original # Make a copy of the data to avoid modifying the original
data = v.copy() data = v.copy()
# If content is missing, use content_summary as content
if "content" not in data and "content_summary" in data:
data["content"] = data["content_summary"]
# If file_path is not in data, use document id as file path # If file_path is not in data, use document id as file path
if "file_path" not in data: if "file_path" not in data:
data["file_path"] = "no-file-path" data["file_path"] = "no-file-path"

View file

@ -372,20 +372,19 @@ class MongoDocStatusStorage(DocStatusStorage):
"""Get all documents with a specific status""" """Get all documents with a specific status"""
cursor = self._data.find({"status": status.value}) cursor = self._data.find({"status": status.value})
result = await cursor.to_list() result = await cursor.to_list()
return { processed_result = {}
doc["_id"]: DocProcessingStatus( for doc in result:
content=doc["content"], try:
content_summary=doc.get("content_summary"), # Make a copy of the data to avoid modifying the original
content_length=doc["content_length"], data = doc.copy()
status=doc["status"], # If file_path is not in data, use document id as file path
created_at=doc.get("created_at"), if "file_path" not in data:
updated_at=doc.get("updated_at"), data["file_path"] = "no-file-path"
chunks_count=doc.get("chunks_count", -1), processed_result[doc["_id"]] = DocProcessingStatus(**data)
file_path=doc.get("file_path", doc["_id"]), except KeyError as e:
chunks_list=doc.get("chunks_list", []), logger.error(f"Missing required field for document {doc['_id']}: {e}")
) continue
for doc in result return processed_result
}
async def index_done_callback(self) -> None: async def index_done_callback(self) -> None:
# Mongo handles persistence automatically # Mongo handles persistence automatically

View file

@ -1673,7 +1673,7 @@ class PGDocStatusStorage(DocStatusStorage):
updated_at = self._format_datetime_with_timezone(result[0]["updated_at"]) updated_at = self._format_datetime_with_timezone(result[0]["updated_at"])
return dict( return dict(
content=result[0]["content"], # content=result[0]["content"],
content_length=result[0]["content_length"], content_length=result[0]["content_length"],
content_summary=result[0]["content_summary"], content_summary=result[0]["content_summary"],
status=result[0]["status"], status=result[0]["status"],
@ -1713,7 +1713,7 @@ class PGDocStatusStorage(DocStatusStorage):
processed_results.append( processed_results.append(
{ {
"content": row["content"], # "content": row["content"],
"content_length": row["content_length"], "content_length": row["content_length"],
"content_summary": row["content_summary"], "content_summary": row["content_summary"],
"status": row["status"], "status": row["status"],
@ -1762,7 +1762,7 @@ class PGDocStatusStorage(DocStatusStorage):
updated_at = self._format_datetime_with_timezone(element["updated_at"]) updated_at = self._format_datetime_with_timezone(element["updated_at"])
docs_by_status[element["id"]] = DocProcessingStatus( docs_by_status[element["id"]] = DocProcessingStatus(
content=element["content"], # content=element["content"],
content_summary=element["content_summary"], content_summary=element["content_summary"],
content_length=element["content_length"], content_length=element["content_length"],
status=element["status"], status=element["status"],
@ -1845,10 +1845,9 @@ class PGDocStatusStorage(DocStatusStorage):
# Modified SQL to include created_at, updated_at, and chunks_list in both INSERT and UPDATE operations # Modified SQL to include created_at, updated_at, and chunks_list in both INSERT and UPDATE operations
# All fields are updated from the input data in both INSERT and UPDATE cases # All fields are updated from the input data in both INSERT and UPDATE cases
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path,chunks_list,created_at,updated_at) sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status,file_path,chunks_list,created_at,updated_at)
values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
on conflict(id,workspace) do update set on conflict(id,workspace) do update set
content = EXCLUDED.content,
content_summary = EXCLUDED.content_summary, content_summary = EXCLUDED.content_summary,
content_length = EXCLUDED.content_length, content_length = EXCLUDED.content_length,
chunks_count = EXCLUDED.chunks_count, chunks_count = EXCLUDED.chunks_count,
@ -1868,7 +1867,7 @@ class PGDocStatusStorage(DocStatusStorage):
{ {
"workspace": self.db.workspace, "workspace": self.db.workspace,
"id": k, "id": k,
"content": v["content"], # "content": v["content"],
"content_summary": v["content_summary"], "content_summary": v["content_summary"],
"content_length": v["content_length"], "content_length": v["content_length"],
"chunks_count": v["chunks_count"] if "chunks_count" in v else -1, "chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
@ -3364,6 +3363,7 @@ TABLES = {
CONSTRAINT LIGHTRAG_LLM_CACHE_PK PRIMARY KEY (workspace, mode, id) CONSTRAINT LIGHTRAG_LLM_CACHE_PK PRIMARY KEY (workspace, mode, id)
)""" )"""
}, },
# content column in LIGHTRAG_DOC_STATUS is deprecated, use the same column in LIGHTRAG_DOC_FULL instead
"LIGHTRAG_DOC_STATUS": { "LIGHTRAG_DOC_STATUS": {
"ddl": """CREATE TABLE LIGHTRAG_DOC_STATUS ( "ddl": """CREATE TABLE LIGHTRAG_DOC_STATUS (
workspace varchar(255) NOT NULL, workspace varchar(255) NOT NULL,

View file

@ -786,12 +786,6 @@ class RedisDocStatusStorage(DocStatusStorage):
# Make a copy of the data to avoid modifying the original # Make a copy of the data to avoid modifying the original
data = doc_data.copy() data = doc_data.copy()
# If content is missing, use content_summary as content
if (
"content" not in data
and "content_summary" in data
):
data["content"] = data["content_summary"]
# If file_path is not in data, use document id as file path # If file_path is not in data, use document id as file path
if "file_path" not in data: if "file_path" not in data:
data["file_path"] = "no-file-path" data["file_path"] = "no-file-path"

View file

@ -863,11 +863,10 @@ class LightRAG:
for content, (id_, file_path) in unique_contents.items() for content, (id_, file_path) in unique_contents.items()
} }
# 3. Generate document initial status # 3. Generate document initial status (without content)
new_docs: dict[str, Any] = { new_docs: dict[str, Any] = {
id_: { id_: {
"status": DocStatus.PENDING, "status": DocStatus.PENDING,
"content": content_data["content"],
"content_summary": get_content_summary(content_data["content"]), "content_summary": get_content_summary(content_data["content"]),
"content_length": len(content_data["content"]), "content_length": len(content_data["content"]),
"created_at": datetime.now(timezone.utc).isoformat(), "created_at": datetime.now(timezone.utc).isoformat(),
@ -907,7 +906,15 @@ class LightRAG:
logger.info("No new unique documents were found.") logger.info("No new unique documents were found.")
return return
# 5. Store status document # 5. Store document content in full_docs and status in doc_status
# Store full document content separately
full_docs_data = {
doc_id: {"content": contents[doc_id]["content"]}
for doc_id in new_docs.keys()
}
await self.full_docs.upsert(full_docs_data)
# Store document status (without content)
await self.doc_status.upsert(new_docs) await self.doc_status.upsert(new_docs)
logger.info(f"Stored {len(new_docs)} new unique documents") logger.info(f"Stored {len(new_docs)} new unique documents")
@ -1049,6 +1056,14 @@ class LightRAG:
pipeline_status["latest_message"] = log_message pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message) pipeline_status["history_messages"].append(log_message)
# Get document content from full_docs
content_data = await self.full_docs.get_by_id(doc_id)
if not content_data:
raise Exception(
f"Document content not found in full_docs for doc_id: {doc_id}"
)
content = content_data["content"]
# Generate chunks from document # Generate chunks from document
chunks: dict[str, Any] = { chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): { compute_mdhash_id(dp["content"], prefix="chunk-"): {
@ -1059,7 +1074,7 @@ class LightRAG:
} }
for dp in self.chunking_func( for dp in self.chunking_func(
self.tokenizer, self.tokenizer,
status_doc.content, content,
split_by_character, split_by_character,
split_by_character_only, split_by_character_only,
self.chunk_overlap_token_size, self.chunk_overlap_token_size,
@ -1081,7 +1096,6 @@ class LightRAG:
"chunks_list": list( "chunks_list": list(
chunks.keys() chunks.keys()
), # Save chunks list ), # Save chunks list
"content": status_doc.content,
"content_summary": status_doc.content_summary, "content_summary": status_doc.content_summary,
"content_length": status_doc.content_length, "content_length": status_doc.content_length,
"created_at": status_doc.created_at, "created_at": status_doc.created_at,
@ -1096,11 +1110,6 @@ class LightRAG:
chunks_vdb_task = asyncio.create_task( chunks_vdb_task = asyncio.create_task(
self.chunks_vdb.upsert(chunks) self.chunks_vdb.upsert(chunks)
) )
full_docs_task = asyncio.create_task(
self.full_docs.upsert(
{doc_id: {"content": status_doc.content}}
)
)
text_chunks_task = asyncio.create_task( text_chunks_task = asyncio.create_task(
self.text_chunks.upsert(chunks) self.text_chunks.upsert(chunks)
) )
@ -1109,7 +1118,6 @@ class LightRAG:
first_stage_tasks = [ first_stage_tasks = [
doc_status_task, doc_status_task,
chunks_vdb_task, chunks_vdb_task,
full_docs_task,
text_chunks_task, text_chunks_task,
] ]
entity_relation_task = None entity_relation_task = None
@ -1158,7 +1166,6 @@ class LightRAG:
doc_id: { doc_id: {
"status": DocStatus.FAILED, "status": DocStatus.FAILED,
"error": str(e), "error": str(e),
"content": status_doc.content,
"content_summary": status_doc.content_summary, "content_summary": status_doc.content_summary,
"content_length": status_doc.content_length, "content_length": status_doc.content_length,
"created_at": status_doc.created_at, "created_at": status_doc.created_at,
@ -1197,7 +1204,6 @@ class LightRAG:
"chunks_list": list( "chunks_list": list(
chunks.keys() chunks.keys()
), # 保留 chunks_list ), # 保留 chunks_list
"content": status_doc.content,
"content_summary": status_doc.content_summary, "content_summary": status_doc.content_summary,
"content_length": status_doc.content_length, "content_length": status_doc.content_length,
"created_at": status_doc.created_at, "created_at": status_doc.created_at,
@ -1244,7 +1250,6 @@ class LightRAG:
doc_id: { doc_id: {
"status": DocStatus.FAILED, "status": DocStatus.FAILED,
"error": str(e), "error": str(e),
"content": status_doc.content,
"content_summary": status_doc.content_summary, "content_summary": status_doc.content_summary,
"content_length": status_doc.content_length, "content_length": status_doc.content_length,
"created_at": status_doc.created_at, "created_at": status_doc.created_at,