Add efficient vector retrieval by IDs to PGVectorStorage

This commit is contained in:
yangdx 2025-08-15 16:51:41 +08:00
parent 8f7031b882
commit 7a7385a200

View file

@ -2158,6 +2158,53 @@ class PGVectorStorage(BaseVectorStorage):
)
return []
async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
"""Get vectors by their IDs, returning only ID and vector data for efficiency
Args:
ids: List of unique identifiers
Returns:
Dictionary mapping IDs to their vector embeddings
Format: {id: [vector_values], ...}
"""
if not ids:
return {}
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(
f"[{self.workspace}] Unknown namespace for vector lookup: {self.namespace}"
)
return {}
ids_str = ",".join([f"'{id}'" for id in ids])
query = f"SELECT id, content_vector FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})"
params = {"workspace": self.workspace}
try:
results = await self.db.query(query, params, multirows=True)
vectors_dict = {}
for result in results:
if result and "content_vector" in result and "id" in result:
try:
# Parse JSON string to get vector as list of floats
vector_data = json.loads(result["content_vector"])
if isinstance(vector_data, list):
vectors_dict[result["id"]] = vector_data
except (json.JSONDecodeError, TypeError) as e:
logger.warning(
f"[{self.workspace}] Failed to parse vector data for ID {result['id']}: {e}"
)
return vectors_dict
except Exception as e:
logger.error(
f"[{self.workspace}] Error retrieving vectors by IDs from {self.namespace}: {e}"
)
return {}
async def drop(self) -> dict[str, str]:
"""Drop the storage"""
async with get_storage_lock():