cognee/cognee/shared/logging_utils.py
Daulet Amirkhanov 056424f244
feat: fs-cache (#1645)

## Description

Implement File-Based Version of the Redis Cache Adapter

Description and acceptance criteria:

This PR introduces a file-based cache adapter as an alternative to the
existing Redis-based adapter. It provides the same core functionality,
caching session data and maintaining context across multiple user
interactions, but stores the data in local files instead of Redis.

Because the shared Kùzu lock mechanism relies on Redis, it is not
supported in this implementation. If a lock is configured, the adapter
will raise an error to prevent misconfiguration.

You can test this adapter by enabling caching with the following
settings:

caching=True
cache_backend="fs"

When running multiple searches in a session, the system should correctly
maintain conversational context. For example:

- What is XY?
- Are you sure?
- What was my first question?

In this case, the adapter should preserve previous user–Cognee
interactions within the cache file so that follow-up queries remain
context-aware.
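
For reference, here is a minimal sketch of that scenario. The `caching` and
`cache_backend` settings are the ones introduced in this PR; the
`cognee.config.set(...)` call and the exact `search` signature are
illustrative and may differ from the actual API surface.

```python
import asyncio
import cognee

async def main():
    # Hypothetical configuration calls: enable caching with the
    # file-based backend added in this PR.
    cognee.config.set("caching", True)
    cognee.config.set("cache_backend", "fs")

    # Follow-up questions in the same session should see the cached
    # user–Cognee interactions from the earlier searches.
    for question in ["What is XY?", "Are you sure?", "What was my first question?"]:
        results = await cognee.search(query_text=question)
        print(question, "->", results)

asyncio.run(main())
```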


## Type of Change
- [ ] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):


## Pre-submission Checklist
- [x] **I have tested my changes thoroughly before submitting this PR**
- [x] **This PR contains minimal changes necessary to address the
issue/feature**
- [x] My code follows the project's coding standards and style
guidelines
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have added necessary documentation (if applicable)
- [x] All new and existing tests pass
- [x] I have searched existing PRs to ensure this change hasn't been
submitted already
- [x] I have linked any relevant issues in the description
- [x] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

---------

Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
Co-authored-by: hajdul88 <52442977+hajdul88@users.noreply.github.com>
2025-11-12 15:34:30 +01:00


import os
import sys
import logging
import tempfile
import structlog
import traceback
import platform
from datetime import datetime
from pathlib import Path
from typing import Protocol
import importlib.metadata

from cognee import __version__ as cognee_version


# Configure external library logging
def configure_external_library_logging():
    """Configure logging for external libraries to reduce verbosity."""
    # Set environment variables to suppress LiteLLM logging
    os.environ.setdefault("LITELLM_LOG", "ERROR")
    os.environ.setdefault("LITELLM_SET_VERBOSE", "False")

    # Configure LiteLLM logging to reduce verbosity
    try:
        import litellm

        # Disable verbose logging
        litellm.set_verbose = False

        # Set additional LiteLLM configuration
        if hasattr(litellm, "suppress_debug_info"):
            litellm.suppress_debug_info = True
        if hasattr(litellm, "turn_off_message"):
            litellm.turn_off_message = True
        if hasattr(litellm, "_turn_on_debug"):
            litellm._turn_on_debug = False

        # Comprehensive logger suppression
        loggers_to_suppress = [
            "litellm",
            "litellm.litellm_core_utils.logging_worker",
            "litellm.litellm_core_utils",
            "litellm.proxy",
            "litellm.router",
            "openai._base_client",
            "LiteLLM",  # Capital-case variant
            "LiteLLM.core",
            "LiteLLM.logging_worker",
            "litellm.logging_worker",
        ]
        for logger_name in loggers_to_suppress:
            logging.getLogger(logger_name).setLevel(logging.CRITICAL)
            logging.getLogger(logger_name).disabled = True
    except ImportError:
        # LiteLLM is not available, skip its configuration
        pass
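
# The same suppression can be approximated from the outside through the
# environment variables this function sets. It only uses setdefault(), so
# values already present in the environment win (illustrative shell usage):
#
#   LITELLM_LOG=ERROR LITELLM_SET_VERBOSE=False python your_app.py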

# Export common log levels
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL

log_levels = {
    "CRITICAL": logging.CRITICAL,
    "ERROR": logging.ERROR,
    "WARNING": logging.WARNING,
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
    "NOTSET": logging.NOTSET,
}

# Track whether structlog logging has been configured
_is_structlog_configured = False


def resolve_logs_dir():
    """Resolve a writable logs directory.

    Priority:
      1) BaseConfig.logs_root_directory (respects COGNEE_LOGS_DIR)
      2) /tmp/cognee_logs (default, best-effort create)

    Returns a Path, or None if none are writable/creatable.
    """
    from cognee.base_config import get_base_config

    base_config = get_base_config()
    logs_root_directory = Path(base_config.logs_root_directory)

    try:
        logs_root_directory.mkdir(parents=True, exist_ok=True)
        if os.access(logs_root_directory, os.W_OK):
            return logs_root_directory
    except Exception:
        pass

    try:
        tmp_log_path = Path(os.path.join("/tmp", "cognee_logs"))
        tmp_log_path.mkdir(parents=True, exist_ok=True)
        if os.access(tmp_log_path, os.W_OK):
            return tmp_log_path
    except Exception:
        pass

    return None
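
# Illustrative override, grounded in the docstring above: point the logs at a
# custom directory via the environment before starting cognee:
#
#   COGNEE_LOGS_DIR=/var/log/cognee python your_app.py
#
# If that directory is not writable, resolve_logs_dir() falls back to
# /tmp/cognee_logs, and finally to None (file logging disabled).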

# Maximum number of log files to keep
MAX_LOG_FILES = 10

# Version information
PYTHON_VERSION = platform.python_version()
STRUCTLOG_VERSION = structlog.__version__
COGNEE_VERSION = cognee_version
OS_INFO = f"{platform.system()} {platform.release()} ({platform.version()})"

class PlainFileHandler(logging.FileHandler):
    """A custom file handler that writes simpler plain-text log entries."""

    def emit(self, record):
        try:
            # Check that the stream is available before trying to write
            if self.stream is None:
                self.stream = self._open()

            # Extract the message from the structlog record
            if isinstance(record.msg, dict) and "event" in record.msg:
                # Extract the basic message
                message = record.msg.get("event", "")

                # Extract additional context
                context = {
                    k: v
                    for k, v in record.msg.items()
                    if k not in ("event", "logger", "level", "timestamp")
                }

                # Format the context if present
                context_str = ""
                if context:
                    context_str = " " + " ".join(
                        f"{k}={v}" for k, v in context.items() if k != "exc_info"
                    )

                # Get the logger name from the record or from the structlog context
                logger_name = record.msg.get("logger", record.name)

                # Format the timestamp
                timestamp = datetime.now().strftime(get_timestamp_format())

                # Create the log entry and write it to the file
                log_entry = f"{timestamp} [{record.levelname.ljust(8)}] {message}{context_str} [{logger_name}]\n"
                self.stream.write(log_entry)
                self.flush()

                # Handle exception info if present; check both record.exc_info
                # and the 'exc_info' key in the message dict.
                record_has_exc = record.exc_info and record.exc_info != (None, None, None)
                msg_has_exc = "exc_info" in record.msg and record.msg["exc_info"]

                if record_has_exc:
                    # Use the exception info from the record
                    tb_str = "".join(traceback.format_exception(*record.exc_info))
                    self.stream.write(tb_str + "\n")
                    self.flush()
                elif msg_has_exc and isinstance(record.msg["exc_info"], tuple):
                    # Use the exception info from the message
                    tb_str = "".join(traceback.format_exception(*record.msg["exc_info"]))
                    self.stream.write(tb_str + "\n")
                    self.flush()
                elif msg_has_exc and hasattr(record.msg["exc_info"], "__traceback__"):
                    # Handle exception objects that are passed directly
                    exc = record.msg["exc_info"]
                    tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
                    self.stream.write(tb_str + "\n")
                    self.flush()
            else:
                # Fall back to standard handling for non-structlog messages
                msg = self.format(record)
                self.stream.write(msg + self.terminator)
                self.flush()

                # Handle exception info if present on a regular record
                if record.exc_info and record.exc_info != (None, None, None):
                    tb_str = "".join(traceback.format_exception(*record.exc_info))
                    self.stream.write(tb_str + "\n")
                    self.flush()
        except Exception as e:
            self.handleError(record)
            # Best-effort note about the failed record; the stream itself may be
            # unusable at this point, so guard the write.
            try:
                self.stream.write(f"Error in log handler: {e}\n")
                self.flush()
            except Exception:
                pass
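
# A PlainFileHandler entry produced by emit() looks roughly like this
# (schematic, derived from the log_entry f-string above):
#
#   2025-11-12T15:34:30.123456 [INFO    ] <event> key=value [<logger name>]
#
# followed by a formatted traceback whenever exception info is attached.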

class LoggerInterface(Protocol):
    def info(self, msg: str, *args, **kwargs) -> None: ...

    def warning(self, msg: str, *args, **kwargs) -> None: ...

    def error(self, msg: str, *args, **kwargs) -> None: ...

    def critical(self, msg: str, *args, **kwargs) -> None: ...

    def debug(self, msg: str, *args, **kwargs) -> None: ...

def get_logger(name=None, level=None) -> LoggerInterface:
    """Get a logger.

    If `setup_logging()` has not been called, returns a standard Python logger;
    afterwards, returns a structlog logger.
    """
    if _is_structlog_configured:
        return structlog.get_logger(name if name else __name__)

    logger = logging.getLogger(name if name else __name__)
    if level is not None:
        logger.setLevel(level)
    return logger
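
# Example usage (illustrative): before setup_logging() has run, get_logger()
# hands out standard library loggers; afterwards it returns structlog loggers.
#
#   logger = get_logger(__name__, level=INFO)  # stdlib logger (pre-setup)
#   setup_logging(INFO)
#   logger = get_logger(__name__)              # structlog logger (post-setup)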

def log_database_configuration(logger):
    """Log the current database storage configuration."""
    # NOTE: These have to be imported at runtime to avoid a circular import
    from cognee.infrastructure.databases.relational.config import get_relational_config
    from cognee.infrastructure.databases.vector.config import get_vectordb_config
    from cognee.infrastructure.databases.graph.config import get_graph_config

    try:
        # Get the base database directory path
        from cognee.base_config import get_base_config

        base_config = get_base_config()
        databases_path = os.path.join(base_config.system_root_directory, "databases")

        # Log concise database info
        logger.info(f"Database storage: {databases_path}")
    except Exception as e:
        logger.debug(f"Could not retrieve database configuration: {str(e)}")

def cleanup_old_logs(logs_dir, max_files):
    """Remove old log files, keeping only the most recent ones.

    Args:
        logs_dir: Directory containing log files
        max_files: Maximum number of log files to keep
    """
    logger = structlog.get_logger()

    try:
        # Get all .log files in the directory (excluding README and other files)
        log_files = [f for f in logs_dir.glob("*.log") if f.is_file()]

        # Sort the log files by modification time (newest first)
        log_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)

        # Remove old files that exceed the maximum
        if len(log_files) > max_files:
            deleted_count = 0
            for old_file in log_files[max_files:]:
                try:
                    old_file.unlink()
                    deleted_count += 1
                    # Only log individual files in non-CLI mode
                    if os.getenv("COGNEE_CLI_MODE") != "true":
                        logger.info(f"Deleted old log file: {old_file}")
                except Exception as e:
                    # Always log errors
                    logger.error(f"Failed to delete old log file {old_file}: {e}")

            # In CLI mode, show a compact summary instead
            if os.getenv("COGNEE_CLI_MODE") == "true" and deleted_count > 0:
                logger.info(f"Cleaned up {deleted_count} old log files")

        return True
    except Exception as e:
        logger.error(f"Error cleaning up log files: {e}")
        return False
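
# Example (illustrative): keep only the ten newest run logs in a directory.
#
#   from pathlib import Path
#   cleanup_old_logs(Path("/tmp/cognee_logs"), MAX_LOG_FILES)
#
# setup_logging() below does exactly this against the resolved logs directory.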

def setup_logging(log_level=None, name=None):
    """Set up the logging configuration with structlog integration.

    Args:
        log_level: The logging level to use (default: None, which falls back to
            the LOG_LEVEL environment variable and then to INFO)
        name: Optional logger name (default: None, uses __name__)

    Returns:
        A configured structlog logger instance
    """
    global _is_structlog_configured

    # Determine the log level: explicit argument first, then the LOG_LEVEL
    # environment variable, defaulting to INFO for missing or unknown values.
    if not log_level:
        log_level = log_levels.get(os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO)

    # Configure external library logging early to suppress verbose output
    configure_external_library_logging()

    # Add a custom filter to suppress LiteLLM worker cancellation errors
    class LiteLLMCancellationFilter(logging.Filter):
        """Filter to suppress LiteLLM worker cancellation messages."""

        SUPPRESSED_KEYWORDS = (
            "loggingworker cancelled",
            "logging_worker.py",
            "cancellederror",
            "litellm:error",
            "loggingerror",
        )

        def filter(self, record):
            # Drop anything coming from a LiteLLM-related logger
            if hasattr(record, "name") and "litellm" in record.name.lower():
                return False

            # Check the raw message content for cancellation errors
            if hasattr(record, "msg") and record.msg:
                msg_str = str(record.msg).lower()
                if any(keyword in msg_str for keyword in self.SUPPRESSED_KEYWORDS):
                    return False

            # Check the fully formatted message as well
            try:
                formatted_msg = record.getMessage().lower()
                if any(keyword in formatted_msg for keyword in self.SUPPRESSED_KEYWORDS):
                    return False
            except Exception:
                pass

            return True

    # Apply the filter to the root logger and to the LiteLLM logger
    cancellation_filter = LiteLLMCancellationFilter()
    logging.getLogger().addFilter(cancellation_filter)
    logging.getLogger("litellm").addFilter(cancellation_filter)

    def exception_handler(logger, method_name, event_dict):
        """Custom processor that enriches events carrying exception info."""
        # Check if there is exc_info that needs to be processed
        if event_dict.get("exc_info"):
            # If it's already an (exc_type, exc_value, traceback) tuple, use it
            # directly; otherwise fall back to the currently handled exception.
            if isinstance(event_dict["exc_info"], tuple):
                exc_type, exc_value, tb = event_dict["exc_info"]
            else:
                exc_type, exc_value, tb = sys.exc_info()

            if exc_type is not None and hasattr(exc_type, "__name__"):
                event_dict["exception_type"] = exc_type.__name__
            event_dict["exception_message"] = str(exc_value)
            event_dict["traceback"] = True

        return event_dict

    # Configure structlog
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt=get_timestamp_format(), utc=True),
            structlog.processors.StackInfoRenderer(),
            exception_handler,  # Our custom exception processor
            structlog.processors.UnicodeDecoder(),
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # Set up system-wide exception handling
    def handle_exception(exc_type, exc_value, tb):
        """Handle any uncaught exception."""
        if issubclass(exc_type, KeyboardInterrupt):
            # Let KeyboardInterrupt pass through
            sys.__excepthook__(exc_type, exc_value, tb)
            return

        logger = structlog.get_logger()
        logger.error(
            "Exception",
            exc_info=(exc_type, exc_value, tb),
        )
        # Hand back to the original hook, which prints the traceback and exits
        sys.__excepthook__(exc_type, exc_value, tb)

    # Install the exception handler
    sys.excepthook = handle_exception

    # Create a console formatter for standard library logging
    console_formatter = structlog.stdlib.ProcessorFormatter(
        processor=structlog.dev.ConsoleRenderer(
            colors=True,
            force_colors=True,
            level_styles={
                "critical": structlog.dev.RED,
                "exception": structlog.dev.RED,
                "error": structlog.dev.RED,
                "warn": structlog.dev.YELLOW,
                "warning": structlog.dev.YELLOW,
                "info": structlog.dev.GREEN,
                "debug": structlog.dev.BLUE,
            },
        ),
    )

    # Stream handler that writes a leading newline before each console record
    class NewlineStreamHandler(logging.StreamHandler):
        def emit(self, record):
            try:
                msg = self.format(record)
                stream = self.stream
                if hasattr(stream, "closed") and stream.closed:
                    return
                stream.write("\n" + msg + self.terminator)
                self.flush()
            except Exception:
                self.handleError(record)

    # Use our custom handler for console output
    stream_handler = NewlineStreamHandler(sys.stderr)
    stream_handler.setFormatter(console_formatter)
    stream_handler.setLevel(log_level)

    root_logger = logging.getLogger()
    if root_logger.hasHandlers():
        root_logger.handlers.clear()
    root_logger.addHandler(stream_handler)

    # NOTE: The root logger is set to NOTSET so that all messages pass through;
    # the individual stream and file handlers define their own levels.
    root_logger.setLevel(logging.NOTSET)

    # Resolve the logs directory, honoring environment overrides with safe fallbacks
    logs_dir = resolve_logs_dir()

    # Check if we already have a log file path from the environment.
    # NOTE: The environment variable must be used here because it lets us log to
    # a single file, named after a start timestamp, in a multiprocess setting.
    # Without it, every process would create its own log file.
    log_file_path = os.environ.get("LOG_FILE_NAME")
    if not log_file_path and logs_dir is not None:
        # Create a new log file name based on the cognee start time
        start_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        log_file_path = str((logs_dir / f"{start_time}.log").resolve())
        os.environ["LOG_FILE_NAME"] = log_file_path

    if log_file_path:
        try:
            # Create a file handler that uses our custom PlainFileHandler
            file_handler = PlainFileHandler(log_file_path, encoding="utf-8")
            file_handler.setLevel(DEBUG)
            root_logger.addHandler(file_handler)
        except Exception as e:
            # NOTE: This can happen on read-only file systems, or when the log
            # file path points to a location we cannot write to. Logging to a
            # file is not mandatory, so we only emit a console warning.
            root_logger.warning(
                f"Warning: Could not create log file handler at {log_file_path}: {e}"
            )

    if log_level > logging.DEBUG:
        import warnings
        from sqlalchemy.exc import SAWarning

        warnings.filterwarnings(
            "ignore", category=SAWarning, module="dlt.destinations.impl.sqlalchemy.merge_job"
        )
        warnings.filterwarnings(
            "ignore", category=SAWarning, module="dlt.destinations.impl.sqlalchemy.load_jobs"
        )

    # Clean up old log files, keeping only the most recent ones
    if logs_dir is not None:
        cleanup_old_logs(logs_dir, MAX_LOG_FILES)

    # Mark logging as configured
    _is_structlog_configured = True

    from cognee.infrastructure.databases.relational.config import get_relational_config
    from cognee.infrastructure.databases.vector.config import get_vectordb_config
    from cognee.infrastructure.databases.graph.config import get_graph_config

    graph_config = get_graph_config()
    vector_config = get_vectordb_config()
    relational_config = get_relational_config()

    try:
        # Get the base database directory path
        from cognee.base_config import get_base_config

        base_config = get_base_config()
        databases_path = os.path.join(base_config.system_root_directory, "databases")
    except Exception as e:
        raise ValueError("Could not resolve the database storage path") from e

    # Get a configured logger and log system information
    logger = structlog.get_logger(name if name else __name__)

    if logs_dir is not None:
        logger.info(f"Log file created at: {log_file_path}", log_file=log_file_path)

    # Detailed initialization info for regular usage
    logger.info(
        "Logging initialized",
        python_version=PYTHON_VERSION,
        structlog_version=STRUCTLOG_VERSION,
        cognee_version=COGNEE_VERSION,
        os_info=OS_INFO,
        database_path=databases_path,
        graph_database_name=graph_config.graph_database_name,
        vector_config=vector_config.vector_db_provider,
        relational_config=relational_config.db_name,
    )

    # Log the database configuration
    log_database_configuration(logger)

    # Return the configured logger
    return logger
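
# Multiprocess note (illustrative): setup_logging() publishes the chosen log
# file through the LOG_FILE_NAME environment variable, so child processes that
# inherit the environment append to the same file instead of creating their own:
#
#   LOG_FILE_NAME=/tmp/cognee_logs/run.log python worker.py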

def get_log_file_location():
    """Return the file path of the log file in use, if any."""
    root_logger = logging.getLogger()

    # Loop through the handlers to find the FileHandler
    for handler in root_logger.handlers:
        if isinstance(handler, logging.FileHandler):
            return handler.baseFilename

    return None

def get_timestamp_format():
    # NOTE: Some users have reported that Cognee crashes when reading the
    # microsecond value, so we fall back to a format without microseconds when
    # the platform cannot provide them.
    logger = structlog.get_logger()
    try:
        # Calling strftime() here tests whether microseconds are supported;
        # if they are not, an exception is raised and we fall back below.
        datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
        return "%Y-%m-%dT%H:%M:%S.%f"
    except Exception as e:
        logger.debug(f"Exception caught: {e}")
        logger.debug(
            "Could not use microseconds for the logging timestamp, defaulting to hours, minutes and seconds only"
        )
        # Verify that the fallback format itself works
        datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        # Return the timestamp format without microseconds
        return "%Y-%m-%dT%H:%M:%S"