From 952d1feb07a3bcd39df4e584eddb6bb642281d3c Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sun, 3 Aug 2025 22:54:56 +0800
Subject: [PATCH] feat: Add support for KV_STORE_FULL_ENTITIES and
 KV_STORE_FULL_RELATIONS namespaces in PGKVStorage

- Add LIGHTRAG_FULL_ENTITIES and LIGHTRAG_FULL_RELATIONS table schemas
- Implement complete CRUD operations for both namespaces
- Add automatic table creation and migration support
- Add SQL templates and namespace mappings
- Ensure workspace isolation and proper indexing
---
 lightrag/base.py             |   1 +
 lightrag/kg/postgres_impl.py | 265 +++++++++++++++++++++++++++++++++++
 2 files changed, 266 insertions(+)

diff --git a/lightrag/base.py b/lightrag/base.py
index 9f82c6eb..045bcb07 100644
--- a/lightrag/base.py
+++ b/lightrag/base.py
@@ -660,6 +660,7 @@ class BaseGraphStorage(StorageNameSpace, ABC):
 
         Returns:
             A list of all nodes, where each node is a dictionary of its properties
+            (Edges are bidirectional for some storage implementations; deduplication must be handled by the caller)
         """
 
     @abstractmethod
diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index fc21a50b..6f8529e6 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -920,6 +920,80 @@ class PostgreSQLDB:
         except Exception as e:
             logger.error(f"PostgreSQL, Failed to create pagination indexes: {e}")
 
+        # Migrate to ensure new tables LIGHTRAG_FULL_ENTITIES and LIGHTRAG_FULL_RELATIONS exist
+        try:
+            await self._migrate_create_full_entities_relations_tables()
+        except Exception as e:
+            logger.error(
+                f"PostgreSQL, Failed to create full entities/relations tables: {e}"
+            )
+
+    async def _migrate_create_full_entities_relations_tables(self):
+        """Create LIGHTRAG_FULL_ENTITIES and LIGHTRAG_FULL_RELATIONS tables if they don't exist"""
+        tables_to_check = [
+            {
+                "name": "LIGHTRAG_FULL_ENTITIES",
+                "ddl": TABLES["LIGHTRAG_FULL_ENTITIES"]["ddl"],
+                "description": "Full entities storage table",
+            },
+            {
+                "name": "LIGHTRAG_FULL_RELATIONS",
+                "ddl": TABLES["LIGHTRAG_FULL_RELATIONS"]["ddl"],
+                "description": "Full relations storage table",
+            },
+        ]
+
+        for table_info in tables_to_check:
+            table_name = table_info["name"]
+            try:
+                # Check if table exists
+                check_table_sql = """
+                SELECT table_name
+                FROM information_schema.tables
+                WHERE table_name = $1
+                AND table_schema = 'public'
+                """
+
+                table_exists = await self.query(
+                    check_table_sql, {"table_name": table_name.lower()}
+                )
+
+                if not table_exists:
+                    logger.info(f"Creating table {table_name}")
+                    await self.execute(table_info["ddl"])
+                    logger.info(
+                        f"Successfully created {table_info['description']}: {table_name}"
+                    )
+
+                    # Create basic indexes for the new table
+                    try:
+                        # Create index for id column
+                        index_name = f"idx_{table_name.lower()}_id"
+                        create_index_sql = (
+                            f"CREATE INDEX {index_name} ON {table_name}(id)"
+                        )
+                        await self.execute(create_index_sql)
+                        logger.info(f"Created index {index_name} on table {table_name}")
+
+                        # Create composite index for (workspace, id) columns
+                        composite_index_name = f"idx_{table_name.lower()}_workspace_id"
+                        create_composite_index_sql = f"CREATE INDEX {composite_index_name} ON {table_name}(workspace, id)"
+                        await self.execute(create_composite_index_sql)
+                        logger.info(
+                            f"Created composite index {composite_index_name} on table {table_name}"
+                        )
+
+                    except Exception as e:
+                        logger.warning(
+                            f"Failed to create indexes for table {table_name}: {e}"
+                        )
+
+                else:
+                    logger.debug(f"Table {table_name} already exists")
+
+            except Exception as e:
+                logger.error(f"Failed to create table {table_name}: {e}")
+
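Two details of the migration above are worth noting. The composite index on (workspace, id) duplicates the unique index that the PRIMARY KEY (workspace, id) constraint in the DDL already creates, and the plain CREATE INDEX statements will fail if a previous partial run left the indexes behind. A minimal sketch of an idempotent variant, assuming a PostgreSQLDB-like object with the same execute() coroutine (the helper name is hypothetical and not part of this patch):

    # Hypothetical re-runnable index setup; CREATE INDEX IF NOT EXISTS
    # (PostgreSQL 9.5+) turns repeat executions into no-ops.
    async def ensure_basic_indexes(db, table_name: str) -> None:
        for index_name, columns in [
            (f"idx_{table_name.lower()}_id", "(id)"),
            (f"idx_{table_name.lower()}_workspace_id", "(workspace, id)"),
        ]:
            await db.execute(
                f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name}{columns}"
            )
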
{table_name}: {e}") + async def _create_pagination_indexes(self): """Create indexes to optimize pagination queries for LIGHTRAG_DOC_STATUS""" indexes = [ @@ -1233,6 +1307,46 @@ class PGKVStorage(BaseKVStorage): processed_results[row["id"]] = row return processed_results + # For FULL_ENTITIES namespace, parse entity_names JSON string back to list + if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES): + processed_results = {} + for row in results: + entity_names = row.get("entity_names", []) + if isinstance(entity_names, str): + try: + entity_names = json.loads(entity_names) + except json.JSONDecodeError: + entity_names = [] + row["entity_names"] = entity_names + create_time = row.get("create_time", 0) + update_time = row.get("update_time", 0) + row["create_time"] = create_time + row["update_time"] = ( + create_time if update_time == 0 else update_time + ) + processed_results[row["id"]] = row + return processed_results + + # For FULL_RELATIONS namespace, parse relation_pairs JSON string back to list + if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS): + processed_results = {} + for row in results: + relation_pairs = row.get("relation_pairs", []) + if isinstance(relation_pairs, str): + try: + relation_pairs = json.loads(relation_pairs) + except json.JSONDecodeError: + relation_pairs = [] + row["relation_pairs"] = relation_pairs + create_time = row.get("create_time", 0) + update_time = row.get("update_time", 0) + row["create_time"] = create_time + row["update_time"] = ( + create_time if update_time == 0 else update_time + ) + processed_results[row["id"]] = row + return processed_results + # For other namespaces, return as-is return {row["id"]: row for row in results} except Exception as e: @@ -1277,6 +1391,36 @@ class PGKVStorage(BaseKVStorage): "update_time": create_time if update_time == 0 else update_time, } + # Special handling for FULL_ENTITIES namespace + if response and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES): + # Parse entity_names JSON string back to list + entity_names = response.get("entity_names", []) + if isinstance(entity_names, str): + try: + entity_names = json.loads(entity_names) + except json.JSONDecodeError: + entity_names = [] + response["entity_names"] = entity_names + create_time = response.get("create_time", 0) + update_time = response.get("update_time", 0) + response["create_time"] = create_time + response["update_time"] = create_time if update_time == 0 else update_time + + # Special handling for FULL_RELATIONS namespace + if response and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS): + # Parse relation_pairs JSON string back to list + relation_pairs = response.get("relation_pairs", []) + if isinstance(relation_pairs, str): + try: + relation_pairs = json.loads(relation_pairs) + except json.JSONDecodeError: + relation_pairs = [] + response["relation_pairs"] = relation_pairs + create_time = response.get("create_time", 0) + update_time = response.get("update_time", 0) + response["create_time"] = create_time + response["update_time"] = create_time if update_time == 0 else update_time + return response if response else None # Query by id @@ -1325,6 +1469,38 @@ class PGKVStorage(BaseKVStorage): processed_results.append(processed_row) return processed_results + # Special handling for FULL_ENTITIES namespace + if results and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES): + for result in results: + # Parse entity_names JSON string back to list + entity_names = result.get("entity_names", []) 
     # Query by id
@@ -1325,6 +1469,38 @@ class PGKVStorage(BaseKVStorage):
                     processed_results.append(processed_row)
                 return processed_results
 
+        # Special handling for FULL_ENTITIES namespace
+        if results and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES):
+            for result in results:
+                # Parse entity_names JSON string back to list
+                entity_names = result.get("entity_names", [])
+                if isinstance(entity_names, str):
+                    try:
+                        entity_names = json.loads(entity_names)
+                    except json.JSONDecodeError:
+                        entity_names = []
+                result["entity_names"] = entity_names
+                create_time = result.get("create_time", 0)
+                update_time = result.get("update_time", 0)
+                result["create_time"] = create_time
+                result["update_time"] = create_time if update_time == 0 else update_time
+
+        # Special handling for FULL_RELATIONS namespace
+        if results and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS):
+            for result in results:
+                # Parse relation_pairs JSON string back to list
+                relation_pairs = result.get("relation_pairs", [])
+                if isinstance(relation_pairs, str):
+                    try:
+                        relation_pairs = json.loads(relation_pairs)
+                    except json.JSONDecodeError:
+                        relation_pairs = []
+                result["relation_pairs"] = relation_pairs
+                create_time = result.get("create_time", 0)
+                update_time = result.get("update_time", 0)
+                result["create_time"] = create_time
+                result["update_time"] = create_time if update_time == 0 else update_time
+
         return results if results else []
 
     async def filter_keys(self, keys: set[str]) -> set[str]:
@@ -1397,6 +1573,34 @@
                 }
                 await self.db.execute(upsert_sql, _data)
 
+        elif is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES):
+            # Get current UTC time and convert to naive datetime for database storage
+            current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
+            for k, v in data.items():
+                upsert_sql = SQL_TEMPLATES["upsert_full_entities"]
+                _data = {
+                    "workspace": self.db.workspace,
+                    "id": k,
+                    "entity_names": json.dumps(v["entity_names"]),
+                    "count": v["count"],
+                    "create_time": current_time,
+                    "update_time": current_time,
+                }
+                await self.db.execute(upsert_sql, _data)
+        elif is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS):
+            # Get current UTC time and convert to naive datetime for database storage
+            current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
+            for k, v in data.items():
+                upsert_sql = SQL_TEMPLATES["upsert_full_relations"]
+                _data = {
+                    "workspace": self.db.workspace,
+                    "id": k,
+                    "relation_pairs": json.dumps(v["relation_pairs"]),
+                    "count": v["count"],
+                    "create_time": current_time,
+                    "update_time": current_time,
+                }
+                await self.db.execute(upsert_sql, _data)
 
     async def index_done_callback(self) -> None:
         # PG handles persistence automatically
@@ -3703,6 +3907,7 @@ class PGGraphStorage(BaseGraphStorage):
 
         Returns:
             A list of all edges, where each edge is a dictionary of its properties
+            (The edge is bidirectional; deduplication must be handled by the caller)
         """
         query = f"""SELECT * FROM cypher('{self.graph_name}', $$
                         MATCH (a:base)-[r]-(b:base)
@@ -3755,6 +3960,8 @@ NAMESPACE_TABLE_MAP = {
     NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_VDB_RELATION",
     NameSpace.DOC_STATUS: "LIGHTRAG_DOC_STATUS",
     NameSpace.KV_STORE_LLM_RESPONSE_CACHE: "LIGHTRAG_LLM_CACHE",
+    NameSpace.KV_STORE_FULL_ENTITIES: "LIGHTRAG_FULL_ENTITIES",
+    NameSpace.KV_STORE_FULL_RELATIONS: "LIGHTRAG_FULL_RELATIONS",
 }
 
 
@@ -3867,6 +4074,28 @@ TABLES = {
             CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
         )"""
     },
+    "LIGHTRAG_FULL_ENTITIES": {
+        "ddl": """CREATE TABLE LIGHTRAG_FULL_ENTITIES (
+            id VARCHAR(255),
+            workspace VARCHAR(255),
+            entity_names JSONB,
+            count INTEGER,
+            create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
+            update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
+            CONSTRAINT LIGHTRAG_FULL_ENTITIES_PK PRIMARY KEY (workspace, id)
+        )"""
+    },
+    "LIGHTRAG_FULL_RELATIONS": {
+        "ddl": """CREATE TABLE LIGHTRAG_FULL_RELATIONS (
+            id VARCHAR(255),
+            workspace VARCHAR(255),
+            relation_pairs JSONB,
+            count INTEGER,
+            create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
+            update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
+            CONSTRAINT LIGHTRAG_FULL_RELATIONS_PK PRIMARY KEY (workspace, id)
+        )"""
+    },
 }
 
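The entity_names and relation_pairs columns are declared JSONB, which is why the storage layer calls json.dumps() on upsert and defensively json.loads() on every read: asyncpg, the driver this module is built on, returns jsonb values as Python strings unless a type codec is registered. A hedged alternative sketch (not part of this patch) that would make the isinstance checks above unnecessary by decoding at the connection level:

    import json

    # Sketch only: register a jsonb codec so entity_names/relation_pairs
    # round-trip as Python lists. This would have to be wired into the
    # connection/pool initialization, which this patch does not do.
    async def init_jsonb_codec(conn):
        await conn.set_type_codec(
            "jsonb", encoder=json.dumps, decoder=json.loads, schema="pg_catalog"
        )
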
@@ -3905,6 +4134,26 @@ SQL_TEMPLATES = {
       EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
       FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id IN ({ids})
     """,
+    "get_by_id_full_entities": """SELECT id, entity_names, count,
+      EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
+      EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
+      FROM LIGHTRAG_FULL_ENTITIES WHERE workspace=$1 AND id=$2
+    """,
+    "get_by_id_full_relations": """SELECT id, relation_pairs, count,
+      EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
+      EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
+      FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id=$2
+    """,
+    "get_by_ids_full_entities": """SELECT id, entity_names, count,
+      EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
+      EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
+      FROM LIGHTRAG_FULL_ENTITIES WHERE workspace=$1 AND id IN ({ids})
+    """,
+    "get_by_ids_full_relations": """SELECT id, relation_pairs, count,
+      EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
+      EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
+      FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id IN ({ids})
+    """,
     "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
     "upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, workspace)
                    VALUES ($1, $2, $3)
@@ -3934,6 +4183,22 @@
                                       llm_cache_list=EXCLUDED.llm_cache_list,
                                       update_time = EXCLUDED.update_time
                                        """,
+    "upsert_full_entities": """INSERT INTO LIGHTRAG_FULL_ENTITIES (workspace, id, entity_names, count,
+                                      create_time, update_time)
+                                      VALUES ($1, $2, $3, $4, $5, $6)
+                                      ON CONFLICT (workspace,id) DO UPDATE
+                                      SET entity_names=EXCLUDED.entity_names,
+                                      count=EXCLUDED.count,
+                                      update_time = EXCLUDED.update_time
+                                       """,
+    "upsert_full_relations": """INSERT INTO LIGHTRAG_FULL_RELATIONS (workspace, id, relation_pairs, count,
+                                      create_time, update_time)
+                                      VALUES ($1, $2, $3, $4, $5, $6)
+                                      ON CONFLICT (workspace,id) DO UPDATE
+                                      SET relation_pairs=EXCLUDED.relation_pairs,
+                                      count=EXCLUDED.count,
+                                      update_time = EXCLUDED.update_time
+                                       """,
     # SQL for VectorStorage
     "upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
                       chunk_order_index, full_doc_id, content, content_vector, file_path,
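
Putting the pieces together: the upsert templates bind ($1..$6) in the order (workspace, id, payload, count, create_time, update_time), and ON CONFLICT (workspace,id) rewrites everything except create_time, so the original creation time survives updates. A round-trip sketch, under the assumption that kv is a PGKVStorage bound to the full-entities namespace with an initialized PostgreSQLDB (construction details are omitted and depend on the caller):

    # Hypothetical usage; the key and payload values are illustrative.
    await kv.upsert({"doc-1": {"entity_names": ["Alice", "Bob"], "count": 2}})
    record = await kv.get_by_id("doc-1")
    assert record["entity_names"] == ["Alice", "Bob"]  # parsed back from JSONB
    assert record["update_time"] >= record["create_time"]  # epoch seconds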