<!-- .github/pull_request_template.md --> ## Description This PR fixes the distributed pipeline and updates the core distributed logic. ## Type of Change <!-- Please check the relevant option --> - [x] Bug fix (non-breaking change that fixes an issue) - [x] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [x] Code refactoring - [x] Performance improvement - [ ] Other (please specify): ## Changes Made Fixes distributed pipeline: - Changed spawning logic and added incremental loading to run_tasks_distributed - Added batching to consumer nodes - Fixed consumer stopping criteria by adding a stop signal and handling for it - Changed the edge embedding solution to avoid huge network load in a multi-container environment ## Testing Tested by running 1GB on Modal and manually ## Screenshots/Videos (if applicable) None ## Pre-submission Checklist <!-- Please check all boxes that apply before submitting your PR --> - [x] **I have tested my changes thoroughly before submitting this PR** - [x] **This PR contains minimal changes necessary to address the issue/feature** - [ ] My code follows the project's coding standards and style guidelines - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] I have added necessary documentation (if applicable) - [ ] All new and existing tests pass - [ ] I have searched existing PRs to ensure this change hasn't been submitted already - [ ] I have linked any relevant issues in the description - [ ] My commits have clear and descriptive messages ## Related Issues None ## Additional Notes None ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin. --------- Co-authored-by: Boris <boris@topoteretes.com> Co-authored-by: Boris Arzentar <borisarzentar@gmail.com>
64 lines
2.1 KiB
Python
64 lines
2.1 KiB
Python
import asyncio
|
|
from functools import wraps
|
|
|
|
from cognee.shared.logging_utils import get_logger
|
|
from cognee.infrastructure.utils.calculate_backoff import calculate_backoff
|
|
|
|
|
|
# Module-level logger; deadlock_retry uses it to warn before each retry sleep.
logger = get_logger("deadlock_retry")
|
|
|
|
|
|
def deadlock_retry(max_retries=10):
    """
    Decorator that retries an async method when Neo4j reports a transient failure.

    The wrapped coroutine is awaited once and, if it fails with a retryable
    error, up to ``max_retries`` more times. Before each retry the wrapper
    sleeps for ``calculate_backoff(attempt)`` seconds, where ``attempt`` is the
    number of calls made so far.

    An error is retryable when it is a ``DatabaseUnavailable``, or any other
    ``Neo4jError`` whose string form contains ``"DeadlockDetected"`` or
    ``"Neo.TransientError"``. Every other exception propagates immediately.

    Args:
        max_retries: Maximum number of retries after the initial call. A value
            of zero or less disables retrying; the function is still called once.

    Returns:
        A decorator that produces an async wrapper with the signature
        ``wrapper(self, *args, **kwargs)``.

    Raises:
        The original exception, unchanged, when it is not retryable or when
        the retry budget is exhausted.
    """
    retryable_markers = ("DeadlockDetected", "Neo.TransientError")

    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            # Function-scope import kept from the original; presumably so that
            # importing this module does not require the neo4j driver.
            from neo4j.exceptions import DatabaseUnavailable, Neo4jError

            attempt = 0

            # Exits only by returning the result or re-raising, so the wrapped
            # function is always called at least once (even for max_retries <= 0).
            while True:
                attempt += 1
                try:
                    return await func(self, *args, **kwargs)
                except Neo4jError as error:
                    # DatabaseUnavailable derives from Neo4jError, so a separate
                    # `except DatabaseUnavailable` clause placed after this one
                    # would never run; it is classified here instead.
                    retryable = isinstance(error, DatabaseUnavailable) or any(
                        marker in str(error) for marker in retryable_markers
                    )
                    if not retryable or attempt > max_retries:
                        raise  # Re-raise the original error

                    backoff_time = calculate_backoff(attempt)
                    logger.warning(
                        f"Neo4j transient error ({type(error).__name__}), "
                        f"retrying in {backoff_time:.2f}s "
                        f"(retry {attempt}/{max_retries})"
                    )
                    await asyncio.sleep(backoff_time)

        return wrapper

    return decorator
|