diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 32520d33..c7d9dd97 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -1395,9 +1395,37 @@ async def run_scanning_process(
         logger.info(f"Found {total_files} files to index.")
 
         if new_files:
-            # Process all files at once with track_id
-            await pipeline_index_files(rag, new_files, track_id)
-            logger.info(f"Scanning process completed: {total_files} files Processed.")
+            # Check for files with PROCESSED status and filter them out
+            valid_files = []
+            processed_files = []
+
+            for file_path in new_files:
+                filename = file_path.name
+                existing_doc_data = await rag.doc_status.get_doc_by_file_path(filename)
+
+                if existing_doc_data and existing_doc_data.get("status") == "processed":
+                    # File is already PROCESSED, skip it with a warning
+                    processed_files.append(filename)
+                    logger.warning(f"Skipping already processed file: {filename}")
+                else:
+                    # File is new or in a non-PROCESSED status, add it to the processing list
+                    valid_files.append(file_path)
+
+            # Process valid files (new files + files in a non-PROCESSED status)
+            if valid_files:
+                await pipeline_index_files(rag, valid_files, track_id)
+                if processed_files:
+                    logger.info(
+                        f"Scanning process completed: {len(valid_files)} files processed, {len(processed_files)} skipped."
+                    )
+                else:
+                    logger.info(
+                        f"Scanning process completed: {len(valid_files)} files processed."
+                    )
+            else:
+                logger.info(
+                    "No files to process after filtering already processed files."
+                )
         else:
             # No new files to index, check if there are any documents in the queue
             logger.info(
@@ -1697,8 +1725,19 @@ def create_document_routes(
                 detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
             )
 
+        # Check if the filename already exists in doc_status storage
+        existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename)
+        if existing_doc_data:
+            # Get document status information for the error message
+            status = existing_doc_data.get("status", "unknown")
+            return InsertResponse(
+                status="duplicated",
+                message=f"File '{safe_filename}' already exists in document storage (Status: {status}).",
+                track_id="",
+            )
+
         file_path = doc_manager.input_dir / safe_filename
-        # Check if file already exists
+        # Check if the file already exists in the file system
         if file_path.exists():
             return InsertResponse(
                 status="duplicated",
@@ -1748,6 +1787,24 @@ def create_document_routes(
             HTTPException: If an error occurs during text processing (500).
         """
         try:
+            # Check if file_source already exists in doc_status storage
+            if (
+                request.file_source
+                and request.file_source.strip()
+                and request.file_source != "unknown_source"
+            ):
+                existing_doc_data = await rag.doc_status.get_doc_by_file_path(
+                    request.file_source
+                )
+                if existing_doc_data:
+                    # Get document status information for the error message
+                    status = existing_doc_data.get("status", "unknown")
+                    return InsertResponse(
+                        status="duplicated",
+                        message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).",
+                        track_id="",
+                    )
+
             # Generate track_id for text insertion
             track_id = generate_track_id("insert")
 
@@ -1794,6 +1851,26 @@ def create_document_routes(
             HTTPException: If an error occurs during text processing (500).
         """
         try:
+            # Check if any file_sources already exist in doc_status storage
+            if request.file_sources:
+                for file_source in request.file_sources:
+                    if (
+                        file_source
+                        and file_source.strip()
+                        and file_source != "unknown_source"
+                    ):
+                        existing_doc_data = await rag.doc_status.get_doc_by_file_path(
+                            file_source
+                        )
+                        if existing_doc_data:
+                            # Get document status information for the error message
+                            status = existing_doc_data.get("status", "unknown")
+                            return InsertResponse(
+                                status="duplicated",
+                                message=f"File source '{file_source}' already exists in document storage (Status: {status}).",
+                                track_id="",
+                            )
+
             # Generate track_id for texts insertion
             track_id = generate_track_id("insert")
 
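A minimal client-side sketch of how the new duplicate check surfaces to API consumers: the second upload of the same filename now returns status="duplicated" before any file is written. This is not part of the patch; the host, port, and route path are assumptions for a default LightRAG server deployment and should be adjusted to match yours.

    # Sketch only: assumes a LightRAG server on localhost:9621 with the
    # upload route mounted at /documents/upload.
    import asyncio
    import httpx

    async def main() -> None:
        async with httpx.AsyncClient(base_url="http://localhost:9621") as client:
            for attempt in ("first", "second"):
                with open("report.pdf", "rb") as f:
                    resp = await client.post("/documents/upload", files={"file": f})
                body = resp.json()
                # First attempt is accepted; the second reports "duplicated"
                # because the filename is now present in doc_status storage.
                print(attempt, body["status"], body.get("message", ""))

    asyncio.run(main())
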
""" try: + # Check if any file_sources already exist in doc_status storage + if request.file_sources: + for file_source in request.file_sources: + if ( + file_source + and file_source.strip() + and file_source != "unknown_source" + ): + existing_doc_data = await rag.doc_status.get_doc_by_file_path( + file_source + ) + if existing_doc_data: + # Get document status information for error message + status = existing_doc_data.get("status", "unknown") + return InsertResponse( + status="duplicated", + message=f"File source '{file_source}' already exists in document storage (Status: {status}).", + track_id="", + ) + # Generate track_id for texts insertion track_id = generate_track_id("insert") diff --git a/lightrag/base.py b/lightrag/base.py index cc8e3c09..4ffc9505 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -783,6 +783,18 @@ class DocStatusStorage(BaseKVStorage, ABC): Dictionary mapping status names to counts """ + @abstractmethod + async def get_doc_by_file_path(self, file_path: str) -> dict[str, Any] | None: + """Get document by file path + + Args: + file_path: The file path to search for + + Returns: + dict[str, Any] | None: Document data if found, None otherwise + Returns the same format as get_by_ids method + """ + class StoragesStatus(str, Enum): """Storages status""" diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 5464d0c3..329c61c6 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -323,6 +323,27 @@ class JsonDocStatusStorage(DocStatusStorage): if any_deleted: await set_all_update_flags(self.final_namespace) + async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]: + """Get document by file path + + Args: + file_path: The file path to search for + + Returns: + Union[dict[str, Any], None]: Document data if found, None otherwise + Returns the same format as get_by_ids method + """ + if self._storage_lock is None: + raise StorageNotInitializedError("JsonDocStatusStorage") + + async with self._storage_lock: + for doc_id, doc_data in self._data.items(): + if doc_data.get("file_path") == file_path: + # Return complete document data, consistent with get_by_ids method + return doc_data + + return None + async def drop(self) -> dict[str, str]: """Drop all document status data from storage and clean up resources diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index ce8e10fd..0c11022e 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -683,6 +683,18 @@ class MongoDocStatusStorage(DocStatusStorage): return counts + async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]: + """Get document by file path + + Args: + file_path: The file path to search for + + Returns: + Union[dict[str, Any], None]: Document data if found, None otherwise + Returns the same format as get_by_id method + """ + return await self._data.find_one({"file_path": file_path}) + @final @dataclass diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 2394ecb4..ad271b15 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -2382,6 +2382,57 @@ class PGDocStatusStorage(DocStatusStorage): return processed_results + async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]: + """Get document by file path + + Args: + file_path: The file path to search for + + Returns: + Union[dict[str, Any], None]: Document data if found, None otherwise + Returns the same format as 
diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 2394ecb4..ad271b15 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -2382,6 +2382,57 @@ class PGDocStatusStorage(DocStatusStorage):
 
         return processed_results
 
+    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
+        """Get a document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            Union[dict[str, Any], None]: Document data if found, None otherwise,
+            in the same format as a get_by_id result
+        """
+        sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and file_path=$2"
+        params = {"workspace": self.workspace, "file_path": file_path}
+        result = await self.db.query(sql, list(params.values()), True)
+
+        if result is None or result == []:
+            return None
+        else:
+            # Parse the chunks_list JSON string back to a list
+            chunks_list = result[0].get("chunks_list", [])
+            if isinstance(chunks_list, str):
+                try:
+                    chunks_list = json.loads(chunks_list)
+                except json.JSONDecodeError:
+                    chunks_list = []
+
+            # Parse the metadata JSON string back to a dict
+            metadata = result[0].get("metadata", {})
+            if isinstance(metadata, str):
+                try:
+                    metadata = json.loads(metadata)
+                except json.JSONDecodeError:
+                    metadata = {}
+
+            # Convert datetime objects to ISO format strings with timezone info
+            created_at = self._format_datetime_with_timezone(result[0]["created_at"])
+            updated_at = self._format_datetime_with_timezone(result[0]["updated_at"])
+
+            return dict(
+                content_length=result[0]["content_length"],
+                content_summary=result[0]["content_summary"],
+                status=result[0]["status"],
+                chunks_count=result[0]["chunks_count"],
+                created_at=created_at,
+                updated_at=updated_at,
+                file_path=result[0]["file_path"],
+                chunks_list=chunks_list,
+                metadata=metadata,
+                error_msg=result[0].get("error_msg"),
+                track_id=result[0].get("track_id"),
+            )
+
     async def get_status_counts(self) -> dict[str, int]:
         """Get counts of documents in each status"""
         sql = """SELECT status as "status", COUNT(1) as "count"
diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py
index f7f11285..476344a0 100644
--- a/lightrag/kg/redis_impl.py
+++ b/lightrag/kg/redis_impl.py
@@ -1052,6 +1052,52 @@ class RedisDocStatusStorage(DocStatusStorage):
 
         return counts
 
+    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
+        """Get a document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            Union[dict[str, Any], None]: Document data if found, None otherwise,
+            in the same format as a get_by_id result
+        """
+        async with self._get_redis_connection() as redis:
+            try:
+                # Use SCAN to iterate through all keys in the namespace
+                cursor = 0
+                while True:
+                    cursor, keys = await redis.scan(
+                        cursor, match=f"{self.final_namespace}:*", count=1000
+                    )
+                    if keys:
+                        # Get all values in a batch
+                        pipe = redis.pipeline()
+                        for key in keys:
+                            pipe.get(key)
+                        values = await pipe.execute()
+
+                        # Check each document for a matching file_path
+                        for value in values:
+                            if value:
+                                try:
+                                    doc_data = json.loads(value)
+                                    if doc_data.get("file_path") == file_path:
+                                        return doc_data
+                                except json.JSONDecodeError as e:
+                                    logger.error(
+                                        f"[{self.workspace}] JSON decode error in get_doc_by_file_path: {e}"
+                                    )
+                                    continue
+
+                    if cursor == 0:
+                        break
+
+                return None
+            except Exception as e:
+                logger.error(f"[{self.workspace}] Error in get_doc_by_file_path: {e}")
+                return None
+
     async def drop(self) -> dict[str, str]:
         """Drop all document status data from storage and clean up resources"""
         async with get_storage_lock():
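Since five backends now implement the same abstract method, a small conformance check helps keep their behavior aligned. A sketch, assuming an already-initialized DocStatusStorage instance whose upsert() accepts an {id: document} mapping as the existing storages do; the document fields beyond file_path and status are illustrative:

    # Conformance sketch, not part of this patch: any DocStatusStorage
    # backend should find a stored document by file_path and return None
    # (not raise) for unknown paths.
    async def check_get_doc_by_file_path(storage) -> None:
        await storage.upsert(
            {"doc-1": {"file_path": "a.txt", "status": "processed"}}
        )
        found = await storage.get_doc_by_file_path("a.txt")
        assert found is not None and found.get("status") == "processed"
        assert await storage.get_doc_by_file_path("missing.txt") is None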