From 155f59759bdfbee9c714af59fe75443356246a33 Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 6 Nov 2025 20:34:53 +0800 Subject: [PATCH] Fix node ID normalization and improve batch operation consistency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Remove premature ID normalization • Add lookup mapping for node resolution • Filter results by requested nodes only • Improve error logging with workspace --- lightrag/kg/postgres_impl.py | 73 ++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 2a7c6158..eab14993 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -3568,17 +3568,13 @@ class PGGraphStorage(BaseGraphStorage): async def get_node(self, node_id: str) -> dict[str, str] | None: """Get node by its label identifier, return only node properties""" - label = self._normalize_node_id(node_id) - - result = await self.get_nodes_batch(node_ids=[label]) + result = await self.get_nodes_batch(node_ids=[node_id]) if result and node_id in result: return result[node_id] return None async def node_degree(self, node_id: str) -> int: - label = self._normalize_node_id(node_id) - - result = await self.node_degrees_batch(node_ids=[label]) + result = await self.node_degrees_batch(node_ids=[node_id]) if result and node_id in result: return result[node_id] @@ -3591,12 +3587,11 @@ class PGGraphStorage(BaseGraphStorage): self, source_node_id: str, target_node_id: str ) -> dict[str, str] | None: """Get edge properties between two nodes""" - src_label = self._normalize_node_id(source_node_id) - tgt_label = self._normalize_node_id(target_node_id) - - result = await self.get_edges_batch([{"src": src_label, "tgt": tgt_label}]) - if result and (src_label, tgt_label) in result: - return result[(src_label, tgt_label)] + result = await self.get_edges_batch( + [{"src": source_node_id, "tgt": target_node_id}] + ) + if result and (source_node_id, target_node_id) in result: + return result[(source_node_id, target_node_id)] return None async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None: @@ -3794,13 +3789,17 @@ class PGGraphStorage(BaseGraphStorage): if not node_ids: return {} - seen = set() - unique_ids = [] + seen: set[str] = set() + unique_ids: list[str] = [] + lookup: dict[str, str] = {} + requested: set[str] = set() for nid in node_ids: - nid_norm = self._normalize_node_id(nid) - if nid_norm not in seen: - seen.add(nid_norm) - unique_ids.append(nid_norm) + if nid not in seen: + seen.add(nid) + unique_ids.append(nid) + requested.add(nid) + lookup[nid] = nid + lookup[self._normalize_node_id(nid)] = nid # Build result dictionary nodes_dict = {} @@ -3839,10 +3838,16 @@ class PGGraphStorage(BaseGraphStorage): node_dict = json.loads(node_dict) except json.JSONDecodeError: logger.warning( - f"Failed to parse node string in batch: {node_dict}" + f"[{self.workspace}] Failed to parse node string in batch: {node_dict}" ) - nodes_dict[result["node_id"]] = node_dict + node_key = result["node_id"] + original_key = lookup.get(node_key) + if original_key is None: + logger.warning(f"[{self.workspace}] Node {node_key} not found in lookup map") + original_key = node_key + if original_key in requested: + nodes_dict[original_key] = node_dict return nodes_dict @@ -3865,13 +3870,17 @@ class PGGraphStorage(BaseGraphStorage): if not node_ids: return {} - seen = set() + seen: set[str] = set() unique_ids: list[str] = [] + lookup: dict[str, str] = {} + requested: set[str] = set() for nid in node_ids: - n = self._normalize_node_id(nid) - if n not in seen: - seen.add(n) - unique_ids.append(n) + if nid not in seen: + seen.add(nid) + unique_ids.append(nid) + requested.add(nid) + lookup[nid] = nid + lookup[self._normalize_node_id(nid)] = nid out_degrees = {} in_degrees = {} @@ -3923,8 +3932,14 @@ class PGGraphStorage(BaseGraphStorage): node_id = row["node_id"] if not node_id: continue - out_degrees[node_id] = int(row.get("out_degree", 0) or 0) - in_degrees[node_id] = int(row.get("in_degree", 0) or 0) + node_key = node_id + original_key = lookup.get(node_key) + if original_key is None: + logger.warning(f"[{self.workspace}] Node {node_key} not found in lookup map") + original_key = node_key + if original_key in requested: + out_degrees[original_key] = int(row.get("out_degree", 0) or 0) + in_degrees[original_key] = int(row.get("in_degree", 0) or 0) degrees_dict = {} for node_id in node_ids: @@ -4053,7 +4068,7 @@ class PGGraphStorage(BaseGraphStorage): edge_props = json.loads(edge_props) except json.JSONDecodeError: logger.warning( - f"Failed to parse edge properties string: {edge_props}" + f"[{self.workspace}]Failed to parse edge properties string: {edge_props}" ) continue @@ -4069,7 +4084,7 @@ class PGGraphStorage(BaseGraphStorage): edge_props = json.loads(edge_props) except json.JSONDecodeError: logger.warning( - f"Failed to parse edge properties string: {edge_props}" + f"[{self.workspace}] Failed to parse edge properties string: {edge_props}" ) continue