This commit is contained in:
Raphaël MANSUY 2025-12-04 19:19:01 +08:00
parent c62aab61f0
commit 51170bcb4a

View file

@ -3487,17 +3487,13 @@ class PGGraphStorage(BaseGraphStorage):
async def get_node(self, node_id: str) -> dict[str, str] | None:
"""Get node by its label identifier, return only node properties"""
label = self._normalize_node_id(node_id)
result = await self.get_nodes_batch(node_ids=[label])
result = await self.get_nodes_batch(node_ids=[node_id])
if result and node_id in result:
return result[node_id]
return None
async def node_degree(self, node_id: str) -> int:
label = self._normalize_node_id(node_id)
result = await self.node_degrees_batch(node_ids=[label])
result = await self.node_degrees_batch(node_ids=[node_id])
if result and node_id in result:
return result[node_id]
@ -3510,12 +3506,11 @@ class PGGraphStorage(BaseGraphStorage):
self, source_node_id: str, target_node_id: str
) -> dict[str, str] | None:
"""Get edge properties between two nodes"""
src_label = self._normalize_node_id(source_node_id)
tgt_label = self._normalize_node_id(target_node_id)
result = await self.get_edges_batch([{"src": src_label, "tgt": tgt_label}])
if result and (src_label, tgt_label) in result:
return result[(src_label, tgt_label)]
result = await self.get_edges_batch(
[{"src": source_node_id, "tgt": target_node_id}]
)
if result and (source_node_id, target_node_id) in result:
return result[(source_node_id, target_node_id)]
return None
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
@ -3713,13 +3708,17 @@ class PGGraphStorage(BaseGraphStorage):
if not node_ids:
return {}
seen = set()
unique_ids = []
seen: set[str] = set()
unique_ids: list[str] = []
lookup: dict[str, str] = {}
requested: set[str] = set()
for nid in node_ids:
nid_norm = self._normalize_node_id(nid)
if nid_norm not in seen:
seen.add(nid_norm)
unique_ids.append(nid_norm)
if nid not in seen:
seen.add(nid)
unique_ids.append(nid)
requested.add(nid)
lookup[nid] = nid
lookup[self._normalize_node_id(nid)] = nid
# Build result dictionary
nodes_dict = {}
@ -3758,10 +3757,16 @@ class PGGraphStorage(BaseGraphStorage):
node_dict = json.loads(node_dict)
except json.JSONDecodeError:
logger.warning(
f"Failed to parse node string in batch: {node_dict}"
f"[{self.workspace}] Failed to parse node string in batch: {node_dict}"
)
nodes_dict[result["node_id"]] = node_dict
node_key = result["node_id"]
original_key = lookup.get(node_key)
if original_key is None:
logger.warning(f"[{self.workspace}] Node {node_key} not found in lookup map")
original_key = node_key
if original_key in requested:
nodes_dict[original_key] = node_dict
return nodes_dict
@ -3784,13 +3789,17 @@ class PGGraphStorage(BaseGraphStorage):
if not node_ids:
return {}
seen = set()
seen: set[str] = set()
unique_ids: list[str] = []
lookup: dict[str, str] = {}
requested: set[str] = set()
for nid in node_ids:
n = self._normalize_node_id(nid)
if n not in seen:
seen.add(n)
unique_ids.append(n)
if nid not in seen:
seen.add(nid)
unique_ids.append(nid)
requested.add(nid)
lookup[nid] = nid
lookup[self._normalize_node_id(nid)] = nid
out_degrees = {}
in_degrees = {}
@ -3842,8 +3851,14 @@ class PGGraphStorage(BaseGraphStorage):
node_id = row["node_id"]
if not node_id:
continue
out_degrees[node_id] = int(row.get("out_degree", 0) or 0)
in_degrees[node_id] = int(row.get("in_degree", 0) or 0)
node_key = node_id
original_key = lookup.get(node_key)
if original_key is None:
logger.warning(f"[{self.workspace}] Node {node_key} not found in lookup map")
original_key = node_key
if original_key in requested:
out_degrees[original_key] = int(row.get("out_degree", 0) or 0)
in_degrees[original_key] = int(row.get("in_degree", 0) or 0)
degrees_dict = {}
for node_id in node_ids:
@ -3972,7 +3987,7 @@ class PGGraphStorage(BaseGraphStorage):
edge_props = json.loads(edge_props)
except json.JSONDecodeError:
logger.warning(
f"Failed to parse edge properties string: {edge_props}"
f"[{self.workspace}]Failed to parse edge properties string: {edge_props}"
)
continue
@ -3988,7 +4003,7 @@ class PGGraphStorage(BaseGraphStorage):
edge_props = json.loads(edge_props)
except json.JSONDecodeError:
logger.warning(
f"Failed to parse edge properties string: {edge_props}"
f"[{self.workspace}] Failed to parse edge properties string: {edge_props}"
)
continue