Refactor extraction result processing to reduce code duplication
• Extract shared processing logic • Add delimiter pattern fixes • Improve bracket standardization
This commit is contained in:
parent
3cdc98f366
commit
3f8a9abe7e
1 changed files with 101 additions and 128 deletions
|
|
@ -793,6 +793,87 @@ async def _get_cached_extraction_results(
|
||||||
return sorted_cached_results
|
return sorted_cached_results
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_extraction_result(
    result: str,
    chunk_key: str,
    file_path: str = "unknown_source",
    tuple_delimiter: str = "<|>",
    record_delimiter: str = "##",
    completion_delimiter: str = "<|COMPLETE|>",
) -> tuple[dict, dict]:
    """Process a single extraction result (either initial or gleaning).

    The raw text is split into records on ``record_delimiter`` and
    ``completion_delimiter``; each record is stripped of one pair of outer
    brackets, repaired for a few known malformed tuple delimiters, split on
    ``tuple_delimiter`` and then offered first to the entity parser and, if
    that declines, to the relationship parser.

    Args:
        result (str): The extraction result to process
        chunk_key (str): The chunk key for source tracking
        file_path (str): The file path for citation
        tuple_delimiter (str): Delimiter for tuple fields
        record_delimiter (str): Delimiter for records
        completion_delimiter (str): Delimiter for completion

    Returns:
        tuple: (nodes_dict, edges_dict) containing the extracted entities and
        relationships. Nodes are keyed by ``entity_name``; edges are keyed by
        the ``(src_id, tgt_id)`` pair. Both are plain ``dict`` objects whose
        values are lists, so callers must create missing keys explicitly.
    """
    maybe_nodes = defaultdict(list)
    maybe_edges = defaultdict(list)

    # Full-width (Chinese) brackets are written as escapes so the intent
    # survives editors/tools that normalise them to their ASCII look-alikes.
    opening_brackets = ("(", "\uff08")
    closing_brackets = (")", "\uff09")

    # Standardize Chinese brackets around record_delimiter to English brackets
    bracket_pattern = (
        f"[)\uff09](\\s*{re.escape(record_delimiter)}\\s*)[(\uff08]"
    )
    result = re.sub(bracket_pattern, ")\\1(", result)

    records = split_string_by_multi_markers(
        result,
        [record_delimiter, completion_delimiter],
    )

    # The repairs below only make sense for the default "<|>" delimiter.
    # Order matters: "<<|>>" must be collapsed before its substrings
    # "<|>>" and "<<|>" are handled.
    fix_tuple_delimiter = tuple_delimiter == "<|>"
    literal_fixes = ("<||>", "< | >", "<<|>>", "<|>>", "<<|>")

    for record in records:
        # Remove outer brackets (support English and Chinese brackets)
        record = record.strip()
        if record.startswith(opening_brackets):
            record = record[1:]
        if record.endswith(closing_brackets):
            record = record[:-1]

        record = record.strip()
        # str.strip() never returns None, so test for emptiness instead;
        # an empty record carries no fields worth handing to the parsers.
        if not record:
            continue

        if fix_tuple_delimiter:
            # fix entity<| with entity<|>
            record = re.sub(r"^entity<\|(?!>)", r"entity<|>", record)
            # fix relationship<| with relationship<|>
            record = re.sub(r"^relationship<\|(?!>)", r"relationship<|>", record)
            # fix <||>, < | >, <<|>>, <|>> and <<|> with <|>
            for malformed in literal_fixes:
                record = record.replace(malformed, "<|>")

        record_attributes = split_string_by_multi_markers(record, [tuple_delimiter])

        # Try to parse as entity
        entity_data = await _handle_single_entity_extraction(
            record_attributes, chunk_key, file_path
        )
        if entity_data is not None:
            maybe_nodes[entity_data["entity_name"]].append(entity_data)
            continue

        # Try to parse as relationship
        relationship_data = await _handle_single_relationship_extraction(
            record_attributes, chunk_key, file_path
        )
        if relationship_data is not None:
            edge_key = (relationship_data["src_id"], relationship_data["tgt_id"])
            maybe_edges[edge_key].append(relationship_data)

    return dict(maybe_nodes), dict(maybe_edges)
|
||||||
|
|
||||||
|
|
||||||
async def _parse_extraction_result(
|
async def _parse_extraction_result(
|
||||||
text_chunks_storage: BaseKVStorage, extraction_result: str, chunk_id: str
|
text_chunks_storage: BaseKVStorage, extraction_result: str, chunk_id: str
|
||||||
) -> tuple[dict, dict]:
|
) -> tuple[dict, dict]:
|
||||||
|
|
@ -814,69 +895,16 @@ async def _parse_extraction_result(
|
||||||
if chunk_data
|
if chunk_data
|
||||||
else "unknown_source"
|
else "unknown_source"
|
||||||
)
|
)
|
||||||
context_base = dict(
|
|
||||||
|
# Call the shared processing function
|
||||||
|
return await _process_extraction_result(
|
||||||
|
extraction_result,
|
||||||
|
chunk_id,
|
||||||
|
file_path,
|
||||||
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
|
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
|
||||||
record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
|
record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
|
||||||
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
|
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
|
||||||
)
|
)
|
||||||
maybe_nodes = defaultdict(list)
|
|
||||||
maybe_edges = defaultdict(list)
|
|
||||||
|
|
||||||
# Standardize Chinese brackets around record_delimiter to English brackets
|
|
||||||
record_delimiter = context_base["record_delimiter"]
|
|
||||||
bracket_pattern = f"[))](\\s*{re.escape(record_delimiter)}\\s*)[((]"
|
|
||||||
extraction_result = re.sub(bracket_pattern, ")\\1(", extraction_result)
|
|
||||||
|
|
||||||
# Parse the extraction result using the same logic as in extract_entities
|
|
||||||
records = split_string_by_multi_markers(
|
|
||||||
extraction_result,
|
|
||||||
[context_base["record_delimiter"], context_base["completion_delimiter"]],
|
|
||||||
)
|
|
||||||
|
|
||||||
for record in records:
|
|
||||||
# Remove outer brackets
|
|
||||||
record = record.strip()
|
|
||||||
if record.startswith("(") or record.startswith("("):
|
|
||||||
record = record[1:]
|
|
||||||
if record.endswith(")") or record.endswith(")"):
|
|
||||||
record = record[:-1]
|
|
||||||
|
|
||||||
record = record.strip()
|
|
||||||
if record is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if context_base["tuple_delimiter"] == "<|>":
|
|
||||||
# fix entity<| with entity<|>
|
|
||||||
record = re.sub(r"^entity<\|(?!>)", r"entity<|>", record)
|
|
||||||
# fix relationship<| with relationship<|>
|
|
||||||
record = re.sub(r"^relationship<\|(?!>)", r"relationship<|>", record)
|
|
||||||
# fix <||> with <|>
|
|
||||||
record = record.replace("<||>", "<|>")
|
|
||||||
# fix < | > with <|>
|
|
||||||
record = record.replace("< | >", "<|>")
|
|
||||||
|
|
||||||
record_attributes = split_string_by_multi_markers(
|
|
||||||
record, [context_base["tuple_delimiter"]]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Try to parse as entity
|
|
||||||
entity_data = await _handle_single_entity_extraction(
|
|
||||||
record_attributes, chunk_id, file_path
|
|
||||||
)
|
|
||||||
if entity_data is not None:
|
|
||||||
maybe_nodes[entity_data["entity_name"]].append(entity_data)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Try to parse as relationship
|
|
||||||
relationship_data = await _handle_single_relationship_extraction(
|
|
||||||
record_attributes, chunk_id, file_path
|
|
||||||
)
|
|
||||||
if relationship_data is not None:
|
|
||||||
maybe_edges[
|
|
||||||
(relationship_data["src_id"], relationship_data["tgt_id"])
|
|
||||||
].append(relationship_data)
|
|
||||||
|
|
||||||
return dict(maybe_nodes), dict(maybe_edges)
|
|
||||||
|
|
||||||
|
|
||||||
async def _rebuild_single_entity(
|
async def _rebuild_single_entity(
|
||||||
|
|
@ -1738,73 +1766,6 @@ async def extract_entities(
|
||||||
processed_chunks = 0
|
processed_chunks = 0
|
||||||
total_chunks = len(ordered_chunks)
|
total_chunks = len(ordered_chunks)
|
||||||
|
|
||||||
async def _process_extraction_result(
|
|
||||||
result: str, chunk_key: str, file_path: str = "unknown_source"
|
|
||||||
):
|
|
||||||
"""Process a single extraction result (either initial or gleaning)
|
|
||||||
Args:
|
|
||||||
result (str): The extraction result to process
|
|
||||||
chunk_key (str): The chunk key for source tracking
|
|
||||||
file_path (str): The file path for citation
|
|
||||||
Returns:
|
|
||||||
tuple: (nodes_dict, edges_dict) containing the extracted entities and relationships
|
|
||||||
"""
|
|
||||||
maybe_nodes = defaultdict(list)
|
|
||||||
maybe_edges = defaultdict(list)
|
|
||||||
|
|
||||||
# Standardize Chinese brackets around record_delimiter to English brackets
|
|
||||||
record_delimiter = context_base["record_delimiter"]
|
|
||||||
bracket_pattern = f"[))](\\s*{re.escape(record_delimiter)}\\s*)[((]"
|
|
||||||
result = re.sub(bracket_pattern, ")\\1(", result)
|
|
||||||
|
|
||||||
records = split_string_by_multi_markers(
|
|
||||||
result,
|
|
||||||
[context_base["record_delimiter"], context_base["completion_delimiter"]],
|
|
||||||
)
|
|
||||||
|
|
||||||
for record in records:
|
|
||||||
# Remove outer brackets (support English and Chinese brackets)
|
|
||||||
record = record.strip()
|
|
||||||
if record.startswith("(") or record.startswith("("):
|
|
||||||
record = record[1:]
|
|
||||||
if record.endswith(")") or record.endswith(")"):
|
|
||||||
record = record[:-1]
|
|
||||||
|
|
||||||
record = record.strip()
|
|
||||||
if record is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if context_base["tuple_delimiter"] == "<|>":
|
|
||||||
# fix entity<| with entity<|>
|
|
||||||
record = re.sub(r"^entity<\|(?!>)", r"entity<|>", record)
|
|
||||||
# fix relationship<| with relationship<|>
|
|
||||||
record = re.sub(r"^relationship<\|(?!>)", r"relationship<|>", record)
|
|
||||||
# fix <||> with <|>
|
|
||||||
record = record.replace("<||>", "<|>")
|
|
||||||
# fix < | > with <|>
|
|
||||||
record = record.replace("< | >", "<|>")
|
|
||||||
|
|
||||||
record_attributes = split_string_by_multi_markers(
|
|
||||||
record, [context_base["tuple_delimiter"]]
|
|
||||||
)
|
|
||||||
|
|
||||||
if_entities = await _handle_single_entity_extraction(
|
|
||||||
record_attributes, chunk_key, file_path
|
|
||||||
)
|
|
||||||
if if_entities is not None:
|
|
||||||
maybe_nodes[if_entities["entity_name"]].append(if_entities)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if_relation = await _handle_single_relationship_extraction(
|
|
||||||
record_attributes, chunk_key, file_path
|
|
||||||
)
|
|
||||||
if if_relation is not None:
|
|
||||||
maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(
|
|
||||||
if_relation
|
|
||||||
)
|
|
||||||
|
|
||||||
return maybe_nodes, maybe_edges
|
|
||||||
|
|
||||||
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
|
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
|
||||||
"""Process a single chunk
|
"""Process a single chunk
|
||||||
Args:
|
Args:
|
||||||
|
|
@ -1842,7 +1803,12 @@ async def extract_entities(
|
||||||
|
|
||||||
# Process initial extraction with file path
|
# Process initial extraction with file path
|
||||||
maybe_nodes, maybe_edges = await _process_extraction_result(
|
maybe_nodes, maybe_edges = await _process_extraction_result(
|
||||||
final_result, chunk_key, file_path
|
final_result,
|
||||||
|
chunk_key,
|
||||||
|
file_path,
|
||||||
|
tuple_delimiter=context_base["tuple_delimiter"],
|
||||||
|
record_delimiter=context_base["record_delimiter"],
|
||||||
|
completion_delimiter=context_base["completion_delimiter"],
|
||||||
)
|
)
|
||||||
|
|
||||||
# Process additional gleaning results
|
# Process additional gleaning results
|
||||||
|
|
@ -1861,7 +1827,12 @@ async def extract_entities(
|
||||||
|
|
||||||
# Process gleaning result separately with file path
|
# Process gleaning result separately with file path
|
||||||
glean_nodes, glean_edges = await _process_extraction_result(
|
glean_nodes, glean_edges = await _process_extraction_result(
|
||||||
glean_result, chunk_key, file_path
|
glean_result,
|
||||||
|
chunk_key,
|
||||||
|
file_path,
|
||||||
|
tuple_delimiter=context_base["tuple_delimiter"],
|
||||||
|
record_delimiter=context_base["record_delimiter"],
|
||||||
|
completion_delimiter=context_base["completion_delimiter"],
|
||||||
)
|
)
|
||||||
|
|
||||||
# Merge results - only add entities and edges with new names
|
# Merge results - only add entities and edges with new names
|
||||||
|
|
@ -1869,11 +1840,13 @@ async def extract_entities(
|
||||||
if (
|
if (
|
||||||
entity_name not in maybe_nodes
|
entity_name not in maybe_nodes
|
||||||
): # Only accetp entities with new name in gleaning stage
|
): # Only accetp entities with new name in gleaning stage
|
||||||
|
maybe_nodes[entity_name] = [] # Explicitly create the list
|
||||||
maybe_nodes[entity_name].extend(entities)
|
maybe_nodes[entity_name].extend(entities)
|
||||||
for edge_key, edges in glean_edges.items():
|
for edge_key, edges in glean_edges.items():
|
||||||
if (
|
if (
|
||||||
edge_key not in maybe_edges
|
edge_key not in maybe_edges
|
||||||
): # Only accetp edges with new name in gleaning stage
|
): # Only accetp edges with new name in gleaning stage
|
||||||
|
maybe_edges[edge_key] = [] # Explicitly create the list
|
||||||
maybe_edges[edge_key].extend(edges)
|
maybe_edges[edge_key].extend(edges)
|
||||||
|
|
||||||
if now_glean_index == entity_extract_max_gleaning - 1:
|
if now_glean_index == entity_extract_max_gleaning - 1:
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue