cognee/cognee/tests/subprocesses/simple_cognify_2.py
hajdul88 9821a01a47
feat: Redis lock integration and Kuzu agentic access fix (#1504)
<!-- .github/pull_request_template.md -->

## Description
This PR introduces a shared locked mechanism in KuzuAdapter to avoid use
case when multiple subprocesses from different environments are trying
to use the same Kuzu adatabase.

## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [x] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)
None

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [x] **I have tested my changes thoroughly before submitting this PR**
- [x] **This PR contains minimal changes necessary to address the
issue/feature**
- [x] My code follows the project's coding standards and style
guidelines
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have added necessary documentation (if applicable)
- [x] All new and existing tests pass
- [x] I have searched existing PRs to ensure this change hasn't been
submitted already
- [x] I have linked any relevant issues in the description
- [x] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-10-16 15:48:20 +02:00

31 lines
872 B
Python

import asyncio
import cognee
from cognee.shared.logging_utils import setup_logging, INFO
from cognee.api.v1.search import SearchType
async def main():
await cognee.cognify(datasets=["second_cognify_dataset"])
query_text = (
"Tell me what is in the context. Additionally write out 'SECOND_COGNIFY' before your answer"
)
search_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION,
query_text=query_text,
datasets=["second_cognify_dataset"],
)
print("Search results:")
for result_text in search_results:
print(result_text)
if __name__ == "__main__":
logger = setup_logging(log_level=INFO)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())