cognee/cognee/infrastructure/utils/run_sync.py
Boris 351deb0314
fix: UI (#1397)
<!-- .github/pull_request_template.md -->

## Description
<!-- 
Please provide a clear, human-generated description of the changes in
this PR.
DO NOT use AI-generated descriptions. We want to understand your thought
process and reasoning.
-->

## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Changes Made
<!-- List the specific changes made in this PR -->
- 
- 
- 

## Testing
<!-- Describe how you tested your changes -->

## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [ ] **I have tested my changes thoroughly before submitting this PR**
- [ ] **This PR contains minimal changes necessary to address the
issue/feature**
- [ ] My code follows the project's coding standards and style
guidelines
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added necessary documentation (if applicable)
- [ ] All new and existing tests pass
- [ ] I have searched existing PRs to ensure this change hasn't been
submitted already
- [ ] I have linked any relevant issues in the description
- [ ] My commits have clear and descriptive messages

## Related Issues
<!-- Link any related issues using "Fixes #issue_number" or "Relates to
#issue_number" -->

## Additional Notes
<!-- Add any additional notes, concerns, or context for reviewers -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-09-12 20:06:44 +02:00

33 lines
800 B
Python

import asyncio
import threading
def run_sync(coro, running_loop=None, timeout=None):
result = None
exception = None
def runner():
nonlocal result, exception, running_loop
try:
try:
if not running_loop:
running_loop = asyncio.get_running_loop()
result = asyncio.run_coroutine_threadsafe(coro, running_loop).result(timeout)
except RuntimeError:
result = asyncio.run(coro)
except Exception as e:
exception = e
thread = threading.Thread(target=runner)
thread.start()
thread.join(timeout)
if thread.is_alive():
raise asyncio.TimeoutError("Coroutine execution timed out.")
if exception:
raise exception
return result