This commit is contained in:
buua436 2025-12-15 13:26:03 +08:00
parent af787b4c36
commit 9e904c254a
2 changed files with 70 additions and 146 deletions

View file

@ -368,7 +368,7 @@ class Canvas(Graph):
if kwargs.get("webhook_payload"): if kwargs.get("webhook_payload"):
for k, cpn in self.components.items(): for k, cpn in self.components.items():
if self.components[k]["obj"].component_name.lower() == "webhook": if self.components[k]["obj"].component_name.lower() == "begin" and self.components[k]["obj"]._param.mode == "Webhook":
for kk, vv in kwargs["webhook_payload"].items(): for kk, vv in kwargs["webhook_payload"].items():
self.components[k]["obj"].set_output(kk, vv) self.components[k]["obj"].set_output(kk, vv)

View file

@ -15,12 +15,9 @@
# #
import asyncio import asyncio
import hashlib
import hmac
import ipaddress import ipaddress
import json import json
import logging import logging
import re
import time import time
from typing import Any, cast from typing import Any, cast
@ -207,9 +204,6 @@ async def webhook(agent_id: str):
elif auth_type == "jwt": elif auth_type == "jwt":
_validate_jwt_auth(security_cfg) _validate_jwt_auth(security_cfg)
elif auth_type == "hmac":
await _validate_hmac_auth(security_cfg)
else: else:
raise Exception(f"Unsupported auth_type: {auth_type}") raise Exception(f"Unsupported auth_type: {auth_type}")
@ -365,88 +359,85 @@ async def webhook(agent_id: str):
return decoded return decoded
async def _validate_hmac_auth(security_cfg):
"""Validate HMAC signature from header."""
hmac_cfg = security_cfg.get("hmac", {})
header = hmac_cfg.get("header")
secret = hmac_cfg.get("secret")
algorithm = hmac_cfg.get("algorithm", "sha256")
provided_sig = request.headers.get(header)
if not provided_sig:
raise Exception("Missing HMAC signature header")
body = await request.get_data()
if body is None:
body = b""
elif isinstance(body, str):
body = body.encode("utf-8")
computed = hmac.new(secret.encode(), body, getattr(hashlib, algorithm)).hexdigest()
if not hmac.compare_digest(provided_sig, computed):
raise Exception("Invalid HMAC signature")
try: try:
security_config=webhook_cfg.get("security", {}) security_config=webhook_cfg.get("security", {})
await validate_webhook_security(security_config) await validate_webhook_security(security_config)
except Exception as e: except Exception as e:
return get_data_error_result(message=str(e)) return get_data_error_result(message=str(e))
if not isinstance(cvs.dsl, str):
dsl = json.dumps(cvs.dsl, ensure_ascii=False)
try:
canvas = Canvas(dsl, cvs.user_id, agent_id)
except Exception as e:
return get_json_result(
data=False, message=str(e),
code=RetCode.EXCEPTION_ERROR)
# 7. Parse request body # 7. Parse request body
async def parse_webhook_request(): async def parse_webhook_request(content_type):
"""Parse request based on content-type and return structured data.""" """Parse request based on content-type and return structured data."""
# 1. Parse query parameters # 1. Query
query_data = {} query_data = {k: v for k, v in request.args.items()}
for k, v in request.args.items():
query_data[k] = v
# 2. Parse headers # 2. Headers
header_data = {} header_data = {k: v for k, v in request.headers.items()}
for k, v in request.headers.items():
header_data[k] = v
# 3. Parse body based on content-type # 3. Body
ctype = request.headers.get("Content-Type", "").split(";")[0].strip() ctype = request.headers.get("Content-Type", "").split(";")[0].strip()
raw_files = {} if ctype != content_type:
raise ValueError(
f"Invalid Content-Type: expect '{content_type}', got '{ctype or 'empty'}'"
)
if ctype == "application/json": body_data: dict = {}
try:
body_data = await request.get_json()
except:
body_data = None
elif ctype == "multipart/form-data": try:
form = await request.form if ctype == "application/json":
files = await request.files body_data = await request.get_json() or {}
raw_files = {name: file for name, file in files.items()}
body_data = {
"form": dict(form),
"files": {name: file.filename for name, file in files.items()},
}
elif ctype == "application/x-www-form-urlencoded": elif ctype == "multipart/form-data":
form = await request.form nonlocal canvas
body_data = dict(form) form = await request.form
files = await request.files
elif ctype == "text/plain": body_data = {}
body_data = (await request.get_data()).decode()
elif ctype == "application/octet-stream": for key, value in form.items():
body_data = await request.get_data() # raw binary body_data[key] = value
else: for key, file in files.items():
# unknown content type → raw body desc = FileService.upload_info(
body_data = await request.get_data() cvs.user_id, # user
file, # FileStorage
None # url (None for webhook)
)
file_parsed= await canvas.get_files_async([desc])
body_data[key] = file_parsed
elif ctype == "application/x-www-form-urlencoded":
form = await request.form
body_data = dict(form)
else:
# text/plain / octet-stream / empty / unknown
raw = await request.get_data()
if raw:
try:
body_data = json.loads(raw.decode("utf-8"))
except Exception:
body_data = {}
else:
body_data = {}
except Exception:
body_data = {}
return { return {
"query": query_data, "query": query_data,
"headers": header_data, "headers": header_data,
"body": body_data, "body": body_data,
"content_type": ctype, "content_type": ctype,
"raw_files": raw_files
} }
def extract_by_schema(data, schema, name="section"): def extract_by_schema(data, schema, name="section"):
@ -456,9 +447,6 @@ async def webhook(agent_id: str):
Optional fields default to type-based default values. Optional fields default to type-based default values.
Type validation included. Type validation included.
""" """
if schema.get("type") != "object":
return {}
props = schema.get("properties", {}) props = schema.get("properties", {})
required = schema.get("required", []) required = schema.get("required", [])
@ -498,7 +486,7 @@ async def webhook(agent_id: str):
def default_for_type(t): def default_for_type(t):
"""Return default value for the given schema type.""" """Return default value for the given schema type."""
if t == "file": if t == "file":
return "" return []
if t == "object": if t == "object":
return {} return {}
if t == "boolean": if t == "boolean":
@ -578,7 +566,7 @@ async def webhook(agent_id: str):
def validate_type(value, t): def validate_type(value, t):
"""Validate value type against schema type t.""" """Validate value type against schema type t."""
if t == "file": if t == "file":
return isinstance(value, str) return isinstance(value, list)
if t == "string": if t == "string":
return isinstance(value, str) return isinstance(value, str)
@ -609,60 +597,13 @@ async def webhook(agent_id: str):
return True return True
def extract_files_by_schema(raw_files, schema, name="files"): parsed = await parse_webhook_request(webhook_cfg.get("content_types"))
"""
Extract and validate files based on schema.
Only supports type = file (single file).
Does NOT support array<file>.
"""
if schema.get("type") != "object":
return {}
props = schema.get("properties", {})
required = schema.get("required", [])
cleaned = []
for field, field_schema in props.items():
field_type = field_schema.get("type")
# 1. Required field must exist
if field in required and field not in raw_files:
raise Exception(f"{name} missing required file field: {field}")
# 2. Ignore fields that are not file
if field_type != "file":
continue
# 3. Extract single file
file_obj = raw_files.get(field)
if file_obj:
cleaned.append({
"field": field,
"file": file_obj
})
return cleaned
parsed = await parse_webhook_request()
SCHEMA = webhook_cfg.get("schema", {"query": {}, "headers": {}, "body": {}}) SCHEMA = webhook_cfg.get("schema", {"query": {}, "headers": {}, "body": {}})
# Extract strictly by schema # Extract strictly by schema
query_clean = extract_by_schema(parsed["query"], SCHEMA.get("query", {}), name="query") query_clean = extract_by_schema(parsed["query"], SCHEMA.get("query", {}), name="query")
header_clean = extract_by_schema(parsed["headers"], SCHEMA.get("headers", {}), name="headers") header_clean = extract_by_schema(parsed["headers"], SCHEMA.get("headers", {}), name="headers")
body_clean = extract_by_schema(parsed["body"], SCHEMA.get("body", {}), name="body") body_clean = extract_by_schema(parsed["body"], SCHEMA.get("body", {}), name="body")
files_clean = extract_files_by_schema(parsed["raw_files"], SCHEMA.get("body", {}), name="files")
uploaded_files = []
for item in files_clean: # each {field, file}
file_obj = item["file"]
desc = FileService.upload_info(
cvs.user_id, # user
file_obj, # FileStorage
None # url (None for webhook)
)
uploaded_files.append(desc)
clean_request = { clean_request = {
"query": query_clean, "query": query_clean,
@ -670,15 +611,6 @@ async def webhook(agent_id: str):
"body": body_clean "body": body_clean
} }
if not isinstance(cvs.dsl, str):
dsl = json.dumps(cvs.dsl, ensure_ascii=False)
try:
canvas = Canvas(dsl, cvs.user_id, agent_id)
except Exception as e:
return get_json_result(
data=False, message=str(e),
code=RetCode.EXCEPTION_ERROR)
execution_mode = webhook_cfg.get("execution_mode", "Immediately") execution_mode = webhook_cfg.get("execution_mode", "Immediately")
response_cfg = webhook_cfg.get("response", {}) response_cfg = webhook_cfg.get("response", {})
@ -708,7 +640,6 @@ async def webhook(agent_id: str):
try: try:
async for _ in canvas.run( async for _ in canvas.run(
query="", query="",
files=uploaded_files,
user_id=cvs.user_id, user_id=cvs.user_id,
webhook_payload=clean_request webhook_payload=clean_request
): ):
@ -723,33 +654,26 @@ async def webhook(agent_id: str):
asyncio.create_task(background_run()) asyncio.create_task(background_run())
return resp return resp
else: else:
async def sse(): async def sse():
nonlocal canvas nonlocal canvas
contents: list[str] = []
try: try:
async for ans in canvas.run( async for ans in canvas.run(
query="", query="",
files=uploaded_files,
user_id=cvs.user_id, user_id=cvs.user_id,
webhook_payload=clean_request webhook_payload=clean_request,
): ):
yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" if ans.get("event") == "message":
content = (ans.get("data") or {}).get("content")
if content:
contents.append(content)
# save updated canvas final_content = "".join(contents)
cvs.dsl = json.loads(str(canvas)) yield json.dumps(final_content, ensure_ascii=False)
UserCanvasService.update_by_id(cvs.user_id, cvs.to_dict())
except Exception as e: except Exception as e:
logging.exception(e) yield json.dumps({"code": 500, "message": str(e)}, ensure_ascii=False)
yield "data:" + json.dumps(
{"code": 500, "message": str(e), "data": False},
ensure_ascii=False
) + "\n\n"
resp = Response(sse(), mimetype="text/event-stream") resp = Response(sse(), mimetype="application/json")
resp.headers.add_header("Cache-control", "no-cache") return resp
resp.headers.add_header("Connection", "keep-alive")
resp.headers.add_header("X-Accel-Buffering", "no")
resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8")
return resp