diff --git a/README-zh.md b/README-zh.md
index d3403b35..ea2ad24b 100644
--- a/README-zh.md
+++ b/README-zh.md
@@ -275,7 +275,7 @@ if __name__ == "__main__":
 | **vector_db_storage_cls_kwargs** | `dict` | 向量数据库的附加参数,如设置节点和关系检索的阈值 | cosine_better_than_threshold: 0.2(默认值由环境变量COSINE_THRESHOLD更改) |
 | **enable_llm_cache** | `bool` | 如果为`TRUE`,将LLM结果存储在缓存中;重复的提示返回缓存的响应 | `TRUE` |
 | **enable_llm_cache_for_entity_extract** | `bool` | 如果为`TRUE`,将实体提取的LLM结果存储在缓存中;适合初学者调试应用程序 | `TRUE` |
-| **addon_params** | `dict` | 附加参数,例如`{"example_number": 1, "language": "Simplified Chinese", "entity_types": ["organization", "person", "geo", "event"]}`:设置示例限制、输出语言和文档处理的批量大小 | `example_number: 所有示例, language: English` |
+| **addon_params** | `dict` | 附加参数,例如`{"language": "Simplified Chinese", "entity_types": ["organization", "person", "location", "event"]}`:设置实体/关系提取的输出语言和实体类型 | `language: English` |
 | **embedding_cache_config** | `dict` | 问答缓存的配置。包含三个参数:`enabled`:布尔值,启用/禁用缓存查找功能。启用时,系统将在生成新答案之前检查缓存的响应。`similarity_threshold`:浮点值(0-1),相似度阈值。当新问题与缓存问题的相似度超过此阈值时,将直接返回缓存的答案而不调用LLM。`use_llm_check`:布尔值,启用/禁用LLM相似度验证。启用时,在返回缓存答案之前,将使用LLM作为二次检查来验证问题之间的相似度。 | 默认:`{"enabled": False, "similarity_threshold": 0.95, "use_llm_check": False}` |
diff --git a/README.md b/README.md
index 5ad37f01..e5a1625f 100644
--- a/README.md
+++ b/README.md
@@ -282,7 +282,7 @@ A full list of LightRAG init parameters:
 | **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for vector database, like setting the threshold for nodes and relations retrieval | cosine_better_than_threshold: 0.2(default value changed by env var COSINE_THRESHOLD) |
 | **enable_llm_cache** | `bool` | If `TRUE`, stores LLM results in cache; repeated prompts return cached responses | `TRUE` |
 | **enable_llm_cache_for_entity_extract** | `bool` | If `TRUE`, stores LLM results in cache for entity extraction; Good for beginners to debug your application | `TRUE` |
-| **addon_params** | `dict` | Additional parameters, e.g., `{"example_number": 1, "language": "Simplified Chinese", "entity_types": ["organization", "person", "geo", "event"]}`: sets example limit, entiy/relation extraction output language | `example_number: all examples, language: English` |
+| **addon_params** | `dict` | Additional parameters, e.g., `{"language": "Simplified Chinese", "entity_types": ["organization", "person", "location", "event"]}`: sets the entity/relation extraction output language and entity types | `language: English` |
 | **embedding_cache_config** | `dict` | Configuration for question-answer caching. Contains three parameters: `enabled`: Boolean value to enable/disable cache lookup functionality. When enabled, the system will check cached responses before generating new answers. `similarity_threshold`: Float value (0-1), similarity threshold. When a new question's similarity with a cached question exceeds this threshold, the cached answer will be returned directly without calling the LLM. `use_llm_check`: Boolean value to enable/disable LLM similarity verification. When enabled, LLM will be used as a secondary check to verify the similarity between questions before returning cached answers.
| Default: `{"enabled": False, "similarity_threshold": 0.95, "use_llm_check": False}` |
diff --git a/env.example b/env.example
index 6a18a68c..4ab64aba 100644
--- a/env.example
+++ b/env.example
@@ -125,7 +125,7 @@ ENABLE_LLM_CACHE_FOR_EXTRACT=true
 SUMMARY_LANGUAGE=English
 
 ### Entity types that the LLM will attempt to recognize
-# ENTITY_TYPES=["person", "organization", "location", "event", "concept"]
+# ENTITY_TYPES='["Organization", "Person", "Equipment", "Product", "Technology", "Location", "Event", "Category"]'
 
 ### Chunk size for document splitting, 500~1500 is recommended
 # CHUNK_SIZE=1200
diff --git a/lightrag/api/config.py b/lightrag/api/config.py
index eae2f45b..f17d50f0 100644
--- a/lightrag/api/config.py
+++ b/lightrag/api/config.py
@@ -352,7 +352,7 @@ def parse_args() -> argparse.Namespace:
     # Add environment variables that were previously read directly
     args.cors_origins = get_env_value("CORS_ORIGINS", "*")
     args.summary_language = get_env_value("SUMMARY_LANGUAGE", DEFAULT_SUMMARY_LANGUAGE)
-    args.entity_types = get_env_value("ENTITY_TYPES", DEFAULT_ENTITY_TYPES)
+    args.entity_types = get_env_value("ENTITY_TYPES", DEFAULT_ENTITY_TYPES, list)
     args.whitelist_paths = get_env_value("WHITELIST_PATHS", "/health,/api/*")
 
     # For JWT Auth
diff --git a/lightrag/constants.py b/lightrag/constants.py
index d78d869c..9accdc52 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -24,11 +24,14 @@ DEFAULT_SUMMARY_LENGTH_RECOMMENDED = 600
 DEFAULT_SUMMARY_CONTEXT_SIZE = 12000
 # Default entities to extract if ENTITY_TYPES is not specified in .env
 DEFAULT_ENTITY_TYPES = [
-    "organization",
-    "person",
-    "geo",
-    "event",
-    "category",
+    "Organization",
+    "Person",
+    "Equipment",
+    "Product",
+    "Technology",
+    "Location",
+    "Event",
+    "Category",
 ]
 
 # Separator for graph fields
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 23e6f575..43614a93 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -469,6 +469,7 @@ class LightRAG:
         self.embedding_func = priority_limit_async_func_call(
             self.embedding_func_max_async,
             llm_timeout=self.default_embedding_timeout,
+            queue_name="Embedding func",
         )(self.embedding_func)
 
         # Initialize all storages
@@ -565,6 +566,7 @@ class LightRAG:
         self.llm_model_func = priority_limit_async_func_call(
             self.llm_model_max_async,
             llm_timeout=self.default_llm_timeout,
+            queue_name="LLM func",
         )(
             partial(
                 self.llm_model_func,  # type: ignore
diff --git a/lightrag/operate.py b/lightrag/operate.py
index afa8205f..1a4b9266 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -11,11 +11,10 @@ from collections import Counter, defaultdict
 
 from .utils import (
     logger,
-    clean_str,
     compute_mdhash_id,
     Tokenizer,
     is_float_regex,
-    normalize_extracted_info,
+    sanitize_and_normalize_extracted_text,
     pack_user_ass_to_openai_messages,
     split_string_by_multi_markers,
     truncate_list_by_token_size,
@@ -31,7 +30,6 @@ from .utils import (
     pick_by_vector_similarity,
     process_chunks_unified,
     build_file_path,
-    sanitize_text_for_encoding,
 )
 from .base import (
     BaseGraphStorage,
@@ -316,18 +314,18 @@ async def _handle_single_entity_extraction(
     chunk_key: str,
     file_path: str = "unknown_source",
 ):
-    if len(record_attributes) < 4 or '"entity"' not in record_attributes[0]:
+    if len(record_attributes) < 4 or "entity" not in record_attributes[0]:
+        if len(record_attributes) > 1 and "entity" in record_attributes[0]:
+            logger.warning(
+                f"Entity extraction failed in {chunk_key}: expecting 4 fields but got {len(record_attributes)}"
+            )
+            logger.warning(f"Entity extracted: 
{record_attributes[1]}") return None try: - # Step 1: Strict UTF-8 encoding sanitization (fail-fast approach) - entity_name = sanitize_text_for_encoding(record_attributes[1]) - - # Step 2: HTML and control character cleaning - entity_name = clean_str(entity_name).strip() - - # Step 3: Business logic normalization - entity_name = normalize_extracted_info(entity_name, is_entity=True) + entity_name = sanitize_and_normalize_extracted_text( + record_attributes[1], remove_inner_quotes=True + ) # Validate entity name after all cleaning steps if not entity_name or not entity_name.strip(): @@ -337,18 +335,20 @@ async def _handle_single_entity_extraction( return None # Process entity type with same cleaning pipeline - entity_type = sanitize_text_for_encoding(record_attributes[2]) - entity_type = clean_str(entity_type).strip('"') - if not entity_type.strip() or entity_type.startswith('("'): + entity_type = sanitize_and_normalize_extracted_text( + record_attributes[2], remove_inner_quotes=True + ) + + if not entity_type.strip() or any( + char in entity_type for char in ["'", "(", ")", "<", ">", "|", "/", "\\"] + ): logger.warning( f"Entity extraction error: invalid entity type in: {record_attributes}" ) return None # Process entity description with same cleaning pipeline - entity_description = sanitize_text_for_encoding(record_attributes[3]) - entity_description = clean_str(entity_description) - entity_description = normalize_extracted_info(entity_description) + entity_description = sanitize_and_normalize_extracted_text(record_attributes[3]) if not entity_description.strip(): logger.warning( @@ -381,31 +381,30 @@ async def _handle_single_relationship_extraction( chunk_key: str, file_path: str = "unknown_source", ): - if len(record_attributes) < 5 or '"relationship"' not in record_attributes[0]: + if len(record_attributes) < 5 or "relationship" not in record_attributes[0]: + if len(record_attributes) > 1 and "relationship" in record_attributes[0]: + logger.warning( + f"Relation extraction failed in {chunk_key}: expecting 5 fields but got {len(record_attributes)}" + ) + logger.warning(f"Relation extracted: {record_attributes[1]}") return None try: - # Process source and target entities with strict cleaning pipeline - # Step 1: Strict UTF-8 encoding sanitization (fail-fast approach) - source = sanitize_text_for_encoding(record_attributes[1]) - # Step 2: HTML and control character cleaning - source = clean_str(source) - # Step 3: Business logic normalization - source = normalize_extracted_info(source, is_entity=True) - - # Same pipeline for target entity - target = sanitize_text_for_encoding(record_attributes[2]) - target = clean_str(target) - target = normalize_extracted_info(target, is_entity=True) + source = sanitize_and_normalize_extracted_text( + record_attributes[1], remove_inner_quotes=True + ) + target = sanitize_and_normalize_extracted_text( + record_attributes[2], remove_inner_quotes=True + ) # Validate entity names after all cleaning steps - if not source or not source.strip(): + if not source: logger.warning( f"Relationship extraction error: source entity became empty after cleaning. Original: '{record_attributes[1]}'" ) return None - if not target or not target.strip(): + if not target: logger.warning( f"Relationship extraction error: target entity became empty after cleaning. 
Original: '{record_attributes[2]}'"
            )
@@ -417,17 +416,15 @@
        )
        return None
 
-    # Process relationship description with same cleaning pipeline
-    edge_description = sanitize_text_for_encoding(record_attributes[3])
-    edge_description = clean_str(edge_description)
-    edge_description = normalize_extracted_info(edge_description)
-
     # Process keywords with same cleaning pipeline
-    edge_keywords = sanitize_text_for_encoding(record_attributes[4])
-    edge_keywords = clean_str(edge_keywords)
-    edge_keywords = normalize_extracted_info(edge_keywords, is_entity=True)
+    edge_keywords = sanitize_and_normalize_extracted_text(
+        record_attributes[3], remove_inner_quotes=True
+    )
     edge_keywords = edge_keywords.replace(",", ",")
 
+    # Process relationship description with same cleaning pipeline
+    edge_description = sanitize_and_normalize_extracted_text(record_attributes[4])
+
     edge_source_id = chunk_key
     weight = (
         float(record_attributes[-1].strip('"').strip("'"))
         if is_float_regex(record_attributes[-1])
         else
@@ -446,12 +443,12 @@
     )
 
     except ValueError as e:
-        logger.error(
+        logger.warning(
             f"Relationship extraction failed due to encoding issues in chunk {chunk_key}: {e}"
         )
         return None
     except Exception as e:
-        logger.error(
+        logger.warning(
             f"Relationship extraction failed with unexpected error in chunk {chunk_key}: {e}"
         )
         return None
@@ -1689,13 +1686,8 @@ async def extract_entities(
     entity_types = global_config["addon_params"].get(
         "entity_types", DEFAULT_ENTITY_TYPES
     )
-    example_number = global_config["addon_params"].get("example_number", None)
-    if example_number and example_number < len(PROMPTS["entity_extraction_examples"]):
-        examples = "\n".join(
-            PROMPTS["entity_extraction_examples"][: int(example_number)]
-        )
-    else:
-        examples = "\n".join(PROMPTS["entity_extraction_examples"])
+
+    examples = "\n".join(PROMPTS["entity_extraction_examples"])
 
     example_context_base = dict(
         tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
@@ -2140,13 +2132,8 @@ async def extract_keywords_only(
     )
 
     # 2. Build the examples
-    example_number = global_config["addon_params"].get("example_number", None)
-    if example_number and example_number < len(PROMPTS["keywords_extraction_examples"]):
-        examples = "\n".join(
-            PROMPTS["keywords_extraction_examples"][: int(example_number)]
-        )
-    else:
-        examples = "\n".join(PROMPTS["keywords_extraction_examples"])
+    examples = "\n".join(PROMPTS["keywords_extraction_examples"])
+
     language = global_config["addon_params"].get("language", DEFAULT_SUMMARY_LANGUAGE)
 
     # 3. Process conversation history
diff --git a/lightrag/prompt.py b/lightrag/prompt.py
index f8ea6589..51d15bc0 100644
--- a/lightrag/prompt.py
+++ b/lightrag/prompt.py
@@ -15,27 +15,35 @@ Given a text document that is potentially relevant to this activity and a list o
 Use {language} as output language.
 
 ---Steps---
-1. Identify all entities. For each identified entity, extract the following information:
+1. Identify clearly defined and meaningful entities in the text. For each identified entity, extract the following information:
 - entity_name: Name of the entity, use same language as input text. If English, capitalized the name
-- entity_type: One of the following types: [{entity_types}]
-- entity_description: Provide a comprehensive description of the entity's attributes and activities *based solely on the information present in the input text*. **Do not infer or hallucinate information not explicitly stated.** If the text provides insufficient information to create a comprehensive description, state "Description not available in text."
-Format each entity as ("entity"{tuple_delimiter}<entity_name>{tuple_delimiter}<entity_type>{tuple_delimiter}<entity_description>)
+- entity_type: One of the following types: [{entity_types}]. If the entity doesn't clearly fit any category, classify it as "Other".
+- entity_description: Provide a comprehensive description of the entity's attributes and activities based on the information present in the input text. Do not add external knowledge.
 
-2. From the entities identified in step 1, identify all pairs of (source_entity, target_entity) that are *clearly related* to each other.
+2. Format each entity as:
+("entity"{tuple_delimiter}<entity_name>{tuple_delimiter}<entity_type>{tuple_delimiter}<entity_description>)
+
+3. From the entities identified in step 1, identify all pairs of (source_entity, target_entity) that are directly and clearly related based on the text. Unsubstantiated relationships must be excluded from the output.
 For each pair of related entities, extract the following information:
 - source_entity: name of the source entity, as identified in step 1
 - target_entity: name of the target entity, as identified in step 1
-- relationship_description: explanation as to why you think the source entity and the target entity are related to each other
-- relationship_strength: a numeric score indicating strength of the relationship between the source entity and target entity
 - relationship_keywords: one or more high-level key words that summarize the overarching nature of the relationship, focusing on concepts or themes rather than specific details
-Format each relationship as ("relationship"{tuple_delimiter}<source_entity>{tuple_delimiter}<target_entity>{tuple_delimiter}<relationship_description>{tuple_delimiter}<relationship_keywords>{tuple_delimiter}<relationship_strength>)
+- relationship_description: Explain the nature of the relationship between the source and target entities, providing a clear rationale for their connection
 
-3. Identify high-level key words that summarize the main concepts, themes, or topics of the entire text. These should capture the overarching ideas present in the document.
-Format the content-level key words as ("content_keywords"{tuple_delimiter}<high_level_keywords>)
+4. Format each relationship as:
+("relationship"{tuple_delimiter}<source_entity>{tuple_delimiter}<target_entity>{tuple_delimiter}<relationship_keywords>{tuple_delimiter}<relationship_description>)
 
-4. Return output in {language} as a single list of all the entities and relationships identified in steps 1 and 2. Use **{record_delimiter}** as the list delimiter.
+5. Use `{tuple_delimiter}` as the field delimiter. Use `{record_delimiter}` as the list delimiter. Ensure no spaces are added around the delimiters.
 
-5. When finished, output {completion_delimiter}
+6. When finished, output `{completion_delimiter}`
+
+7. Return identified entities and relationships in {language}.
+
+---Quality Guidelines---
+- Only extract entities that are clearly defined and meaningful in the context
+- Avoid over-interpretation; stick to what is explicitly stated in the text
+- Include specific numerical data in the entity name when relevant
+- Ensure entity names are consistent throughout the extraction
 
 ---Examples---
 {examples}
 
@@ -43,15 +51,18 @@ Format the content-level key words as ("content_keywords"{tuple_delimiter}{tuple_delimiter}{tuple_delimiter})
+- entity_type: One of the following types: [{entity_types}]. If the entity doesn't clearly fit any category, classify it as "Other".
+- entity_description: Provide a comprehensive description of the entity's attributes and activities based on the information present in the input text. Do not add external knowledge.
 
-2. From the entities identified in step 1, identify all pairs of (source_entity, target_entity) that are *clearly related* to each other.
+2. Format each entity as:
+("entity"{tuple_delimiter}<entity_name>{tuple_delimiter}<entity_type>{tuple_delimiter}<entity_description>)
+
+3. From the entities identified in step 1, identify all pairs of (source_entity, target_entity) that are directly and clearly related based on the text. Unsubstantiated relationships must be excluded from the output.
 For each pair of related entities, extract the following information:
 - source_entity: name of the source entity, as identified in step 1
 - target_entity: name of the target entity, as identified in step 1
-- relationship_description: explanation as to why you think the source entity and the target entity are related to each other
-- relationship_strength: a numeric score indicating strength of the relationship between the source entity and target entity
 - relationship_keywords: one or more high-level key words that summarize the overarching nature of the relationship, focusing on concepts or themes rather than specific details
-Format each relationship as ("relationship"{tuple_delimiter}<source_entity>{tuple_delimiter}<target_entity>{tuple_delimiter}<relationship_description>{tuple_delimiter}<relationship_keywords>{tuple_delimiter}<relationship_strength>)
+- relationship_description: Explain the nature of the relationship between the source and target entities, providing a clear rationale for their connection
 
-3. Identify high-level key words that summarize the main concepts, themes, or topics of the entire text. These should capture the overarching ideas present in the document.
-Format the content-level key words as ("content_keywords"{tuple_delimiter}<high_level_keywords>)
+4. Format each relationship as:
+("relationship"{tuple_delimiter}<source_entity>{tuple_delimiter}<target_entity>{tuple_delimiter}<relationship_keywords>{tuple_delimiter}<relationship_description>)
 
-4. Return output in {language} as a single list of all the entities and relationships identified in steps 1 and 2. Use **{record_delimiter}** as the list delimiter.
+5. Use `{tuple_delimiter}` as the field delimiter. Use `{record_delimiter}` as the list delimiter. Ensure no spaces are added around the delimiters.
 
-5. When finished, output {completion_delimiter}
+6. When finished, output `{completion_delimiter}`
+
+7. Return identified entities and relationships in {language}.
 
 ---Output---
-
-Add new entities and relations below using the same format, and do not include entities and relations that have been previously extracted.
:\n -""".strip() +Output: +""" PROMPTS["entity_if_loop_extraction"] = """ ---Goal---' diff --git a/lightrag/utils.py b/lightrag/utils.py index 87ce5b6a..7c92d99c 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -82,6 +82,27 @@ def get_env_value( if value_type is bool: return value.lower() in ("true", "1", "yes", "t", "on") + + # Handle list type with JSON parsing + if value_type is list: + try: + import json + + parsed_value = json.loads(value) + # Ensure the parsed value is actually a list + if isinstance(parsed_value, list): + return parsed_value + else: + logger.warning( + f"Environment variable {env_key} is not a valid JSON list, using default" + ) + return default + except (json.JSONDecodeError, ValueError) as e: + logger.warning( + f"Failed to parse {env_key} as JSON list: {e}, using default" + ) + return default + try: return value_type(value) except (ValueError, TypeError): @@ -374,6 +395,7 @@ def priority_limit_async_func_call( max_task_duration: float = None, max_queue_size: int = 1000, cleanup_timeout: float = 2.0, + queue_name: str = "limit_async", ): """ Enhanced priority-limited asynchronous function call decorator with robust timeout handling @@ -391,6 +413,7 @@ def priority_limit_async_func_call( max_execution_timeout: Maximum time for worker to execute function (defaults to llm_timeout + 30s) max_task_duration: Maximum time before health check intervenes (defaults to llm_timeout + 60s) cleanup_timeout: Maximum time to wait for cleanup operations (defaults to 2.0s) + queue_name: Optional queue name for logging identification (defaults to "limit_async") Returns: Decorator function @@ -482,7 +505,7 @@ def priority_limit_async_func_call( except asyncio.TimeoutError: # Worker-level timeout (max_execution_timeout exceeded) logger.warning( - f"limit_async: Worker timeout for task {task_id} after {max_execution_timeout}s" + f"{queue_name}: Worker timeout for task {task_id} after {max_execution_timeout}s" ) if not task_state.future.done(): task_state.future.set_exception( @@ -495,12 +518,12 @@ def priority_limit_async_func_call( if not task_state.future.done(): task_state.future.cancel() logger.debug( - f"limit_async: Task {task_id} cancelled during execution" + f"{queue_name}: Task {task_id} cancelled during execution" ) except Exception as e: # Function execution error logger.error( - f"limit_async: Error in decorated function for task {task_id}: {str(e)}" + f"{queue_name}: Error in decorated function for task {task_id}: {str(e)}" ) if not task_state.future.done(): task_state.future.set_exception(e) @@ -512,10 +535,12 @@ def priority_limit_async_func_call( except Exception as e: # Critical error in worker loop - logger.error(f"limit_async: Critical error in worker: {str(e)}") + logger.error( + f"{queue_name}: Critical error in worker: {str(e)}" + ) await asyncio.sleep(0.1) finally: - logger.debug("limit_async: Worker exiting") + logger.debug(f"{queue_name}: Worker exiting") async def enhanced_health_check(): """Enhanced health check with stuck task detection and recovery""" @@ -549,7 +574,7 @@ def priority_limit_async_func_call( # Force cleanup of stuck tasks for task_id, execution_duration in stuck_tasks: logger.warning( - f"limit_async: Detected stuck task {task_id} (execution time: {execution_duration:.1f}s), forcing cleanup" + f"{queue_name}: Detected stuck task {task_id} (execution time: {execution_duration:.1f}s), forcing cleanup" ) async with task_states_lock: if task_id in task_states: @@ -572,7 +597,7 @@ def priority_limit_async_func_call( if workers_needed > 0: 
logger.info( - f"limit_async: Creating {workers_needed} new workers" + f"{queue_name}: Creating {workers_needed} new workers" ) new_tasks = set() for _ in range(workers_needed): @@ -582,9 +607,9 @@ def priority_limit_async_func_call( tasks.update(new_tasks) except Exception as e: - logger.error(f"limit_async: Error in enhanced health check: {str(e)}") + logger.error(f"{queue_name}: Error in enhanced health check: {str(e)}") finally: - logger.debug("limit_async: Enhanced health check task exiting") + logger.debug(f"{queue_name}: Enhanced health check task exiting") initialized = False async def ensure_workers(): @@ -601,7 +626,7 @@ def priority_limit_async_func_call( if reinit_count > 0: reinit_count += 1 logger.warning( - f"limit_async: Reinitializing system (count: {reinit_count})" + f"{queue_name}: Reinitializing system (count: {reinit_count})" ) else: reinit_count = 1 @@ -614,7 +639,7 @@ def priority_limit_async_func_call( active_tasks_count = len(tasks) if active_tasks_count > 0 and reinit_count > 1: logger.warning( - f"limit_async: {active_tasks_count} tasks still running during reinitialization" + f"{queue_name}: {active_tasks_count} tasks still running during reinitialization" ) # Create worker tasks @@ -641,12 +666,12 @@ def priority_limit_async_func_call( f" (Timeouts: {', '.join(timeout_info)})" if timeout_info else "" ) logger.info( - f"limit_async: {workers_needed} new workers initialized {timeout_str}" + f"{queue_name}: {workers_needed} new workers initialized {timeout_str}" ) async def shutdown(): """Gracefully shut down all workers and cleanup resources""" - logger.info("limit_async: Shutting down priority queue workers") + logger.info(f"{queue_name}: Shutting down priority queue workers") shutdown_event.set() @@ -667,7 +692,7 @@ def priority_limit_async_func_call( await asyncio.wait_for(queue.join(), timeout=5.0) except asyncio.TimeoutError: logger.warning( - "limit_async: Timeout waiting for queue to empty during shutdown" + f"{queue_name}: Timeout waiting for queue to empty during shutdown" ) # Cancel worker tasks @@ -687,7 +712,7 @@ def priority_limit_async_func_call( except asyncio.CancelledError: pass - logger.info("limit_async: Priority queue workers shutdown complete") + logger.info(f"{queue_name}: Priority queue workers shutdown complete") @wraps(func) async def wait_func( @@ -750,7 +775,7 @@ def priority_limit_async_func_call( ) except asyncio.TimeoutError: raise QueueFullError( - f"Queue full, timeout after {_queue_timeout} seconds" + f"{queue_name}: Queue full, timeout after {_queue_timeout} seconds" ) except Exception as e: # Clean up on queue error @@ -785,14 +810,14 @@ def priority_limit_async_func_call( await asyncio.sleep(0.1) raise TimeoutError( - f"limit_async: User timeout after {_timeout} seconds" + f"{queue_name}: User timeout after {_timeout} seconds" ) except WorkerTimeoutError as e: # This is Worker-level timeout, directly propagate exception information - raise TimeoutError(f"limit_async: {str(e)}") + raise TimeoutError(f"{queue_name}: {str(e)}") except HealthCheckTimeoutError as e: # This is Health Check-level timeout, directly propagate exception information - raise TimeoutError(f"limit_async: {str(e)}") + raise TimeoutError(f"{queue_name}: {str(e)}") finally: # Ensure cleanup @@ -931,19 +956,6 @@ def split_string_by_multi_markers(content: str, markers: list[str]) -> list[str] return [r.strip() for r in results if r.strip()] -# Refer the utils functions of the official GraphRAG implementation: -# https://github.com/microsoft/graphrag -def 
clean_str(input: Any) -> str:
-    """Clean an input string by removing HTML escapes, control characters, and other unwanted characters."""
-    # If we get non-string input, just give it back
-    if not isinstance(input, str):
-        return input
-
-    result = html.unescape(input.strip())
-    # https://stackoverflow.com/questions/4324790/removing-control-characters-from-a-string-in-python
-    return re.sub(r"[\x00-\x1f\x7f-\x9f]", "", result)
-
-
 def is_float_regex(value: str) -> bool:
     return bool(re.match(r"^[-+]?[0-9]*\.?[0-9]+$", value))
 
@@ -1728,29 +1740,78 @@ def get_content_summary(content: str, max_length: int = 250) -> str:
     return content[:max_length] + "..."
 
-def normalize_extracted_info(name: str, is_entity=False) -> str:
+def sanitize_and_normalize_extracted_text(
+    input_text: str, remove_inner_quotes=False
+) -> str:
+    """Sanitize and normalize extracted text
+    Args:
+        input_text: text string to be processed
+        remove_inner_quotes: whether to strip quote characters, as used for entity and relation names
+
+    Returns:
+        Sanitized and normalized text string
+    """
+    safe_input_text = sanitize_text_for_encoding(input_text)
+    if safe_input_text:
+        normalized_text = normalize_extracted_info(
+            safe_input_text, remove_inner_quotes=remove_inner_quotes
+        )
+        return normalized_text
+    return ""
+
+
+def normalize_extracted_info(name: str, remove_inner_quotes=False) -> str:
     """Normalize entity/relation names and description with the following rules:
-    1. Remove spaces between Chinese characters
-    2. Remove spaces between Chinese characters and English letters/numbers
-    3. Preserve spaces within English text and numbers
-    4. Replace Chinese parentheses with English parentheses
-    5. Replace Chinese dash with English dash
-    6. Remove English quotation marks from the beginning and end of the text
-    7. Remove English quotation marks in and around chinese
-    8. Remove Chinese quotation marks
+    1. Clean HTML tags (paragraph and line break tags)
+    2. Convert Chinese symbols to English symbols
+    3. Remove spaces between Chinese characters
+    4. Remove spaces between Chinese characters and English letters/numbers
+    5. Preserve spaces within English text and numbers
+    6. Replace Chinese parentheses with English parentheses
+    7. Replace Chinese dash with English dash
+    8. Remove English quotation marks from the beginning and end of the text
+    9. Remove English quotation marks in and around chinese
+    10. Remove Chinese quotation marks
+    11. Filter out short numeric-only text (pure digits shorter than 3 chars, digit/dot strings shorter than 6 chars)
 
     Args:
         name: Entity name to normalize
+        remove_inner_quotes: Whether to strip quote characters inside the text (used for entity and relation names)
 
     Returns:
         Normalized entity name
     """
+    # 1. Clean HTML tags - remove paragraph and line break tags
+    name = re.sub(r"<p>|</p>|<p/>", "", name, flags=re.IGNORECASE)
+    name = re.sub(r"<br>|<br/>|<br />
", "", name, flags=re.IGNORECASE) + + # 2. Convert Chinese symbols to English symbols + # Chinese full-width letters to half-width (A-Z, a-z) + name = name.translate( + str.maketrans( + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz", + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz", + ) + ) + + # Chinese full-width numbers to half-width + name = name.translate(str.maketrans("0123456789", "0123456789")) + + # Chinese full-width symbols to half-width + name = name.replace("-", "-") # Chinese minus + name = name.replace("+", "+") # Chinese plus + name = name.replace("/", "/") # Chinese slash + name = name.replace("*", "*") # Chinese asterisk + # Replace Chinese parentheses with English parentheses name = name.replace("(", "(").replace(")", ")") - # Replace Chinese dash with English dash + # Replace Chinese dash with English dash (additional patterns) name = name.replace("—", "-").replace("-", "-") + # Chinese full-width space to regular space (after other replacements) + name = name.replace(" ", " ") + # Use regex to remove spaces between Chinese characters # Regex explanation: # (?<=[\u4e00-\u9fa5]): Positive lookbehind for Chinese character @@ -1766,19 +1827,57 @@ def normalize_extracted_info(name: str, is_entity=False) -> str: r"(?<=[a-zA-Z0-9\(\)\[\]@#$%!&\*\-=+_])\s+(?=[\u4e00-\u9fa5])", "", name ) - # Remove English quotation marks from the beginning and end - if len(name) >= 2 and name.startswith('"') and name.endswith('"'): - name = name[1:-1] - if len(name) >= 2 and name.startswith("'") and name.endswith("'"): - name = name[1:-1] + # Remove outer quotes + if len(name) >= 2: + # Handle double quotes + if name.startswith('"') and name.endswith('"'): + inner_content = name[1:-1] + if '"' not in inner_content: # No double quotes inside + name = inner_content - if is_entity: + # Handle single quotes + if name.startswith("'") and name.endswith("'"): + inner_content = name[1:-1] + if "'" not in inner_content: # No single quotes inside + name = inner_content + + # Handle Chinese-style double quotes + if name.startswith("“") and name.endswith("”"): + inner_content = name[1:-1] + if "“" not in inner_content and "”" not in inner_content: + name = inner_content + if name.startswith("‘") and name.endswith("’"): + inner_content = name[1:-1] + if "‘" not in inner_content and "’" not in inner_content: + name = inner_content + + if remove_inner_quotes: # remove Chinese quotes name = name.replace("“", "").replace("”", "").replace("‘", "").replace("’", "") # remove English queotes in and around chinese name = re.sub(r"['\"]+(?=[\u4e00-\u9fa5])", "", name) name = re.sub(r"(?<=[\u4e00-\u9fa5])['\"]+", "", name) + # Remove spaces from the beginning and end of the text + name = name.strip() + + # Filter out pure numeric content with length < 3 + if len(name) < 3 and re.match(r"^[0-9]+$", name): + return "" + + def should_filter_by_dots(text): + """ + Check if the string consists only of dots and digits, with at least one dot + Filter cases include: 1.2.3, 12.3, .123, 123., 12.3., .1.23 etc. + """ + return all(c.isdigit() or c == "." for c in text) and "." 
in text
+
+    if len(name) < 6 and should_filter_by_dots(name):
+        # Filter out mixed numeric and dot content with length < 6, requiring at least one dot
+        return ""
+
     return name
 
 
@@ -1789,6 +1888,8 @@ def sanitize_text_for_encoding(text: str, replacement_char: str = "") -> str:
     - Surrogate characters (the main cause of encoding errors)
     - Other invalid Unicode sequences
     - Control characters that might cause issues
+    - Unescape HTML escapes
+    - Remove control characters
     - Whitespace trimming
 
     Args:
@@ -1801,9 +1902,6 @@ def sanitize_text_for_encoding(text: str, replacement_char: str = "") -> str:
     Raises:
         ValueError: When text contains uncleanable encoding issues that cannot be safely processed
     """
-    if not isinstance(text, str):
-        return str(text)
-
     if not text:
         return text
 
@@ -1845,7 +1943,13 @@ def sanitize_text_for_encoding(text: str, replacement_char: str = "") -> str:
         # Test final encoding to ensure it's safe
         sanitized.encode("utf-8")
 
-        return sanitized
+        # Unescape HTML escapes
+        sanitized = html.unescape(sanitized)
+
+        # Remove control characters
+        sanitized = re.sub(r"[\x00-\x1f\x7f-\x9f]", "", sanitized)
+
+        return sanitized.strip()
 
     except UnicodeEncodeError as e:
         # Critical change: Don't return placeholder, raise exception for caller to handle
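
For reviewers who want to sanity-check the new JSON-list handling wired into `get_env_value` and `ENTITY_TYPES`, here is a minimal standalone sketch of the same parsing pattern. The helper name `parse_list_env` is hypothetical and not part of LightRAG; it only mirrors the fallback behavior added in `lightrag/utils.py`.

```python
import json
import os

DEFAULT_ENTITY_TYPES = ["Organization", "Person", "Equipment", "Product",
                        "Technology", "Location", "Event", "Category"]

def parse_list_env(env_key: str, default: list) -> list:
    """Parse an environment variable as a JSON list, falling back to the default."""
    value = os.environ.get(env_key)
    if value is None:
        return default
    try:
        parsed = json.loads(value)
        # Only accept an actual JSON list; anything else falls back to the default
        return parsed if isinstance(parsed, list) else default
    except (json.JSONDecodeError, ValueError):
        return default

os.environ["ENTITY_TYPES"] = '["Organization", "Person", "Location"]'
print(parse_list_env("ENTITY_TYPES", DEFAULT_ENTITY_TYPES))  # ['Organization', 'Person', 'Location']

os.environ["ENTITY_TYPES"] = "Organization, Person"  # not valid JSON -> default list
print(parse_list_env("ENTITY_TYPES", DEFAULT_ENTITY_TYPES))
```

Malformed values fall back to the default rather than raising, which matches the warning-and-fallback path in the patch.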
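The reordered relationship fields are easy to miss in review. The sketch below illustrates the record layout implied by the updated `_handle_single_entity_extraction` and `_handle_single_relationship_extraction` parsing (keywords now come before the description). The sample strings are invented, and the `<|>` delimiter is an assumption based on `PROMPTS["DEFAULT_TUPLE_DELIMITER"]`.

```python
# Illustrative only: field order expected by the updated handlers after this PR.
TUPLE_DELIMITER = "<|>"  # assumption: value of PROMPTS["DEFAULT_TUPLE_DELIMITER"]

entity_record = "entity<|>Tesla<|>Organization<|>An electric vehicle maker mentioned in the chunk"
relationship_record = "relationship<|>Tesla<|>Elon Musk<|>leadership, founding<|>Elon Musk is described as leading Tesla"

e = entity_record.split(TUPLE_DELIMITER)
assert len(e) >= 4 and "entity" in e[0]          # mirrors the new length/type check
entity_name, entity_type, entity_description = e[1], e[2], e[3]

r = relationship_record.split(TUPLE_DELIMITER)
assert len(r) >= 5 and "relationship" in r[0]
# record_attributes[3] is now the keywords field, record_attributes[4] the description
source, target, keywords, description = r[1], r[2], r[3], r[4]

print(entity_name, entity_type)
print(keywords, "->", description)
```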
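Finally, a small self-contained illustration (not the library function) of two of the new `normalize_extracted_info` rules: symmetric outer quotes are stripped only when the same quote character does not also appear inside, and very short digit-only or digit/dot-only strings are dropped.

```python
import re

def demo_normalize(name: str) -> str:
    """Standalone sketch of two rules from the patch; behavior of edge cases may differ."""
    # Strip symmetric outer double quotes only if no double quote remains inside
    if len(name) >= 2 and name[0] == name[-1] == '"' and '"' not in name[1:-1]:
        name = name[1:-1]
    name = name.strip()
    # Drop short pure-numeric strings (length < 3)
    if len(name) < 3 and re.fullmatch(r"[0-9]+", name):
        return ""
    # Drop short digit/dot-only strings that contain at least one dot (length < 6)
    if len(name) < 6 and re.fullmatch(r"[0-9.]+", name) and "." in name:
        return ""
    return name

print(demo_normalize('"Acme Corp"'))  # Acme Corp  (outer quotes removed)
print(demo_normalize('"say "hi""'))   # kept as-is because an inner quote is present
print(demo_normalize("12"))           # ""  (short pure number filtered)
print(demo_normalize("1.2.3"))        # ""  (short digit/dot string filtered)
print(demo_normalize("3.14159265"))   # kept: length >= 6
```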