monitor service and kf subscription WIP

This commit is contained in:
phact 2025-08-22 01:09:18 -04:00
parent 147253716c
commit ab9ffa25bc
5 changed files with 636 additions and 3 deletions

View file

@ -7,6 +7,13 @@ openrag_user_role:
cluster_permissions:
- "indices:data/write/bulk"
- "indices:data/write/index"
- "cluster:admin/opensearch/notifications/configs/create"
- "cluster:admin/opensearch/notifications/configs/list"
- "cluster:admin/opensearch/notifications/configs/get"
- "cluster:admin/opensearch/notifications/configs/update"
- "cluster:admin/opensearch/notifications/configs/delete"
- "cluster:admin/opensearch/alerting/*"
index_permissions:
- index_patterns: ["documents", "documents*", "knowledge_filters", "knowledge_filters*"]
allowed_actions:
@ -17,4 +24,5 @@ openrag_user_role:
{"term":{"owner":"${user.name}"}},
{"term":{"allowed_users":"${user.name}"}},
{"bool":{"must_not":{"exists":{"field":"owner"}}}}
],"minimum_should_match":1}}
],"minimum_should_match":1}}

View file

@ -1,6 +1,7 @@
from starlette.requests import Request
from starlette.responses import JSONResponse
import uuid
import json
from datetime import datetime
async def create_knowledge_filter(request: Request, knowledge_filter_service, session_manager):
@ -160,4 +161,184 @@ async def delete_knowledge_filter(request: Request, knowledge_filter_service, se
elif "access denied" in error_msg.lower() or "insufficient permissions" in error_msg.lower():
return JSONResponse(result, status_code=403)
else:
return JSONResponse(result, status_code=500)
return JSONResponse(result, status_code=500)
async def subscribe_to_knowledge_filter(request: Request, knowledge_filter_service, monitor_service, session_manager):
    """Create a subscription to a knowledge filter.

    Looks the filter up on behalf of ``request.state.user``, creates a monitor
    for it through ``monitor_service`` and records the resulting subscription
    on the filter document. If the subscription cannot be recorded, the
    freshly created monitor is deleted again.

    The JSON request body is optional. When present it must be a JSON object
    and may carry a ``notification_config`` entry.

    Returns:
        201 with subscription, monitor and webhook details on success;
        400 for a missing filter ID or an unusable request body;
        404 when the filter lookup does not succeed;
        500 when the monitor or the subscription record cannot be created.
    """
    filter_id = request.path_params.get("filter_id")
    if not filter_id:
        return JSONResponse({"error": "Knowledge filter ID is required"}, status_code=400)

    # An absent or malformed body must not escape as an unhandled exception:
    # an empty body means "no options", anything else has to be a JSON object.
    raw_body = await request.body()
    try:
        payload = json.loads(raw_body) if raw_body.strip() else {}
    except ValueError:
        return JSONResponse({"error": "Request body must be valid JSON"}, status_code=400)
    if not isinstance(payload, dict):
        return JSONResponse({"error": "Request body must be a JSON object"}, status_code=400)

    user = request.state.user
    jwt_token = request.cookies.get("auth_token")

    # Get the knowledge filter to validate it exists and get its details
    filter_result = await knowledge_filter_service.get_knowledge_filter(
        filter_id, user_id=user.user_id, jwt_token=jwt_token
    )
    if not filter_result.get("success"):
        return JSONResponse({"error": "Knowledge filter not found or access denied"}, status_code=404)

    filter_doc = filter_result["filter"]

    # Create the monitor for this subscription
    monitor_result = await monitor_service.create_knowledge_filter_monitor(
        filter_id=filter_id,
        filter_name=filter_doc["name"],
        query_data=filter_doc["query_data"],
        user_id=user.user_id,
        jwt_token=jwt_token,
        notification_config=payload.get("notification_config")
    )
    if not monitor_result.get("success"):
        return JSONResponse(monitor_result, status_code=500)

    # Store subscription info in the knowledge filter document
    subscription_data = {
        "subscription_id": monitor_result["subscription_id"],
        "monitor_id": monitor_result["monitor_id"],
        "webhook_url": monitor_result["webhook_url"],
        "created_at": datetime.utcnow().isoformat(),
        "notification_config": payload.get("notification_config", {})
    }

    # Add subscription to the filter document
    update_result = await knowledge_filter_service.add_subscription(
        filter_id, subscription_data, user_id=user.user_id, jwt_token=jwt_token
    )
    if update_result.get("success"):
        return JSONResponse({
            "success": True,
            "subscription_id": monitor_result["subscription_id"],
            "monitor_id": monitor_result["monitor_id"],
            "webhook_url": monitor_result["webhook_url"],
            "message": f"Successfully subscribed to knowledge filter: {filter_doc['name']}"
        }, status_code=201)

    # If we can't update the filter doc, clean up the monitor. A failed
    # clean-up leaves a monitor nobody references, so at least log it.
    cleanup_result = await monitor_service.delete_monitor(
        monitor_result["monitor_id"], user.user_id, jwt_token
    )
    if not cleanup_result.get("success"):
        print(
            f"[ERROR] Failed to clean up monitor {monitor_result['monitor_id']} "
            f"after subscription failure: {cleanup_result.get('error')}"
        )
    return JSONResponse({"error": "Failed to create subscription"}, status_code=500)
async def list_knowledge_filter_subscriptions(request: Request, knowledge_filter_service, session_manager):
    """Return the subscriptions recorded on a knowledge filter.

    Responds with 400 when the route carries no filter ID and with 200 plus
    the service result on success. On failure the service result is returned
    with a status derived from its error text: 404 for "not found", 403 for
    "access denied", otherwise 500.
    """
    filter_id = request.path_params.get("filter_id")
    if not filter_id:
        return JSONResponse({"error": "Knowledge filter ID is required"}, status_code=400)

    current_user = request.state.user
    auth_token = request.cookies.get("auth_token")
    outcome = await knowledge_filter_service.get_filter_subscriptions(
        filter_id, user_id=current_user.user_id, jwt_token=auth_token
    )

    if outcome.get("success"):
        return JSONResponse(outcome, status_code=200)

    # Failure path: pick the HTTP status from the wording of the error
    lowered_error = outcome.get("error", "").lower()
    if "not found" in lowered_error:
        status = 404
    elif "access denied" in lowered_error:
        status = 403
    else:
        status = 500
    return JSONResponse(outcome, status_code=status)
async def cancel_knowledge_filter_subscription(request: Request, knowledge_filter_service, monitor_service, session_manager):
    """Cancel a subscription to a knowledge filter.

    Finds the subscription on the filter document, deletes its monitor and
    then removes the subscription record. Monitor deletion is best-effort:
    a failure is logged but does not stop the record from being removed, so
    a subscription whose monitor is already gone can still be cancelled.

    Returns:
        200 when the subscription record was removed;
        400 when the filter ID or subscription ID is missing;
        404 when the filter lookup fails or the subscription is unknown;
        500 when the subscription record could not be removed.
    """
    filter_id = request.path_params.get("filter_id")
    subscription_id = request.path_params.get("subscription_id")
    if not filter_id or not subscription_id:
        return JSONResponse({"error": "Knowledge filter ID and subscription ID are required"}, status_code=400)

    user = request.state.user
    jwt_token = request.cookies.get("auth_token")

    # Get subscription details to find the monitor ID
    subscriptions_result = await knowledge_filter_service.get_filter_subscriptions(
        filter_id, user_id=user.user_id, jwt_token=jwt_token
    )
    if not subscriptions_result.get("success"):
        return JSONResponse({"error": "Knowledge filter not found or access denied"}, status_code=404)

    # Find the specific subscription
    subscription = next(
        (
            sub
            for sub in subscriptions_result.get("subscriptions", [])
            if sub.get("subscription_id") == subscription_id
        ),
        None,
    )
    if not subscription:
        return JSONResponse({"error": "Subscription not found"}, status_code=404)

    # Delete the monitor. A record without a monitor ID must still be
    # cancellable, and a failed deletion must not pass unnoticed.
    monitor_id = subscription.get("monitor_id")
    if monitor_id:
        monitor_result = await monitor_service.delete_monitor(monitor_id, user.user_id, jwt_token)
        if not monitor_result.get("success"):
            print(
                f"[ERROR] Failed to delete monitor {monitor_id} for subscription "
                f"{subscription_id}: {monitor_result.get('error')}"
            )
    else:
        print(f"[ERROR] Subscription {subscription_id} has no monitor_id; skipping monitor deletion")

    # Remove subscription from the filter document
    remove_result = await knowledge_filter_service.remove_subscription(
        filter_id, subscription_id, user_id=user.user_id, jwt_token=jwt_token
    )
    if remove_result.get("success"):
        return JSONResponse({
            "success": True,
            "message": "Subscription cancelled successfully"
        }, status_code=200)
    return JSONResponse({"error": "Failed to cancel subscription"}, status_code=500)
async def knowledge_filter_webhook(request: Request, knowledge_filter_service, session_manager):
    """Handle webhook notifications from OpenSearch monitors.

    The route is registered without authentication, so the payload is treated
    as untrusted: it must be a JSON object, and only dict-shaped entries of
    its ``findings`` list are processed. The matched documents are currently
    only logged.

    Returns:
        200 with ``{"status": "no_findings"}`` when there is nothing to process;
        200 with a processing summary otherwise;
        400 for a malformed URL or payload;
        500 when processing raises unexpectedly.
    """
    filter_id = request.path_params.get("filter_id")
    subscription_id = request.path_params.get("subscription_id")
    if not filter_id or not subscription_id:
        return JSONResponse({"error": "Invalid webhook URL"}, status_code=400)

    # Get the webhook payload; a bad body is a client error, not a server fault
    try:
        payload = await request.json()
    except ValueError:
        print(f"[WEBHOOK] Invalid JSON payload for filter {filter_id}, subscription {subscription_id}")
        return JSONResponse({"error": "Webhook payload must be valid JSON"}, status_code=400)
    if not isinstance(payload, dict):
        return JSONResponse({"error": "Webhook payload must be a JSON object"}, status_code=400)

    try:
        print(f"[WEBHOOK] Knowledge filter webhook received for filter {filter_id}, subscription {subscription_id}")
        print(f"[WEBHOOK] Payload: {json.dumps(payload, indent=2)}")

        # Extract findings from the payload
        findings = payload.get("findings", [])
        if not findings:
            print(f"[WEBHOOK] No findings in webhook payload for subscription {subscription_id}")
            return JSONResponse({"status": "no_findings"})

        # The sender may deliver findings as rendered text rather than a
        # list of hits; only structured entries can be processed.
        if not isinstance(findings, list):
            print(f"[WEBHOOK] Findings for subscription {subscription_id} are not a list; ignoring")
            findings = []

        # Process the findings - these are the documents that matched the knowledge filter
        matched_documents = [
            {
                "document_id": finding.get("_id"),
                "index": finding.get("_index"),
                "source": finding.get("_source", {}),
                "score": finding.get("_score")
            }
            for finding in findings
            if isinstance(finding, dict)
        ]

        # Log the matched documents
        print(f"[WEBHOOK] Knowledge filter {filter_id} matched {len(matched_documents)} documents")
        for doc in matched_documents:
            print(f"[WEBHOOK] Matched document: {doc['document_id']} from index {doc['index']}")

        # Here you could add additional processing:
        # - Send notifications to external webhooks
        # - Email notifications
        # - Store alerts in a database
        # - Trigger other workflows

        return JSONResponse({
            "status": "processed",
            "filter_id": filter_id,
            "subscription_id": subscription_id,
            "matched_documents": len(matched_documents),
            "timestamp": datetime.utcnow().isoformat()
        })
    except Exception as e:
        print(f"[ERROR] Failed to process knowledge filter webhook: {str(e)}")
        import traceback
        traceback.print_exc()
        return JSONResponse({"error": f"Webhook processing failed: {str(e)}"}, status_code=500)

View file

@ -26,6 +26,7 @@ from services.task_service import TaskService
from services.auth_service import AuthService
from services.chat_service import ChatService
from services.knowledge_filter_service import KnowledgeFilterService
from services.monitor_service import MonitorService
# Existing services
from connectors.service import ConnectorService
@ -55,6 +56,27 @@ async def wait_for_opensearch():
else:
raise Exception("OpenSearch failed to become ready")
async def configure_alerting_security():
    """Configure OpenSearch alerting plugin security settings.

    Writes persistent cluster settings that switch off backend-role filtering
    for the alerting and notifications plugins. Any failure is reported as a
    warning and swallowed so that startup can continue.
    """
    # For testing, disable backend role filtering to allow all authenticated users
    # In production, you'd want to configure proper roles instead
    backend_role_filter_keys = (
        "plugins.alerting.filter_by_backend_roles",
        "opendistro.alerting.filter_by_backend_roles",
        "opensearch.notifications.general.filter_by_backend_roles",
    )
    try:
        settings_body = {
            "persistent": {setting: "false" for setting in backend_role_filter_keys}
        }
        # Use admin client (clients.opensearch uses admin credentials)
        put_result = await clients.opensearch.cluster.put_settings(body=settings_body)
        print("Alerting security settings configured successfully")
        print(f"Response: {put_result}")
    except Exception as exc:
        # Don't fail startup if alerting config fails
        print(f"Warning: Failed to configure alerting security settings: {exc}")
async def init_index():
"""Initialize OpenSearch index and security roles"""
await wait_for_opensearch()
@ -78,6 +100,7 @@ async def init_index():
"owner": {"type": "keyword"},
"allowed_users": {"type": "keyword"},
"allowed_groups": {"type": "keyword"},
"subscriptions": {"type": "object"}, # Store subscription data
"created_at": {"type": "date"},
"updated_at": {"type": "date"}
}
@ -89,6 +112,9 @@ async def init_index():
print(f"Created index '{knowledge_filter_index_name}'")
else:
print(f"Index '{knowledge_filter_index_name}' already exists, skipping creation.")
# Configure alerting plugin security settings
await configure_alerting_security()
def generate_jwt_keys():
"""Generate RSA keys for JWT signing if they don't exist"""
@ -146,6 +172,7 @@ def initialize_services():
task_service = TaskService(document_service, process_pool)
chat_service = ChatService()
knowledge_filter_service = KnowledgeFilterService(session_manager)
monitor_service = MonitorService(session_manager)
# Set process pool for document service
document_service.process_pool = process_pool
@ -171,6 +198,7 @@ def initialize_services():
'auth_service': auth_service,
'connector_service': connector_service,
'knowledge_filter_service': knowledge_filter_service,
'monitor_service': monitor_service,
'session_manager': session_manager
}
@ -281,6 +309,37 @@ def create_app():
session_manager=services['session_manager'])
), methods=["DELETE"]),
# Knowledge Filter Subscription endpoints
Route("/knowledge-filter/{filter_id}/subscribe",
require_auth(services['session_manager'])(
partial(knowledge_filter.subscribe_to_knowledge_filter,
knowledge_filter_service=services['knowledge_filter_service'],
monitor_service=services['monitor_service'],
session_manager=services['session_manager'])
), methods=["POST"]),
Route("/knowledge-filter/{filter_id}/subscriptions",
require_auth(services['session_manager'])(
partial(knowledge_filter.list_knowledge_filter_subscriptions,
knowledge_filter_service=services['knowledge_filter_service'],
session_manager=services['session_manager'])
), methods=["GET"]),
Route("/knowledge-filter/{filter_id}/subscribe/{subscription_id}",
require_auth(services['session_manager'])(
partial(knowledge_filter.cancel_knowledge_filter_subscription,
knowledge_filter_service=services['knowledge_filter_service'],
monitor_service=services['monitor_service'],
session_manager=services['session_manager'])
), methods=["DELETE"]),
# Knowledge Filter Webhook endpoint (no auth required - called by OpenSearch)
Route("/knowledge-filter/{filter_id}/webhook/{subscription_id}",
partial(knowledge_filter.knowledge_filter_webhook,
knowledge_filter_service=services['knowledge_filter_service'],
session_manager=services['session_manager']),
methods=["POST"]),
# Chat endpoints
Route("/chat",
require_auth(services['session_manager'])(

View file

@ -134,4 +134,104 @@ class KnowledgeFilterService:
elif "AuthenticationException" in error_str:
return {"success": False, "error": "Access denied: insufficient permissions"}
else:
return {"success": False, "error": f"Delete operation failed: {error_str}"}
return {"success": False, "error": f"Delete operation failed: {error_str}"}
async def add_subscription(self, filter_id: str, subscription_data: Dict[str, Any], user_id: str = None, jwt_token: str = None) -> Dict[str, Any]:
    """Add a subscription to a knowledge filter.

    Reads the filter document, appends ``subscription_data`` to its
    ``subscriptions`` list and writes the list back together with an
    ``updated_at`` timestamp taken from ``subscription_data["created_at"]``.

    Returns:
        ``{"success": True, "subscription": ...}`` when the update is
        acknowledged; the failed lookup result when the filter cannot be
        fetched; otherwise ``{"success": False, "error": ...}``.
    """
    try:
        opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token)

        # Get the current filter document
        filter_result = await self.get_knowledge_filter(filter_id, user_id, jwt_token)
        if not filter_result.get("success"):
            return filter_result

        filter_doc = filter_result["filter"]

        # The field may be absent, explicitly null, or a single object;
        # normalise to a list and build a new one instead of mutating the
        # fetched document in place.
        existing = filter_doc.get("subscriptions") or []
        if isinstance(existing, dict):
            existing = [existing]
        subscriptions = [*existing, subscription_data]

        # NOTE(review): this is a read-modify-write without version checks,
        # so two concurrent subscribers could overwrite each other - confirm
        # whether that matters for this index.
        update_body = {
            "doc": {
                "subscriptions": subscriptions,
                "updated_at": subscription_data["created_at"]  # Use the same timestamp
            }
        }

        result = await opensearch_client.update(
            index=KNOWLEDGE_FILTERS_INDEX_NAME,
            id=filter_id,
            body=update_body
        )

        if result.get("result") in ["updated", "noop"]:
            return {"success": True, "subscription": subscription_data}
        return {"success": False, "error": "Failed to add subscription"}
    except Exception as e:
        return {"success": False, "error": str(e)}
async def remove_subscription(self, filter_id: str, subscription_id: str, user_id: str = None, jwt_token: str = None) -> Dict[str, Any]:
    """Remove a subscription from a knowledge filter.

    Reads the filter document, drops the entry whose ``subscription_id``
    matches and writes the remaining list back with a fresh ``updated_at``.

    Returns:
        ``{"success": True, "message": ...}`` when the update is acknowledged;
        the failed lookup result when the filter cannot be fetched;
        ``{"success": False, "error": "Subscription not found"}`` when no
        entry matches; otherwise ``{"success": False, "error": ...}``.
    """
    try:
        opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token)

        # Get the current filter document
        filter_result = await self.get_knowledge_filter(filter_id, user_id, jwt_token)
        if not filter_result.get("success"):
            return filter_result

        filter_doc = filter_result["filter"]

        # The field may be absent, explicitly null, or a single object;
        # normalise to a list so an empty field reads as "not found".
        subscriptions = filter_doc.get("subscriptions") or []
        if isinstance(subscriptions, dict):
            subscriptions = [subscriptions]

        updated_subscriptions = [
            sub for sub in subscriptions if sub.get("subscription_id") != subscription_id
        ]
        if len(updated_subscriptions) == len(subscriptions):
            return {"success": False, "error": "Subscription not found"}

        # Update the filter document
        from datetime import datetime
        update_body = {
            "doc": {
                "subscriptions": updated_subscriptions,
                "updated_at": datetime.utcnow().isoformat()
            }
        }

        result = await opensearch_client.update(
            index=KNOWLEDGE_FILTERS_INDEX_NAME,
            id=filter_id,
            body=update_body
        )

        if result.get("result") in ["updated", "noop"]:
            return {"success": True, "message": "Subscription removed successfully"}
        return {"success": False, "error": "Failed to remove subscription"}
    except Exception as e:
        return {"success": False, "error": str(e)}
