diff --git a/README-zh.md b/README-zh.md
index ea2ad24b..9f80b7c0 100644
--- a/README-zh.md
+++ b/README-zh.md
@@ -33,6 +33,9 @@
+
+
+
diff --git a/README.md b/README.md
index e5a1625f..7e9315a3 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,9 @@
+
+
+
diff --git a/env.example b/env.example
index b322baa7..1ad15243 100644
--- a/env.example
+++ b/env.example
@@ -65,7 +65,7 @@ ENABLE_LLM_CACHE=true
# COSINE_THRESHOLD=0.2
### Number of entities or relations retrieved from KG
# TOP_K=40
-### Maxmium number or chunks for naive vactor search
+### Maximum number of chunks for naive vector search
# CHUNK_TOP_K=20
### control the actual enties send to LLM
# MAX_ENTITY_TOKENS=6000
@@ -125,7 +125,7 @@ ENABLE_LLM_CACHE_FOR_EXTRACT=true
SUMMARY_LANGUAGE=English
### Entity types that the LLM will attempt to recognize
-# ENTITY_TYPES='["Organization", "Person", "Location", "Event", "Technology", "Equipment", "Product", "Document", "Category"]'
+# ENTITY_TYPES='["Person", "Organization", "Location", "Event", "Concept", "Method", "Content", "Data", "Artifact", "NaturalObject"]'
### Chunk size for document splitting, 500~1500 is recommended
# CHUNK_SIZE=1200
@@ -174,9 +174,17 @@ LLM_BINDING_API_KEY=your_api_key
# LLM_BINDING_API_KEY=your_api_key
# LLM_BINDING=openai
-### OpenAI Specific Parameters
-### Set the max_output_tokens to mitigate endless output of some LLM (less than LLM_TIMEOUT * llm_output_tokens/second, i.e. 9000 = 180s * 50 tokens/s)
+### OpenAI Compatible API Specific Parameters
+### Set max_tokens to mitigate endless output from some LLMs (less than LLM_TIMEOUT * llm_output_tokens/second, e.g. 9000 = 180s * 50 tokens/s)
+### Typically, max_tokens does not include prompt content, though some models, such as Gemini models, are exceptions
+### For vLLM/SGLang deployed models, and most OpenAI-compatible API providers
# OPENAI_LLM_MAX_TOKENS=9000
+### For OpenAI o1-mini or newer models
+OPENAI_LLM_MAX_COMPLETION_TOKENS=9000
+
+### OpenAI's newer API uses max_completion_tokens instead of max_tokens
+# OPENAI_LLM_MAX_TOKENS=9000
+# OPENAI_LLM_MAX_COMPLETION_TOKENS=9000
### OpenRouter Specific Parameters
# OPENAI_LLM_EXTRA_BODY='{"reasoning": {"enabled": false}}'
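A minimal sketch of how these two token-limit variants map onto an OpenAI-compatible chat call, assuming a recent `openai` Python SDK; the model names and the reuse of `LLM_BINDING_API_KEY` are placeholders for whatever the deployment actually uses, not LightRAG's internal wiring:

```python
import os
from openai import OpenAI  # assumes a recent openai SDK

client = OpenAI(api_key=os.environ["LLM_BINDING_API_KEY"])

# vLLM/SGLang deployments and most OpenAI-compatible providers: use max_tokens
resp = client.chat.completions.create(
    model="gpt-4o-mini",  # placeholder model name
    messages=[{"role": "user", "content": "Extract entities from the text ..."}],
    max_tokens=int(os.getenv("OPENAI_LLM_MAX_TOKENS", "9000")),
)

# OpenAI o1-mini and newer models: use max_completion_tokens instead
resp = client.chat.completions.create(
    model="o1-mini",
    messages=[{"role": "user", "content": "Extract entities from the text ..."}],
    max_completion_tokens=int(os.getenv("OPENAI_LLM_MAX_COMPLETION_TOKENS", "9000")),
)
```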
diff --git a/lightrag/__init__.py b/lightrag/__init__.py
index c10bea8d..87c4b622 100644
--- a/lightrag/__init__.py
+++ b/lightrag/__init__.py
@@ -1,5 +1,5 @@
from .lightrag import LightRAG as LightRAG, QueryParam as QueryParam
-__version__ = "1.4.8"
+__version__ = "1.4.8.1"
__author__ = "Zirui Guo"
__url__ = "https://github.com/HKUDS/LightRAG"
diff --git a/lightrag/api/README-zh.md b/lightrag/api/README-zh.md
index 9f940df1..1aa22403 100644
--- a/lightrag/api/README-zh.md
+++ b/lightrag/api/README-zh.md
@@ -372,7 +372,20 @@ lightrag-server --llm-binding ollama --help
lightrag-server --embedding-binding ollama --help
```
-> 请使用openai兼容方式访问OpenRouter或vLLM部署的LLM。可以通过 `OPENAI_LLM_EXTRA_BODY` 环境变量给OpenRouter或vLLM传递额外的参数,实现推理模式的关闭或者其它个性化控制。
+> 请使用OpenAI兼容方式访问OpenRouter、vLLM或SGLang部署的LLM。可以通过 `OPENAI_LLM_EXTRA_BODY` 环境变量给OpenRouter、vLLM或SGLang推理框架传递额外的参数,实现推理模式的关闭或者其它个性化控制。
+
+设置 `max_tokens` 参数旨在**防止实体关系提取阶段出现 LLM 响应过长或无休止循环输出的问题**:在超时发生之前截断 LLM 输出,从而避免文档提取失败。某些包含大量实体和关系的文本块(例如表格或引文)可能导致 LLM 产生过长甚至无限循环的输出,此设置对于本地部署的小参数模型尤为重要。`max_tokens` 值可以通过以下公式计算:`LLM_TIMEOUT * llm_output_tokens/second`(例如 `180s * 50 tokens/s = 9000`)。相应的环境变量如下:
+
+```
+# For vLLM/SGLang deployed models, and most OpenAI-compatible API providers
+OPENAI_LLM_MAX_TOKENS=9000
+
+# For Ollama deployed models
+OLLAMA_LLM_NUM_PREDICT=9000
+
+# For OpenAI o1-mini or newer models
+OPENAI_LLM_MAX_COMPLETION_TOKENS=9000
+```
### 实体提取配置
diff --git a/lightrag/api/README.md b/lightrag/api/README.md
index 0f010234..44fb6d28 100644
--- a/lightrag/api/README.md
+++ b/lightrag/api/README.md
@@ -374,9 +374,23 @@ lightrag-server --llm-binding ollama --help
lightrag-server --embedding-binding ollama --help
```
-> Please use OpenAI-compatible method to access LLMs deployed by OpenRouter or vLLM. You can pass additional parameters to OpenRouter or vLLM through the `OPENAI_LLM_EXTRA_BODY` environment variable to disable reasoning mode or achieve other personalized controls.
+> Please use the OpenAI-compatible method to access LLMs deployed via OpenRouter, vLLM, or SGLang. You can pass additional parameters to OpenRouter, vLLM, or SGLang through the `OPENAI_LLM_EXTRA_BODY` environment variable to disable reasoning mode or apply other custom controls.
+
+Set `max_tokens` to **prevent excessively long or endless looping output** from LLM responses during the entity-relationship extraction phase. The parameter truncates LLM output before a timeout occurs, thereby preventing document extraction failures. This addresses cases where text blocks containing numerous entities and relationships (e.g., tables or citations) can lead to overly long or even endlessly looping output, and it is particularly important for locally deployed, small-parameter models. The max tokens value can be calculated with the formula `LLM_TIMEOUT * llm_output_tokens/second` (e.g., `180s * 50 tokens/s = 9000`). The corresponding environment variables are:
+
+```
+# For vLLM/SGLang deployed models, and most OpenAI-compatible API providers
+OPENAI_LLM_MAX_TOKENS=9000
+
+# For Ollama deployed models
+OLLAMA_LLM_NUM_PREDICT=9000
+
+# For OpenAI o1-mini or newer models
+OPENAI_LLM_MAX_COMPLETION_TOKENS=9000
+```
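As a quick sanity check of the formula above, a tiny sizing helper (the `safety` factor is an addition of this sketch, not part of the documented formula):

```python
def max_output_tokens(llm_timeout_s: int, tokens_per_second: float, safety: float = 1.0) -> int:
    """Cap output so generation finishes before LLM_TIMEOUT (illustrative only)."""
    return int(llm_timeout_s * tokens_per_second * safety)

print(max_output_tokens(180, 50))  # 9000, the value used in the examples above
```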
### Entity Extraction Configuration
+
* ENABLE_LLM_CACHE_FOR_EXTRACT: Enable LLM cache for entity extraction (default: true)
It's very common to set `ENABLE_LLM_CACHE_FOR_EXTRACT` to true for a test environment to reduce the cost of LLM calls.
diff --git a/lightrag/api/__init__.py b/lightrag/api/__init__.py
index 597178e8..961816ea 100644
--- a/lightrag/api/__init__.py
+++ b/lightrag/api/__init__.py
@@ -1 +1 @@
-__api_version__ = "0218"
+__api_version__ = "0222"
diff --git a/lightrag/api/routers/query_routes.py b/lightrag/api/routers/query_routes.py
index eec4d5e0..7ece18c6 100644
--- a/lightrag/api/routers/query_routes.py
+++ b/lightrag/api/routers/query_routes.py
@@ -134,6 +134,21 @@ class QueryResponse(BaseModel):
)
+class QueryDataResponse(BaseModel):
+ entities: List[Dict[str, Any]] = Field(
+ description="Retrieved entities from knowledge graph"
+ )
+ relationships: List[Dict[str, Any]] = Field(
+ description="Retrieved relationships from knowledge graph"
+ )
+ chunks: List[Dict[str, Any]] = Field(
+ description="Retrieved text chunks from documents"
+ )
+ metadata: Dict[str, Any] = Field(
+ description="Query metadata including mode, keywords, and processing information"
+ )
+
+
def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
combined_auth = get_combined_auth_dependency(api_key)
@@ -221,4 +236,71 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
trace_exception(e)
raise HTTPException(status_code=500, detail=str(e))
+ @router.post(
+ "/query/data",
+ response_model=QueryDataResponse,
+ dependencies=[Depends(combined_auth)],
+ )
+ async def query_data(request: QueryRequest):
+ """
+ Retrieve structured data without LLM generation.
+
+ This endpoint returns raw retrieval results including entities, relationships,
+ and text chunks that would be used for RAG, but without generating a final response.
+ All parameters are compatible with the regular /query endpoint.
+
+ Parameters:
+ request (QueryRequest): The request object containing the query parameters.
+
+ Returns:
+ QueryDataResponse: A Pydantic model containing structured data with entities,
+ relationships, chunks, and metadata.
+
+ Raises:
+ HTTPException: Raised when an error occurs during the request handling process,
+ with status code 500 and detail containing the exception message.
+ """
+ try:
+ param = request.to_query_params(False) # No streaming for data endpoint
+ response = await rag.aquery_data(request.query, param=param)
+
+ # The aquery_data method returns a dict with entities, relationships, chunks, and metadata
+ if isinstance(response, dict):
+ # Ensure all required fields exist and are lists/dicts
+ entities = response.get("entities", [])
+ relationships = response.get("relationships", [])
+ chunks = response.get("chunks", [])
+ metadata = response.get("metadata", {})
+
+ # Validate data types
+ if not isinstance(entities, list):
+ entities = []
+ if not isinstance(relationships, list):
+ relationships = []
+ if not isinstance(chunks, list):
+ chunks = []
+ if not isinstance(metadata, dict):
+ metadata = {}
+
+ return QueryDataResponse(
+ entities=entities,
+ relationships=relationships,
+ chunks=chunks,
+ metadata=metadata,
+ )
+ else:
+ # Fallback for unexpected response format
+ return QueryDataResponse(
+ entities=[],
+ relationships=[],
+ chunks=[],
+ metadata={
+ "error": "Unexpected response format",
+ "raw_response": str(response),
+ },
+ )
+ except Exception as e:
+ trace_exception(e)
+ raise HTTPException(status_code=500, detail=str(e))
+
return router
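A hedged client-side sketch of the new `/query/data` endpoint; the base URL assumes a locally running lightrag-server (adjust host/port to your deployment), and the payload fields mirror the regular `/query` request:

```python
import requests

BASE_URL = "http://localhost:9621"  # assumed default lightrag-server address

payload = {"query": "What products does the company sell?", "mode": "mix"}
resp = requests.post(f"{BASE_URL}/query/data", json=payload, timeout=60)
resp.raise_for_status()

data = resp.json()  # matches QueryDataResponse: entities, relationships, chunks, metadata
print(len(data["entities"]), "entities")
print(len(data["relationships"]), "relationships")
print(len(data["chunks"]), "chunks")
print(data["metadata"])
```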
diff --git a/lightrag/constants.py b/lightrag/constants.py
index de810bc9..7bc70957 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -24,15 +24,16 @@ DEFAULT_SUMMARY_LENGTH_RECOMMENDED = 600
DEFAULT_SUMMARY_CONTEXT_SIZE = 12000
# Default entities to extract if ENTITY_TYPES is not specified in .env
DEFAULT_ENTITY_TYPES = [
- "Organization",
"Person",
+ "Organization",
"Location",
"Event",
- "Technology",
- "Equipment",
- "Product",
- "Document",
- "Category",
+ "Concept",
+ "Method",
+ "Content",
+ "Data",
+ "Artifact",
+ "NaturalObject",
]
# Separator for graph fields
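The new defaults can still be overridden through the `ENTITY_TYPES` variable shown in env.example; a rough sketch of that override pattern (illustrative, not LightRAG's exact loading code):

```python
import json
import os

DEFAULT_ENTITY_TYPES = [
    "Person", "Organization", "Location", "Event", "Concept",
    "Method", "Content", "Data", "Artifact", "NaturalObject",
]

def resolve_entity_types() -> list[str]:
    """Parse ENTITY_TYPES as a JSON list if set, otherwise fall back to the defaults."""
    raw = os.getenv("ENTITY_TYPES")
    if not raw:
        return DEFAULT_ENTITY_TYPES
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return DEFAULT_ENTITY_TYPES
    return parsed if isinstance(parsed, list) and parsed else DEFAULT_ENTITY_TYPES
```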
diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py
index dad95bbc..de1d07e7 100644
--- a/lightrag/kg/qdrant_impl.py
+++ b/lightrag/kg/qdrant_impl.py
@@ -67,9 +67,21 @@ class QdrantVectorDBStorage(BaseVectorStorage):
def create_collection_if_not_exist(
client: QdrantClient, collection_name: str, **kwargs
):
- if client.collection_exists(collection_name):
- return
- client.create_collection(collection_name, **kwargs)
+ exists = False
+ if hasattr(client, "collection_exists"):
+ try:
+ exists = client.collection_exists(collection_name)
+ except Exception:
+ exists = False
+ else:
+ try:
+ client.get_collection(collection_name)
+ exists = True
+ except Exception:
+ exists = False
+
+ if not exists:
+ client.create_collection(collection_name, **kwargs)
def __post_init__(self):
# Check for QDRANT_WORKSPACE environment variable first (higher priority)
@@ -464,7 +476,20 @@ class QdrantVectorDBStorage(BaseVectorStorage):
async with get_storage_lock():
try:
# Delete the collection and recreate it
- if self._client.collection_exists(self.final_namespace):
+ exists = False
+ if hasattr(self._client, "collection_exists"):
+ try:
+ exists = self._client.collection_exists(self.final_namespace)
+ except Exception:
+ exists = False
+ else:
+ try:
+ self._client.get_collection(self.final_namespace)
+ exists = True
+ except Exception:
+ exists = False
+
+ if exists:
self._client.delete_collection(self.final_namespace)
# Recreate the collection
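Both hunks apply the same compatibility pattern: prefer `collection_exists()` on newer qdrant-client releases and fall back to probing `get_collection()` on older ones. A standalone sketch of that check (assuming the `qdrant_client` package; not the exact code above):

```python
from qdrant_client import QdrantClient

def collection_exists_compat(client: QdrantClient, name: str) -> bool:
    """Return True if the collection exists, across qdrant-client versions."""
    if hasattr(client, "collection_exists"):
        try:
            return client.collection_exists(name)
        except Exception:
            return False
    try:
        client.get_collection(name)  # raises if the collection is missing
        return True
    except Exception:
        return False
```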
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 38027a2d..43ee1ef4 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -2263,6 +2263,78 @@ class LightRAG:
await self._query_done()
return response
+ async def aquery_data(
+ self,
+ query: str,
+ param: QueryParam = QueryParam(),
+ ) -> dict[str, Any]:
+ """
+ Asynchronous data retrieval API: returns structured retrieval results without LLM generation.
+
+ This function reuses the same logic as aquery but stops before LLM generation,
+ returning the final processed entities, relationships, and chunks data that would be sent to LLM.
+
+ Args:
+ query: Query text.
+ param: Query parameters (same as aquery).
+
+ Returns:
+ dict[str, Any]: Structured data result with entities, relationships, chunks, and metadata
+ """
+ global_config = asdict(self)
+
+ if param.mode in ["local", "global", "hybrid", "mix"]:
+ logger.debug(f"[aquery_data] Using kg_query for mode: {param.mode}")
+ final_data = await kg_query(
+ query.strip(),
+ self.chunk_entity_relation_graph,
+ self.entities_vdb,
+ self.relationships_vdb,
+ self.text_chunks,
+ param,
+ global_config,
+ hashing_kv=self.llm_response_cache,
+ system_prompt=None,
+ chunks_vdb=self.chunks_vdb,
+ return_raw_data=True, # Get final processed data
+ )
+ elif param.mode == "naive":
+ logger.debug(f"[aquery_data] Using naive_query for mode: {param.mode}")
+ final_data = await naive_query(
+ query.strip(),
+ self.chunks_vdb,
+ param,
+ global_config,
+ hashing_kv=self.llm_response_cache,
+ system_prompt=None,
+ return_raw_data=True, # Get final processed data
+ )
+ elif param.mode == "bypass":
+ logger.debug("[aquery_data] Using bypass mode")
+ # bypass mode returns empty data
+ final_data = {
+ "entities": [],
+ "relationships": [],
+ "chunks": [],
+ "metadata": {
+ "query_mode": "bypass",
+ "keywords": {"high_level": [], "low_level": []},
+ },
+ }
+ else:
+ raise ValueError(f"Unknown mode {param.mode}")
+
+ # Log final result counts
+ entities_count = len(final_data.get("entities", []))
+ relationships_count = len(final_data.get("relationships", []))
+ chunks_count = len(final_data.get("chunks", []))
+ logger.debug(
+ f"[aquery_data] Final result: {entities_count} entities, {relationships_count} relationships, {chunks_count} chunks"
+ )
+
+ await self._query_done()
+ return final_data
+
async def _query_done(self):
await self.llm_response_cache.index_done_callback()
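A hedged usage sketch of the new data-only retrieval API, mirroring the signature added above; initialization of the LightRAG instance (working directory, bindings, inserted documents) is assumed to happen elsewhere:

```python
import asyncio
from lightrag import LightRAG, QueryParam

async def inspect_retrieval(rag: LightRAG) -> None:
    # Retrieve the structured context only, without asking the LLM for an answer.
    result = await rag.aquery_data(
        "How are the company's products related to its suppliers?",
        param=QueryParam(mode="mix", top_k=40),
    )
    print(
        f"{len(result['entities'])} entities, "
        f"{len(result['relationships'])} relationships, "
        f"{len(result['chunks'])} chunks"
    )
    print(result["metadata"])

# asyncio.run(inspect_retrieval(rag))  # with an initialized LightRAG instance
```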
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 5e83be95..7da24734 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -3,10 +3,8 @@ from functools import partial
import asyncio
import json
-import re
-import os
import json_repair
-from typing import Any, AsyncIterator
+from typing import Any, AsyncIterator, overload, Literal
from collections import Counter, defaultdict
from .utils import (
@@ -22,7 +20,6 @@ from .utils import (
handle_cache,
save_to_cache,
CacheData,
- get_conversation_turns,
use_llm_func_with_cache,
update_chunk_cache_list,
remove_think_tags,
@@ -32,6 +29,8 @@ from .utils import (
build_file_path,
safe_vdb_operation_with_exception,
create_prefixed_exception,
+ fix_tuple_delimiter_corruption,
+ _convert_to_user_format,
)
from .base import (
BaseGraphStorage,
@@ -276,16 +275,25 @@ async def _summarize_descriptions(
prompt_template = PROMPTS["summarize_entity_descriptions"]
- # Join descriptions and apply token-based truncation if necessary
- joined_descriptions = "\n\n".join(description_list)
+ # Convert descriptions to JSONL format and apply token-based truncation
tokenizer = global_config["tokenizer"]
summary_context_size = global_config["summary_context_size"]
- # Token-based truncation to ensure input fits within limits
- tokens = tokenizer.encode(joined_descriptions)
- if len(tokens) > summary_context_size:
- truncated_tokens = tokens[:summary_context_size]
- joined_descriptions = tokenizer.decode(truncated_tokens)
+ # Create list of JSON objects with "Description" field
+ json_descriptions = [{"Description": desc} for desc in description_list]
+
+ # Use truncate_list_by_token_size for length truncation
+ truncated_json_descriptions = truncate_list_by_token_size(
+ json_descriptions,
+ key=lambda x: json.dumps(x, ensure_ascii=False),
+ max_token_size=summary_context_size,
+ tokenizer=tokenizer,
+ )
+
+ # Convert to JSONL format (one JSON object per line)
+ joined_descriptions = "\n".join(
+ json.dumps(desc, ensure_ascii=False) for desc in truncated_json_descriptions
+ )
# Prepare context for the prompt
context_base = dict(
@@ -297,12 +305,8 @@ async def _summarize_descriptions(
)
use_prompt = prompt_template.format(**context_base)
- logger.debug(
- f"Summarizing {len(description_list)} descriptions for: {description_name}"
- )
-
# Use LLM function with cache (higher priority for summary generation)
- summary = await use_llm_func_with_cache(
+ summary, _ = await use_llm_func_with_cache(
use_prompt,
use_llm_func,
llm_response_cache=llm_response_cache,
@@ -314,13 +318,15 @@ async def _summarize_descriptions(
async def _handle_single_entity_extraction(
record_attributes: list[str],
chunk_key: str,
+ timestamp: int,
file_path: str = "unknown_source",
):
- if len(record_attributes) < 4 or "entity" not in record_attributes[0]:
+ if len(record_attributes) != 4 or "entity" not in record_attributes[0]:
if len(record_attributes) > 1 and "entity" in record_attributes[0]:
logger.warning(
- f"{chunk_key}: Entity `{record_attributes[1]}` extraction failed -- expecting 4 fields but got {len(record_attributes)}"
+                f"{chunk_key}: LLM output format error; found {len(record_attributes)}/4 fields on ENTITY `{record_attributes[1]}` @ `{record_attributes[2] if len(record_attributes) > 2 else 'N/A'}`"
)
+ logger.debug(record_attributes)
return None
try:
@@ -366,6 +372,7 @@ async def _handle_single_entity_extraction(
description=entity_description,
source_id=chunk_key,
file_path=file_path,
+ timestamp=timestamp,
)
except ValueError as e:
@@ -383,13 +390,17 @@ async def _handle_single_entity_extraction(
async def _handle_single_relationship_extraction(
record_attributes: list[str],
chunk_key: str,
+ timestamp: int,
file_path: str = "unknown_source",
):
- if len(record_attributes) < 5 or "relationship" not in record_attributes[0]:
- if len(record_attributes) > 1 and "relationship" in record_attributes[0]:
+ if (
+ len(record_attributes) != 5 or "relation" not in record_attributes[0]
+    ):  # treat "relationship" and "relation" as interchangeable
+ if len(record_attributes) > 1 and "relation" in record_attributes[0]:
logger.warning(
- f"{chunk_key}: Relation `{record_attributes[1]}` extraction failed -- expecting 5 fields but got {len(record_attributes)}"
+                f"{chunk_key}: LLM output format error; found {len(record_attributes)}/5 fields on RELATION `{record_attributes[1]}`~`{record_attributes[2] if len(record_attributes) > 2 else 'N/A'}`"
)
+ logger.debug(record_attributes)
return None
try:
@@ -443,6 +454,7 @@ async def _handle_single_relationship_extraction(
keywords=edge_keywords,
source_id=edge_source_id,
file_path=file_path,
+ timestamp=timestamp,
)
except ValueError as e:
@@ -505,7 +517,7 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status["history_messages"].append(status_message)
# Get cached extraction results for these chunks using storage
- # cached_results: chunk_id -> [list of extraction result from LLM cache sorted by created_at]
+    # cached_results: chunk_id -> list of (extraction_result, create_time) tuples from LLM cache, sorted by create_time
cached_results = await _get_cached_extraction_results(
llm_response_cache,
all_referenced_chunk_ids,
@@ -525,36 +537,67 @@ async def _rebuild_knowledge_from_chunks(
chunk_entities = {} # chunk_id -> {entity_name: [entity_data]}
chunk_relationships = {} # chunk_id -> {(src, tgt): [relationship_data]}
- for chunk_id, extraction_results in cached_results.items():
+ for chunk_id, results in cached_results.items():
try:
# Handle multiple extraction results per chunk
chunk_entities[chunk_id] = defaultdict(list)
chunk_relationships[chunk_id] = defaultdict(list)
# process multiple LLM extraction results for a single chunk_id
- for extraction_result in extraction_results:
- entities, relationships = await _parse_extraction_result(
+ for result in results:
+ entities, relationships = await _rebuild_from_extraction_result(
text_chunks_storage=text_chunks_storage,
- extraction_result=extraction_result,
chunk_id=chunk_id,
+ extraction_result=result[0],
+ timestamp=result[1],
)
# Merge entities and relationships from this extraction result
- # Only keep the first occurrence of each entity_name in the same chunk_id
+ # Compare description lengths and keep the better version for the same chunk_id
for entity_name, entity_list in entities.items():
- if (
- entity_name not in chunk_entities[chunk_id]
- or len(chunk_entities[chunk_id][entity_name]) == 0
- ):
+ if entity_name not in chunk_entities[chunk_id]:
+ # New entity for this chunk_id
chunk_entities[chunk_id][entity_name].extend(entity_list)
+ elif len(chunk_entities[chunk_id][entity_name]) == 0:
+ # Empty list, add the new entities
+ chunk_entities[chunk_id][entity_name].extend(entity_list)
+ else:
+ # Compare description lengths and keep the better one
+ existing_desc_len = len(
+ chunk_entities[chunk_id][entity_name][0].get(
+ "description", ""
+ )
+ or ""
+ )
+ new_desc_len = len(entity_list[0].get("description", "") or "")
- # Only keep the first occurrence of each rel_key in the same chunk_id
+ if new_desc_len > existing_desc_len:
+ # Replace with the new entity that has longer description
+ chunk_entities[chunk_id][entity_name] = list(entity_list)
+ # Otherwise keep existing version
+
+ # Compare description lengths and keep the better version for the same chunk_id
for rel_key, rel_list in relationships.items():
- if (
- rel_key not in chunk_relationships[chunk_id]
- or len(chunk_relationships[chunk_id][rel_key]) == 0
- ):
+ if rel_key not in chunk_relationships[chunk_id]:
+ # New relationship for this chunk_id
chunk_relationships[chunk_id][rel_key].extend(rel_list)
+ elif len(chunk_relationships[chunk_id][rel_key]) == 0:
+ # Empty list, add the new relationships
+ chunk_relationships[chunk_id][rel_key].extend(rel_list)
+ else:
+ # Compare description lengths and keep the better one
+ existing_desc_len = len(
+ chunk_relationships[chunk_id][rel_key][0].get(
+ "description", ""
+ )
+ or ""
+ )
+ new_desc_len = len(rel_list[0].get("description", "") or "")
+
+ if new_desc_len > existing_desc_len:
+ # Replace with the new relationship that has longer description
+ chunk_relationships[chunk_id][rel_key] = list(rel_list)
+ # Otherwise keep existing version
except Exception as e:
status_message = (
@@ -792,8 +835,6 @@ async def _get_cached_extraction_results(
cached_results[chunk_id].sort(key=lambda x: x[1])
# Store the earliest create_time for this chunk (first item after sorting)
chunk_earliest_times[chunk_id] = cached_results[chunk_id][0][1]
- # Extract only extraction_result (x[0])
- cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]]
# Sort cached_results by the earliest create_time of each chunk
sorted_chunk_ids = sorted(
@@ -808,15 +849,15 @@ async def _get_cached_extraction_results(
logger.info(
f"Found {valid_entries} valid cache entries, {len(sorted_cached_results)} chunks with results"
)
- return sorted_cached_results
+    return sorted_cached_results  # each value: list of (extraction_result, create_time) tuples
async def _process_extraction_result(
result: str,
chunk_key: str,
+ timestamp: int,
file_path: str = "unknown_source",
- tuple_delimiter: str = "<|>",
- record_delimiter: str = "##",
+ tuple_delimiter: str = "<|#|>",
completion_delimiter: str = "<|COMPLETE|>",
) -> tuple[dict, dict]:
"""Process a single extraction result (either initial or gleaning)
@@ -833,57 +874,73 @@ async def _process_extraction_result(
maybe_nodes = defaultdict(list)
maybe_edges = defaultdict(list)
- # Standardize Chinese brackets around record_delimiter to English brackets
- bracket_pattern = f"[))](\\s*{re.escape(record_delimiter)}\\s*)[((]"
- result = re.sub(bracket_pattern, ")\\1(", result)
-
if completion_delimiter not in result:
logger.warning(
f"{chunk_key}: Complete delimiter can not be found in extraction result"
)
+    # Split the LLM output into records by "\n"
records = split_string_by_multi_markers(
result,
- [record_delimiter, completion_delimiter],
+ ["\n", completion_delimiter, completion_delimiter.lower()],
)
+    # Fix LLM output format errors where tuple_delimiter is used to separate records instead of "\n"
+ fixed_records = []
for record in records:
- # Remove outer brackets (support English and Chinese brackets)
record = record.strip()
- if record.startswith("(") or record.startswith("("):
- record = record[1:]
- if record.endswith(")") or record.endswith(")"):
- record = record[:-1]
+        if not record:
+            continue
+ entity_records = split_string_by_multi_markers(
+ record, [f"{tuple_delimiter}entity{tuple_delimiter}"]
+ )
+ for entity_record in entity_records:
+ if not entity_record.startswith("entity") and not entity_record.startswith(
+ "relation"
+ ):
+                entity_record = f"entity{tuple_delimiter}{entity_record}"
+ entity_relation_records = split_string_by_multi_markers(
+ # treat "relationship" and "relation" interchangeable
+ entity_record,
+ [
+ f"{tuple_delimiter}relationship{tuple_delimiter}",
+ f"{tuple_delimiter}relation{tuple_delimiter}",
+ ],
+ )
+ for entity_relation_record in entity_relation_records:
+ if not entity_relation_record.startswith(
+ "entity"
+ ) and not entity_relation_record.startswith("relation"):
+ entity_relation_record = (
+ f"relation{tuple_delimiter}{entity_relation_record}"
+ )
+                fixed_records.append(entity_relation_record)
+ if len(fixed_records) != len(records):
+ logger.warning(
+            f"{chunk_key}: LLM output format error; LLM used {tuple_delimiter} as the record separator instead of a newline"
+ )
+
+ for record in fixed_records:
record = record.strip()
if record is None:
continue
- if tuple_delimiter == "<|>":
- # fix entity<| with entity<|>
- record = re.sub(r"^entity<\|(?!>)", r"entity<|>", record)
- # fix relationship<| with relationship<|>
- record = re.sub(r"^relationship<\|(?!>)", r"relationship<|>", record)
- # fix <||> with <|>
- record = record.replace("<||>", "<|>")
- # fix < | > with <|>
- record = record.replace("< | >", "<|>")
- # fix <<|>> with <|>
- record = record.replace("<<|>>", "<|>")
- # fix <|>> with <|>
- record = record.replace("<|>>", "<|>")
- # fix <<|> with <|>
- record = record.replace("<<|>", "<|>")
- # fix <.|> with <|>
- record = record.replace("<.|>", "<|>")
- # fix <|.> with <|>
- record = record.replace("<|.>", "<|>")
+ # Fix various forms of tuple_delimiter corruption from the LLM output using the dedicated function
+ delimiter_core = tuple_delimiter[2:-2] # Extract "#" from "<|#|>"
+ record = fix_tuple_delimiter_corruption(record, delimiter_core, tuple_delimiter)
+ if delimiter_core != delimiter_core.lower():
+ # change delimiter_core to lower case, and fix again
+ delimiter_core = delimiter_core.lower()
+ record = fix_tuple_delimiter_corruption(
+ record, delimiter_core, tuple_delimiter
+ )
record_attributes = split_string_by_multi_markers(record, [tuple_delimiter])
# Try to parse as entity
entity_data = await _handle_single_entity_extraction(
- record_attributes, chunk_key, file_path
+ record_attributes, chunk_key, timestamp, file_path
)
if entity_data is not None:
maybe_nodes[entity_data["entity_name"]].append(entity_data)
@@ -891,7 +948,7 @@ async def _process_extraction_result(
# Try to parse as relationship
relationship_data = await _handle_single_relationship_extraction(
- record_attributes, chunk_key, file_path
+ record_attributes, chunk_key, timestamp, file_path
)
if relationship_data is not None:
maybe_edges[
@@ -901,8 +958,11 @@ async def _process_extraction_result(
return dict(maybe_nodes), dict(maybe_edges)
-async def _parse_extraction_result(
- text_chunks_storage: BaseKVStorage, extraction_result: str, chunk_id: str
+async def _rebuild_from_extraction_result(
+ text_chunks_storage: BaseKVStorage,
+ extraction_result: str,
+ chunk_id: str,
+ timestamp: int,
) -> tuple[dict, dict]:
"""Parse cached extraction result using the same logic as extract_entities
@@ -927,9 +987,9 @@ async def _parse_extraction_result(
return await _process_extraction_result(
extraction_result,
chunk_id,
+ timestamp,
file_path,
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
- record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
)
@@ -1254,13 +1314,22 @@ async def _merge_nodes_then_upsert(
reverse=True,
)[0][0] # Get the entity type with the highest count
- # merge and deduplicate description
- description_list = list(
- dict.fromkeys(
- already_description
- + [dp["description"] for dp in nodes_data if dp.get("description")]
- )
+ # Deduplicate by description, keeping first occurrence
+ unique_nodes = {}
+ for dp in nodes_data:
+ desc = dp["description"]
+ if desc not in unique_nodes:
+ unique_nodes[desc] = dp
+
+    # Sort descriptions by timestamp, then by description length (longest first) when timestamps are equal
+ sorted_nodes = sorted(
+ unique_nodes.values(),
+ key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))),
)
+ sorted_descriptions = [dp["description"] for dp in sorted_nodes]
+
+    # Combine already_description with the new sorted descriptions
+ description_list = already_description + sorted_descriptions
num_fragment = len(description_list)
already_fragment = len(already_description)
@@ -1282,15 +1351,19 @@ async def _merge_nodes_then_upsert(
# Log based on actual LLM usage
if llm_was_used:
- status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
+ status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}"
else:
- status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
+ status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}"
+
+ if already_fragment > 0 or llm_was_used:
+ logger.info(status_message)
+ if pipeline_status is not None and pipeline_status_lock is not None:
+ async with pipeline_status_lock:
+ pipeline_status["latest_message"] = status_message
+ pipeline_status["history_messages"].append(status_message)
+ else:
+ logger.debug(status_message)
- logger.info(status_message)
- if pipeline_status is not None and pipeline_status_lock is not None:
- async with pipeline_status_lock:
- pipeline_status["latest_message"] = status_message
- pipeline_status["history_messages"].append(status_message)
else:
logger.error(f"Entity {entity_name} has no description")
description = "(no description)"
@@ -1372,12 +1445,23 @@ async def _merge_edges_then_upsert(
# Process edges_data with None checks
weight = sum([dp["weight"] for dp in edges_data] + already_weights)
- description_list = list(
- dict.fromkeys(
- already_description
- + [dp["description"] for dp in edges_data if dp.get("description")]
- )
+ # Deduplicate by description, keeping first occurrence
+ unique_edges = {}
+ for dp in edges_data:
+ if dp.get("description"):
+ desc = dp["description"]
+ if desc not in unique_edges:
+ unique_edges[desc] = dp
+
+    # Sort descriptions by timestamp, then by description length (longest first) when timestamps are equal
+ sorted_edges = sorted(
+ unique_edges.values(),
+ key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))),
)
+ sorted_descriptions = [dp["description"] for dp in sorted_edges]
+
+ # Combine already_description with sorted new descriptions
+ description_list = already_description + sorted_descriptions
num_fragment = len(description_list)
already_fragment = len(already_description)
@@ -1399,15 +1483,19 @@ async def _merge_edges_then_upsert(
# Log based on actual LLM usage
if llm_was_used:
- status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
+ status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}"
else:
- status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
+ status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}"
+
+ if already_fragment > 0 or llm_was_used:
+ logger.info(status_message)
+ if pipeline_status is not None and pipeline_status_lock is not None:
+ async with pipeline_status_lock:
+ pipeline_status["latest_message"] = status_message
+ pipeline_status["history_messages"].append(status_message)
+ else:
+ logger.debug(status_message)
- logger.info(status_message)
- if pipeline_status is not None and pipeline_status_lock is not None:
- async with pipeline_status_lock:
- pipeline_status["latest_message"] = status_message
- pipeline_status["history_messages"].append(status_message)
else:
logger.error(f"Edge {src_id} - {tgt_id} has no description")
description = "(no description)"
@@ -1739,6 +1827,35 @@ async def merge_nodes_and_edges(
retry_delay=0.1,
)
+ # Update added_entities to entity vector database using safe operation wrapper
+ if added_entities and entity_vdb is not None:
+ for entity_data in added_entities:
+ entity_vdb_id = compute_mdhash_id(
+ entity_data["entity_name"], prefix="ent-"
+ )
+ entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}"
+
+ vdb_data = {
+ entity_vdb_id: {
+ "content": entity_content,
+ "entity_name": entity_data["entity_name"],
+ "source_id": entity_data["source_id"],
+ "entity_type": entity_data["entity_type"],
+ "file_path": entity_data.get(
+ "file_path", "unknown_source"
+ ),
+ }
+ }
+
+ # Use safe operation wrapper - VDB failure must throw exception
+ await safe_vdb_operation_with_exception(
+ operation=lambda data=vdb_data: entity_vdb.upsert(data),
+ operation_name="added_entity_upsert",
+ entity_name=entity_data["entity_name"],
+ max_retries=3,
+ retry_delay=0.1,
+ )
+
return edge_data, added_entities
except Exception as e:
@@ -1937,7 +2054,6 @@ async def extract_entities(
example_context_base = dict(
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
- record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
entity_types=", ".join(entity_types),
language=language,
@@ -1947,7 +2063,6 @@ async def extract_entities(
context_base = dict(
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
- record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
entity_types=",".join(entity_types),
examples=examples,
@@ -1986,7 +2101,7 @@ async def extract_entities(
"entity_continue_extraction_user_prompt"
].format(**{**context_base, "input_text": content})
- final_result = await use_llm_func_with_cache(
+ final_result, timestamp = await use_llm_func_with_cache(
entity_extraction_user_prompt,
use_llm_func,
system_prompt=entity_extraction_system_prompt,
@@ -1996,7 +2111,6 @@ async def extract_entities(
cache_keys_collector=cache_keys_collector,
)
- # Store LLM cache reference in chunk (will be handled by use_llm_func_with_cache)
history = pack_user_ass_to_openai_messages(
entity_extraction_user_prompt, final_result
)
@@ -2005,15 +2119,15 @@ async def extract_entities(
maybe_nodes, maybe_edges = await _process_extraction_result(
final_result,
chunk_key,
+ timestamp,
file_path,
tuple_delimiter=context_base["tuple_delimiter"],
- record_delimiter=context_base["record_delimiter"],
completion_delimiter=context_base["completion_delimiter"],
)
- # Process additional gleaning results
+            # Process additional gleaning results only once when entity_extract_max_gleaning is greater than zero
if entity_extract_max_gleaning > 0:
- glean_result = await use_llm_func_with_cache(
+ glean_result, timestamp = await use_llm_func_with_cache(
entity_continue_extraction_user_prompt,
use_llm_func,
system_prompt=entity_extraction_system_prompt,
@@ -2028,25 +2142,42 @@ async def extract_entities(
glean_nodes, glean_edges = await _process_extraction_result(
glean_result,
chunk_key,
+ timestamp,
file_path,
tuple_delimiter=context_base["tuple_delimiter"],
- record_delimiter=context_base["record_delimiter"],
completion_delimiter=context_base["completion_delimiter"],
)
- # Merge results - only add entities and edges with new names
- for entity_name, entities in glean_nodes.items():
- if (
- entity_name not in maybe_nodes
- ): # Only accetp entities with new name in gleaning stage
- maybe_nodes[entity_name] = [] # Explicitly create the list
- maybe_nodes[entity_name].extend(entities)
- for edge_key, edges in glean_edges.items():
- if (
- edge_key not in maybe_edges
- ): # Only accetp edges with new name in gleaning stage
- maybe_edges[edge_key] = [] # Explicitly create the list
- maybe_edges[edge_key].extend(edges)
+ # Merge results - compare description lengths to choose better version
+ for entity_name, glean_entities in glean_nodes.items():
+ if entity_name in maybe_nodes:
+ # Compare description lengths and keep the better one
+ original_desc_len = len(
+ maybe_nodes[entity_name][0].get("description", "") or ""
+ )
+ glean_desc_len = len(glean_entities[0].get("description", "") or "")
+
+ if glean_desc_len > original_desc_len:
+ maybe_nodes[entity_name] = list(glean_entities)
+ # Otherwise keep original version
+ else:
+ # New entity from gleaning stage
+ maybe_nodes[entity_name] = list(glean_entities)
+
+ for edge_key, glean_edges in glean_edges.items():
+ if edge_key in maybe_edges:
+ # Compare description lengths and keep the better one
+ original_desc_len = len(
+ maybe_edges[edge_key][0].get("description", "") or ""
+ )
+ glean_desc_len = len(glean_edges[0].get("description", "") or "")
+
+ if glean_desc_len > original_desc_len:
+ maybe_edges[edge_key] = list(glean_edges)
+ # Otherwise keep original version
+ else:
+ # New edge from gleaning stage
+ maybe_edges[edge_key] = list(glean_edges)
# Batch update chunk's llm_cache_list with all collected cache keys
if cache_keys_collector and text_chunks_storage:
@@ -2125,6 +2256,7 @@ async def extract_entities(
return chunk_results
+@overload
async def kg_query(
query: str,
knowledge_graph_inst: BaseGraphStorage,
@@ -2136,7 +2268,39 @@ async def kg_query(
hashing_kv: BaseKVStorage | None = None,
system_prompt: str | None = None,
chunks_vdb: BaseVectorStorage = None,
-) -> str | AsyncIterator[str]:
+ return_raw_data: Literal[True] = False,
+) -> dict[str, Any]: ...
+
+
+@overload
+async def kg_query(
+ query: str,
+ knowledge_graph_inst: BaseGraphStorage,
+ entities_vdb: BaseVectorStorage,
+ relationships_vdb: BaseVectorStorage,
+ text_chunks_db: BaseKVStorage,
+ query_param: QueryParam,
+ global_config: dict[str, str],
+ hashing_kv: BaseKVStorage | None = None,
+ system_prompt: str | None = None,
+ chunks_vdb: BaseVectorStorage = None,
+ return_raw_data: Literal[False] = False,
+) -> str | AsyncIterator[str]: ...
+
+
+async def kg_query(
+ query: str,
+ knowledge_graph_inst: BaseGraphStorage,
+ entities_vdb: BaseVectorStorage,
+ relationships_vdb: BaseVectorStorage,
+ text_chunks_db: BaseKVStorage,
+ query_param: QueryParam,
+ global_config: dict[str, str],
+ hashing_kv: BaseKVStorage | None = None,
+ system_prompt: str | None = None,
+ chunks_vdb: BaseVectorStorage = None,
+ return_raw_data: bool = False,
+) -> str | AsyncIterator[str] | dict[str, Any]:
if not query:
return PROMPTS["fail_response"]
@@ -2162,11 +2326,13 @@ async def kg_query(
query_param.user_prompt or "",
query_param.enable_rerank,
)
- cached_response = await handle_cache(
+ cached_result = await handle_cache(
hashing_kv, args_hash, query, query_param.mode, cache_type="query"
)
- if cached_response is not None:
- return cached_response
+ if cached_result is not None:
+ cached_response, _ = cached_result # Extract content, ignore timestamp
+ if not query_param.only_need_context and not query_param.only_need_prompt:
+ return cached_response
hl_keywords, ll_keywords = await get_keywords_from_query(
query, query_param, global_config, hashing_kv
@@ -2190,7 +2356,34 @@ async def kg_query(
ll_keywords_str = ", ".join(ll_keywords) if ll_keywords else ""
hl_keywords_str = ", ".join(hl_keywords) if hl_keywords else ""
- # Build context
+ # If raw data is requested, get both context and raw data
+ if return_raw_data:
+ context_result = await _build_query_context(
+ query,
+ ll_keywords_str,
+ hl_keywords_str,
+ knowledge_graph_inst,
+ entities_vdb,
+ relationships_vdb,
+ text_chunks_db,
+ query_param,
+ chunks_vdb,
+ return_raw_data=True,
+ )
+
+ if isinstance(context_result, tuple):
+ context, raw_data = context_result
+ logger.debug(f"[kg_query] Context length: {len(context) if context else 0}")
+ logger.debug(
+ f"[kg_query] Raw data entities: {len(raw_data.get('entities', []))}, relationships: {len(raw_data.get('relationships', []))}, chunks: {len(raw_data.get('chunks', []))}"
+ )
+ return raw_data
+ else:
+ raise RuntimeError(
+ "Failed to build query context for raw data. Expected a tuple, but got a different type."
+ )
+
+ # Build context (normal flow)
context = await _build_query_context(
query,
ll_keywords_str,
@@ -2203,18 +2396,11 @@ async def kg_query(
chunks_vdb,
)
- if query_param.only_need_context:
+ if query_param.only_need_context and not query_param.only_need_prompt:
return context if context is not None else PROMPTS["fail_response"]
if context is None:
return PROMPTS["fail_response"]
- # Process conversation history
- history_context = ""
- if query_param.conversation_history:
- history_context = get_conversation_turns(
- query_param.conversation_history, query_param.history_turns
- )
-
# Build system prompt
user_prompt = (
query_param.user_prompt
@@ -2225,7 +2411,6 @@ async def kg_query(
sys_prompt = sys_prompt_temp.format(
context_data=context,
response_type=query_param.response_type,
- history=history_context,
user_prompt=user_prompt,
)
@@ -2241,8 +2426,9 @@ async def kg_query(
response = await use_model_func(
query,
system_prompt=sys_prompt,
- stream=query_param.stream,
+ history_messages=query_param.conversation_history,
enable_cot=True,
+ stream=query_param.stream,
)
if isinstance(response, str) and len(response) > len(sys_prompt):
response = (
@@ -2336,10 +2522,11 @@ async def extract_keywords_only(
param.hl_keywords or [],
param.ll_keywords or [],
)
- cached_response = await handle_cache(
+ cached_result = await handle_cache(
hashing_kv, args_hash, text, param.mode, cache_type="keywords"
)
- if cached_response is not None:
+ if cached_result is not None:
+ cached_response, _ = cached_result # Extract content, ignore timestamp
try:
keywords_data = json_repair.loads(cached_response)
return keywords_data.get("high_level_keywords", []), keywords_data.get(
@@ -2355,14 +2542,7 @@ async def extract_keywords_only(
language = global_config["addon_params"].get("language", DEFAULT_SUMMARY_LANGUAGE)
- # 3. Process conversation history
- # history_context = ""
- # if param.conversation_history:
- # history_context = get_conversation_turns(
- # param.conversation_history, param.history_turns
- # )
-
- # 4. Build the keyword-extraction prompt
+ # 3. Build the keyword-extraction prompt
kw_prompt = PROMPTS["keywords_extraction"].format(
query=text,
examples=examples,
@@ -2375,7 +2555,7 @@ async def extract_keywords_only(
f"[extract_keywords] Sending to LLM: {len_of_prompts:,} tokens (Prompt: {len_of_prompts})"
)
- # 5. Call the LLM for keyword extraction
+ # 4. Call the LLM for keyword extraction
if param.model_func:
use_model_func = param.model_func
else:
@@ -2385,7 +2565,7 @@ async def extract_keywords_only(
result = await use_model_func(kw_prompt, keyword_extraction=True)
- # 6. Parse out JSON from the LLM response
+ # 5. Parse out JSON from the LLM response
result = remove_think_tags(result)
try:
keywords_data = json_repair.loads(result)
@@ -2400,7 +2580,7 @@ async def extract_keywords_only(
hl_keywords = keywords_data.get("high_level_keywords", [])
ll_keywords = keywords_data.get("low_level_keywords", [])
- # 7. Cache only the processed keywords with cache type
+ # 6. Cache only the processed keywords with cache type
if hl_keywords or ll_keywords:
cache_data = {
"high_level_keywords": hl_keywords,
@@ -2460,12 +2640,15 @@ async def _get_vector_context(
try:
# Use chunk_top_k if specified, otherwise fall back to top_k
search_top_k = query_param.chunk_top_k or query_param.top_k
+ cosine_threshold = chunks_vdb.cosine_better_than_threshold
results = await chunks_vdb.query(
query, top_k=search_top_k, query_embedding=query_embedding
)
if not results:
- logger.info(f"Naive query: 0 chunks (chunk_top_k: {search_top_k})")
+ logger.info(
+ f"Naive query: 0 chunks (chunk_top_k:{search_top_k} cosine:{cosine_threshold})"
+ )
return []
valid_chunks = []
@@ -2481,7 +2664,7 @@ async def _get_vector_context(
valid_chunks.append(chunk_with_metadata)
logger.info(
- f"Naive query: {len(valid_chunks)} chunks (chunk_top_k: {search_top_k})"
+ f"Naive query: {len(valid_chunks)} chunks (chunk_top_k:{search_top_k} cosine:{cosine_threshold})"
)
return valid_chunks
@@ -2490,7 +2673,7 @@ async def _get_vector_context(
return []
-async def _build_query_context(
+async def _perform_kg_search(
query: str,
ll_keywords: str,
hl_keywords: str,
@@ -2500,25 +2683,21 @@ async def _build_query_context(
text_chunks_db: BaseKVStorage,
query_param: QueryParam,
chunks_vdb: BaseVectorStorage = None,
-):
- if not query:
- logger.warning("Query is empty, skipping context building")
- return ""
+) -> dict[str, Any]:
+ """
+ Pure search logic that retrieves raw entities, relations, and vector chunks.
+ No token truncation or formatting - just raw search results.
+ """
- logger.info(f"Process {os.getpid()} building query context...")
-
- # Collect chunks from different sources separately
- vector_chunks = []
- entity_chunks = []
- relation_chunks = []
- entities_context = []
- relations_context = []
-
- # Store original data for later text chunk retrieval
+ # Initialize result containers
local_entities = []
local_relations = []
global_entities = []
global_relations = []
+ vector_chunks = []
+ chunk_tracking = {}
+
+ # Handle different query modes
# Track chunk sources and metadata for final logging
chunk_tracking = {} # chunk_id -> {source, frequency, order}
@@ -2574,7 +2753,7 @@ async def _build_query_context(
query_param,
)
- # Get vector chunks first if in mix mode
+ # Get vector chunks for mix mode
if query_param.mode == "mix" and chunks_vdb:
vector_chunks = await _get_vector_context(
query,
@@ -2594,11 +2773,9 @@ async def _build_query_context(
else:
logger.warning(f"Vector chunk missing chunk_id: {chunk}")
- # Use round-robin merge to combine local and global data fairly
+ # Round-robin merge entities
final_entities = []
seen_entities = set()
-
- # Round-robin merge entities
max_len = max(len(local_entities), len(global_entities))
for i in range(max_len):
# First from local
@@ -2620,7 +2797,6 @@ async def _build_query_context(
# Round-robin merge relations
final_relations = []
seen_relations = set()
-
max_len = max(len(local_relations), len(global_relations))
for i in range(max_len):
# First from local
@@ -2653,153 +2829,219 @@ async def _build_query_context(
final_relations.append(relation)
seen_relations.add(rel_key)
- # Generate entities context
+ logger.info(
+ f"Raw search results: {len(final_entities)} entities, {len(final_relations)} relations, {len(vector_chunks)} vector chunks"
+ )
+
+ return {
+ "final_entities": final_entities,
+ "final_relations": final_relations,
+ "vector_chunks": vector_chunks,
+ "chunk_tracking": chunk_tracking,
+ "query_embedding": query_embedding,
+ }
+
+
+async def _apply_token_truncation(
+ search_result: dict[str, Any],
+ query_param: QueryParam,
+ global_config: dict[str, str],
+) -> dict[str, Any]:
+ """
+ Apply token-based truncation to entities and relations for LLM efficiency.
+ """
+ tokenizer = global_config.get("tokenizer")
+ if not tokenizer:
+ logger.warning("No tokenizer found, skipping truncation")
+ return {
+ "truncated_entities": search_result["final_entities"],
+ "truncated_relations": search_result["final_relations"],
+ "entities_context": [],
+ "relations_context": [],
+ "filtered_entities": search_result["final_entities"],
+ "filtered_relations": search_result["final_relations"],
+ "entity_id_to_original": {},
+ "relation_id_to_original": {},
+ }
+
+ # Get token limits from query_param with fallbacks
+ max_entity_tokens = getattr(
+ query_param,
+ "max_entity_tokens",
+ global_config.get("max_entity_tokens", DEFAULT_MAX_ENTITY_TOKENS),
+ )
+ max_relation_tokens = getattr(
+ query_param,
+ "max_relation_tokens",
+ global_config.get("max_relation_tokens", DEFAULT_MAX_RELATION_TOKENS),
+ )
+
+ final_entities = search_result["final_entities"]
+ final_relations = search_result["final_relations"]
+
+ # Create mappings from entity/relation identifiers to original data
+ entity_id_to_original = {}
+ relation_id_to_original = {}
+
+ # Generate entities context for truncation
entities_context = []
- for i, n in enumerate(final_entities):
- created_at = n.get("created_at", "UNKNOWN")
+ for i, entity in enumerate(final_entities):
+ entity_name = entity["entity_name"]
+ created_at = entity.get("created_at", "UNKNOWN")
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
- # Get file path from node data
- file_path = n.get("file_path", "unknown_source")
+ # Store mapping from entity name to original data
+ entity_id_to_original[entity_name] = entity
entities_context.append(
{
"id": i + 1,
- "entity": n["entity_name"],
- "type": n.get("entity_type", "UNKNOWN"),
- "description": n.get("description", "UNKNOWN"),
+ "entity": entity_name,
+ "type": entity.get("entity_type", "UNKNOWN"),
+ "description": entity.get("description", "UNKNOWN"),
"created_at": created_at,
- "file_path": file_path,
+ "file_path": entity.get("file_path", "unknown_source"),
}
)
- # Generate relations context
+ # Generate relations context for truncation
relations_context = []
- for i, e in enumerate(final_relations):
- created_at = e.get("created_at", "UNKNOWN")
- # Convert timestamp to readable format
+ for i, relation in enumerate(final_relations):
+ created_at = relation.get("created_at", "UNKNOWN")
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
- # Get file path from edge data
- file_path = e.get("file_path", "unknown_source")
-
# Handle different relation data formats
- if "src_tgt" in e:
- entity1, entity2 = e["src_tgt"]
+ if "src_tgt" in relation:
+ entity1, entity2 = relation["src_tgt"]
else:
- entity1, entity2 = e.get("src_id"), e.get("tgt_id")
+ entity1, entity2 = relation.get("src_id"), relation.get("tgt_id")
+
+ # Store mapping from relation pair to original data
+ relation_key = (entity1, entity2)
+ relation_id_to_original[relation_key] = relation
relations_context.append(
{
"id": i + 1,
"entity1": entity1,
"entity2": entity2,
- "description": e.get("description", "UNKNOWN"),
+ "description": relation.get("description", "UNKNOWN"),
"created_at": created_at,
- "file_path": file_path,
+ "file_path": relation.get("file_path", "unknown_source"),
}
)
logger.debug(
- f"Initial KG query results: {len(entities_context)} entities, {len(relations_context)} relations"
+ f"Before truncation: {len(entities_context)} entities, {len(relations_context)} relations"
)
- # Unified token control system - Apply precise token limits to entities and relations
- tokenizer = text_chunks_db.global_config.get("tokenizer")
- # Get new token limits from query_param (with fallback to global_config)
- max_entity_tokens = getattr(
- query_param,
- "max_entity_tokens",
- text_chunks_db.global_config.get(
- "max_entity_tokens", DEFAULT_MAX_ENTITY_TOKENS
- ),
- )
- max_relation_tokens = getattr(
- query_param,
- "max_relation_tokens",
- text_chunks_db.global_config.get(
- "max_relation_tokens", DEFAULT_MAX_RELATION_TOKENS
- ),
- )
- max_total_tokens = getattr(
- query_param,
- "max_total_tokens",
- text_chunks_db.global_config.get("max_total_tokens", DEFAULT_MAX_TOTAL_TOKENS),
- )
-
- # Truncate entities based on complete JSON serialization
+ # Apply token-based truncation
if entities_context:
- # Process entities context to replace GRAPH_FIELD_SEP with : in file_path fields
+ # Remove file_path and created_at for token calculation
+ entities_context_for_truncation = []
for entity in entities_context:
- # remove file_path and created_at
- entity.pop("file_path", None)
- entity.pop("created_at", None)
- # if "file_path" in entity and entity["file_path"]:
- # entity["file_path"] = entity["file_path"].replace(GRAPH_FIELD_SEP, ";")
+ entity_copy = entity.copy()
+ entity_copy.pop("file_path", None)
+ entity_copy.pop("created_at", None)
+ entities_context_for_truncation.append(entity_copy)
entities_context = truncate_list_by_token_size(
- entities_context,
- key=lambda x: json.dumps(x, ensure_ascii=False),
+ entities_context_for_truncation,
+            key=lambda x: json.dumps(x, ensure_ascii=False),
max_token_size=max_entity_tokens,
tokenizer=tokenizer,
)
- # Truncate relations based on complete JSON serialization
if relations_context:
- # Process relations context to replace GRAPH_FIELD_SEP with : in file_path fields
+ # Remove file_path and created_at for token calculation
+ relations_context_for_truncation = []
for relation in relations_context:
- # remove file_path and created_at
- relation.pop("file_path", None)
- relation.pop("created_at", None)
- # if "file_path" in relation and relation["file_path"]:
- # relation["file_path"] = relation["file_path"].replace(
- # GRAPH_FIELD_SEP, ";"
- # )
+ relation_copy = relation.copy()
+ relation_copy.pop("file_path", None)
+ relation_copy.pop("created_at", None)
+ relations_context_for_truncation.append(relation_copy)
relations_context = truncate_list_by_token_size(
- relations_context,
- key=lambda x: json.dumps(x, ensure_ascii=False),
+ relations_context_for_truncation,
+            key=lambda x: json.dumps(x, ensure_ascii=False),
max_token_size=max_relation_tokens,
tokenizer=tokenizer,
)
- # After truncation, get text chunks based on final entities and relations
logger.info(
- f"Truncated KG query results: {len(entities_context)} entities, {len(relations_context)} relations"
+ f"After truncation: {len(entities_context)} entities, {len(relations_context)} relations"
)
- # Create filtered data based on truncated context
- final_node_datas = []
- if entities_context and final_entities:
+ # Create filtered original data based on truncated context
+ filtered_entities = []
+ filtered_entity_id_to_original = {}
+ if entities_context:
final_entity_names = {e["entity"] for e in entities_context}
seen_nodes = set()
- for node in final_entities:
- name = node.get("entity_name")
+ for entity in final_entities:
+ name = entity.get("entity_name")
if name in final_entity_names and name not in seen_nodes:
- final_node_datas.append(node)
+ filtered_entities.append(entity)
+ filtered_entity_id_to_original[name] = entity
seen_nodes.add(name)
- final_edge_datas = []
- if relations_context and final_relations:
+ filtered_relations = []
+ filtered_relation_id_to_original = {}
+ if relations_context:
final_relation_pairs = {(r["entity1"], r["entity2"]) for r in relations_context}
seen_edges = set()
- for edge in final_relations:
- src, tgt = edge.get("src_id"), edge.get("tgt_id")
+ for relation in final_relations:
+ src, tgt = relation.get("src_id"), relation.get("tgt_id")
if src is None or tgt is None:
- src, tgt = edge.get("src_tgt", (None, None))
+ src, tgt = relation.get("src_tgt", (None, None))
pair = (src, tgt)
if pair in final_relation_pairs and pair not in seen_edges:
- final_edge_datas.append(edge)
+ filtered_relations.append(relation)
+ filtered_relation_id_to_original[pair] = relation
seen_edges.add(pair)
- # Get text chunks based on final filtered data
- # To preserve the influence of entity order, entiy-based chunks should not be deduplcicated by vector_chunks
- if final_node_datas:
+ return {
+ "truncated_entities": final_entities, # Keep original for backward compatibility
+ "truncated_relations": final_relations, # Keep original for backward compatibility
+ "entities_context": entities_context, # Formatted and truncated for LLM
+ "relations_context": relations_context, # Formatted and truncated for LLM
+ "filtered_entities": filtered_entities, # Original entities that passed truncation
+ "filtered_relations": filtered_relations, # Original relations that passed truncation
+ "entity_id_to_original": filtered_entity_id_to_original, # Mapping for original data lookup
+ "relation_id_to_original": filtered_relation_id_to_original, # Mapping for original data lookup
+ }
+
+
+async def _merge_all_chunks(
+ filtered_entities: list[dict],
+ filtered_relations: list[dict],
+ vector_chunks: list[dict],
+ query: str = "",
+ knowledge_graph_inst: BaseGraphStorage = None,
+ text_chunks_db: BaseKVStorage = None,
+ query_param: QueryParam = None,
+ chunks_vdb: BaseVectorStorage = None,
+ chunk_tracking: dict = None,
+ query_embedding: list[float] = None,
+) -> list[dict]:
+ """
+ Merge chunks from different sources: vector_chunks + entity_chunks + relation_chunks.
+ """
+ if chunk_tracking is None:
+ chunk_tracking = {}
+
+ # Get chunks from entities
+ entity_chunks = []
+ if filtered_entities and text_chunks_db:
entity_chunks = await _find_related_text_unit_from_entities(
- final_node_datas,
+ filtered_entities,
query_param,
text_chunks_db,
knowledge_graph_inst,
@@ -2809,21 +3051,21 @@ async def _build_query_context(
query_embedding=query_embedding,
)
- # Find deduplcicated chunks from edge
- # Deduplication cause chunks solely relation-based to be prioritized and sent to the LLM when re-ranking is disabled
- if final_edge_datas:
+ # Get chunks from relations
+ relation_chunks = []
+ if filtered_relations and text_chunks_db:
relation_chunks = await _find_related_text_unit_from_relations(
- final_edge_datas,
+ filtered_relations,
query_param,
text_chunks_db,
- entity_chunks,
+ entity_chunks, # For deduplication
query,
chunks_vdb,
chunk_tracking=chunk_tracking,
query_embedding=query_embedding,
)
- # Round-robin merge chunks from different sources with deduplication by chunk_id
+ # Round-robin merge chunks from different sources with deduplication
merged_chunks = []
seen_chunk_ids = set()
max_len = max(len(vector_chunks), len(entity_chunks), len(relation_chunks))
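+    # Interleave chunks from the three sources one at a time, keeping only the first occurrence of each chunk_id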
@@ -2873,16 +3115,90 @@ async def _build_query_context(
)
logger.info(
- f"Round-robin merged total chunks from {origin_len} to {len(merged_chunks)}"
+ f"Round-robin merged chunks: {origin_len} -> {len(merged_chunks)} (deduplication: {origin_len - len(merged_chunks)})"
+ )
+
+ return merged_chunks
+
+
+async def _build_llm_context(
+ entities_context: list[dict],
+ relations_context: list[dict],
+ merged_chunks: list[dict],
+ query: str,
+ query_param: QueryParam,
+ global_config: dict[str, str],
+ chunk_tracking: dict = None,
+ return_raw_data: bool = False,
+ entity_id_to_original: dict = None,
+ relation_id_to_original: dict = None,
+) -> str | tuple[str, dict[str, Any]]:
+ """
+ Build the final LLM context string with token processing.
+ This includes dynamic token calculation and final chunk truncation.
+ """
+ tokenizer = global_config.get("tokenizer")
+ if not tokenizer:
+ logger.warning("No tokenizer found, building context without token limits")
+
+ # Build basic context without token processing
+ entities_str = "\n".join(
+ json.dumps(entity, ensure_ascii=False) for entity in entities_context
+ )
+ relations_str = "\n".join(
+ json.dumps(relation, ensure_ascii=False) for relation in relations_context
+ )
+
+ text_units_context = []
+ for i, chunk in enumerate(merged_chunks):
+ text_units_context.append(
+ {
+ "id": i + 1,
+ "content": chunk["content"],
+ "file_path": chunk.get("file_path", "unknown_source"),
+ }
+ )
+
+ text_units_str = json.dumps(text_units_context, ensure_ascii=False)
+
+ return f"""-----Entities(KG)-----
+
+```json
+{entities_str}
+```
+
+-----Relationships(KG)-----
+
+```json
+{relations_str}
+```
+
+-----Document Chunks(DC)-----
+
+```json
+{text_units_str}
+```
+
+"""
+
+ # Get token limits
+ max_total_tokens = getattr(
+ query_param,
+ "max_total_tokens",
+ global_config.get("max_total_tokens", DEFAULT_MAX_TOTAL_TOKENS),
)
- # Apply token processing to merged chunks
text_units_context = []
truncated_chunks = []
+
if merged_chunks:
# Calculate dynamic token limit for text chunks
- entities_str = json.dumps(entities_context, ensure_ascii=False)
- relations_str = json.dumps(relations_context, ensure_ascii=False)
+ entities_str = "\n".join(
+ json.dumps(entity, ensure_ascii=False) for entity in entities_context
+ )
+ relations_str = "\n".join(
+ json.dumps(relation, ensure_ascii=False) for relation in relations_context
+ )
# Calculate base context tokens (entities + relations + template)
kg_context_template = """-----Entities(KG)-----
@@ -2909,18 +3225,7 @@ async def _build_query_context(
)
kg_context_tokens = len(tokenizer.encode(kg_context))
- # Calculate actual system prompt overhead dynamically
- # 1. Converstion history not included in context length calculation
- history_context = ""
- # if query_param.conversation_history:
- # history_context = get_conversation_turns(
- # query_param.conversation_history, query_param.history_turns
- # )
- # history_tokens = (
- # len(tokenizer.encode(history_context)) if history_context else 0
- # )
-
- # 2. Calculate system prompt template tokens (excluding context_data)
+ # Calculate system prompt template overhead
user_prompt = query_param.user_prompt if query_param.user_prompt else ""
response_type = (
query_param.response_type
@@ -2928,14 +3233,13 @@ async def _build_query_context(
else "Multiple Paragraphs"
)
- # Get the system prompt template from PROMPTS
- sys_prompt_template = text_chunks_db.global_config.get(
+ # Get the system prompt template from PROMPTS or global_config
+ sys_prompt_template = global_config.get(
"system_prompt_template", PROMPTS["rag_response"]
)
- # Create a sample system prompt with placeholders filled (excluding context_data)
+ # Create sample system prompt for overhead calculation
sample_sys_prompt = sys_prompt_template.format(
- history=history_context,
context_data="", # Empty for overhead calculation
response_type=response_type,
user_prompt=user_prompt,
@@ -2961,7 +3265,7 @@ async def _build_query_context(
query=query,
unique_chunks=merged_chunks,
query_param=query_param,
- global_config=text_chunks_db.global_config,
+ global_config=global_config,
source_type=query_param.mode,
chunk_token_limit=available_chunk_tokens, # Pass dynamic limit
)
@@ -2986,7 +3290,17 @@ async def _build_query_context(
# not necessary to use LLM to generate a response
if not entities_context and not relations_context:
- return None
+ if return_raw_data:
+ # Return empty raw data structure when no entities/relations
+ empty_raw_data = _convert_to_user_format(
+ [],
+ [],
+ [],
+ query_param.mode,
+ )
+ return None, empty_raw_data
+ else:
+ return None
    # Output chunk tracking information
# format: / (e.g., E5/2 R2/1 C1/1)
@@ -3006,9 +3320,15 @@ async def _build_query_context(
if chunk_tracking_log:
logger.info(f"chunks: {' '.join(chunk_tracking_log)}")
- entities_str = json.dumps(entities_context, ensure_ascii=False)
- relations_str = json.dumps(relations_context, ensure_ascii=False)
- text_units_str = json.dumps(text_units_context, ensure_ascii=False)
+ entities_str = "\n".join(
+ json.dumps(entity, ensure_ascii=False) for entity in entities_context
+ )
+ relations_str = "\n".join(
+ json.dumps(relation, ensure_ascii=False) for relation in relations_context
+ )
+ text_units_str = "\n".join(
+ json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context
+ )
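+    # entities_str, relations_str and text_units_str are emitted as JSON Lines (one JSON object per line)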
result = f"""-----Entities(KG)-----
@@ -3029,7 +3349,160 @@ async def _build_query_context(
```
"""
- return result
+
+ # If final data is requested, return both context and complete data structure
+ if return_raw_data:
+ logger.debug(
+ f"[_build_llm_context] Converting to user format: {len(entities_context)} entities, {len(relations_context)} relations, {len(truncated_chunks)} chunks"
+ )
+ final_data = _convert_to_user_format(
+ entities_context,
+ relations_context,
+ truncated_chunks,
+ query_param.mode,
+ entity_id_to_original,
+ relation_id_to_original,
+ )
+ logger.debug(
+ f"[_build_llm_context] Final data after conversion: {len(final_data.get('entities', []))} entities, {len(final_data.get('relationships', []))} relationships, {len(final_data.get('chunks', []))} chunks"
+ )
+ return result, final_data
+ else:
+ return result
+
+
+# _build_query_context now delegates to the new 4-stage pipeline implemented above
+async def _build_query_context(
+ query: str,
+ ll_keywords: str,
+ hl_keywords: str,
+ knowledge_graph_inst: BaseGraphStorage,
+ entities_vdb: BaseVectorStorage,
+ relationships_vdb: BaseVectorStorage,
+ text_chunks_db: BaseKVStorage,
+ query_param: QueryParam,
+ chunks_vdb: BaseVectorStorage = None,
+ return_raw_data: bool = False,
+) -> str | tuple[str, dict[str, Any]]:
+ """
+ Main query context building function using the new 4-stage architecture:
+ 1. Search -> 2. Truncate -> 3. Merge chunks -> 4. Build LLM context
+ """
+
+ if not query:
+ logger.warning("Query is empty, skipping context building")
+ return ""
+
+ # Stage 1: Pure search
+ search_result = await _perform_kg_search(
+ query,
+ ll_keywords,
+ hl_keywords,
+ knowledge_graph_inst,
+ entities_vdb,
+ relationships_vdb,
+ text_chunks_db,
+ query_param,
+ chunks_vdb,
+ )
+
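+    # Without any KG entities or relations, only mix mode may still proceed (it can fall back on vector-retrieved chunks)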
+ if not search_result["final_entities"] and not search_result["final_relations"]:
+ if query_param.mode != "mix":
+ return None
+ else:
+ if not search_result["chunk_tracking"]:
+ return None
+
+ # Stage 2: Apply token truncation for LLM efficiency
+ truncation_result = await _apply_token_truncation(
+ search_result,
+ query_param,
+ text_chunks_db.global_config,
+ )
+
+ # Stage 3: Merge chunks using filtered entities/relations
+ merged_chunks = await _merge_all_chunks(
+ filtered_entities=truncation_result["filtered_entities"],
+ filtered_relations=truncation_result["filtered_relations"],
+ vector_chunks=search_result["vector_chunks"],
+ query=query,
+ knowledge_graph_inst=knowledge_graph_inst,
+ text_chunks_db=text_chunks_db,
+ query_param=query_param,
+ chunks_vdb=chunks_vdb,
+ chunk_tracking=search_result["chunk_tracking"],
+ query_embedding=search_result["query_embedding"],
+ )
+
+ if (
+ not merged_chunks
+ and not truncation_result["entities_context"]
+ and not truncation_result["relations_context"]
+ ):
+ return None
+
+ # Stage 4: Build final LLM context with dynamic token processing
+
+ if return_raw_data:
+ # Get both context and final data - when return_raw_data=True, _build_llm_context always returns tuple
+ context, raw_data = await _build_llm_context(
+ entities_context=truncation_result["entities_context"],
+ relations_context=truncation_result["relations_context"],
+ merged_chunks=merged_chunks,
+ query=query,
+ query_param=query_param,
+ global_config=text_chunks_db.global_config,
+ chunk_tracking=search_result["chunk_tracking"],
+ return_raw_data=True,
+ entity_id_to_original=truncation_result["entity_id_to_original"],
+ relation_id_to_original=truncation_result["relation_id_to_original"],
+ )
+
+ # Convert keywords strings to lists and add complete metadata to raw_data
+ hl_keywords_list = hl_keywords.split(", ") if hl_keywords else []
+ ll_keywords_list = ll_keywords.split(", ") if ll_keywords else []
+
+ # Add complete metadata to raw_data
+ raw_data["metadata"]["keywords"] = {
+ "high_level": hl_keywords_list,
+ "low_level": ll_keywords_list,
+ }
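+        # Record how many entities, relations and chunks survived each stage of the pipeline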
+ raw_data["metadata"]["processing_info"] = {
+ "total_entities_found": len(search_result.get("final_entities", [])),
+ "total_relations_found": len(search_result.get("final_relations", [])),
+ "entities_after_truncation": len(
+ truncation_result.get("filtered_entities", [])
+ ),
+ "relations_after_truncation": len(
+ truncation_result.get("filtered_relations", [])
+ ),
+ "merged_chunks_count": len(merged_chunks),
+ "final_chunks_count": len(raw_data.get("chunks", [])),
+ }
+
+ logger.debug(
+ f"[_build_query_context] Context length: {len(context) if context else 0}"
+ )
+ logger.debug(
+ f"[_build_query_context] Raw data entities: {len(raw_data.get('entities', []))}, relationships: {len(raw_data.get('relationships', []))}, chunks: {len(raw_data.get('chunks', []))}"
+ )
+ return context, raw_data
+ else:
+ # Normal context building (existing logic)
+ context = await _build_llm_context(
+ entities_context=truncation_result["entities_context"],
+ relations_context=truncation_result["relations_context"],
+ merged_chunks=merged_chunks,
+ query=query,
+ query_param=query_param,
+ global_config=text_chunks_db.global_config,
+ chunk_tracking=search_result["chunk_tracking"],
+ )
+ return context
async def _get_node_data(
@@ -3604,6 +4077,7 @@ async def _find_related_text_unit_from_relations(
return result_chunks
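+# Overloads: return_raw_data=True returns the raw retrieval data dict; return_raw_data=False returns the LLM response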
+@overload
async def naive_query(
query: str,
chunks_vdb: BaseVectorStorage,
@@ -3611,7 +4085,31 @@ async def naive_query(
global_config: dict[str, str],
hashing_kv: BaseKVStorage | None = None,
system_prompt: str | None = None,
-) -> str | AsyncIterator[str]:
+ return_raw_data: Literal[True] = True,
+) -> dict[str, Any]: ...
+
+
+@overload
+async def naive_query(
+ query: str,
+ chunks_vdb: BaseVectorStorage,
+ query_param: QueryParam,
+ global_config: dict[str, str],
+ hashing_kv: BaseKVStorage | None = None,
+ system_prompt: str | None = None,
+ return_raw_data: Literal[False] = False,
+) -> str | AsyncIterator[str]: ...
+
+
+async def naive_query(
+ query: str,
+ chunks_vdb: BaseVectorStorage,
+ query_param: QueryParam,
+ global_config: dict[str, str],
+ hashing_kv: BaseKVStorage | None = None,
+ system_prompt: str | None = None,
+ return_raw_data: bool = False,
+) -> str | AsyncIterator[str] | dict[str, Any]:
if query_param.model_func:
use_model_func = query_param.model_func
else:
@@ -3634,17 +4132,34 @@ async def naive_query(
query_param.user_prompt or "",
query_param.enable_rerank,
)
- cached_response = await handle_cache(
+ cached_result = await handle_cache(
hashing_kv, args_hash, query, query_param.mode, cache_type="query"
)
- if cached_response is not None:
- return cached_response
+ if cached_result is not None:
+ cached_response, _ = cached_result # Extract content, ignore timestamp
+ if not query_param.only_need_context and not query_param.only_need_prompt:
+ return cached_response
tokenizer: Tokenizer = global_config["tokenizer"]
chunks = await _get_vector_context(query, chunks_vdb, query_param, None)
if chunks is None or len(chunks) == 0:
+ # Build empty raw data for consistency
+ empty_raw_data = {
+ "entities": [], # naive mode has no entities
+ "relationships": [], # naive mode has no relationships
+ "chunks": [],
+ "metadata": {
+ "query_mode": "naive",
+ "keywords": {"high_level": [], "low_level": []},
+ },
+ }
+
+ # If only raw data is requested, return it directly
+ if return_raw_data:
+ return empty_raw_data
+
return PROMPTS["fail_response"]
# Calculate dynamic token limit for chunks
@@ -3655,14 +4170,6 @@ async def naive_query(
global_config.get("max_total_tokens", DEFAULT_MAX_TOTAL_TOKENS),
)
- # Calculate conversation history tokens
- history_context = ""
- if query_param.conversation_history:
- history_context = get_conversation_turns(
- query_param.conversation_history, query_param.history_turns
- )
- history_tokens = len(tokenizer.encode(history_context)) if history_context else 0
-
# Calculate system prompt template tokens (excluding content_data)
user_prompt = query_param.user_prompt if query_param.user_prompt else ""
response_type = (
@@ -3680,7 +4187,6 @@ async def naive_query(
sample_sys_prompt = sys_prompt_template.format(
content_data="", # Empty for overhead calculation
response_type=response_type,
- history=history_context,
user_prompt=user_prompt,
)
sys_prompt_template_tokens = len(tokenizer.encode(sample_sys_prompt))
@@ -3696,7 +4202,7 @@ async def naive_query(
available_chunk_tokens = max_total_tokens - used_tokens
logger.debug(
- f"Naive query token allocation - Total: {max_total_tokens}, History: {history_tokens}, SysPrompt: {sys_prompt_overhead}, Buffer: {buffer_tokens}, Available for chunks: {available_chunk_tokens}"
+ f"Naive query token allocation - Total: {max_total_tokens}, SysPrompt: {sys_prompt_overhead}, Buffer: {buffer_tokens}, Available for chunks: {available_chunk_tokens}"
)
# Process chunks using unified processing with dynamic token limit
@@ -3711,6 +4217,28 @@ async def naive_query(
logger.info(f"Final context: {len(processed_chunks)} chunks")
+ # If only raw data is requested, return it directly
+ if return_raw_data:
+ # Build raw data structure for naive mode using processed chunks
+ raw_data = _convert_to_user_format(
+ [], # naive mode has no entities
+ [], # naive mode has no relationships
+ processed_chunks,
+ "naive",
+ )
+
+ # Add complete metadata for naive mode
+ raw_data["metadata"]["keywords"] = {
+ "high_level": [], # naive mode has no keyword extraction
+ "low_level": [], # naive mode has no keyword extraction
+ }
+ raw_data["metadata"]["processing_info"] = {
+ "total_chunks_found": len(chunks),
+ "final_chunks_count": len(processed_chunks),
+ }
+
+ return raw_data
+
# Build text_units_context from processed chunks
text_units_context = []
for i, chunk in enumerate(processed_chunks):
@@ -3722,8 +4250,11 @@ async def naive_query(
}
)
- text_units_str = json.dumps(text_units_context, ensure_ascii=False)
- if query_param.only_need_context:
+ text_units_str = "\n".join(
+ json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context
+ )
+
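+    # Return the bare chunk context when only the context is requested and the full prompt is not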
+ if query_param.only_need_context and not query_param.only_need_prompt:
return f"""
---Document Chunks(DC)---
@@ -3732,12 +4263,6 @@ async def naive_query(
```
"""
- # Process conversation history
- history_context = ""
- if query_param.conversation_history:
- history_context = get_conversation_turns(
- query_param.conversation_history, query_param.history_turns
- )
# Build system prompt
user_prompt = (
@@ -3749,7 +4274,6 @@ async def naive_query(
sys_prompt = sys_prompt_temp.format(
content_data=text_units_str,
response_type=query_param.response_type,
- history=history_context,
user_prompt=user_prompt,
)
@@ -3764,8 +4288,9 @@ async def naive_query(
response = await use_model_func(
query,
system_prompt=sys_prompt,
- stream=query_param.stream,
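+        # Conversation history is now passed to the model as chat messages instead of being rendered into the system prompt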
+ history_messages=query_param.conversation_history,
enable_cot=True,
+ stream=query_param.stream,
)
if isinstance(response, str) and len(response) > len(sys_prompt):
diff --git a/lightrag/prompt.py b/lightrag/prompt.py
index b53566b3..d8d51bb9 100644
--- a/lightrag/prompt.py
+++ b/lightrag/prompt.py
@@ -4,8 +4,8 @@ from typing import Any
PROMPTS: dict[str, Any] = {}
-PROMPTS["DEFAULT_TUPLE_DELIMITER"] = "<|>"
-PROMPTS["DEFAULT_RECORD_DELIMITER"] = "##"
+# All delimiters must be formatted as "<|UPPER_CASE_STRING|>"
+PROMPTS["DEFAULT_TUPLE_DELIMITER"] = "<|#|>"
PROMPTS["DEFAULT_COMPLETION_DELIMITER"] = "<|COMPLETE|>"
PROMPTS["DEFAULT_USER_PROMPT"] = "n/a"
@@ -16,22 +16,49 @@ You are a Knowledge Graph Specialist responsible for extracting entities and rel
---Instructions---
-1. **Entity Extraction:** Identify clearly defined and meaningful entities in the input text, and extract the following information:
- - entity_name: Name of the entity, ensure entity names are consistent throughout the extraction.
- - entity_type: Categorize the entity using the following entity types: {entity_types}; if none of the provided types are suitable, classify it as `Other`.
- - entity_description: Provide a comprehensive description of the entity's attributes and activities based on the information present in the input text.
-2. **Entity Output Format:** (entity{tuple_delimiter}entity_name{tuple_delimiter}entity_type{tuple_delimiter}entity_description)
-3. **Relationship Extraction:** Identify direct, clearly-stated and meaningful relationships between extracted entities within the input text, and extract the following information:
- - source_entity: name of the source entity.
- - target_entity: name of the target entity.
- - relationship_keywords: one or more high-level key words that summarize the overarching nature of the relationship, focusing on concepts or themes rather than specific details.
- - relationship_description: Explain the nature of the relationship between the source and target entities, providing a clear rationale for their connection.
-4. **Relationship Output Format:** (relationship{tuple_delimiter}source_entity{tuple_delimiter}target_entity{tuple_delimiter}relationship_keywords{tuple_delimiter}relationship_description)
-5. **Relationship Order:** Prioritize relationships based on their significance to the intended meaning of input text, and output more crucial relationships first.
-6. **Avoid Pronouns:** For entity names and all descriptions, explicitly name the subject or object instead of using pronouns; avoid pronouns such as `this document`, `our company`, `I`, `you`, and `he/she`.
-7. **Undirectional Relationship:** Treat relationships as undirected; swapping the source and target entities does not constitute a new relationship. Avoid outputting duplicate relationships.
-8. **Language:** Output entity names, keywords and descriptions in {language}.
-9. **Delimiter:** Use `{record_delimiter}` as the entity or relationship list delimiter; output `{completion_delimiter}` when all the entities and relationships are extracted.
+1. **Entity Extraction & Output:**
+ * **Identification:** Identify clearly defined and meaningful entities in the input text.
+ * **Entity Details:** For each identified entity, extract the following information:
+ * `entity_name`: The name of the entity. If the entity name is case-insensitive, capitalize the first letter of each significant word (title case). Ensure **consistent naming** across the entire extraction process.
+        * `entity_type`: Categorize the entity using one of the following types: `{entity_types}`. If none of the provided entity types apply, do not create a new entity type; classify it as `Other`.
+ * `entity_description`: Provide a concise yet comprehensive description of the entity's attributes and activities, based *solely* on the information present in the input text.
+ * **Output Format - Entities:** Output a total of 4 fields for each entity, delimited by `{tuple_delimiter}`, on a single line. The first field *must* be the literal string `entity`.
+ * Format: `entity{tuple_delimiter}entity_name{tuple_delimiter}entity_type{tuple_delimiter}entity_description`
+
+2. **Relationship Extraction & Output:**
+ * **Identification:** Identify direct, clearly stated, and meaningful relationships between previously extracted entities.
+ * **N-ary Relationship Decomposition:** If a single statement describes a relationship involving more than two entities (an N-ary relationship), decompose it into multiple binary (two-entity) relationship pairs for separate description.
+ * **Example:** For "Alice, Bob, and Carol collaborated on Project X," extract binary relationships such as "Alice collaborated with Project X," "Bob collaborated with Project X," and "Carol collaborated with Project X," or "Alice collaborated with Bob," based on the most reasonable binary interpretations.
+ * **Relationship Details:** For each binary relationship, extract the following fields:
+ * `source_entity`: The name of the source entity. Ensure **consistent naming** with entity extraction. Capitalize the first letter of each significant word (title case) if the name is case-insensitive.
+ * `target_entity`: The name of the target entity. Ensure **consistent naming** with entity extraction. Capitalize the first letter of each significant word (title case) if the name is case-insensitive.
+ * `relationship_keywords`: One or more high-level keywords summarizing the overarching nature, concepts, or themes of the relationship. Multiple keywords within this field must be separated by a comma `,`. **DO NOT use `{tuple_delimiter}` for separating multiple keywords within this field.**
+ * `relationship_description`: A concise explanation of the nature of the relationship between the source and target entities, providing a clear rationale for their connection.
+ * **Output Format - Relationships:** Output a total of 5 fields for each relationship, delimited by `{tuple_delimiter}`, on a single line. The first field *must* be the literal string `relation`.
+ * Format: `relation{tuple_delimiter}source_entity{tuple_delimiter}target_entity{tuple_delimiter}relationship_keywords{tuple_delimiter}relationship_description`
+
+3. **Delimiter Usage Protocol:**
+ * The `{tuple_delimiter}` is a complete, atomic marker and **must not be filled with content**. It serves strictly as a field separator.
+ * **Incorrect Example:** `entity{tuple_delimiter}Tokyo<|location|>Tokyo is the capital of Japan.`
+ * **Correct Example:** `entity{tuple_delimiter}Tokyo{tuple_delimiter}location{tuple_delimiter}Tokyo is the capital of Japan.`
+
+4. **Relationship Direction & Duplication:**
+ * Treat all relationships as **undirected** unless explicitly stated otherwise. Swapping the source and target entities for an undirected relationship does not constitute a new relationship.
+ * Avoid outputting duplicate relationships.
+
+5. **Output Order & Prioritization:**
+ * Output all extracted entities first, followed by all extracted relationships.
+ * Within the list of relationships, prioritize and output those relationships that are **most significant** to the core meaning of the input text first.
+
+6. **Context & Objectivity:**
+ * Ensure all entity names and descriptions are written in the **third person**.
+ * Explicitly name the subject or object; **avoid using pronouns** such as `this article`, `this paper`, `our company`, `I`, `you`, and `he/she`.
+
+7. **Language & Proper Nouns:**
+ * The entire output (entity names, keywords, and descriptions) must be written in `{language}`.
+ * Proper nouns (e.g., personal names, place names, organization names) should be retained in their original language if a proper, widely accepted translation is not available or would cause ambiguity.
+
+8. **Completion Signal:** Output the literal string `{completion_delimiter}` only after all entities and relationships, following all criteria, have been completely extracted and outputted.
---Examples---
@@ -47,25 +74,31 @@ Text:
"""
PROMPTS["entity_extraction_user_prompt"] = """---Task---
-Extract entities and relationships from the input text to be Processed.
+Extract entities and relationships from the input text to be processed.
---Instructions---
-1. Output entities and relationships, prioritized by their relevance to the input text's core meaning.
-2. Output `{completion_delimiter}` when all the entities and relationships are extracted.
-3. Ensure the output language is {language}.
+1. **Strict Adherence to Format:** Strictly adhere to all format requirements for entity and relationship lists, including output order, field delimiters, and proper noun handling, as specified in the system prompt.
+2. **Output Content Only:** Output *only* the extracted list of entities and relationships. Do not include any introductory or concluding remarks, explanations, or additional text before or after the list.
+3. **Completion Signal:** Output `{completion_delimiter}` as the final line after all relevant entities and relationships have been extracted and presented.
+4. **Output Language:** Ensure the output language is {language}. Proper nouns (e.g., personal names, place names, organization names) must be kept in their original language and not translated.