Improve entity chunk migration with fallback and metadata refresh

• Handle missing chunk data gracefully
• Add fallback from source_id field
• Refresh timestamps on migration
• Support multiple chunk_ids formats
This commit is contained in:
yangdx 2025-10-26 09:31:48 +08:00
parent c74eb3b8ad
commit 0d69bb78e6

View file

@ -279,17 +279,68 @@ async def aedit_entity(
# Migrate entity_chunks data from old entity to new entity
if entity_chunks_storage:
old_chunks_data = await entity_chunks_storage.get_by_id(entity_name)
old_entity_name = entity_name
now_ts = int(time.time())
old_chunks_data = await entity_chunks_storage.get_by_id(
old_entity_name
)
new_chunk_payload: dict[str, Any] | None = None
if old_chunks_data:
# Create chunk tracking for new entity
await entity_chunks_storage.upsert(
{new_entity_name: old_chunks_data}
)
# Delete old entity's chunk tracking
await entity_chunks_storage.delete([entity_name])
# Copy chunk tracking and refresh metadata for the renamed entity
raw_chunk_ids = old_chunks_data.get("chunk_ids", [])
if isinstance(raw_chunk_ids, list):
chunk_ids = list(raw_chunk_ids)
elif isinstance(raw_chunk_ids, str):
chunk_ids = [
cid
for cid in raw_chunk_ids.split(GRAPH_FIELD_SEP)
if cid
]
elif isinstance(raw_chunk_ids, (set, tuple)):
chunk_ids = list(raw_chunk_ids)
else:
chunk_ids = list(raw_chunk_ids) if raw_chunk_ids else []
count = old_chunks_data.get("count", len(chunk_ids))
new_chunk_payload = {
"chunk_ids": chunk_ids,
"count": count,
"updated_at": now_ts,
}
if "created_at" in old_chunks_data:
new_chunk_payload["created_at"] = old_chunks_data[
"created_at"
]
logger.info(
f"Migrated entity_chunks data from '{entity_name}' to '{new_entity_name}'"
f"Migrated entity_chunks data from '{old_entity_name}' to '{new_entity_name}'"
)
else:
# Fall back to rebuilding from the node's source_id if persisted data is missing
source_id_field = new_node_data.get("source_id", "")
if source_id_field and source_id_field != "manual_creation":
chunk_ids = [
cid
for cid in source_id_field.split(GRAPH_FIELD_SEP)
if cid
]
if chunk_ids:
new_chunk_payload = {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
"created_at": now_ts,
"updated_at": now_ts,
}
logger.info(
f"Rebuilt entity_chunks data for renamed entity '{new_entity_name}' from source_id"
)
if new_chunk_payload:
await entity_chunks_storage.upsert(
{new_entity_name: new_chunk_payload}
)
# Always remove the stale record under the old entity name
await entity_chunks_storage.delete([old_entity_name])
# Delete old relation records from vector database
await relationships_vdb.delete(relations_to_delete)