From a57d4ec0cc8978e28e78aff0a925b0c030cd9ce3 Mon Sep 17 00:00:00 2001 From: GGrassia Date: Thu, 9 Oct 2025 10:56:34 +0200 Subject: [PATCH] perf (metadata): optimized metadata query with gin indexes for postgres --- lightrag/kg/postgres_impl.py | 103 +++++++++++------------------------ 1 file changed, 33 insertions(+), 70 deletions(-) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index b758aad0..ff4260f4 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -2104,75 +2104,42 @@ class PGVectorStorage(BaseVectorStorage): ############# Metadata building function ################# @staticmethod def build_metadata_filter_clause(metadata_filter): - def escape_str(val: str) -> str: - return str(val).replace("'", "''") # escape single quotes - + if not metadata_filter: + return "" + def build_single_condition(key, value): - if isinstance(value, list): - # If list contains only scalars, use IN clause - if all(isinstance(v, (str, int, float, bool)) for v in value): - escaped_values = ", ".join(f"'{escape_str(str(v))}'" for v in value) - return f"metadata->>'{key}' IN ({escaped_values})" - else: - # fallback to JSON matching for complex types - json_value = json.dumps(value).replace("'", "''") - return f"metadata->'{key}' = '{json_value}'::jsonb" - elif isinstance(value, dict): - json_value = json.dumps(value).replace("'", "''") - return f"metadata->'{key}' = '{json_value}'::jsonb" - elif isinstance(value, (int, float, bool)): - return f"metadata->'{key}' = '{json.dumps(value)}'::jsonb" - else: # string - return f"metadata->>'{key}' = '{escape_str(value)}'" + if isinstance(value, (list, dict)): + # Use GIN-optimized containment operator + json_value = json.dumps(value) + return f"metadata @> '{{\"{ key}\" : {json_value}}}'" + else: + # Use containment for scalars too - faster with GIN index + json_value = json.dumps(value) + return f"metadata @> '{{\"{ key}\" : {json_value}}}'" - def build_conditions(filter_dict): - conditions = [] - for key, value in filter_dict.items(): - if isinstance(value, (list, tuple)): - conds = [build_single_condition(key, v) for v in value] - conditions.append("(" + " OR ".join(conds) + ")") - else: - conditions.append(build_single_condition(key, value)) - return conditions - - def recurse(filter_obj): - if isinstance(filter_obj, dict): - return build_conditions(filter_obj) - - if isinstance(filter_obj, MetadataFilter): + try: + if isinstance(metadata_filter, dict): + conditions = [build_single_condition(k, v) for k, v in metadata_filter.items()] + return " AND " + " AND ".join(conditions) if conditions else "" + elif hasattr(metadata_filter, 'operands'): sub_conditions = [] - for operand in filter_obj.operands: + for operand in metadata_filter.operands: if isinstance(operand, dict): - sub_conditions.append( - "(" + " AND ".join(build_conditions(operand)) + ")" - ) - elif isinstance(operand, MetadataFilter): - nested = recurse(operand) - if nested: - sub_conditions.append("(" + " AND ".join(nested) + ")") - - if not sub_conditions: - return [] - - op = filter_obj.operator.upper() - if op == "AND": - return [" AND ".join(sub_conditions)] - elif op == "OR": - return [" OR ".join(sub_conditions)] - elif op == "NOT": - if len(sub_conditions) == 1: - return [f"NOT {sub_conditions[0]}"] - else: - return [f"NOT ({' AND '.join(sub_conditions)})"] - - return [] - - conditions = recurse(metadata_filter) - clause = "" - if conditions: - clause = " AND " + " AND ".join(conditions) - - return clause + conds = [build_single_condition(k, v) for k, v in operand.items()] + if conds: + sub_conditions.append("(" + " AND ".join(conds) + ")") + + if sub_conditions: + op = getattr(metadata_filter, 'operator', 'AND').upper() + connector = " OR " if op == "OR" else " AND " + prefix = " AND NOT (" if op == "NOT" else " AND (" + return prefix + connector.join(sub_conditions) + ")" + return "" + except Exception: + # Simple fallback + if isinstance(metadata_filter, dict): + return f" AND metadata @> '{json.dumps(metadata_filter)}'" + return "" #################### query method ############### async def query( @@ -2191,7 +2158,7 @@ class PGVectorStorage(BaseVectorStorage): embedding = embeddings[0] embedding_string = ",".join(map(str, embedding)) - metadata_filter_clause = self.build_metadata_filter_clause(metadata_filter) + metadata_filter_clause = self.build_metadata_filter_clause(metadata_filter) if metadata_filter else "" sql = SQL_TEMPLATES[self.namespace].format( embedding_string=embedding_string, metadata_filter_clause=metadata_filter_clause, @@ -4909,7 +4876,6 @@ SQL_TEMPLATES = { """, "relationships": """ WITH filtered_chunks AS ( - -- Step 1: Select only the chunk IDs that match the metadata filter SELECT c.id FROM @@ -4918,7 +4884,6 @@ SQL_TEMPLATES = { c.workspace = $1 {metadata_filter_clause} ) - -- Step 2 & 3: Join relationships with the filtered chunks and rank by similarity SELECT r.source_id AS src_id, r.target_id AS tgt_id, @@ -4936,7 +4901,6 @@ SQL_TEMPLATES = { """, "entities": """ WITH filtered_chunks AS ( - -- Step 1: Select only the chunk IDs that match the metadata filter SELECT c.id FROM @@ -4945,7 +4909,6 @@ SQL_TEMPLATES = { c.workspace = $1 {metadata_filter_clause} ) - -- Step 2 & 3: Join entities with the filtered chunks and rank by similarity SELECT e.entity_name, EXTRACT(EPOCH FROM e.create_time)::BIGINT AS created_at