fix(postgres): improve AGE agtype parsing and simplify error logging

- Fix JSON parsing errors caused by :: characters in data content
- Implement precise agtype string parsing using rfind() to separate JSON content from type identifiers
- Add robust error handling for malformed JSON in graph data
yangdx 2025-07-18 08:50:47 +08:00
parent 83c8691221
commit f033fd6f87
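
Illustration (not part of the commit): why splitting on the first "::" breaks when a property value itself contains "::", and how the rfind()-based split described above avoids it.

    import json

    raw = '{"id": 1, "desc": "see also foo::bar"}::vertex'

    # Old-style split: everything before the FIRST "::" -> truncated, invalid JSON
    broken = raw.split("::")[0]          # '{"id": 1, "desc": "see also foo'

    # New approach: split at the LAST "::", keeping the full JSON payload intact
    pos = raw.rfind("::")
    json_content, type_id = raw[:pos], raw[pos + 2 :]
    print(json.loads(json_content))      # {'id': 1, 'desc': 'see also foo::bar'}
    print(type_id)                       # vertex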


@@ -1905,53 +1905,101 @@ class PGGraphStorage(BaseGraphStorage):
                 the dictionary key is the field name and the value is the
                 value converted to a python type
         """
+
+        @staticmethod
+        def parse_agtype_string(agtype_str: str) -> tuple[str, str]:
+            """
+            Parse agtype string precisely, separating JSON content and type identifier
+
+            Args:
+                agtype_str: String like '{"json": "content"}::vertex'
+
+            Returns:
+                (json_content, type_identifier)
+            """
+            if not isinstance(agtype_str, str) or "::" not in agtype_str:
+                return agtype_str, ""
+
+            # Find the last :: from the right, which is the start of the type identifier
+            last_double_colon = agtype_str.rfind("::")
+            if last_double_colon == -1:
+                return agtype_str, ""
+
+            # Separate JSON content and type identifier
+            json_content = agtype_str[:last_double_colon]
+            type_identifier = agtype_str[last_double_colon + 2 :]
+            return json_content, type_identifier
+
+        @staticmethod
+        def safe_json_parse(json_str: str, context: str = "") -> dict:
+            """
+            Safe JSON parsing with simplified error logging
+            """
+            try:
+                return json.loads(json_str)
+            except json.JSONDecodeError as e:
+                logger.error(f"JSON parsing failed ({context}): {e}")
+                logger.error(f"Raw data (first 100 chars): {repr(json_str[:100])}")
+                logger.error(f"Error position: line {e.lineno}, column {e.colno}")
+                return None
+
         # result holder
         d = {}
         # prebuild a mapping of vertex_id to vertex mappings to be used
         # later to build edges
         vertices = {}
+        # First pass: preprocess vertices
         for k in record.keys():
             v = record[k]
             # agtype comes back '{key: value}::type' which must be parsed
             if isinstance(v, str) and "::" in v:
                 if v.startswith("[") and v.endswith("]"):
-                    if "::vertex" not in v:
-                        continue
-                    v = v.replace("::vertex", "")
-                    vertexes = json.loads(v)
-                    for vertex in vertexes:
-                        vertices[vertex["id"]] = vertex.get("properties")
+                    # Handle vertex arrays
+                    json_content, type_id = parse_agtype_string(v)
+                    if type_id == "vertex":
+                        vertexes = safe_json_parse(
+                            json_content, f"vertices array for {k}"
+                        )
+                        if vertexes:
+                            for vertex in vertexes:
+                                vertices[vertex["id"]] = vertex.get("properties")
                 else:
-                    dtype = v.split("::")[-1]
-                    v = v.split("::")[0]
-                    if dtype == "vertex":
-                        vertex = json.loads(v)
-                        vertices[vertex["id"]] = vertex.get("properties")
+                    # Handle single vertex
+                    json_content, type_id = parse_agtype_string(v)
+                    if type_id == "vertex":
+                        vertex = safe_json_parse(json_content, f"single vertex for {k}")
+                        if vertex:
+                            vertices[vertex["id"]] = vertex.get("properties")
-        # iterate returned fields and parse appropriately
+        # Second pass: process all fields
         for k in record.keys():
             v = record[k]
             if isinstance(v, str) and "::" in v:
                 if v.startswith("[") and v.endswith("]"):
-                    if "::vertex" in v:
-                        v = v.replace("::vertex", "")
-                        d[k] = json.loads(v)
-                    elif "::edge" in v:
-                        v = v.replace("::edge", "")
-                        d[k] = json.loads(v)
+                    # Handle array types
+                    json_content, type_id = parse_agtype_string(v)
+                    if type_id in ["vertex", "edge"]:
+                        parsed_data = safe_json_parse(
+                            json_content, f"array {type_id} for field {k}"
+                        )
+                        d[k] = parsed_data if parsed_data is not None else None
                     else:
-                        print("WARNING: unsupported type")
-                        continue
+                        logger.warning(f"Unknown array type: {type_id}")
+                        d[k] = None
                 else:
-                    dtype = v.split("::")[-1]
-                    v = v.split("::")[0]
-                    if dtype == "vertex":
-                        d[k] = json.loads(v)
-                    elif dtype == "edge":
-                        d[k] = json.loads(v)
+                    # Handle single objects
+                    json_content, type_id = parse_agtype_string(v)
+                    if type_id in ["vertex", "edge"]:
+                        parsed_data = safe_json_parse(
+                            json_content, f"single {type_id} for field {k}"
+                        )
+                        d[k] = parsed_data if parsed_data is not None else None
+                    else:
+                        # May be other types of agtype data, keep as is
+                        d[k] = v
             else:
-                d[k] = v
+                d[k] = v  # Keep as string
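
For reference, a standalone sketch of the parsing flow the new helpers implement (illustration only; in the commit itself parse_agtype_string and safe_json_parse are nested inside PGGraphStorage._record_to_dict, and logger comes from the module's existing logging setup):

    import json
    import logging

    logger = logging.getLogger(__name__)

    def parse_agtype_string(agtype_str: str) -> tuple[str, str]:
        # Split '{...}::vertex' at the LAST "::" so "::" inside the JSON
        # payload cannot truncate it.
        if not isinstance(agtype_str, str) or "::" not in agtype_str:
            return agtype_str, ""
        pos = agtype_str.rfind("::")
        return agtype_str[:pos], agtype_str[pos + 2 :]

    def safe_json_parse(json_str: str, context: str = ""):
        # Return None instead of raising, logging a short diagnostic.
        try:
            return json.loads(json_str)
        except json.JSONDecodeError as e:
            logger.error(f"JSON parsing failed ({context}): {e}")
            return None

    content, kind = parse_agtype_string('{"id": 7, "label": "a::b"}::edge')
    print(kind)                                # edge
    print(safe_json_parse(content))            # {'id': 7, 'label': 'a::b'}
    print(safe_json_parse("{broken", "demo"))  # logs the error, returns None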