diff --git a/lightrag/operate.py b/lightrag/operate.py
index 1d0821c3..ccf2550d 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -793,6 +793,87 @@ async def _get_cached_extraction_results(
     return sorted_cached_results
 
 
+async def _process_extraction_result(
+    result: str,
+    chunk_key: str,
+    file_path: str = "unknown_source",
+    tuple_delimiter: str = "<|>",
+    record_delimiter: str = "##",
+    completion_delimiter: str = "<|COMPLETE|>",
+) -> tuple[dict, dict]:
+    """Process a single extraction result (either initial or gleaning).
+    Args:
+        result (str): The extraction result to process
+        chunk_key (str): The chunk key for source tracking
+        file_path (str): The file path for citation
+        tuple_delimiter (str): Delimiter between fields within a record
+        record_delimiter (str): Delimiter between records
+        completion_delimiter (str): Marker that ends the extraction output
+    Returns:
+        tuple: (nodes_dict, edges_dict) containing the extracted entities and relationships
+    """
+    maybe_nodes = defaultdict(list)
+    maybe_edges = defaultdict(list)
+
+    # Standardize Chinese brackets around record_delimiter to English brackets
+    bracket_pattern = f"[)）](\\s*{re.escape(record_delimiter)}\\s*)[(（]"
+    result = re.sub(bracket_pattern, ")\\1(", result)
+
+    records = split_string_by_multi_markers(
+        result,
+        [record_delimiter, completion_delimiter],
+    )
+
+    for record in records:
+        # Remove outer brackets (support English and Chinese brackets)
+        record = record.strip()
+        if record.startswith("(") or record.startswith("（"):
+            record = record[1:]
+        if record.endswith(")") or record.endswith("）"):
+            record = record[:-1]
+
+        record = record.strip()
+        if not record:
+            continue
+
+        if tuple_delimiter == "<|>":
+            # fix "entity<|" to "entity<|>"
+            record = re.sub(r"^entity<\|(?!>)", r"entity<|>", record)
+            # fix "relationship<|" to "relationship<|>"
+            record = re.sub(r"^relationship<\|(?!>)", r"relationship<|>", record)
+            # fix "<||>" to "<|>"
+            record = record.replace("<||>", "<|>")
+            # fix "< | >" to "<|>"
+            record = record.replace("< | >", "<|>")
+            # fix "<<|>>" to "<|>"
+            record = record.replace("<<|>>", "<|>")
+            # fix "<|>>" to "<|>"
+            record = record.replace("<|>>", "<|>")
+            # fix "<<|>" to "<|>"
+            record = record.replace("<<|>", "<|>")
+
+        record_attributes = split_string_by_multi_markers(record, [tuple_delimiter])
+
+        # Try to parse as entity
+        entity_data = await _handle_single_entity_extraction(
+            record_attributes, chunk_key, file_path
+        )
+        if entity_data is not None:
+            maybe_nodes[entity_data["entity_name"]].append(entity_data)
+            continue
+
+        # Try to parse as relationship
+        relationship_data = await _handle_single_relationship_extraction(
+            record_attributes, chunk_key, file_path
+        )
+        if relationship_data is not None:
+            maybe_edges[
+                (relationship_data["src_id"], relationship_data["tgt_id"])
+            ].append(relationship_data)
+
+    return dict(maybe_nodes), dict(maybe_edges)
+
+
 async def _parse_extraction_result(
     text_chunks_storage: BaseKVStorage, extraction_result: str, chunk_id: str
 ) -> tuple[dict, dict]:
@@ -814,69 +895,16 @@ async def _parse_extraction_result(
         if chunk_data
         else "unknown_source"
     )
-    context_base = dict(
+
+    # Call the shared processing function
+    return await _process_extraction_result(
+        extraction_result,
+        chunk_id,
+        file_path,
         tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
         record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
         completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
     )
-    maybe_nodes = defaultdict(list)
-    maybe_edges = defaultdict(list)
-
-    # Standardize Chinese brackets around record_delimiter to English brackets
-    record_delimiter = context_base["record_delimiter"]
-    bracket_pattern = f"[)）](\\s*{re.escape(record_delimiter)}\\s*)[(（]"
-    extraction_result = re.sub(bracket_pattern, ")\\1(", extraction_result)
-
-    # Parse the extraction result using the same logic as in extract_entities
-    records = split_string_by_multi_markers(
-        extraction_result,
-        [context_base["record_delimiter"], context_base["completion_delimiter"]],
-    )
-
-    for record in records:
-        # Remove outer brackets
-        record = record.strip()
-        if record.startswith("(") or record.startswith("（"):
-            record = record[1:]
-        if record.endswith(")") or record.endswith("）"):
-            record = record[:-1]
-
-        record = record.strip()
-        if record is None:
-            continue
-
-        if context_base["tuple_delimiter"] == "<|>":
-            # fix entity<| with entity<|>
-            record = re.sub(r"^entity<\|(?!>)", r"entity<|>", record)
-            # fix relationship<| with relationship<|>
-            record = re.sub(r"^relationship<\|(?!>)", r"relationship<|>", record)
-            # fix <||> with <|>
-            record = record.replace("<||>", "<|>")
-            # fix < | > with <|>
-            record = record.replace("< | >", "<|>")
-
-        record_attributes = split_string_by_multi_markers(
-            record, [context_base["tuple_delimiter"]]
-        )
-
-        # Try to parse as entity
-        entity_data = await _handle_single_entity_extraction(
-            record_attributes, chunk_id, file_path
-        )
-        if entity_data is not None:
-            maybe_nodes[entity_data["entity_name"]].append(entity_data)
-            continue
-
-        # Try to parse as relationship
-        relationship_data = await _handle_single_relationship_extraction(
-            record_attributes, chunk_id, file_path
-        )
-        if relationship_data is not None:
-            maybe_edges[
-                (relationship_data["src_id"], relationship_data["tgt_id"])
-            ].append(relationship_data)
-
-    return dict(maybe_nodes), dict(maybe_edges)
 
 
 async def _rebuild_single_entity(
@@ -1738,73 +1766,6 @@ async def extract_entities(
     processed_chunks = 0
     total_chunks = len(ordered_chunks)
 
-    async def _process_extraction_result(
-        result: str, chunk_key: str, file_path: str = "unknown_source"
-    ):
-        """Process a single extraction result (either initial or gleaning)
-        Args:
-            result (str): The extraction result to process
-            chunk_key (str): The chunk key for source tracking
-            file_path (str): The file path for citation
-        Returns:
-            tuple: (nodes_dict, edges_dict) containing the extracted entities and relationships
-        """
-        maybe_nodes = defaultdict(list)
-        maybe_edges = defaultdict(list)
-
-        # Standardize Chinese brackets around record_delimiter to English brackets
-        record_delimiter = context_base["record_delimiter"]
-        bracket_pattern = f"[)）](\\s*{re.escape(record_delimiter)}\\s*)[(（]"
-        result = re.sub(bracket_pattern, ")\\1(", result)
-
-        records = split_string_by_multi_markers(
-            result,
-            [context_base["record_delimiter"], context_base["completion_delimiter"]],
-        )
-
-        for record in records:
-            # Remove outer brackets (support English and Chinese brackets)
-            record = record.strip()
-            if record.startswith("(") or record.startswith("（"):
-                record = record[1:]
-            if record.endswith(")") or record.endswith("）"):
-                record = record[:-1]
-
-            record = record.strip()
-            if record is None:
-                continue
-
-            if context_base["tuple_delimiter"] == "<|>":
-                # fix entity<| with entity<|>
-                record = re.sub(r"^entity<\|(?!>)", r"entity<|>", record)
-                # fix relationship<| with relationship<|>
-                record = re.sub(r"^relationship<\|(?!>)", r"relationship<|>", record)
-                # fix <||> with <|>
-                record = record.replace("<||>", "<|>")
-                # fix < | > with <|>
-                record = record.replace("< | >", "<|>")
-
-            record_attributes = split_string_by_multi_markers(
-                record, [context_base["tuple_delimiter"]]
-            )
-
-            if_entities = await _handle_single_entity_extraction(
-                record_attributes, chunk_key, file_path
-            )
-            if if_entities is not None:
-                maybe_nodes[if_entities["entity_name"]].append(if_entities)
-                continue
-
-            if_relation = await _handle_single_relationship_extraction(
-                record_attributes, chunk_key, file_path
-            )
-            if if_relation is not None:
-                maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(
-                    if_relation
-                )
-
-        return maybe_nodes, maybe_edges
-
     async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
         """Process a single chunk
         Args:
@@ -1842,7 +1803,12 @@ async def extract_entities(
 
         # Process initial extraction with file path
         maybe_nodes, maybe_edges = await _process_extraction_result(
-            final_result, chunk_key, file_path
+            final_result,
+            chunk_key,
+            file_path,
+            tuple_delimiter=context_base["tuple_delimiter"],
+            record_delimiter=context_base["record_delimiter"],
+            completion_delimiter=context_base["completion_delimiter"],
         )
 
         # Process additional gleaning results
@@ -1861,7 +1827,12 @@ async def extract_entities(
 
             # Process gleaning result separately with file path
             glean_nodes, glean_edges = await _process_extraction_result(
-                glean_result, chunk_key, file_path
+                glean_result,
+                chunk_key,
+                file_path,
+                tuple_delimiter=context_base["tuple_delimiter"],
+                record_delimiter=context_base["record_delimiter"],
+                completion_delimiter=context_base["completion_delimiter"],
             )
 
             # Merge results - only add entities and edges with new names
@@ -1869,11 +1840,13 @@ async def extract_entities(
                 if (
                     entity_name not in maybe_nodes
                 ):  # Only accept entities with new names in the gleaning stage
+                    maybe_nodes[entity_name] = []  # Explicitly create the list (plain dict, not defaultdict)
                     maybe_nodes[entity_name].extend(entities)
             for edge_key, edges in glean_edges.items():
                 if (
                     edge_key not in maybe_edges
                 ):  # Only accept edges with new names in the gleaning stage
+                    maybe_edges[edge_key] = []  # Explicitly create the list (plain dict, not defaultdict)
                     maybe_edges[edge_key].extend(edges)
 
             if now_glean_index == entity_extract_max_gleaning - 1: