add invoke_async for component base and tools base

This commit is contained in:
yongtenglei 2025-12-03 13:28:27 +08:00
parent 5646d2b4c9
commit 1c21aa480d
4 changed files with 36 additions and 3 deletions

View file

@ -357,7 +357,7 @@ class Canvas(Graph):
else:
self.globals[k] = ""
print(self.globals)
async def run(self, **kwargs):
st = time.perf_counter()
@ -415,6 +415,7 @@ class Canvas(Graph):
loop = asyncio.get_running_loop()
tasks = []
def _run_async_in_thread(coro_func, **call_kwargs):
return asyncio.run(coro_func(**call_kwargs))

View file

@ -240,7 +240,7 @@ class Agent(LLM, ToolBase):
self.set_output("use_tools", use_tools)
return ans
async def invoke_async(self, **kwargs):
async def _invoke_async(self, **kwargs):
"""
Async entry: reuse existing logic but offload heavy sync parts via async wrappers to reduce blocking.
"""

View file

@ -14,6 +14,7 @@
# limitations under the License.
#
import asyncio
import re
import time
from abc import ABC
@ -445,6 +446,34 @@ class ComponentBase(ABC):
self.set_output("_elapsed_time", time.perf_counter() - self.output("_created_time"))
return self.output()
async def invoke_async(self, **kwargs) -> dict[str, Any]:
"""
Async wrapper for component invocation.
Prefers coroutine `_invoke_async` if present; otherwise falls back to `_invoke`.
Handles timing and error recording consistently with `invoke`.
"""
self.set_output("_created_time", time.perf_counter())
try:
if self.check_if_canceled("Component processing"):
return
fn_async = getattr(self, "_invoke_async", None)
if fn_async and asyncio.iscoroutinefunction(fn_async):
await fn_async(**kwargs)
elif asyncio.iscoroutinefunction(self._invoke):
await self._invoke(**kwargs)
else:
await asyncio.to_thread(self._invoke, **kwargs)
except Exception as e:
if self.get_exception_default_value():
self.set_exception_default_value()
else:
self.set_output("_ERROR", str(e))
logging.exception(e)
self._param.debug_inputs = {}
self.set_output("_elapsed_time", time.perf_counter() - self.output("_created_time"))
return self.output()
@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10*60)))
def _invoke(self, **kwargs):
raise NotImplementedError()

View file

@ -155,7 +155,10 @@ class ToolBase(ComponentBase):
self.set_output("_created_time", time.perf_counter())
try:
if asyncio.iscoroutinefunction(self._invoke):
fn_async = getattr(self, "_invoke_async", None)
if fn_async and asyncio.iscoroutinefunction(fn_async):
res = await fn_async(**kwargs)
elif asyncio.iscoroutinefunction(self._invoke):
res = await self._invoke(**kwargs)
else:
res = await asyncio.to_thread(self._invoke, **kwargs)