fix: rename PostgreSQL options env variable and make the statement LRU cache size an optional env variable

(cherry picked from commit 22a7b482c5)
kevinnkansah 2025-10-06 11:56:09 +02:00 committed by Raphaël MANSUY
parent 4e93c9c21d
commit 8f5af8199b
2 changed files with 50 additions and 239 deletions

View file

@ -351,10 +351,13 @@ POSTGRES_IVFFLAT_LISTS=100
# POSTGRES_SSL_ROOT_CERT=/path/to/ca-cert.pem
# POSTGRES_SSL_CRL=/path/to/crl.pem
### PostgreSQL Server Options (for Supabase Supavisor)
### PostgreSQL Server Settings (for Supabase Supavisor)
# Use this to pass extra options to the PostgreSQL connection string.
# For Supabase, you might need to set it like this:
# POSTGRES_SERVER_OPTIONS="options=reference%3D[project-ref]"
# POSTGRES_SERVER_SETTINGS="options=reference%3D[project-ref]"
# Default is 100; set to 0 to disable
# POSTGRES_STATEMENT_CACHE_SIZE=100
### Neo4j Configuration
NEO4J_URI=neo4j+s://xxxxxxxx.databases.neo4j.io
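The %3D in the value above is simply a URL-encoded "=": Supavisor routes the connection via a libpq "options" parameter of the form reference=<project-ref>, and the project reference is left as a placeholder here. A minimal sketch of producing the encoded value (the project reference below is a placeholder, not a real one):

from urllib.parse import quote

# Hedged illustration: encode the inner "=" of the Supavisor routing option.
# "your-project-ref" stands in for a real Supabase project reference.
encoded = quote("reference=your-project-ref", safe="")
print(f"options={encoded}")  # -> options=reference%3Dyour-project-ref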

View file

@ -10,7 +10,6 @@ import numpy as np
import configparser
import ssl
import itertools
import hashlib
from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
@ -33,7 +32,6 @@ from ..namespace import NameSpace, is_namespace
from ..utils import logger
from ..constants import GRAPH_FIELD_SEP
from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock, get_storage_lock
from ..utils_context import get_current_tenant_id
import pipmaster as pm
@ -78,6 +76,7 @@ class PostgreSQLDB:
# Server settings
self.server_settings = config.get("server_settings")
self.statement_cache_size = int(config.get("statement_cache_size"))
if self.user is None or self.password is None or self.database is None:
raise ValueError("Missing database user, password, or database")
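As wired in the ClientManager hunk further down, statement_cache_size falls back to None when neither POSTGRES_STATEMENT_CACHE_SIZE nor a config-file entry is set, and int(None) raises a TypeError. A minimal defensive parse, assuming asyncpg's default of 100 when the variable is absent (the helper name is illustrative, not part of the commit):

import os

def parse_statement_cache_size(raw: str | None, default: int = 100) -> int:
    # Fall back to asyncpg's default when the variable is unset or empty,
    # so int() never receives None.
    if raw is None or str(raw).strip() == "":
        return default
    return int(raw)

cache_size = parse_statement_cache_size(os.environ.get("POSTGRES_STATEMENT_CACHE_SIZE"))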
@ -163,8 +162,13 @@ class PostgreSQLDB:
"port": self.port,
"min_size": 1,
"max_size": self.max,
"statement_cache_size": self.statement_cache_size,
}
logger.info(
f"PostgreSQL, statement LRU cache size set as: {self.statement_cache_size}"
)
# Add SSL configuration if provided
ssl_context = self._create_ssl_context()
if ssl_context is not None:
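statement_cache_size is passed straight through to asyncpg: its default is 100, and 0 disables the prepared-statement LRU cache, which is the usual setting behind a transaction-mode pooler such as Supabase Supavisor or PgBouncer, where prepared statements do not survive between pooled sessions. A minimal sketch of the same knob in isolation (credentials are placeholders):

import asyncpg

async def make_pool() -> asyncpg.Pool:
    # Hedged sketch, not the project's pool setup: disable the statement cache
    # when connecting through a transaction-mode pooler.
    return await asyncpg.create_pool(
        user="postgres",
        password="secret",
        database="postgres",
        host="localhost",
        port=5432,
        min_size=1,
        max_size=10,
        statement_cache_size=0,
    )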
@ -237,40 +241,23 @@ class PostgreSQLDB:
"""Set the Apache AGE environment and creates a graph if it does not exist.
This method:
- Loads the AGE extension into the current session (required for Cypher functions).
- Sets the PostgreSQL `search_path` to include `ag_catalog`, ensuring that Apache AGE functions can be used without specifying the schema.
- Attempts to create a new graph with the provided `graph_name` if it does not already exist.
- Silently ignores errors related to the graph already existing.
"""
try:
# Load AGE extension - required for Cypher functions to work
await connection.execute("LOAD 'age'") # type: ignore
await connection.execute( # type: ignore
'SET search_path = ag_catalog, "$user", public'
)
# Check if graph exists first to avoid error logs
exists = await connection.fetchval(
"SELECT count(*) FROM ag_catalog.ag_graph WHERE name = $1", graph_name
await connection.execute( # type: ignore
f"select create_graph('{graph_name}')"
)
if exists == 0:
await connection.execute( # type: ignore
f"select create_graph('{graph_name}')"
)
except (
asyncpg.exceptions.InvalidSchemaNameError,
asyncpg.exceptions.UniqueViolationError,
asyncpg.exceptions.DuplicateObjectError, # Graph already exists
):
pass
except Exception as e:
# Handle "already exists" error message for AGE graphs
if "already exists" in str(e):
pass
else:
raise
async def _migrate_llm_cache_schema(self):
"""Migrate LLM cache schema: add new columns and remove deprecated mode field"""
@ -1089,7 +1076,9 @@ class PostgreSQLDB:
try:
# Create index for id column
index_name = f"idx_{table_name.lower()}_id"
create_index_sql = f"CREATE INDEX {index_name} ON {table_name}(id)"
create_index_sql = (
f"CREATE INDEX {index_name} ON {table_name}(id)"
)
await self.execute(create_index_sql)
logger.info(f"Created index {index_name} on table {table_name}")
@ -1145,11 +1134,6 @@ class PostgreSQLDB:
"sql": "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_lightrag_doc_status_workspace_file_path ON LIGHTRAG_DOC_STATUS (workspace, file_path)",
"description": "Index for workspace + file_path sorting",
},
{
"name": "idx_lightrag_doc_status_workspace_external_id",
"sql": "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_lightrag_doc_status_workspace_external_id ON LIGHTRAG_DOC_STATUS (workspace, (metadata->>'external_id')) WHERE metadata->>'external_id' IS NOT NULL",
"description": "Index for workspace + external_id for idempotency lookups",
},
]
for index in indexes:
@ -1265,11 +1249,6 @@ class PostgreSQLDB:
graph_name: str | None = None,
) -> dict[str, Any] | None | list[dict[str, Any]]:
async with self.pool.acquire() as connection: # type: ignore
# Set tenant context if available
tenant_id = get_current_tenant_id()
if tenant_id:
await connection.execute(f"SET app.current_tenant = '{tenant_id}'")
if with_age and graph_name:
await self.configure_age(connection, graph_name) # type: ignore
elif with_age and not graph_name:
@ -1277,15 +1256,24 @@ class PostgreSQLDB:
try:
if params:
if multirows:
return await connection.fetch(sql, *params)
else:
return await connection.fetchrow(sql, *params)
rows = await connection.fetch(sql, *params)
else:
if multirows:
return await connection.fetch(sql)
rows = await connection.fetch(sql)
if multirows:
if rows:
columns = [col for col in rows[0].keys()]
data = [dict(zip(columns, row)) for row in rows]
else:
return await connection.fetchrow(sql)
data = []
else:
if rows:
columns = rows[0].keys()
data = dict(zip(columns, rows[0]))
else:
data = None
return data
except Exception as e:
logger.error(f"PostgreSQL database, error:{e}")
raise
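The rewritten query() now returns plain dicts (or a list of them) instead of asyncpg.Record objects, which is why the explicit dict(row) conversions disappear from the KV-storage hunks below. asyncpg.Record already supports key-based access, so zipping its keys with its values is equivalent to a direct dict() call; a minimal sketch of that equivalence (illustrative only):

import asyncpg

def record_to_dict(row: asyncpg.Record) -> dict:
    # Record exposes keys() and item access, so dict(row) yields the same
    # mapping as zipping its keys with its values.
    return dict(row)

def records_to_dicts(rows: list[asyncpg.Record]) -> list[dict]:
    return [dict(row) for row in rows]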
@ -1301,11 +1289,6 @@ class PostgreSQLDB:
):
try:
async with self.pool.acquire() as connection: # type: ignore
# Set tenant context if available
tenant_id = get_current_tenant_id()
if tenant_id:
await connection.execute(f"SET app.current_tenant = '{tenant_id}'")
if with_age and graph_name:
await self.configure_age(connection, graph_name)
elif with_age and not graph_name:
@ -1320,7 +1303,6 @@ class PostgreSQLDB:
asyncpg.exceptions.DuplicateTableError,
asyncpg.exceptions.DuplicateObjectError, # Catch "already exists" error
asyncpg.exceptions.InvalidSchemaNameError, # Also catch for AGE extension "already exists"
# asyncpg.exceptions.UndefinedTableError, # Catch "relation does not exist" for index creation
) as e:
if ignore_if_exists:
# If the flag is set, just ignore these specific errors
@ -1415,9 +1397,13 @@ class ClientManager:
),
# Server settings for Supabase
"server_settings": os.environ.get(
"POSTGRES_SERVER_OPTIONS",
"POSTGRES_SERVER_SETTINGS",
config.get("postgres", "server_options", fallback=None),
),
"statement_cache_size": os.environ.get(
"POSTGRES_STATEMENT_CACHE_SIZE",
config.get("postgres", "statement_cache_size", fallback=None),
),
}
@classmethod
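Each value is resolved environment-first, then from the [postgres] section of the project's config file; note that the ini fallback for the renamed variable still reads the server_options key in this hunk. A minimal sketch of the same precedence (the config file name is an assumption):

import configparser
import os

config = configparser.ConfigParser()
config.read("config.ini")  # assumed file name

server_settings = os.environ.get(
    "POSTGRES_SERVER_SETTINGS",
    config.get("postgres", "server_options", fallback=None),
)
statement_cache_size = os.environ.get(
    "POSTGRES_STATEMENT_CACHE_SIZE",
    config.get("postgres", "statement_cache_size", fallback=None),
)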
@ -1471,9 +1457,6 @@ class PGKVStorage(BaseKVStorage):
# Use "default" for compatibility (lowest priority)
self.workspace = "default"
# Apply multi-tenant isolation
self.workspace = self._get_composite_workspace()
async def finalize(self):
async with get_storage_lock():
if self.db is not None:
@ -1524,7 +1507,6 @@ class PGKVStorage(BaseKVStorage):
if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
processed_results = {}
for row in results:
row = dict(row)
llm_cache_list = row.get("llm_cache_list", [])
if isinstance(llm_cache_list, str):
try:
@ -1545,7 +1527,6 @@ class PGKVStorage(BaseKVStorage):
if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES):
processed_results = {}
for row in results:
row = dict(row)
entity_names = row.get("entity_names", [])
if isinstance(entity_names, str):
try:
@ -1566,7 +1547,6 @@ class PGKVStorage(BaseKVStorage):
if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS):
processed_results = {}
for row in results:
row = dict(row)
relation_pairs = row.get("relation_pairs", [])
if isinstance(relation_pairs, str):
try:
@ -1596,9 +1576,6 @@ class PGKVStorage(BaseKVStorage):
sql = SQL_TEMPLATES["get_by_id_" + self.namespace]
params = {"workspace": self.workspace, "id": id}
response = await self.db.query(sql, list(params.values()))
if response:
response = dict(response)
if response and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
# Parse llm_cache_list JSON string back to list
@ -1679,9 +1656,6 @@ class PGKVStorage(BaseKVStorage):
)
params = {"workspace": self.workspace}
results = await self.db.query(sql, list(params.values()), multirows=True)
if results:
results = [dict(r) for r in results]
if results and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
# Parse llm_cache_list JSON string back to list for each result
@ -1862,28 +1836,6 @@ class PGKVStorage(BaseKVStorage):
"update_time": current_time,
}
await self.db.execute(upsert_sql, _data)
elif is_namespace(self.namespace, NameSpace.KV_STORE_TENANTS):
for k, v in data.items():
upsert_sql = SQL_TEMPLATES["upsert_tenants"]
_data = {
"workspace": self.workspace,
"id": k,
"data": json.dumps(v),
"create_time": v.get("create_time"),
"update_time": v.get("update_time"),
}
await self.db.execute(upsert_sql, _data)
elif is_namespace(self.namespace, NameSpace.KV_STORE_KNOWLEDGE_BASES):
for k, v in data.items():
upsert_sql = SQL_TEMPLATES["upsert_knowledge_bases"]
_data = {
"workspace": self.workspace,
"id": k,
"data": json.dumps(v),
"create_time": v.get("create_time"),
"update_time": v.get("update_time"),
}
await self.db.execute(upsert_sql, _data)
async def index_done_callback(self) -> None:
# PG handles persistence automatically
@ -1971,9 +1923,6 @@ class PGVectorStorage(BaseVectorStorage):
# Use "default" for compatibility (lowest priority)
self.workspace = "default"
# Apply multi-tenant isolation
self.workspace = self._get_composite_workspace()
async def finalize(self):
async with get_storage_lock():
if self.db is not None:
@ -2521,62 +2470,6 @@ class PGDocStatusStorage(DocStatusStorage):
track_id=result[0].get("track_id"),
)
async def get_doc_by_external_id(self, external_id: str) -> Union[dict[str, Any], None]:
"""Get document by external_id for idempotency support.
Uses indexed lookup on metadata->>'external_id' for efficient retrieval.
Args:
external_id: The external unique identifier to search for
Returns:
Union[dict[str, Any], None]: Document data if found, None otherwise
"""
sql = """
SELECT * FROM LIGHTRAG_DOC_STATUS
WHERE workspace=$1 AND metadata->>'external_id' = $2
"""
params = {"workspace": self.workspace, "external_id": external_id}
result = await self.db.query(sql, list(params.values()), True)
if result is None or result == []:
return None
else:
# Parse chunks_list JSON string back to list
chunks_list = result[0].get("chunks_list", [])
if isinstance(chunks_list, str):
try:
chunks_list = json.loads(chunks_list)
except json.JSONDecodeError:
chunks_list = []
# Parse metadata JSON string back to dict
metadata = result[0].get("metadata", {})
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(result[0]["created_at"])
updated_at = self._format_datetime_with_timezone(result[0]["updated_at"])
return dict(
id=result[0]["id"],
content_length=result[0]["content_length"],
content_summary=result[0]["content_summary"],
status=result[0]["status"],
chunks_count=result[0]["chunks_count"],
created_at=created_at,
updated_at=updated_at,
file_path=result[0]["file_path"],
chunks_list=chunks_list,
metadata=metadata,
error_msg=result[0].get("error_msg"),
track_id=result[0].get("track_id"),
)
async def get_status_counts(self) -> dict[str, int]:
"""Get counts of documents in each status"""
sql = """SELECT status as "status", COUNT(1) as "count"
@ -2700,7 +2593,7 @@ class PGDocStatusStorage(DocStatusStorage):
async def get_docs_paginated(
self,
status_filter: DocStatus | None,
status_filter: DocStatus | None = None,
page: int = 1,
page_size: int = 50,
sort_field: str = "updated_at",
@ -3013,19 +2906,7 @@ class PGGraphStorage(BaseGraphStorage):
# Ensure names comply with PostgreSQL identifier specifications
safe_workspace = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip())
safe_namespace = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
graph_name = f"{safe_workspace}_{safe_namespace}"
# Ensure graph name starts with a letter (AGE requirement)
if not graph_name[0].isalpha():
graph_name = f"g_{graph_name}"
# PostgreSQL identifier limit is 63 bytes
if len(graph_name) > 63:
# Use MD5 hash to ensure uniqueness and fit within limit
hash_object = hashlib.md5(graph_name.encode())
graph_name = f"g_{hash_object.hexdigest()}"
return graph_name
return f"{safe_workspace}_{safe_namespace}"
else:
# When the workspace is "default", use the namespace directly (for backward compatibility with legacy implementations)
return re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
@ -3108,35 +2989,6 @@ class PGGraphStorage(BaseGraphStorage):
graph_name=self.graph_name,
)
# Verify that essential labels exist by checking the ag_label catalog
# This helps catch cases where label creation silently failed
try:
async with self.db.pool.acquire() as connection:
await connection.execute("LOAD 'age'") # Required for AGE functions
await connection.execute('SET search_path = ag_catalog, "$user", public')
# Check if 'base' label exists for this graph
result = await connection.fetchrow(
"""
SELECT l.name
FROM ag_catalog.ag_label l
JOIN ag_catalog.ag_graph g ON l.graph = g.graphid
WHERE l.name = 'base' AND g.name = $1
""",
self.graph_name
)
if result is None:
logger.warning(
f"[{self.workspace}] 'base' vlabel not found for graph '{self.graph_name}', attempting to create..."
)
# Retry creating the vlabel
await connection.execute(
f"SELECT create_vlabel('{self.graph_name}', 'base')"
)
logger.info(f"[{self.workspace}] Successfully created 'base' vlabel for graph '{self.graph_name}'")
except Exception as e:
if "already exists" not in str(e):
logger.error(f"[{self.workspace}] Failed to verify/create 'base' vlabel: {e}")
async def finalize(self):
async with get_graph_db_lock():
if self.db is not None:
@ -3303,7 +3155,6 @@ class PGGraphStorage(BaseGraphStorage):
Returns:
list[dict[str, Any]]: a list of dictionaries containing the result set
"""
logger.info(f"[{self.workspace}] Executing query: {query}")
try:
if readonly:
data = await self.db.query(
@ -3494,6 +3345,7 @@ class PGGraphStorage(BaseGraphStorage):
raise
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type((PGGraphQueryException,)),
)
@ -3749,10 +3601,8 @@ class PGGraphStorage(BaseGraphStorage):
node_id = row["node_id"]
if not node_id:
continue
out_degree = out_degrees.get(node_id, 0)
in_degree = in_degrees.get(node_id, 0)
out_degrees[node_id] = out_degree + int(row.get("out_degree", 0) or 0)
in_degrees[node_id] = in_degree + int(row.get("in_degree", 0) or 0)
out_degrees[node_id] = int(row.get("out_degree", 0) or 0)
in_degrees[node_id] = int(row.get("in_degree", 0) or 0)
degrees_dict = {}
for node_id in node_ids:
@ -3856,13 +3706,15 @@ class PGGraphStorage(BaseGraphStorage):
SELECT * FROM cypher({dollar_quote(self.graph_name)}::name,
{dollar_quote(forward_cypher)}::cstring,
$1::agtype)
AS (source text, target text, edge_properties agtype)"""
AS (source text, target text, edge_properties agtype)
"""
sql_bwd = f"""
SELECT * FROM cypher({dollar_quote(self.graph_name)}::name,
{dollar_quote(backward_cypher)}::cstring,
$1::agtype)
AS (source text, target text, edge_properties agtype)"""
AS (source text, target text, edge_properties agtype)
"""
pg_params = {"params": json.dumps({"pairs": pairs}, ensure_ascii=False)}
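dollar_quote wraps the Cypher text in a PostgreSQL dollar-quoted literal so embedded quotes need no escaping before it is cast to cstring for the cypher() call. A hypothetical minimal version, not the project's implementation:

def dollar_quote(text: str, tag: str = "dq") -> str:
    # Hypothetical sketch: a production version should choose a tag that does
    # not occur inside the quoted text.
    return f"${tag}${text}${tag}$"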
@ -4034,8 +3886,7 @@ class PGGraphStorage(BaseGraphStorage):
f"[{self.workspace}] Failed to parse node string in batch: {node_dict}"
)
# Add node id (entity_id) to the dictionary for easier access
node_dict["id"] = node_dict.get("entity_id")
node_dict["id"] = node_dict["entity_id"]
nodes.append(node_dict)
return nodes
@ -4069,7 +3920,6 @@ class PGGraphStorage(BaseGraphStorage):
logger.warning(
f"[{self.workspace}] Failed to parse edge string in batch: {edge_agtype}"
)
edge_agtype = {}
source_agtype = item["source"]["properties"]
# Process string result, parse it to JSON dictionary
@ -4374,10 +4224,6 @@ class PGGraphStorage(BaseGraphStorage):
$$) AS (a AGTYPE, r AGTYPE, b AGTYPE)"""
results = await self._query(query)
logger.info(f"[{self.workspace}] Query results count: {len(results)}")
if results:
logger.info(f"[{self.workspace}] First result sample: {results[0]}")
# Process query results, deduplicate nodes and edges
nodes_dict = {}
edges_dict = {}
@ -4519,13 +4365,13 @@ class PGGraphStorage(BaseGraphStorage):
GROUP BY node_id
)
SELECT
(ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '"entity_id"'::agtype]))::text AS label
(ag_catalog.agtype_access_operator(VARIADIC ARRAY[v.properties, '"entity_id"'::agtype]))::text AS label
FROM
node_degrees d
JOIN
{self.graph_name}._ag_label_vertex v ON d.node_id = v.id
WHERE
ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '"entity_id"'::agtype]) IS NOT NULL
ag_catalog.agtype_access_operator(VARIADIC ARRAY[v.properties, '"entity_id"'::agtype]) IS NOT NULL
ORDER BY
d.degree DESC,
label ASC
@ -4615,7 +4461,7 @@ class PGGraphStorage(BaseGraphStorage):
drop_query = f"""SELECT * FROM cypher('{self.graph_name}', $$
MATCH (n)
DETACH DELETE n
$$) AS (n agtype)"""
$$) AS (result agtype)"""
await self._query(drop_query, readonly=False)
return {
@ -4639,8 +4485,6 @@ NAMESPACE_TABLE_MAP = {
NameSpace.VECTOR_STORE_ENTITIES: "LIGHTRAG_VDB_ENTITY",
NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_VDB_RELATION",
NameSpace.DOC_STATUS: "LIGHTRAG_DOC_STATUS",
NameSpace.KV_STORE_TENANTS: "LIGHTRAG_TENANTS",
NameSpace.KV_STORE_KNOWLEDGE_BASES: "LIGHTRAG_KNOWLEDGE_BASES",
}
@ -4651,26 +4495,6 @@ def namespace_to_table_name(namespace: str) -> str:
TABLES = {
"LIGHTRAG_TENANTS": {
"ddl": """CREATE TABLE LIGHTRAG_TENANTS (
id VARCHAR(255),
workspace VARCHAR(255),
data JSONB,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_TENANTS_PK PRIMARY KEY (workspace, id)
)"""
},
"LIGHTRAG_KNOWLEDGE_BASES": {
"ddl": """CREATE TABLE LIGHTRAG_KNOWLEDGE_BASES (
id VARCHAR(255),
workspace VARCHAR(255),
data JSONB,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_KNOWLEDGE_BASES_PK PRIMARY KEY (workspace, id)
)"""
},
"LIGHTRAG_DOC_FULL": {
"ddl": """CREATE TABLE LIGHTRAG_DOC_FULL (
id VARCHAR(255),
@ -4853,10 +4677,6 @@ SQL_TEMPLATES = {
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id IN ({ids})
""",
"get_by_id_tenants": """SELECT data FROM LIGHTRAG_TENANTS WHERE workspace=$1 AND id=$2""",
"get_by_id_knowledge_bases": """SELECT data FROM LIGHTRAG_KNOWLEDGE_BASES WHERE workspace=$1 AND id=$2""",
"get_by_ids_tenants": """SELECT data FROM LIGHTRAG_TENANTS WHERE workspace=$1 AND id IN ({ids})""",
"get_by_ids_knowledge_bases": """SELECT data FROM LIGHTRAG_KNOWLEDGE_BASES WHERE workspace=$1 AND id IN ({ids})""",
"filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
"upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, doc_name, workspace)
VALUES ($1, $2, $3, $4)
@ -4904,18 +4724,6 @@ SQL_TEMPLATES = {
count=EXCLUDED.count,
update_time = EXCLUDED.update_time
""",
"upsert_tenants": """INSERT INTO LIGHTRAG_TENANTS (workspace, id, data, create_time, update_time)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (workspace,id) DO UPDATE
SET data=EXCLUDED.data,
update_time = EXCLUDED.update_time
""",
"upsert_knowledge_bases": """INSERT INTO LIGHTRAG_KNOWLEDGE_BASES (workspace, id, data, create_time, update_time)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (workspace,id) DO UPDATE
SET data=EXCLUDED.data,
update_time = EXCLUDED.update_time
""",
# SQL for VectorStorage
"upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, content_vector, file_path,