From 7f7574c8b71d79a0c6e1bed11625e2228ab0cbc7 Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 19 Nov 2025 18:32:43 +0800 Subject: [PATCH] Add token limit validation for character-only chunking - Add ChunkTokenLimitExceededError exception - Validate chunks against token limits - Include chunk preview in error messages - Add comprehensive test coverage - Log warnings for oversized chunks (cherry picked from commit f988a226527d6de54ce264b61823a75fe7d9385f) --- lightrag/exceptions.py | 3 +- lightrag/operate.py | 1222 +++++++++++++++++++++++++++------------- tests/test_chunking.py | 407 ------------- 3 files changed, 841 insertions(+), 791 deletions(-) diff --git a/lightrag/exceptions.py b/lightrag/exceptions.py index 709f294d..64c7ea3e 100644 --- a/lightrag/exceptions.py +++ b/lightrag/exceptions.py @@ -118,11 +118,10 @@ class ChunkTokenLimitExceededError(ValueError): preview = chunk_preview.strip() if chunk_preview else None truncated_preview = preview[:80] if preview else None preview_note = f" Preview: '{truncated_preview}'" if truncated_preview else "" - message = ( + super().__init__( f"Chunk token length {chunk_tokens} exceeds chunk_token_size {chunk_token_limit}." f"{preview_note}" ) - super().__init__(message) self.chunk_tokens = chunk_tokens self.chunk_token_limit = chunk_token_limit self.chunk_preview = truncated_preview diff --git a/lightrag/operate.py b/lightrag/operate.py index 3a7502c1..5f824af0 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1,5 +1,6 @@ from __future__ import annotations from functools import partial +from pathlib import Path import asyncio import json @@ -7,6 +8,10 @@ import json_repair from typing import Any, AsyncIterator, overload, Literal from collections import Counter, defaultdict +from lightrag.exceptions import ( + PipelineCancelledException, + ChunkTokenLimitExceededError, +) from lightrag.utils import ( logger, compute_mdhash_id, @@ -26,7 +31,6 @@ from lightrag.utils import ( pick_by_weighted_polling, pick_by_vector_similarity, process_chunks_unified, - build_file_path, safe_vdb_operation_with_exception, create_prefixed_exception, fix_tuple_delimiter_corruption, @@ -56,6 +60,10 @@ from lightrag.constants import ( DEFAULT_ENTITY_TYPES, DEFAULT_SUMMARY_LANGUAGE, SOURCE_IDS_LIMIT_METHOD_KEEP, + SOURCE_IDS_LIMIT_METHOD_FIFO, + DEFAULT_FILE_PATH_MORE_PLACEHOLDER, + DEFAULT_MAX_FILE_PATHS, + DEFAULT_ENTITY_NAME_MAX_LENGTH, ) from lightrag.kg.shared_storage import get_storage_keyed_lock import time @@ -64,7 +72,28 @@ from dotenv import load_dotenv # use the .env that is inside the current folder # allows to use different .env file for each lightrag instance # the OS environment variables take precedence over the .env file -load_dotenv(dotenv_path=".env", override=False) +load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False) + + +def _truncate_entity_identifier( + identifier: str, limit: int, chunk_key: str, identifier_role: str +) -> str: + """Truncate entity identifiers that exceed the configured length limit.""" + + if len(identifier) <= limit: + return identifier + + display_value = identifier[:limit] + preview = identifier[:20] # Show first 20 characters as preview + logger.warning( + "%s: %s len %d > %d chars (Name: '%s...')", + chunk_key, + identifier_role, + len(identifier), + limit, + preview, + ) + return display_value def chunking_by_token_size( @@ -83,6 +112,17 @@ def chunking_by_token_size( if split_by_character_only: for chunk in raw_chunks: _tokens = tokenizer.encode(chunk) + if len(_tokens) > 
chunk_token_size: + logger.warning( + "Chunk split_by_character exceeds token limit: len=%d limit=%d", + len(_tokens), + chunk_token_size, + ) + raise ChunkTokenLimitExceededError( + chunk_tokens=len(_tokens), + chunk_token_limit=chunk_token_size, + chunk_preview=chunk[:120], + ) new_chunks.append((len(_tokens), chunk)) else: for chunk in raw_chunks: @@ -319,6 +359,20 @@ async def _summarize_descriptions( llm_response_cache=llm_response_cache, cache_type="summary", ) + + # Check summary token length against embedding limit + embedding_token_limit = global_config.get("embedding_token_limit") + if embedding_token_limit is not None and summary: + tokenizer = global_config["tokenizer"] + summary_token_count = len(tokenizer.encode(summary)) + threshold = int(embedding_token_limit * 0.9) + + if summary_token_count > threshold: + logger.warning( + f"Summary tokens ({summary_token_count}) exceeds 90% of embedding limit " + f"({embedding_token_limit}) for {description_type}: {description_name}" + ) + return summary @@ -476,7 +530,7 @@ async def _handle_single_relationship_extraction( return None -async def _rebuild_knowledge_from_chunks( +async def rebuild_knowledge_from_chunks( entities_to_rebuild: dict[str, list[str]], relationships_to_rebuild: dict[tuple[str, str], list[str]], knowledge_graph_inst: BaseGraphStorage, @@ -651,14 +705,6 @@ async def _rebuild_knowledge_from_chunks( entity_chunks_storage=entity_chunks_storage, ) rebuilt_entities_count += 1 - status_message = ( - f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks" - ) - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) except Exception as e: failed_entities_count += 1 status_message = f"Failed to rebuild `{entity_name}`: {e}" @@ -684,6 +730,7 @@ async def _rebuild_knowledge_from_chunks( await _rebuild_single_relationship( knowledge_graph_inst=knowledge_graph_inst, relationships_vdb=relationships_vdb, + entities_vdb=entities_vdb, src=src, tgt=tgt, chunk_ids=chunk_ids, @@ -691,13 +738,14 @@ async def _rebuild_knowledge_from_chunks( llm_response_cache=llm_response_cache, global_config=global_config, relation_chunks_storage=relation_chunks_storage, + entity_chunks_storage=entity_chunks_storage, pipeline_status=pipeline_status, pipeline_status_lock=pipeline_status_lock, ) rebuilt_relationships_count += 1 except Exception as e: failed_relationships_count += 1 - status_message = f"Failed to rebuild `{src} - {tgt}`: {e}" + status_message = f"Failed to rebuild `{src}`~`{tgt}`: {e}" logger.info(status_message) # Per requirement, change to info if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -950,7 +998,14 @@ async def _process_extraction_result( record_attributes, chunk_key, timestamp, file_path ) if entity_data is not None: - maybe_nodes[entity_data["entity_name"]].append(entity_data) + truncated_name = _truncate_entity_identifier( + entity_data["entity_name"], + DEFAULT_ENTITY_NAME_MAX_LENGTH, + chunk_key, + "Entity name", + ) + entity_data["entity_name"] = truncated_name + maybe_nodes[truncated_name].append(entity_data) continue # Try to parse as relationship @@ -958,9 +1013,21 @@ async def _process_extraction_result( record_attributes, chunk_key, timestamp, file_path ) if relationship_data is not None: - maybe_edges[ - (relationship_data["src_id"], relationship_data["tgt_id"]) - 
].append(relationship_data) + truncated_source = _truncate_entity_identifier( + relationship_data["src_id"], + DEFAULT_ENTITY_NAME_MAX_LENGTH, + chunk_key, + "Relation entity", + ) + truncated_target = _truncate_entity_identifier( + relationship_data["tgt_id"], + DEFAULT_ENTITY_NAME_MAX_LENGTH, + chunk_key, + "Relation entity", + ) + relationship_data["src_id"] = truncated_source + relationship_data["tgt_id"] = truncated_target + maybe_edges[(truncated_source, truncated_target)].append(relationship_data) return dict(maybe_nodes), dict(maybe_edges) @@ -1024,7 +1091,7 @@ async def _rebuild_single_entity( async def _update_entity_storage( final_description: str, entity_type: str, - file_paths: set[str], + file_paths: list[str], source_chunk_ids: list[str], truncation_info: str = "", ): @@ -1156,7 +1223,8 @@ async def _rebuild_single_entity( # Process cached entity data descriptions = [] entity_types = [] - file_paths = set() + file_paths_list = [] + seen_paths = set() for entity_data in all_entity_data: if entity_data.get("description"): @@ -1164,7 +1232,33 @@ async def _rebuild_single_entity( if entity_data.get("entity_type"): entity_types.append(entity_data["entity_type"]) if entity_data.get("file_path"): - file_paths.add(entity_data["file_path"]) + file_path = entity_data["file_path"] + if file_path and file_path not in seen_paths: + file_paths_list.append(file_path) + seen_paths.add(file_path) + + # Apply MAX_FILE_PATHS limit + max_file_paths = global_config.get("max_file_paths") + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + limit_method = global_config.get("source_ids_limit_method") + + original_count = len(file_paths_list) + if original_count > max_file_paths: + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}...({limit_method} {max_file_paths}/{original_count})" + ) + logger.info( + f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1192,7 +1286,7 @@ async def _rebuild_single_entity( if len(limited_chunk_ids) < len(normalized_chunk_ids): truncation_info = ( - f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" + f"{limit_method} {len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" ) else: truncation_info = "" @@ -1200,7 +1294,7 @@ async def _rebuild_single_entity( await _update_entity_storage( final_description, entity_type, - file_paths, + file_paths_list, limited_chunk_ids, truncation_info, ) @@ -1220,6 +1314,7 @@ async def _rebuild_single_entity( async def _rebuild_single_relationship( knowledge_graph_inst: BaseGraphStorage, relationships_vdb: BaseVectorStorage, + entities_vdb: BaseVectorStorage, src: str, tgt: str, chunk_ids: list[str], @@ -1227,6 +1322,7 @@ async def _rebuild_single_relationship( llm_response_cache: BaseKVStorage, global_config: dict[str, str], relation_chunks_storage: BaseKVStorage | None = None, + entity_chunks_storage: BaseKVStorage | None = None, pipeline_status: dict | None = None, pipeline_status_lock=None, ) -> None: @@ -1284,7 +1380,8 @@ async def _rebuild_single_relationship( descriptions = [] keywords = [] weights = [] - file_paths = set() + file_paths_list = [] + 
seen_paths = set() for rel_data in all_relationship_data: if rel_data.get("description"): @@ -1294,7 +1391,33 @@ async def _rebuild_single_relationship( if rel_data.get("weight"): weights.append(rel_data["weight"]) if rel_data.get("file_path"): - file_paths.add(rel_data["file_path"]) + file_path = rel_data["file_path"] + if file_path and file_path not in seen_paths: + file_paths_list.append(file_path) + seen_paths.add(file_path) + + # Apply count limit + max_file_paths = global_config.get("max_file_paths") + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + limit_method = global_config.get("source_ids_limit_method") + + original_count = len(file_paths_list) + if original_count > max_file_paths: + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}...({limit_method} {max_file_paths}/{original_count})" + ) + logger.info( + f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1324,7 +1447,7 @@ async def _rebuild_single_relationship( if len(limited_chunk_ids) < len(normalized_chunk_ids): truncation_info = ( - f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" + f"{limit_method} {len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" ) else: truncation_info = "" @@ -1338,14 +1461,74 @@ async def _rebuild_single_relationship( "keywords": combined_keywords, "weight": weight, "source_id": GRAPH_FIELD_SEP.join(limited_chunk_ids), - "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp]) - if file_paths + "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths_list if fp]) + if file_paths_list else current_relationship.get("file_path", "unknown_source"), "truncate": truncation_info, } + + # Ensure both endpoint nodes exist before writing the edge back + # (certain storage backends require pre-existing nodes). 
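The KEEP/FIFO cap applied to the `file_path` list here recurs for entities and relations throughout this patch, always with the same shape: keep the head (KEEP, earliest) or the tail (FIFO, newest), then append a placeholder entry recording how much was dropped. A standalone sketch of that rule; `cap_file_paths`, its defaults, and the demo paths are illustrative only, whereas the real hunks read `max_file_paths`, `source_ids_limit_method`, and `file_path_more_placeholder` from `global_config`:

```python
def cap_file_paths(
    paths: list[str],
    max_paths: int,
    method: str = "KEEP",
    placeholder: str = "more_files",
) -> list[str]:
    """Cap a de-duplicated file_path list, recording how many entries were dropped.

    KEEP retains the head (earliest paths); FIFO retains the tail (newest paths).
    """
    original = len(paths)
    if original <= max_paths:
        return list(paths)
    if method == "FIFO":
        kept = paths[-max_paths:]  # keep tail (newest), discard head
    else:
        kept = paths[:max_paths]  # keep head (earliest), discard tail
    kept.append(f"...{placeholder}...({method} {max_paths}/{original})")
    return kept


if __name__ == "__main__":
    paths = [f"doc_{i}.md" for i in range(5)]  # hypothetical file paths
    assert cap_file_paths(paths, 3, "KEEP")[:3] == ["doc_0.md", "doc_1.md", "doc_2.md"]
    assert cap_file_paths(paths, 3, "FIFO")[:3] == ["doc_2.md", "doc_3.md", "doc_4.md"]
```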
+ node_description = ( + updated_relationship_data["description"] + if updated_relationship_data.get("description") + else current_relationship.get("description", "") + ) + node_source_id = updated_relationship_data.get("source_id", "") + node_file_path = updated_relationship_data.get("file_path", "unknown_source") + + for node_id in {src, tgt}: + if not (await knowledge_graph_inst.has_node(node_id)): + node_created_at = int(time.time()) + node_data = { + "entity_id": node_id, + "source_id": node_source_id, + "description": node_description, + "entity_type": "UNKNOWN", + "file_path": node_file_path, + "created_at": node_created_at, + "truncate": "", + } + await knowledge_graph_inst.upsert_node(node_id, node_data=node_data) + + # Update entity_chunks_storage for the newly created entity + if entity_chunks_storage is not None and limited_chunk_ids: + await entity_chunks_storage.upsert( + { + node_id: { + "chunk_ids": limited_chunk_ids, + "count": len(limited_chunk_ids), + } + } + ) + + # Update entity_vdb for the newly created entity + if entities_vdb is not None: + entity_vdb_id = compute_mdhash_id(node_id, prefix="ent-") + entity_content = f"{node_id}\n{node_description}" + vdb_data = { + entity_vdb_id: { + "content": entity_content, + "entity_name": node_id, + "source_id": node_source_id, + "entity_type": "UNKNOWN", + "file_path": node_file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: entities_vdb.upsert(payload), + operation_name="rebuild_added_entity_upsert", + entity_name=node_id, + max_retries=3, + retry_delay=0.1, + ) + await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data) # Update relationship in vector database + # Sort src and tgt to ensure consistent ordering (smaller string first) + if src > tgt: + src, tgt = tgt, src try: rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-") rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-") @@ -1388,7 +1571,7 @@ async def _rebuild_single_relationship( raise # Re-raise exception # Log rebuild completion with truncation info - status_message = f"Rebuild `{src} - {tgt}` from {len(chunk_ids)} chunks" + status_message = f"Rebuild `{src}`~`{tgt}` from {len(chunk_ids)} chunks" if truncation_info: status_message += f" ({truncation_info})" # Add truncation info from apply_source_ids_limit if truncation occurred @@ -1411,6 +1594,7 @@ async def _merge_nodes_then_upsert( entity_name: str, nodes_data: list[dict], knowledge_graph_inst: BaseGraphStorage, + entity_vdb: BaseVectorStorage | None, global_config: dict, pipeline_status: dict = None, pipeline_status_lock=None, @@ -1423,6 +1607,7 @@ async def _merge_nodes_then_upsert( already_description = [] already_file_paths = [] + # 1. 
Get existing node data from knowledge graph already_node = await knowledge_graph_inst.get_node(entity_name) if already_node: already_entity_types.append(already_node["entity_type"]) @@ -1430,16 +1615,6 @@ async def _merge_nodes_then_upsert( already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP)) already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP)) - entity_type = sorted( - Counter( - [dp["entity_type"] for dp in nodes_data] + already_entity_types - ).items(), - key=lambda x: x[1], - reverse=True, - )[0][0] # Get the entity type with the highest count - - original_nodes_count = len(nodes_data) - new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")] existing_full_source_ids = [] @@ -1455,6 +1630,7 @@ async def _merge_nodes_then_upsert( chunk_id for chunk_id in already_source_ids if chunk_id ] + # 2. Merging new source ids with existing ones full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) if entity_chunks_storage is not None and full_source_ids: @@ -1467,23 +1643,23 @@ async def _merge_nodes_then_upsert( } ) - limit_method = ( - global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP - ) + # 3. Finalize source_id by applying source ids limit + limit_method = global_config.get("source_ids_limit_method") + max_source_limit = global_config.get("max_source_ids_per_entity") source_ids = apply_source_ids_limit( full_source_ids, - global_config["max_source_ids_per_entity"], + max_source_limit, limit_method, identifier=f"`{entity_name}`", ) - # Only apply filtering in IGNORE_NEW mode + # 4. Only keep nodes not filter by apply_source_ids_limit if limit_method is KEEP if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP: allowed_source_ids = set(source_ids) filtered_nodes = [] for dp in nodes_data: source_id = dp.get("source_id") - # Skip descriptions sourced from chunks dropped by the IGNORE_NEW cap + # Skip descriptions sourced from chunks dropped by the limitation cap if ( source_id and source_id not in allowed_source_ids @@ -1492,19 +1668,40 @@ async def _merge_nodes_then_upsert( continue filtered_nodes.append(dp) nodes_data = filtered_nodes - else: - # In FIFO mode, keep all node descriptions - truncation happens at source_ids level only + else: # In FIFO mode, keep all nodes - truncation happens at source_ids level only nodes_data = list(nodes_data) - max_source_limit = global_config["max_source_ids_per_entity"] - skip_summary_due_to_limit = ( + # 5. Check if we need to skip summary due to source_ids limit + if ( limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP and len(existing_full_source_ids) >= max_source_limit and not nodes_data - and already_description - ) + ): + if already_node: + logger.info( + f"Skipped `{entity_name}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}" + ) + existing_node_data = dict(already_node) + return existing_node_data + else: + logger.error(f"Internal Error: already_node missing for `{entity_name}`") + raise ValueError( + f"Internal Error: already_node missing for `{entity_name}`" + ) - # Deduplicate by description, keeping first occurrence + # 6.1 Finalize source_id + source_id = GRAPH_FIELD_SEP.join(source_ids) + + # 6.2 Finalize entity type by highest count + entity_type = sorted( + Counter( + [dp["entity_type"] for dp in nodes_data] + already_entity_types + ).items(), + key=lambda x: x[1], + reverse=True, + )[0][0] + + # 7. 
Deduplicate nodes by description, keeping first occurrence in the same document unique_nodes = {} for dp in nodes_data: desc = dp.get("description") @@ -1513,78 +1710,128 @@ async def _merge_nodes_then_upsert( if desc not in unique_nodes: unique_nodes[desc] = dp - # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same + # Sort description by timestamp, then by description length when timestamps are the same sorted_nodes = sorted( unique_nodes.values(), key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))), ) sorted_descriptions = [dp["description"] for dp in sorted_nodes] - truncation_info = "" - dd_message = "" - # Combine already_description with sorted new sorted descriptions description_list = already_description + sorted_descriptions - deduplicated_num = original_nodes_count - len(sorted_descriptions) - if deduplicated_num > 0: - dd_message = f"dd:{deduplicated_num}" + if not description_list: + logger.error(f"Entity {entity_name} has no description") + raise ValueError(f"Entity {entity_name} has no description") + # Check for cancellation before LLM summary + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled during entity summary") + + # 8. Get summary description an LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + "Entity", + entity_name, + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, + ) + + # 9. Build file_path within MAX_FILE_PATHS + file_paths_list = [] + seen_paths = set() + has_placeholder = False # Indicating file_path has been truncated before + + max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + + # Collect from already_file_paths, excluding placeholder + for fp in already_file_paths: + if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders + has_placeholder = True + continue + if fp and fp not in seen_paths: + file_paths_list.append(fp) + seen_paths.add(fp) + + # Collect from new data + for dp in nodes_data: + file_path_item = dp.get("file_path") + if file_path_item and file_path_item not in seen_paths: + file_paths_list.append(file_path_item) + seen_paths.add(file_path_item) + + # Apply count limit + if len(file_paths_list) > max_file_paths: + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP + ) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + # Add + sign to indicate actual file count is higher + original_count_str = ( + f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list)) + ) + + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + file_paths_list.append(f"...{file_path_placeholder}...(FIFO)") + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)") + + logger.info( + f"Limited `{entity_name}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})" + ) + # Finalize file_path + file_path = GRAPH_FIELD_SEP.join(file_paths_list) + + # 10.Log based on 
actual LLM usage num_fragment = len(description_list) already_fragment = len(already_description) - if skip_summary_due_to_limit: - description = ( - already_node.get("description", "(no description)") - if already_node - else "(no description)" + if llm_was_used: + status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" + else: + status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" + + truncation_info = truncation_info_log = "" + if len(source_ids) < len(full_source_ids): + # Add truncation info from apply_source_ids_limit if truncation occurred + truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}" + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + truncation_info = truncation_info_log + else: + truncation_info = "KEEP Old" + + deduplicated_num = already_fragment + len(nodes_data) - num_fragment + dd_message = "" + if deduplicated_num > 0: + # Duplicated description detected across multiple trucks for the same entity + dd_message = f"dd {deduplicated_num}" + + if dd_message or truncation_info_log: + status_message += ( + f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})" ) - llm_was_used = False - status_message = f"Skip merge for `{entity_name}`: IGNORE_NEW limit reached" - logger.debug(status_message) + + # Add message to pipeline satus when merge happens + if already_fragment > 0 or llm_was_used: + logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: pipeline_status["latest_message"] = status_message pipeline_status["history_messages"].append(status_message) - elif num_fragment > 0: - # Get summary and LLM usage status - description, llm_was_used = await _handle_entity_relation_summary( - "Entity", - entity_name, - description_list, - GRAPH_FIELD_SEP, - global_config, - llm_response_cache, - ) - - # Log based on actual LLM usage - if llm_was_used: - status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" - else: - status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" - - # Add truncation info from apply_source_ids_limit if truncation occurred - if len(source_ids) < len(full_source_ids): - truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}" - - if dd_message or truncation_info: - status_message += f"({','.join([truncation_info, dd_message])})" - - if already_fragment > 0 or llm_was_used: - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - else: - logger.debug(status_message) - else: - logger.error(f"Entity {entity_name} has no description") - description = "(no description)" - - source_id = GRAPH_FIELD_SEP.join(source_ids) - - file_path = build_file_path(already_file_paths, nodes_data, entity_name) + logger.debug(status_message) + # 11. 
Update both graph and vector db node_data = dict( entity_id=entity_name, entity_type=entity_type, @@ -1599,6 +1846,25 @@ async def _merge_nodes_then_upsert( node_data=node_data, ) node_data["entity_name"] = entity_name + if entity_vdb is not None: + entity_vdb_id = compute_mdhash_id(str(entity_name), prefix="ent-") + entity_content = f"{entity_name}\n{description}" + data_for_vdb = { + entity_vdb_id: { + "entity_name": entity_name, + "entity_type": entity_type, + "content": entity_content, + "source_id": source_id, + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=data_for_vdb: entity_vdb.upsert(payload), + operation_name="entity_upsert", + entity_name=entity_name, + max_retries=3, + retry_delay=0.1, + ) return node_data @@ -1607,22 +1873,27 @@ async def _merge_edges_then_upsert( tgt_id: str, edges_data: list[dict], knowledge_graph_inst: BaseGraphStorage, + relationships_vdb: BaseVectorStorage | None, + entity_vdb: BaseVectorStorage | None, global_config: dict, pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, added_entities: list = None, # New parameter to track entities added during edge processing relation_chunks_storage: BaseKVStorage | None = None, + entity_chunks_storage: BaseKVStorage | None = None, ): if src_id == tgt_id: return None + already_edge = None already_weights = [] already_source_ids = [] already_description = [] already_keywords = [] already_file_paths = [] + # 1. Get existing edge data from graph storage if await knowledge_graph_inst.has_edge(src_id, tgt_id): already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id) # Handle the case where get_edge returns None or missing fields @@ -1656,8 +1927,6 @@ async def _merge_edges_then_upsert( ) ) - original_edges_count = len(edges_data) - new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")] storage_key = make_relation_chunk_key(src_id, tgt_id) @@ -1674,6 +1943,7 @@ async def _merge_edges_then_upsert( chunk_id for chunk_id in already_source_ids if chunk_id ] + # 2. Merge new source ids with existing ones full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) if relation_chunks_storage is not None and full_source_ids: @@ -1686,23 +1956,26 @@ async def _merge_edges_then_upsert( } ) + # 3. Finalize source_id by applying source ids limit + limit_method = global_config.get("source_ids_limit_method") + max_source_limit = global_config.get("max_source_ids_per_relation") source_ids = apply_source_ids_limit( full_source_ids, - global_config["max_source_ids_per_relation"], - global_config.get("source_ids_limit_method"), + max_source_limit, + limit_method, identifier=f"`{src_id}`~`{tgt_id}`", ) limit_method = ( global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP ) - # Only apply filtering in IGNORE_NEW mode + # 4. 
Only keep edges with source_id in the final source_ids list if in KEEP mode if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP: allowed_source_ids = set(source_ids) filtered_edges = [] for dp in edges_data: source_id = dp.get("source_id") - # Skip relationship fragments sourced from chunks dropped by the IGNORE_NEW cap + # Skip relationship fragments sourced from chunks dropped by keep oldest cap if ( source_id and source_id not in allowed_source_ids @@ -1711,22 +1984,51 @@ async def _merge_edges_then_upsert( continue filtered_edges.append(dp) edges_data = filtered_edges - else: - # In FIFO mode, keep all edge descriptions - truncation happens at source_ids level only + else: # In FIFO mode, keep all edges - truncation happens at source_ids level only edges_data = list(edges_data) - max_source_limit = global_config["max_source_ids_per_relation"] - skip_summary_due_to_limit = ( + # 5. Check if we need to skip summary due to source_ids limit + if ( limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP and len(existing_full_source_ids) >= max_source_limit and not edges_data - and already_description - ) + ): + if already_edge: + logger.info( + f"Skipped `{src_id}`~`{tgt_id}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}" + ) + existing_edge_data = dict(already_edge) + return existing_edge_data + else: + logger.error( + f"Internal Error: already_node missing for `{src_id}`~`{tgt_id}`" + ) + raise ValueError( + f"Internal Error: already_node missing for `{src_id}`~`{tgt_id}`" + ) - # Process edges_data with None checks + # 6.1 Finalize source_id + source_id = GRAPH_FIELD_SEP.join(source_ids) + + # 6.2 Finalize weight by summing new edges and existing weights weight = sum([dp["weight"] for dp in edges_data] + already_weights) - # Deduplicate by description, keeping first occurrence + # 6.2 Finalize keywords by merging existing and new keywords + all_keywords = set() + # Process already_keywords (which are comma-separated) + for keyword_str in already_keywords: + if keyword_str: # Skip empty strings + all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip()) + # Process new keywords from edges_data + for edge in edges_data: + if edge.get("keywords"): + all_keywords.update( + k.strip() for k in edge["keywords"].split(",") if k.strip() + ) + # Join all unique keywords with commas + keywords = ",".join(sorted(all_keywords)) + + # 7. Deduplicate by description, keeping first occurrence in the same document unique_edges = {} for dp in edges_data: description_value = dp.get("description") @@ -1742,101 +2044,178 @@ async def _merge_edges_then_upsert( ) sorted_descriptions = [dp["description"] for dp in sorted_edges] - truncation_info = "" - dd_message = "" - # Combine already_description with sorted new descriptions description_list = already_description + sorted_descriptions - deduplicated_num = original_edges_count - len(sorted_descriptions) - if deduplicated_num > 0: - dd_message = f"dd:{deduplicated_num}" + if not description_list: + logger.error(f"Relation {src_id}~{tgt_id} has no description") + raise ValueError(f"Relation {src_id}~{tgt_id} has no description") + # Check for cancellation before LLM summary + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during relation summary" + ) + + # 8. 
Get summary description an LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + "Relation", + f"({src_id}, {tgt_id})", + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, + ) + + # 9. Build file_path within MAX_FILE_PATHS limit + file_paths_list = [] + seen_paths = set() + has_placeholder = False # Track if already_file_paths contains placeholder + + max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + + # Collect from already_file_paths, excluding placeholder + for fp in already_file_paths: + # Check if this is a placeholder record + if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders + has_placeholder = True + continue + if fp and fp not in seen_paths: + file_paths_list.append(fp) + seen_paths.add(fp) + + # Collect from new data + for dp in edges_data: + file_path_item = dp.get("file_path") + if file_path_item and file_path_item not in seen_paths: + file_paths_list.append(file_path_item) + seen_paths.add(file_path_item) + + # Apply count limit + max_file_paths = global_config.get("max_file_paths") + + if len(file_paths_list) > max_file_paths: + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP + ) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + + # Add + sign to indicate actual file count is higher + original_count_str = ( + f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list)) + ) + + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + file_paths_list.append(f"...{file_path_placeholder}...(FIFO)") + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)") + + logger.info( + f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})" + ) + # Finalize file_path + file_path = GRAPH_FIELD_SEP.join(file_paths_list) + + # 10. 
Log based on actual LLM usage num_fragment = len(description_list) already_fragment = len(already_description) + if llm_was_used: + status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" + else: + status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" - if skip_summary_due_to_limit: - description = ( - already_edge.get("description", "(no description)") - if already_edge - else "(no description)" + truncation_info = truncation_info_log = "" + if len(source_ids) < len(full_source_ids): + # Add truncation info from apply_source_ids_limit if truncation occurred + truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}" + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + truncation_info = truncation_info_log + else: + truncation_info = "KEEP Old" + + deduplicated_num = already_fragment + len(edges_data) - num_fragment + dd_message = "" + if deduplicated_num > 0: + # Duplicated description detected across multiple trucks for the same entity + dd_message = f"dd {deduplicated_num}" + + if dd_message or truncation_info_log: + status_message += ( + f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})" ) - llm_was_used = False - status_message = ( - f"Skip merge for `{src_id}`~`{tgt_id}`: IGNORE_NEW limit reached" - ) - logger.debug(status_message) + + # Add message to pipeline satus when merge happens + if already_fragment > 0 or llm_was_used: + logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: pipeline_status["latest_message"] = status_message pipeline_status["history_messages"].append(status_message) - elif num_fragment > 0: - # Get summary and LLM usage status - description, llm_was_used = await _handle_entity_relation_summary( - "Relation", - f"({src_id}, {tgt_id})", - description_list, - GRAPH_FIELD_SEP, - global_config, - llm_response_cache, - ) - - # Log based on actual LLM usage - if llm_was_used: - status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" - else: - status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" - - # Add truncation info from apply_source_ids_limit if truncation occurred - if len(source_ids) < len(full_source_ids): - truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}" - - if dd_message or truncation_info: - status_message += f"({','.join([truncation_info, dd_message])})" - - if already_fragment > 0 or llm_was_used: - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - else: - logger.debug(status_message) - else: - logger.error(f"Edge {src_id} - {tgt_id} has no description") - description = "(no description)" - - # Split all existing and new keywords into individual terms, then combine and deduplicate - all_keywords = set() - # Process already_keywords (which are comma-separated) - for keyword_str in already_keywords: - if keyword_str: # Skip empty strings - all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip()) - # Process new keywords from edges_data - for edge in edges_data: - if edge.get("keywords"): - all_keywords.update( - k.strip() for k in edge["keywords"].split(",") if k.strip() - ) - # Join all unique 
keywords with commas - keywords = ",".join(sorted(all_keywords)) - - source_id = GRAPH_FIELD_SEP.join(source_ids) - file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}") + logger.debug(status_message) + # 11. Update both graph and vector db for need_insert_id in [src_id, tgt_id]: - if not (await knowledge_graph_inst.has_node(need_insert_id)): + # Optimization: Use get_node instead of has_node + get_node + existing_node = await knowledge_graph_inst.get_node(need_insert_id) + + if existing_node is None: + # Node doesn't exist - create new node + node_created_at = int(time.time()) node_data = { "entity_id": need_insert_id, "source_id": source_id, "description": description, "entity_type": "UNKNOWN", "file_path": file_path, - "created_at": int(time.time()), + "created_at": node_created_at, "truncate": "", } await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data) + # Update entity_chunks_storage for the newly created entity + if entity_chunks_storage is not None: + chunk_ids = [chunk_id for chunk_id in full_source_ids if chunk_id] + if chunk_ids: + await entity_chunks_storage.upsert( + { + need_insert_id: { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } + } + ) + + if entity_vdb is not None: + entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-") + entity_content = f"{need_insert_id}\n{description}" + vdb_data = { + entity_vdb_id: { + "content": entity_content, + "entity_name": need_insert_id, + "source_id": source_id, + "entity_type": "UNKNOWN", + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: entity_vdb.upsert(payload), + operation_name="added_entity_upsert", + entity_name=need_insert_id, + max_retries=3, + retry_delay=0.1, + ) + # Track entities added during edge processing if added_entities is not None: entity_data = { @@ -1845,10 +2224,114 @@ async def _merge_edges_then_upsert( "description": description, "source_id": source_id, "file_path": file_path, - "created_at": int(time.time()), + "created_at": node_created_at, } added_entities.append(entity_data) + else: + # Node exists - update its source_ids by merging with new source_ids + updated = False # Track if any update occurred + # 1. Get existing full source_ids from entity_chunks_storage + existing_full_source_ids = [] + if entity_chunks_storage is not None: + stored_chunks = await entity_chunks_storage.get_by_id(need_insert_id) + if stored_chunks and isinstance(stored_chunks, dict): + existing_full_source_ids = [ + chunk_id + for chunk_id in stored_chunks.get("chunk_ids", []) + if chunk_id + ] + + # If not in entity_chunks_storage, get from graph database + if not existing_full_source_ids: + if existing_node.get("source_id"): + existing_full_source_ids = existing_node["source_id"].split( + GRAPH_FIELD_SEP + ) + + # 2. Merge with new source_ids from this relationship + new_source_ids_from_relation = [ + chunk_id for chunk_id in source_ids if chunk_id + ] + merged_full_source_ids = merge_source_ids( + existing_full_source_ids, new_source_ids_from_relation + ) + + # 3. Save merged full list to entity_chunks_storage (conditional) + if ( + entity_chunks_storage is not None + and merged_full_source_ids != existing_full_source_ids + ): + updated = True + await entity_chunks_storage.upsert( + { + need_insert_id: { + "chunk_ids": merged_full_source_ids, + "count": len(merged_full_source_ids), + } + } + ) + + # 4. 
Apply source_ids limit for graph and vector db + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP + ) + max_source_limit = global_config.get("max_source_ids_per_entity") + limited_source_ids = apply_source_ids_limit( + merged_full_source_ids, + max_source_limit, + limit_method, + identifier=f"`{need_insert_id}`", + ) + + # 5. Update graph database and vector database with limited source_ids (conditional) + limited_source_id_str = GRAPH_FIELD_SEP.join(limited_source_ids) + + if limited_source_id_str != existing_node.get("source_id", ""): + updated = True + updated_node_data = { + **existing_node, + "source_id": limited_source_id_str, + } + await knowledge_graph_inst.upsert_node( + need_insert_id, node_data=updated_node_data + ) + + # Update vector database + if entity_vdb is not None: + entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-") + entity_content = ( + f"{need_insert_id}\n{existing_node.get('description', '')}" + ) + vdb_data = { + entity_vdb_id: { + "content": entity_content, + "entity_name": need_insert_id, + "source_id": limited_source_id_str, + "entity_type": existing_node.get("entity_type", "UNKNOWN"), + "file_path": existing_node.get( + "file_path", "unknown_source" + ), + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: entity_vdb.upsert(payload), + operation_name="existing_entity_update", + entity_name=need_insert_id, + max_retries=3, + retry_delay=0.1, + ) + + # 6. Log once at the end if any update occurred + if updated: + status_message = f"Chunks appended from relation: `{need_insert_id}`" + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + + edge_created_at = int(time.time()) await knowledge_graph_inst.upsert_edge( src_id, tgt_id, @@ -1858,7 +2341,7 @@ async def _merge_edges_then_upsert( keywords=keywords, source_id=source_id, file_path=file_path, - created_at=int(time.time()), + created_at=edge_created_at, truncate=truncation_info, ), ) @@ -1870,10 +2353,45 @@ async def _merge_edges_then_upsert( keywords=keywords, source_id=source_id, file_path=file_path, - created_at=int(time.time()), + created_at=edge_created_at, truncate=truncation_info, + weight=weight, ) + # Sort src_id and tgt_id to ensure consistent ordering (smaller string first) + if src_id > tgt_id: + src_id, tgt_id = tgt_id, src_id + + if relationships_vdb is not None: + rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-") + rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-") + try: + await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse]) + except Exception as e: + logger.debug( + f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}" + ) + rel_content = f"{keywords}\t{src_id}\n{tgt_id}\n{description}" + vdb_data = { + rel_vdb_id: { + "src_id": src_id, + "tgt_id": tgt_id, + "source_id": source_id, + "content": rel_content, + "keywords": keywords, + "description": description, + "weight": weight, + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: relationships_vdb.upsert(payload), + operation_name="relationship_upsert", + entity_name=f"{src_id}-{tgt_id}", + max_retries=3, + retry_delay=0.2, + ) + return edge_data @@ -1921,6 +2439,12 @@ async def merge_nodes_and_edges( file_path: File 
path for logging """ + # Check for cancellation at the start of merge + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled during merge phase") + # Collect all nodes and edges from all chunks all_nodes = defaultdict(list) all_edges = defaultdict(list) @@ -1957,18 +2481,26 @@ async def merge_nodes_and_edges( async def _locked_process_entity_name(entity_name, entities): async with semaphore: + # Check for cancellation before processing entity + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during entity merge" + ) + workspace = global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" async with get_storage_keyed_lock( [entity_name], namespace=namespace, enable_logging=False ): try: - logger.debug(f"Inserting {entity_name} in Graph") - # Graph database operation (critical path, must succeed) + logger.debug(f"Processing entity {entity_name}") entity_data = await _merge_nodes_then_upsert( entity_name, entities, knowledge_graph_inst, + entity_vdb, global_config, pipeline_status, pipeline_status_lock, @@ -1976,39 +2508,10 @@ async def merge_nodes_and_edges( entity_chunks_storage, ) - # Vector database operation (equally critical, must succeed) - if entity_vdb is not None and entity_data: - data_for_vdb = { - compute_mdhash_id( - str(entity_data["entity_name"]), prefix="ent-" - ): { - "entity_name": entity_data["entity_name"], - "entity_type": entity_data["entity_type"], - "content": f"{entity_data['entity_name']}\n{entity_data['description']}", - "source_id": entity_data["source_id"], - "file_path": entity_data.get( - "file_path", "unknown_source" - ), - } - } - - logger.debug(f"Inserting {entity_name} in Graph") - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda: entity_vdb.upsert(data_for_vdb), - operation_name="entity_upsert", - entity_name=entity_name, - max_retries=3, - retry_delay=0.1, - ) - return entity_data except Exception as e: - # Any database operation failure is critical - error_msg = ( - f"Critical error in entity processing for `{entity_name}`: {e}" - ) + error_msg = f"Error processing entity `{entity_name}`: {e}" logger.error(error_msg) # Try to update pipeline status, but don't let status update failure affect main exception @@ -2044,36 +2547,32 @@ async def merge_nodes_and_edges( entity_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Check if any task raised an exception and ensure all exceptions are retrieved first_exception = None - successful_results = [] + processed_entities = [] for task in done: try: - exception = task.exception() - if exception is not None: - if first_exception is None: - first_exception = exception - else: - successful_results.append(task.result()) - except Exception as e: + result = task.result() + except BaseException as e: if first_exception is None: first_exception = e + else: + processed_entities.append(result) + + if pending: + for task in pending: + task.cancel() + pending_results = await asyncio.gather(*pending, return_exceptions=True) + for result in pending_results: + if isinstance(result, BaseException): + if first_exception is None: + first_exception = result + else: + 
processed_entities.append(result) - # If any task failed, cancel all pending tasks and raise the first exception if first_exception is not None: - # Cancel all pending tasks - for pending_task in pending: - pending_task.cancel() - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) - # Re-raise the first exception to notify the caller raise first_exception - # If all tasks completed successfully, collect results - processed_entities = [task.result() for task in entity_tasks] - # ===== Phase 2: Process all relationships concurrently ===== log_message = f"Phase 2: Processing {total_relations_count} relations from {doc_id} (async: {graph_max_async})" logger.info(log_message) @@ -2083,6 +2582,14 @@ async def merge_nodes_and_edges( async def _locked_process_edges(edge_key, edges): async with semaphore: + # Check for cancellation before processing edges + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during relation merge" + ) + workspace = global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" sorted_edge_key = sorted([edge_key[0], edge_key[1]]) @@ -2095,84 +2602,30 @@ async def merge_nodes_and_edges( try: added_entities = [] # Track entities added during edge processing - # Graph database operation (critical path, must succeed) + logger.debug(f"Processing relation {sorted_edge_key}") edge_data = await _merge_edges_then_upsert( edge_key[0], edge_key[1], edges, knowledge_graph_inst, + relationships_vdb, + entity_vdb, global_config, pipeline_status, pipeline_status_lock, llm_response_cache, added_entities, # Pass list to collect added entities relation_chunks_storage, + entity_chunks_storage, # Add entity_chunks_storage parameter ) if edge_data is None: return None, [] - # Vector database operation (equally critical, must succeed) - if relationships_vdb is not None: - data_for_vdb = { - compute_mdhash_id( - edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-" - ): { - "src_id": edge_data["src_id"], - "tgt_id": edge_data["tgt_id"], - "keywords": edge_data["keywords"], - "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}", - "source_id": edge_data["source_id"], - "file_path": edge_data.get( - "file_path", "unknown_source" - ), - "weight": edge_data.get("weight", 1.0), - } - } - - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda: relationships_vdb.upsert(data_for_vdb), - operation_name="relationship_upsert", - entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}", - max_retries=3, - retry_delay=0.1, - ) - - # Update added_entities to entity vector database using safe operation wrapper - if added_entities and entity_vdb is not None: - for entity_data in added_entities: - entity_vdb_id = compute_mdhash_id( - entity_data["entity_name"], prefix="ent-" - ) - entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}" - - vdb_data = { - entity_vdb_id: { - "content": entity_content, - "entity_name": entity_data["entity_name"], - "source_id": entity_data["source_id"], - "entity_type": entity_data["entity_type"], - "file_path": entity_data.get( - "file_path", "unknown_source" - ), - } - } - - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - 
operation=lambda data=vdb_data: entity_vdb.upsert(data), - operation_name="added_entity_upsert", - entity_name=entity_data["entity_name"], - max_retries=3, - retry_delay=0.1, - ) - return edge_data, added_entities except Exception as e: - # Any database operation failure is critical - error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}" + error_msg = f"Error processing relation `{sorted_edge_key}`: {e}" logger.error(error_msg) # Try to update pipeline status, but don't let status update failure affect main exception @@ -2210,40 +2663,36 @@ async def merge_nodes_and_edges( edge_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Check if any task raised an exception and ensure all exceptions are retrieved first_exception = None - successful_results = [] for task in done: try: - exception = task.exception() - if exception is not None: - if first_exception is None: - first_exception = exception - else: - successful_results.append(task.result()) - except Exception as e: + edge_data, added_entities = task.result() + except BaseException as e: if first_exception is None: first_exception = e + else: + if edge_data is not None: + processed_edges.append(edge_data) + all_added_entities.extend(added_entities) + + if pending: + for task in pending: + task.cancel() + pending_results = await asyncio.gather(*pending, return_exceptions=True) + for result in pending_results: + if isinstance(result, BaseException): + if first_exception is None: + first_exception = result + else: + edge_data, added_entities = result + if edge_data is not None: + processed_edges.append(edge_data) + all_added_entities.extend(added_entities) - # If any task failed, cancel all pending tasks and raise the first exception if first_exception is not None: - # Cancel all pending tasks - for pending_task in pending: - pending_task.cancel() - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) - # Re-raise the first exception to notify the caller raise first_exception - # If all tasks completed successfully, collect results - for task in edge_tasks: - edge_data, added_entities = task.result() - if edge_data is not None: - processed_edges.append(edge_data) - all_added_entities.extend(added_entities) - # ===== Phase 3: Update full_entities and full_relations storage ===== if full_entities_storage and full_relations_storage and doc_id: try: @@ -2324,6 +2773,14 @@ async def extract_entities( llm_response_cache: BaseKVStorage | None = None, text_chunks_storage: BaseKVStorage | None = None, ) -> list: + # Check for cancellation at the start of entity extraction + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during entity extraction" + ) + use_llm_func: callable = global_config["llm_model_func"] entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"] @@ -2491,6 +2948,14 @@ async def extract_entities( async def _process_with_semaphore(chunk): async with semaphore: + # Check for cancellation before processing chunk + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during chunk processing" + ) + try: return await _process_single_content(chunk) except Exception as e: @@ -2988,10 +3453,10 @@ async def _perform_kg_search( ) 
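The same cancellation check is added above at the start of the merge phase, entity merge, relation merge, entity extraction, and per-chunk processing: read `cancellation_requested` under the pipeline status lock and raise `PipelineCancelledException`. A minimal helper capturing that repeated block, assuming `pipeline_status` is a plain dict guarded by an asyncio lock as in the hunks above (`raise_if_cancelled` is an illustrative name, not part of the patch):

```python
from __future__ import annotations

import asyncio

from lightrag.exceptions import PipelineCancelledException


async def raise_if_cancelled(
    pipeline_status: dict | None,
    pipeline_status_lock: asyncio.Lock | None,
    stage: str,
) -> None:
    """Abort the current stage if the shared cancellation flag has been set."""
    if pipeline_status is None or pipeline_status_lock is None:
        return
    async with pipeline_status_lock:
        if pipeline_status.get("cancellation_requested", False):
            raise PipelineCancelledException(f"User cancelled during {stage}")
```

Each call site above could then shrink to a single line such as `await raise_if_cancelled(pipeline_status, pipeline_status_lock, "entity merge")`.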
query_embedding = None if query and (kg_chunk_pick_method == "VECTOR" or chunks_vdb): - embedding_func_config = text_chunks_db.embedding_func - if embedding_func_config and embedding_func_config.func: + actual_embedding_func = text_chunks_db.embedding_func + if actual_embedding_func: try: - query_embedding = await embedding_func_config.func([query]) + query_embedding = await actual_embedding_func([query]) query_embedding = query_embedding[ 0 ] # Extract first embedding from batch result @@ -3395,7 +3860,7 @@ async def _merge_all_chunks( return merged_chunks -async def _build_llm_context( +async def _build_context_str( entities_context: list[dict], relations_context: list[dict], merged_chunks: list[dict], @@ -3495,23 +3960,32 @@ async def _build_llm_context( truncated_chunks ) - # Rebuild text_units_context with truncated chunks + # Rebuild chunks_context with truncated chunks # The actual tokens may be slightly less than available_chunk_tokens due to deduplication logic - text_units_context = [] + chunks_context = [] for i, chunk in enumerate(truncated_chunks): - text_units_context.append( + chunks_context.append( { "reference_id": chunk["reference_id"], "content": chunk["content"], } ) + text_units_str = "\n".join( + json.dumps(text_unit, ensure_ascii=False) for text_unit in chunks_context + ) + reference_list_str = "\n".join( + f"[{ref['reference_id']}] {ref['file_path']}" + for ref in reference_list + if ref["reference_id"] + ) + logger.info( - f"Final context: {len(entities_context)} entities, {len(relations_context)} relations, {len(text_units_context)} chunks" + f"Final context: {len(entities_context)} entities, {len(relations_context)} relations, {len(chunks_context)} chunks" ) # not necessary to use LLM to generate a response - if not entities_context and not relations_context: + if not entities_context and not relations_context and not chunks_context: # Return empty raw data structure when no entities/relations empty_raw_data = convert_to_user_format( [], @@ -3542,15 +4016,6 @@ async def _build_llm_context( if chunk_tracking_log: logger.info(f"Final chunks S+F/O: {' '.join(chunk_tracking_log)}") - text_units_str = "\n".join( - json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context - ) - reference_list_str = "\n".join( - f"[{ref['reference_id']}] {ref['file_path']}" - for ref in reference_list - if ref["reference_id"] - ) - result = kg_context_template.format( entities_str=entities_str, relations_str=relations_str, @@ -3560,7 +4025,7 @@ async def _build_llm_context( # Always return both context and complete data structure (unified approach) logger.debug( - f"[_build_llm_context] Converting to user format: {len(entities_context)} entities, {len(relations_context)} relations, {len(truncated_chunks)} chunks" + f"[_build_context_str] Converting to user format: {len(entities_context)} entities, {len(relations_context)} relations, {len(truncated_chunks)} chunks" ) final_data = convert_to_user_format( entities_context, @@ -3572,7 +4037,7 @@ async def _build_llm_context( relation_id_to_original, ) logger.debug( - f"[_build_llm_context] Final data after conversion: {len(final_data.get('entities', []))} entities, {len(final_data.get('relationships', []))} relationships, {len(final_data.get('chunks', []))} chunks" + f"[_build_context_str] Final data after conversion: {len(final_data.get('entities', []))} entities, {len(final_data.get('relationships', []))} relationships, {len(final_data.get('chunks', []))} chunks" ) return result, final_data @@ -3649,8 +4114,8 @@ async 
def _build_query_context( return None # Stage 4: Build final LLM context with dynamic token processing - # _build_llm_context now always returns tuple[str, dict] - context, raw_data = await _build_llm_context( + # _build_context_str now always returns tuple[str, dict] + context, raw_data = await _build_context_str( entities_context=truncation_result["entities_context"], relations_context=truncation_result["relations_context"], merged_chunks=merged_chunks, @@ -3899,25 +4364,21 @@ async def _find_related_text_unit_from_entities( num_of_chunks = int(max_related_chunks * len(entities_with_chunks) / 2) # Get embedding function from global config - embedding_func_config = text_chunks_db.embedding_func - if not embedding_func_config: + actual_embedding_func = text_chunks_db.embedding_func + if not actual_embedding_func: logger.warning("No embedding function found, falling back to WEIGHT method") kg_chunk_pick_method = "WEIGHT" else: try: - actual_embedding_func = embedding_func_config.func - - selected_chunk_ids = None - if actual_embedding_func: - selected_chunk_ids = await pick_by_vector_similarity( - query=query, - text_chunks_storage=text_chunks_db, - chunks_vdb=chunks_vdb, - num_of_chunks=num_of_chunks, - entity_info=entities_with_chunks, - embedding_func=actual_embedding_func, - query_embedding=query_embedding, - ) + selected_chunk_ids = await pick_by_vector_similarity( + query=query, + text_chunks_storage=text_chunks_db, + chunks_vdb=chunks_vdb, + num_of_chunks=num_of_chunks, + entity_info=entities_with_chunks, + embedding_func=actual_embedding_func, + query_embedding=query_embedding, + ) if selected_chunk_ids == []: kg_chunk_pick_method = "WEIGHT" @@ -4192,24 +4653,21 @@ async def _find_related_text_unit_from_relations( num_of_chunks = int(max_related_chunks * len(relations_with_chunks) / 2) # Get embedding function from global config - embedding_func_config = text_chunks_db.embedding_func - if not embedding_func_config: + actual_embedding_func = text_chunks_db.embedding_func + if not actual_embedding_func: logger.warning("No embedding function found, falling back to WEIGHT method") kg_chunk_pick_method = "WEIGHT" else: try: - actual_embedding_func = embedding_func_config.func - - if actual_embedding_func: - selected_chunk_ids = await pick_by_vector_similarity( - query=query, - text_chunks_storage=text_chunks_db, - chunks_vdb=chunks_vdb, - num_of_chunks=num_of_chunks, - entity_info=relations_with_chunks, - embedding_func=actual_embedding_func, - query_embedding=query_embedding, - ) + selected_chunk_ids = await pick_by_vector_similarity( + query=query, + text_chunks_storage=text_chunks_db, + chunks_vdb=chunks_vdb, + num_of_chunks=num_of_chunks, + entity_info=relations_with_chunks, + embedding_func=actual_embedding_func, + query_embedding=query_embedding, + ) if selected_chunk_ids == []: kg_chunk_pick_method = "WEIGHT" @@ -4423,10 +4881,10 @@ async def naive_query( "final_chunks_count": len(processed_chunks_with_ref_ids), } - # Build text_units_context from processed chunks with reference IDs - text_units_context = [] + # Build chunks_context from processed chunks with reference IDs + chunks_context = [] for i, chunk in enumerate(processed_chunks_with_ref_ids): - text_units_context.append( + chunks_context.append( { "reference_id": chunk["reference_id"], "content": chunk["content"], @@ -4434,7 +4892,7 @@ async def naive_query( ) text_units_str = "\n".join( - json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context + json.dumps(text_unit, ensure_ascii=False) for 
text_unit in chunks_context ) reference_list_str = "\n".join( f"[{ref['reference_id']}] {ref['file_path']}" diff --git a/tests/test_chunking.py b/tests/test_chunking.py index c8fefafb..0650adc2 100644 --- a/tests/test_chunking.py +++ b/tests/test_chunking.py @@ -17,11 +17,6 @@ def make_tokenizer() -> Tokenizer: return Tokenizer(model_name="dummy", tokenizer=DummyTokenizer()) -# ============================================================================ -# Tests for split_by_character_only=True (raises error on oversized chunks) -# ============================================================================ - - @pytest.mark.offline def test_split_by_character_only_within_limit(): """Test chunking when all chunks are within token limit.""" @@ -116,405 +111,3 @@ def test_split_by_character_only_one_over_limit(): err = excinfo.value assert err.chunk_tokens == 11 assert err.chunk_token_limit == 10 - - -# ============================================================================ -# Tests for split_by_character_only=False (recursive splitting) -# ============================================================================ - - -@pytest.mark.offline -def test_split_recursive_oversized_chunk(): - """Test recursive splitting of oversized chunk with split_by_character_only=False.""" - tokenizer = make_tokenizer() - # 30 chars - should split into chunks of size 10 - oversized = "a" * 30 - - chunks = chunking_by_token_size( - tokenizer, - oversized, - split_by_character="\n\n", - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=0, - ) - - # Should create 3 chunks of 10 tokens each - assert len(chunks) == 3 - assert all(chunk["tokens"] == 10 for chunk in chunks) - assert all(chunk["content"] == "a" * 10 for chunk in chunks) - - -@pytest.mark.offline -def test_split_with_chunk_overlap(): - """Test chunk splitting with overlap.""" - tokenizer = make_tokenizer() - # 25 chars - content = "a" * 25 - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character="\n\n", - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=3, - ) - - # With overlap of 3, chunks start at: 0, 7, 14, 21 - # Chunk 1: [0:10] = 10 tokens - # Chunk 2: [7:17] = 10 tokens - # Chunk 3: [14:24] = 10 tokens - # Chunk 4: [21:25] = 4 tokens - assert len(chunks) == 4 - assert chunks[0]["tokens"] == 10 - assert chunks[1]["tokens"] == 10 - assert chunks[2]["tokens"] == 10 - assert chunks[3]["tokens"] == 4 - - -@pytest.mark.offline -def test_split_multiple_chunks_with_mixed_sizes(): - """Test splitting text with multiple chunks of different sizes.""" - tokenizer = make_tokenizer() - # "small\n\nlarge_chunk_here\n\nmedium" - # small: 5 tokens, large_chunk_here: 16 tokens, medium: 6 tokens - content = "small\n\n" + "a" * 16 + "\n\nmedium" - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character="\n\n", - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=2, - ) - - # First chunk "small" should be kept as is (5 tokens) - # Second chunk (16 tokens) should be split into 2 chunks - # Third chunk "medium" should be kept as is (6 tokens) - assert len(chunks) == 4 - assert chunks[0]["content"] == "small" - assert chunks[0]["tokens"] == 5 - - -@pytest.mark.offline -def test_split_exact_boundary(): - """Test splitting at exact chunk boundaries.""" - tokenizer = make_tokenizer() - # Exactly 20 chars, should split into 2 chunks of 10 - content = "a" * 20 - - chunks = chunking_by_token_size( - tokenizer, - content, - 
split_by_character="\n\n", - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=0, - ) - - assert len(chunks) == 2 - assert chunks[0]["tokens"] == 10 - assert chunks[1]["tokens"] == 10 - - -@pytest.mark.offline -def test_split_very_large_text(): - """Test splitting very large text into multiple chunks.""" - tokenizer = make_tokenizer() - # 100 chars should create 10 chunks with chunk_size=10, overlap=0 - content = "a" * 100 - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character="\n\n", - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=0, - ) - - assert len(chunks) == 10 - assert all(chunk["tokens"] == 10 for chunk in chunks) - - -# ============================================================================ -# Edge Cases -# ============================================================================ - - -@pytest.mark.offline -def test_empty_content(): - """Test chunking with empty content.""" - tokenizer = make_tokenizer() - - chunks = chunking_by_token_size( - tokenizer, - "", - split_by_character="\n\n", - split_by_character_only=True, - chunk_token_size=10, - ) - - assert len(chunks) == 1 - assert chunks[0]["content"] == "" - assert chunks[0]["tokens"] == 0 - - -@pytest.mark.offline -def test_single_character(): - """Test chunking with single character.""" - tokenizer = make_tokenizer() - - chunks = chunking_by_token_size( - tokenizer, - "a", - split_by_character="\n\n", - split_by_character_only=True, - chunk_token_size=10, - ) - - assert len(chunks) == 1 - assert chunks[0]["content"] == "a" - assert chunks[0]["tokens"] == 1 - - -@pytest.mark.offline -def test_no_delimiter_in_content(): - """Test chunking when content has no delimiter.""" - tokenizer = make_tokenizer() - content = "a" * 30 - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character="\n\n", # Delimiter not in content - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=0, - ) - - # Should still split based on token size - assert len(chunks) == 3 - assert all(chunk["tokens"] == 10 for chunk in chunks) - - -@pytest.mark.offline -def test_no_split_character(): - """Test chunking without split_by_character (None).""" - tokenizer = make_tokenizer() - content = "a" * 30 - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character=None, - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=0, - ) - - # Should split based purely on token size - assert len(chunks) == 3 - assert all(chunk["tokens"] == 10 for chunk in chunks) - - -# ============================================================================ -# Parameter Combinations -# ============================================================================ - - -@pytest.mark.offline -def test_different_delimiter_newline(): - """Test with single newline delimiter.""" - tokenizer = make_tokenizer() - content = "alpha\nbeta\ngamma" - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character="\n", - split_by_character_only=True, - chunk_token_size=10, - ) - - assert len(chunks) == 3 - assert [c["content"] for c in chunks] == ["alpha", "beta", "gamma"] - - -@pytest.mark.offline -def test_different_delimiter_comma(): - """Test with comma delimiter.""" - tokenizer = make_tokenizer() - content = "one,two,three" - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character=",", - split_by_character_only=True, - chunk_token_size=10, - ) - - assert 
len(chunks) == 3 - assert [c["content"] for c in chunks] == ["one", "two", "three"] - - -@pytest.mark.offline -def test_zero_overlap(): - """Test with zero overlap (no overlap).""" - tokenizer = make_tokenizer() - content = "a" * 20 - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character=None, - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=0, - ) - - # Should create exactly 2 chunks with no overlap - assert len(chunks) == 2 - assert chunks[0]["tokens"] == 10 - assert chunks[1]["tokens"] == 10 - - -@pytest.mark.offline -def test_large_overlap(): - """Test with overlap close to chunk size.""" - tokenizer = make_tokenizer() - content = "a" * 30 - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character=None, - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=9, - ) - - # With overlap=9, chunks start at: 0, 1, 2, 3... - # Step size = chunk_size - overlap = 10 - 9 = 1 - # So we get: [0:10], [1:11], [2:12], ..., [29:30] - # range(0, 30, 1) = 0 to 29, so 30 chunks total - assert len(chunks) == 30 - - -# ============================================================================ -# Chunk Order Index Tests -# ============================================================================ - - -@pytest.mark.offline -def test_chunk_order_index_simple(): - """Test that chunk_order_index is correctly assigned.""" - tokenizer = make_tokenizer() - content = "a\n\nb\n\nc" - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character="\n\n", - split_by_character_only=True, - chunk_token_size=10, - ) - - assert len(chunks) == 3 - assert chunks[0]["chunk_order_index"] == 0 - assert chunks[1]["chunk_order_index"] == 1 - assert chunks[2]["chunk_order_index"] == 2 - - -@pytest.mark.offline -def test_chunk_order_index_with_splitting(): - """Test chunk_order_index with recursive splitting.""" - tokenizer = make_tokenizer() - content = "a" * 30 - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character=None, - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=0, - ) - - assert len(chunks) == 3 - assert chunks[0]["chunk_order_index"] == 0 - assert chunks[1]["chunk_order_index"] == 1 - assert chunks[2]["chunk_order_index"] == 2 - - -# ============================================================================ -# Integration Tests -# ============================================================================ - - -@pytest.mark.offline -def test_mixed_size_chunks_no_error(): - """Test that mixed size chunks work without error in recursive mode.""" - tokenizer = make_tokenizer() - # Mix of small and large chunks - content = "small\n\n" + "a" * 50 + "\n\nmedium" - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character="\n\n", - split_by_character_only=False, - chunk_token_size=10, - chunk_overlap_token_size=2, - ) - - # Should handle all chunks without error - assert len(chunks) > 0 - # Small chunk should remain intact - assert chunks[0]["content"] == "small" - # Large chunk should be split into multiple pieces - assert any(chunk["content"] == "a" * 10 for chunk in chunks) - # Last chunk should contain "medium" - assert any("medium" in chunk["content"] for chunk in chunks) - - -@pytest.mark.offline -def test_whitespace_handling(): - """Test that whitespace is properly handled in chunk content.""" - tokenizer = make_tokenizer() - content = " alpha \n\n beta " - - chunks = chunking_by_token_size( - tokenizer, - 
content, - split_by_character="\n\n", - split_by_character_only=True, - chunk_token_size=20, - ) - - # Content should be stripped - assert chunks[0]["content"] == "alpha" - assert chunks[1]["content"] == "beta" - - -@pytest.mark.offline -def test_consecutive_delimiters(): - """Test handling of consecutive delimiters.""" - tokenizer = make_tokenizer() - content = "alpha\n\n\n\nbeta" - - chunks = chunking_by_token_size( - tokenizer, - content, - split_by_character="\n\n", - split_by_character_only=True, - chunk_token_size=20, - ) - - # Should split on delimiter and include empty chunks - assert len(chunks) >= 2 - assert "alpha" in [c["content"] for c in chunks] - assert "beta" in [c["content"] for c in chunks]
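
Illustrative sketch only, not part of the patch above: a minimal example of the behaviour this change introduces, calling chunking_by_token_size in character-only mode and catching the new ChunkTokenLimitExceededError. The DummyTokenizer below is a hypothetical stand-in assumed to mirror the per-character tokenizer helper used in tests/test_chunking.py, and the lightrag.utils import path for Tokenizer is likewise assumed from those test helpers; adjust both to your actual setup.

    # Sketch: character-only chunking now fails fast on oversized chunks.
    from lightrag.exceptions import ChunkTokenLimitExceededError
    from lightrag.operate import chunking_by_token_size
    from lightrag.utils import Tokenizer  # assumed location, as in the test helpers


    class DummyTokenizer:
        """Minimal stand-in tokenizer: one token per character."""

        def encode(self, text: str) -> list[int]:
            return [ord(ch) for ch in text]

        def decode(self, tokens: list[int]) -> str:
            return "".join(chr(t) for t in tokens)


    tokenizer = Tokenizer(model_name="dummy", tokenizer=DummyTokenizer())

    # "alpha" (5 tokens) fits the limit of 10; the 16-character block does not.
    content = "alpha\n\n" + "a" * 16

    try:
        chunks = chunking_by_token_size(
            tokenizer,
            content,
            split_by_character="\n\n",
            split_by_character_only=True,  # character-only mode: oversized chunks raise
            chunk_token_size=10,
        )
    except ChunkTokenLimitExceededError as err:
        # The exception reports the offending size and the configured limit.
        print(err.chunk_tokens, err.chunk_token_limit)

With split_by_character_only=False the same oversized block is instead split recursively by token size, which is the behaviour the removed recursive-splitting tests above exercised.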