changed flows service to get flow by id, not by path
This commit is contained in:
parent
ee506e60b7
commit
606cd35e9a
1 changed files with 87 additions and 53 deletions
|
|
@ -29,6 +29,74 @@ logger = get_logger(__name__)
|
|||
|
||||
|
||||
class FlowsService:
|
||||
def __init__(self):
|
||||
# Cache for flow file mappings to avoid repeated filesystem scans
|
||||
self._flow_file_cache = {}
|
||||
|
||||
def _get_flows_directory(self):
|
||||
"""Get the flows directory path"""
|
||||
current_file_dir = os.path.dirname(os.path.abspath(__file__)) # src/services/
|
||||
src_dir = os.path.dirname(current_file_dir) # src/
|
||||
project_root = os.path.dirname(src_dir) # project root
|
||||
return os.path.join(project_root, "flows")
|
||||
|
||||
def _find_flow_file_by_id(self, flow_id: str):
|
||||
"""
|
||||
Scan the flows directory and find the JSON file that contains the specified flow ID.
|
||||
|
||||
Args:
|
||||
flow_id: The flow ID to search for
|
||||
|
||||
Returns:
|
||||
str: The path to the flow file, or None if not found
|
||||
"""
|
||||
if not flow_id:
|
||||
return None
|
||||
|
||||
# Check cache first
|
||||
if flow_id in self._flow_file_cache:
|
||||
cached_path = self._flow_file_cache[flow_id]
|
||||
if os.path.exists(cached_path):
|
||||
return cached_path
|
||||
else:
|
||||
# Remove stale cache entry
|
||||
del self._flow_file_cache[flow_id]
|
||||
|
||||
flows_dir = self._get_flows_directory()
|
||||
|
||||
if not os.path.exists(flows_dir):
|
||||
logger.warning(f"Flows directory not found: {flows_dir}")
|
||||
return None
|
||||
|
||||
# Scan all JSON files in the flows directory
|
||||
try:
|
||||
for filename in os.listdir(flows_dir):
|
||||
if not filename.endswith('.json'):
|
||||
continue
|
||||
|
||||
file_path = os.path.join(flows_dir, filename)
|
||||
|
||||
try:
|
||||
with open(file_path, 'r') as f:
|
||||
flow_data = json.load(f)
|
||||
|
||||
# Check if this file contains the flow we're looking for
|
||||
if flow_data.get('id') == flow_id:
|
||||
# Cache the result
|
||||
self._flow_file_cache[flow_id] = file_path
|
||||
logger.info(f"Found flow {flow_id} in file: {filename}")
|
||||
return file_path
|
||||
|
||||
except (json.JSONDecodeError, FileNotFoundError) as e:
|
||||
logger.warning(f"Error reading flow file {filename}: {e}")
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error scanning flows directory: {e}")
|
||||
return None
|
||||
|
||||
logger.warning(f"Flow with ID {flow_id} not found in flows directory")
|
||||
return None
|
||||
async def reset_langflow_flow(self, flow_type: str):
|
||||
"""Reset a Langflow flow by uploading the corresponding JSON file
|
||||
|
||||
|
|
@ -41,59 +109,35 @@ class FlowsService:
|
|||
if not LANGFLOW_URL:
|
||||
raise ValueError("LANGFLOW_URL environment variable is required")
|
||||
|
||||
# Determine flow file and ID based on type
|
||||
# Determine flow ID based on type
|
||||
if flow_type == "nudges":
|
||||
flow_file = "flows/openrag_nudges.json"
|
||||
flow_id = NUDGES_FLOW_ID
|
||||
elif flow_type == "retrieval":
|
||||
flow_file = "flows/openrag_agent.json"
|
||||
flow_id = LANGFLOW_CHAT_FLOW_ID
|
||||
elif flow_type == "ingest":
|
||||
flow_file = "flows/ingestion_flow.json"
|
||||
flow_id = LANGFLOW_INGEST_FLOW_ID
|
||||
else:
|
||||
raise ValueError(
|
||||
"flow_type must be either 'nudges', 'retrieval', or 'ingest'"
|
||||
)
|
||||
|
||||
if not flow_id:
|
||||
raise ValueError(f"Flow ID not configured for flow_type '{flow_type}'")
|
||||
|
||||
# Dynamically find the flow file by ID
|
||||
flow_path = self._find_flow_file_by_id(flow_id)
|
||||
if not flow_path:
|
||||
raise FileNotFoundError(f"Flow file not found for flow ID: {flow_id}")
|
||||
|
||||
# Load flow JSON file
|
||||
try:
|
||||
# Get the project root directory (go up from src/services/ to project root)
|
||||
# __file__ is src/services/chat_service.py
|
||||
# os.path.dirname(__file__) is src/services/
|
||||
# os.path.dirname(os.path.dirname(__file__)) is src/
|
||||
# os.path.dirname(os.path.dirname(os.path.dirname(__file__))) is project root
|
||||
current_file_dir = os.path.dirname(
|
||||
os.path.abspath(__file__)
|
||||
) # src/services/
|
||||
src_dir = os.path.dirname(current_file_dir) # src/
|
||||
project_root = os.path.dirname(src_dir) # project root
|
||||
flow_path = os.path.join(project_root, flow_file)
|
||||
|
||||
if not os.path.exists(flow_path):
|
||||
# List contents of project root to help debug
|
||||
try:
|
||||
contents = os.listdir(project_root)
|
||||
logger.info(f"Project root contents: {contents}")
|
||||
|
||||
flows_dir = os.path.join(project_root, "flows")
|
||||
if os.path.exists(flows_dir):
|
||||
flows_contents = os.listdir(flows_dir)
|
||||
logger.info(f"Flows directory contents: {flows_contents}")
|
||||
else:
|
||||
logger.info("Flows directory does not exist")
|
||||
except Exception as e:
|
||||
logger.error(f"Error listing directory contents: {e}")
|
||||
|
||||
raise FileNotFoundError(f"Flow file not found at: {flow_path}")
|
||||
|
||||
with open(flow_path, "r") as f:
|
||||
flow_data = json.load(f)
|
||||
logger.info(f"Successfully loaded flow data from {flow_file}")
|
||||
logger.info(f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}")
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"Invalid JSON in flow file {flow_path}: {e}")
|
||||
except FileNotFoundError:
|
||||
raise ValueError(f"Flow file not found: {flow_path}")
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}")
|
||||
|
||||
# Make PATCH request to Langflow API to update the flow using shared client
|
||||
try:
|
||||
|
|
@ -106,7 +150,7 @@ class FlowsService:
|
|||
logger.info(
|
||||
f"Successfully reset {flow_type} flow",
|
||||
flow_id=flow_id,
|
||||
flow_file=flow_file,
|
||||
flow_file=os.path.basename(flow_path),
|
||||
)
|
||||
return {
|
||||
"success": True,
|
||||
|
|
@ -155,11 +199,10 @@ class FlowsService:
|
|||
|
||||
logger.info(f"Assigning {provider} components")
|
||||
|
||||
# Define flow configurations
|
||||
# Define flow configurations (removed hardcoded file paths)
|
||||
flow_configs = [
|
||||
{
|
||||
"name": "nudges",
|
||||
"file": "flows/openrag_nudges.json",
|
||||
"flow_id": NUDGES_FLOW_ID,
|
||||
"embedding_id": OPENAI_EMBEDDING_COMPONENT_ID,
|
||||
"llm_id": OPENAI_LLM_COMPONENT_ID,
|
||||
|
|
@ -167,7 +210,6 @@ class FlowsService:
|
|||
},
|
||||
{
|
||||
"name": "retrieval",
|
||||
"file": "flows/openrag_agent.json",
|
||||
"flow_id": LANGFLOW_CHAT_FLOW_ID,
|
||||
"embedding_id": OPENAI_EMBEDDING_COMPONENT_ID,
|
||||
"llm_id": OPENAI_LLM_COMPONENT_ID,
|
||||
|
|
@ -175,7 +217,6 @@ class FlowsService:
|
|||
},
|
||||
{
|
||||
"name": "ingest",
|
||||
"file": "flows/ingestion_flow.json",
|
||||
"flow_id": LANGFLOW_INGEST_FLOW_ID,
|
||||
"embedding_id": OPENAI_EMBEDDING_COMPONENT_ID,
|
||||
"llm_id": None, # Ingestion flow might not have LLM
|
||||
|
|
@ -272,7 +313,6 @@ class FlowsService:
|
|||
async def _update_flow_components(self, config, llm_template, embedding_template, llm_text_template):
|
||||
"""Update components in a specific flow"""
|
||||
flow_name = config["name"]
|
||||
flow_file = config["file"]
|
||||
flow_id = config["flow_id"]
|
||||
old_embedding_id = config["embedding_id"]
|
||||
old_llm_id = config["llm_id"]
|
||||
|
|
@ -281,14 +321,11 @@ class FlowsService:
|
|||
new_llm_id = llm_template["data"]["id"]
|
||||
new_embedding_id = embedding_template["data"]["id"]
|
||||
new_llm_text_id = llm_text_template["data"]["id"]
|
||||
# Get the project root directory
|
||||
current_file_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
src_dir = os.path.dirname(current_file_dir)
|
||||
project_root = os.path.dirname(src_dir)
|
||||
flow_path = os.path.join(project_root, flow_file)
|
||||
|
||||
if not os.path.exists(flow_path):
|
||||
raise FileNotFoundError(f"Flow file not found at: {flow_path}")
|
||||
# Dynamically find the flow file by ID
|
||||
flow_path = self._find_flow_file_by_id(flow_id)
|
||||
if not flow_path:
|
||||
raise FileNotFoundError(f"Flow file not found for flow ID: {flow_id}")
|
||||
|
||||
# Load flow JSON
|
||||
with open(flow_path, "r") as f:
|
||||
|
|
@ -552,21 +589,18 @@ class FlowsService:
|
|||
f"Changing dropdown values for provider {provider}, embedding: {embedding_model}, llm: {llm_model}, endpoint: {endpoint}"
|
||||
)
|
||||
|
||||
# Define flow configurations with provider-specific component IDs
|
||||
# Define flow configurations with provider-specific component IDs (removed hardcoded file paths)
|
||||
flow_configs = [
|
||||
{
|
||||
"name": "nudges",
|
||||
"file": "flows/openrag_nudges.json",
|
||||
"flow_id": NUDGES_FLOW_ID,
|
||||
},
|
||||
{
|
||||
"name": "retrieval",
|
||||
"file": "flows/openrag_agent.json",
|
||||
"flow_id": LANGFLOW_CHAT_FLOW_ID,
|
||||
},
|
||||
{
|
||||
"name": "ingest",
|
||||
"file": "flows/ingestion_flow.json",
|
||||
"flow_id": LANGFLOW_INGEST_FLOW_ID,
|
||||
},
|
||||
]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue