Merge branch 'main' into feat-add-run-query-opensearch

This commit is contained in:
Edwin Jose 2025-12-12 16:16:15 -05:00 committed by GitHub
commit 2ed6623bdd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 306 additions and 104 deletions

File diff suppressed because one or more lines are too long

View file

@ -162,6 +162,19 @@ export function useChatStreaming({
if (line.trim()) {
try {
const chunk = JSON.parse(line);
// Investigation logging for Granite 3.3 8b tool call detection
const chunkKeys = Object.keys(chunk);
const toolRelatedKeys = chunkKeys.filter(key =>
key.toLowerCase().includes('tool') ||
key.toLowerCase().includes('call') ||
key.toLowerCase().includes('retrieval') ||
key.toLowerCase().includes('function') ||
key.toLowerCase().includes('result')
);
if (toolRelatedKeys.length > 0) {
console.log('[Tool Detection] Found tool-related keys:', toolRelatedKeys, chunk);
}
// Extract response ID if present
if (chunk.id) {
@ -449,6 +462,42 @@ export function useChatStreaming({
}
}
}
// Heuristic detection for implicit tool calls (Granite 3.3 8b workaround)
// Check if chunk contains retrieval results without explicit tool call markers
const hasImplicitToolCall = (
// Check for various result indicators in the chunk
(chunk.results && Array.isArray(chunk.results) && chunk.results.length > 0) ||
(chunk.outputs && Array.isArray(chunk.outputs) && chunk.outputs.length > 0) ||
// Check for retrieval-related fields
chunk.retrieved_documents ||
chunk.retrieval_results ||
// Check for nested data structures that might contain results
(chunk.data && typeof chunk.data === 'object' && (
chunk.data.results ||
chunk.data.retrieved_documents ||
chunk.data.retrieval_results
))
);
if (hasImplicitToolCall && currentFunctionCalls.length === 0) {
console.log('[Heuristic Detection] Detected implicit tool call:', chunk);
// Create a synthetic function call for the UI
const results = chunk.results || chunk.outputs || chunk.retrieved_documents ||
chunk.retrieval_results || chunk.data?.results ||
chunk.data?.retrieved_documents || [];
const syntheticFunctionCall: FunctionCall = {
name: "Retrieval",
arguments: { implicit: true, detected_heuristically: true },
status: "completed",
type: "retrieval_call",
result: results,
};
currentFunctionCalls.push(syntheticFunctionCall);
console.log('[Heuristic Detection] Created synthetic function call');
}
// Update streaming message in real-time
if (
@ -486,6 +535,29 @@ export function useChatStreaming({
"No response received from the server. Please try again.",
);
}
// Post-processing: Heuristic detection based on final content
// If no explicit tool calls detected but content shows RAG indicators
if (currentFunctionCalls.length === 0 && currentContent) {
// Check for citation patterns that indicate RAG usage
const hasCitations = /\(Source:|\[Source:|\bSource:|filename:|document:/i.test(currentContent);
// Check for common RAG response patterns
const hasRAGPattern = /based on.*(?:document|file|information|data)|according to.*(?:document|file)/i.test(currentContent);
if (hasCitations || hasRAGPattern) {
console.log('[Post-Processing] Detected RAG usage from content patterns');
const syntheticFunctionCall: FunctionCall = {
name: "Retrieval",
arguments: {
implicit: true,
detected_from: hasCitations ? "citations" : "content_patterns"
},
status: "completed",
type: "retrieval_call",
};
currentFunctionCalls.push(syntheticFunctionCall);
}
}
// Finalize the message
const finalMessage: Message = {

View file

@ -135,6 +135,7 @@ async def async_response_stream(
full_response = ""
chunk_count = 0
detected_tool_call = False # Track if we've detected a tool call
async for chunk in response:
chunk_count += 1
logger.debug(
@ -158,6 +159,17 @@ async def async_response_stream(
else:
delta_text = str(chunk.delta)
full_response += delta_text
# Enhanced logging for tool call detection (Granite 3.3 8b investigation)
chunk_attrs = dir(chunk) if hasattr(chunk, '__dict__') else []
tool_related_attrs = [attr for attr in chunk_attrs if 'tool' in attr.lower() or 'call' in attr.lower() or 'retrieval' in attr.lower()]
if tool_related_attrs:
logger.info(
"Tool-related attributes found in chunk",
chunk_count=chunk_count,
attributes=tool_related_attrs,
chunk_type=type(chunk).__name__
)
# Send the raw event as JSON followed by newline for easy parsing
try:
@ -169,7 +181,57 @@ async def async_response_stream(
chunk_data = chunk.__dict__
else:
chunk_data = str(chunk)
# Log detailed chunk structure for investigation (especially for Granite 3.3 8b)
if isinstance(chunk_data, dict):
# Check for any fields that might indicate tool usage
potential_tool_fields = {
k: v for k, v in chunk_data.items()
if any(keyword in str(k).lower() for keyword in ['tool', 'call', 'retrieval', 'function', 'result', 'output'])
}
if potential_tool_fields:
logger.info(
"Potential tool-related fields in chunk",
chunk_count=chunk_count,
fields=list(potential_tool_fields.keys()),
sample_data=str(potential_tool_fields)[:500]
)
# Middleware: Detect implicit tool calls and inject standardized events
# This helps Granite 3.3 8b and other models that don't emit standard markers
if isinstance(chunk_data, dict) and not detected_tool_call:
# Check if this chunk contains retrieval results
has_results = any([
'results' in chunk_data and isinstance(chunk_data.get('results'), list),
'outputs' in chunk_data and isinstance(chunk_data.get('outputs'), list),
'retrieved_documents' in chunk_data,
'retrieval_results' in chunk_data,
])
if has_results:
logger.info(
"Detected implicit tool call in backend, injecting synthetic event",
chunk_fields=list(chunk_data.keys())
)
# Inject a synthetic tool call event before this chunk
synthetic_event = {
"type": "response.output_item.done",
"item": {
"type": "retrieval_call",
"id": f"synthetic_{chunk_count}",
"name": "Retrieval",
"tool_name": "Retrieval",
"status": "completed",
"inputs": {"implicit": True, "backend_detected": True},
"results": chunk_data.get('results') or chunk_data.get('outputs') or
chunk_data.get('retrieved_documents') or
chunk_data.get('retrieval_results') or []
}
}
# Send the synthetic event first
yield (json.dumps(synthetic_event, default=str) + "\n").encode("utf-8")
detected_tool_call = True # Mark that we've injected a tool call
yield (json.dumps(chunk_data, default=str) + "\n").encode("utf-8")
except Exception as e:
# Fallback to string representation

View file

@ -180,6 +180,22 @@ class LangflowFileService:
body=resp.text[:1000],
)
resp.raise_for_status()
# Check if response is actually JSON before parsing
content_type = resp.headers.get("content-type", "")
if "application/json" not in content_type:
logger.error(
"[LF] Unexpected response content type from Langflow",
content_type=content_type,
status_code=resp.status_code,
body=resp.text[:1000],
)
raise ValueError(
f"Langflow returned {content_type} instead of JSON. "
f"This may indicate the ingestion flow failed or the endpoint is incorrect. "
f"Response preview: {resp.text[:500]}"
)
try:
resp_json = resp.json()
except Exception as e:

View file

@ -88,6 +88,7 @@ class LangflowHistoryService:
}
# Extract function calls from content_blocks if present
# Convert to match streaming format: chunk.item.type === "tool_call"
content_blocks = msg.get("content_blocks", [])
if content_blocks:
chunks = []
@ -95,23 +96,23 @@ class LangflowHistoryService:
if block.get("title") == "Agent Steps" and block.get("contents"):
for content in block["contents"]:
if content.get("type") == "tool_use":
# Convert Langflow tool_use format to OpenRAG chunks format
# Convert Langflow tool_use format to match streaming chunks format
# Frontend expects: chunk.item.type === "tool_call" with tool_name, inputs, results
chunk = {
"type": "function",
"function": {
"name": content.get("name", ""),
"arguments": content.get("tool_input", {}),
"response": content.get("output", {})
},
"function_call_result": content.get("output", {}),
"duration": content.get("duration"),
"error": content.get("error")
"type": "response.output_item.added",
"item": {
"type": "tool_call",
"tool_name": content.get("name", ""),
"inputs": content.get("tool_input", {}),
"results": content.get("output", {}),
"id": content.get("id") or content.get("run_id", ""),
"status": "completed" if not content.get("error") else "error"
}
}
chunks.append(chunk)
if chunks:
converted_msg["chunks"] = chunks
converted_msg["response_data"] = {"tool_calls": chunks}
converted_messages.append(converted_msg)

View file

@ -2,7 +2,8 @@
import asyncio
import json
import subprocess
import os
import re
import time
from dataclasses import dataclass, field
from enum import Enum
@ -121,6 +122,36 @@ class ContainerManager:
self._compose_search_log += f"\n 3. Falling back to: {cwd_path.absolute()}"
return Path(filename)
def _get_env_from_file(self) -> Dict[str, str]:
"""Read environment variables from .env file, prioritizing file values over os.environ.
Uses python-dotenv's load_dotenv() for standard .env file parsing, which handles:
- Quoted values (single and double quotes)
- Variable expansion (${VAR})
- Multiline values
- Escaped characters
- Comments
This ensures Docker Compose commands use the latest values from .env file,
even if os.environ has stale values.
"""
from dotenv import load_dotenv
env = dict(os.environ) # Start with current environment
env_file = Path(".env")
if env_file.exists():
try:
# Load .env file with override=True to ensure file values take precedence
# This loads into os.environ, then we copy to our dict
load_dotenv(dotenv_path=env_file, override=True)
# Update our dict with all environment variables (including those from .env)
env.update(os.environ)
except Exception as e:
logger.debug(f"Error reading .env file for Docker Compose: {e}")
return env
def is_available(self) -> bool:
"""Check if container runtime with compose is available."""
return (self.runtime_info.runtime_type != RuntimeType.NONE and
@ -153,7 +184,6 @@ class ContainerManager:
continue
try:
import re
content = compose_file.read_text()
current_service = None
in_ports_section = False
@ -245,11 +275,15 @@ class ContainerManager:
cmd.extend(args)
try:
# Get environment variables from .env file to ensure latest values
env = self._get_env_from_file()
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=Path.cwd(),
env=env,
)
stdout, stderr = await process.communicate()
@ -287,11 +321,15 @@ class ContainerManager:
cmd.extend(args)
try:
# Get environment variables from .env file to ensure latest values
env = self._get_env_from_file()
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=Path.cwd(),
env=env,
)
if process.stdout:
@ -356,11 +394,15 @@ class ContainerManager:
cmd.extend(args)
try:
# Get environment variables from .env file to ensure latest values
env = self._get_env_from_file()
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=Path.cwd(),
env=env,
)
except Exception as e:
success_flag["value"] = False
@ -926,7 +968,6 @@ class ContainerManager:
async for message, replace_last in self._stream_compose_command(["up", "-d"], up_success, cpu_mode):
# Detect error patterns in the output
import re
lower_msg = message.lower()
# Check for common error patterns
@ -1110,11 +1151,15 @@ class ContainerManager:
cmd.extend(["logs", "-f", service_name])
try:
# Get environment variables from .env file to ensure latest values
env = self._get_env_from_file()
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=Path.cwd(),
env=env,
)
if process.stdout:

View file

@ -1,5 +1,6 @@
"""Environment configuration manager for OpenRAG TUI."""
import os
import secrets
import string
from dataclasses import dataclass, field
@ -7,12 +8,10 @@ from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from dotenv import load_dotenv
from utils.logging_config import get_logger
logger = get_logger(__name__)
from ..utils.validation import (
sanitize_env_value,
validate_documents_paths,
validate_google_oauth_client_id,
validate_non_empty,
@ -20,6 +19,8 @@ from ..utils.validation import (
validate_url,
)
logger = get_logger(__name__)
@dataclass
class EnvConfig:
@ -119,9 +120,15 @@ class EnvManager:
return f"'{escaped_value}'"
def load_existing_env(self) -> bool:
"""Load existing .env file if it exists, or fall back to environment variables."""
import os
"""Load existing .env file if it exists, or fall back to environment variables.
Uses python-dotenv's load_dotenv() for standard .env file parsing, which handles:
- Quoted values (single and double quotes)
- Variable expansion (${VAR})
- Multiline values
- Escaped characters
- Comments
"""
# Map env vars to config attributes
# These are environment variable names, not actual secrets
attr_map = { # pragma: allowlist secret
@ -158,36 +165,23 @@ class EnvManager:
loaded_from_file = False
# Try to load from .env file first
# Load .env file using python-dotenv for standard parsing
# override=True ensures .env file values take precedence over existing environment variables
if self.env_file.exists():
try:
with open(self.env_file, "r") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, value = line.split("=", 1)
key = key.strip()
value = sanitize_env_value(value)
if key in attr_map:
setattr(self.config, attr_map[key], value)
# Load .env file with override=True to ensure file values take precedence
load_dotenv(dotenv_path=self.env_file, override=True)
loaded_from_file = True
logger.debug(f"Loaded .env file from {self.env_file}")
except Exception as e:
logger.error("Error loading .env file", error=str(e))
# Fall back to environment variables if .env file doesn't exist or failed to load
if not loaded_from_file:
logger.info("No .env file found, loading from environment variables")
for env_key, attr_name in attr_map.items():
value = os.environ.get(env_key, "")
if value:
setattr(self.config, attr_name, value)
return True
# Map environment variables to config attributes
# This works whether values came from .env file or existing environment variables
for env_key, attr_name in attr_map.items():
value = os.environ.get(env_key, "")
if value:
setattr(self.config, attr_name, value)
return loaded_from_file
@ -546,23 +540,19 @@ class EnvManager:
"""Ensure OPENRAG_VERSION is set in .env file to match TUI version."""
try:
from ..utils.version_check import get_current_version
import os
current_version = get_current_version()
if current_version == "unknown":
return
# Check if OPENRAG_VERSION is already set in .env
if self.env_file.exists():
env_content = self.env_file.read_text()
if "OPENRAG_VERSION" in env_content:
# Already set, check if it needs updating
for line in env_content.splitlines():
if line.strip().startswith("OPENRAG_VERSION"):
existing_value = line.split("=", 1)[1].strip()
existing_value = sanitize_env_value(existing_value)
if existing_value == current_version:
# Already correct, no update needed
return
break
# Load .env file using load_dotenv
load_dotenv(dotenv_path=self.env_file, override=False)
existing_value = os.environ.get("OPENRAG_VERSION", "")
if existing_value and existing_value == current_version:
# Already correct, no update needed
return
# Set or update OPENRAG_VERSION
self.config.openrag_version = current_version

View file

@ -41,7 +41,8 @@ class WelcomeScreen(Screen):
self.has_env_file = self.env_manager.env_file.exists()
# Load .env file if it exists
load_dotenv()
# override=True ensures .env file values take precedence over existing environment variables
load_dotenv(override=True)
# Check OAuth config immediately
self.has_oauth_config = bool(os.getenv("GOOGLE_OAUTH_CLIENT_ID")) or bool(

View file

@ -96,21 +96,6 @@ def validate_non_empty(value: str) -> bool:
return bool(value and value.strip())
def sanitize_env_value(value: str) -> str:
"""Sanitize environment variable value."""
# Remove leading/trailing whitespace
value = value.strip()
# Remove quotes if they wrap the entire value
if len(value) >= 2:
if (value.startswith('"') and value.endswith('"')) or (
value.startswith("'") and value.endswith("'")
):
value = value[1:-1]
return value
def validate_documents_paths(paths_str: str) -> tuple[bool, str, list[str]]:
"""
Validate comma-separated documents paths for volume mounting.