feat: Add support for KV_STORE_FULL_ENTITIES and KV_STORE_FULL_RELATIONS namespaces in PGKVStorage

- Add LIGHTRAG_FULL_ENTITIES and LIGHTRAG_FULL_RELATIONS table schemas
- Implement complete CRUD operations for both namespaces
- Add automatic table creation and migration support
- Add SQL templates and namespace mappings
- Ensure workspace isolation and proper indexing
yangdx 2025-08-03 22:54:56 +08:00
parent bf9a6d699b
commit 952d1feb07
2 changed files with 266 additions and 0 deletions
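
For context, a minimal usage sketch of the new namespaces (the storage handle, how it is constructed, and the exact payload shape are assumptions inferred from the diff below, not a confirmed API):

# Hypothetical sketch: full_entities_storage is assumed to be a PGKVStorage
# instance bound to the KV_STORE_FULL_ENTITIES namespace.
async def demo(full_entities_storage):
    # Payload shape mirrors the upsert code in this diff:
    # a JSON-serializable list of names plus a count.
    await full_entities_storage.upsert(
        {"doc-123": {"entity_names": ["Alice", "Bob", "ACME Corp"], "count": 3}}
    )
    row = await full_entities_storage.get_by_id("doc-123")
    # entity_names comes back as a Python list; timestamps as epoch seconds.
    return row["entity_names"], row["create_time"]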

@@ -660,6 +660,7 @@ class BaseGraphStorage(StorageNameSpace, ABC):
Returns:
A list of all nodes, where each node is a dictionary of its properties
(Edges are bidirectional in some storage implementations; deduplication must be handled by the caller)
"""
@abstractmethod

@@ -920,6 +920,80 @@ class PostgreSQLDB:
except Exception as e:
logger.error(f"PostgreSQL, Failed to create pagination indexes: {e}")
# Migrate to ensure new tables LIGHTRAG_FULL_ENTITIES and LIGHTRAG_FULL_RELATIONS exist
try:
await self._migrate_create_full_entities_relations_tables()
except Exception as e:
logger.error(
f"PostgreSQL, Failed to create full entities/relations tables: {e}"
)

async def _migrate_create_full_entities_relations_tables(self):
"""Create LIGHTRAG_FULL_ENTITIES and LIGHTRAG_FULL_RELATIONS tables if they don't exist"""
tables_to_check = [
{
"name": "LIGHTRAG_FULL_ENTITIES",
"ddl": TABLES["LIGHTRAG_FULL_ENTITIES"]["ddl"],
"description": "Full entities storage table",
},
{
"name": "LIGHTRAG_FULL_RELATIONS",
"ddl": TABLES["LIGHTRAG_FULL_RELATIONS"]["ddl"],
"description": "Full relations storage table",
},
]
for table_info in tables_to_check:
table_name = table_info["name"]
try:
# Check if table exists
check_table_sql = """
SELECT table_name
FROM information_schema.tables
WHERE table_name = $1
AND table_schema = 'public'
"""
table_exists = await self.query(
check_table_sql, {"table_name": table_name.lower()}
)
if not table_exists:
logger.info(f"Creating table {table_name}")
await self.execute(table_info["ddl"])
logger.info(
f"Successfully created {table_info['description']}: {table_name}"
)
# Create basic indexes for the new table
try:
# Create index for id column
index_name = f"idx_{table_name.lower()}_id"
create_index_sql = (
f"CREATE INDEX {index_name} ON {table_name}(id)"
)
await self.execute(create_index_sql)
logger.info(f"Created index {index_name} on table {table_name}")
# Create composite index for (workspace, id) columns
composite_index_name = f"idx_{table_name.lower()}_workspace_id"
create_composite_index_sql = f"CREATE INDEX {composite_index_name} ON {table_name}(workspace, id)"
await self.execute(create_composite_index_sql)
logger.info(
f"Created composite index {composite_index_name} on table {table_name}"
)
except Exception as e:
logger.warning(
f"Failed to create indexes for table {table_name}: {e}"
)
else:
logger.debug(f"Table {table_name} already exists")
except Exception as e:
logger.error(f"Failed to create table {table_name}: {e}")
async def _create_pagination_indexes(self):
"""Create indexes to optimize pagination queries for LIGHTRAG_DOC_STATUS"""
indexes = [
@@ -1233,6 +1307,46 @@ class PGKVStorage(BaseKVStorage):
processed_results[row["id"]] = row
return processed_results
# For FULL_ENTITIES namespace, parse entity_names JSON string back to list
if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES):
processed_results = {}
for row in results:
entity_names = row.get("entity_names", [])
if isinstance(entity_names, str):
try:
entity_names = json.loads(entity_names)
except json.JSONDecodeError:
entity_names = []
row["entity_names"] = entity_names
create_time = row.get("create_time", 0)
update_time = row.get("update_time", 0)
row["create_time"] = create_time
row["update_time"] = (
create_time if update_time == 0 else update_time
)
processed_results[row["id"]] = row
return processed_results
# For FULL_RELATIONS namespace, parse relation_pairs JSON string back to list
if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS):
processed_results = {}
for row in results:
relation_pairs = row.get("relation_pairs", [])
if isinstance(relation_pairs, str):
try:
relation_pairs = json.loads(relation_pairs)
except json.JSONDecodeError:
relation_pairs = []
row["relation_pairs"] = relation_pairs
create_time = row.get("create_time", 0)
update_time = row.get("update_time", 0)
row["create_time"] = create_time
row["update_time"] = (
create_time if update_time == 0 else update_time
)
processed_results[row["id"]] = row
return processed_results
# For other namespaces, return as-is
return {row["id"]: row for row in results}
except Exception as e:
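
The FULL_ENTITIES and FULL_RELATIONS branches above share one parse-and-normalize pattern; a sketch of the common helper they could be factored through (hypothetical, not part of this diff):

import json

def _normalize_row(row: dict, list_field: str) -> dict:
    # Parse the JSON string column back into a list, tolerating bad data.
    value = row.get(list_field, [])
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            value = []
    row[list_field] = value
    # Treat update_time == 0 as "never updated" and fall back to create_time.
    create_time = row.get("create_time", 0)
    row["create_time"] = create_time
    update_time = row.get("update_time", 0)
    row["update_time"] = create_time if update_time == 0 else update_time
    return row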
@@ -1277,6 +1391,36 @@
"update_time": create_time if update_time == 0 else update_time,
}
# Special handling for FULL_ENTITIES namespace
if response and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES):
# Parse entity_names JSON string back to list
entity_names = response.get("entity_names", [])
if isinstance(entity_names, str):
try:
entity_names = json.loads(entity_names)
except json.JSONDecodeError:
entity_names = []
response["entity_names"] = entity_names
create_time = response.get("create_time", 0)
update_time = response.get("update_time", 0)
response["create_time"] = create_time
response["update_time"] = create_time if update_time == 0 else update_time
# Special handling for FULL_RELATIONS namespace
if response and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS):
# Parse relation_pairs JSON string back to list
relation_pairs = response.get("relation_pairs", [])
if isinstance(relation_pairs, str):
try:
relation_pairs = json.loads(relation_pairs)
except json.JSONDecodeError:
relation_pairs = []
response["relation_pairs"] = relation_pairs
create_time = response.get("create_time", 0)
update_time = response.get("update_time", 0)
response["create_time"] = create_time
response["update_time"] = create_time if update_time == 0 else update_time
return response if response else None
# Query by id
@@ -1325,6 +1469,38 @@
processed_results.append(processed_row)
return processed_results
# Special handling for FULL_ENTITIES namespace
if results and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES):
for result in results:
# Parse entity_names JSON string back to list
entity_names = result.get("entity_names", [])
if isinstance(entity_names, str):
try:
entity_names = json.loads(entity_names)
except json.JSONDecodeError:
entity_names = []
result["entity_names"] = entity_names
create_time = result.get("create_time", 0)
update_time = result.get("update_time", 0)
result["create_time"] = create_time
result["update_time"] = create_time if update_time == 0 else update_time
# Special handling for FULL_RELATIONS namespace
if results and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS):
for result in results:
# Parse relation_pairs JSON string back to list
relation_pairs = result.get("relation_pairs", [])
if isinstance(relation_pairs, str):
try:
relation_pairs = json.loads(relation_pairs)
except json.JSONDecodeError:
relation_pairs = []
result["relation_pairs"] = relation_pairs
create_time = result.get("create_time", 0)
update_time = result.get("update_time", 0)
result["create_time"] = create_time
result["update_time"] = create_time if update_time == 0 else update_time
return results if results else []

async def filter_keys(self, keys: set[str]) -> set[str]:
@@ -1397,6 +1573,34 @@
}
await self.db.execute(upsert_sql, _data)
elif is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES):
# Get current UTC time and convert to naive datetime for database storage
current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
for k, v in data.items():
upsert_sql = SQL_TEMPLATES["upsert_full_entities"]
_data = {
"workspace": self.db.workspace,
"id": k,
"entity_names": json.dumps(v["entity_names"]),
"count": v["count"],
"create_time": current_time,
"update_time": current_time,
}
await self.db.execute(upsert_sql, _data)
elif is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS):
# Get current UTC time and convert to naive datetime for database storage
current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
for k, v in data.items():
upsert_sql = SQL_TEMPLATES["upsert_full_relations"]
_data = {
"workspace": self.db.workspace,
"id": k,
"relation_pairs": json.dumps(v["relation_pairs"]),
"count": v["count"],
"create_time": current_time,
"update_time": current_time,
}
await self.db.execute(upsert_sql, _data)

async def index_done_callback(self) -> None:
# PG handles persistence automatically
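
The upsert branches bind tz-naive UTC timestamps because the new tables declare TIMESTAMP without time zone. A sketch of the parameter dict the upsert_full_relations template receives (values are illustrative; the shape is taken from the code above):

import datetime
import json
from datetime import timezone

current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
_data = {
    "workspace": "default",  # illustrative workspace name
    "id": "doc-123",  # illustrative key
    "relation_pairs": json.dumps([["Alice", "ACME Corp"], ["Bob", "Alice"]]),
    "count": 2,
    "create_time": current_time,
    "update_time": current_time,
}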
@@ -3703,6 +3907,7 @@ class PGGraphStorage(BaseGraphStorage):
Returns:
A list of all edges, where each edge is a dictionary of its properties
(The edge is bidirectional; deduplication must be handled by the caller)
"""
query = f"""SELECT * FROM cypher('{self.graph_name}', $$
MATCH (a:base)-[r]-(b:base)
@@ -3755,6 +3960,8 @@ NAMESPACE_TABLE_MAP = {
NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_VDB_RELATION",
NameSpace.DOC_STATUS: "LIGHTRAG_DOC_STATUS",
NameSpace.KV_STORE_LLM_RESPONSE_CACHE: "LIGHTRAG_LLM_CACHE",
NameSpace.KV_STORE_FULL_ENTITIES: "LIGHTRAG_FULL_ENTITIES",
NameSpace.KV_STORE_FULL_RELATIONS: "LIGHTRAG_FULL_RELATIONS",
}
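
With the two new entries, namespace-to-table dispatch stays table-driven. A hypothetical lookup helper built on the map (the file may already have an equivalent; this is only a sketch):

def namespace_to_table(namespace: str) -> str:
    # Resolve the backing table for a storage namespace via the map above.
    for ns, table in NAMESPACE_TABLE_MAP.items():
        if is_namespace(namespace, ns):
            return table
    raise ValueError(f"No table mapped for namespace: {namespace}")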
@@ -3867,6 +4074,28 @@ TABLES = {
CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
)"""
},
"LIGHTRAG_FULL_ENTITIES": {
"ddl": """CREATE TABLE LIGHTRAG_FULL_ENTITIES (
id VARCHAR(255),
workspace VARCHAR(255),
entity_names JSONB,
count INTEGER,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_FULL_ENTITIES_PK PRIMARY KEY (workspace, id)
)"""
},
"LIGHTRAG_FULL_RELATIONS": {
"ddl": """CREATE TABLE LIGHTRAG_FULL_RELATIONS (
id VARCHAR(255),
workspace VARCHAR(255),
relation_pairs JSONB,
count INTEGER,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_FULL_RELATIONS_PK PRIMARY KEY (workspace, id)
)"""
},
}
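
A note on the JSONB columns: asyncpg passes json/jsonb values through as strings by default, which is why the storage layer calls json.dumps on write and json.loads on read. The usual alternative is a connection-level codec, sketched below; this commit does not do this:

import json
import asyncpg

async def init_connection(conn: asyncpg.Connection) -> None:
    # Make asyncpg encode/decode JSONB transparently instead of
    # round-tripping strings by hand.
    await conn.set_type_codec(
        "jsonb", encoder=json.dumps, decoder=json.loads, schema="pg_catalog"
    )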
@@ -3905,6 +4134,26 @@ SQL_TEMPLATES = {
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id IN ({ids})
""",
"get_by_id_full_entities": """SELECT id, entity_names, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_FULL_ENTITIES WHERE workspace=$1 AND id=$2
""",
"get_by_id_full_relations": """SELECT id, relation_pairs, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id=$2
""",
"get_by_ids_full_entities": """SELECT id, entity_names, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_FULL_ENTITIES WHERE workspace=$1 AND id IN ({ids})
""",
"get_by_ids_full_relations": """SELECT id, relation_pairs, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id IN ({ids})
""",
"filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
"upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, workspace)
VALUES ($1, $2, $3)
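
The get_by_id_*/get_by_ids_* templates above return EXTRACT(EPOCH FROM ...)::BIGINT, i.e. integer Unix seconds, which keeps rows JSON-friendly. Converting back to an aware datetime on the Python side is one call (sketch; the epoch value is an arbitrary example):

from datetime import datetime, timezone

created_at = datetime.fromtimestamp(1722696896, tz=timezone.utc)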
@@ -3934,6 +4183,22 @@ SQL_TEMPLATES = {
llm_cache_list=EXCLUDED.llm_cache_list,
update_time = EXCLUDED.update_time
""",
"upsert_full_entities": """INSERT INTO LIGHTRAG_FULL_ENTITIES (workspace, id, entity_names, count,
create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (workspace,id) DO UPDATE
SET entity_names=EXCLUDED.entity_names,
count=EXCLUDED.count,
update_time = EXCLUDED.update_time
""",
"upsert_full_relations": """INSERT INTO LIGHTRAG_FULL_RELATIONS (workspace, id, relation_pairs, count,
create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (workspace,id) DO UPDATE
SET relation_pairs=EXCLUDED.relation_pairs,
count=EXCLUDED.count,
update_time = EXCLUDED.update_time
""",
# SQL for VectorStorage
"upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, content_vector, file_path,
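
Note that both ON CONFLICT clauses above overwrite update_time but leave create_time alone, so the original insert timestamp survives repeated upserts. A quick sketch of the call pattern (assuming a db handle with the execute method used throughout this file):

import datetime
from datetime import timezone

async def upsert_twice(db, _data: dict) -> None:
    # First call inserts; the second hits ON CONFLICT and overwrites
    # relation_pairs, count and update_time, keeping the first create_time.
    await db.execute(SQL_TEMPLATES["upsert_full_relations"], _data)
    _data["update_time"] = datetime.datetime.now(timezone.utc).replace(
        tzinfo=None
    )
    await db.execute(SQL_TEMPLATES["upsert_full_relations"], _data)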