Add duplicate document detection and skip processed files in scanning

- Add get_doc_by_file_path to all storages
- Skip processed files in scan operation
- Check duplicates in upload endpoints
- Check duplicates in text insert APIs
- Return status info in duplicate responses
yangdx 2025-09-23 17:30:54 +08:00
parent 6b953fa53d
commit 2adb8efdc7
6 changed files with 223 additions and 4 deletions
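
For context, a minimal client-side sketch of how the new duplicate responses surface through the HTTP API. This is not part of the diff: the route paths (/documents/upload, /documents/text), the default port 9621, and report.pdf are assumptions and placeholders based on the existing LightRAG API server.

# Hedged sketch: observing the "duplicated" short-circuit from an API client.
import requests

BASE_URL = "http://localhost:9621"  # assumed default port of the LightRAG server

# Upload the same file twice; the second call should return status "duplicated"
# with an empty track_id instead of re-indexing the document.
for attempt in (1, 2):
    with open("report.pdf", "rb") as f:
        resp = requests.post(f"{BASE_URL}/documents/upload", files={"file": f}).json()
    print(attempt, resp["status"], resp.get("message", ""))

# Text inserts that carry a file_source are checked against the same storage,
# so reusing an already-tracked source name also yields a duplicate response.
resp = requests.post(
    f"{BASE_URL}/documents/text",
    json={"text": "Some content.", "file_source": "report.pdf"},
).json()
print(resp["status"])  # "duplicated" if report.pdf is already in doc_status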


@@ -1395,9 +1395,37 @@ async def run_scanning_process(
         logger.info(f"Found {total_files} files to index.")

         if new_files:
-            # Process all files at once with track_id
-            await pipeline_index_files(rag, new_files, track_id)
-            logger.info(f"Scanning process completed: {total_files} files Processed.")
+            # Check for files with PROCESSED status and filter them out
+            valid_files = []
+            processed_files = []
+
+            for file_path in new_files:
+                filename = file_path.name
+                existing_doc_data = await rag.doc_status.get_doc_by_file_path(filename)
+                if existing_doc_data and existing_doc_data.get("status") == "processed":
+                    # File is already PROCESSED, skip it with warning
+                    processed_files.append(filename)
+                    logger.warning(f"Skipping already processed file: {filename}")
+                else:
+                    # File is new or in non-PROCESSED status, add to processing list
+                    valid_files.append(file_path)
+
+            # Process valid files (new files + non-PROCESSED status files)
+            if valid_files:
+                await pipeline_index_files(rag, valid_files, track_id)
+
+                if processed_files:
+                    logger.info(
+                        f"Scanning process completed: {len(valid_files)} files Processed {len(processed_files)} skipped."
+                    )
+                else:
+                    logger.info(
+                        f"Scanning process completed: {len(valid_files)} files Processed."
+                    )
+            else:
+                logger.info(
+                    "No files to process after filtering already processed files."
+                )
         else:
             # No new files to index, check if there are any documents in the queue
             logger.info(
@@ -1697,8 +1725,19 @@ def create_document_routes(
                     detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
                 )

+            # Check if filename already exists in doc_status storage
+            existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename)
+            if existing_doc_data:
+                # Get document status information for error message
+                status = existing_doc_data.get("status", "unknown")
+                return InsertResponse(
+                    status="duplicated",
+                    message=f"File '{safe_filename}' already exists in document storage (Status: {status}).",
+                    track_id="",
+                )
+
             file_path = doc_manager.input_dir / safe_filename
-            # Check if file already exists
+            # Check if file already exists in file system
             if file_path.exists():
                 return InsertResponse(
                     status="duplicated",
@@ -1748,6 +1787,24 @@ def create_document_routes(
             HTTPException: If an error occurs during text processing (500).
         """
         try:
+            # Check if file_source already exists in doc_status storage
+            if (
+                request.file_source
+                and request.file_source.strip()
+                and request.file_source != "unknown_source"
+            ):
+                existing_doc_data = await rag.doc_status.get_doc_by_file_path(
+                    request.file_source
+                )
+                if existing_doc_data:
+                    # Get document status information for error message
+                    status = existing_doc_data.get("status", "unknown")
+                    return InsertResponse(
+                        status="duplicated",
+                        message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).",
+                        track_id="",
+                    )
+
             # Generate track_id for text insertion
             track_id = generate_track_id("insert")
@@ -1794,6 +1851,26 @@ def create_document_routes(
             HTTPException: If an error occurs during text processing (500).
         """
         try:
+            # Check if any file_sources already exist in doc_status storage
+            if request.file_sources:
+                for file_source in request.file_sources:
+                    if (
+                        file_source
+                        and file_source.strip()
+                        and file_source != "unknown_source"
+                    ):
+                        existing_doc_data = await rag.doc_status.get_doc_by_file_path(
+                            file_source
+                        )
+                        if existing_doc_data:
+                            # Get document status information for error message
+                            status = existing_doc_data.get("status", "unknown")
+                            return InsertResponse(
+                                status="duplicated",
+                                message=f"File source '{file_source}' already exists in document storage (Status: {status}).",
+                                track_id="",
+                            )
+
             # Generate track_id for texts insertion
             track_id = generate_track_id("insert")


@@ -783,6 +783,18 @@ class DocStatusStorage(BaseKVStorage, ABC):
             Dictionary mapping status names to counts
         """

+    @abstractmethod
+    async def get_doc_by_file_path(self, file_path: str) -> dict[str, Any] | None:
+        """Get document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            dict[str, Any] | None: Document data if found, None otherwise
+            Returns the same format as get_by_ids method
+        """
+

 class StoragesStatus(str, Enum):
     """Storages status"""


@@ -323,6 +323,27 @@ class JsonDocStatusStorage(DocStatusStorage):
         if any_deleted:
             await set_all_update_flags(self.final_namespace)

+    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
+        """Get document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            Union[dict[str, Any], None]: Document data if found, None otherwise
+            Returns the same format as get_by_ids method
+        """
+        if self._storage_lock is None:
+            raise StorageNotInitializedError("JsonDocStatusStorage")
+
+        async with self._storage_lock:
+            for doc_id, doc_data in self._data.items():
+                if doc_data.get("file_path") == file_path:
+                    # Return complete document data, consistent with get_by_ids method
+                    return doc_data
+
+            return None
+
     async def drop(self) -> dict[str, str]:
         """Drop all document status data from storage and clean up resources


@@ -683,6 +683,18 @@ class MongoDocStatusStorage(DocStatusStorage):
         return counts

+    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
+        """Get document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            Union[dict[str, Any], None]: Document data if found, None otherwise
+            Returns the same format as get_by_id method
+        """
+        return await self._data.find_one({"file_path": file_path})
+

 @final
 @dataclass


@@ -2382,6 +2382,57 @@ class PGDocStatusStorage(DocStatusStorage):
         return processed_results

+    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
+        """Get document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            Union[dict[str, Any], None]: Document data if found, None otherwise
+            Returns the same format as get_by_id method
+        """
+        sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and file_path=$2"
+        params = {"workspace": self.workspace, "file_path": file_path}
+        result = await self.db.query(sql, list(params.values()), True)
+
+        if result is None or result == []:
+            return None
+        else:
+            # Parse chunks_list JSON string back to list
+            chunks_list = result[0].get("chunks_list", [])
+            if isinstance(chunks_list, str):
+                try:
+                    chunks_list = json.loads(chunks_list)
+                except json.JSONDecodeError:
+                    chunks_list = []
+
+            # Parse metadata JSON string back to dict
+            metadata = result[0].get("metadata", {})
+            if isinstance(metadata, str):
+                try:
+                    metadata = json.loads(metadata)
+                except json.JSONDecodeError:
+                    metadata = {}
+
+            # Convert datetime objects to ISO format strings with timezone info
+            created_at = self._format_datetime_with_timezone(result[0]["created_at"])
+            updated_at = self._format_datetime_with_timezone(result[0]["updated_at"])
+
+            return dict(
+                content_length=result[0]["content_length"],
+                content_summary=result[0]["content_summary"],
+                status=result[0]["status"],
+                chunks_count=result[0]["chunks_count"],
+                created_at=created_at,
+                updated_at=updated_at,
+                file_path=result[0]["file_path"],
+                chunks_list=chunks_list,
+                metadata=metadata,
+                error_msg=result[0].get("error_msg"),
+                track_id=result[0].get("track_id"),
+            )
+
     async def get_status_counts(self) -> dict[str, int]:
         """Get counts of documents in each status"""
         sql = """SELECT status as "status", COUNT(1) as "count"


@@ -1052,6 +1052,52 @@ class RedisDocStatusStorage(DocStatusStorage):
         return counts

+    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
+        """Get document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            Union[dict[str, Any], None]: Document data if found, None otherwise
+            Returns the same format as get_by_id method
+        """
+        async with self._get_redis_connection() as redis:
+            try:
+                # Use SCAN to iterate through all keys in the namespace
+                cursor = 0
+                while True:
+                    cursor, keys = await redis.scan(
+                        cursor, match=f"{self.final_namespace}:*", count=1000
+                    )
+
+                    if keys:
+                        # Get all values in batch
+                        pipe = redis.pipeline()
+                        for key in keys:
+                            pipe.get(key)
+                        values = await pipe.execute()
+
+                        # Check each document for matching file_path
+                        for value in values:
+                            if value:
+                                try:
+                                    doc_data = json.loads(value)
+                                    if doc_data.get("file_path") == file_path:
+                                        return doc_data
+                                except json.JSONDecodeError as e:
+                                    logger.error(
+                                        f"[{self.workspace}] JSON decode error in get_doc_by_file_path: {e}"
+                                    )
+                                    continue
+
+                    if cursor == 0:
+                        break
+
+                return None
+            except Exception as e:
+                logger.error(f"[{self.workspace}] Error in get_doc_by_file_path: {e}")
+                return None
+
     async def drop(self) -> dict[str, str]:
         """Drop all document status data from storage and clean up resources"""
         async with get_storage_lock():