From ab9ffa25bccca650890a522255cc50dcbb2e7b91 Mon Sep 17 00:00:00 2001 From: phact Date: Fri, 22 Aug 2025 01:09:18 -0400 Subject: [PATCH] monitor service and kf subscription WIP --- securityconfig/roles.yml | 10 +- src/api/knowledge_filter.py | 183 ++++++++++++++- src/main.py | 59 +++++ src/services/knowledge_filter_service.py | 102 +++++++- src/services/monitor_service.py | 285 +++++++++++++++++++++++ 5 files changed, 636 insertions(+), 3 deletions(-) create mode 100644 src/services/monitor_service.py diff --git a/securityconfig/roles.yml b/securityconfig/roles.yml index 886f9d4d..555c069a 100644 --- a/securityconfig/roles.yml +++ b/securityconfig/roles.yml @@ -7,6 +7,13 @@ openrag_user_role: cluster_permissions: - "indices:data/write/bulk" - "indices:data/write/index" + - "cluster:admin/opensearch/notifications/configs/create" + - "cluster:admin/opensearch/notifications/configs/list" + - "cluster:admin/opensearch/notifications/configs/get" + - "cluster:admin/opensearch/notifications/configs/update" + - "cluster:admin/opensearch/notifications/configs/delete" + - "cluster:admin/opensearch/alerting/*" + index_permissions: - index_patterns: ["documents", "documents*", "knowledge_filters", "knowledge_filters*"] allowed_actions: @@ -17,4 +24,5 @@ openrag_user_role: {"term":{"owner":"${user.name}"}}, {"term":{"allowed_users":"${user.name}"}}, {"bool":{"must_not":{"exists":{"field":"owner"}}}} - ],"minimum_should_match":1}} \ No newline at end of file + ],"minimum_should_match":1}} + diff --git a/src/api/knowledge_filter.py b/src/api/knowledge_filter.py index 7bc69147..e004ff3d 100644 --- a/src/api/knowledge_filter.py +++ b/src/api/knowledge_filter.py @@ -1,6 +1,7 @@ from starlette.requests import Request from starlette.responses import JSONResponse import uuid +import json from datetime import datetime async def create_knowledge_filter(request: Request, knowledge_filter_service, session_manager): @@ -160,4 +161,184 @@ async def delete_knowledge_filter(request: Request, knowledge_filter_service, se elif "access denied" in error_msg.lower() or "insufficient permissions" in error_msg.lower(): return JSONResponse(result, status_code=403) else: - return JSONResponse(result, status_code=500) \ No newline at end of file + return JSONResponse(result, status_code=500) + +async def subscribe_to_knowledge_filter(request: Request, knowledge_filter_service, monitor_service, session_manager): + """Create a subscription to a knowledge filter""" + filter_id = request.path_params.get("filter_id") + if not filter_id: + return JSONResponse({"error": "Knowledge filter ID is required"}, status_code=400) + + payload = await request.json() + user = request.state.user + jwt_token = request.cookies.get("auth_token") + + # Get the knowledge filter to validate it exists and get its details + filter_result = await knowledge_filter_service.get_knowledge_filter(filter_id, user_id=user.user_id, jwt_token=jwt_token) + if not filter_result.get("success"): + return JSONResponse({"error": "Knowledge filter not found or access denied"}, status_code=404) + + filter_doc = filter_result["filter"] + + # Create the monitor for this subscription + monitor_result = await monitor_service.create_knowledge_filter_monitor( + filter_id=filter_id, + filter_name=filter_doc["name"], + query_data=filter_doc["query_data"], + user_id=user.user_id, + jwt_token=jwt_token, + notification_config=payload.get("notification_config") + ) + + if not monitor_result.get("success"): + return JSONResponse(monitor_result, status_code=500) + + # Store subscription info in the knowledge filter document + subscription_data = { + "subscription_id": monitor_result["subscription_id"], + "monitor_id": monitor_result["monitor_id"], + "webhook_url": monitor_result["webhook_url"], + "created_at": datetime.utcnow().isoformat(), + "notification_config": payload.get("notification_config", {}) + } + + # Add subscription to the filter document + update_result = await knowledge_filter_service.add_subscription( + filter_id, subscription_data, user_id=user.user_id, jwt_token=jwt_token + ) + + if update_result.get("success"): + return JSONResponse({ + "success": True, + "subscription_id": monitor_result["subscription_id"], + "monitor_id": monitor_result["monitor_id"], + "webhook_url": monitor_result["webhook_url"], + "message": f"Successfully subscribed to knowledge filter: {filter_doc['name']}" + }, status_code=201) + else: + # If we can't update the filter doc, clean up the monitor + await monitor_service.delete_monitor(monitor_result["monitor_id"], user.user_id, jwt_token) + return JSONResponse({"error": "Failed to create subscription"}, status_code=500) + +async def list_knowledge_filter_subscriptions(request: Request, knowledge_filter_service, session_manager): + """List subscriptions for a knowledge filter""" + filter_id = request.path_params.get("filter_id") + if not filter_id: + return JSONResponse({"error": "Knowledge filter ID is required"}, status_code=400) + + user = request.state.user + jwt_token = request.cookies.get("auth_token") + + result = await knowledge_filter_service.get_filter_subscriptions(filter_id, user_id=user.user_id, jwt_token=jwt_token) + + if result.get("success"): + return JSONResponse(result, status_code=200) + else: + error_msg = result.get("error", "") + if "not found" in error_msg.lower(): + return JSONResponse(result, status_code=404) + elif "access denied" in error_msg.lower(): + return JSONResponse(result, status_code=403) + else: + return JSONResponse(result, status_code=500) + +async def cancel_knowledge_filter_subscription(request: Request, knowledge_filter_service, monitor_service, session_manager): + """Cancel a subscription to a knowledge filter""" + filter_id = request.path_params.get("filter_id") + subscription_id = request.path_params.get("subscription_id") + + if not filter_id or not subscription_id: + return JSONResponse({"error": "Knowledge filter ID and subscription ID are required"}, status_code=400) + + user = request.state.user + jwt_token = request.cookies.get("auth_token") + + # Get subscription details to find the monitor ID + subscriptions_result = await knowledge_filter_service.get_filter_subscriptions(filter_id, user_id=user.user_id, jwt_token=jwt_token) + if not subscriptions_result.get("success"): + return JSONResponse({"error": "Knowledge filter not found or access denied"}, status_code=404) + + # Find the specific subscription + subscription = None + for sub in subscriptions_result.get("subscriptions", []): + if sub.get("subscription_id") == subscription_id: + subscription = sub + break + + if not subscription: + return JSONResponse({"error": "Subscription not found"}, status_code=404) + + # Delete the monitor + monitor_result = await monitor_service.delete_monitor( + subscription["monitor_id"], user.user_id, jwt_token + ) + + # Remove subscription from the filter document + remove_result = await knowledge_filter_service.remove_subscription( + filter_id, subscription_id, user_id=user.user_id, jwt_token=jwt_token + ) + + if remove_result.get("success"): + return JSONResponse({ + "success": True, + "message": "Subscription cancelled successfully" + }, status_code=200) + else: + return JSONResponse({"error": "Failed to cancel subscription"}, status_code=500) + +async def knowledge_filter_webhook(request: Request, knowledge_filter_service, session_manager): + """Handle webhook notifications from OpenSearch monitors""" + filter_id = request.path_params.get("filter_id") + subscription_id = request.path_params.get("subscription_id") + + if not filter_id or not subscription_id: + return JSONResponse({"error": "Invalid webhook URL"}, status_code=400) + + try: + # Get the webhook payload + payload = await request.json() + + print(f"[WEBHOOK] Knowledge filter webhook received for filter {filter_id}, subscription {subscription_id}") + print(f"[WEBHOOK] Payload: {json.dumps(payload, indent=2)}") + + # Extract findings from the payload + findings = payload.get("findings", []) + if not findings: + print(f"[WEBHOOK] No findings in webhook payload for subscription {subscription_id}") + return JSONResponse({"status": "no_findings"}) + + # Process the findings - these are the documents that matched the knowledge filter + matched_documents = [] + for finding in findings: + # Extract document information from the finding + matched_documents.append({ + "document_id": finding.get("_id"), + "index": finding.get("_index"), + "source": finding.get("_source", {}), + "score": finding.get("_score") + }) + + # Log the matched documents + print(f"[WEBHOOK] Knowledge filter {filter_id} matched {len(matched_documents)} documents") + for doc in matched_documents: + print(f"[WEBHOOK] Matched document: {doc['document_id']} from index {doc['index']}") + + # Here you could add additional processing: + # - Send notifications to external webhooks + # - Email notifications + # - Store alerts in a database + # - Trigger other workflows + + return JSONResponse({ + "status": "processed", + "filter_id": filter_id, + "subscription_id": subscription_id, + "matched_documents": len(matched_documents), + "timestamp": datetime.utcnow().isoformat() + }) + + except Exception as e: + print(f"[ERROR] Failed to process knowledge filter webhook: {str(e)}") + import traceback + traceback.print_exc() + return JSONResponse({"error": f"Webhook processing failed: {str(e)}"}, status_code=500) \ No newline at end of file diff --git a/src/main.py b/src/main.py index 30a5538d..5bfd59ee 100644 --- a/src/main.py +++ b/src/main.py @@ -26,6 +26,7 @@ from services.task_service import TaskService from services.auth_service import AuthService from services.chat_service import ChatService from services.knowledge_filter_service import KnowledgeFilterService +from services.monitor_service import MonitorService # Existing services from connectors.service import ConnectorService @@ -55,6 +56,27 @@ async def wait_for_opensearch(): else: raise Exception("OpenSearch failed to become ready") +async def configure_alerting_security(): + """Configure OpenSearch alerting plugin security settings""" + try: + # For testing, disable backend role filtering to allow all authenticated users + # In production, you'd want to configure proper roles instead + alerting_settings = { + "persistent": { + "plugins.alerting.filter_by_backend_roles": "false", + "opendistro.alerting.filter_by_backend_roles": "false", + "opensearch.notifications.general.filter_by_backend_roles": "false" + } + } + + # Use admin client (clients.opensearch uses admin credentials) + response = await clients.opensearch.cluster.put_settings(body=alerting_settings) + print("Alerting security settings configured successfully") + print(f"Response: {response}") + except Exception as e: + print(f"Warning: Failed to configure alerting security settings: {e}") + # Don't fail startup if alerting config fails + async def init_index(): """Initialize OpenSearch index and security roles""" await wait_for_opensearch() @@ -78,6 +100,7 @@ async def init_index(): "owner": {"type": "keyword"}, "allowed_users": {"type": "keyword"}, "allowed_groups": {"type": "keyword"}, + "subscriptions": {"type": "object"}, # Store subscription data "created_at": {"type": "date"}, "updated_at": {"type": "date"} } @@ -89,6 +112,9 @@ async def init_index(): print(f"Created index '{knowledge_filter_index_name}'") else: print(f"Index '{knowledge_filter_index_name}' already exists, skipping creation.") + + # Configure alerting plugin security settings + await configure_alerting_security() def generate_jwt_keys(): """Generate RSA keys for JWT signing if they don't exist""" @@ -146,6 +172,7 @@ def initialize_services(): task_service = TaskService(document_service, process_pool) chat_service = ChatService() knowledge_filter_service = KnowledgeFilterService(session_manager) + monitor_service = MonitorService(session_manager) # Set process pool for document service document_service.process_pool = process_pool @@ -171,6 +198,7 @@ def initialize_services(): 'auth_service': auth_service, 'connector_service': connector_service, 'knowledge_filter_service': knowledge_filter_service, + 'monitor_service': monitor_service, 'session_manager': session_manager } @@ -281,6 +309,37 @@ def create_app(): session_manager=services['session_manager']) ), methods=["DELETE"]), + # Knowledge Filter Subscription endpoints + Route("/knowledge-filter/{filter_id}/subscribe", + require_auth(services['session_manager'])( + partial(knowledge_filter.subscribe_to_knowledge_filter, + knowledge_filter_service=services['knowledge_filter_service'], + monitor_service=services['monitor_service'], + session_manager=services['session_manager']) + ), methods=["POST"]), + + Route("/knowledge-filter/{filter_id}/subscriptions", + require_auth(services['session_manager'])( + partial(knowledge_filter.list_knowledge_filter_subscriptions, + knowledge_filter_service=services['knowledge_filter_service'], + session_manager=services['session_manager']) + ), methods=["GET"]), + + Route("/knowledge-filter/{filter_id}/subscribe/{subscription_id}", + require_auth(services['session_manager'])( + partial(knowledge_filter.cancel_knowledge_filter_subscription, + knowledge_filter_service=services['knowledge_filter_service'], + monitor_service=services['monitor_service'], + session_manager=services['session_manager']) + ), methods=["DELETE"]), + + # Knowledge Filter Webhook endpoint (no auth required - called by OpenSearch) + Route("/knowledge-filter/{filter_id}/webhook/{subscription_id}", + partial(knowledge_filter.knowledge_filter_webhook, + knowledge_filter_service=services['knowledge_filter_service'], + session_manager=services['session_manager']), + methods=["POST"]), + # Chat endpoints Route("/chat", require_auth(services['session_manager'])( diff --git a/src/services/knowledge_filter_service.py b/src/services/knowledge_filter_service.py index c409768c..5162208e 100644 --- a/src/services/knowledge_filter_service.py +++ b/src/services/knowledge_filter_service.py @@ -134,4 +134,104 @@ class KnowledgeFilterService: elif "AuthenticationException" in error_str: return {"success": False, "error": "Access denied: insufficient permissions"} else: - return {"success": False, "error": f"Delete operation failed: {error_str}"} \ No newline at end of file + return {"success": False, "error": f"Delete operation failed: {error_str}"} + + async def add_subscription(self, filter_id: str, subscription_data: Dict[str, Any], user_id: str = None, jwt_token: str = None) -> Dict[str, Any]: + """Add a subscription to a knowledge filter""" + try: + opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token) + + # Get the current filter document + filter_result = await self.get_knowledge_filter(filter_id, user_id, jwt_token) + if not filter_result.get("success"): + return filter_result + + filter_doc = filter_result["filter"] + + # Add subscription to the subscriptions array + subscriptions = filter_doc.get("subscriptions", []) + subscriptions.append(subscription_data) + + # Update the filter document + update_body = { + "doc": { + "subscriptions": subscriptions, + "updated_at": subscription_data["created_at"] # Use the same timestamp + } + } + + result = await opensearch_client.update( + index=KNOWLEDGE_FILTERS_INDEX_NAME, + id=filter_id, + body=update_body + ) + + if result.get("result") in ["updated", "noop"]: + return {"success": True, "subscription": subscription_data} + else: + return {"success": False, "error": "Failed to add subscription"} + + except Exception as e: + return {"success": False, "error": str(e)} + + async def remove_subscription(self, filter_id: str, subscription_id: str, user_id: str = None, jwt_token: str = None) -> Dict[str, Any]: + """Remove a subscription from a knowledge filter""" + try: + opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token) + + # Get the current filter document + filter_result = await self.get_knowledge_filter(filter_id, user_id, jwt_token) + if not filter_result.get("success"): + return filter_result + + filter_doc = filter_result["filter"] + + # Remove subscription from the subscriptions array + subscriptions = filter_doc.get("subscriptions", []) + updated_subscriptions = [sub for sub in subscriptions if sub.get("subscription_id") != subscription_id] + + if len(updated_subscriptions) == len(subscriptions): + return {"success": False, "error": "Subscription not found"} + + # Update the filter document + from datetime import datetime + update_body = { + "doc": { + "subscriptions": updated_subscriptions, + "updated_at": datetime.utcnow().isoformat() + } + } + + result = await opensearch_client.update( + index=KNOWLEDGE_FILTERS_INDEX_NAME, + id=filter_id, + body=update_body + ) + + if result.get("result") in ["updated", "noop"]: + return {"success": True, "message": "Subscription removed successfully"} + else: + return {"success": False, "error": "Failed to remove subscription"} + + except Exception as e: + return {"success": False, "error": str(e)} + + async def get_filter_subscriptions(self, filter_id: str, user_id: str = None, jwt_token: str = None) -> Dict[str, Any]: + """Get all subscriptions for a knowledge filter""" + try: + filter_result = await self.get_knowledge_filter(filter_id, user_id, jwt_token) + if not filter_result.get("success"): + return filter_result + + filter_doc = filter_result["filter"] + subscriptions = filter_doc.get("subscriptions", []) + + return { + "success": True, + "filter_id": filter_id, + "filter_name": filter_doc.get("name"), + "subscriptions": subscriptions + } + + except Exception as e: + return {"success": False, "error": str(e), "subscriptions": []} \ No newline at end of file diff --git a/src/services/monitor_service.py b/src/services/monitor_service.py new file mode 100644 index 00000000..6948fec4 --- /dev/null +++ b/src/services/monitor_service.py @@ -0,0 +1,285 @@ +import uuid +import json +from typing import Any, Dict, Optional, List +from datetime import datetime +from config.settings import clients + +class MonitorService: + def __init__(self, session_manager=None, webhook_base_url: str = None): + self.session_manager = session_manager + self.webhook_base_url = webhook_base_url or "http://openrag-backend:8000" + + async def create_knowledge_filter_monitor( + self, + filter_id: str, + filter_name: str, + query_data: Dict[str, Any], + user_id: str, + jwt_token: str, + notification_config: Dict[str, Any] = None + ) -> Dict[str, Any]: + """Create a document-level monitor for a knowledge filter""" + try: + opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token) + + subscription_id = str(uuid.uuid4()) + webhook_url = f"{self.webhook_base_url}/knowledge-filter/{filter_id}/webhook/{subscription_id}" + + # Convert knowledge filter query to monitor query format + monitor_query = self._convert_kf_query_to_monitor_query(query_data) + + # TODO: OpenSearch 3.0 has a bug with document-level monitors on indexes with KNN fields + # Error: "Cannot invoke KNNMethodConfigContext.getVectorDataType() because knnMethodConfigContext is null" + # Consider using query-level monitors instead or excluding KNN fields from doc-level monitors + # For now, this will fail on the 'documents' index due to chunk_embedding KNN field + + # Create the document-level monitor + monitor_body = { + "type": "monitor", + "monitor_type": "doc_level_monitor", + "name": f"KF Monitor: {filter_name}", + "enabled": True, + "schedule": { + "period": { + "interval": 1, + "unit": "MINUTES" + } + }, + "inputs": [ + { + "doc_level_input": { + "description": f"Monitor for knowledge filter: {filter_name}", + "indices": ["documents"], + "queries": [ + { + "id": f"kf_query_{filter_id}", + "name": f"Knowledge Filter Query: {filter_name}", + "query": monitor_query, + "tags": [f"knowledge_filter:{filter_id}", f"user:{user_id}"] + } + ] + } + } + ], + "triggers": [ + { + "document_level_trigger": { + "name": f"KF Trigger: {filter_name}", + "severity": "1", + "condition": { + "script": { + "source": "return true", + "lang": "painless" + } + }, + "actions": [ + { + "name": f"KF Webhook Action: {filter_name}", + "destination_id": await self._get_or_create_webhook_destination( + webhook_url, opensearch_client + ), + "subject_template": { + "source": f"Knowledge Filter Alert: {filter_name}", + "lang": "mustache" + }, + "message_template": { + "source": json.dumps({ + "filter_id": filter_id, + "filter_name": filter_name, + "subscription_id": subscription_id, + "user_id": user_id, + "timestamp": "{{ctx.trigger.timestamp}}", + "findings": "{{ctx.results.0.hits.hits}}" + }), + "lang": "mustache" + } + } + ] + } + } + ] + } + + # Create the monitor + response = await opensearch_client.transport.perform_request( + "POST", + "/_plugins/_alerting/monitors", + body=monitor_body + ) + + if response.get("_id"): + return { + "success": True, + "monitor_id": response["_id"], + "subscription_id": subscription_id, + "webhook_url": webhook_url, + "message": f"Monitor created successfully for knowledge filter: {filter_name}" + } + else: + return {"success": False, "error": "Failed to create monitor"} + + except Exception as e: + return {"success": False, "error": f"Monitor creation failed: {str(e)}"} + + async def delete_monitor(self, monitor_id: str, user_id: str, jwt_token: str) -> Dict[str, Any]: + """Delete a document-level monitor""" + try: + opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token) + + response = await opensearch_client.transport.perform_request( + "DELETE", + f"/_plugins/_alerting/monitors/{monitor_id}" + ) + + if response.get("result") == "deleted": + return {"success": True, "message": "Monitor deleted successfully"} + else: + return {"success": False, "error": "Failed to delete monitor"} + + except Exception as e: + return {"success": False, "error": f"Monitor deletion failed: {str(e)}"} + + async def get_monitor(self, monitor_id: str, user_id: str, jwt_token: str) -> Dict[str, Any]: + """Get monitor details""" + try: + opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token) + + response = await opensearch_client.transport.perform_request( + "GET", + f"/_plugins/_alerting/monitors/{monitor_id}" + ) + + if response.get("_id"): + return {"success": True, "monitor": response} + else: + return {"success": False, "error": "Monitor not found"} + + except Exception as e: + return {"success": False, "error": f"Failed to get monitor: {str(e)}"} + + async def list_user_monitors(self, user_id: str, jwt_token: str, limit: int = 50) -> List[Dict[str, Any]]: + """List all monitors for a specific user""" + try: + opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token) + + # Search for all monitors (DLS will filter to user's monitors automatically) + search_body = { + "query": { + "bool": { + "must": [ + {"term": {"monitor.type": "doc_level_monitor"}} + ] + } + }, + "sort": [{"monitor.last_update_time": {"order": "desc"}}], + "size": limit + } + + response = await opensearch_client.search( + index=".opendistro-alerting-config", + body=search_body + ) + + monitors = [] + for hit in response.get("hits", {}).get("hits", []): + monitor_data = hit["_source"] + monitor_data["monitor_id"] = hit["_id"] + monitors.append(monitor_data) + + return monitors + + except Exception as e: + print(f"Error listing monitors for user {user_id}: {e}") + return [] + + async def list_monitors_for_filter(self, filter_id: str, user_id: str, jwt_token: str) -> List[Dict[str, Any]]: + """List all monitors for a specific knowledge filter""" + try: + opensearch_client = self.session_manager.get_user_opensearch_client(user_id, jwt_token) + + # Search for monitors with the knowledge filter tag + search_body = { + "query": { + "bool": { + "must": [ + {"term": {"monitor.type": "doc_level_monitor"}}, + {"term": {"monitor.inputs.doc_level_input.queries.tags": f"knowledge_filter:{filter_id}"}} + ] + } + } + } + + response = await opensearch_client.search( + index=".opendistro-alerting-config", + body=search_body + ) + + monitors = [] + for hit in response.get("hits", {}).get("hits", []): + monitor_data = hit["_source"] + monitor_data["monitor_id"] = hit["_id"] + monitors.append(monitor_data) + + return monitors + + except Exception as e: + print(f"Error listing monitors for filter {filter_id}: {e}") + return [] + + async def _get_or_create_webhook_destination(self, webhook_url: str, opensearch_client) -> str: + """Get or create a webhook destination for notifications""" + try: + # Try to find existing webhook destination + search_response = await opensearch_client.transport.perform_request( + "GET", + "/_plugins/_notifications/configs", + params={"config_type": "webhook"} + ) + + # Check if we already have a destination for this webhook URL + for config in search_response.get("config_list", []): + if config.get("config", {}).get("webhook", {}).get("url") == webhook_url: + return config["config_id"] + + # Create new webhook destination + destination_body = { + "config": { + "name": f"KF Webhook {str(uuid.uuid4())[:8]}", + "description": "Knowledge Filter webhook notification", + "config_type": "webhook", + "is_enabled": True, + "webhook": { + "url": webhook_url, + "method": "POST", + "header_params": { + "Content-Type": "application/json" + } + } + } + } + + response = await opensearch_client.transport.perform_request( + "POST", + "/_plugins/_notifications/configs", + body=destination_body + ) + + return response.get("config_id") + + except Exception as e: + raise Exception(f"Failed to create webhook destination: {str(e)}") + + def _convert_kf_query_to_monitor_query(self, query_data: Dict[str, Any]) -> Dict[str, Any]: + """Convert knowledge filter query format to OpenSearch monitor query format""" + # This assumes the query_data contains an OpenSearch query structure + # You may need to adjust this based on your actual knowledge filter query format + + if isinstance(query_data, dict) and "query" in query_data: + # If it's already in OpenSearch query format, use it directly + return query_data["query"] + elif isinstance(query_data, dict): + # If it's a direct query object, use it as-is + return query_data + else: + # Fallback to match_all if format is unexpected + return {"match_all": {}} \ No newline at end of file