Merge pull request #35 from langflow-ai/fix-logger
This commit is contained in:
commit
ffaec28596
7 changed files with 91 additions and 74 deletions
62
src/agent.py
62
src/agent.py
|
|
@ -8,6 +8,7 @@ from services.conversation_persistence_service import conversation_persistence
|
||||||
# In-memory storage for active conversation threads (preserves function calls)
|
# In-memory storage for active conversation threads (preserves function calls)
|
||||||
active_conversations = {}
|
active_conversations = {}
|
||||||
|
|
||||||
|
|
||||||
def get_user_conversations(user_id: str):
|
def get_user_conversations(user_id: str):
|
||||||
"""Get conversation metadata for a user from persistent storage"""
|
"""Get conversation metadata for a user from persistent storage"""
|
||||||
return conversation_persistence.get_user_conversations(user_id)
|
return conversation_persistence.get_user_conversations(user_id)
|
||||||
|
|
@ -23,7 +24,9 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
|
||||||
|
|
||||||
# If we have a previous_response_id, try to get the existing conversation
|
# If we have a previous_response_id, try to get the existing conversation
|
||||||
if previous_response_id and previous_response_id in active_conversations[user_id]:
|
if previous_response_id and previous_response_id in active_conversations[user_id]:
|
||||||
logger.debug(f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}")
|
logger.debug(
|
||||||
|
f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}"
|
||||||
|
)
|
||||||
return active_conversations[user_id][previous_response_id]
|
return active_conversations[user_id][previous_response_id]
|
||||||
|
|
||||||
# Create new conversation thread
|
# Create new conversation thread
|
||||||
|
|
@ -64,11 +67,15 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
|
||||||
"created_at": conversation_state.get("created_at"),
|
"created_at": conversation_state.get("created_at"),
|
||||||
"last_activity": conversation_state.get("last_activity"),
|
"last_activity": conversation_state.get("last_activity"),
|
||||||
"previous_response_id": conversation_state.get("previous_response_id"),
|
"previous_response_id": conversation_state.get("previous_response_id"),
|
||||||
"total_messages": len([msg for msg in messages if msg.get("role") in ["user", "assistant"]]),
|
"total_messages": len(
|
||||||
|
[msg for msg in messages if msg.get("role") in ["user", "assistant"]]
|
||||||
|
),
|
||||||
# Don't store actual messages - Langflow has them
|
# Don't store actual messages - Langflow has them
|
||||||
}
|
}
|
||||||
|
|
||||||
conversation_persistence.store_conversation_thread(user_id, response_id, metadata_only)
|
conversation_persistence.store_conversation_thread(
|
||||||
|
user_id, response_id, metadata_only
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# Legacy function for backward compatibility
|
# Legacy function for backward compatibility
|
||||||
|
|
@ -76,8 +83,10 @@ def get_user_conversation(user_id: str):
|
||||||
"""Get the most recent conversation for a user (for backward compatibility)"""
|
"""Get the most recent conversation for a user (for backward compatibility)"""
|
||||||
# Check in-memory conversations first (with function calls)
|
# Check in-memory conversations first (with function calls)
|
||||||
if user_id in active_conversations and active_conversations[user_id]:
|
if user_id in active_conversations and active_conversations[user_id]:
|
||||||
latest_response_id = max(active_conversations[user_id].keys(),
|
latest_response_id = max(
|
||||||
key=lambda k: active_conversations[user_id][k]["last_activity"])
|
active_conversations[user_id].keys(),
|
||||||
|
key=lambda k: active_conversations[user_id][k]["last_activity"],
|
||||||
|
)
|
||||||
return active_conversations[user_id][latest_response_id]
|
return active_conversations[user_id][latest_response_id]
|
||||||
|
|
||||||
# Fallback to metadata-only conversations
|
# Fallback to metadata-only conversations
|
||||||
|
|
@ -342,7 +351,9 @@ async def async_chat(
|
||||||
"content": response_text,
|
"content": response_text,
|
||||||
"response_id": response_id,
|
"response_id": response_id,
|
||||||
"timestamp": datetime.now(),
|
"timestamp": datetime.now(),
|
||||||
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
|
"response_data": response_obj.model_dump()
|
||||||
|
if hasattr(response_obj, "model_dump")
|
||||||
|
else str(response_obj), # Store complete response for function calls
|
||||||
}
|
}
|
||||||
conversation_state["messages"].append(assistant_message)
|
conversation_state["messages"].append(assistant_message)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
|
@ -428,7 +439,7 @@ async def async_chat_stream(
|
||||||
conversation_state["last_activity"] = datetime.now()
|
conversation_state["last_activity"] = datetime.now()
|
||||||
store_conversation_thread(user_id, response_id, conversation_state)
|
store_conversation_thread(user_id, response_id, conversation_state)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Stored conversation thread", user_id=user_id, response_id=response_id
|
f"Stored conversation thread for user {user_id} with response_id: {response_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -443,11 +454,8 @@ async def async_langflow_chat(
|
||||||
store_conversation: bool = True,
|
store_conversation: bool = True,
|
||||||
):
|
):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
|
||||||
"async_langflow_chat called",
|
"async_langflow_chat called",
|
||||||
|
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
|
|
||||||
previous_response_id=previous_response_id,
|
previous_response_id=previous_response_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -496,7 +504,9 @@ async def async_langflow_chat(
|
||||||
"content": response_text,
|
"content": response_text,
|
||||||
"response_id": response_id,
|
"response_id": response_id,
|
||||||
"timestamp": datetime.now(),
|
"timestamp": datetime.now(),
|
||||||
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
|
"response_data": response_obj.model_dump()
|
||||||
|
if hasattr(response_obj, "model_dump")
|
||||||
|
else str(response_obj), # Store complete response for function calls
|
||||||
}
|
}
|
||||||
conversation_state["messages"].append(assistant_message)
|
conversation_state["messages"].append(assistant_message)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
|
@ -515,13 +525,14 @@ async def async_langflow_chat(
|
||||||
# Claim session ownership for this user
|
# Claim session ownership for this user
|
||||||
try:
|
try:
|
||||||
from services.session_ownership_service import session_ownership_service
|
from services.session_ownership_service import session_ownership_service
|
||||||
session_ownership_service.claim_session(user_id, response_id)
|
|
||||||
print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[WARNING] Failed to claim session ownership: {e}")
|
|
||||||
|
|
||||||
print(
|
session_ownership_service.claim_session(user_id, response_id)
|
||||||
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
|
logger.debug(f"Claimed session {response_id} for user {user_id}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to claim session ownership: {e}")
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
|
||||||
)
|
)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Stored langflow conversation thread",
|
"Stored langflow conversation thread",
|
||||||
|
|
@ -616,16 +627,15 @@ async def async_langflow_chat_stream(
|
||||||
# Claim session ownership for this user
|
# Claim session ownership for this user
|
||||||
try:
|
try:
|
||||||
from services.session_ownership_service import session_ownership_service
|
from services.session_ownership_service import session_ownership_service
|
||||||
session_ownership_service.claim_session(user_id, response_id)
|
|
||||||
print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[WARNING] Failed to claim session ownership: {e}")
|
|
||||||
|
|
||||||
print(
|
session_ownership_service.claim_session(user_id, response_id)
|
||||||
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
|
logger.debug(f"Claimed session {response_id} for user {user_id}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to claim session ownership: {e}")
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
|
||||||
)
|
)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Stored langflow conversation thread",
|
f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
|
||||||
user_id=user_id,
|
|
||||||
response_id=response_id,
|
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
from starlette.responses import JSONResponse
|
from starlette.responses import JSONResponse
|
||||||
|
from utils.logging_config import get_logger
|
||||||
from config.settings import (
|
from config.settings import (
|
||||||
LANGFLOW_URL,
|
LANGFLOW_URL,
|
||||||
LANGFLOW_CHAT_FLOW_ID,
|
LANGFLOW_CHAT_FLOW_ID,
|
||||||
|
|
@ -7,6 +8,9 @@ from config.settings import (
|
||||||
clients,
|
clients,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def get_settings(request, session_manager):
|
async def get_settings(request, session_manager):
|
||||||
"""Get application settings"""
|
"""Get application settings"""
|
||||||
|
|
@ -91,7 +95,7 @@ async def get_settings(request, session_manager):
|
||||||
settings["ingestion_defaults"] = ingestion_defaults
|
settings["ingestion_defaults"] = ingestion_defaults
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}")
|
logger.warning(f"Failed to fetch ingestion flow defaults: {e}")
|
||||||
# Continue without ingestion defaults
|
# Continue without ingestion defaults
|
||||||
|
|
||||||
return JSONResponse(settings)
|
return JSONResponse(settings)
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,6 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
Integration points:
|
Integration points:
|
||||||
- `BaseConnector` is your project’s base class; minimum methods used here:
|
- `BaseConnector` is your project’s base class; minimum methods used here:
|
||||||
* self.emit(doc: ConnectorDocument) -> None (or adapt to your ingestion pipeline)
|
* self.emit(doc: ConnectorDocument) -> None (or adapt to your ingestion pipeline)
|
||||||
* self.log/info/warn/error (optional)
|
|
||||||
- Adjust paths, logging, and error handling to match your project style.
|
- Adjust paths, logging, and error handling to match your project style.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
@ -81,8 +80,6 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
_FILE_ID_ALIASES = ("file_ids", "selected_file_ids", "selected_files")
|
_FILE_ID_ALIASES = ("file_ids", "selected_file_ids", "selected_files")
|
||||||
_FOLDER_ID_ALIASES = ("folder_ids", "selected_folder_ids", "selected_folders")
|
_FOLDER_ID_ALIASES = ("folder_ids", "selected_folder_ids", "selected_folders")
|
||||||
|
|
||||||
def log(self, message: str) -> None:
|
|
||||||
print(message)
|
|
||||||
|
|
||||||
def emit(self, doc: ConnectorDocument) -> None:
|
def emit(self, doc: ConnectorDocument) -> None:
|
||||||
"""
|
"""
|
||||||
|
|
@ -91,7 +88,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
"""
|
"""
|
||||||
# If BaseConnector has an emit method, call super().emit(doc)
|
# If BaseConnector has an emit method, call super().emit(doc)
|
||||||
# Otherwise, implement your custom logic here.
|
# Otherwise, implement your custom logic here.
|
||||||
print(f"Emitting document: {doc.id} ({doc.filename})")
|
logger.debug(f"Emitting document: {doc.id} ({doc.filename})")
|
||||||
|
|
||||||
def __init__(self, config: Dict[str, Any]) -> None:
|
def __init__(self, config: Dict[str, Any]) -> None:
|
||||||
# Read from config OR env (backend env, not NEXT_PUBLIC_*):
|
# Read from config OR env (backend env, not NEXT_PUBLIC_*):
|
||||||
|
|
@ -433,7 +430,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
|
|
||||||
# If still not authenticated, bail (caller should kick off OAuth init)
|
# If still not authenticated, bail (caller should kick off OAuth init)
|
||||||
if not await self.oauth.is_authenticated():
|
if not await self.oauth.is_authenticated():
|
||||||
self.log("authenticate: no valid credentials; run OAuth init/callback first.")
|
logger.debug("authenticate: no valid credentials; run OAuth init/callback first.")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Build Drive service from OAuth helper
|
# Build Drive service from OAuth helper
|
||||||
|
|
@ -482,7 +479,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Optionally log error with your base class logger
|
# Optionally log error with your base class logger
|
||||||
try:
|
try:
|
||||||
self.log(f"GoogleDriveConnector.list_files failed: {e}")
|
logger.error(f"GoogleDriveConnector.list_files failed: {e}")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return {"files": [], "next_page_token": None}
|
return {"files": [], "next_page_token": None}
|
||||||
|
|
@ -500,7 +497,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Use your base class logger if available
|
# Use your base class logger if available
|
||||||
try:
|
try:
|
||||||
self.log(f"Download failed for {file_id}: {e}")
|
logger.error(f"Download failed for {file_id}: {e}")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
raise
|
raise
|
||||||
|
|
@ -567,7 +564,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Optional: use your base logger
|
# Optional: use your base logger
|
||||||
try:
|
try:
|
||||||
self.log(f"Failed to get start page token: {e}")
|
logger.error(f"Failed to get start page token: {e}")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
raise
|
raise
|
||||||
|
|
@ -634,7 +631,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
ok = await self.authenticate()
|
ok = await self.authenticate()
|
||||||
if not ok:
|
if not ok:
|
||||||
try:
|
try:
|
||||||
self.log("cleanup_subscription: not authenticated")
|
logger.error("cleanup_subscription: not authenticated")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return False
|
return False
|
||||||
|
|
@ -662,7 +659,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
|
|
||||||
if not resource_id:
|
if not resource_id:
|
||||||
try:
|
try:
|
||||||
self.log(
|
logger.error(
|
||||||
f"cleanup_subscription: missing resource_id for channel {subscription_id}. "
|
f"cleanup_subscription: missing resource_id for channel {subscription_id}. "
|
||||||
f"Persist (channel_id, resource_id) when creating the subscription."
|
f"Persist (channel_id, resource_id) when creating the subscription."
|
||||||
)
|
)
|
||||||
|
|
@ -684,7 +681,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
try:
|
try:
|
||||||
self.log(f"cleanup_subscription failed for {subscription_id}: {e}")
|
logger.error(f"cleanup_subscription failed for {subscription_id}: {e}")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return False
|
return False
|
||||||
|
|
@ -708,7 +705,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
ok = await self.authenticate()
|
ok = await self.authenticate()
|
||||||
if not ok:
|
if not ok:
|
||||||
try:
|
try:
|
||||||
self.log("handle_webhook: not authenticated")
|
logger.error("handle_webhook: not authenticated")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return affected
|
return affected
|
||||||
|
|
@ -728,7 +725,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
selected_ids = set()
|
selected_ids = set()
|
||||||
try:
|
try:
|
||||||
self.log(f"handle_webhook: scope build failed, proceeding unfiltered: {e}")
|
logger.error(f"handle_webhook: scope build failed, proceeding unfiltered: {e}")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
@ -797,7 +794,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
try:
|
try:
|
||||||
self.log(f"handle_webhook failed: {e}")
|
logger.error(f"handle_webhook failed: {e}")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return []
|
return []
|
||||||
|
|
@ -814,7 +811,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
blob = self._download_file_bytes(meta)
|
blob = self._download_file_bytes(meta)
|
||||||
except HttpError as e:
|
except HttpError as e:
|
||||||
# Skip/record failures
|
# Skip/record failures
|
||||||
self.log(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}")
|
logger.error(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
|
||||||
|
|
@ -391,7 +391,7 @@ class ChatService:
|
||||||
local_metadata[response_id] = conversation_metadata
|
local_metadata[response_id] = conversation_metadata
|
||||||
|
|
||||||
# 2. Get actual conversations from Langflow database (source of truth for messages)
|
# 2. Get actual conversations from Langflow database (source of truth for messages)
|
||||||
print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
|
logger.debug(f"Attempting to fetch Langflow history for user: {user_id}")
|
||||||
langflow_history = (
|
langflow_history = (
|
||||||
await langflow_history_service.get_user_conversation_history(
|
await langflow_history_service.get_user_conversation_history(
|
||||||
user_id, flow_id=LANGFLOW_CHAT_FLOW_ID
|
user_id, flow_id=LANGFLOW_CHAT_FLOW_ID
|
||||||
|
|
@ -462,24 +462,24 @@ class ChatService:
|
||||||
)
|
)
|
||||||
|
|
||||||
if langflow_history.get("conversations"):
|
if langflow_history.get("conversations"):
|
||||||
print(
|
logger.debug(
|
||||||
f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow"
|
f"Added {len(langflow_history['conversations'])} historical conversations from Langflow"
|
||||||
)
|
)
|
||||||
elif langflow_history.get("error"):
|
elif langflow_history.get("error"):
|
||||||
print(
|
logger.debug(
|
||||||
f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}"
|
f"Could not fetch Langflow history for user {user_id}: {langflow_history['error']}"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
print(f"[DEBUG] No Langflow conversations found for user {user_id}")
|
logger.debug(f"No Langflow conversations found for user {user_id}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] Failed to fetch Langflow history: {e}")
|
logger.error(f"Failed to fetch Langflow history: {e}")
|
||||||
# Continue with just in-memory conversations
|
# Continue with just in-memory conversations
|
||||||
|
|
||||||
# Sort by last activity (most recent first)
|
# Sort by last activity (most recent first)
|
||||||
all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
|
all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
|
||||||
|
|
||||||
print(
|
logger.debug(
|
||||||
f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)"
|
f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,9 @@ import os
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import threading
|
import threading
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
class ConversationPersistenceService:
|
class ConversationPersistenceService:
|
||||||
"""Simple service to persist conversations to disk"""
|
"""Simple service to persist conversations to disk"""
|
||||||
|
|
@ -24,10 +26,10 @@ class ConversationPersistenceService:
|
||||||
try:
|
try:
|
||||||
with open(self.storage_file, 'r', encoding='utf-8') as f:
|
with open(self.storage_file, 'r', encoding='utf-8') as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
print(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}")
|
logger.debug(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}")
|
||||||
return data
|
return data
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error loading conversations from {self.storage_file}: {e}")
|
logger.error(f"Error loading conversations from {self.storage_file}: {e}")
|
||||||
return {}
|
return {}
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
@ -37,9 +39,9 @@ class ConversationPersistenceService:
|
||||||
with self.lock:
|
with self.lock:
|
||||||
with open(self.storage_file, 'w', encoding='utf-8') as f:
|
with open(self.storage_file, 'w', encoding='utf-8') as f:
|
||||||
json.dump(self._conversations, f, indent=2, ensure_ascii=False, default=str)
|
json.dump(self._conversations, f, indent=2, ensure_ascii=False, default=str)
|
||||||
print(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}")
|
logger.debug(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error saving conversations to {self.storage_file}: {e}")
|
logger.error(f"Error saving conversations to {self.storage_file}: {e}")
|
||||||
|
|
||||||
def _count_total_conversations(self, data: Dict[str, Any]) -> int:
|
def _count_total_conversations(self, data: Dict[str, Any]) -> int:
|
||||||
"""Count total conversations across all users"""
|
"""Count total conversations across all users"""
|
||||||
|
|
@ -89,14 +91,14 @@ class ConversationPersistenceService:
|
||||||
if user_id in self._conversations and response_id in self._conversations[user_id]:
|
if user_id in self._conversations and response_id in self._conversations[user_id]:
|
||||||
del self._conversations[user_id][response_id]
|
del self._conversations[user_id][response_id]
|
||||||
self._save_conversations()
|
self._save_conversations()
|
||||||
print(f"Deleted conversation {response_id} for user {user_id}")
|
logger.debug(f"Deleted conversation {response_id} for user {user_id}")
|
||||||
|
|
||||||
def clear_user_conversations(self, user_id: str):
|
def clear_user_conversations(self, user_id: str):
|
||||||
"""Clear all conversations for a user"""
|
"""Clear all conversations for a user"""
|
||||||
if user_id in self._conversations:
|
if user_id in self._conversations:
|
||||||
del self._conversations[user_id]
|
del self._conversations[user_id]
|
||||||
self._save_conversations()
|
self._save_conversations()
|
||||||
print(f"Cleared all conversations for user {user_id}")
|
logger.debug(f"Cleared all conversations for user {user_id}")
|
||||||
|
|
||||||
def get_storage_stats(self) -> Dict[str, Any]:
|
def get_storage_stats(self) -> Dict[str, Any]:
|
||||||
"""Get statistics about stored conversations"""
|
"""Get statistics about stored conversations"""
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,9 @@ Simplified service that retrieves message history from Langflow using shared cli
|
||||||
from typing import List, Dict, Optional, Any
|
from typing import List, Dict, Optional, Any
|
||||||
|
|
||||||
from config.settings import clients
|
from config.settings import clients
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
class LangflowHistoryService:
|
class LangflowHistoryService:
|
||||||
"""Simplified service to retrieve message history from Langflow"""
|
"""Simplified service to retrieve message history from Langflow"""
|
||||||
|
|
@ -29,14 +31,14 @@ class LangflowHistoryService:
|
||||||
|
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
session_ids = response.json()
|
session_ids = response.json()
|
||||||
print(f"Found {len(session_ids)} total sessions from Langflow")
|
logger.debug(f"Found {len(session_ids)} total sessions from Langflow")
|
||||||
return session_ids
|
return session_ids
|
||||||
else:
|
else:
|
||||||
print(f"Failed to get sessions: {response.status_code} - {response.text}")
|
logger.error(f"Failed to get sessions: {response.status_code} - {response.text}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error getting user sessions: {e}")
|
logger.error(f"Error getting user sessions: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]:
|
async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]:
|
||||||
|
|
@ -56,11 +58,11 @@ class LangflowHistoryService:
|
||||||
# Convert to OpenRAG format
|
# Convert to OpenRAG format
|
||||||
return self._convert_langflow_messages(messages)
|
return self._convert_langflow_messages(messages)
|
||||||
else:
|
else:
|
||||||
print(f"Failed to get messages for session {session_id}: {response.status_code}")
|
logger.error(f"Failed to get messages for session {session_id}: {response.status_code}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error getting session messages: {e}")
|
logger.error(f"Error getting session messages: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||||
|
|
@ -114,7 +116,7 @@ class LangflowHistoryService:
|
||||||
converted_messages.append(converted_msg)
|
converted_messages.append(converted_msg)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error converting message: {e}")
|
logger.error(f"Error converting message: {e}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
return converted_messages
|
return converted_messages
|
||||||
|
|
@ -159,7 +161,7 @@ class LangflowHistoryService:
|
||||||
}
|
}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error getting user conversation history: {e}")
|
logger.error(f"Error getting user conversation history: {e}")
|
||||||
return {
|
return {
|
||||||
"error": str(e),
|
"error": str(e),
|
||||||
"conversations": []
|
"conversations": []
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,9 @@ import json
|
||||||
import os
|
import os
|
||||||
from typing import Dict, List, Optional
|
from typing import Dict, List, Optional
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
class SessionOwnershipService:
|
class SessionOwnershipService:
|
||||||
"""Simple service to track which user owns which session"""
|
"""Simple service to track which user owns which session"""
|
||||||
|
|
@ -23,7 +25,7 @@ class SessionOwnershipService:
|
||||||
with open(self.ownership_file, 'r') as f:
|
with open(self.ownership_file, 'r') as f:
|
||||||
return json.load(f)
|
return json.load(f)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error loading session ownership data: {e}")
|
logger.error(f"Error loading session ownership data: {e}")
|
||||||
return {}
|
return {}
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
@ -32,9 +34,9 @@ class SessionOwnershipService:
|
||||||
try:
|
try:
|
||||||
with open(self.ownership_file, 'w') as f:
|
with open(self.ownership_file, 'w') as f:
|
||||||
json.dump(self.ownership_data, f, indent=2)
|
json.dump(self.ownership_data, f, indent=2)
|
||||||
print(f"Saved session ownership data to {self.ownership_file}")
|
logger.debug(f"Saved session ownership data to {self.ownership_file}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error saving session ownership data: {e}")
|
logger.error(f"Error saving session ownership data: {e}")
|
||||||
|
|
||||||
def claim_session(self, user_id: str, session_id: str):
|
def claim_session(self, user_id: str, session_id: str):
|
||||||
"""Claim a session for a user"""
|
"""Claim a session for a user"""
|
||||||
|
|
@ -45,7 +47,7 @@ class SessionOwnershipService:
|
||||||
"last_accessed": datetime.now().isoformat()
|
"last_accessed": datetime.now().isoformat()
|
||||||
}
|
}
|
||||||
self._save_ownership_data()
|
self._save_ownership_data()
|
||||||
print(f"Claimed session {session_id} for user {user_id}")
|
logger.debug(f"Claimed session {session_id} for user {user_id}")
|
||||||
else:
|
else:
|
||||||
# Update last accessed time
|
# Update last accessed time
|
||||||
self.ownership_data[session_id]["last_accessed"] = datetime.now().isoformat()
|
self.ownership_data[session_id]["last_accessed"] = datetime.now().isoformat()
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue