cognee/cognee/infrastructure/databases/cache/fscache/FsCacheAdapter.py
Daulet Amirkhanov 056424f244
feat: fs-cache (#1645)
<!-- .github/pull_request_template.md -->

## Description
<!--
Please provide a clear, human-generated description of the changes in
this PR.
DO NOT use AI-generated descriptions. We want to understand your thought
process and reasoning.
-->

Implement File-Based Version of the Redis Cache Adapter

Description and acceptance criteria:

This PR introduces a file-based cache adapter as an alternative to the
existing Redis-based adapter. It provides the same core functionality
for caching session data and maintaining context across multiple user
interactions but stores data locally in files instead of Redis.

Because the shared Kùzu lock mechanism relies on Redis, it is not
supported in this implementation. If a lock is configured, the adapter
will raise an error to prevent misconfiguration.

You can test this adapter by enabling caching with the following
settings:

caching=True
cache_backend="fs"

When running multiple searches in a session, the system should correctly
maintain conversational context. For example:

- What is XY?
- Are you sure?
- What was my first question?

In this case, the adapter should preserve previous user–Cognee
interactions within the cache file so that follow-up queries remain
context-aware.


## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [x] **I have tested my changes thoroughly before submitting this PR**
- [x] **This PR contains minimal changes necessary to address the
issue/feature**
- [x] My code follows the project's coding standards and style
guidelines
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have added necessary documentation (if applicable)
- [x] All new and existing tests pass
- [x] I have searched existing PRs to ensure this change hasn't been
submitted already
- [x] I have linked any relevant issues in the description
- [x] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

---------

Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
Co-authored-by: hajdul88 <52442977+hajdul88@users.noreply.github.com>
2025-11-12 15:34:30 +01:00

151 lines
4.5 KiB
Python

import asyncio
import json
import os
from datetime import datetime
import time
import threading
import diskcache as dc
from cognee.infrastructure.databases.cache.cache_db_interface import CacheDBInterface
from cognee.infrastructure.databases.exceptions.exceptions import (
CacheConnectionError,
SharedKuzuLockRequiresRedisError,
)
from cognee.infrastructure.files.storage.get_storage_config import get_storage_config
from cognee.shared.logging_utils import get_logger
logger = get_logger("FSCacheAdapter")
class FSCacheAdapter(CacheDBInterface):
def __init__(self):
default_key = "sessions_db"
storage_config = get_storage_config()
data_root_directory = storage_config["data_root_directory"]
cache_directory = os.path.join(data_root_directory, ".cognee_fs_cache", default_key)
os.makedirs(cache_directory, exist_ok=True)
self.cache = dc.Cache(directory=cache_directory)
self.cache.expire()
logger.debug(f"FSCacheAdapter initialized with cache directory: {cache_directory}")
def acquire_lock(self):
"""Lock acquisition is not available for filesystem cache backend."""
message = "Shared Kuzu lock requires Redis cache backend."
logger.error(message)
raise SharedKuzuLockRequiresRedisError()
def release_lock(self):
"""Lock release is not available for filesystem cache backend."""
message = "Shared Kuzu lock requires Redis cache backend."
logger.error(message)
raise SharedKuzuLockRequiresRedisError()
async def add_qa(
self,
user_id: str,
session_id: str,
question: str,
context: str,
answer: str,
ttl: int | None = 86400,
):
try:
session_key = f"agent_sessions:{user_id}:{session_id}"
qa_entry = {
"time": datetime.utcnow().isoformat(),
"question": question,
"context": context,
"answer": answer,
}
existing_value = self.cache.get(session_key)
if existing_value is not None:
value: list = json.loads(existing_value)
value.append(qa_entry)
else:
value = [qa_entry]
self.cache.set(session_key, json.dumps(value), expire=ttl)
except Exception as e:
error_msg = f"Unexpected error while adding Q&A to diskcache: {str(e)}"
logger.error(error_msg)
raise CacheConnectionError(error_msg) from e
async def get_latest_qa(self, user_id: str, session_id: str, last_n: int = 5):
session_key = f"agent_sessions:{user_id}:{session_id}"
value = self.cache.get(session_key)
if value is None:
return None
entries = json.loads(value)
return entries[-last_n:] if len(entries) > last_n else entries
async def get_all_qas(self, user_id: str, session_id: str):
session_key = f"agent_sessions:{user_id}:{session_id}"
value = self.cache.get(session_key)
if value is None:
return None
return json.loads(value)
async def close(self):
if self.cache is not None:
self.cache.expire()
self.cache.close()
async def main():
adapter = FSCacheAdapter()
session_id = "demo_session"
user_id = "demo_user_id"
print("\nAdding sample Q/A pairs...")
await adapter.add_qa(
user_id,
session_id,
"What is Redis?",
"Basic DB context",
"Redis is an in-memory data store.",
)
await adapter.add_qa(
user_id,
session_id,
"Who created Redis?",
"Historical context",
"Salvatore Sanfilippo (antirez).",
)
print("\nLatest QA:")
latest = await adapter.get_latest_qa(user_id, session_id)
print(json.dumps(latest, indent=2))
print("\nLast 2 QAs:")
last_two = await adapter.get_latest_qa(user_id, session_id, last_n=2)
print(json.dumps(last_two, indent=2))
session_id = "session_expire_demo"
await adapter.add_qa(
user_id,
session_id,
"What is Redis?",
"Database context",
"Redis is an in-memory data store.",
)
await adapter.add_qa(
user_id,
session_id,
"Who created Redis?",
"History context",
"Salvatore Sanfilippo (antirez).",
)
print(await adapter.get_all_qas(user_id, session_id))
await adapter.close()
if __name__ == "__main__":
asyncio.run(main())