perf (metadata): optimized metadata query with gin indexes for postgres
This commit is contained in:
parent
f4c2823c82
commit
a57d4ec0cc
1 changed files with 33 additions and 70 deletions
|
|
@ -2104,75 +2104,42 @@ class PGVectorStorage(BaseVectorStorage):
|
|||
############# Metadata building function #################
|
||||
@staticmethod
|
||||
def build_metadata_filter_clause(metadata_filter):
|
||||
def escape_str(val: str) -> str:
|
||||
return str(val).replace("'", "''") # escape single quotes
|
||||
|
||||
if not metadata_filter:
|
||||
return ""
|
||||
|
||||
def build_single_condition(key, value):
|
||||
if isinstance(value, list):
|
||||
# If list contains only scalars, use IN clause
|
||||
if all(isinstance(v, (str, int, float, bool)) for v in value):
|
||||
escaped_values = ", ".join(f"'{escape_str(str(v))}'" for v in value)
|
||||
return f"metadata->>'{key}' IN ({escaped_values})"
|
||||
else:
|
||||
# fallback to JSON matching for complex types
|
||||
json_value = json.dumps(value).replace("'", "''")
|
||||
return f"metadata->'{key}' = '{json_value}'::jsonb"
|
||||
elif isinstance(value, dict):
|
||||
json_value = json.dumps(value).replace("'", "''")
|
||||
return f"metadata->'{key}' = '{json_value}'::jsonb"
|
||||
elif isinstance(value, (int, float, bool)):
|
||||
return f"metadata->'{key}' = '{json.dumps(value)}'::jsonb"
|
||||
else: # string
|
||||
return f"metadata->>'{key}' = '{escape_str(value)}'"
|
||||
if isinstance(value, (list, dict)):
|
||||
# Use GIN-optimized containment operator
|
||||
json_value = json.dumps(value)
|
||||
return f"metadata @> '{{\"{ key}\" : {json_value}}}'"
|
||||
else:
|
||||
# Use containment for scalars too - faster with GIN index
|
||||
json_value = json.dumps(value)
|
||||
return f"metadata @> '{{\"{ key}\" : {json_value}}}'"
|
||||
|
||||
def build_conditions(filter_dict):
|
||||
conditions = []
|
||||
for key, value in filter_dict.items():
|
||||
if isinstance(value, (list, tuple)):
|
||||
conds = [build_single_condition(key, v) for v in value]
|
||||
conditions.append("(" + " OR ".join(conds) + ")")
|
||||
else:
|
||||
conditions.append(build_single_condition(key, value))
|
||||
return conditions
|
||||
|
||||
def recurse(filter_obj):
|
||||
if isinstance(filter_obj, dict):
|
||||
return build_conditions(filter_obj)
|
||||
|
||||
if isinstance(filter_obj, MetadataFilter):
|
||||
try:
|
||||
if isinstance(metadata_filter, dict):
|
||||
conditions = [build_single_condition(k, v) for k, v in metadata_filter.items()]
|
||||
return " AND " + " AND ".join(conditions) if conditions else ""
|
||||
elif hasattr(metadata_filter, 'operands'):
|
||||
sub_conditions = []
|
||||
for operand in filter_obj.operands:
|
||||
for operand in metadata_filter.operands:
|
||||
if isinstance(operand, dict):
|
||||
sub_conditions.append(
|
||||
"(" + " AND ".join(build_conditions(operand)) + ")"
|
||||
)
|
||||
elif isinstance(operand, MetadataFilter):
|
||||
nested = recurse(operand)
|
||||
if nested:
|
||||
sub_conditions.append("(" + " AND ".join(nested) + ")")
|
||||
|
||||
if not sub_conditions:
|
||||
return []
|
||||
|
||||
op = filter_obj.operator.upper()
|
||||
if op == "AND":
|
||||
return [" AND ".join(sub_conditions)]
|
||||
elif op == "OR":
|
||||
return [" OR ".join(sub_conditions)]
|
||||
elif op == "NOT":
|
||||
if len(sub_conditions) == 1:
|
||||
return [f"NOT {sub_conditions[0]}"]
|
||||
else:
|
||||
return [f"NOT ({' AND '.join(sub_conditions)})"]
|
||||
|
||||
return []
|
||||
|
||||
conditions = recurse(metadata_filter)
|
||||
clause = ""
|
||||
if conditions:
|
||||
clause = " AND " + " AND ".join(conditions)
|
||||
|
||||
return clause
|
||||
conds = [build_single_condition(k, v) for k, v in operand.items()]
|
||||
if conds:
|
||||
sub_conditions.append("(" + " AND ".join(conds) + ")")
|
||||
|
||||
if sub_conditions:
|
||||
op = getattr(metadata_filter, 'operator', 'AND').upper()
|
||||
connector = " OR " if op == "OR" else " AND "
|
||||
prefix = " AND NOT (" if op == "NOT" else " AND ("
|
||||
return prefix + connector.join(sub_conditions) + ")"
|
||||
return ""
|
||||
except Exception:
|
||||
# Simple fallback
|
||||
if isinstance(metadata_filter, dict):
|
||||
return f" AND metadata @> '{json.dumps(metadata_filter)}'"
|
||||
return ""
|
||||
|
||||
#################### query method ###############
|
||||
async def query(
|
||||
|
|
@ -2191,7 +2158,7 @@ class PGVectorStorage(BaseVectorStorage):
|
|||
embedding = embeddings[0]
|
||||
|
||||
embedding_string = ",".join(map(str, embedding))
|
||||
metadata_filter_clause = self.build_metadata_filter_clause(metadata_filter)
|
||||
metadata_filter_clause = self.build_metadata_filter_clause(metadata_filter) if metadata_filter else ""
|
||||
sql = SQL_TEMPLATES[self.namespace].format(
|
||||
embedding_string=embedding_string,
|
||||
metadata_filter_clause=metadata_filter_clause,
|
||||
|
|
@ -4909,7 +4876,6 @@ SQL_TEMPLATES = {
|
|||
""",
|
||||
"relationships": """
|
||||
WITH filtered_chunks AS (
|
||||
-- Step 1: Select only the chunk IDs that match the metadata filter
|
||||
SELECT
|
||||
c.id
|
||||
FROM
|
||||
|
|
@ -4918,7 +4884,6 @@ SQL_TEMPLATES = {
|
|||
c.workspace = $1
|
||||
{metadata_filter_clause}
|
||||
)
|
||||
-- Step 2 & 3: Join relationships with the filtered chunks and rank by similarity
|
||||
SELECT
|
||||
r.source_id AS src_id,
|
||||
r.target_id AS tgt_id,
|
||||
|
|
@ -4936,7 +4901,6 @@ SQL_TEMPLATES = {
|
|||
""",
|
||||
"entities": """
|
||||
WITH filtered_chunks AS (
|
||||
-- Step 1: Select only the chunk IDs that match the metadata filter
|
||||
SELECT
|
||||
c.id
|
||||
FROM
|
||||
|
|
@ -4945,7 +4909,6 @@ SQL_TEMPLATES = {
|
|||
c.workspace = $1
|
||||
{metadata_filter_clause}
|
||||
)
|
||||
-- Step 2 & 3: Join entities with the filtered chunks and rank by similarity
|
||||
SELECT
|
||||
e.entity_name,
|
||||
EXTRACT(EPOCH FROM e.create_time)::BIGINT AS created_at
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue