From f033fd6f87acdce08b19af595d079c1d09bef725 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 18 Jul 2025 08:50:47 +0800 Subject: [PATCH] fix(postgres): improve AGE agtype parsing and simplify error logging - Fix JSON parsing errors caused by :: characters in data content - Implement precise agtype string parsing using rfind() to separate JSON content from type identifiers - Add robust error handling for malformed JSON in graph data --- lightrag/kg/postgres_impl.py | 106 +++++++++++++++++++++++++---------- 1 file changed, 77 insertions(+), 29 deletions(-) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index f77d6bd0..43825128 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -1905,53 +1905,101 @@ class PGGraphStorage(BaseGraphStorage): the dictionary key is the field name and the value is the value converted to a python type """ + + @staticmethod + def parse_agtype_string(agtype_str: str) -> tuple[str, str]: + """ + Parse agtype string precisely, separating JSON content and type identifier + + Args: + agtype_str: String like '{"json": "content"}::vertex' + + Returns: + (json_content, type_identifier) + """ + if not isinstance(agtype_str, str) or "::" not in agtype_str: + return agtype_str, "" + + # Find the last :: from the right, which is the start of type identifier + last_double_colon = agtype_str.rfind("::") + + if last_double_colon == -1: + return agtype_str, "" + + # Separate JSON content and type identifier + json_content = agtype_str[:last_double_colon] + type_identifier = agtype_str[last_double_colon + 2 :] + + return json_content, type_identifier + + @staticmethod + def safe_json_parse(json_str: str, context: str = "") -> dict: + """ + Safe JSON parsing with simplified error logging + """ + try: + return json.loads(json_str) + except json.JSONDecodeError as e: + logger.error(f"JSON parsing failed ({context}): {e}") + logger.error(f"Raw data (first 100 chars): {repr(json_str[:100])}") + logger.error(f"Error position: line {e.lineno}, column {e.colno}") + return None + # result holder d = {} # prebuild a mapping of vertex_id to vertex mappings to be used # later to build edges vertices = {} + + # First pass: preprocess vertices for k in record.keys(): v = record[k] - # agtype comes back '{key: value}::type' which must be parsed if isinstance(v, str) and "::" in v: if v.startswith("[") and v.endswith("]"): - if "::vertex" not in v: - continue - v = v.replace("::vertex", "") - vertexes = json.loads(v) - for vertex in vertexes: - vertices[vertex["id"]] = vertex.get("properties") + # Handle vertex arrays + json_content, type_id = parse_agtype_string(v) + if type_id == "vertex": + vertexes = safe_json_parse( + json_content, f"vertices array for {k}" + ) + if vertexes: + for vertex in vertexes: + vertices[vertex["id"]] = vertex.get("properties") else: - dtype = v.split("::")[-1] - v = v.split("::")[0] - if dtype == "vertex": - vertex = json.loads(v) - vertices[vertex["id"]] = vertex.get("properties") + # Handle single vertex + json_content, type_id = parse_agtype_string(v) + if type_id == "vertex": + vertex = safe_json_parse(json_content, f"single vertex for {k}") + if vertex: + vertices[vertex["id"]] = vertex.get("properties") - # iterate returned fields and parse appropriately + # Second pass: process all fields for k in record.keys(): v = record[k] if isinstance(v, str) and "::" in v: if v.startswith("[") and v.endswith("]"): - if "::vertex" in v: - v = v.replace("::vertex", "") - d[k] = json.loads(v) - - elif "::edge" in v: - v = v.replace("::edge", "") - d[k] = json.loads(v) + # Handle array types + json_content, type_id = parse_agtype_string(v) + if type_id in ["vertex", "edge"]: + parsed_data = safe_json_parse( + json_content, f"array {type_id} for field {k}" + ) + d[k] = parsed_data if parsed_data is not None else None else: - print("WARNING: unsupported type") - continue - + logger.warning(f"Unknown array type: {type_id}") + d[k] = None else: - dtype = v.split("::")[-1] - v = v.split("::")[0] - if dtype == "vertex": - d[k] = json.loads(v) - elif dtype == "edge": - d[k] = json.loads(v) + # Handle single objects + json_content, type_id = parse_agtype_string(v) + if type_id in ["vertex", "edge"]: + parsed_data = safe_json_parse( + json_content, f"single {type_id} for field {k}" + ) + d[k] = parsed_data if parsed_data is not None else None + else: + # May be other types of agtype data, keep as is + d[k] = v else: d[k] = v # Keep as string