fix: PostgreSQL read methods and delete_entity_relation bugs

Why this change is needed:
After implementing model isolation, two critical bugs were discovered that would cause data access failures:

Bug 1: In delete_entity_relation(), the SQL query uses positional parameters
($1, $2) but the parameter dict was not converted to a list of values before
passing to db.execute(). This caused parameter binding failures when trying to
delete entity relations.

Bug 2: Four read methods (get_by_id, get_by_ids, get_vectors_by_ids, drop)
were still using namespace_to_table_name(self.namespace) to get legacy table
names instead of self.table_name with model suffix. This meant these methods
would query the wrong table (legacy without suffix) while data was being
inserted into the new table (with suffix), causing data not found errors.

How it solves it:
- Bug 1: Convert parameter dict to list using list(params.values()) before
  passing to db.execute(), matching the pattern used in other methods
- Bug 2: Replace all namespace_to_table_name(self.namespace) calls with
  self.table_name in the four affected methods, ensuring they query the
  correct model-specific table

Impact:
- delete_entity_relation now correctly deletes relations by entity name
- All read operations now correctly query model-specific tables
- Data written with model isolation can now be properly retrieved
- Maintains consistency with write operations using self.table_name

Testing:
- All 6 PostgreSQL migration tests pass (test_postgres_migration.py)
- All 6 Qdrant migration tests pass (test_qdrant_migration.py)
- Verified parameter binding works correctly
- Verified read methods access correct tables
This commit is contained in:
BukeLy 2025-11-19 23:01:01 +08:00
parent ad68624d02
commit 7dc1f83efb

View file

@ -2604,9 +2604,8 @@ class PGVectorStorage(BaseVectorStorage):
delete_sql = f"""DELETE FROM {self.table_name}
WHERE workspace=$1 AND (source_id=$2 OR target_id=$2)"""
await self.db.execute(
delete_sql, {"workspace": self.workspace, "entity_name": entity_name}
)
params = {"workspace": self.workspace, "entity_name": entity_name}
await self.db.execute(delete_sql, list(params.values()))
logger.debug(
f"[{self.workspace}] Successfully deleted relations for entity {entity_name}"
)
@ -2624,14 +2623,7 @@ class PGVectorStorage(BaseVectorStorage):
Returns:
The vector data if found, or None if not found
"""
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(
f"[{self.workspace}] Unknown namespace for ID lookup: {self.namespace}"
)
return None
query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id=$2"
query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {self.table_name} WHERE workspace=$1 AND id=$2"
params = {"workspace": self.workspace, "id": id}
try:
@ -2657,15 +2649,8 @@ class PGVectorStorage(BaseVectorStorage):
if not ids:
return []
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(
f"[{self.workspace}] Unknown namespace for IDs lookup: {self.namespace}"
)
return []
ids_str = ",".join([f"'{id}'" for id in ids])
query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})"
query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {self.table_name} WHERE workspace=$1 AND id IN ({ids_str})"
params = {"workspace": self.workspace}
try:
@ -2706,15 +2691,8 @@ class PGVectorStorage(BaseVectorStorage):
if not ids:
return {}
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(
f"[{self.workspace}] Unknown namespace for vector lookup: {self.namespace}"
)
return {}
ids_str = ",".join([f"'{id}'" for id in ids])
query = f"SELECT id, content_vector FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})"
query = f"SELECT id, content_vector FROM {self.table_name} WHERE workspace=$1 AND id IN ({ids_str})"
params = {"workspace": self.workspace}
try:
@ -2743,15 +2721,8 @@ class PGVectorStorage(BaseVectorStorage):
async def drop(self) -> dict[str, str]:
"""Drop the storage"""
try:
table_name = namespace_to_table_name(self.namespace)
if not table_name:
return {
"status": "error",
"message": f"Unknown namespace: {self.namespace}",
}
drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
table_name=table_name
table_name=self.table_name
)
await self.db.execute(drop_sql, {"workspace": self.workspace})
return {"status": "success", "message": "data dropped"}