diff --git a/src/services/flows_service.py b/src/services/flows_service.py index 0d7a7bc8..2160ab5d 100644 --- a/src/services/flows_service.py +++ b/src/services/flows_service.py @@ -29,6 +29,74 @@ logger = get_logger(__name__) class FlowsService: + def __init__(self): + # Cache for flow file mappings to avoid repeated filesystem scans + self._flow_file_cache = {} + + def _get_flows_directory(self): + """Get the flows directory path""" + current_file_dir = os.path.dirname(os.path.abspath(__file__)) # src/services/ + src_dir = os.path.dirname(current_file_dir) # src/ + project_root = os.path.dirname(src_dir) # project root + return os.path.join(project_root, "flows") + + def _find_flow_file_by_id(self, flow_id: str): + """ + Scan the flows directory and find the JSON file that contains the specified flow ID. + + Args: + flow_id: The flow ID to search for + + Returns: + str: The path to the flow file, or None if not found + """ + if not flow_id: + return None + + # Check cache first + if flow_id in self._flow_file_cache: + cached_path = self._flow_file_cache[flow_id] + if os.path.exists(cached_path): + return cached_path + else: + # Remove stale cache entry + del self._flow_file_cache[flow_id] + + flows_dir = self._get_flows_directory() + + if not os.path.exists(flows_dir): + logger.warning(f"Flows directory not found: {flows_dir}") + return None + + # Scan all JSON files in the flows directory + try: + for filename in os.listdir(flows_dir): + if not filename.endswith('.json'): + continue + + file_path = os.path.join(flows_dir, filename) + + try: + with open(file_path, 'r') as f: + flow_data = json.load(f) + + # Check if this file contains the flow we're looking for + if flow_data.get('id') == flow_id: + # Cache the result + self._flow_file_cache[flow_id] = file_path + logger.info(f"Found flow {flow_id} in file: {filename}") + return file_path + + except (json.JSONDecodeError, FileNotFoundError) as e: + logger.warning(f"Error reading flow file {filename}: {e}") + continue + + except Exception as e: + logger.error(f"Error scanning flows directory: {e}") + return None + + logger.warning(f"Flow with ID {flow_id} not found in flows directory") + return None async def reset_langflow_flow(self, flow_type: str): """Reset a Langflow flow by uploading the corresponding JSON file @@ -41,59 +109,35 @@ class FlowsService: if not LANGFLOW_URL: raise ValueError("LANGFLOW_URL environment variable is required") - # Determine flow file and ID based on type + # Determine flow ID based on type if flow_type == "nudges": - flow_file = "flows/openrag_nudges.json" flow_id = NUDGES_FLOW_ID elif flow_type == "retrieval": - flow_file = "flows/openrag_agent.json" flow_id = LANGFLOW_CHAT_FLOW_ID elif flow_type == "ingest": - flow_file = "flows/ingestion_flow.json" flow_id = LANGFLOW_INGEST_FLOW_ID else: raise ValueError( "flow_type must be either 'nudges', 'retrieval', or 'ingest'" ) + if not flow_id: + raise ValueError(f"Flow ID not configured for flow_type '{flow_type}'") + + # Dynamically find the flow file by ID + flow_path = self._find_flow_file_by_id(flow_id) + if not flow_path: + raise FileNotFoundError(f"Flow file not found for flow ID: {flow_id}") + # Load flow JSON file try: - # Get the project root directory (go up from src/services/ to project root) - # __file__ is src/services/chat_service.py - # os.path.dirname(__file__) is src/services/ - # os.path.dirname(os.path.dirname(__file__)) is src/ - # os.path.dirname(os.path.dirname(os.path.dirname(__file__))) is project root - current_file_dir = os.path.dirname( - os.path.abspath(__file__) - ) # src/services/ - src_dir = os.path.dirname(current_file_dir) # src/ - project_root = os.path.dirname(src_dir) # project root - flow_path = os.path.join(project_root, flow_file) - - if not os.path.exists(flow_path): - # List contents of project root to help debug - try: - contents = os.listdir(project_root) - logger.info(f"Project root contents: {contents}") - - flows_dir = os.path.join(project_root, "flows") - if os.path.exists(flows_dir): - flows_contents = os.listdir(flows_dir) - logger.info(f"Flows directory contents: {flows_contents}") - else: - logger.info("Flows directory does not exist") - except Exception as e: - logger.error(f"Error listing directory contents: {e}") - - raise FileNotFoundError(f"Flow file not found at: {flow_path}") - with open(flow_path, "r") as f: flow_data = json.load(f) - logger.info(f"Successfully loaded flow data from {flow_file}") + logger.info(f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}") + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in flow file {flow_path}: {e}") except FileNotFoundError: raise ValueError(f"Flow file not found: {flow_path}") - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}") # Make PATCH request to Langflow API to update the flow using shared client try: @@ -106,7 +150,7 @@ class FlowsService: logger.info( f"Successfully reset {flow_type} flow", flow_id=flow_id, - flow_file=flow_file, + flow_file=os.path.basename(flow_path), ) return { "success": True, @@ -155,11 +199,10 @@ class FlowsService: logger.info(f"Assigning {provider} components") - # Define flow configurations + # Define flow configurations (removed hardcoded file paths) flow_configs = [ { "name": "nudges", - "file": "flows/openrag_nudges.json", "flow_id": NUDGES_FLOW_ID, "embedding_id": OPENAI_EMBEDDING_COMPONENT_ID, "llm_id": OPENAI_LLM_COMPONENT_ID, @@ -167,7 +210,6 @@ class FlowsService: }, { "name": "retrieval", - "file": "flows/openrag_agent.json", "flow_id": LANGFLOW_CHAT_FLOW_ID, "embedding_id": OPENAI_EMBEDDING_COMPONENT_ID, "llm_id": OPENAI_LLM_COMPONENT_ID, @@ -175,7 +217,6 @@ class FlowsService: }, { "name": "ingest", - "file": "flows/ingestion_flow.json", "flow_id": LANGFLOW_INGEST_FLOW_ID, "embedding_id": OPENAI_EMBEDDING_COMPONENT_ID, "llm_id": None, # Ingestion flow might not have LLM @@ -272,7 +313,6 @@ class FlowsService: async def _update_flow_components(self, config, llm_template, embedding_template, llm_text_template): """Update components in a specific flow""" flow_name = config["name"] - flow_file = config["file"] flow_id = config["flow_id"] old_embedding_id = config["embedding_id"] old_llm_id = config["llm_id"] @@ -281,14 +321,11 @@ class FlowsService: new_llm_id = llm_template["data"]["id"] new_embedding_id = embedding_template["data"]["id"] new_llm_text_id = llm_text_template["data"]["id"] - # Get the project root directory - current_file_dir = os.path.dirname(os.path.abspath(__file__)) - src_dir = os.path.dirname(current_file_dir) - project_root = os.path.dirname(src_dir) - flow_path = os.path.join(project_root, flow_file) - if not os.path.exists(flow_path): - raise FileNotFoundError(f"Flow file not found at: {flow_path}") + # Dynamically find the flow file by ID + flow_path = self._find_flow_file_by_id(flow_id) + if not flow_path: + raise FileNotFoundError(f"Flow file not found for flow ID: {flow_id}") # Load flow JSON with open(flow_path, "r") as f: @@ -552,21 +589,18 @@ class FlowsService: f"Changing dropdown values for provider {provider}, embedding: {embedding_model}, llm: {llm_model}, endpoint: {endpoint}" ) - # Define flow configurations with provider-specific component IDs + # Define flow configurations with provider-specific component IDs (removed hardcoded file paths) flow_configs = [ { "name": "nudges", - "file": "flows/openrag_nudges.json", "flow_id": NUDGES_FLOW_ID, }, { "name": "retrieval", - "file": "flows/openrag_agent.json", "flow_id": LANGFLOW_CHAT_FLOW_ID, }, { "name": "ingest", - "file": "flows/ingestion_flow.json", "flow_id": LANGFLOW_INGEST_FLOW_ID, }, ]