diff --git a/agent/canvas.py b/agent/canvas.py index 2dd0c8999..0ccbbcb8b 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -357,7 +357,7 @@ class Canvas(Graph): else: self.globals[k] = "" print(self.globals) - + async def run(self, **kwargs): st = time.perf_counter() @@ -415,6 +415,7 @@ class Canvas(Graph): loop = asyncio.get_running_loop() tasks = [] + def _run_async_in_thread(coro_func, **call_kwargs): return asyncio.run(coro_func(**call_kwargs)) diff --git a/agent/component/agent_with_tools.py b/agent/component/agent_with_tools.py index 93892a739..e0633e751 100644 --- a/agent/component/agent_with_tools.py +++ b/agent/component/agent_with_tools.py @@ -240,7 +240,7 @@ class Agent(LLM, ToolBase): self.set_output("use_tools", use_tools) return ans - async def invoke_async(self, **kwargs): + async def _invoke_async(self, **kwargs): """ Async entry: reuse existing logic but offload heavy sync parts via async wrappers to reduce blocking. """ diff --git a/agent/component/base.py b/agent/component/base.py index 0864ccb9e..6ac95e09a 100644 --- a/agent/component/base.py +++ b/agent/component/base.py @@ -14,6 +14,7 @@ # limitations under the License. # +import asyncio import re import time from abc import ABC @@ -445,6 +446,34 @@ class ComponentBase(ABC): self.set_output("_elapsed_time", time.perf_counter() - self.output("_created_time")) return self.output() + async def invoke_async(self, **kwargs) -> dict[str, Any]: + """ + Async wrapper for component invocation. + Prefers coroutine `_invoke_async` if present; otherwise falls back to `_invoke`. + Handles timing and error recording consistently with `invoke`. + """ + self.set_output("_created_time", time.perf_counter()) + try: + if self.check_if_canceled("Component processing"): + return + + fn_async = getattr(self, "_invoke_async", None) + if fn_async and asyncio.iscoroutinefunction(fn_async): + await fn_async(**kwargs) + elif asyncio.iscoroutinefunction(self._invoke): + await self._invoke(**kwargs) + else: + await asyncio.to_thread(self._invoke, **kwargs) + except Exception as e: + if self.get_exception_default_value(): + self.set_exception_default_value() + else: + self.set_output("_ERROR", str(e)) + logging.exception(e) + self._param.debug_inputs = {} + self.set_output("_elapsed_time", time.perf_counter() - self.output("_created_time")) + return self.output() + @timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10*60))) def _invoke(self, **kwargs): raise NotImplementedError() diff --git a/agent/tools/base.py b/agent/tools/base.py index c8554e075..87e3126b1 100644 --- a/agent/tools/base.py +++ b/agent/tools/base.py @@ -155,7 +155,10 @@ class ToolBase(ComponentBase): self.set_output("_created_time", time.perf_counter()) try: - if asyncio.iscoroutinefunction(self._invoke): + fn_async = getattr(self, "_invoke_async", None) + if fn_async and asyncio.iscoroutinefunction(fn_async): + res = await fn_async(**kwargs) + elif asyncio.iscoroutinefunction(self._invoke): res = await self._invoke(**kwargs) else: res = await asyncio.to_thread(self._invoke, **kwargs)