This commit is contained in:
hzywhite 2025-09-05 15:01:50 +08:00
parent 2a453fbe37
commit e3ea87da24
2 changed files with 31 additions and 2 deletions

View file

@ -9,6 +9,7 @@ import warnings
from dataclasses import asdict, dataclass, field from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone from datetime import datetime, timezone
from functools import partial from functools import partial
from pathlib import Path
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,

View file

@ -1760,7 +1760,29 @@ async def merge_nodes_and_edges(
if full_entities_storage and full_relations_storage and doc_id: if full_entities_storage and full_relations_storage and doc_id:
try: try:
# Merge all entities: original entities + entities added during edge processing # Merge all entities: original entities + entities added during edge processing
final_entity_names = set() existing_entites_data = None
existing_relations_data = None
try:
existing_entites_data = await full_entities_storage.get_by_id(doc_id)
existing_relations_data = await full_relations_storage.get_by_id(doc_id)
except Exception as e:
logger.debug(
f"Could not retrieve existing entity/relation data for {doc_id}: {e}"
)
existing_entites_names = set()
if existing_entites_data and existing_entites_data.get("entity_names"):
existing_entites_names.update(existing_entites_data["entity_names"])
existing_relation_pairs = set()
if existing_relations_data and existing_relations_data.get(
"relation_pairs"
):
for pair in existing_relations_data["relation_pairs"]:
existing_relation_pairs.add(tuple(sorted(pair)))
final_entity_names = existing_entites_names.copy()
# Add original processed entities # Add original processed entities
for entity_data in processed_entities: for entity_data in processed_entities:
@ -1773,7 +1795,7 @@ async def merge_nodes_and_edges(
final_entity_names.add(added_entity["entity_name"]) final_entity_names.add(added_entity["entity_name"])
# Collect all relation pairs # Collect all relation pairs
final_relation_pairs = set() final_relation_pairs = existing_relation_pairs.copy()
for edge_data in processed_edges: for edge_data in processed_edges:
if edge_data: if edge_data:
src_id = edge_data.get("src_id") src_id = edge_data.get("src_id")
@ -1783,6 +1805,12 @@ async def merge_nodes_and_edges(
final_relation_pairs.add(relation_pair) final_relation_pairs.add(relation_pair)
log_message = f"Phase 3: Updating final {len(final_entity_names)}({len(processed_entities)}+{len(all_added_entities)}) entities and {len(final_relation_pairs)} relations from {doc_id}" log_message = f"Phase 3: Updating final {len(final_entity_names)}({len(processed_entities)}+{len(all_added_entities)}) entities and {len(final_relation_pairs)} relations from {doc_id}"
new_entities_count = len(final_entity_names) - len(existing_entites_names)
new_relation_count = len(final_relation_pairs) - len(
existing_relation_pairs
)
log_message = f"Phase 3: Merging storage - existing: {len(existing_entites_names)} entitites, {len(existing_relation_pairs)} relations; new: {new_entities_count} entities. {new_relation_count} relations; total: {len(final_entity_names)} entities, {len(final_relation_pairs)} relations"
logger.info(log_message) logger.info(log_message)
async with pipeline_status_lock: async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message pipeline_status["latest_message"] = log_message