diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index d08b5ae9..4c97b978 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -454,7 +454,7 @@ class DocStatusResponse(BaseModel):
     scheme_name: str = Field(
         default=None, description="Name of the processing scheme used for this document"
     )
-    multimodal_content: Optional[list[dict[str, any]]] = Field(
+    multimodal_content: Optional[list[dict[str, Any]]] = Field(
         default=None, description="Multimodal content of the document"
     )

@@ -870,7 +870,7 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str


 async def pipeline_enqueue_file(
-    rag: LightRAG, file_path: Path, track_id: str = None
+    rag: LightRAG, file_path: Path, track_id: str = None, scheme_name: str = None
 ) -> tuple[bool, str]:
     """Add a file to the queue for processing

@@ -1250,7 +1250,7 @@ async def pipeline_enqueue_file(

         try:
             await rag.apipeline_enqueue_documents(
-                content, file_paths=file_path.name, track_id=track_id
+                content, file_paths=file_path.name, track_id=track_id, scheme_name=scheme_name
             )

             logger.info(
@@ -1334,7 +1334,7 @@ async def pipeline_enqueue_file(
                 logger.error(f"Error deleting file {file_path}: {str(e)}")


-async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
+async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None, scheme_name: str = None):
     """Index a file with track_id

     Args:
@@ -1344,7 +1344,7 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = No
     """
     try:
         success, returned_track_id = await pipeline_enqueue_file(
-            rag, file_path, track_id
+            rag, file_path, track_id, scheme_name
         )
         if success:
             await rag.apipeline_process_enqueue_documents()
@@ -1355,7 +1355,7 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = No


 async def pipeline_index_files(
-    rag: LightRAG, file_paths: List[Path], track_id: str = None
+    rag: LightRAG, file_paths: List[Path], track_id: str = None, scheme_name: str = None
 ):
     """Index multiple files sequentially to avoid high CPU load

@@ -1376,7 +1376,7 @@ async def pipeline_index_files(

     # Process files sequentially with track_id
     for file_path in sorted_file_paths:
-        success, _ = await pipeline_enqueue_file(rag, file_path, track_id)
+        success, _ = await pipeline_enqueue_file(rag, file_path, track_id, scheme_name)
         if success:
             enqueued = True

@@ -1404,7 +1404,7 @@ async def pipeline_index_files_raganything(
             Defaults to None.

     Note:
-        - Uses RAGAnything's process_document_complete_with_multimodal_content method for each file
+        - Uses RAGAnything's process_document_complete_lightrag_api method for each file
         - Supports multimodal content processing (images, tables, equations)
         - Files are processed with "auto" parse method and "modelscope" source
         - Output is saved to "./output" directory
@@ -1422,11 +1422,10 @@ async def pipeline_index_files_raganything(
         # Process files sequentially with track_id
         for file_path in sorted_file_paths:
             success = (
-                await rag_anything.process_document_complete_with_multimodal_content(
+                await rag_anything.process_document_complete_lightrag_api(
                     file_path=str(file_path),
                     output_dir="./output",
                     parse_method="auto",
-                    source="modelscope",
                     scheme_name=scheme_name,
                 )
             )
@@ -2089,11 +2088,10 @@ def create_document_routes(
                 )
             else:
                 background_tasks.add_task(
-                    rag_anything.process_document_complete_with_multimodal_content,
+                    rag_anything.process_document_complete_lightrag_api,
                     file_path=str(file_path),
                     output_dir="./output",
                     parse_method="auto",
-                    source="modelscope",
                     scheme_name=current_framework,
                 )

diff --git a/lightrag/base.py b/lightrag/base.py
index be55f457..55a7c0cc 100644
--- a/lightrag/base.py
+++ b/lightrag/base.py
@@ -709,7 +709,7 @@ class DocProcessingStatus:
     """Error message if failed"""
     metadata: dict[str, Any] = field(default_factory=dict)
     """Additional metadata"""
-    multimodal_content: list[dict[str, any]] | None = None
+    multimodal_content: list[dict[str, Any]] | None = None
     """raganything: multimodal_content"""
     multimodal_processed: bool | None = None
     """raganything: multimodal_processed"""
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 0d3fb3da..e83bdf0b 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+from re import A
 import traceback
 import asyncio
 import configparser
@@ -131,7 +132,7 @@ class LightRAG:
     doc_status_storage: str = field(default="JsonDocStatusStorage")
     """Storage type for tracking document processing statuses."""

-    input_dir: str = field(default_factory=lambda: os.getenv("INPUT_DIR", "./input"))
+    input_dir: str = field(default_factory=lambda: os.getenv("INPUT_DIR", "./inputs"))
     """Directory containing input documents"""

     # Workspace
@@ -826,8 +827,8 @@ class LightRAG:
     def insert(
         self,
         input: str | list[str],
-        multimodal_content: list[dict[str, any]]
-        | list[list[dict[str, any]]]
+        multimodal_content: list[dict[str, Any]]
+        | list[list[dict[str, Any]]]
         | None = None,
         split_by_character: str | None = None,
         split_by_character_only: bool = False,
@@ -868,8 +869,8 @@ class LightRAG:
     async def ainsert(
         self,
         input: str | list[str],
-        multimodal_content: list[dict[str, any]]
-        | list[list[dict[str, any]]]
+        multimodal_content: list[dict[str, Any]]
+        | list[list[dict[str, Any]]]
         | None = None,
         split_by_character: str | None = None,
         split_by_character_only: bool = False,
@@ -882,7 +883,7 @@ class LightRAG:

         Args:
             input: Single document string or list of document strings
-            multimodal_content (list[dict[str, any]] | list[list[dict[str, any]]] | None, optional):
+            multimodal_content (list[dict[str, Any]] | list[list[dict[str, Any]]] | None, optional):
                 Multimodal content (images, tables, equations) associated with documents
             split_by_character: if split_by_character is not None, split the string by character,
                 if chunk longer than chunk_token_size, it will be split again by token size.
@@ -929,6 +930,55 @@ class LightRAG:

         return track_id

+
+    def move_file_to_enqueue(self, file_path):
+
+        try:
+            enqueued_dir = file_path.parent / "__enqueued__"
+            enqueued_dir.mkdir(exist_ok=True)
+
+            # Generate unique filename to avoid conflicts
+            unique_filename = self.get_unique_filename_in_enqueued(
+                enqueued_dir, file_path.name
+            )
+            target_path = enqueued_dir / unique_filename
+
+            # Move the file
+            file_path.rename(target_path)
+            logger.debug(
+                f"Moved file to enqueued directory: {file_path.name} -> {unique_filename}"
+            )
+
+        except Exception as move_error:
+            logger.error(
+                f"Failed to move file {file_path.name} to __enqueued__ directory: {move_error}"
+            )
+            # Don't affect the main function's success status
+
+
+    def get_unique_filename_in_enqueued(self, target_dir: Path, original_name: str) -> str:
+        from pathlib import Path
+        import time
+
+        original_path = Path(original_name)
+        base_name = original_path.stem
+        extension = original_path.suffix
+
+        # Try original name first
+        if not (target_dir / original_name).exists():
+            return original_name
+
+        # Try with numeric suffixes 001-999
+        for i in range(1, 1000):
+            suffix = f"{i:03d}"
+            new_name = f"{base_name}_{suffix}{extension}"
+            if not (target_dir / new_name).exists():
+                return new_name
+
+        # Fallback with timestamp if all 999 slots are taken
+        timestamp = int(time.time())
+        return f"{base_name}_{timestamp}{extension}"
+
     # TODO: deprecated, use insert instead
     def insert_custom_chunks(
         self,
@@ -1004,7 +1054,7 @@ class LightRAG:
     async def apipeline_enqueue_documents(
         self,
         input: str | list[str],
-        multimodal_content: list[dict[str, any]] | None = None,
+        multimodal_content: list[dict[str, Any]] | None = None,
         ids: list[str] | None = None,
         file_paths: str | list[str] | None = None,
         track_id: str | None = None,
@@ -1020,7 +1070,7 @@ class LightRAG:

         Args:
             input: Single document string or list of document strings
-            multimodal_content (list[dict[str, any]] | list[list[dict[str, any]]] | None, optional):
+            multimodal_content (list[dict[str, Any]] | list[list[dict[str, Any]]] | None, optional):
                 Multimodal content (images, tables, equations) associated with documents
             ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated
             file_paths: list of file paths corresponding to each document, used for citation
@@ -2268,50 +2318,50 @@ class LightRAG:
     async def aclean_parse_cache_by_doc_ids(
         self, doc_ids: str | list[str]
     ) -> dict[str, Any]:
-        """异步清理指定文档ID的parse_cache条目
+        """Asynchronously clean parse_cache entries for specified document IDs

         Args:
-            doc_ids: 单个文档ID字符串或文档ID列表
+            doc_ids: Single document ID string or list of document IDs

         Returns:
-            包含清理结果的字典:
-            - deleted_entries: 已删除的缓存条目列表
-            - not_found: 未找到的文档ID列表
-            - error: 错误信息(如果操作失败)
+            Dictionary containing cleanup results:
+            - deleted_entries: List of deleted cache entries
+            - not_found: List of document IDs not found
+            - error: Error message (if operation fails)
         """
         import json
         from pathlib import Path

-        # 标准化输入为列表
+        # Normalize input to list
         if isinstance(doc_ids, str):
             doc_ids = [doc_ids]

         result = {"deleted_entries": [], "not_found": [], "error": None}

         try:
-            # 构建parse_cache文件路径,使用类的存储位置变量
+            # Build parse_cache file path using class storage location variables
             if self.workspace:
-                # 如果有workspace,使用workspace子目录
+                # If workspace exists, use workspace subdirectory
                 cache_file_path = (
                     Path(self.working_dir)
                     / self.workspace
                     / "kv_store_parse_cache.json"
                 )
             else:
-                # 默认使用working_dir
+                # Default to using working_dir
                 cache_file_path = Path(self.working_dir) / "kv_store_parse_cache.json"

-            # 检查parse_cache文件是否存在
+            # Check if parse_cache file exists
             if not cache_file_path.exists():
-                logger.warning(f"Parse cache文件未找到: {cache_file_path}")
+                logger.warning(f"Parse cache file not found: {cache_file_path}")
                 result["not_found"] = doc_ids.copy()
                 return result

-            # 读取当前的parse_cache数据
+            # Read current parse_cache data
             with open(cache_file_path, "r", encoding="utf-8") as f:
                 cache_data = json.load(f)

-            # 查找需要删除的条目并记录找到的doc_ids
+            # Find entries to delete and record found doc_ids
             entries_to_delete = []
             doc_ids_set = set(doc_ids)
             found_doc_ids = set()
@@ -2325,47 +2375,47 @@ class LightRAG:
                     result["deleted_entries"].append(cache_key)
                     found_doc_ids.add(cache_entry.get("doc_id"))

-            # 删除找到的条目
+            # Delete found entries
             for cache_key in entries_to_delete:
                 del cache_data[cache_key]

-            # 找出未找到的doc_ids
+            # Find doc_ids not found
             result["not_found"] = list(doc_ids_set - found_doc_ids)

-            # 写回更新后的缓存数据
+            # Write back updated cache data
             with open(cache_file_path, "w", encoding="utf-8") as f:
                 json.dump(cache_data, f, indent=2, ensure_ascii=False)

             logger.info(
-                f"已删除 {len(entries_to_delete)} 个parse_cache条目,文档ID: {doc_ids}"
+                f"Deleted {len(entries_to_delete)} parse_cache entries, document IDs: {doc_ids}"
             )

         except Exception as e:
-            error_msg = f"清理parse_cache时出错: {str(e)}"
+            error_msg = f"Error cleaning parse_cache: {str(e)}"
             logger.error(error_msg)
             result["error"] = error_msg

         return result

     def clean_parse_cache_by_doc_ids(self, doc_ids: str | list[str]) -> dict[str, Any]:
-        """同步清理指定文档ID的parse_cache条目
+        """Synchronously clean parse_cache entries for specified document IDs

         Args:
-            doc_ids: 单个文档ID字符串或文档ID列表
+            doc_ids: Single document ID string or list of document IDs

         Returns:
-            包含清理结果的字典
+            Dictionary containing cleanup results
         """
         loop = always_get_an_event_loop()
         return loop.run_until_complete(self.aclean_parse_cache_by_doc_ids(doc_ids))

     async def aclean_all_parse_cache(self) -> dict[str, Any]:
-        """异步清理所有parse_cache条目
+        """Asynchronously clean all parse_cache entries

         Returns:
-            包含清理结果的字典:
-            - deleted_count: 删除的条目数量
-            - error: 错误信息(如果操作失败)
+            Dictionary containing cleanup results:
+            - deleted_count: Number of deleted entries
+            - error: Error message (if operation fails)
         """
         import json
         from pathlib import Path
@@ -2373,7 +2423,7 @@ class LightRAG:
         result = {"deleted_count": 0, "error": None}

         try:
-            # 构建parse_cache文件路径
+            # Build parse_cache file path
             if self.workspace:
                 cache_file_path = (
                     Path(self.working_dir)
@@ -2384,33 +2434,33 @@ class LightRAG:
                 cache_file_path = Path(self.working_dir) / "kv_store_parse_cache.json"

             if not cache_file_path.exists():
-                logger.warning(f"Parse cache文件未找到: {cache_file_path}")
+                logger.warning(f"Parse cache file not found: {cache_file_path}")
                 return result

-            # 读取当前缓存以统计条目数量
+            # Read current cache to count entries
             with open(cache_file_path, "r", encoding="utf-8") as f:
                 cache_data = json.load(f)
                 result["deleted_count"] = len(cache_data)

-            # 清空所有条目
+            # Clear all entries
             with open(cache_file_path, "w", encoding="utf-8") as f:
                 json.dump({}, f, indent=2)

-            logger.info(f"已清空所有 {result['deleted_count']} 个parse_cache条目")
+            logger.info(f"Cleared all {result['deleted_count']} parse_cache entries")

         except Exception as e:
-            error_msg = f"清空parse_cache时出错: {str(e)}"
+            error_msg = f"Error clearing parse_cache: {str(e)}"
             logger.error(error_msg)
             result["error"] = error_msg

         return result

     def clean_all_parse_cache(self) -> dict[str, Any]:
-        """同步清理所有parse_cache条目
+        """Synchronously clean all parse_cache entries

         Returns:
-            包含清理结果的字典
+            Dictionary containing cleanup results
         """
         loop = always_get_an_event_loop()
         return loop.run_until_complete(self.aclean_all_parse_cache())