Add workspace parameter and remove chunk-based query unit tests

- Add workspace param to test storage init
- Remove get_nodes_by_chunk_ids tests
- Remove get_edges_by_chunk_ids tests
- Clean up batch operations test function
This commit is contained in:
yangdx 2025-11-06 18:18:01 +08:00
parent 807d2461d3
commit 6b0f9795be

View file

@ -115,6 +115,7 @@ async def initialize_graph_storage():
try:
storage = storage_class(
namespace="test_graph",
workspace="test_workspace",
global_config=global_config,
embedding_func=mock_embedding_func,
)
@ -760,76 +761,6 @@ async def test_graph_batch_operations(storage):
print("无向图特性验证成功:批量获取的节点边包含所有相关的边(无论方向)")
# 7. 测试 get_nodes_by_chunk_ids - 批量根据 chunk_ids 获取多个节点
print("== 测试 get_nodes_by_chunk_ids")
print("== 测试单个 chunk_id匹配多个节点")
nodes = await storage.get_nodes_by_chunk_ids([chunk2_id])
assert len(nodes) == 2, f"{chunk1_id} 应有2个节点实际有 {len(nodes)}"
has_node1 = any(node["entity_id"] == node1_id for node in nodes)
has_node2 = any(node["entity_id"] == node2_id for node in nodes)
assert has_node1, f"节点 {node1_id} 应在返回结果中"
assert has_node2, f"节点 {node2_id} 应在返回结果中"
print("== 测试多个 chunk_id部分匹配多个节点")
nodes = await storage.get_nodes_by_chunk_ids([chunk2_id, chunk3_id])
assert (
len(nodes) == 3
), f"{chunk2_id}, {chunk3_id} 应有3个节点实际有 {len(nodes)}"
has_node1 = any(node["entity_id"] == node1_id for node in nodes)
has_node2 = any(node["entity_id"] == node2_id for node in nodes)
has_node3 = any(node["entity_id"] == node3_id for node in nodes)
assert has_node1, f"节点 {node1_id} 应在返回结果中"
assert has_node2, f"节点 {node2_id} 应在返回结果中"
assert has_node3, f"节点 {node3_id} 应在返回结果中"
# 8. 测试 get_edges_by_chunk_ids - 批量根据 chunk_ids 获取多条边
print("== 测试 get_edges_by_chunk_ids")
print("== 测试单个 chunk_id匹配多条边")
edges = await storage.get_edges_by_chunk_ids([chunk2_id])
assert len(edges) == 2, f"{chunk2_id} 应有2条边实际有 {len(edges)}"
has_edge_node1_node2 = any(
edge["source"] == node1_id and edge["target"] == node2_id for edge in edges
)
has_edge_node2_node3 = any(
edge["source"] == node2_id and edge["target"] == node3_id for edge in edges
)
assert has_edge_node1_node2, f"{chunk2_id} 应包含 {node1_id}{node2_id} 的边"
assert has_edge_node2_node3, f"{chunk2_id} 应包含 {node2_id}{node3_id} 的边"
print("== 测试多个 chunk_id部分匹配多条边")
edges = await storage.get_edges_by_chunk_ids([chunk2_id, chunk3_id])
assert (
len(edges) == 3
), f"{chunk2_id}, {chunk3_id} 应有3条边实际有 {len(edges)}"
has_edge_node1_node2 = any(
edge["source"] == node1_id and edge["target"] == node2_id for edge in edges
)
has_edge_node2_node3 = any(
edge["source"] == node2_id and edge["target"] == node3_id for edge in edges
)
has_edge_node1_node4 = any(
edge["source"] == node1_id and edge["target"] == node4_id for edge in edges
)
assert (
has_edge_node1_node2
), f"{chunk2_id}, {chunk3_id} 应包含 {node1_id}{node2_id} 的边"
assert (
has_edge_node2_node3
), f"{chunk2_id}, {chunk3_id} 应包含 {node2_id}{node3_id} 的边"
assert (
has_edge_node1_node4
), f"{chunk2_id}, {chunk3_id} 应包含 {node1_id}{node4_id} 的边"
print("\n批量操作测试完成")
return True