From 6b953fa53dac7042b81405bd6b3382a8ab11a383 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 23 Sep 2025 16:24:53 +0800
Subject: [PATCH 1/2] Remove auto-scan-at-startup feature and related documentation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

• Remove --auto-scan-at-startup arg
• Delete auto scan docs sections
• Remove startup scanning logic
---
 lightrag/api/README-zh.md       | 13 -------------
 lightrag/api/README.md          | 13 -------------
 lightrag/api/config.py          |  7 -------
 lightrag/api/lightrag_server.py | 21 ---------------------
 4 files changed, 54 deletions(-)

diff --git a/lightrag/api/README-zh.md b/lightrag/api/README-zh.md
index 1aa22403..7e76d694 100644
--- a/lightrag/api/README-zh.md
+++ b/lightrag/api/README-zh.md
@@ -140,18 +140,6 @@ docker compose up
 ```
 > 可以通过以下链接获取官方的docker compose文件：[docker-compose.yml]( https://raw.githubusercontent.com/HKUDS/LightRAG/refs/heads/main/docker-compose.yml) 。如需获取LightRAG的历史版本镜像，可以访问以下链接: [LightRAG Docker Images]( https://github.com/HKUDS/LightRAG/pkgs/container/lightrag)
 
-### 启动时自动扫描
-
-当使用 `--auto-scan-at-startup` 参数启动LightRAG Server时，系统将自动：
-
-1. 扫描输入目录中的新文件
-2. 为尚未在数据库中的新文档建立索引
-3. 使所有内容立即可用于 RAG 查询
-
-这种工作模式给启动一个临时的RAG任务提供给了方便。
-
-> `--input-dir` 参数指定要扫描的输入目录。您可以从 webui 触发输入目录扫描。
-
 ### 启动多个LightRAG实例
 
 有两种方式可以启动多个LightRAG实例。第一种方式是为每个实例配置一个完全独立的工作环境。此时需要为每个实例创建一个独立的工作目录，然后在这个工作目录上放置一个当前实例专用的`.env`配置文件。不同实例的配置文件中的服务器监听端口不能重复，然后在工作目录上执行 lightrag-server 启动服务即可。
@@ -432,7 +420,6 @@ LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage
 | --ssl-keyfile | None | SSL 私钥文件路径(如果启用 --ssl 则必需) |
 | --llm-binding | ollama | LLM 绑定类型(lollms、ollama、openai、openai-ollama、azure_openai、aws_bedrock) |
 | --embedding-binding | ollama | 嵌入绑定类型(lollms、ollama、openai、azure_openai、aws_bedrock) |
-| auto-scan-at-startup | - | 扫描输入目录中的新文件并开始索引 |
 
 ### Reranking 配置
 
diff --git a/lightrag/api/README.md b/lightrag/api/README.md
index 44fb6d28..732a686f 100644
--- a/lightrag/api/README.md
+++ b/lightrag/api/README.md
@@ -143,18 +143,6 @@ docker compose up
 > You can get the official docker compose file from here: [docker-compose.yml](https://raw.githubusercontent.com/HKUDS/LightRAG/refs/heads/main/docker-compose.yml). For historical versions of LightRAG docker images, visit this link: [LightRAG Docker Images](https://github.com/HKUDS/LightRAG/pkgs/container/lightrag)
 
-### Auto scan on startup
-
-When starting the LightRAG Server with the `--auto-scan-at-startup` parameter, the system will automatically:
-
-1. Scan for new files in the input directory
-2. Index new documents that aren't already in the database
-3. Make all content immediately available for RAG queries
-
-This offers an efficient method for deploying ad-hoc RAG processes.
-
-> The `--input-dir` parameter specifies the input directory to scan. You can trigger the input directory scan from the Web UI.
-
 ### Starting Multiple LightRAG Instances
 
 There are two ways to start multiple LightRAG instances. The first way is to configure a completely independent working environment for each instance. This requires creating a separate working directory for each instance and placing a dedicated `.env` configuration file in that directory. The server listening ports in the configuration files of different instances cannot be the same. Then, you can start the service by running `lightrag-server` in the working directory.
@@ -434,7 +422,6 @@ You cannot change storage implementation selection after adding documents to Lig
 | --ssl-keyfile | None | Path to SSL private key file (required if --ssl is enabled) |
 | --llm-binding | ollama | LLM binding type (lollms, ollama, openai, openai-ollama, azure_openai, aws_bedrock) |
 | --embedding-binding | ollama | Embedding binding type (lollms, ollama, openai, azure_openai, aws_bedrock) |
-| --auto-scan-at-startup| - | Scan input directory for new files and start indexing |
 
 ### Reranking Configuration
 
diff --git a/lightrag/api/config.py b/lightrag/api/config.py
index 6966a7f2..de569f47 100644
--- a/lightrag/api/config.py
+++ b/lightrag/api/config.py
@@ -206,13 +206,6 @@ def parse_args() -> argparse.Namespace:
         help="Default workspace for all storage",
     )
 
-    parser.add_argument(
-        "--auto-scan-at-startup",
-        action="store_true",
-        default=False,
-        help="Enable automatic scanning when the program starts",
-    )
-
     # Server workers configuration
     parser.add_argument(
         "--workers",
diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index a32c9c3e..256c7d6c 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -3,7 +3,6 @@ LightRAG FastAPI Server
 """
 
 from fastapi import FastAPI, Depends, HTTPException
-import asyncio
 import os
 import logging
 import logging.config
@@ -45,7 +44,6 @@ from lightrag.constants import (
 from lightrag.api.routers.document_routes import (
     DocumentManager,
     create_document_routes,
-    run_scanning_process,
 )
 from lightrag.api.routers.query_routes import create_query_routes
 from lightrag.api.routers.graph_routes import create_graph_routes
@@ -54,7 +52,6 @@ from lightrag.api.routers.ollama_api import OllamaAPI
 from lightrag.utils import logger, set_verbose_debug
 from lightrag.kg.shared_storage import (
     get_namespace_data,
-    get_pipeline_status_lock,
     initialize_pipeline_status,
     cleanup_keyed_lock,
     finalize_share_data,
@@ -212,24 +209,6 @@ def create_app(args):
         # Data migration regardless of storage implementation
         await rag.check_and_migrate_data()
 
-        pipeline_status = await get_namespace_data("pipeline_status")
-
-        should_start_autoscan = False
-        async with get_pipeline_status_lock():
-            # Auto scan documents if enabled
-            if args.auto_scan_at_startup:
-                if not pipeline_status.get("autoscanned", False):
-                    pipeline_status["autoscanned"] = True
-                    should_start_autoscan = True
-
-        # Only run auto scan when no other process started it first
-        if should_start_autoscan:
-            # Create background task
-            task = asyncio.create_task(run_scanning_process(rag, doc_manager))
-            app.state.background_tasks.add(task)
-            task.add_done_callback(app.state.background_tasks.discard)
-            logger.info(f"Process {os.getpid()} auto scan task started at startup.")
-
         ASCIIColors.green("\nServer is ready to accept connections! 🚀\n")
 
         yield
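With the startup auto-scan removed, indexing of the input directory is triggered on demand, either from the Web UI or over the documents API. A minimal client sketch follows; the base URL, the default port 9621, and the `POST /documents/scan` route are assumptions about the deployed server rather than part of this patch, so verify them against your version.

```python
# Hypothetical sketch (not part of the patch): trigger a one-off input-directory scan
# after the server is up, replacing the removed --auto-scan-at-startup behavior.
import requests

BASE_URL = "http://localhost:9621"  # assumed default LightRAG Server address

resp = requests.post(f"{BASE_URL}/documents/scan", timeout=30)
resp.raise_for_status()
print(resp.status_code, resp.text)  # response shape depends on the server version
```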
From 2adb8efdc78aa78261e4a7d249bdc0d7937ecc23 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 23 Sep 2025 17:30:54 +0800
Subject: [PATCH 2/2] Add duplicate document detection and skip processed files in scanning

- Add get_doc_by_file_path to all storages
- Skip processed files in scan operation
- Check duplicates in upload endpoints
- Check duplicates in text insert APIs
- Return status info in duplicate responses
---
 lightrag/api/routers/document_routes.py | 85 +++++++++++++++++++++++--
 lightrag/base.py                        | 12 ++++
 lightrag/kg/json_doc_status_impl.py     | 21 ++++++
 lightrag/kg/mongo_impl.py               | 12 ++++
 lightrag/kg/postgres_impl.py            | 51 +++++++++++++++
 lightrag/kg/redis_impl.py               | 46 +++++++++++++
 6 files changed, 223 insertions(+), 4 deletions(-)

diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 32520d33..c7d9dd97 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -1395,9 +1395,37 @@ async def run_scanning_process(
         logger.info(f"Found {total_files} files to index.")
 
         if new_files:
-            # Process all files at once with track_id
-            await pipeline_index_files(rag, new_files, track_id)
-            logger.info(f"Scanning process completed: {total_files} files Processed.")
+            # Check for files with PROCESSED status and filter them out
+            valid_files = []
+            processed_files = []
+
+            for file_path in new_files:
+                filename = file_path.name
+                existing_doc_data = await rag.doc_status.get_doc_by_file_path(filename)
+
+                if existing_doc_data and existing_doc_data.get("status") == "processed":
+                    # File is already PROCESSED, skip it with a warning
+                    processed_files.append(filename)
+                    logger.warning(f"Skipping already processed file: {filename}")
+                else:
+                    # File is new or in a non-PROCESSED status, add it to the processing list
+                    valid_files.append(file_path)
+
+            # Process valid files (new files + non-PROCESSED status files)
+            if valid_files:
+                await pipeline_index_files(rag, valid_files, track_id)
+                if processed_files:
+                    logger.info(
+                        f"Scanning process completed: {len(valid_files)} files processed, {len(processed_files)} skipped."
+                    )
+                else:
+                    logger.info(
+                        f"Scanning process completed: {len(valid_files)} files processed."
+                    )
+            else:
+                logger.info(
+                    "No files to process after filtering already processed files."
+                )
         else:
             # No new files to index, check if there are any documents in the queue
             logger.info(
@@ -1697,8 +1725,19 @@ def create_document_routes(
                 detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
             )
 
+        # Check if filename already exists in doc_status storage
+        existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename)
+        if existing_doc_data:
+            # Get document status information for error message
+            status = existing_doc_data.get("status", "unknown")
+            return InsertResponse(
+                status="duplicated",
+                message=f"File '{safe_filename}' already exists in document storage (Status: {status}).",
+                track_id="",
+            )
+
         file_path = doc_manager.input_dir / safe_filename
-        # Check if file already exists
+        # Check if file already exists in file system
         if file_path.exists():
             return InsertResponse(
                 status="duplicated",
@@ -1748,6 +1787,24 @@ def create_document_routes(
             HTTPException: If an error occurs during text processing (500).
         """
         try:
+            # Check if file_source already exists in doc_status storage
+            if (
+                request.file_source
+                and request.file_source.strip()
+                and request.file_source != "unknown_source"
+            ):
+                existing_doc_data = await rag.doc_status.get_doc_by_file_path(
+                    request.file_source
+                )
+                if existing_doc_data:
+                    # Get document status information for error message
+                    status = existing_doc_data.get("status", "unknown")
+                    return InsertResponse(
+                        status="duplicated",
+                        message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).",
+                        track_id="",
+                    )
+
             # Generate track_id for text insertion
             track_id = generate_track_id("insert")
@@ -1794,6 +1851,26 @@ def create_document_routes(
             HTTPException: If an error occurs during text processing (500).
         """
         try:
+            # Check if any file_sources already exist in doc_status storage
+            if request.file_sources:
+                for file_source in request.file_sources:
+                    if (
+                        file_source
+                        and file_source.strip()
+                        and file_source != "unknown_source"
+                    ):
+                        existing_doc_data = await rag.doc_status.get_doc_by_file_path(
+                            file_source
+                        )
+                        if existing_doc_data:
+                            # Get document status information for error message
+                            status = existing_doc_data.get("status", "unknown")
+                            return InsertResponse(
+                                status="duplicated",
+                                message=f"File source '{file_source}' already exists in document storage (Status: {status}).",
+                                track_id="",
+                            )
+
             # Generate track_id for texts insertion
             track_id = generate_track_id("insert")
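The net effect of the route changes above, as a client-side sketch: re-submitting a file or `file_source` that `doc_status` already knows about returns a `duplicated` `InsertResponse` instead of being re-indexed. The response fields (`status`, `message`, `track_id`) come from the patch; the base URL, the port, and the `/documents/upload` and `/documents/text` routes are assumptions about the deployed server.

```python
# Hypothetical usage sketch (not part of the patch): exercise the new duplicate checks.
import requests

BASE_URL = "http://localhost:9621"  # assumed default LightRAG Server address

# Upload the same file twice: the second attempt should be reported as "duplicated",
# either because doc_status already has a record with this file_path or because the
# file is already present in the input directory.
with open("report.pdf", "rb") as f:
    first = requests.post(f"{BASE_URL}/documents/upload", files={"file": f}).json()
with open("report.pdf", "rb") as f:
    second = requests.post(f"{BASE_URL}/documents/upload", files={"file": f}).json()
print(first.get("status"), second.get("status"), second.get("message"))

# Text inserts that carry a file_source are checked the same way.
payload = {"text": "LightRAG duplicate-detection demo", "file_source": "report.pdf"}
dup = requests.post(f"{BASE_URL}/documents/text", json=payload).json()
print(dup.get("status"), dup.get("message"))
```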
diff --git a/lightrag/base.py b/lightrag/base.py
index cc8e3c09..4ffc9505 100644
--- a/lightrag/base.py
+++ b/lightrag/base.py
@@ -783,6 +783,18 @@ class DocStatusStorage(BaseKVStorage, ABC):
             Dictionary mapping status names to counts
         """
 
+    @abstractmethod
+    async def get_doc_by_file_path(self, file_path: str) -> dict[str, Any] | None:
+        """Get document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            dict[str, Any] | None: Document data if found, None otherwise
+            Returns the same format as get_by_id method
+        """
+
 
 class StoragesStatus(str, Enum):
     """Storages status"""
diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py
index 5464d0c3..329c61c6 100644
--- a/lightrag/kg/json_doc_status_impl.py
+++ b/lightrag/kg/json_doc_status_impl.py
@@ -323,6 +323,27 @@ class JsonDocStatusStorage(DocStatusStorage):
         if any_deleted:
             await set_all_update_flags(self.final_namespace)
 
+    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
+        """Get document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            Union[dict[str, Any], None]: Document data if found, None otherwise
+            Returns the same format as get_by_id method
+        """
+        if self._storage_lock is None:
+            raise StorageNotInitializedError("JsonDocStatusStorage")
+
+        async with self._storage_lock:
+            for doc_id, doc_data in self._data.items():
+                if doc_data.get("file_path") == file_path:
+                    # Return complete document data, consistent with get_by_id method
+                    return doc_data
+
+            return None
+
     async def drop(self) -> dict[str, str]:
         """Drop all document status data from storage and clean up resources
diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py
index ce8e10fd..0c11022e 100644
--- a/lightrag/kg/mongo_impl.py
+++ b/lightrag/kg/mongo_impl.py
@@ -683,6 +683,18 @@ class MongoDocStatusStorage(DocStatusStorage):
 
         return counts
 
+    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
+        """Get document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            Union[dict[str, Any], None]: Document data if found, None otherwise
+            Returns the same format as get_by_id method
+        """
+        return await self._data.find_one({"file_path": file_path})
+
 
 @final
 @dataclass
diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 2394ecb4..ad271b15 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -2382,6 +2382,57 @@ class PGDocStatusStorage(DocStatusStorage):
 
         return processed_results
 
+    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
+        """Get document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            Union[dict[str, Any], None]: Document data if found, None otherwise
+            Returns the same format as get_by_id method
+        """
+        sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and file_path=$2"
+        params = {"workspace": self.workspace, "file_path": file_path}
+        result = await self.db.query(sql, list(params.values()), True)
+
+        if result is None or result == []:
+            return None
+        else:
+            # Parse chunks_list JSON string back to list
+            chunks_list = result[0].get("chunks_list", [])
+            if isinstance(chunks_list, str):
+                try:
+                    chunks_list = json.loads(chunks_list)
+                except json.JSONDecodeError:
+                    chunks_list = []
+
+            # Parse metadata JSON string back to dict
+            metadata = result[0].get("metadata", {})
+            if isinstance(metadata, str):
+                try:
+                    metadata = json.loads(metadata)
+                except json.JSONDecodeError:
+                    metadata = {}
+
+            # Convert datetime objects to ISO format strings with timezone info
+            created_at = self._format_datetime_with_timezone(result[0]["created_at"])
+            updated_at = self._format_datetime_with_timezone(result[0]["updated_at"])
+
+            return dict(
+                content_length=result[0]["content_length"],
+                content_summary=result[0]["content_summary"],
+                status=result[0]["status"],
+                chunks_count=result[0]["chunks_count"],
+                created_at=created_at,
+                updated_at=updated_at,
+                file_path=result[0]["file_path"],
+                chunks_list=chunks_list,
+                metadata=metadata,
+                error_msg=result[0].get("error_msg"),
+                track_id=result[0].get("track_id"),
+            )
+
     async def get_status_counts(self) -> dict[str, int]:
         """Get counts of documents in each status"""
         sql = """SELECT status as "status", COUNT(1) as "count"
diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py
index f7f11285..476344a0 100644
--- a/lightrag/kg/redis_impl.py
+++ b/lightrag/kg/redis_impl.py
@@ -1052,6 +1052,52 @@ class RedisDocStatusStorage(DocStatusStorage):
 
         return counts
 
+    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
+        """Get document by file path
+
+        Args:
+            file_path: The file path to search for
+
+        Returns:
+            Union[dict[str, Any], None]: Document data if found, None otherwise
+            Returns the same format as get_by_id method
+        """
+        async with self._get_redis_connection() as redis:
+            try:
+                # Use SCAN to iterate through all keys in the namespace
+                cursor = 0
+                while True:
+                    cursor, keys = await redis.scan(
+                        cursor, match=f"{self.final_namespace}:*", count=1000
+                    )
+                    if keys:
+                        # Get all values in batch
+                        pipe = redis.pipeline()
+                        for key in keys:
+                            pipe.get(key)
+                        values = await pipe.execute()
+
+                        # Check each document for matching file_path
+                        for value in values:
+                            if value:
+                                try:
+                                    doc_data = json.loads(value)
+                                    if doc_data.get("file_path") == file_path:
+                                        return doc_data
+                                except json.JSONDecodeError as e:
+                                    logger.error(
+                                        f"[{self.workspace}] JSON decode error in get_doc_by_file_path: {e}"
+                                    )
+                                    continue
+
+                    if cursor == 0:
+                        break
+
+                return None
+            except Exception as e:
+                logger.error(f"[{self.workspace}] Error in get_doc_by_file_path: {e}")
+                return None
+
     async def drop(self) -> dict[str, str]:
         """Drop all document status data from storage and clean up resources"""
         async with get_storage_lock():
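For reference, a self-contained sketch of the contract that `DocStatusStorage.get_doc_by_file_path` adds in `base.py` above. The `InMemoryDocStatus` class below is a hypothetical stand-in, not one of the real backends, but it mirrors the linear file-path lookup used by `JsonDocStatusStorage` in this patch.

```python
# Hypothetical in-memory stand-in for DocStatusStorage.get_doc_by_file_path;
# real backends (JSON, MongoDB, PostgreSQL, Redis) implement the same lookup
# against their own stores, as shown in the patch above.
from __future__ import annotations

import asyncio
from typing import Any


class InMemoryDocStatus:
    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}

    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        self._data.update(data)

    async def get_doc_by_file_path(self, file_path: str) -> dict[str, Any] | None:
        # Linear scan over stored documents, mirroring JsonDocStatusStorage
        for doc in self._data.values():
            if doc.get("file_path") == file_path:
                return doc
        return None


async def main() -> None:
    store = InMemoryDocStatus()
    await store.upsert({"doc-1": {"file_path": "report.pdf", "status": "processed"}})

    hit = await store.get_doc_by_file_path("report.pdf")
    assert hit is not None and hit["status"] == "processed"
    assert await store.get_doc_by_file_path("missing.pdf") is None


asyncio.run(main())
```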