Fix node ID normalization and improve batch operation consistency
• Remove premature ID normalization • Add lookup mapping for node resolution • Filter results by requested nodes only • Improve error logging with workspace
This commit is contained in:
parent
edf48d7965
commit
155f59759b
1 changed files with 44 additions and 29 deletions
|
|
@ -3568,17 +3568,13 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
async def get_node(self, node_id: str) -> dict[str, str] | None:
|
async def get_node(self, node_id: str) -> dict[str, str] | None:
|
||||||
"""Get node by its label identifier, return only node properties"""
|
"""Get node by its label identifier, return only node properties"""
|
||||||
|
|
||||||
label = self._normalize_node_id(node_id)
|
result = await self.get_nodes_batch(node_ids=[node_id])
|
||||||
|
|
||||||
result = await self.get_nodes_batch(node_ids=[label])
|
|
||||||
if result and node_id in result:
|
if result and node_id in result:
|
||||||
return result[node_id]
|
return result[node_id]
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def node_degree(self, node_id: str) -> int:
|
async def node_degree(self, node_id: str) -> int:
|
||||||
label = self._normalize_node_id(node_id)
|
result = await self.node_degrees_batch(node_ids=[node_id])
|
||||||
|
|
||||||
result = await self.node_degrees_batch(node_ids=[label])
|
|
||||||
if result and node_id in result:
|
if result and node_id in result:
|
||||||
return result[node_id]
|
return result[node_id]
|
||||||
|
|
||||||
|
|
@ -3591,12 +3587,11 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
self, source_node_id: str, target_node_id: str
|
self, source_node_id: str, target_node_id: str
|
||||||
) -> dict[str, str] | None:
|
) -> dict[str, str] | None:
|
||||||
"""Get edge properties between two nodes"""
|
"""Get edge properties between two nodes"""
|
||||||
src_label = self._normalize_node_id(source_node_id)
|
result = await self.get_edges_batch(
|
||||||
tgt_label = self._normalize_node_id(target_node_id)
|
[{"src": source_node_id, "tgt": target_node_id}]
|
||||||
|
)
|
||||||
result = await self.get_edges_batch([{"src": src_label, "tgt": tgt_label}])
|
if result and (source_node_id, target_node_id) in result:
|
||||||
if result and (src_label, tgt_label) in result:
|
return result[(source_node_id, target_node_id)]
|
||||||
return result[(src_label, tgt_label)]
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
|
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
|
||||||
|
|
@ -3794,13 +3789,17 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
if not node_ids:
|
if not node_ids:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
seen = set()
|
seen: set[str] = set()
|
||||||
unique_ids = []
|
unique_ids: list[str] = []
|
||||||
|
lookup: dict[str, str] = {}
|
||||||
|
requested: set[str] = set()
|
||||||
for nid in node_ids:
|
for nid in node_ids:
|
||||||
nid_norm = self._normalize_node_id(nid)
|
if nid not in seen:
|
||||||
if nid_norm not in seen:
|
seen.add(nid)
|
||||||
seen.add(nid_norm)
|
unique_ids.append(nid)
|
||||||
unique_ids.append(nid_norm)
|
requested.add(nid)
|
||||||
|
lookup[nid] = nid
|
||||||
|
lookup[self._normalize_node_id(nid)] = nid
|
||||||
|
|
||||||
# Build result dictionary
|
# Build result dictionary
|
||||||
nodes_dict = {}
|
nodes_dict = {}
|
||||||
|
|
@ -3839,10 +3838,16 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
node_dict = json.loads(node_dict)
|
node_dict = json.loads(node_dict)
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Failed to parse node string in batch: {node_dict}"
|
f"[{self.workspace}] Failed to parse node string in batch: {node_dict}"
|
||||||
)
|
)
|
||||||
|
|
||||||
nodes_dict[result["node_id"]] = node_dict
|
node_key = result["node_id"]
|
||||||
|
original_key = lookup.get(node_key)
|
||||||
|
if original_key is None:
|
||||||
|
logger.warning(f"[{self.workspace}] Node {node_key} not found in lookup map")
|
||||||
|
original_key = node_key
|
||||||
|
if original_key in requested:
|
||||||
|
nodes_dict[original_key] = node_dict
|
||||||
|
|
||||||
return nodes_dict
|
return nodes_dict
|
||||||
|
|
||||||
|
|
@ -3865,13 +3870,17 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
if not node_ids:
|
if not node_ids:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
seen = set()
|
seen: set[str] = set()
|
||||||
unique_ids: list[str] = []
|
unique_ids: list[str] = []
|
||||||
|
lookup: dict[str, str] = {}
|
||||||
|
requested: set[str] = set()
|
||||||
for nid in node_ids:
|
for nid in node_ids:
|
||||||
n = self._normalize_node_id(nid)
|
if nid not in seen:
|
||||||
if n not in seen:
|
seen.add(nid)
|
||||||
seen.add(n)
|
unique_ids.append(nid)
|
||||||
unique_ids.append(n)
|
requested.add(nid)
|
||||||
|
lookup[nid] = nid
|
||||||
|
lookup[self._normalize_node_id(nid)] = nid
|
||||||
|
|
||||||
out_degrees = {}
|
out_degrees = {}
|
||||||
in_degrees = {}
|
in_degrees = {}
|
||||||
|
|
@ -3923,8 +3932,14 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
node_id = row["node_id"]
|
node_id = row["node_id"]
|
||||||
if not node_id:
|
if not node_id:
|
||||||
continue
|
continue
|
||||||
out_degrees[node_id] = int(row.get("out_degree", 0) or 0)
|
node_key = node_id
|
||||||
in_degrees[node_id] = int(row.get("in_degree", 0) or 0)
|
original_key = lookup.get(node_key)
|
||||||
|
if original_key is None:
|
||||||
|
logger.warning(f"[{self.workspace}] Node {node_key} not found in lookup map")
|
||||||
|
original_key = node_key
|
||||||
|
if original_key in requested:
|
||||||
|
out_degrees[original_key] = int(row.get("out_degree", 0) or 0)
|
||||||
|
in_degrees[original_key] = int(row.get("in_degree", 0) or 0)
|
||||||
|
|
||||||
degrees_dict = {}
|
degrees_dict = {}
|
||||||
for node_id in node_ids:
|
for node_id in node_ids:
|
||||||
|
|
@ -4053,7 +4068,7 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
edge_props = json.loads(edge_props)
|
edge_props = json.loads(edge_props)
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Failed to parse edge properties string: {edge_props}"
|
f"[{self.workspace}]Failed to parse edge properties string: {edge_props}"
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
@ -4069,7 +4084,7 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
edge_props = json.loads(edge_props)
|
edge_props = json.loads(edge_props)
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Failed to parse edge properties string: {edge_props}"
|
f"[{self.workspace}] Failed to parse edge properties string: {edge_props}"
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue