# # Copyright 2025 The InfiniFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """ Hierarchical Retrieval Architecture for Production-Grade RAG Implements a three-tier retrieval system: 1. Tier 1: Knowledge Base Routing - Routes queries to relevant KBs 2. Tier 2: Document Filtering - Filters by metadata 3. Tier 3: Chunk Refinement - Precise vector retrieval This architecture addresses scalability and precision limitations in production environments with large document collections. """ import logging from typing import List, Dict, Any, Optional from dataclasses import dataclass, field @dataclass class RetrievalConfig: """Configuration for hierarchical retrieval""" # Tier 1: KB Routing enable_kb_routing: bool = True kb_routing_method: str = "auto" # "auto", "rule_based", "llm_based", "all" kb_routing_threshold: float = 0.5 # Tier 2: Document Filtering enable_doc_filtering: bool = True metadata_fields: List[str] = field(default_factory=list) # Key metadata fields to use enable_metadata_similarity: bool = False # Fuzzy matching for text metadata metadata_similarity_threshold: float = 0.7 # Tier 3: Chunk Refinement enable_parent_child_chunking: bool = False use_summary_mapping: bool = False chunk_refinement_top_k: int = 10 # General settings max_candidates_per_tier: int = 100 enable_hybrid_search: bool = True vector_weight: float = 0.7 keyword_weight: float = 0.3 @dataclass class RetrievalResult: """Result from hierarchical retrieval""" query: str selected_kbs: List[str] = field(default_factory=list) filtered_docs: List[Dict[str, Any]] = field(default_factory=list) retrieved_chunks: List[Dict[str, Any]] = field(default_factory=list) # Metadata about retrieval process tier1_candidates: int = 0 tier2_candidates: int = 0 tier3_candidates: int = 0 total_time_ms: float = 0.0 tier1_time_ms: float = 0.0 tier2_time_ms: float = 0.0 tier3_time_ms: float = 0.0 class HierarchicalRetrieval: """ Three-tier hierarchical retrieval system for production-grade RAG. This class orchestrates the retrieval process across three tiers: - Tier 1: Routes query to relevant knowledge bases - Tier 2: Filters documents by metadata - Tier 3: Performs precise chunk-level retrieval """ def __init__(self, config: Optional[RetrievalConfig] = None): """ Initialize hierarchical retrieval system. Args: config: Configuration for retrieval behavior """ self.config = config or RetrievalConfig() self.logger = logging.getLogger(__name__) def retrieve( self, query: str, kb_ids: List[str], top_k: int = 10, filters: Optional[Dict[str, Any]] = None ) -> RetrievalResult: """ Perform hierarchical retrieval. Args: query: User query kb_ids: List of knowledge base IDs to search top_k: Number of final chunks to return filters: Optional metadata filters Returns: RetrievalResult with chunks and metadata """ import time start_time = time.time() result = RetrievalResult(query=query) # Tier 1: Knowledge Base Routing tier1_start = time.time() selected_kbs = self._tier1_kb_routing(query, kb_ids) result.selected_kbs = selected_kbs result.tier1_candidates = len(selected_kbs) result.tier1_time_ms = (time.time() - tier1_start) * 1000 if not selected_kbs: self.logger.warning(f"No knowledge bases selected for query: {query}") result.total_time_ms = (time.time() - start_time) * 1000 return result # Tier 2: Document Filtering tier2_start = time.time() filtered_docs = self._tier2_document_filtering( query, selected_kbs, filters ) result.filtered_docs = filtered_docs result.tier2_candidates = len(filtered_docs) result.tier2_time_ms = (time.time() - tier2_start) * 1000 if not filtered_docs: self.logger.warning(f"No documents passed filtering for query: {query}") result.total_time_ms = (time.time() - start_time) * 1000 return result # Tier 3: Chunk Refinement tier3_start = time.time() chunks = self._tier3_chunk_refinement( query, filtered_docs, top_k ) result.retrieved_chunks = chunks result.tier3_candidates = len(chunks) result.tier3_time_ms = (time.time() - tier3_start) * 1000 result.total_time_ms = (time.time() - start_time) * 1000 self.logger.info( f"Hierarchical retrieval completed: " f"{result.tier1_candidates} KBs -> " f"{result.tier2_candidates} docs -> " f"{result.tier3_candidates} chunks " f"in {result.total_time_ms:.2f}ms" ) return result def _tier1_kb_routing( self, query: str, kb_ids: List[str] ) -> List[str]: """ Tier 1: Route query to relevant knowledge bases. Args: query: User query kb_ids: Available knowledge base IDs Returns: List of selected KB IDs """ if not self.config.enable_kb_routing: return kb_ids method = self.config.kb_routing_method if method == "all": # Use all provided KBs return kb_ids elif method == "rule_based": # Rule-based routing (placeholder for custom rules) return self._rule_based_routing(query, kb_ids) elif method == "llm_based": # LLM-based routing (placeholder for LLM integration) return self._llm_based_routing(query, kb_ids) else: # "auto" # Auto mode: intelligent routing based on query analysis return self._auto_routing(query, kb_ids) def _rule_based_routing( self, query: str, kb_ids: List[str] ) -> List[str]: """ Rule-based KB routing using predefined rules. This is a placeholder for custom routing logic. Users can extend this with domain-specific rules. """ # TODO: Implement rule-based routing # For now, return all KBs return kb_ids def _llm_based_routing( self, query: str, kb_ids: List[str] ) -> List[str]: """ LLM-based KB routing using language model. This is a placeholder for LLM integration. """ # TODO: Implement LLM-based routing # For now, return all KBs return kb_ids def _auto_routing( self, query: str, kb_ids: List[str] ) -> List[str]: """ Auto routing using query analysis. This is a placeholder for intelligent routing. """ # TODO: Implement auto routing with query analysis # For now, return all KBs return kb_ids def _tier2_document_filtering( self, query: str, kb_ids: List[str], filters: Optional[Dict[str, Any]] = None ) -> List[Dict[str, Any]]: """ Tier 2: Filter documents by metadata. Args: query: User query kb_ids: Selected knowledge base IDs filters: Metadata filters Returns: List of filtered documents """ if not self.config.enable_doc_filtering: # Skip filtering, return placeholder return [] # TODO: Implement document filtering logic # This would integrate with the existing document service # to filter by metadata fields filtered_docs = [] # Placeholder: would query documents from selected KBs # and apply metadata filters return filtered_docs def _tier3_chunk_refinement( self, query: str, filtered_docs: List[Dict[str, Any]], top_k: int ) -> List[Dict[str, Any]]: """ Tier 3: Perform precise chunk-level retrieval. Args: query: User query filtered_docs: Documents that passed Tier 2 filtering top_k: Number of chunks to return Returns: List of retrieved chunks """ # TODO: Implement chunk refinement logic # This would integrate with existing vector search # and optionally use parent-child chunking chunks = [] # Placeholder: would perform vector search within # the filtered document set return chunks class KBRouter: """ Knowledge Base Router for Tier 1. Handles routing logic to select relevant KBs based on query intent. """ def __init__(self): self.logger = logging.getLogger(__name__) def route( self, query: str, available_kbs: List[Dict[str, Any]], method: str = "auto" ) -> List[str]: """ Route query to relevant knowledge bases. Args: query: User query available_kbs: List of available KB metadata method: Routing method ("auto", "rule_based", "llm_based") Returns: List of selected KB IDs """ # TODO: Implement routing logic return [kb["id"] for kb in available_kbs] class DocumentFilter: """ Document Filter for Tier 2. Handles metadata-based document filtering. """ def __init__(self): self.logger = logging.getLogger(__name__) def filter( self, query: str, documents: List[Dict[str, Any]], metadata_fields: List[str], filters: Optional[Dict[str, Any]] = None ) -> List[Dict[str, Any]]: """ Filter documents by metadata. Args: query: User query documents: List of documents to filter metadata_fields: Key metadata fields to consider filters: Explicit metadata filters Returns: Filtered list of documents """ # TODO: Implement filtering logic return documents class ChunkRefiner: """ Chunk Refiner for Tier 3. Handles precise chunk-level retrieval with optional parent-child support. """ def __init__(self): self.logger = logging.getLogger(__name__) def refine( self, query: str, doc_ids: List[str], top_k: int, use_parent_child: bool = False ) -> List[Dict[str, Any]]: """ Perform precise chunk retrieval. Args: query: User query doc_ids: Document IDs to search within top_k: Number of chunks to return use_parent_child: Whether to use parent-child chunking Returns: List of retrieved chunks """ # TODO: Implement chunk refinement logic return []