diff --git a/env.example b/env.example
index 517de68f..8cfce683 100644
--- a/env.example
+++ b/env.example
@@ -351,10 +351,13 @@ POSTGRES_IVFFLAT_LISTS=100
 # POSTGRES_SSL_ROOT_CERT=/path/to/ca-cert.pem
 # POSTGRES_SSL_CRL=/path/to/crl.pem
 
-### PostgreSQL Server Options (for Supabase Supavisor)
+### PostgreSQL Server Settings (for Supabase Supavisor)
 # Use this to pass extra options to the PostgreSQL connection string.
 # For Supabase, you might need to set it like this:
-# POSTGRES_SERVER_OPTIONS="options=reference%3D[project-ref]"
+# POSTGRES_SERVER_SETTINGS="options=reference%3D[project-ref]"
+
+# Default is 100; set to 0 to disable
+# POSTGRES_STATEMENT_CACHE_SIZE=100
 
 ### Neo4j Configuration
 NEO4J_URI=neo4j+s://xxxxxxxx.databases.neo4j.io
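The new `POSTGRES_STATEMENT_CACHE_SIZE` value above is ultimately handed to asyncpg as its `statement_cache_size` connection option. A minimal sketch of what that option does, outside the project's own code — the DSN, pool sizes, and env-var default here are illustrative assumptions. Setting it to 0 turns off asyncpg's prepared-statement LRU cache, which is what transaction-mode poolers such as Supabase Supavisor or PgBouncer require:

```python
import asyncio
import os

import asyncpg


async def main() -> None:
    # 0 disables asyncpg's prepared-statement cache (needed behind
    # transaction-mode poolers); asyncpg's own default is 100.
    cache_size = int(os.environ.get("POSTGRES_STATEMENT_CACHE_SIZE", "100"))

    # Illustrative DSN; create_pool forwards statement_cache_size to every
    # connection it opens.
    pool = await asyncpg.create_pool(
        dsn="postgresql://user:password@localhost:5432/lightrag",
        min_size=1,
        max_size=10,
        statement_cache_size=cache_size,
    )
    try:
        print(await pool.fetchval("SELECT 1"))
    finally:
        await pool.close()


asyncio.run(main())
```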
""" try: - # Load AGE extension - required for Cypher functions to work - await connection.execute("LOAD 'age'") # type: ignore await connection.execute( # type: ignore 'SET search_path = ag_catalog, "$user", public' ) - - # Check if graph exists first to avoid error logs - exists = await connection.fetchval( - "SELECT count(*) FROM ag_catalog.ag_graph WHERE name = $1", graph_name + await connection.execute( # type: ignore + f"select create_graph('{graph_name}')" ) - - if exists == 0: - await connection.execute( # type: ignore - f"select create_graph('{graph_name}')" - ) except ( asyncpg.exceptions.InvalidSchemaNameError, asyncpg.exceptions.UniqueViolationError, - asyncpg.exceptions.DuplicateObjectError, # Graph already exists ): pass - except Exception as e: - # Handle "already exists" error message for AGE graphs - if "already exists" in str(e): - pass - else: - raise async def _migrate_llm_cache_schema(self): """Migrate LLM cache schema: add new columns and remove deprecated mode field""" @@ -1089,7 +1076,9 @@ class PostgreSQLDB: try: # Create index for id column index_name = f"idx_{table_name.lower()}_id" - create_index_sql = f"CREATE INDEX {index_name} ON {table_name}(id)" + create_index_sql = ( + f"CREATE INDEX {index_name} ON {table_name}(id)" + ) await self.execute(create_index_sql) logger.info(f"Created index {index_name} on table {table_name}") @@ -1145,11 +1134,6 @@ class PostgreSQLDB: "sql": "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_lightrag_doc_status_workspace_file_path ON LIGHTRAG_DOC_STATUS (workspace, file_path)", "description": "Index for workspace + file_path sorting", }, - { - "name": "idx_lightrag_doc_status_workspace_external_id", - "sql": "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_lightrag_doc_status_workspace_external_id ON LIGHTRAG_DOC_STATUS (workspace, (metadata->>'external_id')) WHERE metadata->>'external_id' IS NOT NULL", - "description": "Index for workspace + external_id for idempotency lookups", - }, ] for index in indexes: @@ -1265,11 +1249,6 @@ class PostgreSQLDB: graph_name: str | None = None, ) -> dict[str, Any] | None | list[dict[str, Any]]: async with self.pool.acquire() as connection: # type: ignore - # Set tenant context if available - tenant_id = get_current_tenant_id() - if tenant_id: - await connection.execute(f"SET app.current_tenant = '{tenant_id}'") - if with_age and graph_name: await self.configure_age(connection, graph_name) # type: ignore elif with_age and not graph_name: @@ -1277,15 +1256,24 @@ class PostgreSQLDB: try: if params: - if multirows: - return await connection.fetch(sql, *params) - else: - return await connection.fetchrow(sql, *params) + rows = await connection.fetch(sql, *params) else: - if multirows: - return await connection.fetch(sql) + rows = await connection.fetch(sql) + + if multirows: + if rows: + columns = [col for col in rows[0].keys()] + data = [dict(zip(columns, row)) for row in rows] else: - return await connection.fetchrow(sql) + data = [] + else: + if rows: + columns = rows[0].keys() + data = dict(zip(columns, rows[0])) + else: + data = None + + return data except Exception as e: logger.error(f"PostgreSQL database, error:{e}") raise @@ -1301,11 +1289,6 @@ class PostgreSQLDB: ): try: async with self.pool.acquire() as connection: # type: ignore - # Set tenant context if available - tenant_id = get_current_tenant_id() - if tenant_id: - await connection.execute(f"SET app.current_tenant = '{tenant_id}'") - if with_age and graph_name: await self.configure_age(connection, graph_name) elif with_age and not graph_name: 
@@ -1320,7 +1303,6 @@ class PostgreSQLDB:
             asyncpg.exceptions.DuplicateTableError,
             asyncpg.exceptions.DuplicateObjectError,  # Catch "already exists" error
             asyncpg.exceptions.InvalidSchemaNameError,  # Also catch for AGE extension "already exists"
-            # asyncpg.exceptions.UndefinedTableError,  # Catch "relation does not exist" for index creation
         ) as e:
             if ignore_if_exists:
                 # If the flag is set, just ignore these specific errors
@@ -1415,9 +1397,13 @@ class ClientManager:
             ),
             # Server settings for Supabase
             "server_settings": os.environ.get(
-                "POSTGRES_SERVER_OPTIONS",
+                "POSTGRES_SERVER_SETTINGS",
                 config.get("postgres", "server_options", fallback=None),
             ),
+            "statement_cache_size": os.environ.get(
+                "POSTGRES_STATEMENT_CACHE_SIZE",
+                config.get("postgres", "statement_cache_size", fallback=None),
+            ),
         }
 
     @classmethod
@@ -1471,9 +1457,6 @@ class PGKVStorage(BaseKVStorage):
             # Use "default" for compatibility (lowest priority)
             self.workspace = "default"
 
-        # Apply multi-tenant isolation
-        self.workspace = self._get_composite_workspace()
-
     async def finalize(self):
         async with get_storage_lock():
             if self.db is not None:
@@ -1524,7 +1507,6 @@ class PGKVStorage(BaseKVStorage):
         if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
             processed_results = {}
             for row in results:
-                row = dict(row)
                 llm_cache_list = row.get("llm_cache_list", [])
                 if isinstance(llm_cache_list, str):
                     try:
@@ -1545,7 +1527,6 @@ class PGKVStorage(BaseKVStorage):
         if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES):
             processed_results = {}
             for row in results:
-                row = dict(row)
                 entity_names = row.get("entity_names", [])
                 if isinstance(entity_names, str):
                     try:
@@ -1566,7 +1547,6 @@ class PGKVStorage(BaseKVStorage):
         if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS):
             processed_results = {}
             for row in results:
-                row = dict(row)
                 relation_pairs = row.get("relation_pairs", [])
                 if isinstance(relation_pairs, str):
                     try:
@@ -1596,9 +1576,6 @@ class PGKVStorage(BaseKVStorage):
         sql = SQL_TEMPLATES["get_by_id_" + self.namespace]
         params = {"workspace": self.workspace, "id": id}
         response = await self.db.query(sql, list(params.values()))
-
-        if response:
-            response = dict(response)
 
         if response and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
             # Parse llm_cache_list JSON string back to list
@@ -1679,9 +1656,6 @@ class PGKVStorage(BaseKVStorage):
         )
         params = {"workspace": self.workspace}
         results = await self.db.query(sql, list(params.values()), multirows=True)
-
-        if results:
-            results = [dict(r) for r in results]
 
         if results and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
             # Parse llm_cache_list JSON string back to list for each result
@@ -1862,28 +1836,6 @@ class PGKVStorage(BaseKVStorage):
                     "update_time": current_time,
                 }
                 await self.db.execute(upsert_sql, _data)
-        elif is_namespace(self.namespace, NameSpace.KV_STORE_TENANTS):
-            for k, v in data.items():
-                upsert_sql = SQL_TEMPLATES["upsert_tenants"]
-                _data = {
-                    "workspace": self.workspace,
-                    "id": k,
-                    "data": json.dumps(v),
-                    "create_time": v.get("create_time"),
-                    "update_time": v.get("update_time"),
-                }
-                await self.db.execute(upsert_sql, _data)
-        elif is_namespace(self.namespace, NameSpace.KV_STORE_KNOWLEDGE_BASES):
-            for k, v in data.items():
-                upsert_sql = SQL_TEMPLATES["upsert_knowledge_bases"]
-                _data = {
-                    "workspace": self.workspace,
-                    "id": k,
-                    "data": json.dumps(v),
-                    "create_time": v.get("create_time"),
-                    "update_time": v.get("update_time"),
-                }
-                await self.db.execute(upsert_sql, _data)
 
     async def index_done_callback(self) -> None:
         # PG handles persistence automatically
@@ -1971,9 +1923,6 @@ class PGVectorStorage(BaseVectorStorage):
             # Use "default" for compatibility (lowest priority)
             self.workspace = "default"
 
-        # Apply multi-tenant isolation
-        self.workspace = self._get_composite_workspace()
-
     async def finalize(self):
         async with get_storage_lock():
             if self.db is not None:
@@ -2521,62 +2470,6 @@ class PGDocStatusStorage(DocStatusStorage):
                 track_id=result[0].get("track_id"),
             )
 
-    async def get_doc_by_external_id(self, external_id: str) -> Union[dict[str, Any], None]:
-        """Get document by external_id for idempotency support.
-
-        Uses indexed lookup on metadata->>'external_id' for efficient retrieval.
-
-        Args:
-            external_id: The external unique identifier to search for
-
-        Returns:
-            Union[dict[str, Any], None]: Document data if found, None otherwise
-        """
-        sql = """
-            SELECT * FROM LIGHTRAG_DOC_STATUS
-            WHERE workspace=$1 AND metadata->>'external_id' = $2
-        """
-        params = {"workspace": self.workspace, "external_id": external_id}
-        result = await self.db.query(sql, list(params.values()), True)
-
-        if result is None or result == []:
-            return None
-        else:
-            # Parse chunks_list JSON string back to list
-            chunks_list = result[0].get("chunks_list", [])
-            if isinstance(chunks_list, str):
-                try:
-                    chunks_list = json.loads(chunks_list)
-                except json.JSONDecodeError:
-                    chunks_list = []
-
-            # Parse metadata JSON string back to dict
-            metadata = result[0].get("metadata", {})
-            if isinstance(metadata, str):
-                try:
-                    metadata = json.loads(metadata)
-                except json.JSONDecodeError:
-                    metadata = {}
-
-            # Convert datetime objects to ISO format strings with timezone info
-            created_at = self._format_datetime_with_timezone(result[0]["created_at"])
-            updated_at = self._format_datetime_with_timezone(result[0]["updated_at"])
-
-            return dict(
-                id=result[0]["id"],
-                content_length=result[0]["content_length"],
-                content_summary=result[0]["content_summary"],
-                status=result[0]["status"],
-                chunks_count=result[0]["chunks_count"],
-                created_at=created_at,
-                updated_at=updated_at,
-                file_path=result[0]["file_path"],
-                chunks_list=chunks_list,
-                metadata=metadata,
-                error_msg=result[0].get("error_msg"),
-                track_id=result[0].get("track_id"),
-            )
-
     async def get_status_counts(self) -> dict[str, int]:
         """Get counts of documents in each status"""
         sql = """SELECT status as "status", COUNT(1) as "count"
@@ -2700,7 +2593,7 @@ class PGDocStatusStorage(DocStatusStorage):
     async def get_docs_paginated(
         self,
-        status_filter: DocStatus | None,
+        status_filter: DocStatus | None = None,
         page: int = 1,
         page_size: int = 50,
         sort_field: str = "updated_at",
@@ -3013,19 +2906,7 @@ class PGGraphStorage(BaseGraphStorage):
             # Ensure names comply with PostgreSQL identifier specifications
             safe_workspace = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip())
             safe_namespace = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
-            graph_name = f"{safe_workspace}_{safe_namespace}"
-
-            # Ensure graph name starts with a letter (AGE requirement)
-            if not graph_name[0].isalpha():
-                graph_name = f"g_{graph_name}"
-
-            # PostgreSQL identifier limit is 63 bytes
-            if len(graph_name) > 63:
-                # Use MD5 hash to ensure uniqueness and fit within limit
-                hash_object = hashlib.md5(graph_name.encode())
-                graph_name = f"g_{hash_object.hexdigest()}"
-
-            return graph_name
+            return f"{safe_workspace}_{safe_namespace}"
         else:
             # When the workspace is "default", use the namespace directly (for backward compatibility with legacy implementations)
             return re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
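The graph-name and label-verification hunks around here all sit on top of Apache AGE's SQL-level Cypher interface. For readers unfamiliar with AGE, here is a minimal sketch of that call pattern through asyncpg, separate from the project's own helpers; the DSN and graph name are illustrative, the AGE extension is assumed to be installed, and depending on server configuration a `LOAD 'age'` may still be needed before the first call:

```python
import asyncio

import asyncpg


async def main() -> None:
    conn = await asyncpg.connect("postgresql://user:password@localhost:5432/lightrag")
    try:
        # AGE's functions live in ag_catalog, so it must be on the search_path,
        # mirroring what configure_age() does before any Cypher is issued.
        await conn.execute('SET search_path = ag_catalog, "$user", public')

        # create_graph() errors if the graph already exists; ignore the same
        # exceptions configure_age() ignores.
        try:
            await conn.execute("select create_graph('example_graph')")
        except (
            asyncpg.exceptions.InvalidSchemaNameError,
            asyncpg.exceptions.UniqueViolationError,
        ):
            pass

        # Cypher is embedded in SQL; the AS clause declares one column per
        # value returned by the Cypher query.
        rows = await conn.fetch(
            """
            SELECT * FROM cypher('example_graph', $$
                MATCH (n) RETURN n
            $$) AS (n agtype)
            """
        )
        print(f"{len(rows)} nodes")
    finally:
        await conn.close()


asyncio.run(main())
```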
@@ -3108,35 +2989,6 @@ class PGGraphStorage(BaseGraphStorage):
             graph_name=self.graph_name,
         )
-
-        # Verify that essential labels exist by checking the ag_label catalog
-        # This helps catch cases where label creation silently failed
-        try:
-            async with self.db.pool.acquire() as connection:
-                await connection.execute("LOAD 'age'")  # Required for AGE functions
-                await connection.execute('SET search_path = ag_catalog, "$user", public')
-                # Check if 'base' label exists for this graph
-                result = await connection.fetchrow(
-                    """
-                    SELECT l.name
-                    FROM ag_catalog.ag_label l
-                    JOIN ag_catalog.ag_graph g ON l.graph = g.graphid
-                    WHERE l.name = 'base' AND g.name = $1
-                    """,
-                    self.graph_name
-                )
-                if result is None:
-                    logger.warning(
-                        f"[{self.workspace}] 'base' vlabel not found for graph '{self.graph_name}', attempting to create..."
-                    )
-                    # Retry creating the vlabel
-                    await connection.execute(
-                        f"SELECT create_vlabel('{self.graph_name}', 'base')"
-                    )
-                    logger.info(f"[{self.workspace}] Successfully created 'base' vlabel for graph '{self.graph_name}'")
-        except Exception as e:
-            if "already exists" not in str(e):
-                logger.error(f"[{self.workspace}] Failed to verify/create 'base' vlabel: {e}")
 
     async def finalize(self):
         async with get_graph_db_lock():
             if self.db is not None:
@@ -3303,7 +3155,6 @@ class PGGraphStorage(BaseGraphStorage):
         Returns:
             list[dict[str, Any]]: a list of dictionaries containing the result set
         """
-        logger.info(f"[{self.workspace}] Executing query: {query}")
         try:
             if readonly:
                 data = await self.db.query(
@@ -3494,6 +3345,7 @@ class PGGraphStorage(BaseGraphStorage):
             raise
 
     @retry(
+        stop=stop_after_attempt(3),
         wait=wait_exponential(multiplier=1, min=4, max=10),
         retry=retry_if_exception_type((PGGraphQueryException,)),
     )
@@ -3749,10 +3601,8 @@ class PGGraphStorage(BaseGraphStorage):
             node_id = row["node_id"]
             if not node_id:
                 continue
-            out_degree = out_degrees.get(node_id, 0)
-            in_degree = in_degrees.get(node_id, 0)
-            out_degrees[node_id] = out_degree + int(row.get("out_degree", 0) or 0)
-            in_degrees[node_id] = in_degree + int(row.get("in_degree", 0) or 0)
+            out_degrees[node_id] = int(row.get("out_degree", 0) or 0)
+            in_degrees[node_id] = int(row.get("in_degree", 0) or 0)
 
         degrees_dict = {}
         for node_id in node_ids:
@@ -3856,13 +3706,15 @@ class PGGraphStorage(BaseGraphStorage):
             SELECT * FROM cypher({dollar_quote(self.graph_name)}::name,
                                  {dollar_quote(forward_cypher)}::cstring,
                                  $1::agtype)
-            AS (source text, target text, edge_properties agtype)"""
+            AS (source text, target text, edge_properties agtype)
+            """
 
         sql_bwd = f"""
             SELECT * FROM cypher({dollar_quote(self.graph_name)}::name,
                                  {dollar_quote(backward_cypher)}::cstring,
                                  $1::agtype)
-            AS (source text, target text, edge_properties agtype)"""
+            AS (source text, target text, edge_properties agtype)
+            """
 
         pg_params = {"params": json.dumps({"pairs": pairs}, ensure_ascii=False)}
 
@@ -4034,8 +3886,7 @@ class PGGraphStorage(BaseGraphStorage):
                         f"[{self.workspace}] Failed to parse node string in batch: {node_dict}"
                     )
 
-                # Add node id (entity_id) to the dictionary for easier access
-                node_dict["id"] = node_dict.get("entity_id")
+                node_dict["id"] = node_dict["entity_id"]
                 nodes.append(node_dict)
 
         return nodes
@@ -4069,7 +3920,6 @@ class PGGraphStorage(BaseGraphStorage):
                     logger.warning(
                         f"[{self.workspace}] Failed to parse edge string in batch: {edge_agtype}"
                     )
-                    edge_agtype = {}
 
                 source_agtype = item["source"]["properties"]
                 # Process string result, parse it to JSON dictionary
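The `@retry` hunk above adds `stop_after_attempt(3)`; without a `stop` condition tenacity keeps retrying indefinitely, so the change bounds failed graph queries to three attempts. A standalone sketch of the same decorator configuration — the exception class and function here are stand-ins, not LightRAG code:

```python
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)


class TransientQueryError(Exception):
    """Stand-in for PGGraphQueryException in this sketch."""


@retry(
    stop=stop_after_attempt(3),                          # give up after 3 tries
    wait=wait_exponential(multiplier=1, min=4, max=10),  # back off between 4s and 10s
    retry=retry_if_exception_type((TransientQueryError,)),
)
def flaky_query() -> str:
    # Replace with a real call; raising TransientQueryError triggers a retry.
    raise TransientQueryError("temporary failure")


# flaky_query()  # raises tenacity.RetryError after the third failed attempt
```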
@@ -4374,10 +4224,6 @@ class PGGraphStorage(BaseGraphStorage):
        $$) AS (a AGTYPE, r AGTYPE, b AGTYPE)"""
 
        results = await self._query(query)
-        logger.info(f"[{self.workspace}] Query results count: {len(results)}")
-        if results:
-            logger.info(f"[{self.workspace}] First result sample: {results[0]}")
-
        # Process query results, deduplicate nodes and edges
        nodes_dict = {}
        edges_dict = {}
@@ -4519,13 +4365,13 @@ class PGGraphStorage(BaseGraphStorage):
                GROUP BY node_id
            )
            SELECT
-               (ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '"entity_id"'::agtype]))::text AS label
+               (ag_catalog.agtype_access_operator(VARIADIC ARRAY[v.properties, '"entity_id"'::agtype]))::text AS label
            FROM
                node_degrees d
            JOIN {self.graph_name}._ag_label_vertex v
                ON d.node_id = v.id
            WHERE
-               ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '"entity_id"'::agtype]) IS NOT NULL
+               ag_catalog.agtype_access_operator(VARIADIC ARRAY[v.properties, '"entity_id"'::agtype]) IS NOT NULL
            ORDER BY
                d.degree DESC,
                label ASC
@@ -4615,7 +4461,7 @@ class PGGraphStorage(BaseGraphStorage):
            drop_query = f"""SELECT * FROM cypher('{self.graph_name}', $$
                              MATCH (n)
                              DETACH DELETE n
-                            $$) AS (n agtype)"""
+                            $$) AS (result agtype)"""
            await self._query(drop_query, readonly=False)
 
            return {
@@ -4639,8 +4485,6 @@ NAMESPACE_TABLE_MAP = {
    NameSpace.VECTOR_STORE_ENTITIES: "LIGHTRAG_VDB_ENTITY",
    NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_VDB_RELATION",
    NameSpace.DOC_STATUS: "LIGHTRAG_DOC_STATUS",
-   NameSpace.KV_STORE_TENANTS: "LIGHTRAG_TENANTS",
-   NameSpace.KV_STORE_KNOWLEDGE_BASES: "LIGHTRAG_KNOWLEDGE_BASES",
 }
 
 
@@ -4651,26 +4495,6 @@ def namespace_to_table_name(namespace: str) -> str:
 
 
 TABLES = {
-    "LIGHTRAG_TENANTS": {
-        "ddl": """CREATE TABLE LIGHTRAG_TENANTS (
-                    id VARCHAR(255),
-                    workspace VARCHAR(255),
-                    data JSONB,
-                    create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
-                    update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
-                    CONSTRAINT LIGHTRAG_TENANTS_PK PRIMARY KEY (workspace, id)
-                    )"""
-    },
-    "LIGHTRAG_KNOWLEDGE_BASES": {
-        "ddl": """CREATE TABLE LIGHTRAG_KNOWLEDGE_BASES (
-                    id VARCHAR(255),
-                    workspace VARCHAR(255),
-                    data JSONB,
-                    create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
-                    update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
-                    CONSTRAINT LIGHTRAG_KNOWLEDGE_BASES_PK PRIMARY KEY (workspace, id)
-                    )"""
-    },
     "LIGHTRAG_DOC_FULL": {
        "ddl": """CREATE TABLE LIGHTRAG_DOC_FULL (
            id VARCHAR(255),
@@ -4853,10 +4677,6 @@ SQL_TEMPLATES = {
        EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
        FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id IN ({ids})
    """,
-   "get_by_id_tenants": """SELECT data FROM LIGHTRAG_TENANTS WHERE workspace=$1 AND id=$2""",
-   "get_by_id_knowledge_bases": """SELECT data FROM LIGHTRAG_KNOWLEDGE_BASES WHERE workspace=$1 AND id=$2""",
-   "get_by_ids_tenants": """SELECT data FROM LIGHTRAG_TENANTS WHERE workspace=$1 AND id IN ({ids})""",
-   "get_by_ids_knowledge_bases": """SELECT data FROM LIGHTRAG_KNOWLEDGE_BASES WHERE workspace=$1 AND id IN ({ids})""",
    "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
    "upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, doc_name, workspace)
        VALUES ($1, $2, $3, $4)
@@ -4904,18 +4724,6 @@ SQL_TEMPLATES = {
        count=EXCLUDED.count,
        update_time = EXCLUDED.update_time
    """,
-   "upsert_tenants": """INSERT INTO LIGHTRAG_TENANTS (workspace, id, data, create_time, update_time)
-       VALUES ($1, $2, $3, $4, $5)
-       ON CONFLICT (workspace,id) DO UPDATE
-       SET data=EXCLUDED.data,
-       update_time = EXCLUDED.update_time
-   """,
-   "upsert_knowledge_bases": """INSERT INTO LIGHTRAG_KNOWLEDGE_BASES (workspace, id, data, create_time, update_time)
-       VALUES ($1, $2, $3, $4, $5)
-       ON CONFLICT (workspace,id) DO UPDATE
-       SET data=EXCLUDED.data,
-       update_time = EXCLUDED.update_time
-   """,
    # SQL for VectorStorage
    "upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
        chunk_order_index, full_doc_id, content, content_vector, file_path,