hzywhite 2025-09-02 06:15:29 +08:00
parent d8b2264d8b
commit 36c81039b1
3 changed files with 103 additions and 55 deletions


@ -454,7 +454,7 @@ class DocStatusResponse(BaseModel):
scheme_name: str = Field(
default=None, description="Name of the processing scheme used for this document"
)
multimodal_content: Optional[list[dict[str, any]]] = Field(
multimodal_content: Optional[list[dict[str, Any]]] = Field(
default=None, description="Multimodal content of the document"
)
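
The recurring change in this commit from any to Any fixes the annotation: lowercase any is the builtin function, not a type, so static checkers flag it and Pydantic will generally refuse to build a schema from it. A minimal standalone sketch of the corrected field, with an illustrative model name:

from typing import Any, Optional
from pydantic import BaseModel, Field

class DocStatusSketch(BaseModel):
    # A list of arbitrary key/value blocks (images, tables, equations).
    multimodal_content: Optional[list[dict[str, Any]]] = Field(
        default=None, description="Multimodal content of the document"
    )

doc = DocStatusSketch(multimodal_content=[{"type": "table", "rows": 3}])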
@ -870,7 +870,7 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str
async def pipeline_enqueue_file(
rag: LightRAG, file_path: Path, track_id: str = None
rag: LightRAG, file_path: Path, track_id: str = None, scheme_name: str = None
) -> tuple[bool, str]:
"""Add a file to the queue for processing
@ -1250,7 +1250,7 @@ async def pipeline_enqueue_file(
try:
await rag.apipeline_enqueue_documents(
content, file_paths=file_path.name, track_id=track_id
content, file_paths=file_path.name, track_id=track_id, scheme_name=scheme_name
)
logger.info(
@ -1334,7 +1334,7 @@ async def pipeline_enqueue_file(
logger.error(f"Error deleting file {file_path}: {str(e)}")
async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None, scheme_name: str = None):
"""Index a file with track_id
Args:
@ -1344,7 +1344,7 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = No
"""
try:
success, returned_track_id = await pipeline_enqueue_file(
rag, file_path, track_id
rag, file_path, track_id, scheme_name
)
if success:
await rag.apipeline_process_enqueue_documents()
@ -1355,7 +1355,7 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = No
async def pipeline_index_files(
rag: LightRAG, file_paths: List[Path], track_id: str = None
rag: LightRAG, file_paths: List[Path], track_id: str = None, scheme_name: str = None
):
"""Index multiple files sequentially to avoid high CPU load
@ -1376,7 +1376,7 @@ async def pipeline_index_files(
# Process files sequentially with track_id
for file_path in sorted_file_paths:
success, _ = await pipeline_enqueue_file(rag, file_path, track_id)
success, _ = await pipeline_enqueue_file(rag, file_path, track_id, scheme_name)
if success:
enqueued = True
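
The hunks above thread a new optional scheme_name argument from pipeline_index_file and pipeline_index_files through pipeline_enqueue_file down to rag.apipeline_enqueue_documents. A minimal sketch of that flow, assuming it sits in the same module as pipeline_enqueue_file; the function name and simplified loop are illustrative, not part of the diff:

async def index_files_with_scheme(rag, file_paths, track_id=None, scheme_name=None):
    # The real pipeline_index_files sorts the paths first; sorting is elided here.
    enqueued = False
    for file_path in file_paths:
        # scheme_name is forwarded through pipeline_enqueue_file down to
        # rag.apipeline_enqueue_documents(..., scheme_name=scheme_name).
        success, _ = await pipeline_enqueue_file(rag, file_path, track_id, scheme_name)
        if success:
            enqueued = True
    # Queued documents are then processed in a separate step.
    if enqueued:
        await rag.apipeline_process_enqueue_documents()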
@ -1404,7 +1404,7 @@ async def pipeline_index_files_raganything(
Defaults to None.
Note:
- Uses RAGAnything's process_document_complete_with_multimodal_content method for each file
- Uses RAGAnything's process_document_complete_lightrag_api method for each file
- Supports multimodal content processing (images, tables, equations)
- Files are processed with "auto" parse method and "modelscope" source
- Output is saved to "./output" directory
@ -1422,11 +1422,10 @@ async def pipeline_index_files_raganything(
# Process files sequentially with track_id
for file_path in sorted_file_paths:
success = (
await rag_anything.process_document_complete_with_multimodal_content(
await rag_anything.process_document_complete_lightrag_api(
file_path=str(file_path),
output_dir="./output",
parse_method="auto",
source="modelscope",
scheme_name=scheme_name,
)
)
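
Because removed and added lines are interleaved above, here is a hedged reconstruction of how the loop reads after the change: the call switches to process_document_complete_lightrag_api, and the hard-coded parse_method="auto" and source="modelscope" arguments appear to be replaced by the forwarded scheme_name (the hunk's -11/+10 line counts suggest this, though the rendering makes it hard to confirm):

async def index_with_raganything(rag_anything, sorted_file_paths, scheme_name=None):
    for file_path in sorted_file_paths:
        # New method name; parse_method/source seem to give way to scheme_name.
        success = await rag_anything.process_document_complete_lightrag_api(
            file_path=str(file_path),
            output_dir="./output",
            scheme_name=scheme_name,
        )
        # ...success/failure handling continues as in the surrounding function.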
@ -2089,11 +2088,10 @@ def create_document_routes(
)
else:
background_tasks.add_task(
rag_anything.process_document_complete_with_multimodal_content,
rag_anything.process_document_complete_lightrag_api,
file_path=str(file_path),
output_dir="./output",
parse_method="auto",
source="modelscope",
scheme_name=current_framework,
)


@ -709,7 +709,7 @@ class DocProcessingStatus:
"""Error message if failed"""
metadata: dict[str, Any] = field(default_factory=dict)
"""Additional metadata"""
multimodal_content: list[dict[str, any]] | None = None
multimodal_content: list[dict[str, Any]] | None = None
"""raganything: multimodal_content"""
multimodal_processed: bool | None = None
"""raganything: multimodal_processed"""


@ -1,5 +1,6 @@
from __future__ import annotations
from re import A
import traceback
import asyncio
import configparser
@ -131,7 +132,7 @@ class LightRAG:
doc_status_storage: str = field(default="JsonDocStatusStorage")
"""Storage type for tracking document processing statuses."""
input_dir: str = field(default_factory=lambda: os.getenv("INPUT_DIR", "./input"))
input_dir: str = field(default_factory=lambda: os.getenv("INPUT_DIR", "./inputs"))
"""Directory containing input documents"""
# Workspace
@ -826,8 +827,8 @@ class LightRAG:
def insert(
self,
input: str | list[str],
multimodal_content: list[dict[str, any]]
| list[list[dict[str, any]]]
multimodal_content: list[dict[str, Any]]
| list[list[dict[str, Any]]]
| None = None,
split_by_character: str | None = None,
split_by_character_only: bool = False,
@ -868,8 +869,8 @@ class LightRAG:
async def ainsert(
self,
input: str | list[str],
multimodal_content: list[dict[str, any]]
| list[list[dict[str, any]]]
multimodal_content: list[dict[str, Any]]
| list[list[dict[str, Any]]]
| None = None,
split_by_character: str | None = None,
split_by_character_only: bool = False,
@ -882,7 +883,7 @@ class LightRAG:
Args:
input: Single document string or list of document strings
multimodal_content (list[dict[str, any]] | list[list[dict[str, any]]] | None, optional):
multimodal_content (list[dict[str, Any]] | list[list[dict[str, Any]]] | None, optional):
Multimodal content (images, tables, equations) associated with documents
split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
chunk_token_size, it will be split again by token size.
@ -929,6 +930,55 @@ class LightRAG:
return track_id
def move_file_to_enqueue(self, file_path):
try:
enqueued_dir = file_path.parent / "__enqueued__"
enqueued_dir.mkdir(exist_ok=True)
# Generate unique filename to avoid conflicts
unique_filename = self.get_unique_filename_in_enqueued(
enqueued_dir, file_path.name
)
target_path = enqueued_dir / unique_filename
# Move the file
file_path.rename(target_path)
logger.debug(
f"Moved file to enqueued directory: {file_path.name} -> {unique_filename}"
)
except Exception as move_error:
logger.error(
f"Failed to move file {file_path.name} to __enqueued__ directory: {move_error}"
)
# Don't affect the main function's success status
def get_unique_filename_in_enqueued(self, target_dir: Path, original_name: str) -> str:
from pathlib import Path
import time
original_path = Path(original_name)
base_name = original_path.stem
extension = original_path.suffix
# Try original name first
if not (target_dir / original_name).exists():
return original_name
# Try with numeric suffixes 001-999
for i in range(1, 1000):
suffix = f"{i:03d}"
new_name = f"{base_name}_{suffix}{extension}"
if not (target_dir / new_name).exists():
return new_name
# Fallback with timestamp if all 999 slots are taken
timestamp = int(time.time())
return f"{base_name}_{timestamp}{extension}"
# TODO: deprecated, use insert instead
def insert_custom_chunks(
self,
@ -1004,7 +1054,7 @@ class LightRAG:
async def apipeline_enqueue_documents(
self,
input: str | list[str],
multimodal_content: list[dict[str, any]] | None = None,
multimodal_content: list[dict[str, Any]] | None = None,
ids: list[str] | None = None,
file_paths: str | list[str] | None = None,
track_id: str | None = None,
@ -1020,7 +1070,7 @@ class LightRAG:
Args:
input: Single document string or list of document strings
multimodal_content (list[dict[str, any]] | list[list[dict[str, any]]] | None, optional):
multimodal_content (list[dict[str, Any]] | list[list[dict[str, Any]]] | None, optional):
Multimodal content (images, tables, equations) associated with documents
ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated
file_paths: list of file paths corresponding to each document, used for citation
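
A hedged sketch of calling this coroutine with multimodal content. The scheme_name keyword is assumed from the caller in pipeline_enqueue_file shown earlier (its exact position in this signature is not visible in the hunk), and the multimodal dict keys are placeholders rather than a documented schema:

async def enqueue_with_multimodal(rag, text: str, track_id: str = None):
    # Illustrative multimodal block; the pipeline's expected dict layout is
    # not shown in this diff, so treat the keys as placeholders.
    await rag.apipeline_enqueue_documents(
        text,
        multimodal_content=[{"type": "image", "img_path": "figure1.png"}],
        file_paths="report.pdf",  # used for citation
        track_id=track_id,
        scheme_name=None,
    )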
@ -2268,50 +2318,50 @@ class LightRAG:
async def aclean_parse_cache_by_doc_ids(
self, doc_ids: str | list[str]
) -> dict[str, Any]:
"""异步清理指定文档ID的parse_cache条目
"""Asynchronously clean parse_cache entries for specified document IDs
Args:
doc_ids: 单个文档ID字符串或文档ID列表
doc_ids: Single document ID string or list of document IDs
Returns:
包含清理结果的字典:
- deleted_entries: 已删除的缓存条目列表
- not_found: 未找到的文档ID列表
- error: 错误信息（如果操作失败）
Dictionary containing cleanup results:
- deleted_entries: List of deleted cache entries
- not_found: List of document IDs not found
- error: Error message (if operation fails)
"""
import json
from pathlib import Path
# 标准化输入为列表
# Normalize input to list
if isinstance(doc_ids, str):
doc_ids = [doc_ids]
result = {"deleted_entries": [], "not_found": [], "error": None}
try:
# 构建parse_cache文件路径使用类的存储位置变量
# Build parse_cache file path using class storage location variables
if self.workspace:
# 如果有workspace使用workspace子目录
# If workspace exists, use workspace subdirectory
cache_file_path = (
Path(self.working_dir)
/ self.workspace
/ "kv_store_parse_cache.json"
)
else:
# 默认使用working_dir
# Default to using working_dir
cache_file_path = Path(self.working_dir) / "kv_store_parse_cache.json"
# 检查parse_cache文件是否存在
# Check if parse_cache file exists
if not cache_file_path.exists():
logger.warning(f"Parse cache文件未找到: {cache_file_path}")
logger.warning(f"Parse cache file not found: {cache_file_path}")
result["not_found"] = doc_ids.copy()
return result
# 读取当前的parse_cache数据
# Read current parse_cache data
with open(cache_file_path, "r", encoding="utf-8") as f:
cache_data = json.load(f)
# 查找需要删除的条目并记录找到的doc_ids
# Find entries to delete and record found doc_ids
entries_to_delete = []
doc_ids_set = set(doc_ids)
found_doc_ids = set()
@ -2325,47 +2375,47 @@ class LightRAG:
result["deleted_entries"].append(cache_key)
found_doc_ids.add(cache_entry.get("doc_id"))
# 删除找到的条目
# Delete found entries
for cache_key in entries_to_delete:
del cache_data[cache_key]
# 找出未找到的doc_ids
# Find doc_ids not found
result["not_found"] = list(doc_ids_set - found_doc_ids)
# 写回更新后的缓存数据
# Write back updated cache data
with open(cache_file_path, "w", encoding="utf-8") as f:
json.dump(cache_data, f, indent=2, ensure_ascii=False)
logger.info(
f"已删除 {len(entries_to_delete)} 个parse_cache条目文档ID: {doc_ids}"
f"Deleted {len(entries_to_delete)} parse_cache entries, document IDs: {doc_ids}"
)
except Exception as e:
error_msg = f"清理parse_cache时出错: {str(e)}"
error_msg = f"Error cleaning parse_cache: {str(e)}"
logger.error(error_msg)
result["error"] = error_msg
return result
def clean_parse_cache_by_doc_ids(self, doc_ids: str | list[str]) -> dict[str, Any]:
"""同步清理指定文档ID的parse_cache条目
"""Synchronously clean parse_cache entries for specified document IDs
Args:
doc_ids: 单个文档ID字符串或文档ID列表
doc_ids: Single document ID string or list of document IDs
Returns:
包含清理结果的字典
Dictionary containing cleanup results
"""
loop = always_get_an_event_loop()
return loop.run_until_complete(self.aclean_parse_cache_by_doc_ids(doc_ids))
async def aclean_all_parse_cache(self) -> dict[str, Any]:
"""异步清理所有parse_cache条目
"""Asynchronously clean all parse_cache entries
Returns:
包含清理结果的字典:
- deleted_count: 删除的条目数量
- error: 错误信息（如果操作失败）
Dictionary containing cleanup results:
- deleted_count: Number of deleted entries
- error: Error message (if operation fails)
"""
import json
from pathlib import Path
@ -2373,7 +2423,7 @@ class LightRAG:
result = {"deleted_count": 0, "error": None}
try:
# 构建parse_cache文件路径
# Build parse_cache file path
if self.workspace:
cache_file_path = (
Path(self.working_dir)
@ -2384,33 +2434,33 @@ class LightRAG:
cache_file_path = Path(self.working_dir) / "kv_store_parse_cache.json"
if not cache_file_path.exists():
logger.warning(f"Parse cache文件未找到: {cache_file_path}")
logger.warning(f"Parse cache file not found: {cache_file_path}")
return result
# 读取当前缓存以统计条目数量
# Read current cache to count entries
with open(cache_file_path, "r", encoding="utf-8") as f:
cache_data = json.load(f)
result["deleted_count"] = len(cache_data)
# 清空所有条目
# Clear all entries
with open(cache_file_path, "w", encoding="utf-8") as f:
json.dump({}, f, indent=2)
logger.info(f"已清空所有 {result['deleted_count']} 个parse_cache条目")
logger.info(f"Cleared all {result['deleted_count']} parse_cache entries")
except Exception as e:
error_msg = f"清空parse_cache时出错: {str(e)}"
error_msg = f"Error clearing parse_cache: {str(e)}"
logger.error(error_msg)
result["error"] = error_msg
return result
def clean_all_parse_cache(self) -> dict[str, Any]:
"""同步清理所有parse_cache条目
"""Synchronously clean all parse_cache entries
Returns:
包含清理结果的字典
Dictionary containing cleanup results
"""
loop = always_get_an_event_loop()
return loop.run_until_complete(self.aclean_all_parse_cache())
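
A brief usage sketch for the synchronous wrappers above; rag is assumed to be an existing LightRAG instance and the document IDs are illustrative. Both calls return plain dicts, so callers can inspect the result keys directly:

per_doc = rag.clean_parse_cache_by_doc_ids(["doc-abc123", "doc-def456"])
print(per_doc["deleted_entries"])  # cache keys that were removed
print(per_doc["not_found"])        # doc IDs with no matching cache entry
if per_doc["error"]:
    print("cleanup failed:", per_doc["error"])

everything = rag.clean_all_parse_cache()
print(everything["deleted_count"])  # entries cleared from kv_store_parse_cache.json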