From 091f2b42c3e64861aaf0299c5a1de422779bfccb Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 3 Aug 2025 09:19:02 +0800 Subject: [PATCH 01/11] feat(performance): Optimize document deletion with entity/relation index - Introduces an index mapping documents to their corresponding entities and relations. This significantly speeds up `adelete_by_doc_id` by replacing slow graph traversal with a fast key-value lookup. - Refactors the ingestion pipeline (`merge_nodes_and_edges`) to populate this new index. Adds a one-time data migration script to backfill the index for existing data. --- lightrag/base.py | 16 +++ lightrag/kg/networkx_impl.py | 29 ++++ lightrag/lightrag.py | 249 ++++++++++++++++++++++++++++++++--- lightrag/namespace.py | 2 + lightrag/operate.py | 219 ++++++++++++++++++++++-------- 5 files changed, 446 insertions(+), 69 deletions(-) diff --git a/lightrag/base.py b/lightrag/base.py index 9a5fbeb6..9f82c6eb 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -654,6 +654,22 @@ class BaseGraphStorage(StorageNameSpace, ABC): indicating whether the graph was truncated due to max_nodes limit """ + @abstractmethod + async def get_all_nodes(self) -> list[dict]: + """Get all nodes in the graph. + + Returns: + A list of all nodes, where each node is a dictionary of its properties + """ + + @abstractmethod + async def get_all_edges(self) -> list[dict]: + """Get all edges in the graph. + + Returns: + A list of all edges, where each edge is a dictionary of its properties + """ + class DocStatus(str, Enum): """Document processing status""" diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py index eeca53ed..f6f77597 100644 --- a/lightrag/kg/networkx_impl.py +++ b/lightrag/kg/networkx_impl.py @@ -393,6 +393,35 @@ class NetworkXStorage(BaseGraphStorage): matching_edges.append(edge_data_with_nodes) return matching_edges + async def get_all_nodes(self) -> list[dict]: + """Get all nodes in the graph. + + Returns: + A list of all nodes, where each node is a dictionary of its properties + """ + graph = await self._get_graph() + all_nodes = [] + for node_id, node_data in graph.nodes(data=True): + node_data_with_id = node_data.copy() + node_data_with_id["id"] = node_id + all_nodes.append(node_data_with_id) + return all_nodes + + async def get_all_edges(self) -> list[dict]: + """Get all edges in the graph. 
+ + Returns: + A list of all edges, where each edge is a dictionary of its properties + """ + graph = await self._get_graph() + all_edges = [] + for u, v, edge_data in graph.edges(data=True): + edge_data_with_nodes = edge_data.copy() + edge_data_with_nodes["source"] = u + edge_data_with_nodes["target"] = v + all_edges.append(edge_data_with_nodes) + return all_edges + async def index_done_callback(self) -> bool: """Save data to disk""" async with self._storage_lock: diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ae42e5df..73386b0b 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -453,14 +453,26 @@ class LightRAG: embedding_func=self.embedding_func, ) + self.text_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore + namespace=NameSpace.KV_STORE_TEXT_CHUNKS, + workspace=self.workspace, + embedding_func=self.embedding_func, + ) + self.full_docs: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore namespace=NameSpace.KV_STORE_FULL_DOCS, workspace=self.workspace, embedding_func=self.embedding_func, ) - self.text_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore - namespace=NameSpace.KV_STORE_TEXT_CHUNKS, + self.full_entities: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore + namespace=NameSpace.KV_STORE_FULL_ENTITIES, + workspace=self.workspace, + embedding_func=self.embedding_func, + ) + + self.full_relations: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore + namespace=NameSpace.KV_STORE_FULL_RELATIONS, workspace=self.workspace, embedding_func=self.embedding_func, ) @@ -553,6 +565,8 @@ class LightRAG: for storage in ( self.full_docs, self.text_chunks, + self.full_entities, + self.full_relations, self.entities_vdb, self.relationships_vdb, self.chunks_vdb, @@ -576,6 +590,8 @@ class LightRAG: for storage in ( self.full_docs, self.text_chunks, + self.full_entities, + self.full_relations, self.entities_vdb, self.relationships_vdb, self.chunks_vdb, @@ -591,6 +607,159 @@ class LightRAG: self._storages_status = StoragesStatus.FINALIZED logger.debug("Finalized Storages") + async def _check_and_migrate_data(self): + """Check if data migration is needed and perform migration if necessary""" + try: + # Check if migration is needed: + # 1. chunk_entity_relation_graph has entities and relations (count > 0) + # 2. 
full_entities and full_relations are empty + + # Get all entity labels from graph + all_entity_labels = await self.chunk_entity_relation_graph.get_all_labels() + + if not all_entity_labels: + logger.debug("No entities found in graph, skipping migration check") + return + + # Check if full_entities and full_relations are empty + # Get all processed documents to check their entity/relation data + try: + processed_docs = await self.doc_status.get_docs_by_status( + DocStatus.PROCESSED + ) + + if not processed_docs: + logger.debug("No processed documents found, skipping migration") + return + + # Check first few documents to see if they have full_entities/full_relations data + migration_needed = True + checked_count = 0 + max_check = min(5, len(processed_docs)) # Check up to 5 documents + + for doc_id in list(processed_docs.keys())[:max_check]: + checked_count += 1 + entity_data = await self.full_entities.get_by_id(doc_id) + relation_data = await self.full_relations.get_by_id(doc_id) + + if entity_data or relation_data: + migration_needed = False + break + + if not migration_needed: + logger.debug( + "Full entities/relations data already exists, no migration needed" + ) + return + + logger.info( + f"Data migration needed: found {len(all_entity_labels)} entities in graph but no full_entities/full_relations data" + ) + + # Perform migration + await self._migrate_entity_relation_data(processed_docs) + + except Exception as e: + logger.error(f"Error during migration check: {e}") + # Don't raise the error, just log it to avoid breaking initialization + + except Exception as e: + logger.error(f"Error in data migration check: {e}") + # Don't raise the error to avoid breaking initialization + + async def _migrate_entity_relation_data(self, processed_docs: dict): + """Migrate existing entity and relation data to full_entities and full_relations storage""" + logger.info(f"Starting data migration for {len(processed_docs)} documents") + + # Create mapping from chunk_id to doc_id + chunk_to_doc = {} + for doc_id, doc_status in processed_docs.items(): + chunk_ids = ( + doc_status.chunks_list + if hasattr(doc_status, "chunks_list") and doc_status.chunks_list + else [] + ) + for chunk_id in chunk_ids: + chunk_to_doc[chunk_id] = doc_id + + # Initialize document entity and relation mappings + doc_entities = {} # doc_id -> set of entity_names + doc_relations = {} # doc_id -> set of relation_pairs (as tuples) + + # Get all nodes and edges from graph + all_nodes = await self.chunk_entity_relation_graph.get_all_nodes() + all_edges = await self.chunk_entity_relation_graph.get_all_edges() + + # Process all nodes once + for node in all_nodes: + if "source_id" in node: + entity_id = node.get("entity_id") or node.get("id") + if not entity_id: + continue + + # Get chunk IDs from source_id + source_ids = node["source_id"].split(GRAPH_FIELD_SEP) + + # Find which documents this entity belongs to + for chunk_id in source_ids: + doc_id = chunk_to_doc.get(chunk_id) + if doc_id: + if doc_id not in doc_entities: + doc_entities[doc_id] = set() + doc_entities[doc_id].add(entity_id) + + # Process all edges once + for edge in all_edges: + if "source_id" in edge: + src = edge.get("source") + tgt = edge.get("target") + if not src or not tgt: + continue + + # Get chunk IDs from source_id + source_ids = edge["source_id"].split(GRAPH_FIELD_SEP) + + # Find which documents this relation belongs to + for chunk_id in source_ids: + doc_id = chunk_to_doc.get(chunk_id) + if doc_id: + if doc_id not in doc_relations: + doc_relations[doc_id] = 
set() + # Use tuple for set operations, convert to list later + doc_relations[doc_id].add((src, tgt)) + + # Store the results in full_entities and full_relations + migration_count = 0 + + # Store entities + if doc_entities: + entities_data = {} + for doc_id, entity_set in doc_entities.items(): + entities_data[doc_id] = {"entity_names": list(entity_set)} + await self.full_entities.upsert(entities_data) + + # Store relations + if doc_relations: + relations_data = {} + for doc_id, relation_set in doc_relations.items(): + # Convert tuples back to lists + relations_data[doc_id] = { + "relation_pairs": [list(pair) for pair in relation_set] + } + await self.full_relations.upsert(relations_data) + + migration_count = len( + set(list(doc_entities.keys()) + list(doc_relations.keys())) + ) + + # Persist the migrated data + await self.full_entities.index_done_callback() + await self.full_relations.index_done_callback() + + logger.info( + f"Data migration completed: migrated {migration_count} documents with entities/relations" + ) + async def get_graph_labels(self): text = await self.chunk_entity_relation_graph.get_all_labels() return text @@ -1229,6 +1398,9 @@ class LightRAG: entity_vdb=self.entities_vdb, relationships_vdb=self.relationships_vdb, global_config=asdict(self), + full_entities_storage=self.full_entities, + full_relations_storage=self.full_relations, + doc_id=doc_id, pipeline_status=pipeline_status, pipeline_status_lock=pipeline_status_lock, llm_response_cache=self.llm_response_cache, @@ -1401,6 +1573,8 @@ class LightRAG: self.full_docs, self.doc_status, self.text_chunks, + self.full_entities, + self.full_relations, self.llm_response_cache, self.entities_vdb, self.relationships_vdb, @@ -1959,21 +2133,54 @@ class LightRAG: graph_db_lock = get_graph_db_lock(enable_logging=False) async with graph_db_lock: try: - # Get all affected nodes and edges in batch - # logger.info( - # f"Analyzing affected entities and relationships for {len(chunk_ids)} chunks" - # ) - affected_nodes = ( - await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids( - list(chunk_ids) - ) - ) + # Get affected entities and relations from full_entities and full_relations storage + doc_entities_data = await self.full_entities.get_by_id(doc_id) + doc_relations_data = await self.full_relations.get_by_id(doc_id) - affected_edges = ( - await self.chunk_entity_relation_graph.get_edges_by_chunk_ids( - list(chunk_ids) + affected_nodes = [] + affected_edges = [] + + # Get entity data from graph storage using entity names from full_entities + if doc_entities_data and "entity_names" in doc_entities_data: + entity_names = doc_entities_data["entity_names"] + # get_nodes_batch returns dict[str, dict], need to convert to list[dict] + nodes_dict = ( + await self.chunk_entity_relation_graph.get_nodes_batch( + entity_names + ) ) - ) + for entity_name in entity_names: + node_data = nodes_dict.get(entity_name) + if node_data: + # Ensure compatibility with existing logic that expects "id" field + if "id" not in node_data: + node_data["id"] = entity_name + affected_nodes.append(node_data) + + # Get relation data from graph storage using relation pairs from full_relations + if doc_relations_data and "relation_pairs" in doc_relations_data: + relation_pairs = doc_relations_data["relation_pairs"] + edge_pairs_dicts = [ + {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs + ] + # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict] + edges_dict = ( + await self.chunk_entity_relation_graph.get_edges_batch( + 
edge_pairs_dicts + ) + ) + + for pair in relation_pairs: + src, tgt = pair[0], pair[1] + edge_key = (src, tgt) + edge_data = edges_dict.get(edge_key) + if edge_data: + # Ensure compatibility with existing logic that expects "source" and "target" fields + if "source" not in edge_data: + edge_data["source"] = src + if "target" not in edge_data: + edge_data["target"] = tgt + affected_edges.append(edge_data) except Exception as e: logger.error(f"Failed to analyze affected graph elements: {e}") @@ -2125,7 +2332,17 @@ class LightRAG: f"Failed to rebuild knowledge graph: {e}" ) from e - # 9. Delete original document and status + # 9. Delete from full_entities and full_relations storage + try: + await self.full_entities.delete([doc_id]) + await self.full_relations.delete([doc_id]) + except Exception as e: + logger.error(f"Failed to delete from full_entities/full_relations: {e}") + raise Exception( + f"Failed to delete from full_entities/full_relations: {e}" + ) from e + + # 10. Delete original document and status try: await self.full_docs.delete([doc_id]) await self.doc_status.delete([doc_id]) diff --git a/lightrag/namespace.py b/lightrag/namespace.py index 657d65ac..5c042713 100644 --- a/lightrag/namespace.py +++ b/lightrag/namespace.py @@ -7,6 +7,8 @@ class NameSpace: KV_STORE_FULL_DOCS = "full_docs" KV_STORE_TEXT_CHUNKS = "text_chunks" KV_STORE_LLM_RESPONSE_CACHE = "llm_response_cache" + KV_STORE_FULL_ENTITIES = "full_entities" + KV_STORE_FULL_RELATIONS = "full_relations" VECTOR_STORE_ENTITIES = "entities" VECTOR_STORE_RELATIONSHIPS = "relationships" diff --git a/lightrag/operate.py b/lightrag/operate.py index 45153bc5..ca21881b 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -504,9 +504,6 @@ async def _rebuild_knowledge_from_chunks( # Re-raise the exception to notify the caller raise task.exception() - # If all tasks completed successfully, collect results - # (No need to collect results since these tasks don't return values) - # Final status report status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully." 
if failed_entities_count > 0 or failed_relationships_count > 0: @@ -1024,6 +1021,7 @@ async def _merge_edges_then_upsert( pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, + added_entities: list = None, # New parameter to track entities added during edge processing ): if src_id == tgt_id: return None @@ -1105,17 +1103,27 @@ async def _merge_edges_then_upsert( for need_insert_id in [src_id, tgt_id]: if not (await knowledge_graph_inst.has_node(need_insert_id)): - await knowledge_graph_inst.upsert_node( - need_insert_id, - node_data={ - "entity_id": need_insert_id, - "source_id": source_id, - "description": description, + node_data = { + "entity_id": need_insert_id, + "source_id": source_id, + "description": description, + "entity_type": "UNKNOWN", + "file_path": file_path, + "created_at": int(time.time()), + } + await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data) + + # Track entities added during edge processing + if added_entities is not None: + entity_data = { + "entity_name": need_insert_id, "entity_type": "UNKNOWN", + "description": description, + "source_id": source_id, "file_path": file_path, "created_at": int(time.time()), - }, - ) + } + added_entities.append(entity_data) force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] @@ -1178,6 +1186,9 @@ async def merge_nodes_and_edges( entity_vdb: BaseVectorStorage, relationships_vdb: BaseVectorStorage, global_config: dict[str, str], + full_entities_storage: BaseKVStorage = None, + full_relations_storage: BaseKVStorage = None, + doc_id: str = None, pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, @@ -1185,7 +1196,12 @@ async def merge_nodes_and_edges( total_files: int = 0, file_path: str = "unknown_source", ) -> None: - """Merge nodes and edges from extraction results + """Two-phase merge: process all entities first, then all relationships + + This approach ensures data consistency by: + 1. Phase 1: Process all entities concurrently + 2. Phase 2: Process all relationships concurrently (may add missing entities) + 3. 
Phase 3: Update full_entities and full_relations storage with final results Args: chunk_results: List of tuples (maybe_nodes, maybe_edges) containing extracted entities and relationships @@ -1193,9 +1209,15 @@ async def merge_nodes_and_edges( entity_vdb: Entity vector database relationships_vdb: Relationship vector database global_config: Global configuration + full_entities_storage: Storage for document entity lists + full_relations_storage: Storage for document relation lists + doc_id: Document ID for storage indexing pipeline_status: Pipeline status dictionary pipeline_status_lock: Lock for pipeline status llm_response_cache: LLM response cache + current_file_number: Current file number for logging + total_files: Total files for logging + file_path: File path for logging """ # Collect all nodes and edges from all chunks @@ -1212,11 +1234,9 @@ async def merge_nodes_and_edges( sorted_edge_key = tuple(sorted(edge_key)) all_edges[sorted_edge_key].extend(edges) - # Centralized processing of all nodes and edges total_entities_count = len(all_nodes) total_relations_count = len(all_edges) - # Merge nodes and edges log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}" logger.info(log_message) async with pipeline_status_lock: @@ -1227,8 +1247,8 @@ async def merge_nodes_and_edges( graph_max_async = global_config.get("llm_model_max_async", 4) * 2 semaphore = asyncio.Semaphore(graph_max_async) - # Process and update all entities and relationships in parallel - log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations (async: {graph_max_async})" + # ===== Phase 1: Process all entities concurrently ===== + log_message = f"Phase 1: Processing {total_entities_count} entities (async: {graph_max_async})" logger.info(log_message) async with pipeline_status_lock: pipeline_status["latest_message"] = log_message @@ -1263,18 +1283,53 @@ async def merge_nodes_and_edges( await entity_vdb.upsert(data_for_vdb) return entity_data + # Create entity processing tasks + entity_tasks = [] + for entity_name, entities in all_nodes.items(): + task = asyncio.create_task(_locked_process_entity_name(entity_name, entities)) + entity_tasks.append(task) + + # Execute entity tasks with error handling + processed_entities = [] + if entity_tasks: + done, pending = await asyncio.wait( + entity_tasks, return_when=asyncio.FIRST_EXCEPTION + ) + + # Check if any task raised an exception + for task in done: + if task.exception(): + # If a task failed, cancel all pending tasks + for pending_task in pending: + pending_task.cancel() + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) + # Re-raise the exception to notify the caller + raise task.exception() + + # If all tasks completed successfully, collect results + processed_entities = [task.result() for task in entity_tasks] + + # ===== Phase 2: Process all relationships concurrently ===== + log_message = f"Phase 2: Processing {total_relations_count} relations (async: {graph_max_async})" + logger.info(log_message) + async with pipeline_status_lock: + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + async def _locked_process_edges(edge_key, edges): async with semaphore: workspace = global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" - # Sort the edge_key components to ensure consistent lock key generation sorted_edge_key = sorted([edge_key[0], edge_key[1]]) - # logger.info(f"Processing edge: 
{sorted_edge_key[0]} - {sorted_edge_key[1]}") + async with get_storage_keyed_lock( sorted_edge_key, namespace=namespace, enable_logging=False, ): + added_entities = [] # Track entities added during edge processing edge_data = await _merge_edges_then_upsert( edge_key[0], edge_key[1], @@ -1284,9 +1339,11 @@ async def merge_nodes_and_edges( pipeline_status, pipeline_status_lock, llm_response_cache, + added_entities, # Pass list to collect added entities ) + if edge_data is None: - return None + return None, [] if relationships_vdb is not None: data_for_vdb = { @@ -1303,50 +1360,106 @@ async def merge_nodes_and_edges( } } await relationships_vdb.upsert(data_for_vdb) - return edge_data + return edge_data, added_entities - # Create a single task queue for both entities and edges - tasks = [] + # Create relationship processing tasks + edge_tasks = [] + for edge_key, edges in all_edges.items(): + task = asyncio.create_task(_locked_process_edges(edge_key, edges)) + edge_tasks.append(task) - # Add entity processing tasks - for entity_name, entities in all_nodes.items(): - tasks.append( - asyncio.create_task(_locked_process_entity_name(entity_name, entities)) + # Execute relationship tasks with error handling + processed_edges = [] + all_added_entities = [] + + if edge_tasks: + done, pending = await asyncio.wait( + edge_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Add edge processing tasks - for edge_key, edges in all_edges.items(): - tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges))) + # Check if any task raised an exception + for task in done: + if task.exception(): + # If a task failed, cancel all pending tasks + for pending_task in pending: + pending_task.cancel() + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) + # Re-raise the exception to notify the caller + raise task.exception() - # Check if there are any tasks to process - if not tasks: - log_message = f"No entities or relationships to process for {file_path}" - logger.info(log_message) - if pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - return + # If all tasks completed successfully, collect results + for task in edge_tasks: + edge_data, added_entities = task.result() + if edge_data is not None: + processed_edges.append(edge_data) + all_added_entities.extend(added_entities) - # Execute all tasks in parallel with semaphore control and early failure detection - done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) + # ===== Phase 3: Update full_entities and full_relations storage ===== + if full_entities_storage and full_relations_storage and doc_id: + try: + # Merge all entities: original entities + entities added during edge processing + final_entity_names = set() - # Check if any task raised an exception - for task in done: - if task.exception(): - # If a task failed, cancel all pending tasks - for pending_task in pending: - pending_task.cancel() + # Add original processed entities + for entity_data in processed_entities: + if entity_data and entity_data.get("entity_name"): + final_entity_names.add(entity_data["entity_name"]) - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) + # Add entities that were added during relationship processing + for added_entity in all_added_entities: + if added_entity and added_entity.get("entity_name"): + final_entity_names.add(added_entity["entity_name"]) - # 
Re-raise the exception to notify the caller - raise task.exception() + # Collect all relation pairs + final_relation_pairs = set() + for edge_data in processed_edges: + if edge_data: + src_id = edge_data.get("src_id") + tgt_id = edge_data.get("tgt_id") + if src_id and tgt_id: + relation_pair = tuple(sorted([src_id, tgt_id])) + final_relation_pairs.add(relation_pair) - # If all tasks completed successfully, collect results - # (No need to collect results since these tasks don't return values) + # Update storage + if final_entity_names: + await full_entities_storage.upsert( + { + doc_id: { + "entity_names": list(final_entity_names), + "count": len(final_entity_names), + } + } + ) + + if final_relation_pairs: + await full_relations_storage.upsert( + { + doc_id: { + "relation_pairs": [ + list(pair) for pair in final_relation_pairs + ], + "count": len(final_relation_pairs), + } + } + ) + + logger.debug( + f"Updated entity-relation index for document {doc_id}: {len(final_entity_names)} entities (original: {len(processed_entities)}, added: {len(all_added_entities)}), {len(final_relation_pairs)} relations" + ) + + except Exception as e: + logger.error( + f"Failed to update entity-relation index for document {doc_id}: {e}" + ) + # Don't raise exception to avoid affecting main flow + + log_message = f"Completed merging: {len(processed_entities)} entities, {len(all_added_entities)} added entities, {len(processed_edges)} relations" + logger.info(log_message) + async with pipeline_status_lock: + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) async def extract_entities( From bfe6657b316f7e50bc9c5f0cc71d9fbb2b605ddd Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 3 Aug 2025 10:29:36 +0800 Subject: [PATCH 02/11] Remove auto_manage_storages_states option - Always manage storage states by LightRAG - Remove rag.initialize_storages() from all examples --- README-zh.md | 8 ----- README.md | 40 ++++++--------------- docs/rerank_integration.md | 1 - examples/lightrag_openai_compatible_demo.py | 33 +++++++++++------ lightrag/api/lightrag_server.py | 5 --- lightrag/lightrag.py | 11 +++--- reproduce/Step_1.py | 1 - reproduce/Step_1_openai_compatible.py | 1 - 8 files changed, 40 insertions(+), 60 deletions(-) diff --git a/README-zh.md b/README-zh.md index 2d08ff51..2e240375 100644 --- a/README-zh.md +++ b/README-zh.md @@ -204,7 +204,6 @@ async def initialize_rag(): embedding_func=openai_embed, llm_model_func=gpt_4o_mini_complete, ) - await rag.initialize_storages() await initialize_pipeline_status() return rag @@ -400,7 +399,6 @@ async def initialize_rag(): ) ) - await rag.initialize_storages() await initialize_pipeline_status() return rag @@ -547,7 +545,6 @@ async def initialize_rag(): ), ) - await rag.initialize_storages() await initialize_pipeline_status() return rag @@ -765,8 +762,6 @@ async def initialize_rag(): graph_storage="Neo4JStorage", #<-----------覆盖KG默认值 ) - # 初始化数据库连接 - await rag.initialize_storages() # 初始化文档处理的管道状态 await initialize_pipeline_status() @@ -1193,9 +1188,6 @@ LightRAG 现已与 [RAG-Anything](https://github.com/HKUDS/RAG-Anything) 实现 ) ) - # 初始化存储(如果有现有数据,这将加载现有数据) - await lightrag_instance.initialize_storages() - # 现在使用现有的 LightRAG 实例初始化 RAGAnything rag = RAGAnything( lightrag=lightrag_instance, # 传递现有的 LightRAG 实例 diff --git a/README.md b/README.md index bf319763..9172b621 100644 --- a/README.md +++ b/README.md @@ -181,8 +181,7 @@ For a streaming response implementation example, please see `examples/lightrag_o ### ⚠️ Important: 
Initialization Requirements -**LightRAG requires explicit initialization before use.** You must call both `await rag.initialize_storages()` and `await initialize_pipeline_status()` after creating a LightRAG instance, otherwise you will encounter errors like: -- `AttributeError: __aenter__` - if storages are not initialized +**LightRAG requires explicit initialization before use.** You must call `await initialize_pipeline_status()` after creating a LightRAG instance, otherwise you will encounter errors like: - `KeyError: 'history_messages'` - if pipeline status is not initialized ### A Simple Program @@ -209,9 +208,8 @@ async def initialize_rag(): embedding_func=openai_embed, llm_model_func=gpt_4o_mini_complete, ) - # IMPORTANT: Both initialization calls are required! - await rag.initialize_storages() # Initialize storage backends - await initialize_pipeline_status() # Initialize processing pipeline + # IMPORTANT: Initialize document processing pipeline status is required! + await initialize_pipeline_status() # return rag async def main(): @@ -401,7 +399,6 @@ async def initialize_rag(): ) ) - await rag.initialize_storages() await initialize_pipeline_status() return rag @@ -550,7 +547,6 @@ async def initialize_rag(): ), ) - await rag.initialize_storages() await initialize_pipeline_status() return rag @@ -776,8 +772,6 @@ async def initialize_rag(): graph_storage="Neo4JStorage", #<-----------override KG default ) - # Initialize database connections - await rag.initialize_storages() # Initialize pipeline status for document processing await initialize_pipeline_status() @@ -862,8 +856,6 @@ async def initialize_rag(): graph_storage="MemgraphStorage", #<-----------override KG default ) - # Initialize database connections - await rag.initialize_storages() # Initialize pipeline status for document processing await initialize_pipeline_status() @@ -1239,8 +1231,6 @@ LightRAG now seamlessly integrates with [RAG-Anything](https://github.com/HKUDS/ ), ) ) - # Initialize storage (this will load existing data if available) - await lightrag_instance.initialize_storages() # Now initialize RAGAnything with the existing LightRAG instance rag = RAGAnything( lightrag=lightrag_instance, # Pass the existing LightRAG instance @@ -1433,24 +1423,16 @@ Valid modes are: ### Common Initialization Errors -If you encounter these errors when using LightRAG: +If you encounter the following error when using LightRAG: -1. **`AttributeError: __aenter__`** - - **Cause**: Storage backends not initialized - - **Solution**: Call `await rag.initialize_storages()` after creating the LightRAG instance +- **`KeyError: 'history_messages'`** +- **Cause**: Pipeline status not initialized +- **Solution**: Call `await initialize_pipeline_status()` after initializing storages -2. **`KeyError: 'history_messages'`** - - **Cause**: Pipeline status not initialized - - **Solution**: Call `await initialize_pipeline_status()` after initializing storages - -3. **Both errors in sequence** - - **Cause**: Neither initialization method was called - - **Solution**: Always follow this pattern: - ```python - rag = LightRAG(...) - await rag.initialize_storages() - await initialize_pipeline_status() - ``` +```python +rag = LightRAG(...) 
+await initialize_pipeline_status() +``` ### Model Switching Issues diff --git a/docs/rerank_integration.md b/docs/rerank_integration.md index 0e6c5169..36058c43 100644 --- a/docs/rerank_integration.md +++ b/docs/rerank_integration.md @@ -174,7 +174,6 @@ async def main(): rerank_model_func=my_rerank_func, ) - await rag.initialize_storages() await initialize_pipeline_status() # Insert documents diff --git a/examples/lightrag_openai_compatible_demo.py b/examples/lightrag_openai_compatible_demo.py index 15187d25..0816fb31 100644 --- a/examples/lightrag_openai_compatible_demo.py +++ b/examples/lightrag_openai_compatible_demo.py @@ -4,7 +4,7 @@ import inspect import logging import logging.config from lightrag import LightRAG, QueryParam -from lightrag.llm.openai import openai_complete_if_cache +from lightrag.llm.openai import openai_complete_if_cache, openai_embed from lightrag.llm.ollama import ollama_embed from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug from lightrag.kg.shared_storage import initialize_pipeline_status @@ -99,6 +99,26 @@ async def llm_model_func( ) +ollama_embedding_func = EmbeddingFunc( + embedding_dim=int(os.getenv("EMBEDDING_DIM", "1024")), + func=lambda texts: ollama_embed( + texts, + embed_model=os.getenv("EMBEDDING_MODEL", "bge-m3:latest"), + host=os.getenv("EMBEDDING_BINDING_HOST", "http://localhost:11434"), + ), +) + +openai_embedding_func = EmbeddingFunc( + embedding_dim=int(os.getenv("EMBEDDING_DIM", "1024")), + func=lambda texts: openai_embed( + texts, + model=os.getenv("EMBEDDING_MODEL", "BAAI/bge-m3"), + base_url=os.getenv("EMBEDDING_BINDING_HOST", "https://api.deepseek.com"), + api_key=os.getenv("EMBEDDING_BINDING_API_KEY") or os.getenv("OPENAI_API_KEY"), + ), +) + + async def print_stream(stream): async for chunk in stream: if chunk: @@ -109,18 +129,9 @@ async def initialize_rag(): rag = LightRAG( working_dir=WORKING_DIR, llm_model_func=llm_model_func, - embedding_func=EmbeddingFunc( - embedding_dim=int(os.getenv("EMBEDDING_DIM", "1024")), - max_token_size=int(os.getenv("MAX_EMBED_TOKENS", "8192")), - func=lambda texts: ollama_embed( - texts, - embed_model=os.getenv("EMBEDDING_MODEL", "bge-m3:latest"), - host=os.getenv("EMBEDDING_BINDING_HOST", "http://localhost:11434"), - ), - ), + embedding_func=openai_embedding_func, ) - await rag.initialize_storages() await initialize_pipeline_status() return rag diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 606becb6..0fc098bb 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -149,9 +149,6 @@ def create_app(args): app.state.background_tasks = set() try: - # Initialize database connections - await rag.initialize_storages() - await initialize_pipeline_status() pipeline_status = await get_namespace_data("pipeline_status") @@ -401,7 +398,6 @@ def create_app(args): enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract, enable_llm_cache=args.enable_llm_cache, rerank_model_func=rerank_model_func, - auto_manage_storages_states=False, max_parallel_insert=args.max_parallel_insert, max_graph_nodes=args.max_graph_nodes, addon_params={"language": args.summary_language}, @@ -431,7 +427,6 @@ def create_app(args): enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract, enable_llm_cache=args.enable_llm_cache, rerank_model_func=rerank_model_func, - auto_manage_storages_states=False, max_parallel_insert=args.max_parallel_insert, max_graph_nodes=args.max_graph_nodes, addon_params={"language": args.summary_language}, diff 
--git a/lightrag/lightrag.py b/lightrag/lightrag.py index 73386b0b..b46cae1b 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -334,6 +334,7 @@ class LightRAG: # Storages Management # --- + # TODO: Deprecated (LightRAG will always manage storages states) auto_manage_storages_states: bool = field(default=True) """If True, lightrag will automatically calls initialize_storages and finalize_storages at the appropriate times.""" @@ -531,12 +532,14 @@ class LightRAG: self._storages_status = StoragesStatus.CREATED - if self.auto_manage_storages_states: - self._run_async_safely(self.initialize_storages, "Storage Initialization") + # Initialize storages + self._run_async_safely(self.initialize_storages, "Storage Initialization") + + # Check and perform data migration if needed + self._run_async_safely(self._check_and_migrate_data, "Data Migration Check") def __del__(self): - if self.auto_manage_storages_states: - self._run_async_safely(self.finalize_storages, "Storage Finalization") + self._run_async_safely(self.finalize_storages, "Storage Finalization") def _run_async_safely(self, async_func, action_name=""): """Safely execute an async function, avoiding event loop conflicts.""" diff --git a/reproduce/Step_1.py b/reproduce/Step_1.py index c94015ad..d74b358a 100644 --- a/reproduce/Step_1.py +++ b/reproduce/Step_1.py @@ -35,7 +35,6 @@ if not os.path.exists(WORKING_DIR): async def initialize_rag(): rag = LightRAG(working_dir=WORKING_DIR) - await rag.initialize_storages() await initialize_pipeline_status() return rag diff --git a/reproduce/Step_1_openai_compatible.py b/reproduce/Step_1_openai_compatible.py index 8093a9ee..4040810d 100644 --- a/reproduce/Step_1_openai_compatible.py +++ b/reproduce/Step_1_openai_compatible.py @@ -70,7 +70,6 @@ async def initialize_rag(): embedding_func=EmbeddingFunc(embedding_dim=4096, func=embedding_func), ) - await rag.initialize_storages() await initialize_pipeline_status() return rag From d2dd137f83e94956fc7450ce1e98c25050e6618c Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 3 Aug 2025 11:02:37 +0800 Subject: [PATCH 03/11] feat: implement get_all_nodes and get_all_edges methods for graph storage backends Add get_all_nodes() and get_all_edges() methods to Neo4JStorage, PGGraphStorage, MongoGraphStorage, and MemgraphStorage classes. These methods return all nodes and edges in the graph with consistent formatting matching NetworkXStorage for compatibility across different storage backends. --- lightrag/kg/memgraph_impl.py | 57 ++++++++++++++++++++++++++++++++++ lightrag/kg/mongo_impl.py | 30 ++++++++++++++++++ lightrag/kg/neo4j_impl.py | 49 +++++++++++++++++++++++++++++ lightrag/kg/postgres_impl.py | 60 ++++++++++++++++++++++++++++++++++++ 4 files changed, 196 insertions(+) diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py index 86958a1a..af26b961 100644 --- a/lightrag/kg/memgraph_impl.py +++ b/lightrag/kg/memgraph_impl.py @@ -997,3 +997,60 @@ class MemgraphStorage(BaseGraphStorage): logger.warning(f"Memgraph error during subgraph query: {str(e)}") return result + + async def get_all_nodes(self) -> list[dict]: + """Get all nodes in the graph. + + Returns: + A list of all nodes, where each node is a dictionary of its properties + """ + if self._driver is None: + raise RuntimeError( + "Memgraph driver is not initialized. Call 'await initialize()' first." 
+ ) + workspace_label = self._get_workspace_label() + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + query = f""" + MATCH (n:`{workspace_label}`) + RETURN n + """ + result = await session.run(query) + nodes = [] + async for record in result: + node = record["n"] + node_dict = dict(node) + # Add node id (entity_id) to the dictionary for easier access + node_dict["id"] = node_dict.get("entity_id") + nodes.append(node_dict) + await result.consume() + return nodes + + async def get_all_edges(self) -> list[dict]: + """Get all edges in the graph. + + Returns: + A list of all edges, where each edge is a dictionary of its properties + """ + if self._driver is None: + raise RuntimeError( + "Memgraph driver is not initialized. Call 'await initialize()' first." + ) + workspace_label = self._get_workspace_label() + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + query = f""" + MATCH (a:`{workspace_label}`)-[r]-(b:`{workspace_label}`) + RETURN DISTINCT a.entity_id AS source, b.entity_id AS target, properties(r) AS properties + """ + result = await session.run(query) + edges = [] + async for record in result: + edge_properties = record["properties"] + edge_properties["source"] = record["source"] + edge_properties["target"] = record["target"] + edges.append(edge_properties) + await result.consume() + return edges diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index 14bd6633..9e2847f2 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -1508,6 +1508,36 @@ class MongoGraphStorage(BaseGraphStorage): logger.debug(f"Successfully deleted edges: {edges}") + async def get_all_nodes(self) -> list[dict]: + """Get all nodes in the graph. + + Returns: + A list of all nodes, where each node is a dictionary of its properties + """ + cursor = self.collection.find({}) + nodes = [] + async for node in cursor: + node_dict = dict(node) + # Add node id (entity_id) to the dictionary for easier access + node_dict["id"] = node_dict.get("_id") + nodes.append(node_dict) + return nodes + + async def get_all_edges(self) -> list[dict]: + """Get all edges in the graph. + + Returns: + A list of all edges, where each edge is a dictionary of its properties + """ + cursor = self.edge_collection.find({}) + edges = [] + async for edge in cursor: + edge_dict = dict(edge) + edge_dict["source"] = edge_dict.get("source_node_id") + edge_dict["target"] = edge_dict.get("target_node_id") + edges.append(edge_dict) + return edges + async def drop(self) -> dict[str, str]: """Drop the storage by removing all documents in the collection. diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index d68707b0..3ae22927 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -1400,6 +1400,55 @@ class Neo4JStorage(BaseGraphStorage): logger.error(f"Error during edge deletion: {str(e)}") raise + async def get_all_nodes(self) -> list[dict]: + """Get all nodes in the graph. 
+ + Returns: + A list of all nodes, where each node is a dictionary of its properties + """ + workspace_label = self._get_workspace_label() + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + query = f""" + MATCH (n:`{workspace_label}`) + RETURN n + """ + result = await session.run(query) + nodes = [] + async for record in result: + node = record["n"] + node_dict = dict(node) + # Add node id (entity_id) to the dictionary for easier access + node_dict["id"] = node_dict.get("entity_id") + nodes.append(node_dict) + await result.consume() + return nodes + + async def get_all_edges(self) -> list[dict]: + """Get all edges in the graph. + + Returns: + A list of all edges, where each edge is a dictionary of its properties + """ + workspace_label = self._get_workspace_label() + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + query = f""" + MATCH (a:`{workspace_label}`)-[r]-(b:`{workspace_label}`) + RETURN DISTINCT a.entity_id AS source, b.entity_id AS target, properties(r) AS properties + """ + result = await session.run(query) + edges = [] + async for record in result: + edge_properties = record["properties"] + edge_properties["source"] = record["source"] + edge_properties["target"] = record["target"] + edges.append(edge_properties) + await result.consume() + return edges + async def drop(self) -> dict[str, str]: """Drop all data from current workspace storage and clean up resources diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 7edfdad1..fc21a50b 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -3669,6 +3669,66 @@ class PGGraphStorage(BaseGraphStorage): return kg + async def get_all_nodes(self) -> list[dict]: + """Get all nodes in the graph. + + Returns: + A list of all nodes, where each node is a dictionary of its properties + """ + query = f"""SELECT * FROM cypher('{self.graph_name}', $$ + MATCH (n:base) + RETURN n + $$) AS (n agtype)""" + + results = await self._query(query) + nodes = [] + for result in results: + if result["n"]: + node_dict = result["n"]["properties"] + + # Process string result, parse it to JSON dictionary + if isinstance(node_dict, str): + try: + node_dict = json.loads(node_dict) + except json.JSONDecodeError: + logger.warning(f"Failed to parse node string: {node_dict}") + + # Add node id (entity_id) to the dictionary for easier access + node_dict["id"] = node_dict.get("entity_id") + nodes.append(node_dict) + return nodes + + async def get_all_edges(self) -> list[dict]: + """Get all edges in the graph. 
+ + Returns: + A list of all edges, where each edge is a dictionary of its properties + """ + query = f"""SELECT * FROM cypher('{self.graph_name}', $$ + MATCH (a:base)-[r]-(b:base) + RETURN DISTINCT a.entity_id AS source, b.entity_id AS target, properties(r) AS properties + $$) AS (source text, target text, properties agtype)""" + + results = await self._query(query) + edges = [] + for result in results: + edge_properties = result["properties"] + + # Process string result, parse it to JSON dictionary + if isinstance(edge_properties, str): + try: + edge_properties = json.loads(edge_properties) + except json.JSONDecodeError: + logger.warning( + f"Failed to parse edge properties string: {edge_properties}" + ) + edge_properties = {} + + edge_properties["source"] = result["source"] + edge_properties["target"] = result["target"] + edges.append(edge_properties) + return edges + async def drop(self) -> dict[str, str]: """Drop the storage""" try: From 06efab4af270905d83b5e3145626ed3a47808e25 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 3 Aug 2025 12:12:13 +0800 Subject: [PATCH 04/11] Revert "Remove auto_manage_storages_states option" This reverts commit bfe6657b316f7e50bc9c5f0cc71d9fbb2b605ddd. --- README-zh.md | 8 +++++ README.md | 40 +++++++++++++++------ docs/rerank_integration.md | 1 + examples/lightrag_openai_compatible_demo.py | 33 ++++++----------- lightrag/api/lightrag_server.py | 5 +++ lightrag/lightrag.py | 11 +++--- reproduce/Step_1.py | 1 + reproduce/Step_1_openai_compatible.py | 1 + 8 files changed, 60 insertions(+), 40 deletions(-) diff --git a/README-zh.md b/README-zh.md index 2e240375..2d08ff51 100644 --- a/README-zh.md +++ b/README-zh.md @@ -204,6 +204,7 @@ async def initialize_rag(): embedding_func=openai_embed, llm_model_func=gpt_4o_mini_complete, ) + await rag.initialize_storages() await initialize_pipeline_status() return rag @@ -399,6 +400,7 @@ async def initialize_rag(): ) ) + await rag.initialize_storages() await initialize_pipeline_status() return rag @@ -545,6 +547,7 @@ async def initialize_rag(): ), ) + await rag.initialize_storages() await initialize_pipeline_status() return rag @@ -762,6 +765,8 @@ async def initialize_rag(): graph_storage="Neo4JStorage", #<-----------覆盖KG默认值 ) + # 初始化数据库连接 + await rag.initialize_storages() # 初始化文档处理的管道状态 await initialize_pipeline_status() @@ -1188,6 +1193,9 @@ LightRAG 现已与 [RAG-Anything](https://github.com/HKUDS/RAG-Anything) 实现 ) ) + # 初始化存储(如果有现有数据,这将加载现有数据) + await lightrag_instance.initialize_storages() + # 现在使用现有的 LightRAG 实例初始化 RAGAnything rag = RAGAnything( lightrag=lightrag_instance, # 传递现有的 LightRAG 实例 diff --git a/README.md b/README.md index 9172b621..bf319763 100644 --- a/README.md +++ b/README.md @@ -181,7 +181,8 @@ For a streaming response implementation example, please see `examples/lightrag_o ### ⚠️ Important: Initialization Requirements -**LightRAG requires explicit initialization before use.** You must call `await initialize_pipeline_status()` after creating a LightRAG instance, otherwise you will encounter errors like: +**LightRAG requires explicit initialization before use.** You must call both `await rag.initialize_storages()` and `await initialize_pipeline_status()` after creating a LightRAG instance, otherwise you will encounter errors like: +- `AttributeError: __aenter__` - if storages are not initialized - `KeyError: 'history_messages'` - if pipeline status is not initialized ### A Simple Program @@ -208,8 +209,9 @@ async def initialize_rag(): embedding_func=openai_embed, llm_model_func=gpt_4o_mini_complete, 
) - # IMPORTANT: Initialize document processing pipeline status is required! - await initialize_pipeline_status() # + # IMPORTANT: Both initialization calls are required! + await rag.initialize_storages() # Initialize storage backends + await initialize_pipeline_status() # Initialize processing pipeline return rag async def main(): @@ -399,6 +401,7 @@ async def initialize_rag(): ) ) + await rag.initialize_storages() await initialize_pipeline_status() return rag @@ -547,6 +550,7 @@ async def initialize_rag(): ), ) + await rag.initialize_storages() await initialize_pipeline_status() return rag @@ -772,6 +776,8 @@ async def initialize_rag(): graph_storage="Neo4JStorage", #<-----------override KG default ) + # Initialize database connections + await rag.initialize_storages() # Initialize pipeline status for document processing await initialize_pipeline_status() @@ -856,6 +862,8 @@ async def initialize_rag(): graph_storage="MemgraphStorage", #<-----------override KG default ) + # Initialize database connections + await rag.initialize_storages() # Initialize pipeline status for document processing await initialize_pipeline_status() @@ -1231,6 +1239,8 @@ LightRAG now seamlessly integrates with [RAG-Anything](https://github.com/HKUDS/ ), ) ) + # Initialize storage (this will load existing data if available) + await lightrag_instance.initialize_storages() # Now initialize RAGAnything with the existing LightRAG instance rag = RAGAnything( lightrag=lightrag_instance, # Pass the existing LightRAG instance @@ -1423,16 +1433,24 @@ Valid modes are: ### Common Initialization Errors -If you encounter the following error when using LightRAG: +If you encounter these errors when using LightRAG: -- **`KeyError: 'history_messages'`** -- **Cause**: Pipeline status not initialized -- **Solution**: Call `await initialize_pipeline_status()` after initializing storages +1. **`AttributeError: __aenter__`** + - **Cause**: Storage backends not initialized + - **Solution**: Call `await rag.initialize_storages()` after creating the LightRAG instance -```python -rag = LightRAG(...) -await initialize_pipeline_status() -``` +2. **`KeyError: 'history_messages'`** + - **Cause**: Pipeline status not initialized + - **Solution**: Call `await initialize_pipeline_status()` after initializing storages + +3. **Both errors in sequence** + - **Cause**: Neither initialization method was called + - **Solution**: Always follow this pattern: + ```python + rag = LightRAG(...) 
+ await rag.initialize_storages() + await initialize_pipeline_status() + ``` ### Model Switching Issues diff --git a/docs/rerank_integration.md b/docs/rerank_integration.md index 36058c43..0e6c5169 100644 --- a/docs/rerank_integration.md +++ b/docs/rerank_integration.md @@ -174,6 +174,7 @@ async def main(): rerank_model_func=my_rerank_func, ) + await rag.initialize_storages() await initialize_pipeline_status() # Insert documents diff --git a/examples/lightrag_openai_compatible_demo.py b/examples/lightrag_openai_compatible_demo.py index 0816fb31..15187d25 100644 --- a/examples/lightrag_openai_compatible_demo.py +++ b/examples/lightrag_openai_compatible_demo.py @@ -4,7 +4,7 @@ import inspect import logging import logging.config from lightrag import LightRAG, QueryParam -from lightrag.llm.openai import openai_complete_if_cache, openai_embed +from lightrag.llm.openai import openai_complete_if_cache from lightrag.llm.ollama import ollama_embed from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug from lightrag.kg.shared_storage import initialize_pipeline_status @@ -99,26 +99,6 @@ async def llm_model_func( ) -ollama_embedding_func = EmbeddingFunc( - embedding_dim=int(os.getenv("EMBEDDING_DIM", "1024")), - func=lambda texts: ollama_embed( - texts, - embed_model=os.getenv("EMBEDDING_MODEL", "bge-m3:latest"), - host=os.getenv("EMBEDDING_BINDING_HOST", "http://localhost:11434"), - ), -) - -openai_embedding_func = EmbeddingFunc( - embedding_dim=int(os.getenv("EMBEDDING_DIM", "1024")), - func=lambda texts: openai_embed( - texts, - model=os.getenv("EMBEDDING_MODEL", "BAAI/bge-m3"), - base_url=os.getenv("EMBEDDING_BINDING_HOST", "https://api.deepseek.com"), - api_key=os.getenv("EMBEDDING_BINDING_API_KEY") or os.getenv("OPENAI_API_KEY"), - ), -) - - async def print_stream(stream): async for chunk in stream: if chunk: @@ -129,9 +109,18 @@ async def initialize_rag(): rag = LightRAG( working_dir=WORKING_DIR, llm_model_func=llm_model_func, - embedding_func=openai_embedding_func, + embedding_func=EmbeddingFunc( + embedding_dim=int(os.getenv("EMBEDDING_DIM", "1024")), + max_token_size=int(os.getenv("MAX_EMBED_TOKENS", "8192")), + func=lambda texts: ollama_embed( + texts, + embed_model=os.getenv("EMBEDDING_MODEL", "bge-m3:latest"), + host=os.getenv("EMBEDDING_BINDING_HOST", "http://localhost:11434"), + ), + ), ) + await rag.initialize_storages() await initialize_pipeline_status() return rag diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 0fc098bb..606becb6 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -149,6 +149,9 @@ def create_app(args): app.state.background_tasks = set() try: + # Initialize database connections + await rag.initialize_storages() + await initialize_pipeline_status() pipeline_status = await get_namespace_data("pipeline_status") @@ -398,6 +401,7 @@ def create_app(args): enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract, enable_llm_cache=args.enable_llm_cache, rerank_model_func=rerank_model_func, + auto_manage_storages_states=False, max_parallel_insert=args.max_parallel_insert, max_graph_nodes=args.max_graph_nodes, addon_params={"language": args.summary_language}, @@ -427,6 +431,7 @@ def create_app(args): enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract, enable_llm_cache=args.enable_llm_cache, rerank_model_func=rerank_model_func, + auto_manage_storages_states=False, max_parallel_insert=args.max_parallel_insert, max_graph_nodes=args.max_graph_nodes, 
addon_params={"language": args.summary_language}, diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index b46cae1b..73386b0b 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -334,7 +334,6 @@ class LightRAG: # Storages Management # --- - # TODO: Deprecated (LightRAG will always manage storages states) auto_manage_storages_states: bool = field(default=True) """If True, lightrag will automatically calls initialize_storages and finalize_storages at the appropriate times.""" @@ -532,14 +531,12 @@ class LightRAG: self._storages_status = StoragesStatus.CREATED - # Initialize storages - self._run_async_safely(self.initialize_storages, "Storage Initialization") - - # Check and perform data migration if needed - self._run_async_safely(self._check_and_migrate_data, "Data Migration Check") + if self.auto_manage_storages_states: + self._run_async_safely(self.initialize_storages, "Storage Initialization") def __del__(self): - self._run_async_safely(self.finalize_storages, "Storage Finalization") + if self.auto_manage_storages_states: + self._run_async_safely(self.finalize_storages, "Storage Finalization") def _run_async_safely(self, async_func, action_name=""): """Safely execute an async function, avoiding event loop conflicts.""" diff --git a/reproduce/Step_1.py b/reproduce/Step_1.py index d74b358a..c94015ad 100644 --- a/reproduce/Step_1.py +++ b/reproduce/Step_1.py @@ -35,6 +35,7 @@ if not os.path.exists(WORKING_DIR): async def initialize_rag(): rag = LightRAG(working_dir=WORKING_DIR) + await rag.initialize_storages() await initialize_pipeline_status() return rag diff --git a/reproduce/Step_1_openai_compatible.py b/reproduce/Step_1_openai_compatible.py index 4040810d..8093a9ee 100644 --- a/reproduce/Step_1_openai_compatible.py +++ b/reproduce/Step_1_openai_compatible.py @@ -70,6 +70,7 @@ async def initialize_rag(): embedding_func=EmbeddingFunc(embedding_dim=4096, func=embedding_func), ) + await rag.initialize_storages() await initialize_pipeline_status() return rag From e8d8afa84604a5dc96188f514730536f955e39f8 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 3 Aug 2025 12:42:57 +0800 Subject: [PATCH 05/11] Removed auto storage management from LightRAG instance creation - The `initialize_storages` method must be explicitly called after LightRAG creation. The `finalize_storages` method should be called before LightRAG destyoyed. 
- Added explicit data migration check
---
 lightrag/api/lightrag_server.py |  3 +-
 lightrag/lightrag.py            | 89 +++++++++++++++------------------
 2 files changed, 42 insertions(+), 50 deletions(-)

diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 606becb6..55fd5645 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -151,6 +151,7 @@ def create_app(args):
     try:
         # Initialize database connections
         await rag.initialize_storages()
+        await rag.check_and_migrate_data()
 
         await initialize_pipeline_status()
         pipeline_status = await get_namespace_data("pipeline_status")
@@ -401,7 +402,6 @@ def create_app(args):
             enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract,
             enable_llm_cache=args.enable_llm_cache,
             rerank_model_func=rerank_model_func,
-            auto_manage_storages_states=False,
             max_parallel_insert=args.max_parallel_insert,
             max_graph_nodes=args.max_graph_nodes,
             addon_params={"language": args.summary_language},
@@ -431,7 +431,6 @@ def create_app(args):
             enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract,
             enable_llm_cache=args.enable_llm_cache,
             rerank_model_func=rerank_model_func,
-            auto_manage_storages_states=False,
             max_parallel_insert=args.max_parallel_insert,
             max_graph_nodes=args.max_graph_nodes,
             addon_params={"language": args.summary_language},
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 73386b0b..288655f7 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -334,12 +334,10 @@ class LightRAG:
     # Storages Management
     # ---
 
-    auto_manage_storages_states: bool = field(default=True)
+    # TODO: Deprecated (LightRAG will never initialize storages automatically on creation, and finalize_storages should be called before destroying)
+    auto_manage_storages_states: bool = field(default=False)
     """If True, lightrag will automatically calls initialize_storages and finalize_storages at the appropriate times."""
 
-    # Storages Management
-    # ---
-
     cosine_better_than_threshold: float = field(
         default=float(os.getenv("COSINE_THRESHOLD", 0.2))
     )
@@ -531,32 +529,6 @@ class LightRAG:
 
         self._storages_status = StoragesStatus.CREATED
 
-        if self.auto_manage_storages_states:
-            self._run_async_safely(self.initialize_storages, "Storage Initialization")
-
-    def __del__(self):
-        if self.auto_manage_storages_states:
-            self._run_async_safely(self.finalize_storages, "Storage Finalization")
-
-    def _run_async_safely(self, async_func, action_name=""):
-        """Safely execute an async function, avoiding event loop conflicts."""
-        try:
-            loop = always_get_an_event_loop()
-            if loop.is_running():
-                task = loop.create_task(async_func())
-                task.add_done_callback(
-                    lambda t: logger.info(f"{action_name} completed!")
-                )
-            else:
-                loop.run_until_complete(async_func())
-        except RuntimeError:
-            logger.warning(
-                f"No running event loop, creating a new loop for {action_name}."
- ) - loop = asyncio.new_event_loop() - loop.run_until_complete(async_func()) - loop.close() - async def initialize_storages(self): """Asynchronously initialize the storages""" if self._storages_status == StoragesStatus.CREATED: @@ -583,31 +555,52 @@ class LightRAG: logger.debug("All storage types initialized") async def finalize_storages(self): - """Asynchronously finalize the storages""" + """Asynchronously finalize the storages with improved error handling""" if self._storages_status == StoragesStatus.INITIALIZED: - tasks = [] + storages = [ + ("full_docs", self.full_docs), + ("text_chunks", self.text_chunks), + ("full_entities", self.full_entities), + ("full_relations", self.full_relations), + ("entities_vdb", self.entities_vdb), + ("relationships_vdb", self.relationships_vdb), + ("chunks_vdb", self.chunks_vdb), + ("chunk_entity_relation_graph", self.chunk_entity_relation_graph), + ("llm_response_cache", self.llm_response_cache), + ("doc_status", self.doc_status), + ] - for storage in ( - self.full_docs, - self.text_chunks, - self.full_entities, - self.full_relations, - self.entities_vdb, - self.relationships_vdb, - self.chunks_vdb, - self.chunk_entity_relation_graph, - self.llm_response_cache, - self.doc_status, - ): + # Finalize each storage individually to ensure one failure doesn't prevent others from closing + successful_finalizations = [] + failed_finalizations = [] + + for storage_name, storage in storages: if storage: - tasks.append(storage.finalize()) + try: + await storage.finalize() + successful_finalizations.append(storage_name) + logger.debug(f"Successfully finalized {storage_name}") + except Exception as e: + error_msg = f"Failed to finalize {storage_name}: {e}" + logger.error(error_msg) + failed_finalizations.append(storage_name) - await asyncio.gather(*tasks) + # Log summary of finalization results + if successful_finalizations: + logger.info( + f"Successfully finalized {len(successful_finalizations)} storages" + ) + + if failed_finalizations: + logger.error( + f"Failed to finalize {len(failed_finalizations)} storages: {', '.join(failed_finalizations)}" + ) + else: + logger.debug("All storages finalized successfully") self._storages_status = StoragesStatus.FINALIZED - logger.debug("Finalized Storages") - async def _check_and_migrate_data(self): + async def check_and_migrate_data(self): """Check if data migration is needed and perform migration if necessary""" try: # Check if migration is needed: From bf9a6d699b0747b75c299b8240f86ae8aaedc40d Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 3 Aug 2025 22:14:24 +0800 Subject: [PATCH 06/11] Fix(lightrag): Handle undirected edges in data migration The `_migrate_entity_relation_data` function previously processed directed edges from `get_all_edges`, which could lead to duplicates (e.g., (A,B) and (B,A)) and an incorrect relation count. This commit normalizes edges by sorting their source and target nodes before adding them to the relation set. This ensures all edges are treated as undirected and are properly deduplicated. 
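
A sketch of the normalization (illustrative; `src`/`tgt` are the edge endpoints
already iterated by the migration loop):

    # (A, B) and (B, A) collapse to the same canonical key
    pair = tuple(sorted((src, tgt)))
    doc_relations[doc_id].add(pair)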
---
 lightrag/lightrag.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 288655f7..2bd710df 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -719,7 +719,7 @@ class LightRAG:
                 if doc_id not in doc_relations:
                     doc_relations[doc_id] = set()
                 # Use tuple for set operations, convert to list later
-                doc_relations[doc_id].add((src, tgt))
+                doc_relations[doc_id].add(tuple(sorted((src, tgt))))
 
             # Store the results in full_entities and full_relations
             migration_count = 0
@@ -728,7 +728,10 @@ class LightRAG:
             if doc_entities:
                 entities_data = {}
                 for doc_id, entity_set in doc_entities.items():
-                    entities_data[doc_id] = {"entity_names": list(entity_set)}
+                    entities_data[doc_id] = {
+                        "entity_names": list(entity_set),
+                        "count": len(entity_set),
+                    }
                 await self.full_entities.upsert(entities_data)
 
             # Store relations
@@ -737,7 +740,8 @@ class LightRAG:
                 for doc_id, relation_set in doc_relations.items():
                     # Convert tuples back to lists
                     relations_data[doc_id] = {
-                        "relation_pairs": [list(pair) for pair in relation_set]
+                        "relation_pairs": [list(pair) for pair in relation_set],
+                        "count": len(relation_set),
                     }
                 await self.full_relations.upsert(relations_data)

From 952d1feb07a3bcd39df4e584eddb6bb642281d3c Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sun, 3 Aug 2025 22:54:56 +0800
Subject: [PATCH 07/11] feat: Add support for KV_STORE_FULL_ENTITIES and
 KV_STORE_FULL_RELATIONS namespaces in PGKVStorage

- Add LIGHTRAG_FULL_ENTITIES and LIGHTRAG_FULL_RELATIONS table schemas
- Implement complete CRUD operations for both namespaces
- Add automatic table creation and migration support
- Add SQL templates and namespace mappings
- Ensure workspace isolation and proper indexing
---
 lightrag/base.py             |   1 +
 lightrag/kg/postgres_impl.py | 265 +++++++++++++++++++++++++++++++++++
 2 files changed, 266 insertions(+)

diff --git a/lightrag/base.py b/lightrag/base.py
index 9f82c6eb..045bcb07 100644
--- a/lightrag/base.py
+++ b/lightrag/base.py
@@ -660,6 +660,7 @@ class BaseGraphStorage(StorageNameSpace, ABC):
 
         Returns:
             A list of all nodes, where each node is a dictionary of its properties
+            (Edges are bidirectional for some storage implementations; deduplication must be handled by the caller)
         """
 
     @abstractmethod
diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index fc21a50b..6f8529e6 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -920,6 +920,80 @@ class PostgreSQLDB:
         except Exception as e:
             logger.error(f"PostgreSQL, Failed to create pagination indexes: {e}")
 
+        # Migrate to ensure new tables LIGHTRAG_FULL_ENTITIES and LIGHTRAG_FULL_RELATIONS exist
+        try:
+            await self._migrate_create_full_entities_relations_tables()
+        except Exception as e:
+            logger.error(
+                f"PostgreSQL, Failed to create full entities/relations tables: {e}"
+            )
+
+    async def _migrate_create_full_entities_relations_tables(self):
+        """Create LIGHTRAG_FULL_ENTITIES and LIGHTRAG_FULL_RELATIONS tables if they don't exist"""
+        tables_to_check = [
+            {
+                "name": "LIGHTRAG_FULL_ENTITIES",
+                "ddl": TABLES["LIGHTRAG_FULL_ENTITIES"]["ddl"],
+                "description": "Full entities storage table",
+            },
+            {
+                "name": "LIGHTRAG_FULL_RELATIONS",
+                "ddl": TABLES["LIGHTRAG_FULL_RELATIONS"]["ddl"],
+                "description": "Full relations storage table",
+            },
+        ]
+
+        for table_info in tables_to_check:
+            table_name = table_info["name"]
+            try:
+                # Check if table exists
+                check_table_sql = """
+                    SELECT table_name
+                    FROM information_schema.tables
+                    WHERE table_name = $1
+
AND table_schema = 'public' + """ + + table_exists = await self.query( + check_table_sql, {"table_name": table_name.lower()} + ) + + if not table_exists: + logger.info(f"Creating table {table_name}") + await self.execute(table_info["ddl"]) + logger.info( + f"Successfully created {table_info['description']}: {table_name}" + ) + + # Create basic indexes for the new table + try: + # Create index for id column + index_name = f"idx_{table_name.lower()}_id" + create_index_sql = ( + f"CREATE INDEX {index_name} ON {table_name}(id)" + ) + await self.execute(create_index_sql) + logger.info(f"Created index {index_name} on table {table_name}") + + # Create composite index for (workspace, id) columns + composite_index_name = f"idx_{table_name.lower()}_workspace_id" + create_composite_index_sql = f"CREATE INDEX {composite_index_name} ON {table_name}(workspace, id)" + await self.execute(create_composite_index_sql) + logger.info( + f"Created composite index {composite_index_name} on table {table_name}" + ) + + except Exception as e: + logger.warning( + f"Failed to create indexes for table {table_name}: {e}" + ) + + else: + logger.debug(f"Table {table_name} already exists") + + except Exception as e: + logger.error(f"Failed to create table {table_name}: {e}") + async def _create_pagination_indexes(self): """Create indexes to optimize pagination queries for LIGHTRAG_DOC_STATUS""" indexes = [ @@ -1233,6 +1307,46 @@ class PGKVStorage(BaseKVStorage): processed_results[row["id"]] = row return processed_results + # For FULL_ENTITIES namespace, parse entity_names JSON string back to list + if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES): + processed_results = {} + for row in results: + entity_names = row.get("entity_names", []) + if isinstance(entity_names, str): + try: + entity_names = json.loads(entity_names) + except json.JSONDecodeError: + entity_names = [] + row["entity_names"] = entity_names + create_time = row.get("create_time", 0) + update_time = row.get("update_time", 0) + row["create_time"] = create_time + row["update_time"] = ( + create_time if update_time == 0 else update_time + ) + processed_results[row["id"]] = row + return processed_results + + # For FULL_RELATIONS namespace, parse relation_pairs JSON string back to list + if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS): + processed_results = {} + for row in results: + relation_pairs = row.get("relation_pairs", []) + if isinstance(relation_pairs, str): + try: + relation_pairs = json.loads(relation_pairs) + except json.JSONDecodeError: + relation_pairs = [] + row["relation_pairs"] = relation_pairs + create_time = row.get("create_time", 0) + update_time = row.get("update_time", 0) + row["create_time"] = create_time + row["update_time"] = ( + create_time if update_time == 0 else update_time + ) + processed_results[row["id"]] = row + return processed_results + # For other namespaces, return as-is return {row["id"]: row for row in results} except Exception as e: @@ -1277,6 +1391,36 @@ class PGKVStorage(BaseKVStorage): "update_time": create_time if update_time == 0 else update_time, } + # Special handling for FULL_ENTITIES namespace + if response and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES): + # Parse entity_names JSON string back to list + entity_names = response.get("entity_names", []) + if isinstance(entity_names, str): + try: + entity_names = json.loads(entity_names) + except json.JSONDecodeError: + entity_names = [] + response["entity_names"] = entity_names + create_time = 
response.get("create_time", 0) + update_time = response.get("update_time", 0) + response["create_time"] = create_time + response["update_time"] = create_time if update_time == 0 else update_time + + # Special handling for FULL_RELATIONS namespace + if response and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS): + # Parse relation_pairs JSON string back to list + relation_pairs = response.get("relation_pairs", []) + if isinstance(relation_pairs, str): + try: + relation_pairs = json.loads(relation_pairs) + except json.JSONDecodeError: + relation_pairs = [] + response["relation_pairs"] = relation_pairs + create_time = response.get("create_time", 0) + update_time = response.get("update_time", 0) + response["create_time"] = create_time + response["update_time"] = create_time if update_time == 0 else update_time + return response if response else None # Query by id @@ -1325,6 +1469,38 @@ class PGKVStorage(BaseKVStorage): processed_results.append(processed_row) return processed_results + # Special handling for FULL_ENTITIES namespace + if results and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES): + for result in results: + # Parse entity_names JSON string back to list + entity_names = result.get("entity_names", []) + if isinstance(entity_names, str): + try: + entity_names = json.loads(entity_names) + except json.JSONDecodeError: + entity_names = [] + result["entity_names"] = entity_names + create_time = result.get("create_time", 0) + update_time = result.get("update_time", 0) + result["create_time"] = create_time + result["update_time"] = create_time if update_time == 0 else update_time + + # Special handling for FULL_RELATIONS namespace + if results and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS): + for result in results: + # Parse relation_pairs JSON string back to list + relation_pairs = result.get("relation_pairs", []) + if isinstance(relation_pairs, str): + try: + relation_pairs = json.loads(relation_pairs) + except json.JSONDecodeError: + relation_pairs = [] + result["relation_pairs"] = relation_pairs + create_time = result.get("create_time", 0) + update_time = result.get("update_time", 0) + result["create_time"] = create_time + result["update_time"] = create_time if update_time == 0 else update_time + return results if results else [] async def filter_keys(self, keys: set[str]) -> set[str]: @@ -1397,6 +1573,34 @@ class PGKVStorage(BaseKVStorage): } await self.db.execute(upsert_sql, _data) + elif is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES): + # Get current UTC time and convert to naive datetime for database storage + current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None) + for k, v in data.items(): + upsert_sql = SQL_TEMPLATES["upsert_full_entities"] + _data = { + "workspace": self.db.workspace, + "id": k, + "entity_names": json.dumps(v["entity_names"]), + "count": v["count"], + "create_time": current_time, + "update_time": current_time, + } + await self.db.execute(upsert_sql, _data) + elif is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS): + # Get current UTC time and convert to naive datetime for database storage + current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None) + for k, v in data.items(): + upsert_sql = SQL_TEMPLATES["upsert_full_relations"] + _data = { + "workspace": self.db.workspace, + "id": k, + "relation_pairs": json.dumps(v["relation_pairs"]), + "count": v["count"], + "create_time": current_time, + "update_time": current_time, + } + await 
self.db.execute(upsert_sql, _data) async def index_done_callback(self) -> None: # PG handles persistence automatically @@ -3703,6 +3907,7 @@ class PGGraphStorage(BaseGraphStorage): Returns: A list of all edges, where each edge is a dictionary of its properties + (The edge is bidirectional; deduplication must be handled by the caller) """ query = f"""SELECT * FROM cypher('{self.graph_name}', $$ MATCH (a:base)-[r]-(b:base) @@ -3755,6 +3960,8 @@ NAMESPACE_TABLE_MAP = { NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_VDB_RELATION", NameSpace.DOC_STATUS: "LIGHTRAG_DOC_STATUS", NameSpace.KV_STORE_LLM_RESPONSE_CACHE: "LIGHTRAG_LLM_CACHE", + NameSpace.KV_STORE_FULL_ENTITIES: "LIGHTRAG_FULL_ENTITIES", + NameSpace.KV_STORE_FULL_RELATIONS: "LIGHTRAG_FULL_RELATIONS", } @@ -3867,6 +4074,28 @@ TABLES = { CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id) )""" }, + "LIGHTRAG_FULL_ENTITIES": { + "ddl": """CREATE TABLE LIGHTRAG_FULL_ENTITIES ( + id VARCHAR(255), + workspace VARCHAR(255), + entity_names JSONB, + count INTEGER, + create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP, + update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT LIGHTRAG_FULL_ENTITIES_PK PRIMARY KEY (workspace, id) + )""" + }, + "LIGHTRAG_FULL_RELATIONS": { + "ddl": """CREATE TABLE LIGHTRAG_FULL_RELATIONS ( + id VARCHAR(255), + workspace VARCHAR(255), + relation_pairs JSONB, + count INTEGER, + create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP, + update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT LIGHTRAG_FULL_RELATIONS_PK PRIMARY KEY (workspace, id) + )""" + }, } @@ -3905,6 +4134,26 @@ SQL_TEMPLATES = { EXTRACT(EPOCH FROM update_time)::BIGINT as update_time FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id IN ({ids}) """, + "get_by_id_full_entities": """SELECT id, entity_names, count, + EXTRACT(EPOCH FROM create_time)::BIGINT as create_time, + EXTRACT(EPOCH FROM update_time)::BIGINT as update_time + FROM LIGHTRAG_FULL_ENTITIES WHERE workspace=$1 AND id=$2 + """, + "get_by_id_full_relations": """SELECT id, relation_pairs, count, + EXTRACT(EPOCH FROM create_time)::BIGINT as create_time, + EXTRACT(EPOCH FROM update_time)::BIGINT as update_time + FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id=$2 + """, + "get_by_ids_full_entities": """SELECT id, entity_names, count, + EXTRACT(EPOCH FROM create_time)::BIGINT as create_time, + EXTRACT(EPOCH FROM update_time)::BIGINT as update_time + FROM LIGHTRAG_FULL_ENTITIES WHERE workspace=$1 AND id IN ({ids}) + """, + "get_by_ids_full_relations": """SELECT id, relation_pairs, count, + EXTRACT(EPOCH FROM create_time)::BIGINT as create_time, + EXTRACT(EPOCH FROM update_time)::BIGINT as update_time + FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id IN ({ids}) + """, "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})", "upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, workspace) VALUES ($1, $2, $3) @@ -3934,6 +4183,22 @@ SQL_TEMPLATES = { llm_cache_list=EXCLUDED.llm_cache_list, update_time = EXCLUDED.update_time """, + "upsert_full_entities": """INSERT INTO LIGHTRAG_FULL_ENTITIES (workspace, id, entity_names, count, + create_time, update_time) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (workspace,id) DO UPDATE + SET entity_names=EXCLUDED.entity_names, + count=EXCLUDED.count, + update_time = EXCLUDED.update_time + """, + "upsert_full_relations": """INSERT INTO LIGHTRAG_FULL_RELATIONS (workspace, id, relation_pairs, count, + create_time, update_time) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT 
(workspace,id) DO UPDATE
+                                      SET relation_pairs=EXCLUDED.relation_pairs,
+                                      count=EXCLUDED.count,
+                                      update_time = EXCLUDED.update_time
+                                      """,
     # SQL for VectorStorage
     "upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
                       chunk_order_index, full_doc_id, content, content_vector, file_path,

From 75051953038cf2711ddbe3a7db673d7167da5120 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sun, 3 Aug 2025 23:02:58 +0800
Subject: [PATCH 08/11] fix: add full_entities and full_relations to
 clear_documents storage list

---
 lightrag/api/routers/document_routes.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 4dc17dc3..cacc135b 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -1473,6 +1473,8 @@ def create_document_routes(
             storages = [
                 rag.text_chunks,
                 rag.full_docs,
+                rag.full_entities,
+                rag.full_relations,
                 rag.entities_vdb,
                 rag.relationships_vdb,
                 rag.chunks_vdb,

From 2f1d1b69e5c751ee965367d03abfa1ceb2f24196 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 2 Aug 2025 01:17:58 +0800
Subject: [PATCH 09/11] Bump api version to 0197

---
 lightrag/api/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lightrag/api/__init__.py b/lightrag/api/__init__.py
index d173904e..1e2f9a11 100644
--- a/lightrag/api/__init__.py
+++ b/lightrag/api/__init__.py
@@ -1 +1 @@
-__api_version__ = "0196"
+__api_version__ = "0197"

From daf2633dc2175a856a07f748fced2826a0fcfe41 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sun, 3 Aug 2025 23:04:42 +0800
Subject: [PATCH 10/11] Bump api version to 0198

---
 lightrag/api/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lightrag/api/__init__.py b/lightrag/api/__init__.py
index 1e2f9a11..f3602985 100644
--- a/lightrag/api/__init__.py
+++ b/lightrag/api/__init__.py
@@ -1 +1 @@
-__api_version__ = "0197"
+__api_version__ = "0198"

From 5513155808dff38f38e4dffae7c61fe78746e7a4 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Mon, 4 Aug 2025 00:21:20 +0800
Subject: [PATCH 11/11] Fix namespace table name translation error

- Reorder namespace table map for PostgreSQL
- Ensure specific namespaces come first
---
 lightrag/kg/postgres_impl.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 6f8529e6..c30657cd 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -3952,16 +3952,18 @@ class PGGraphStorage(BaseGraphStorage):
             return {"status": "error", "message": str(e)}
 
 
+# Note: Order matters! More specific namespaces (e.g., "full_entities") must come before
+# more general ones (e.g., "entities") because is_namespace() uses endswith() matching
 NAMESPACE_TABLE_MAP = {
     NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
     NameSpace.KV_STORE_TEXT_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
+    NameSpace.KV_STORE_FULL_ENTITIES: "LIGHTRAG_FULL_ENTITIES",
+    NameSpace.KV_STORE_FULL_RELATIONS: "LIGHTRAG_FULL_RELATIONS",
+    NameSpace.KV_STORE_LLM_RESPONSE_CACHE: "LIGHTRAG_LLM_CACHE",
     NameSpace.VECTOR_STORE_CHUNKS: "LIGHTRAG_VDB_CHUNKS",
     NameSpace.VECTOR_STORE_ENTITIES: "LIGHTRAG_VDB_ENTITY",
     NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_VDB_RELATION",
     NameSpace.DOC_STATUS: "LIGHTRAG_DOC_STATUS",
-    NameSpace.KV_STORE_LLM_RESPONSE_CACHE: "LIGHTRAG_LLM_CACHE",
-    NameSpace.KV_STORE_FULL_ENTITIES: "LIGHTRAG_FULL_ENTITIES",
-    NameSpace.KV_STORE_FULL_RELATIONS: "LIGHTRAG_FULL_RELATIONS",
 }
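
Why the order matters — a sketch (illustrative, based on the suffix matching
described in the note above):

    # "full_entities" also ends with "entities", so a generic mapping
    # checked first would shadow the more specific one:
    assert "full_entities".endswith("entities")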