Improve graph query robustness and error handling

This commit is contained in:
yangdx 2025-07-05 00:31:52 +08:00
parent a69194c079
commit 4ea38456f0

View file

@ -660,16 +660,23 @@ class MemgraphStorage(BaseGraphStorage):
) as session: ) as session:
try: try:
if node_label == "*": if node_label == "*":
# First check if database has any nodes
count_query = "MATCH (n) RETURN count(n) as total" count_query = "MATCH (n) RETURN count(n) as total"
count_result = None count_result = None
total_count = 0
try: try:
count_result = await session.run(count_query) count_result = await session.run(count_query)
count_record = await count_result.single() count_record = await count_result.single()
if count_record and count_record["total"] > max_nodes: if count_record:
result.is_truncated = True total_count = count_record["total"]
logger.info( if total_count == 0:
f"Graph truncated: {count_record['total']} nodes found, limited to {max_nodes}" logger.debug("No nodes found in database")
) return result
if total_count > max_nodes:
result.is_truncated = True
logger.info(
f"Graph truncated: {total_count} nodes found, limited to {max_nodes}"
)
finally: finally:
if count_result: if count_result:
await count_result.consume() await count_result.consume()
@ -695,6 +702,9 @@ class MemgraphStorage(BaseGraphStorage):
main_query, {"max_nodes": max_nodes} main_query, {"max_nodes": max_nodes}
) )
record = await result_set.single() record = await result_set.single()
if not record:
logger.debug("No record returned from main query")
return result
finally: finally:
if result_set: if result_set:
await result_set.consume() await result_set.consume()
@ -738,14 +748,22 @@ class MemgraphStorage(BaseGraphStorage):
) )
record = await result_set.single() record = await result_set.single()
if not record: if not record:
logger.debug(f"No record found for node {node_label}") logger.debug(f"No nodes found for entity_id: {node_label}")
return result return result
# Check if the query indicates truncation
if "is_truncated" in record and record["is_truncated"]:
result.is_truncated = True
logger.info(
f"Graph truncated: breadth-first search limited to {max_nodes} nodes"
)
finally: finally:
if result_set: if result_set:
await result_set.consume() await result_set.consume()
if record: # Process the record if it exists
if record and record["node_info"]:
for node_info in record["node_info"]: for node_info in record["node_info"]:
node = node_info["node"] node = node_info["node"]
node_id = node.id node_id = node.id
@ -759,28 +777,30 @@ class MemgraphStorage(BaseGraphStorage):
) )
) )
for rel in record["relationships"]: if "relationships" in record and record["relationships"]:
edge_id = rel.id for rel in record["relationships"]:
if edge_id not in seen_edges: edge_id = rel.id
seen_edges.add(edge_id) if edge_id not in seen_edges:
start = rel.start_node seen_edges.add(edge_id)
end = rel.end_node start = rel.start_node
result.edges.append( end = rel.end_node
KnowledgeGraphEdge( result.edges.append(
id=f"{edge_id}", KnowledgeGraphEdge(
type=rel.type, id=f"{edge_id}",
source=f"{start.id}", type=rel.type,
target=f"{end.id}", source=f"{start.id}",
properties=dict(rel), target=f"{end.id}",
properties=dict(rel),
)
) )
)
logger.info( logger.info(
f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}" f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
) )
return result
except Exception as e: except Exception as e:
logger.error(f"Error getting knowledge graph: {str(e)}") logger.error(f"Error getting knowledge graph: {str(e)}")
return result # Return empty but properly initialized KnowledgeGraph on error
return KnowledgeGraph()
return result