fix: sync core modules with upstream for compatibility

Raphaël MANSUY 2025-12-04 19:10:46 +08:00
parent 395b2d82de
commit ed73def994
4 changed files with 472 additions and 101 deletions

View file

@@ -13,6 +13,7 @@ DEFAULT_MAX_GRAPH_NODES = 1000
# Default values for extraction settings
DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for document processing
DEFAULT_MAX_GLEANING = 1
DEFAULT_ENTITY_NAME_MAX_LENGTH = 256
# Number of description fragments to trigger LLM summary
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
@@ -37,7 +38,7 @@ DEFAULT_ENTITY_TYPES = [
"NaturalObject",
]
# Separator for graph fields
# Separator for description, source_id and relation-key fields (cannot be changed after data has been inserted)
GRAPH_FIELD_SEP = "<SEP>"
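For illustration, this is how a multi-valued field ends up stored with the separator (the chunk ids below are made up):

joined = GRAPH_FIELD_SEP.join(["chunk-1a2b3c", "chunk-4d5e6f"])  # illustrative ids
# joined == "chunk-1a2b3c<SEP>chunk-4d5e6f"
parts = joined.split(GRAPH_FIELD_SEP)  # recovers the individual values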
# Query and retrieval configuration defaults
@@ -58,17 +59,20 @@ DEFAULT_MIN_RERANK_SCORE = 0.0
DEFAULT_RERANK_BINDING = "null"
# Default source_ids limit in metadata for entity and relation
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3
DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3
SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP" # Keep oldest
SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO" # First In First Out (Keep newest)
DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_KEEP
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 300
DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 300
### Controls the chunk_ids limitation method: FIFO or KEEP
### FIFO: First in, first out (keep newest)
### KEEP: Keep oldest (fewer merge actions, faster)
SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"
SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"
DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_FIFO
VALID_SOURCE_IDS_LIMIT_METHODS = {
SOURCE_IDS_LIMIT_METHOD_KEEP,
SOURCE_IDS_LIMIT_METHOD_FIFO,
}
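As a rough sketch of the difference between the two methods, assuming the stored list is ordered oldest-first (this helper is illustrative, not the library's apply_source_ids_limit):

def limit_source_ids_sketch(source_ids: list[str], limit: int, method: str) -> list[str]:
    if len(source_ids) <= limit:
        return source_ids
    if method == SOURCE_IDS_LIMIT_METHOD_KEEP:
        return source_ids[:limit]   # keep the oldest entries, drop the newer ones
    if method == SOURCE_IDS_LIMIT_METHOD_FIFO:
        return source_ids[-limit:]  # drop the oldest entries, keep the newest
    raise ValueError(f"unknown limit method: {method}")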
# Default file_path limit in metadata for entity and relation (uses the same limit method as source_ids)
DEFAULT_MAX_FILE_PATHS = 2
# Maximum number of file paths stored in entity/relation file_path field (for display only; does not affect query performance)
DEFAULT_MAX_FILE_PATHS = 100
# Field length of file_path in Milvus Schema for entity and relation (Should not be changed)
# file_path must store all file paths up to the DEFAULT_MAX_FILE_PATHS limit within the metadata.

View file

@@ -96,3 +96,41 @@ class PipelineNotInitializedError(KeyError):
f" await initialize_pipeline_status(workspace='your_workspace')"
)
super().__init__(msg)
class PipelineCancelledException(Exception):
"""Raised when pipeline processing is cancelled by user request."""
def __init__(self, message: str = "User cancelled"):
super().__init__(message)
self.message = message
class ChunkTokenLimitExceededError(ValueError):
"""Raised when a chunk exceeds the configured token limit."""
def __init__(
self,
chunk_tokens: int,
chunk_token_limit: int,
chunk_preview: str | None = None,
) -> None:
preview = chunk_preview.strip() if chunk_preview else None
truncated_preview = preview[:80] if preview else None
preview_note = f" Preview: '{truncated_preview}'" if truncated_preview else ""
message = (
f"Chunk token length {chunk_tokens} exceeds chunk_token_size {chunk_token_limit}."
f"{preview_note}"
)
super().__init__(message)
self.chunk_tokens = chunk_tokens
self.chunk_token_limit = chunk_token_limit
self.chunk_preview = truncated_preview
class QdrantMigrationError(Exception):
"""Raised when Qdrant data migration from legacy collections fails."""
def __init__(self, message: str):
super().__init__(message)
self.message = message
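A hedged usage sketch for the two pipeline-facing exceptions above: a caller wrapping ingestion could report an oversized chunk or a user cancellation separately (ainsert is assumed as the entry point, and the handling is illustrative). The cancellation path fires when the pipeline's cancellation_requested flag is set, as the checks later in this commit show.

from lightrag.exceptions import ChunkTokenLimitExceededError, PipelineCancelledException

async def ingest_safely(rag, text: str) -> None:
    try:
        await rag.ainsert(text)  # assumed ingestion entry point for this sketch
    except ChunkTokenLimitExceededError as exc:
        # carries chunk_tokens, chunk_token_limit and an optional 80-char preview
        print(f"chunk too large: {exc.chunk_tokens} > {exc.chunk_token_limit} ({exc.chunk_preview!r})")
    except PipelineCancelledException as exc:
        print(f"pipeline cancelled: {exc.message}")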

View file

@@ -260,13 +260,15 @@ class LightRAG:
- `content`: The text to be split into chunks.
- `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens.
- `split_by_character_only`: If True, the text is split only on the specified character.
- `chunk_token_size`: The maximum number of tokens per chunk.
- `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
- `chunk_token_size`: The maximum number of tokens per chunk.
The function should return a list of dictionaries (or an awaitable that resolves to a list),
where each dictionary contains the following keys:
- `tokens`: The number of tokens in the chunk.
- `content`: The text content of the chunk.
- `tokens` (int): The number of tokens in the chunk.
- `content` (str): The text content of the chunk.
- `chunk_order_index` (int): Zero-based index indicating the chunk's order in the document.
Defaults to `chunking_by_token_size` if not specified.
"""
@@ -2948,6 +2950,26 @@ class LightRAG:
data across different storage layers are removed or rebuilt. If entities or relationships
are partially affected, they will be rebuilt using the cached LLM extractions from the remaining documents.
**Concurrency Control Design:**
This function implements a pipeline-based concurrency control to prevent data corruption:
1. **Single Document Deletion** (when WE acquire pipeline):
- Sets job_name to "Single document deletion" (NOT starting with "deleting")
- Prevents other adelete_by_doc_id calls from running concurrently
- Ensures exclusive access to graph operations for this deletion
2. **Batch Document Deletion** (when background_delete_documents acquires pipeline):
- Sets job_name to "Deleting {N} Documents" (starts with "deleting")
- Allows multiple adelete_by_doc_id calls to join the deletion queue
- Each call validates the job name to ensure it's part of a deletion operation
The validation logic `if not job_name.startswith("deleting") or "document" not in job_name`
ensures that:
- adelete_by_doc_id can only run when pipeline is idle OR during batch deletion
- Prevents concurrent single deletions that could cause race conditions
- Rejects operations when pipeline is busy with non-deletion tasks
Args:
doc_id (str): The unique identifier of the document to be deleted.
delete_llm_cache (bool): Whether to delete cached LLM extraction results
@@ -2955,17 +2977,13 @@ class LightRAG:
Returns:
DeletionResult: An object containing the outcome of the deletion process.
- `status` (str): "success", "not_found", or "failure".
- `status` (str): "success", "not_found", "not_allowed", or "failure".
- `doc_id` (str): The ID of the document attempted to be deleted.
- `message` (str): A summary of the operation's result.
- `status_code` (int): HTTP status code (e.g., 200, 404, 500).
- `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500).
- `file_path` (str | None): The file path of the deleted document, if available.
"""
deletion_operations_started = False
original_exception = None
doc_llm_cache_ids: list[str] = []
# Get pipeline status shared data and lock for status updates
# Get pipeline status shared data and lock for validation
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=self.workspace
)
@@ -2973,6 +2991,48 @@ class LightRAG:
"pipeline_status", workspace=self.workspace
)
# Track whether WE acquired the pipeline
we_acquired_pipeline = False
# Check and acquire pipeline if needed
async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
# Pipeline is idle - WE acquire it for this deletion
we_acquired_pipeline = True
pipeline_status.update(
{
"busy": True,
"job_name": "Single document deletion",
"job_start": datetime.now(timezone.utc).isoformat(),
"docs": 1,
"batchs": 1,
"cur_batch": 0,
"request_pending": False,
"cancellation_requested": False,
"latest_message": f"Starting deletion for document: {doc_id}",
}
)
# Initialize history messages
pipeline_status["history_messages"][:] = [
f"Starting deletion for document: {doc_id}"
]
else:
# Pipeline already busy - verify it's a deletion job
job_name = pipeline_status.get("job_name", "").lower()
if not job_name.startswith("deleting") or "document" not in job_name:
return DeletionResult(
status="not_allowed",
doc_id=doc_id,
message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
status_code=403,
file_path=None,
)
# Pipeline is busy with deletion - proceed without acquiring
deletion_operations_started = False
original_exception = None
doc_llm_cache_ids: list[str] = []
async with pipeline_status_lock:
log_message = f"Starting deletion process for document {doc_id}"
logger.info(log_message)
@@ -3585,6 +3645,18 @@ class LightRAG:
f"No deletion operations were started for document {doc_id}, skipping persistence"
)
# Release pipeline only if WE acquired it
if we_acquired_pipeline:
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["cancellation_requested"] = False
completion_msg = (
f"Deletion process completed for document: {doc_id}"
)
pipeline_status["latest_message"] = completion_msg
pipeline_status["history_messages"].append(completion_msg)
logger.info(completion_msg)
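From the caller's side, the acquire/validate/release flow above could be exercised roughly as follows (sketch; only fields documented in DeletionResult are used):

async def delete_one(rag, doc_id: str) -> None:
    result = await rag.adelete_by_doc_id(doc_id)
    if result.status == "success":
        print(f"deleted {result.doc_id} ({result.file_path})")
    elif result.status == "not_allowed":  # pipeline busy with a non-deletion job (HTTP 403)
        print(f"try again later: {result.message}")
    elif result.status == "not_found":
        print(f"no such document: {doc_id}")
    else:  # "failure"
        raise RuntimeError(result.message)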
async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
"""Asynchronously delete an entity and all its relationships.

View file

@@ -1,5 +1,6 @@
from __future__ import annotations
from functools import partial
from pathlib import Path
import asyncio
import json
@@ -7,6 +8,10 @@ import json_repair
from typing import Any, AsyncIterator, overload, Literal
from collections import Counter, defaultdict
from lightrag.exceptions import (
PipelineCancelledException,
ChunkTokenLimitExceededError,
)
from lightrag.utils import (
logger,
compute_mdhash_id,
@@ -67,7 +72,7 @@ from dotenv import load_dotenv
# use the .env that is inside the current folder
# allows to use different .env file for each lightrag instance
# the OS environment variables take precedence over the .env file
load_dotenv(dotenv_path=".env", override=False)
load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False)
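Since override=False, variables already set in the process environment keep their values over whatever the bundled .env declares; a quick sketch of that precedence (the variable name is illustrative):

import os
from dotenv import load_dotenv

os.environ["SUMMARY_LANGUAGE"] = "French"        # set by the OS or container
load_dotenv(dotenv_path=".env", override=False)  # .env says SUMMARY_LANGUAGE=English
print(os.environ["SUMMARY_LANGUAGE"])            # still "French"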
def _truncate_entity_identifier(
@@ -96,8 +101,8 @@ def chunking_by_token_size(
content: str,
split_by_character: str | None = None,
split_by_character_only: bool = False,
overlap_token_size: int = 128,
max_token_size: int = 1024,
chunk_overlap_token_size: int = 100,
chunk_token_size: int = 1200,
) -> list[dict[str, Any]]:
tokens = tokenizer.encode(content)
results: list[dict[str, Any]] = []
@@ -107,19 +112,30 @@ def chunking_by_token_size(
if split_by_character_only:
for chunk in raw_chunks:
_tokens = tokenizer.encode(chunk)
if len(_tokens) > chunk_token_size:
logger.warning(
"Chunk split_by_character exceeds token limit: len=%d limit=%d",
len(_tokens),
chunk_token_size,
)
raise ChunkTokenLimitExceededError(
chunk_tokens=len(_tokens),
chunk_token_limit=chunk_token_size,
chunk_preview=chunk[:120],
)
new_chunks.append((len(_tokens), chunk))
else:
for chunk in raw_chunks:
_tokens = tokenizer.encode(chunk)
if len(_tokens) > max_token_size:
if len(_tokens) > chunk_token_size:
for start in range(
0, len(_tokens), max_token_size - overlap_token_size
0, len(_tokens), chunk_token_size - chunk_overlap_token_size
):
chunk_content = tokenizer.decode(
_tokens[start : start + max_token_size]
_tokens[start : start + chunk_token_size]
)
new_chunks.append(
(min(max_token_size, len(_tokens) - start), chunk_content)
(min(chunk_token_size, len(_tokens) - start), chunk_content)
)
else:
new_chunks.append((len(_tokens), chunk))
@@ -133,12 +149,12 @@ def chunking_by_token_size(
)
else:
for index, start in enumerate(
range(0, len(tokens), max_token_size - overlap_token_size)
range(0, len(tokens), chunk_token_size - chunk_overlap_token_size)
):
chunk_content = tokenizer.decode(tokens[start : start + max_token_size])
chunk_content = tokenizer.decode(tokens[start : start + chunk_token_size])
results.append(
{
"tokens": min(max_token_size, len(tokens) - start),
"tokens": min(chunk_token_size, len(tokens) - start),
"content": chunk_content.strip(),
"chunk_order_index": index,
}
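With the renamed parameters, window starts advance by chunk_token_size - chunk_overlap_token_size tokens; at the defaults of 1200 and 100, a 3000-token document yields starts 0, 1100 and 2200:

chunk_token_size, chunk_overlap_token_size = 1200, 100
total_tokens = 3000
starts = list(range(0, total_tokens, chunk_token_size - chunk_overlap_token_size))
# starts == [0, 1100, 2200]; the final window holds min(1200, 3000 - 2200) == 800 tokens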
@@ -343,6 +359,20 @@ async def _summarize_descriptions(
llm_response_cache=llm_response_cache,
cache_type="summary",
)
# Check summary token length against embedding limit
embedding_token_limit = global_config.get("embedding_token_limit")
if embedding_token_limit is not None and summary:
tokenizer = global_config["tokenizer"]
summary_token_count = len(tokenizer.encode(summary))
threshold = int(embedding_token_limit * 0.9)
if summary_token_count > threshold:
logger.warning(
f"Summary tokens ({summary_token_count}) exceeds 90% of embedding limit "
f"({embedding_token_limit}) for {description_type}: {description_name}"
)
return summary
@@ -367,8 +397,8 @@ async def _handle_single_entity_extraction(
# Validate entity name after all cleaning steps
if not entity_name or not entity_name.strip():
logger.warning(
f"Entity extraction error: entity name became empty after cleaning. Original: '{record_attributes[1]}'"
logger.info(
f"Empty entity name found after sanitization. Original: '{record_attributes[1]}'"
)
return None
@@ -444,14 +474,14 @@ async def _handle_single_relationship_extraction(
# Validate entity names after all cleaning steps
if not source:
logger.warning(
f"Relationship extraction error: source entity became empty after cleaning. Original: '{record_attributes[1]}'"
logger.info(
f"Empty source entity found after sanitization. Original: '{record_attributes[1]}'"
)
return None
if not target:
logger.warning(
f"Relationship extraction error: target entity became empty after cleaning. Original: '{record_attributes[2]}'"
logger.info(
f"Empty target entity found after sanitization. Original: '{record_attributes[2]}'"
)
return None
@@ -500,7 +530,7 @@ async def _handle_single_relationship_extraction(
return None
async def _rebuild_knowledge_from_chunks(
async def rebuild_knowledge_from_chunks(
entities_to_rebuild: dict[str, list[str]],
relationships_to_rebuild: dict[tuple[str, str], list[str]],
knowledge_graph_inst: BaseGraphStorage,
@@ -675,14 +705,6 @@ async def _rebuild_knowledge_from_chunks(
entity_chunks_storage=entity_chunks_storage,
)
rebuilt_entities_count += 1
status_message = (
f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_entities_count += 1
status_message = f"Failed to rebuild `{entity_name}`: {e}"
@@ -708,6 +730,7 @@ async def _rebuild_knowledge_from_chunks(
await _rebuild_single_relationship(
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb,
entities_vdb=entities_vdb,
src=src,
tgt=tgt,
chunk_ids=chunk_ids,
@@ -715,13 +738,14 @@ async def _rebuild_knowledge_from_chunks(
llm_response_cache=llm_response_cache,
global_config=global_config,
relation_chunks_storage=relation_chunks_storage,
entity_chunks_storage=entity_chunks_storage,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
)
rebuilt_relationships_count += 1
except Exception as e:
failed_relationships_count += 1
status_message = f"Failed to rebuild `{src} - {tgt}`: {e}"
status_message = f"Failed to rebuild `{src}`~`{tgt}`: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
@@ -1290,6 +1314,7 @@ async def _rebuild_single_entity(
async def _rebuild_single_relationship(
knowledge_graph_inst: BaseGraphStorage,
relationships_vdb: BaseVectorStorage,
entities_vdb: BaseVectorStorage,
src: str,
tgt: str,
chunk_ids: list[str],
@@ -1297,6 +1322,7 @@ async def _rebuild_single_relationship(
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
relation_chunks_storage: BaseKVStorage | None = None,
entity_chunks_storage: BaseKVStorage | None = None,
pipeline_status: dict | None = None,
pipeline_status_lock=None,
) -> None:
@@ -1440,9 +1466,69 @@ async def _rebuild_single_relationship(
else current_relationship.get("file_path", "unknown_source"),
"truncate": truncation_info,
}
# Ensure both endpoint nodes exist before writing the edge back
# (certain storage backends require pre-existing nodes).
node_description = (
updated_relationship_data["description"]
if updated_relationship_data.get("description")
else current_relationship.get("description", "")
)
node_source_id = updated_relationship_data.get("source_id", "")
node_file_path = updated_relationship_data.get("file_path", "unknown_source")
for node_id in {src, tgt}:
if not (await knowledge_graph_inst.has_node(node_id)):
node_created_at = int(time.time())
node_data = {
"entity_id": node_id,
"source_id": node_source_id,
"description": node_description,
"entity_type": "UNKNOWN",
"file_path": node_file_path,
"created_at": node_created_at,
"truncate": "",
}
await knowledge_graph_inst.upsert_node(node_id, node_data=node_data)
# Update entity_chunks_storage for the newly created entity
if entity_chunks_storage is not None and limited_chunk_ids:
await entity_chunks_storage.upsert(
{
node_id: {
"chunk_ids": limited_chunk_ids,
"count": len(limited_chunk_ids),
}
}
)
# Update entity_vdb for the newly created entity
if entities_vdb is not None:
entity_vdb_id = compute_mdhash_id(node_id, prefix="ent-")
entity_content = f"{node_id}\n{node_description}"
vdb_data = {
entity_vdb_id: {
"content": entity_content,
"entity_name": node_id,
"source_id": node_source_id,
"entity_type": "UNKNOWN",
"file_path": node_file_path,
}
}
await safe_vdb_operation_with_exception(
operation=lambda payload=vdb_data: entities_vdb.upsert(payload),
operation_name="rebuild_added_entity_upsert",
entity_name=node_id,
max_retries=3,
retry_delay=0.1,
)
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
# Update relationship in vector database
# Sort src and tgt to ensure consistent ordering (smaller string first)
if src > tgt:
src, tgt = tgt, src
try:
rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
@@ -1485,7 +1571,7 @@ async def _rebuild_single_relationship(
raise # Re-raise exception
# Log rebuild completion with truncation info
status_message = f"Rebuild `{src} - {tgt}` from {len(chunk_ids)} chunks"
status_message = f"Rebuild `{src}`~`{tgt}` from {len(chunk_ids)} chunks"
if truncation_info:
status_message += f" ({truncation_info})"
# Add truncation info from apply_source_ids_limit if truncation occurred
@@ -1637,6 +1723,12 @@ async def _merge_nodes_then_upsert(
logger.error(f"Entity {entity_name} has no description")
raise ValueError(f"Entity {entity_name} has no description")
# Check for cancellation before LLM summary
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled during entity summary")
# 8. Get summary description and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Entity",
@@ -1789,6 +1881,7 @@ async def _merge_edges_then_upsert(
llm_response_cache: BaseKVStorage | None = None,
added_entities: list = None, # New parameter to track entities added during edge processing
relation_chunks_storage: BaseKVStorage | None = None,
entity_chunks_storage: BaseKVStorage | None = None,
):
if src_id == tgt_id:
return None
@@ -1957,6 +2050,14 @@ async def _merge_edges_then_upsert(
logger.error(f"Relation {src_id}~{tgt_id} has no description")
raise ValueError(f"Relation {src_id}~{tgt_id} has no description")
# Check for cancellation before LLM summary
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during relation summary"
)
# 8. Get summary description and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Relation",
@@ -2065,7 +2166,11 @@ async def _merge_edges_then_upsert(
# 11. Update both graph and vector db
for need_insert_id in [src_id, tgt_id]:
if not (await knowledge_graph_inst.has_node(need_insert_id)):
# Optimization: Use get_node instead of has_node + get_node
existing_node = await knowledge_graph_inst.get_node(need_insert_id)
if existing_node is None:
# Node doesn't exist - create new node
node_created_at = int(time.time())
node_data = {
"entity_id": need_insert_id,
@@ -2078,6 +2183,19 @@ async def _merge_edges_then_upsert(
}
await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
# Update entity_chunks_storage for the newly created entity
if entity_chunks_storage is not None:
chunk_ids = [chunk_id for chunk_id in full_source_ids if chunk_id]
if chunk_ids:
await entity_chunks_storage.upsert(
{
need_insert_id: {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
}
)
if entity_vdb is not None:
entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
entity_content = f"{need_insert_id}\n{description}"
@@ -2109,6 +2227,109 @@ async def _merge_edges_then_upsert(
"created_at": node_created_at,
}
added_entities.append(entity_data)
else:
# Node exists - update its source_ids by merging with new source_ids
updated = False # Track if any update occurred
# 1. Get existing full source_ids from entity_chunks_storage
existing_full_source_ids = []
if entity_chunks_storage is not None:
stored_chunks = await entity_chunks_storage.get_by_id(need_insert_id)
if stored_chunks and isinstance(stored_chunks, dict):
existing_full_source_ids = [
chunk_id
for chunk_id in stored_chunks.get("chunk_ids", [])
if chunk_id
]
# If not in entity_chunks_storage, get from graph database
if not existing_full_source_ids:
if existing_node.get("source_id"):
existing_full_source_ids = existing_node["source_id"].split(
GRAPH_FIELD_SEP
)
# 2. Merge with new source_ids from this relationship
new_source_ids_from_relation = [
chunk_id for chunk_id in source_ids if chunk_id
]
merged_full_source_ids = merge_source_ids(
existing_full_source_ids, new_source_ids_from_relation
)
# 3. Save merged full list to entity_chunks_storage (conditional)
if (
entity_chunks_storage is not None
and merged_full_source_ids != existing_full_source_ids
):
updated = True
await entity_chunks_storage.upsert(
{
need_insert_id: {
"chunk_ids": merged_full_source_ids,
"count": len(merged_full_source_ids),
}
}
)
# 4. Apply source_ids limit for graph and vector db
limit_method = global_config.get(
"source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
)
max_source_limit = global_config.get("max_source_ids_per_entity")
limited_source_ids = apply_source_ids_limit(
merged_full_source_ids,
max_source_limit,
limit_method,
identifier=f"`{need_insert_id}`",
)
# 5. Update graph database and vector database with limited source_ids (conditional)
limited_source_id_str = GRAPH_FIELD_SEP.join(limited_source_ids)
if limited_source_id_str != existing_node.get("source_id", ""):
updated = True
updated_node_data = {
**existing_node,
"source_id": limited_source_id_str,
}
await knowledge_graph_inst.upsert_node(
need_insert_id, node_data=updated_node_data
)
# Update vector database
if entity_vdb is not None:
entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
entity_content = (
f"{need_insert_id}\n{existing_node.get('description', '')}"
)
vdb_data = {
entity_vdb_id: {
"content": entity_content,
"entity_name": need_insert_id,
"source_id": limited_source_id_str,
"entity_type": existing_node.get("entity_type", "UNKNOWN"),
"file_path": existing_node.get(
"file_path", "unknown_source"
),
}
}
await safe_vdb_operation_with_exception(
operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
operation_name="existing_entity_update",
entity_name=need_insert_id,
max_retries=3,
retry_delay=0.1,
)
# 6. Log once at the end if any update occurred
if updated:
status_message = f"Chunks appended from relation: `{need_insert_id}`"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
edge_created_at = int(time.time())
await knowledge_graph_inst.upsert_edge(
@@ -2137,6 +2358,10 @@ async def _merge_edges_then_upsert(
weight=weight,
)
# Sort src_id and tgt_id to ensure consistent ordering (smaller string first)
if src_id > tgt_id:
src_id, tgt_id = tgt_id, src_id
if relationships_vdb is not None:
rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-")
rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-")
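The sort just above makes the relation's vector-db id independent of edge direction; a brief sketch using the library's compute_mdhash_id helper:

def canonical_rel_id(src_id: str, tgt_id: str) -> str:
    if src_id > tgt_id:  # smaller string first, as in the code above
        src_id, tgt_id = tgt_id, src_id
    return compute_mdhash_id(src_id + tgt_id, prefix="rel-")

assert canonical_rel_id("Beta", "Alpha") == canonical_rel_id("Alpha", "Beta")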
@@ -2214,6 +2439,12 @@ async def merge_nodes_and_edges(
file_path: File path for logging
"""
# Check for cancellation at the start of merge
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled during merge phase")
# Collect all nodes and edges from all chunks
all_nodes = defaultdict(list)
all_edges = defaultdict(list)
@@ -2250,6 +2481,14 @@ async def merge_nodes_and_edges(
async def _locked_process_entity_name(entity_name, entities):
async with semaphore:
# Check for cancellation before processing entity
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during entity merge"
)
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
@@ -2343,6 +2582,14 @@ async def merge_nodes_and_edges(
async def _locked_process_edges(edge_key, edges):
async with semaphore:
# Check for cancellation before processing edges
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during relation merge"
)
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
sorted_edge_key = sorted([edge_key[0], edge_key[1]])
@@ -2369,6 +2616,7 @@ async def merge_nodes_and_edges(
llm_response_cache,
added_entities, # Pass list to collect added entities
relation_chunks_storage,
entity_chunks_storage, # Add entity_chunks_storage parameter
)
if edge_data is None:
@@ -2525,6 +2773,14 @@ async def extract_entities(
llm_response_cache: BaseKVStorage | None = None,
text_chunks_storage: BaseKVStorage | None = None,
) -> list:
# Check for cancellation at the start of entity extraction
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during entity extraction"
)
use_llm_func: callable = global_config["llm_model_func"]
entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
@@ -2692,6 +2948,14 @@ async def extract_entities(
async def _process_with_semaphore(chunk):
async with semaphore:
# Check for cancellation before processing chunk
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during chunk processing"
)
try:
return await _process_single_content(chunk)
except Exception as e:
@@ -3189,10 +3453,10 @@ async def _perform_kg_search(
)
query_embedding = None
if query and (kg_chunk_pick_method == "VECTOR" or chunks_vdb):
embedding_func_config = text_chunks_db.embedding_func
if embedding_func_config and embedding_func_config.func:
actual_embedding_func = text_chunks_db.embedding_func
if actual_embedding_func:
try:
query_embedding = await embedding_func_config.func([query])
query_embedding = await actual_embedding_func([query])
query_embedding = query_embedding[
0
] # Extract first embedding from batch result
@@ -3596,7 +3860,7 @@ async def _merge_all_chunks(
return merged_chunks
async def _build_llm_context(
async def _build_context_str(
entities_context: list[dict],
relations_context: list[dict],
merged_chunks: list[dict],
@@ -3696,23 +3960,32 @@ async def _build_llm_context(
truncated_chunks
)
# Rebuild text_units_context with truncated chunks
# Rebuild chunks_context with truncated chunks
# The actual tokens may be slightly less than available_chunk_tokens due to deduplication logic
text_units_context = []
chunks_context = []
for i, chunk in enumerate(truncated_chunks):
text_units_context.append(
chunks_context.append(
{
"reference_id": chunk["reference_id"],
"content": chunk["content"],
}
)
text_units_str = "\n".join(
json.dumps(text_unit, ensure_ascii=False) for text_unit in chunks_context
)
reference_list_str = "\n".join(
f"[{ref['reference_id']}] {ref['file_path']}"
for ref in reference_list
if ref["reference_id"]
)
logger.info(
f"Final context: {len(entities_context)} entities, {len(relations_context)} relations, {len(text_units_context)} chunks"
f"Final context: {len(entities_context)} entities, {len(relations_context)} relations, {len(chunks_context)} chunks"
)
# not necessary to use LLM to generate a response
if not entities_context and not relations_context:
if not entities_context and not relations_context and not chunks_context:
# Return empty raw data structure when no entities/relations
empty_raw_data = convert_to_user_format(
[],
@@ -3743,15 +4016,6 @@ async def _build_llm_context(
if chunk_tracking_log:
logger.info(f"Final chunks S+F/O: {' '.join(chunk_tracking_log)}")
text_units_str = "\n".join(
json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context
)
reference_list_str = "\n".join(
f"[{ref['reference_id']}] {ref['file_path']}"
for ref in reference_list
if ref["reference_id"]
)
result = kg_context_template.format(
entities_str=entities_str,
relations_str=relations_str,
@@ -3761,7 +4025,7 @@ async def _build_llm_context(
# Always return both context and complete data structure (unified approach)
logger.debug(
f"[_build_llm_context] Converting to user format: {len(entities_context)} entities, {len(relations_context)} relations, {len(truncated_chunks)} chunks"
f"[_build_context_str] Converting to user format: {len(entities_context)} entities, {len(relations_context)} relations, {len(truncated_chunks)} chunks"
)
final_data = convert_to_user_format(
entities_context,
@@ -3773,7 +4037,7 @@ async def _build_llm_context(
relation_id_to_original,
)
logger.debug(
f"[_build_llm_context] Final data after conversion: {len(final_data.get('entities', []))} entities, {len(final_data.get('relationships', []))} relationships, {len(final_data.get('chunks', []))} chunks"
f"[_build_context_str] Final data after conversion: {len(final_data.get('entities', []))} entities, {len(final_data.get('relationships', []))} relationships, {len(final_data.get('chunks', []))} chunks"
)
return result, final_data
@@ -3850,8 +4114,8 @@ async def _build_query_context(
return None
# Stage 4: Build final LLM context with dynamic token processing
# _build_llm_context now always returns tuple[str, dict]
context, raw_data = await _build_llm_context(
# _build_context_str now always returns tuple[str, dict]
context, raw_data = await _build_context_str(
entities_context=truncation_result["entities_context"],
relations_context=truncation_result["relations_context"],
merged_chunks=merged_chunks,
@@ -4100,25 +4364,21 @@ async def _find_related_text_unit_from_entities(
num_of_chunks = int(max_related_chunks * len(entities_with_chunks) / 2)
# Get embedding function from global config
embedding_func_config = text_chunks_db.embedding_func
if not embedding_func_config:
actual_embedding_func = text_chunks_db.embedding_func
if not actual_embedding_func:
logger.warning("No embedding function found, falling back to WEIGHT method")
kg_chunk_pick_method = "WEIGHT"
else:
try:
actual_embedding_func = embedding_func_config.func
selected_chunk_ids = None
if actual_embedding_func:
selected_chunk_ids = await pick_by_vector_similarity(
query=query,
text_chunks_storage=text_chunks_db,
chunks_vdb=chunks_vdb,
num_of_chunks=num_of_chunks,
entity_info=entities_with_chunks,
embedding_func=actual_embedding_func,
query_embedding=query_embedding,
)
selected_chunk_ids = await pick_by_vector_similarity(
query=query,
text_chunks_storage=text_chunks_db,
chunks_vdb=chunks_vdb,
num_of_chunks=num_of_chunks,
entity_info=entities_with_chunks,
embedding_func=actual_embedding_func,
query_embedding=query_embedding,
)
if selected_chunk_ids == []:
kg_chunk_pick_method = "WEIGHT"
@@ -4393,24 +4653,21 @@ async def _find_related_text_unit_from_relations(
num_of_chunks = int(max_related_chunks * len(relations_with_chunks) / 2)
# Get embedding function from global config
embedding_func_config = text_chunks_db.embedding_func
if not embedding_func_config:
actual_embedding_func = text_chunks_db.embedding_func
if not actual_embedding_func:
logger.warning("No embedding function found, falling back to WEIGHT method")
kg_chunk_pick_method = "WEIGHT"
else:
try:
actual_embedding_func = embedding_func_config.func
if actual_embedding_func:
selected_chunk_ids = await pick_by_vector_similarity(
query=query,
text_chunks_storage=text_chunks_db,
chunks_vdb=chunks_vdb,
num_of_chunks=num_of_chunks,
entity_info=relations_with_chunks,
embedding_func=actual_embedding_func,
query_embedding=query_embedding,
)
selected_chunk_ids = await pick_by_vector_similarity(
query=query,
text_chunks_storage=text_chunks_db,
chunks_vdb=chunks_vdb,
num_of_chunks=num_of_chunks,
entity_info=relations_with_chunks,
embedding_func=actual_embedding_func,
query_embedding=query_embedding,
)
if selected_chunk_ids == []:
kg_chunk_pick_method = "WEIGHT"
@@ -4624,10 +4881,10 @@ async def naive_query(
"final_chunks_count": len(processed_chunks_with_ref_ids),
}
# Build text_units_context from processed chunks with reference IDs
text_units_context = []
# Build chunks_context from processed chunks with reference IDs
chunks_context = []
for i, chunk in enumerate(processed_chunks_with_ref_ids):
text_units_context.append(
chunks_context.append(
{
"reference_id": chunk["reference_id"],
"content": chunk["content"],
@@ -4635,7 +4892,7 @@ async def naive_query(
)
text_units_str = "\n".join(
json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context
json.dumps(text_unit, ensure_ascii=False) for text_unit in chunks_context
)
reference_list_str = "\n".join(
f"[{ref['reference_id']}] {ref['file_path']}"