Merge pull request #2135 from danielaskdd/reject-duplicate-file

feat: Implement Comprehensive Document Duplication Prevention System
Daniel.y authored on 2025-09-23 17:32:51 +08:00; committed by GitHub
commit 203004ead6
10 changed files with 223 additions and 58 deletions


@@ -140,18 +140,6 @@ docker compose up
```
> You can get the official docker compose file from here: [docker-compose.yml](https://raw.githubusercontent.com/HKUDS/LightRAG/refs/heads/main/docker-compose.yml). For historical versions of LightRAG docker images, visit this link: [LightRAG Docker Images](https://github.com/HKUDS/LightRAG/pkgs/container/lightrag)
### Auto scan on startup
When the LightRAG Server is started with the `--auto-scan-at-startup` parameter, the system will automatically:
1. Scan for new files in the input directory
2. Index new documents that are not yet in the database
3. Make all content immediately available for RAG queries
This mode provides a convenient way to start an ad-hoc RAG task.
> The `--input-dir` parameter specifies the input directory to scan. You can trigger an input directory scan from the Web UI.
### Starting Multiple LightRAG Instances
There are two ways to start multiple LightRAG instances. The first is to configure a completely independent working environment for each instance. This requires creating a separate working directory for each instance and placing a dedicated `.env` configuration file in that directory. The server listening ports in the configuration files of different instances must not be the same. Then start the service by running `lightrag-server` in the working directory.
@@ -432,7 +420,6 @@ LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage
| --ssl-keyfile | None | Path to SSL private key file (required if --ssl is enabled) |
| --llm-binding | ollama | LLM binding type (lollms, ollama, openai, openai-ollama, azure_openai, aws_bedrock) |
| --embedding-binding | ollama | Embedding binding type (lollms, ollama, openai, azure_openai, aws_bedrock) |
| --auto-scan-at-startup | - | Scan the input directory for new files and start indexing |
### Reranking Configuration


@@ -143,18 +143,6 @@ docker compose up
> You can get the official docker compose file from here: [docker-compose.yml](https://raw.githubusercontent.com/HKUDS/LightRAG/refs/heads/main/docker-compose.yml). For historical versions of LightRAG docker images, visit this link: [LightRAG Docker Images](https://github.com/HKUDS/LightRAG/pkgs/container/lightrag)
### Auto scan on startup
When starting the LightRAG Server with the `--auto-scan-at-startup` parameter, the system will automatically:
1. Scan for new files in the input directory
2. Index new documents that aren't already in the database
3. Make all content immediately available for RAG queries
This offers an efficient method for deploying ad-hoc RAG processes.
> The `--input-dir` parameter specifies the input directory to scan. You can trigger the input directory scan from the Web UI.
### Starting Multiple LightRAG Instances
There are two ways to start multiple LightRAG instances. The first way is to configure a completely independent working environment for each instance. This requires creating a separate working directory for each instance and placing a dedicated `.env` configuration file in that directory. The server listening ports in the configuration files of different instances cannot be the same. Then, you can start the service by running `lightrag-server` in the working directory.
@@ -434,7 +422,6 @@ You cannot change storage implementation selection after adding documents to Lig
| --ssl-keyfile | None | Path to SSL private key file (required if --ssl is enabled) |
| --llm-binding | ollama | LLM binding type (lollms, ollama, openai, openai-ollama, azure_openai, aws_bedrock) |
| --embedding-binding | ollama | Embedding binding type (lollms, ollama, openai, azure_openai, aws_bedrock) |
| --auto-scan-at-startup | - | Scan input directory for new files and start indexing |
### Reranking Configuration


@@ -206,13 +206,6 @@ def parse_args() -> argparse.Namespace:
        help="Default workspace for all storage",
    )
    parser.add_argument(
        "--auto-scan-at-startup",
        action="store_true",
        default=False,
        help="Enable automatic scanning when the program starts",
    )
    # Server workers configuration
    parser.add_argument(
        "--workers",


@@ -3,7 +3,6 @@ LightRAG FastAPI Server
"""
from fastapi import FastAPI, Depends, HTTPException
import asyncio
import os
import logging
import logging.config
@@ -45,7 +44,6 @@ from lightrag.constants import (
from lightrag.api.routers.document_routes import (
    DocumentManager,
    create_document_routes,
    run_scanning_process,
)
from lightrag.api.routers.query_routes import create_query_routes
from lightrag.api.routers.graph_routes import create_graph_routes
@@ -54,7 +52,6 @@ from lightrag.api.routers.ollama_api import OllamaAPI
from lightrag.utils import logger, set_verbose_debug
from lightrag.kg.shared_storage import (
    get_namespace_data,
    get_pipeline_status_lock,
    initialize_pipeline_status,
    cleanup_keyed_lock,
    finalize_share_data,
@@ -212,24 +209,6 @@ def create_app(args):
        # Data migration regardless of storage implementation
        await rag.check_and_migrate_data()
        pipeline_status = await get_namespace_data("pipeline_status")
        should_start_autoscan = False
        async with get_pipeline_status_lock():
            # Auto scan documents if enabled
            if args.auto_scan_at_startup:
                if not pipeline_status.get("autoscanned", False):
                    pipeline_status["autoscanned"] = True
                    should_start_autoscan = True
        # Only run auto scan when no other process started it first
        if should_start_autoscan:
            # Create background task
            task = asyncio.create_task(run_scanning_process(rag, doc_manager))
            app.state.background_tasks.add(task)
            task.add_done_callback(app.state.background_tasks.discard)
            logger.info(f"Process {os.getpid()} auto scan task started at startup.")
        ASCIIColors.green("\nServer is ready to accept connections! 🚀\n")
        yield


@@ -1395,9 +1395,37 @@ async def run_scanning_process(
    logger.info(f"Found {total_files} files to index.")
    if new_files:
        # Process all files at once with track_id
        await pipeline_index_files(rag, new_files, track_id)
        logger.info(f"Scanning process completed: {total_files} files Processed.")
        # Check for files with PROCESSED status and filter them out
        valid_files = []
        processed_files = []
        for file_path in new_files:
            filename = file_path.name
            existing_doc_data = await rag.doc_status.get_doc_by_file_path(filename)
            if existing_doc_data and existing_doc_data.get("status") == "processed":
                # File is already PROCESSED, skip it with warning
                processed_files.append(filename)
                logger.warning(f"Skipping already processed file: {filename}")
            else:
                # File is new or in non-PROCESSED status, add to processing list
                valid_files.append(file_path)
        # Process valid files (new files + non-PROCESSED status files)
        if valid_files:
            await pipeline_index_files(rag, valid_files, track_id)
            if processed_files:
                logger.info(
                    f"Scanning process completed: {len(valid_files)} files processed, {len(processed_files)} skipped."
                )
            else:
                logger.info(
                    f"Scanning process completed: {len(valid_files)} files processed."
                )
        else:
            logger.info(
                "No files to process after filtering already processed files."
            )
    else:
        # No new files to index, check if there are any documents in the queue
        logger.info(
@@ -1697,8 +1725,19 @@ def create_document_routes(
                detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
            )
        # Check if filename already exists in doc_status storage
        existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename)
        if existing_doc_data:
            # Get document status information for error message
            status = existing_doc_data.get("status", "unknown")
            return InsertResponse(
                status="duplicated",
                message=f"File '{safe_filename}' already exists in document storage (Status: {status}).",
                track_id="",
            )
        file_path = doc_manager.input_dir / safe_filename
        # Check if file already exists in file system
        if file_path.exists():
            return InsertResponse(
                status="duplicated",
@@ -1748,6 +1787,24 @@
            HTTPException: If an error occurs during text processing (500).
        """
        try:
            # Check if file_source already exists in doc_status storage
            if (
                request.file_source
                and request.file_source.strip()
                and request.file_source != "unknown_source"
            ):
                existing_doc_data = await rag.doc_status.get_doc_by_file_path(
                    request.file_source
                )
                if existing_doc_data:
                    # Get document status information for error message
                    status = existing_doc_data.get("status", "unknown")
                    return InsertResponse(
                        status="duplicated",
                        message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).",
                        track_id="",
                    )
            # Generate track_id for text insertion
            track_id = generate_track_id("insert")
@@ -1794,6 +1851,26 @@
            HTTPException: If an error occurs during text processing (500).
        """
        try:
            # Check if any file_sources already exist in doc_status storage
            if request.file_sources:
                for file_source in request.file_sources:
                    if (
                        file_source
                        and file_source.strip()
                        and file_source != "unknown_source"
                    ):
                        existing_doc_data = await rag.doc_status.get_doc_by_file_path(
                            file_source
                        )
                        if existing_doc_data:
                            # Get document status information for error message
                            status = existing_doc_data.get("status", "unknown")
                            return InsertResponse(
                                status="duplicated",
                                message=f"File source '{file_source}' already exists in document storage (Status: {status}).",
                                track_id="",
                            )
            # Generate track_id for texts insertion
            track_id = generate_track_id("insert")
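Taken together, the route changes above mean that re-submitting a known file name (or `file_source`) now yields a `duplicated` response instead of triggering a second indexing pass. Below is a minimal client-side sketch of that behaviour; the base URL, the default port 9621, and the `/documents/upload` and `/documents/text` paths are assumptions about the usual LightRAG API layout rather than part of this diff.

```python
# Hedged sketch: what a client sees after the duplicate checks above.
# Assumes a running LightRAG server at http://localhost:9621 exposing the usual
# /documents/upload and /documents/text routes; adjust for your deployment.
import requests

BASE_URL = "http://localhost:9621"

# Re-uploading a file whose name is already tracked in doc_status storage:
with open("report.pdf", "rb") as f:
    resp = requests.post(f"{BASE_URL}/documents/upload", files={"file": ("report.pdf", f)})
print(resp.json())
# Expected shape, per the handler above:
# {"status": "duplicated",
#  "message": "File 'report.pdf' already exists in document storage (Status: processed).",
#  "track_id": ""}

# Inserting text with a file_source that matches an existing document is rejected the same way:
resp = requests.post(
    f"{BASE_URL}/documents/text",
    json={"text": "some content", "file_source": "report.pdf"},
)
print(resp.json())
```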


@@ -783,6 +783,18 @@ class DocStatusStorage(BaseKVStorage, ABC):
            Dictionary mapping status names to counts
        """

    @abstractmethod
    async def get_doc_by_file_path(self, file_path: str) -> dict[str, Any] | None:
        """Get document by file path

        Args:
            file_path: The file path to search for

        Returns:
            dict[str, Any] | None: Document data if found, None otherwise
            Returns the same format as get_by_ids method
        """


class StoragesStatus(str, Enum):
    """Storages status"""


@@ -323,6 +323,27 @@ class JsonDocStatusStorage(DocStatusStorage):
        if any_deleted:
            await set_all_update_flags(self.final_namespace)

    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
        """Get document by file path

        Args:
            file_path: The file path to search for

        Returns:
            Union[dict[str, Any], None]: Document data if found, None otherwise
            Returns the same format as get_by_ids method
        """
        if self._storage_lock is None:
            raise StorageNotInitializedError("JsonDocStatusStorage")

        async with self._storage_lock:
            for doc_id, doc_data in self._data.items():
                if doc_data.get("file_path") == file_path:
                    # Return complete document data, consistent with get_by_ids method
                    return doc_data
        return None

    async def drop(self) -> dict[str, str]:
        """Drop all document status data from storage and clean up resources


@@ -683,6 +683,18 @@ class MongoDocStatusStorage(DocStatusStorage):
        return counts

    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
        """Get document by file path

        Args:
            file_path: The file path to search for

        Returns:
            Union[dict[str, Any], None]: Document data if found, None otherwise
            Returns the same format as get_by_id method
        """
        return await self._data.find_one({"file_path": file_path})


@final
@dataclass


@@ -2382,6 +2382,57 @@ class PGDocStatusStorage(DocStatusStorage):
        return processed_results

    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
        """Get document by file path

        Args:
            file_path: The file path to search for

        Returns:
            Union[dict[str, Any], None]: Document data if found, None otherwise
            Returns the same format as get_by_id method
        """
        sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and file_path=$2"
        params = {"workspace": self.workspace, "file_path": file_path}
        result = await self.db.query(sql, list(params.values()), True)

        if result is None or result == []:
            return None
        else:
            # Parse chunks_list JSON string back to list
            chunks_list = result[0].get("chunks_list", [])
            if isinstance(chunks_list, str):
                try:
                    chunks_list = json.loads(chunks_list)
                except json.JSONDecodeError:
                    chunks_list = []

            # Parse metadata JSON string back to dict
            metadata = result[0].get("metadata", {})
            if isinstance(metadata, str):
                try:
                    metadata = json.loads(metadata)
                except json.JSONDecodeError:
                    metadata = {}

            # Convert datetime objects to ISO format strings with timezone info
            created_at = self._format_datetime_with_timezone(result[0]["created_at"])
            updated_at = self._format_datetime_with_timezone(result[0]["updated_at"])

            return dict(
                content_length=result[0]["content_length"],
                content_summary=result[0]["content_summary"],
                status=result[0]["status"],
                chunks_count=result[0]["chunks_count"],
                created_at=created_at,
                updated_at=updated_at,
                file_path=result[0]["file_path"],
                chunks_list=chunks_list,
                metadata=metadata,
                error_msg=result[0].get("error_msg"),
                track_id=result[0].get("track_id"),
            )

    async def get_status_counts(self) -> dict[str, int]:
        """Get counts of documents in each status"""
        sql = """SELECT status as "status", COUNT(1) as "count"


@@ -1052,6 +1052,52 @@ class RedisDocStatusStorage(DocStatusStorage):
        return counts

    async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
        """Get document by file path

        Args:
            file_path: The file path to search for

        Returns:
            Union[dict[str, Any], None]: Document data if found, None otherwise
            Returns the same format as get_by_id method
        """
        async with self._get_redis_connection() as redis:
            try:
                # Use SCAN to iterate through all keys in the namespace
                cursor = 0
                while True:
                    cursor, keys = await redis.scan(
                        cursor, match=f"{self.final_namespace}:*", count=1000
                    )
                    if keys:
                        # Get all values in batch
                        pipe = redis.pipeline()
                        for key in keys:
                            pipe.get(key)
                        values = await pipe.execute()

                        # Check each document for matching file_path
                        for value in values:
                            if value:
                                try:
                                    doc_data = json.loads(value)
                                    if doc_data.get("file_path") == file_path:
                                        return doc_data
                                except json.JSONDecodeError as e:
                                    logger.error(
                                        f"[{self.workspace}] JSON decode error in get_doc_by_file_path: {e}"
                                    )
                                    continue

                    if cursor == 0:
                        break

                return None
            except Exception as e:
                logger.error(f"[{self.workspace}] Error in get_doc_by_file_path: {e}")
                return None

    async def drop(self) -> dict[str, str]:
        """Drop all document status data from storage and clean up resources"""
        async with get_storage_lock():
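One design note on the Redis variant above: it resolves a file path by SCAN-ing every key in the namespace, so each lookup costs time proportional to the number of documents. A secondary index keyed by file path would reduce this to a single read. The snippet below is only a sketch of that idea, not part of this commit, and assumes the standard `redis.asyncio` client.

```python
import redis.asyncio as aioredis


# Sketch only: keep a file_path -> doc_id hash alongside the per-document keys,
# updated whenever a document status record is written or deleted.
async def index_file_path(r: aioredis.Redis, namespace: str, doc_id: str, file_path: str) -> None:
    await r.hset(f"{namespace}:file_path_index", file_path, doc_id)


async def get_doc_id_by_file_path(r: aioredis.Redis, namespace: str, file_path: str):
    # HGET is a single O(1) lookup instead of scanning the whole namespace.
    # Returns the doc_id (str with decode_responses=True, bytes otherwise) or None.
    return await r.hget(f"{namespace}:file_path_index", file_path)
```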