Optimize document processing pipeline by removing duplicate step

yangdx 2025-08-16 17:23:01 +08:00
parent 5591ef3ac8
commit e1310c5262

@@ -971,11 +971,10 @@ class LightRAG:
         """
         Pipeline for Processing Documents

-        1. Validate ids if provided or generate MD5 hash IDs
-        2. Remove duplicate contents
-        3. Generate document initial status
-        4. Filter out already processed documents
-        5. Enqueue document in status
+        1. Validate ids if provided or generate MD5 hash IDs and remove duplicate contents
+        2. Generate document initial status
+        3. Filter out already processed documents
+        4. Enqueue document in status

         Args:
             input: Single document string or list of document strings
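
The reworked docstring folds duplicate removal into step 1. As a standalone illustration of that merged step, here is a minimal sketch, not the LightRAG implementation: clean_text is approximated by str.strip, and the MD5-based ID helper is an assumption.

from hashlib import md5

def build_contents(docs: list[str]) -> dict[str, str]:
    """Sketch of merged step 1: clean, hash to an ID, dedupe in one pass."""
    contents: dict[str, str] = {}
    for doc in docs:
        cleaned = doc.strip()  # stand-in for clean_text()
        doc_id = "doc-" + md5(cleaned.encode("utf-8")).hexdigest()
        # Identical cleaned content hashes to the same ID, so duplicates
        # collapse onto one key; the first occurrence wins.
        contents.setdefault(doc_id, cleaned)
    return contents

# "a" and "a " clean to the same text, so only two documents survive:
assert len(build_contents(["a", "a ", "b"])) == 2
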
@@ -1008,7 +1007,7 @@
             # If no file paths provided, use placeholder
             file_paths = ["unknown_source"] * len(input)

-        # 1. Validate ids if provided or generate MD5 hash IDs
+        # 1. Validate ids if provided or generate MD5 hash IDs and remove duplicate contents
         if ids is not None:
             # Check if the number of IDs matches the number of documents
             if len(ids) != len(input):
@@ -1018,22 +1017,25 @@
             if len(ids) != len(set(ids)):
                 raise ValueError("IDs must be unique")

-            # Generate contents dict of IDs provided by user and documents
+            # Generate contents dict and remove duplicates in one pass
+            unique_contents = {}
+            for id_, doc, path in zip(ids, input, file_paths):
+                cleaned_content = clean_text(doc)
+                if cleaned_content not in unique_contents:
+                    unique_contents[cleaned_content] = (id_, path)
+
+            # Reconstruct contents with unique content
             contents = {
-                id_: {"content": doc, "file_path": path}
-                for id_, doc, path in zip(ids, input, file_paths)
+                id_: {"content": content, "file_path": file_path}
+                for content, (id_, file_path) in unique_contents.items()
             }
         else:
-            # Clean input text and remove duplicates
-            cleaned_input = [
-                (clean_text(doc), path) for doc, path in zip(input, file_paths)
-            ]
+            # Clean input text and remove duplicates in one pass
             unique_content_with_paths = {}
-
-            # Keep track of unique content and their paths
-            for content, path in cleaned_input:
-                if content not in unique_content_with_paths:
-                    unique_content_with_paths[content] = path
+            for doc, path in zip(input, file_paths):
+                cleaned_content = clean_text(doc)
+                if cleaned_content not in unique_content_with_paths:
+                    unique_content_with_paths[cleaned_content] = path

             # Generate contents dict of MD5 hash IDs and documents with paths
             contents = {
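
Note the new user-ID branch keys its intermediate dict by cleaned content rather than by ID, so when two inputs clean to the same text, the later duplicate is dropped together with its ID and file path. A minimal sketch of that first-occurrence semantics, with toy data and str.strip standing in for clean_text:

def dedupe_with_ids(ids, docs, paths):
    # Key by cleaned content: the FIRST (id, path) seen for each content
    # wins; later duplicates and their ids are silently discarded.
    unique = {}
    for id_, doc, path in zip(ids, docs, paths):
        cleaned = doc.strip()  # stand-in for clean_text()
        if cleaned not in unique:
            unique[cleaned] = (id_, path)
    return {
        id_: {"content": content, "file_path": path}
        for content, (id_, path) in unique.items()
    }

result = dedupe_with_ids(["id1", "id2"], ["same", "same "], ["a.txt", "b.txt"])
assert list(result) == ["id1"]  # "id2" and "b.txt" were dropped with the duplicate
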
@@ -1044,21 +1046,7 @@
                 for content, path in unique_content_with_paths.items()
             }

-        # 2. Remove duplicate contents
-        unique_contents = {}
-        for id_, content_data in contents.items():
-            content = content_data["content"]
-            file_path = content_data["file_path"]
-            if content not in unique_contents:
-                unique_contents[content] = (id_, file_path)
-
-        # Reconstruct contents with unique content
-        contents = {
-            id_: {"content": content, "file_path": file_path}
-            for content, (id_, file_path) in unique_contents.items()
-        }
-
-        # 3. Generate document initial status (without content)
+        # 2. Generate document initial status (without content)
         new_docs: dict[str, Any] = {
             id_: {
                 "status": DocStatus.PENDING,
@@ -1074,7 +1062,7 @@
             for id_, content_data in contents.items()
         }

-        # 4. Filter out already processed documents
+        # 3. Filter out already processed documents
         # Get docs ids
         all_new_doc_ids = set(new_docs.keys())
         # Exclude IDs of documents that are already enqueued
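
The renumbered filter step reduces to set arithmetic over document IDs. In the sketch below, already_enqueued_ids is a hypothetical stand-in for whatever IDs the doc-status backend reports as known:

def filter_new_docs(new_docs: dict, already_enqueued_ids: set) -> dict:
    # Keep only IDs the status store has never seen before.
    to_process = set(new_docs) - already_enqueued_ids
    return {doc_id: new_docs[doc_id] for doc_id in to_process}

new_docs = {"doc-1": {"status": "PENDING"}, "doc-2": {"status": "PENDING"}}
assert list(filter_new_docs(new_docs, {"doc-2"})) == ["doc-1"]
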
@@ -1104,8 +1092,8 @@
             logger.warning("No new unique documents were found.")
             return

-        # 5. Store document content in full_docs and status in doc_status
-        # Store full document content separately
+        # 4. Store document content in full_docs and status in doc_status
+        # Store full document content separately
         full_docs_data = {
             doc_id: {"content": contents[doc_id]["content"]}
             for doc_id in new_docs.keys()