async def get_filter_subscriptions(self, filter_id: str, user_id: str = None, jwt_token: str = None) -> Dict[str, Any]:
    """Get all subscriptions for a knowledge filter.

    Returns the filter's ID, name and ``subscriptions`` list on success. A
    failed filter lookup is passed through unchanged; an exception is turned
    into a failure result carrying an empty ``subscriptions`` list.
    """
    try:
        lookup = await self.get_knowledge_filter(filter_id, user_id, jwt_token)
        if lookup.get("success"):
            document = lookup["filter"]
            return {
                "success": True,
                "filter_id": filter_id,
                "filter_name": document.get("name"),
                "subscriptions": document.get("subscriptions", []),
            }
        # Lookup failed: hand the service's own error result back to the caller
        return lookup
    except Exception as exc:
        return {"success": False, "error": str(exc), "subscriptions": []}

View file

@ -0,0 +1,285 @@
import uuid
import json
from typing import Any, Dict, Optional, List
from datetime import datetime
from config.settings import clients
class MonitorService:
    """Create, inspect and delete the OpenSearch alerting monitors that back
    knowledge filter subscriptions.

    Every operation runs through the per-user OpenSearch client obtained from
    ``session_manager.get_user_opensearch_client(user_id, jwt_token)``.
    """

    def __init__(self, session_manager=None, webhook_base_url: str = None):
        """Store the session manager and the base URL used to build webhook URLs.

        ``webhook_base_url`` falls back to ``http://openrag-backend:8000``
        when it is omitted or empty.
        """
        self.session_manager = session_manager
        self.webhook_base_url = webhook_base_url or "http://openrag-backend:8000"

    async def create_knowledge_filter_monitor(
        self,
        filter_id: str,
        filter_name: str,
        query_data: Dict[str, Any],
        user_id: str,
        jwt_token: str,
        notification_config: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """Create a document-level monitor for a knowledge filter.

        A new subscription ID is generated, a webhook destination pointing at
        this subscription's webhook URL is looked up or created, and a
        monitor over the ``documents`` index is posted to the alerting API
        with a one-minute schedule.

        ``notification_config`` is accepted but not used by this method.

        Returns:
            On success a dict with ``success``, ``monitor_id``,
            ``subscription_id``, ``webhook_url`` and ``message``; otherwise
            ``{"success": False, "error": ...}``. Exceptions are converted
            into the failure form rather than raised.
        """
        try:
            opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token)

            subscription_id = str(uuid.uuid4())
            webhook_url = f"{self.webhook_base_url}/knowledge-filter/{filter_id}/webhook/{subscription_id}"

            # Convert knowledge filter query to monitor query format
            monitor_query = self._convert_kf_query_to_monitor_query(query_data)

            # Resolve the destination before assembling the monitor so that a
            # missing ID aborts here instead of producing a monitor whose
            # action has no destination.
            destination_id = await self._get_or_create_webhook_destination(
                webhook_url, opensearch_client
            )
            if not destination_id:
                return {
                    "success": False,
                    "error": "Monitor creation failed: webhook destination could not be created"
                }

            # TODO: OpenSearch 3.0 has a bug with document-level monitors on indexes with KNN fields
            # Error: "Cannot invoke KNNMethodConfigContext.getVectorDataType() because knnMethodConfigContext is null"
            # Consider using query-level monitors instead or excluding KNN fields from doc-level monitors
            # For now, this will fail on the 'documents' index due to chunk_embedding KNN field

            # NOTE(review): "findings" below is a mustache placeholder placed
            # inside a JSON string value, while the webhook handler reads
            # findings as a list of hits - confirm what the rendered message
            # actually looks like.
            message_template_source = json.dumps({
                "filter_id": filter_id,
                "filter_name": filter_name,
                "subscription_id": subscription_id,
                "user_id": user_id,
                "timestamp": "{{ctx.trigger.timestamp}}",
                "findings": "{{ctx.results.0.hits.hits}}"
            })

            # Create the document-level monitor
            monitor_body = {
                "type": "monitor",
                "monitor_type": "doc_level_monitor",
                "name": f"KF Monitor: {filter_name}",
                "enabled": True,
                "schedule": {
                    "period": {
                        "interval": 1,
                        "unit": "MINUTES"
                    }
                },
                "inputs": [
                    {
                        "doc_level_input": {
                            "description": f"Monitor for knowledge filter: {filter_name}",
                            "indices": ["documents"],
                            "queries": [
                                {
                                    "id": f"kf_query_{filter_id}",
                                    "name": f"Knowledge Filter Query: {filter_name}",
                                    "query": monitor_query,
                                    "tags": [f"knowledge_filter:{filter_id}", f"user:{user_id}"]
                                }
                            ]
                        }
                    }
                ],
                "triggers": [
                    {
                        "document_level_trigger": {
                            "name": f"KF Trigger: {filter_name}",
                            "severity": "1",
                            "condition": {
                                "script": {
                                    "source": "return true",
                                    "lang": "painless"
                                }
                            },
                            "actions": [
                                {
                                    "name": f"KF Webhook Action: {filter_name}",
                                    "destination_id": destination_id,
                                    "subject_template": {
                                        "source": f"Knowledge Filter Alert: {filter_name}",
                                        "lang": "mustache"
                                    },
                                    "message_template": {
                                        "source": message_template_source,
                                        "lang": "mustache"
                                    }
                                }
                            ]
                        }
                    }
                ]
            }

            # Create the monitor
            response = await opensearch_client.transport.perform_request(
                "POST",
                "/_plugins/_alerting/monitors",
                body=monitor_body
            )

            if response.get("_id"):
                return {
                    "success": True,
                    "monitor_id": response["_id"],
                    "subscription_id": subscription_id,
                    "webhook_url": webhook_url,
                    "message": f"Monitor created successfully for knowledge filter: {filter_name}"
                }
            return {"success": False, "error": "Failed to create monitor"}
        except Exception as e:
            return {"success": False, "error": f"Monitor creation failed: {str(e)}"}

    async def delete_monitor(self, monitor_id: str, user_id: str, jwt_token: str) -> Dict[str, Any]:
        """Delete a document-level monitor.

        Returns ``{"success": True, ...}`` when the API reports
        ``"result": "deleted"``, otherwise ``{"success": False, "error": ...}``.
        """
        try:
            opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token)

            response = await opensearch_client.transport.perform_request(
                "DELETE",
                f"/_plugins/_alerting/monitors/{monitor_id}"
            )

            if response.get("result") == "deleted":
                return {"success": True, "message": "Monitor deleted successfully"}
            return {"success": False, "error": "Failed to delete monitor"}
        except Exception as e:
            return {"success": False, "error": f"Monitor deletion failed: {str(e)}"}

    async def get_monitor(self, monitor_id: str, user_id: str, jwt_token: str) -> Dict[str, Any]:
        """Get monitor details.

        Returns ``{"success": True, "monitor": <API response>}`` when the
        response carries an ``_id``, otherwise a failure dict.
        """
        try:
            opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token)

            response = await opensearch_client.transport.perform_request(
                "GET",
                f"/_plugins/_alerting/monitors/{monitor_id}"
            )

            if response.get("_id"):
                return {"success": True, "monitor": response}
            return {"success": False, "error": "Monitor not found"}
        except Exception as e:
            return {"success": False, "error": f"Failed to get monitor: {str(e)}"}

    async def list_user_monitors(self, user_id: str, jwt_token: str, limit: int = 50) -> List[Dict[str, Any]]:
        """List document-level monitors visible to a user, newest update first.

        Returns at most ``limit`` monitors; any error is logged and yields an
        empty list.
        """
        try:
            opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token)

            # Search for all monitors. The query itself does not filter by
            # user; the original author relies on document-level security to
            # do that - NOTE(review): confirm DLS covers this index.
            search_body = {
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"monitor.type": "doc_level_monitor"}}
                        ]
                    }
                },
                "sort": [{"monitor.last_update_time": {"order": "desc"}}],
                "size": limit
            }

            response = await opensearch_client.search(
                index=".opendistro-alerting-config",
                body=search_body
            )
            return self._monitors_from_search_response(response)
        except Exception as e:
            print(f"Error listing monitors for user {user_id}: {e}")
            return []

    async def list_monitors_for_filter(self, filter_id: str, user_id: str, jwt_token: str) -> List[Dict[str, Any]]:
        """List all monitors tagged with a specific knowledge filter.

        The request sets no ``size``, so the number of results is whatever
        the search API returns by default. Any error is logged and yields an
        empty list.
        """
        try:
            opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token)

            # Search for monitors with the knowledge filter tag
            search_body = {
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"monitor.type": "doc_level_monitor"}},
                            {"term": {"monitor.inputs.doc_level_input.queries.tags": f"knowledge_filter:{filter_id}"}}
                        ]
                    }
                }
            }

            response = await opensearch_client.search(
                index=".opendistro-alerting-config",
                body=search_body
            )
            return self._monitors_from_search_response(response)
        except Exception as e:
            print(f"Error listing monitors for filter {filter_id}: {e}")
            return []

    @staticmethod
    def _monitors_from_search_response(response: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Turn search hits into monitor dicts, adding each hit's ``_id`` as ``monitor_id``."""
        monitors = []
        for hit in response.get("hits", {}).get("hits", []):
            monitor_data = hit["_source"]
            monitor_data["monitor_id"] = hit["_id"]
            monitors.append(monitor_data)
        return monitors

    async def _get_or_create_webhook_destination(self, webhook_url: str, opensearch_client) -> str:
        """Get or create a webhook destination for notifications.

        Reuses an existing webhook config whose URL equals ``webhook_url``,
        otherwise creates a new one.

        Raises:
            Exception: when the lookup or creation fails, or when the create
                response carries no ``config_id``. The original error is
                chained as the cause.
        """
        try:
            # Try to find existing webhook destination
            search_response = await opensearch_client.transport.perform_request(
                "GET",
                "/_plugins/_notifications/configs",
                params={"config_type": "webhook"}
            )

            # Check if we already have a destination for this webhook URL
            for config in search_response.get("config_list", []):
                if config.get("config", {}).get("webhook", {}).get("url") == webhook_url:
                    return config["config_id"]

            # Create new webhook destination
            destination_body = {
                "config": {
                    "name": f"KF Webhook {str(uuid.uuid4())[:8]}",
                    "description": "Knowledge Filter webhook notification",
                    "config_type": "webhook",
                    "is_enabled": True,
                    "webhook": {
                        "url": webhook_url,
                        "method": "POST",
                        "header_params": {
                            "Content-Type": "application/json"
                        }
                    }
                }
            }

            response = await opensearch_client.transport.perform_request(
                "POST",
                "/_plugins/_notifications/configs",
                body=destination_body
            )

            # A response without an ID must not be handed on as a destination
            config_id = response.get("config_id")
            if not config_id:
                raise ValueError("notifications API returned no config_id")
            return config_id
        except Exception as e:
            raise Exception(f"Failed to create webhook destination: {str(e)}") from e

    def _convert_kf_query_to_monitor_query(self, query_data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert knowledge filter query format to OpenSearch monitor query format.

        A dict with a ``query`` key yields that key's value, any other dict is
        returned unchanged, and every other input falls back to ``match_all``.
        """
        # This assumes the query_data contains an OpenSearch query structure
        # You may need to adjust this based on your actual knowledge filter query format
        if not isinstance(query_data, dict):
            # Fallback to match_all if format is unexpected
            return {"match_all": {}}
        if "query" in query_data:
            # If it's already in OpenSearch query format, use it directly
            return query_data["query"]
        # If it's a direct query object, use it as-is
        return query_data