Quick fix to limit source_id ballooning while inserting nodes
This commit is contained in:
parent
cc4369102b
commit
54f0a7d1ca
3 changed files with 27 additions and 4 deletions
|
|
@ -13,6 +13,7 @@ DEFAULT_MAX_GRAPH_NODES = 1000
|
||||||
# Default values for extraction settings
|
# Default values for extraction settings
|
||||||
DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for document processing
|
DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for document processing
|
||||||
DEFAULT_MAX_GLEANING = 1
|
DEFAULT_MAX_GLEANING = 1
|
||||||
|
DEFAULT_MAX_CHUNK_IDS_PER_ENTITY = 500 # Applies to Both Graph + Vector DBs
|
||||||
|
|
||||||
# Number of description fragments to trigger LLM summary
|
# Number of description fragments to trigger LLM summary
|
||||||
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
|
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ from .utils import (
|
||||||
pick_by_vector_similarity,
|
pick_by_vector_similarity,
|
||||||
process_chunks_unified,
|
process_chunks_unified,
|
||||||
build_file_path,
|
build_file_path,
|
||||||
|
truncate_entity_source_id,
|
||||||
safe_vdb_operation_with_exception,
|
safe_vdb_operation_with_exception,
|
||||||
create_prefixed_exception,
|
create_prefixed_exception,
|
||||||
fix_tuple_delimiter_corruption,
|
fix_tuple_delimiter_corruption,
|
||||||
|
|
@ -52,6 +53,7 @@ from .constants import (
|
||||||
DEFAULT_KG_CHUNK_PICK_METHOD,
|
DEFAULT_KG_CHUNK_PICK_METHOD,
|
||||||
DEFAULT_ENTITY_TYPES,
|
DEFAULT_ENTITY_TYPES,
|
||||||
DEFAULT_SUMMARY_LANGUAGE,
|
DEFAULT_SUMMARY_LANGUAGE,
|
||||||
|
DEFAULT_MAX_CHUNK_IDS_PER_ENTITY,
|
||||||
)
|
)
|
||||||
from .kg.shared_storage import get_storage_keyed_lock
|
from .kg.shared_storage import get_storage_keyed_lock
|
||||||
import time
|
import time
|
||||||
|
|
@ -1371,9 +1373,11 @@ async def _merge_nodes_then_upsert(
|
||||||
logger.error(f"Entity {entity_name} has no description")
|
logger.error(f"Entity {entity_name} has no description")
|
||||||
description = "(no description)"
|
description = "(no description)"
|
||||||
|
|
||||||
source_id = GRAPH_FIELD_SEP.join(
|
merged_source_ids: set = set([dp["source_id"] for dp in nodes_data] + already_source_ids)
|
||||||
set([dp["source_id"] for dp in nodes_data] + already_source_ids)
|
|
||||||
)
|
source_ids = truncate_entity_source_id(merged_source_ids, entity_name)
|
||||||
|
source_id = GRAPH_FIELD_SEP.join(source_ids)
|
||||||
|
|
||||||
file_path = build_file_path(already_file_paths, nodes_data, entity_name)
|
file_path = build_file_path(already_file_paths, nodes_data, entity_name)
|
||||||
|
|
||||||
node_data = dict(
|
node_data = dict(
|
||||||
|
|
@ -1658,6 +1662,7 @@ async def merge_nodes_and_edges(
|
||||||
[entity_name], namespace=namespace, enable_logging=False
|
[entity_name], namespace=namespace, enable_logging=False
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
|
logger.info(f"Inserting {entity_name} in Graph")
|
||||||
# Graph database operation (critical path, must succeed)
|
# Graph database operation (critical path, must succeed)
|
||||||
entity_data = await _merge_nodes_then_upsert(
|
entity_data = await _merge_nodes_then_upsert(
|
||||||
entity_name,
|
entity_name,
|
||||||
|
|
@ -1673,7 +1678,7 @@ async def merge_nodes_and_edges(
|
||||||
if entity_vdb is not None and entity_data:
|
if entity_vdb is not None and entity_data:
|
||||||
data_for_vdb = {
|
data_for_vdb = {
|
||||||
compute_mdhash_id(
|
compute_mdhash_id(
|
||||||
entity_data["entity_name"], prefix="ent-"
|
str(entity_data["entity_name"]), prefix="ent-"
|
||||||
): {
|
): {
|
||||||
"entity_name": entity_data["entity_name"],
|
"entity_name": entity_data["entity_name"],
|
||||||
"entity_type": entity_data["entity_type"],
|
"entity_type": entity_data["entity_type"],
|
||||||
|
|
@ -1685,6 +1690,8 @@ async def merge_nodes_and_edges(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
logger.info(f"Inserting {entity_name} in Graph")
|
||||||
# Use safe operation wrapper - VDB failure must throw exception
|
# Use safe operation wrapper - VDB failure must throw exception
|
||||||
await safe_vdb_operation_with_exception(
|
await safe_vdb_operation_with_exception(
|
||||||
operation=lambda: entity_vdb.upsert(data_for_vdb),
|
operation=lambda: entity_vdb.upsert(data_for_vdb),
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ from lightrag.constants import (
|
||||||
GRAPH_FIELD_SEP,
|
GRAPH_FIELD_SEP,
|
||||||
DEFAULT_MAX_TOTAL_TOKENS,
|
DEFAULT_MAX_TOTAL_TOKENS,
|
||||||
DEFAULT_MAX_FILE_PATH_LENGTH,
|
DEFAULT_MAX_FILE_PATH_LENGTH,
|
||||||
|
DEFAULT_MAX_CHUNK_IDS_PER_ENTITY,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Initialize logger with basic configuration
|
# Initialize logger with basic configuration
|
||||||
|
|
@ -2464,6 +2465,20 @@ async def process_chunks_unified(
|
||||||
|
|
||||||
return final_chunks
|
return final_chunks
|
||||||
|
|
||||||
|
def truncate_entity_source_id(chunk_ids: set, entity_name: str) -> set:
    """Cap the number of chunk ids kept for a single entity.

    Entities that appear in a huge number of chunks would otherwise grow an
    unbounded source_id list and break vector-DB hard upper limits, so keep
    at most DEFAULT_MAX_CHUNK_IDS_PER_ENTITY entries.

    Args:
        chunk_ids: Merged set of chunk ids referencing the entity.
        entity_name: Entity name, used only in the warning log message.

    Returns:
        A new set containing at most DEFAULT_MAX_CHUNK_IDS_PER_ENTITY
        chunk ids. When truncation occurs, the lexicographically smallest
        ids are kept so repeated runs produce the same subset.
    """
    already_len: int = len(chunk_ids)

    # Warn only when entries will actually be dropped. The original used
    # `>=`, which logged "exceeds" at exactly the limit even though nothing
    # was truncated.
    if already_len <= DEFAULT_MAX_CHUNK_IDS_PER_ENTITY:
        # Return a copy to preserve the original's no-aliasing behavior.
        return set(chunk_ids)

    logger.warning(
        f"Chunk Ids already exceeds {DEFAULT_MAX_CHUNK_IDS_PER_ENTITY} for {entity_name}, "
        f"current size: {already_len} entries."
    )

    # Sets iterate in arbitrary order, so slicing list(chunk_ids) directly
    # would keep a nondeterministic subset across runs. Sort first for a
    # reproducible result (chunk ids are strings — TODO confirm at callers).
    return set(sorted(chunk_ids)[:DEFAULT_MAX_CHUNK_IDS_PER_ENTITY])
|
||||||
|
|
||||||
|
|
||||||
def build_file_path(already_file_paths, data_list, target):
|
def build_file_path(already_file_paths, data_list, target):
|
||||||
"""Build file path string with UTF-8 byte length limit and deduplication
|
"""Build file path string with UTF-8 byte length limit and deduplication
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue