diff --git a/agent/canvas.py b/agent/canvas.py index 15936f526..70ea6e45c 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -368,7 +368,7 @@ class Canvas(Graph): if kwargs.get("webhook_payload"): for k, cpn in self.components.items(): - if self.components[k]["obj"].component_name.lower() == "webhook": + if self.components[k]["obj"].component_name.lower() == "begin" and self.components[k]["obj"]._param.mode == "Webhook": for kk, vv in kwargs["webhook_payload"].items(): self.components[k]["obj"].set_output(kk, vv) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index bc83a1cbb..a3426c399 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -15,12 +15,9 @@ # import asyncio -import hashlib -import hmac import ipaddress import json import logging -import re import time from typing import Any, cast @@ -207,9 +204,6 @@ async def webhook(agent_id: str): elif auth_type == "jwt": _validate_jwt_auth(security_cfg) - elif auth_type == "hmac": - await _validate_hmac_auth(security_cfg) - else: raise Exception(f"Unsupported auth_type: {auth_type}") @@ -365,88 +359,85 @@ async def webhook(agent_id: str): return decoded - - async def _validate_hmac_auth(security_cfg): - """Validate HMAC signature from header.""" - hmac_cfg = security_cfg.get("hmac", {}) - header = hmac_cfg.get("header") - secret = hmac_cfg.get("secret") - algorithm = hmac_cfg.get("algorithm", "sha256") - - provided_sig = request.headers.get(header) - if not provided_sig: - raise Exception("Missing HMAC signature header") - - body = await request.get_data() - if body is None: - body = b"" - elif isinstance(body, str): - body = body.encode("utf-8") - - computed = hmac.new(secret.encode(), body, getattr(hashlib, algorithm)).hexdigest() - - if not hmac.compare_digest(provided_sig, computed): - raise Exception("Invalid HMAC signature") - try: security_config=webhook_cfg.get("security", {}) await validate_webhook_security(security_config) except Exception as e: return get_data_error_result(message=str(e)) + if not isinstance(cvs.dsl, str): + dsl = json.dumps(cvs.dsl, ensure_ascii=False) + try: + canvas = Canvas(dsl, cvs.user_id, agent_id) + except Exception as e: + return get_json_result( + data=False, message=str(e), + code=RetCode.EXCEPTION_ERROR) # 7. Parse request body - async def parse_webhook_request(): + async def parse_webhook_request(content_type): """Parse request based on content-type and return structured data.""" - # 1. Parse query parameters - query_data = {} - for k, v in request.args.items(): - query_data[k] = v + # 1. Query + query_data = {k: v for k, v in request.args.items()} - # 2. Parse headers - header_data = {} - for k, v in request.headers.items(): - header_data[k] = v + # 2. Headers + header_data = {k: v for k, v in request.headers.items()} - # 3. Parse body based on content-type + # 3. Body ctype = request.headers.get("Content-Type", "").split(";")[0].strip() - raw_files = {} + if ctype != content_type: + raise ValueError( + f"Invalid Content-Type: expect '{content_type}', got '{ctype or 'empty'}'" + ) - if ctype == "application/json": - try: - body_data = await request.get_json() - except: - body_data = None + body_data: dict = {} - elif ctype == "multipart/form-data": - form = await request.form - files = await request.files - raw_files = {name: file for name, file in files.items()} - body_data = { - "form": dict(form), - "files": {name: file.filename for name, file in files.items()}, - } + try: + if ctype == "application/json": + body_data = await request.get_json() or {} - elif ctype == "application/x-www-form-urlencoded": - form = await request.form - body_data = dict(form) + elif ctype == "multipart/form-data": + nonlocal canvas + form = await request.form + files = await request.files - elif ctype == "text/plain": - body_data = (await request.get_data()).decode() + body_data = {} - elif ctype == "application/octet-stream": - body_data = await request.get_data() # raw binary + for key, value in form.items(): + body_data[key] = value - else: - # unknown content type → raw body - body_data = await request.get_data() + for key, file in files.items(): + desc = FileService.upload_info( + cvs.user_id, # user + file, # FileStorage + None # url (None for webhook) + ) + file_parsed= await canvas.get_files_async([desc]) + body_data[key] = file_parsed + + elif ctype == "application/x-www-form-urlencoded": + form = await request.form + body_data = dict(form) + + else: + # text/plain / octet-stream / empty / unknown + raw = await request.get_data() + if raw: + try: + body_data = json.loads(raw.decode("utf-8")) + except Exception: + body_data = {} + else: + body_data = {} + + except Exception: + body_data = {} return { "query": query_data, "headers": header_data, "body": body_data, "content_type": ctype, - "raw_files": raw_files } def extract_by_schema(data, schema, name="section"): @@ -456,9 +447,6 @@ async def webhook(agent_id: str): Optional fields default to type-based default values. Type validation included. """ - if schema.get("type") != "object": - return {} - props = schema.get("properties", {}) required = schema.get("required", []) @@ -498,7 +486,7 @@ async def webhook(agent_id: str): def default_for_type(t): """Return default value for the given schema type.""" if t == "file": - return "" + return [] if t == "object": return {} if t == "boolean": @@ -578,7 +566,7 @@ async def webhook(agent_id: str): def validate_type(value, t): """Validate value type against schema type t.""" if t == "file": - return isinstance(value, str) + return isinstance(value, list) if t == "string": return isinstance(value, str) @@ -609,60 +597,13 @@ async def webhook(agent_id: str): return True - def extract_files_by_schema(raw_files, schema, name="files"): - """ - Extract and validate files based on schema. - Only supports type = file (single file). - Does NOT support array. - """ - - if schema.get("type") != "object": - return {} - - props = schema.get("properties", {}) - required = schema.get("required", []) - - cleaned = [] - - for field, field_schema in props.items(): - field_type = field_schema.get("type") - - # 1. Required field must exist - if field in required and field not in raw_files: - raise Exception(f"{name} missing required file field: {field}") - - # 2. Ignore fields that are not file - if field_type != "file": - continue - - # 3. Extract single file - file_obj = raw_files.get(field) - - if file_obj: - cleaned.append({ - "field": field, - "file": file_obj - }) - return cleaned - - parsed = await parse_webhook_request() + parsed = await parse_webhook_request(webhook_cfg.get("content_types")) SCHEMA = webhook_cfg.get("schema", {"query": {}, "headers": {}, "body": {}}) # Extract strictly by schema query_clean = extract_by_schema(parsed["query"], SCHEMA.get("query", {}), name="query") header_clean = extract_by_schema(parsed["headers"], SCHEMA.get("headers", {}), name="headers") body_clean = extract_by_schema(parsed["body"], SCHEMA.get("body", {}), name="body") - files_clean = extract_files_by_schema(parsed["raw_files"], SCHEMA.get("body", {}), name="files") - - uploaded_files = [] - for item in files_clean: # each {field, file} - file_obj = item["file"] - desc = FileService.upload_info( - cvs.user_id, # user - file_obj, # FileStorage - None # url (None for webhook) - ) - uploaded_files.append(desc) clean_request = { "query": query_clean, @@ -670,15 +611,6 @@ async def webhook(agent_id: str): "body": body_clean } - if not isinstance(cvs.dsl, str): - dsl = json.dumps(cvs.dsl, ensure_ascii=False) - try: - canvas = Canvas(dsl, cvs.user_id, agent_id) - except Exception as e: - return get_json_result( - data=False, message=str(e), - code=RetCode.EXCEPTION_ERROR) - execution_mode = webhook_cfg.get("execution_mode", "Immediately") response_cfg = webhook_cfg.get("response", {}) @@ -708,7 +640,6 @@ async def webhook(agent_id: str): try: async for _ in canvas.run( query="", - files=uploaded_files, user_id=cvs.user_id, webhook_payload=clean_request ): @@ -723,33 +654,26 @@ async def webhook(agent_id: str): asyncio.create_task(background_run()) return resp else: - async def sse(): nonlocal canvas + contents: list[str] = [] try: async for ans in canvas.run( query="", - files=uploaded_files, user_id=cvs.user_id, - webhook_payload=clean_request + webhook_payload=clean_request, ): - yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + if ans.get("event") == "message": + content = (ans.get("data") or {}).get("content") + if content: + contents.append(content) - # save updated canvas - cvs.dsl = json.loads(str(canvas)) - UserCanvasService.update_by_id(cvs.user_id, cvs.to_dict()) + final_content = "".join(contents) + yield json.dumps(final_content, ensure_ascii=False) except Exception as e: - logging.exception(e) - yield "data:" + json.dumps( - {"code": 500, "message": str(e), "data": False}, - ensure_ascii=False - ) + "\n\n" + yield json.dumps({"code": 500, "message": str(e)}, ensure_ascii=False) - resp = Response(sse(), mimetype="text/event-stream") - resp.headers.add_header("Cache-control", "no-cache") - resp.headers.add_header("Connection", "keep-alive") - resp.headers.add_header("X-Accel-Buffering", "no") - resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8") - return resp + resp = Response(sse(), mimetype="application/json") + return resp \ No newline at end of file