diff --git a/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py new file mode 100644 index 000000000..267e11fb2 --- /dev/null +++ b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py @@ -0,0 +1,46 @@ +"""add_last_accessed_to_data + +Revision ID: e1ec1dcb50b6 +Revises: 211ab850ef3d +Create Date: 2025-11-04 21:45:52.642322 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'e1ec1dcb50b6' +down_revision: Union[str, None] = '211ab850ef3d' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +def _get_column(inspector, table, name, schema=None): + for col in inspector.get_columns(table, schema=schema): + if col["name"] == name: + return col + return None + + +def upgrade() -> None: + conn = op.get_bind() + insp = sa.inspect(conn) + + last_accessed_column = _get_column(insp, "data", "last_accessed") + if not last_accessed_column: + op.add_column('data', + sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True) + ) + # Optionally initialize with created_at values for existing records + op.execute("UPDATE data SET last_accessed = created_at") + + +def downgrade() -> None: + conn = op.get_bind() + insp = sa.inspect(conn) + + last_accessed_column = _get_column(insp, "data", "last_accessed") + if last_accessed_column: + op.drop_column('data', 'last_accessed') diff --git a/cognee-mcp/README.md b/cognee-mcp/README.md index d14bc9fa1..9ac8b4973 100644 --- a/cognee-mcp/README.md +++ b/cognee-mcp/README.md @@ -110,6 +110,47 @@ If you'd rather run cognee-mcp in a container, you have two options: # For stdio transport (default) docker run -e TRANSPORT_MODE=stdio --env-file ./.env --rm -it cognee/cognee-mcp:main ``` + + **Installing optional dependencies at runtime:** + + You can install optional dependencies when running the container by setting the `EXTRAS` environment variable: + ```bash + # Install a single optional dependency group at runtime + docker run \ + -e TRANSPORT_MODE=http \ + -e EXTRAS=aws \ + --env-file ./.env \ + -p 8000:8000 \ + --rm -it cognee/cognee-mcp:main + + # Install multiple optional dependency groups at runtime (comma-separated) + docker run \ + -e TRANSPORT_MODE=sse \ + -e EXTRAS=aws,postgres,neo4j \ + --env-file ./.env \ + -p 8000:8000 \ + --rm -it cognee/cognee-mcp:main + ``` + + **Available optional dependency groups:** + - `aws` - S3 storage support + - `postgres` / `postgres-binary` - PostgreSQL database support + - `neo4j` - Neo4j graph database support + - `neptune` - AWS Neptune support + - `chromadb` - ChromaDB vector store support + - `scraping` - Web scraping capabilities + - `distributed` - Modal distributed execution + - `langchain` - LangChain integration + - `llama-index` - LlamaIndex integration + - `anthropic` - Anthropic models + - `groq` - Groq models + - `mistral` - Mistral models + - `ollama` / `huggingface` - Local model support + - `docs` - Document processing + - `codegraph` - Code analysis + - `monitoring` - Sentry & Langfuse monitoring + - `redis` - Redis support + - And more (see [pyproject.toml](https://github.com/topoteretes/cognee/blob/main/pyproject.toml) for full list) 2. 
**Pull from Docker Hub** (no build required):
   ```bash
   # With HTTP transport (recommended for web deployments)
   docker run -e TRANSPORT_MODE=http --env-file ./.env -p 8000:8000 --rm -it cognee/cognee-mcp:main
@@ -119,6 +160,17 @@ If you'd rather run cognee-mcp in a container, you have two options:
   # With stdio transport (default)
   docker run -e TRANSPORT_MODE=stdio --env-file ./.env --rm -it cognee/cognee-mcp:main
   ```
+
+   **With runtime installation of optional dependencies:**
+   ```bash
+   # Install optional dependencies from the Docker Hub image
+   docker run \
+     -e TRANSPORT_MODE=http \
+     -e EXTRAS=aws,postgres \
+     --env-file ./.env \
+     -p 8000:8000 \
+     --rm -it cognee/cognee-mcp:main
+   ```

 ### **Important: Docker vs Direct Usage**

 **Docker uses environment variables**, not command line arguments:
diff --git a/cognee-mcp/entrypoint.sh b/cognee-mcp/entrypoint.sh
index 2f122bbfd..cf7d19f0a 100644
--- a/cognee-mcp/entrypoint.sh
+++ b/cognee-mcp/entrypoint.sh
@@ -4,6 +4,42 @@ set -e  # Exit on error
 echo "Debug mode: $DEBUG"
 echo "Environment: $ENVIRONMENT"

+# Install optional dependencies if EXTRAS is set
+if [ -n "$EXTRAS" ]; then
+  echo "Installing optional dependencies: $EXTRAS"
+
+  # Get the cognee version that's currently installed
+  COGNEE_VERSION=$(uv pip show cognee | grep "Version:" | awk '{print $2}')
+  echo "Current cognee version: $COGNEE_VERSION"
+
+  # Build the extras list for cognee
+  IFS=',' read -ra EXTRA_ARRAY <<< "$EXTRAS"
+  # Deduplicate the requested extras into a single comma-separated list
+  ALL_EXTRAS=""
+  for extra in "${EXTRA_ARRAY[@]}"; do
+    # Trim whitespace
+    extra=$(echo "$extra" | xargs)
+    # Add to extras list if not already present
+    if [[ ! "$ALL_EXTRAS" =~ (^|,)"$extra"(,|$) ]]; then
+      if [ -z "$ALL_EXTRAS" ]; then
+        ALL_EXTRAS="$extra"
+      else
+        ALL_EXTRAS="$ALL_EXTRAS,$extra"
+      fi
+    fi
+  done
+
+  echo "Installing cognee with extras: $ALL_EXTRAS"
+  echo "Running: uv pip install 'cognee[$ALL_EXTRAS]==$COGNEE_VERSION'"
+  uv pip install "cognee[$ALL_EXTRAS]==$COGNEE_VERSION"
+
+  # Report completion
+  echo ""
+  echo "✓ Optional dependencies installation completed"
+else
+  echo "No optional dependencies specified"
+fi
+
 # Set default transport mode if not specified
 TRANSPORT_MODE=${TRANSPORT_MODE:-"stdio"}
 echo "Transport mode: $TRANSPORT_MODE"
diff --git a/cognee/base_config.py b/cognee/base_config.py
index a2ad06249..a4c88e0da 100644
--- a/cognee/base_config.py
+++ b/cognee/base_config.py
@@ -1,4 +1,5 @@
 import os
+from pathlib import Path
 from typing import Optional
 from functools import lru_cache
 from cognee.root_dir import get_absolute_path, ensure_absolute_path
@@ -11,6 +12,9 @@ class BaseConfig(BaseSettings):
     data_root_directory: str = get_absolute_path(".data_storage")
     system_root_directory: str = get_absolute_path(".cognee_system")
     cache_root_directory: str = get_absolute_path(".cognee_cache")
+    logs_root_directory: str = os.getenv(
+        "COGNEE_LOGS_DIR", str(Path(__file__).parent.parent / "logs")
+    )
     monitoring_tool: object = Observer.NONE

     @pydantic.model_validator(mode="after")
@@ -30,6 +34,8 @@ class BaseConfig(BaseSettings):
         # Require absolute paths for root directories
         self.data_root_directory = ensure_absolute_path(self.data_root_directory)
         self.system_root_directory = ensure_absolute_path(self.system_root_directory)
+        self.logs_root_directory = ensure_absolute_path(self.logs_root_directory)
+
         # Set monitoring tool based on available keys
         if self.langfuse_public_key and self.langfuse_secret_key:
             self.monitoring_tool = Observer.LANGFUSE
@@ -49,6 +55,7 @@ class BaseConfig(BaseSettings):
             "system_root_directory": self.system_root_directory,
             "monitoring_tool": self.monitoring_tool,
             "cache_root_directory": self.cache_root_directory,
+            "logs_root_directory": self.logs_root_directory,
         }
diff --git a/cognee/infrastructure/databases/vector/create_vector_engine.py b/cognee/infrastructure/databases/vector/create_vector_engine.py
index 639bbb9f6..d1cf855d7 100644
--- a/cognee/infrastructure/databases/vector/create_vector_engine.py
+++ b/cognee/infrastructure/databases/vector/create_vector_engine.py
@@ -47,7 +47,7 @@ def create_vector_engine(
             embedding_engine=embedding_engine,
         )

-    if vector_db_provider == "pgvector":
+    if vector_db_provider.lower() == "pgvector":
         from cognee.infrastructure.databases.relational import get_relational_config

         # Get configuration for postgres database
@@ -78,7 +78,7 @@
             embedding_engine,
         )

-    elif vector_db_provider == "chromadb":
+    elif vector_db_provider.lower() == "chromadb":
         try:
             import chromadb
         except ImportError:
@@ -94,7 +94,7 @@
             embedding_engine=embedding_engine,
         )

-    elif vector_db_provider == "neptune_analytics":
+    elif vector_db_provider.lower() == "neptune_analytics":
         try:
             from langchain_aws import NeptuneAnalyticsGraph
         except ImportError:
@@ -122,7 +122,7 @@
             embedding_engine=embedding_engine,
         )

-    else:
+    elif vector_db_provider.lower() == "lancedb":
         from .lancedb.LanceDBAdapter import LanceDBAdapter

         return LanceDBAdapter(
@@ -130,3 +130,9 @@ def create_vector_engine(
         api_key=vector_db_key,
         embedding_engine=embedding_engine,
     )
+
+    else:
+        raise EnvironmentError(
+            f"Unsupported vector database provider: {vector_db_provider}. "
+            f"Supported providers are: {', '.join(list(supported_databases.keys()) + ['LanceDB', 'PGVector', 'neptune_analytics', 'ChromaDB'])}"
+        )
diff --git a/cognee/infrastructure/engine/models/DataPoint.py b/cognee/infrastructure/engine/models/DataPoint.py
index 812380eaa..3178713c8 100644
--- a/cognee/infrastructure/engine/models/DataPoint.py
+++ b/cognee/infrastructure/engine/models/DataPoint.py
@@ -43,6 +43,9 @@ class DataPoint(BaseModel):
     updated_at: int = Field(
         default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
     )
+    last_accessed_at: int = Field(
+        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
+    )
     ontology_valid: bool = False
     version: int = 1  # Default version
     topological_rank: Optional[int] = 0
diff --git a/cognee/infrastructure/files/utils/get_file_metadata.py b/cognee/infrastructure/files/utils/get_file_metadata.py
index 23b10a6df..3b6c5a364 100644
--- a/cognee/infrastructure/files/utils/get_file_metadata.py
+++ b/cognee/infrastructure/files/utils/get_file_metadata.py
@@ -1,6 +1,6 @@
 import io
 import os.path
-from typing import BinaryIO, TypedDict
+from typing import BinaryIO, TypedDict, Optional
 from pathlib import Path

 from cognee.shared.logging_utils import get_logger
@@ -27,7 +27,7 @@ class FileMetadata(TypedDict):
     file_size: int


-async def get_file_metadata(file: BinaryIO) -> FileMetadata:
+async def get_file_metadata(file: BinaryIO, name: Optional[str] = None) -> FileMetadata:
     """
     Retrieve metadata from a file object.
@@ -53,7 +53,7 @@ async def get_file_metadata(file: BinaryIO) -> FileMetadata: except io.UnsupportedOperation as error: logger.error(f"Error retrieving content hash for file: {file.name} \n{str(error)}\n\n") - file_type = guess_file_type(file) + file_type = guess_file_type(file, name) file_path = getattr(file, "name", None) or getattr(file, "full_name", None) diff --git a/cognee/infrastructure/files/utils/guess_file_type.py b/cognee/infrastructure/files/utils/guess_file_type.py index dcdd68cad..78b20c93d 100644 --- a/cognee/infrastructure/files/utils/guess_file_type.py +++ b/cognee/infrastructure/files/utils/guess_file_type.py @@ -1,6 +1,9 @@ -from typing import BinaryIO +import io +from pathlib import Path +from typing import BinaryIO, Optional, Any import filetype -from .is_text_content import is_text_content +from tempfile import SpooledTemporaryFile +from filetype.types.base import Type class FileTypeException(Exception): @@ -22,90 +25,7 @@ class FileTypeException(Exception): self.message = message -class TxtFileType(filetype.Type): - """ - Represents a text file type with specific MIME and extension properties. - - Public methods: - - match: Determines whether a given buffer matches the text file type. - """ - - MIME = "text/plain" - EXTENSION = "txt" - - def __init__(self): - super(TxtFileType, self).__init__(mime=TxtFileType.MIME, extension=TxtFileType.EXTENSION) - - def match(self, buf): - """ - Determine if the given buffer contains text content. - - Parameters: - ----------- - - - buf: The buffer to check for text content. - - Returns: - -------- - - Returns True if the buffer is identified as text content, otherwise False. - """ - return is_text_content(buf) - - -txt_file_type = TxtFileType() - -filetype.add_type(txt_file_type) - - -class CustomPdfMatcher(filetype.Type): - """ - Match PDF file types based on MIME type and extension. - - Public methods: - - match - - Instance variables: - - MIME: The MIME type of the PDF. - - EXTENSION: The file extension of the PDF. - """ - - MIME = "application/pdf" - EXTENSION = "pdf" - - def __init__(self): - super(CustomPdfMatcher, self).__init__( - mime=CustomPdfMatcher.MIME, extension=CustomPdfMatcher.EXTENSION - ) - - def match(self, buf): - """ - Determine if the provided buffer is a PDF file. - - This method checks for the presence of the PDF signature in the buffer. - - Raises: - - TypeError: If the buffer is not of bytes type. - - Parameters: - ----------- - - - buf: The buffer containing the data to be checked. - - Returns: - -------- - - Returns True if the buffer contains a PDF signature, otherwise returns False. - """ - return b"PDF-" in buf - - -custom_pdf_matcher = CustomPdfMatcher() - -filetype.add_type(custom_pdf_matcher) - - -def guess_file_type(file: BinaryIO) -> filetype.Type: +def guess_file_type(file: BinaryIO, name: Optional[str] = None) -> filetype.Type: """ Guess the file type from the given binary file stream. @@ -122,12 +42,23 @@ def guess_file_type(file: BinaryIO) -> filetype.Type: - filetype.Type: The guessed file type, represented as filetype.Type. 
""" + + # Note: If file has .txt or .text extension, consider it a plain text file as filetype.guess may not detect it properly + # as it contains no magic number encoding + ext = None + if isinstance(file, str): + ext = Path(file).suffix + elif name is not None: + ext = Path(name).suffix + + if ext in [".txt", ".text"]: + file_type = Type("text/plain", "txt") + return file_type + file_type = filetype.guess(file) # If file type could not be determined consider it a plain text file as they don't have magic number encoding if file_type is None: - from filetype.types.base import Type - file_type = Type("text/plain", "txt") if file_type is None: diff --git a/cognee/infrastructure/llm/prompts/extract_query_time.txt b/cognee/infrastructure/llm/prompts/extract_query_time.txt index 763d0e1c4..ce78c3471 100644 --- a/cognee/infrastructure/llm/prompts/extract_query_time.txt +++ b/cognee/infrastructure/llm/prompts/extract_query_time.txt @@ -1,15 +1,13 @@ -For the purposes of identifying timestamps in a query, you are tasked with extracting relevant timestamps from the query. -## Timestamp requirements -- If the query contains interval extrack both starts_at and ends_at properties -- If the query contains an instantaneous timestamp, starts_at and ends_at should be the same -- If the query its open-ended (before 2009 or after 2009), the corresponding non defined end of the time should be none - -For example: "before 2009" -- starts_at: None, ends_at: 2009 or "after 2009" -- starts_at: 2009, ends_at: None -- Put always the data that comes first in time as starts_at and the timestamps that comes second in time as ends_at -- If starts_at or ends_at cannot be extracted both of them has to be None -## Output Format -Your reply should be a JSON: list of dictionaries with the following structure: -```python -class QueryInterval(BaseModel): - starts_at: Optional[Timestamp] = None - ends_at: Optional[Timestamp] = None -``` \ No newline at end of file +You are tasked with identifying relevant time periods where the answer to a given query should be searched. +Current date is: `{{ time_now }}`. Determine relevant period(s) and return structured intervals. + +Extraction rules: + +1. Query without specific timestamp: use the time period with starts_at set to None and ends_at set to now. +2. Explicit time intervals: If the query specifies a range (e.g., from 2010 to 2020, between January and March 2023), extract both start and end dates. Always assign the earlier date to starts_at and the later date to ends_at. +3. Single timestamp: If the query refers to one specific moment (e.g., in 2015, on March 5, 2022), set starts_at and ends_at to that same timestamp. +4. Open-ended time references: For phrases such as "before X" or "after X", represent the unspecified side as None. For example: before 2009 → starts_at: None, ends_at: 2009; after 2009 → starts_at: 2009, ends_at: None. +5. Current-time references ("now", "current", "today"): If the query explicitly refers to the present, set both starts_at and ends_at to now (the ingestion timestamp). +6. "Who is" and "Who was" questions: These imply a general identity or biographical inquiry without a specific temporal scope. Set both starts_at and ends_at to None. +7. Ordering rule: Always ensure the earlier date is assigned to starts_at and the later date to ends_at. +8. No temporal information: If no valid or inferable time reference is found, set both starts_at and ends_at to None. 
\ No newline at end of file diff --git a/cognee/infrastructure/loaders/LoaderEngine.py b/cognee/infrastructure/loaders/LoaderEngine.py index 725f37b14..f9511e7c5 100644 --- a/cognee/infrastructure/loaders/LoaderEngine.py +++ b/cognee/infrastructure/loaders/LoaderEngine.py @@ -1,6 +1,7 @@ import filetype from typing import Dict, List, Optional, Any from .LoaderInterface import LoaderInterface +from cognee.infrastructure.files.utils.guess_file_type import guess_file_type from cognee.shared.logging_utils import get_logger logger = get_logger(__name__) @@ -80,7 +81,7 @@ class LoaderEngine: """ from pathlib import Path - file_info = filetype.guess(file_path) + file_info = guess_file_type(file_path) path_extension = Path(file_path).suffix.lstrip(".") diff --git a/cognee/infrastructure/loaders/core/audio_loader.py b/cognee/infrastructure/loaders/core/audio_loader.py index 17294bd94..f04d9a0e0 100644 --- a/cognee/infrastructure/loaders/core/audio_loader.py +++ b/cognee/infrastructure/loaders/core/audio_loader.py @@ -42,6 +42,7 @@ class AudioLoader(LoaderInterface): "audio/wav", "audio/amr", "audio/aiff", + "audio/x-wav", ] @property diff --git a/cognee/modules/chunking/models/DocumentChunk.py b/cognee/modules/chunking/models/DocumentChunk.py index 9f8c57486..601454802 100644 --- a/cognee/modules/chunking/models/DocumentChunk.py +++ b/cognee/modules/chunking/models/DocumentChunk.py @@ -1,5 +1,7 @@ from typing import List, Union +from pydantic import BaseModel, Field +from datetime import datetime, timezone from cognee.infrastructure.engine import DataPoint from cognee.modules.data.processing.document_types import Document from cognee.modules.engine.models import Entity @@ -22,6 +24,7 @@ class DocumentChunk(DataPoint): - cut_type: The type of cut that defined this chunk. - is_part_of: The document to which this chunk belongs. - contains: A list of entities or events contained within the chunk (default is None). + - last_accessed_at: The timestamp of the last time the chunk was accessed. - metadata: A dictionary to hold meta information related to the chunk, including index fields. 
""" @@ -32,5 +35,4 @@ class DocumentChunk(DataPoint): cut_type: str is_part_of: Document contains: List[Union[Entity, Event]] = None - metadata: dict = {"index_fields": ["text"]} diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py index ef228f2e1..27ab7481e 100644 --- a/cognee/modules/data/models/Data.py +++ b/cognee/modules/data/models/Data.py @@ -36,6 +36,7 @@ class Data(Base): data_size = Column(Integer, nullable=True) # File size in bytes created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc)) + last_accessed = Column(DateTime(timezone=True), nullable=True) datasets = relationship( "Dataset", diff --git a/cognee/modules/engine/models/Entity.py b/cognee/modules/engine/models/Entity.py index 36da2e344..4083cd2e6 100644 --- a/cognee/modules/engine/models/Entity.py +++ b/cognee/modules/engine/models/Entity.py @@ -1,11 +1,11 @@ from cognee.infrastructure.engine import DataPoint from cognee.modules.engine.models.EntityType import EntityType from typing import Optional - +from datetime import datetime, timezone +from pydantic import BaseModel, Field class Entity(DataPoint): name: str is_a: Optional[EntityType] = None description: str - metadata: dict = {"index_fields": ["name"]} diff --git a/cognee/modules/graph/utils/__init__.py b/cognee/modules/graph/utils/__init__.py index ebc648495..4c0b29d47 100644 --- a/cognee/modules/graph/utils/__init__.py +++ b/cognee/modules/graph/utils/__init__.py @@ -5,3 +5,4 @@ from .retrieve_existing_edges import retrieve_existing_edges from .convert_node_to_data_point import convert_node_to_data_point from .deduplicate_nodes_and_edges import deduplicate_nodes_and_edges from .resolve_edges_to_text import resolve_edges_to_text +from .get_entity_nodes_from_triplets import get_entity_nodes_from_triplets diff --git a/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py b/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py new file mode 100644 index 000000000..598a36854 --- /dev/null +++ b/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py @@ -0,0 +1,13 @@ + +def get_entity_nodes_from_triplets(triplets): + entity_nodes = [] + seen_ids = set() + for triplet in triplets: + if hasattr(triplet, 'node1') and triplet.node1 and triplet.node1.id not in seen_ids: + entity_nodes.append({"id": str(triplet.node1.id)}) + seen_ids.add(triplet.node1.id) + if hasattr(triplet, 'node2') and triplet.node2 and triplet.node2.id not in seen_ids: + entity_nodes.append({"id": str(triplet.node2.id)}) + seen_ids.add(triplet.node2.id) + + return entity_nodes diff --git a/cognee/modules/ingestion/data_types/BinaryData.py b/cognee/modules/ingestion/data_types/BinaryData.py index f96e0d65c..9448dddcf 100644 --- a/cognee/modules/ingestion/data_types/BinaryData.py +++ b/cognee/modules/ingestion/data_types/BinaryData.py @@ -30,7 +30,7 @@ class BinaryData(IngestionData): async def ensure_metadata(self): if self.metadata is None: - self.metadata = await get_file_metadata(self.data) + self.metadata = await get_file_metadata(self.data, name=self.name) if self.metadata["name"] is None: self.metadata["name"] = self.name diff --git a/cognee/modules/ontology/get_default_ontology_resolver.py b/cognee/modules/ontology/get_default_ontology_resolver.py index f9aebe59a..7d87c10a6 100644 --- a/cognee/modules/ontology/get_default_ontology_resolver.py +++ b/cognee/modules/ontology/get_default_ontology_resolver.py @@ -21,7 +21,8 @@ def 
get_ontology_resolver_from_env( Supported value: "rdflib". matching_strategy (str): The matching strategy to apply. Supported value: "fuzzy". - ontology_file_path (str): Path to the ontology file required for the resolver. + ontology_file_path (str): Path to the ontology file(s) required for the resolver. + Can be a single path or comma-separated paths for multiple files. Returns: BaseOntologyResolver: An instance of the requested ontology resolver. @@ -31,8 +32,13 @@ def get_ontology_resolver_from_env( or if required parameters are missing. """ if ontology_resolver == "rdflib" and matching_strategy == "fuzzy" and ontology_file_path: + if "," in ontology_file_path: + file_paths = [path.strip() for path in ontology_file_path.split(",")] + else: + file_paths = ontology_file_path + return RDFLibOntologyResolver( - matching_strategy=FuzzyMatchingStrategy(), ontology_file=ontology_file_path + matching_strategy=FuzzyMatchingStrategy(), ontology_file=file_paths ) else: raise EnvironmentError( diff --git a/cognee/modules/ontology/rdf_xml/RDFLibOntologyResolver.py b/cognee/modules/ontology/rdf_xml/RDFLibOntologyResolver.py index 2a7a03751..45e32936a 100644 --- a/cognee/modules/ontology/rdf_xml/RDFLibOntologyResolver.py +++ b/cognee/modules/ontology/rdf_xml/RDFLibOntologyResolver.py @@ -2,7 +2,7 @@ import os import difflib from cognee.shared.logging_utils import get_logger from collections import deque -from typing import List, Tuple, Dict, Optional, Any +from typing import List, Tuple, Dict, Optional, Any, Union from rdflib import Graph, URIRef, RDF, RDFS, OWL from cognee.modules.ontology.exceptions import ( @@ -26,22 +26,50 @@ class RDFLibOntologyResolver(BaseOntologyResolver): def __init__( self, - ontology_file: Optional[str] = None, + ontology_file: Optional[Union[str, List[str]]] = None, matching_strategy: Optional[MatchingStrategy] = None, ) -> None: super().__init__(matching_strategy) self.ontology_file = ontology_file try: - if ontology_file and os.path.exists(ontology_file): + files_to_load = [] + if ontology_file is not None: + if isinstance(ontology_file, str): + files_to_load = [ontology_file] + elif isinstance(ontology_file, list): + files_to_load = ontology_file + else: + raise ValueError( + f"ontology_file must be a string, list of strings, or None. Got: {type(ontology_file)}" + ) + + if files_to_load: self.graph = Graph() - self.graph.parse(ontology_file) - logger.info("Ontology loaded successfully from file: %s", ontology_file) + loaded_files = [] + for file_path in files_to_load: + if os.path.exists(file_path): + self.graph.parse(file_path) + loaded_files.append(file_path) + logger.info("Ontology loaded successfully from file: %s", file_path) + else: + logger.warning( + "Ontology file '%s' not found. Skipping this file.", + file_path, + ) + + if not loaded_files: + logger.info( + "No valid ontology files found. No owl ontology will be attached to the graph." + ) + self.graph = None + else: + logger.info("Total ontology files loaded: %d", len(loaded_files)) else: logger.info( - "Ontology file '%s' not found. No owl ontology will be attached to the graph.", - ontology_file, + "No ontology file provided. No owl ontology will be attached to the graph." 
) self.graph = None + self.build_lookup() except Exception as e: logger.error("Failed to load ontology", exc_info=e) diff --git a/cognee/modules/pipelines/operations/run_tasks_base.py b/cognee/modules/pipelines/operations/run_tasks_base.py index ee2ccfd8c..79d37a451 100644 --- a/cognee/modules/pipelines/operations/run_tasks_base.py +++ b/cognee/modules/pipelines/operations/run_tasks_base.py @@ -27,6 +27,7 @@ async def handle_task( additional_properties={ "task_name": running_task.executable.__name__, "cognee_version": cognee_version, + "tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant", }, ) @@ -49,6 +50,7 @@ async def handle_task( additional_properties={ "task_name": running_task.executable.__name__, "cognee_version": cognee_version, + "tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant", }, ) except Exception as error: @@ -62,6 +64,7 @@ async def handle_task( additional_properties={ "task_name": running_task.executable.__name__, "cognee_version": cognee_version, + "tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant", }, ) raise error diff --git a/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py b/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py index 9a52bf854..ae968c7a5 100644 --- a/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py +++ b/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py @@ -28,6 +28,7 @@ async def run_tasks_with_telemetry( additional_properties={ "pipeline_name": str(pipeline_name), "cognee_version": cognee_version, + "tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant", } | config, ) @@ -42,6 +43,7 @@ async def run_tasks_with_telemetry( additional_properties={ "pipeline_name": str(pipeline_name), "cognee_version": cognee_version, + "tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant", } | config, ) @@ -58,6 +60,7 @@ async def run_tasks_with_telemetry( additional_properties={ "pipeline_name": str(pipeline_name), "cognee_version": cognee_version, + "tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant", } | config, ) diff --git a/cognee/modules/retrieval/chunks_retriever.py b/cognee/modules/retrieval/chunks_retriever.py index 94b9d3fb9..be1f95811 100644 --- a/cognee/modules/retrieval/chunks_retriever.py +++ b/cognee/modules/retrieval/chunks_retriever.py @@ -1,10 +1,11 @@ from typing import Any, Optional - +from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.vector import get_vector_engine from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.exceptions.exceptions import NoDataError from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError +from datetime import datetime, timezone logger = get_logger("ChunksRetriever") @@ -27,21 +28,16 @@ class ChunksRetriever(BaseRetriever): ): self.top_k = top_k - async def get_context(self, query: str) -> Any: + async def get_context(self, query: str) -> Any: """ Retrieves document chunks context based on the query. - Searches for document chunks relevant to the specified query using a vector engine. Raises a NoDataError if no data is found in the system. - Parameters: ----------- - - query (str): The query string to search for relevant document chunks. 
- Returns: -------- - - Any: A list of document chunk payloads retrieved from the search. """ logger.info( @@ -53,13 +49,14 @@ class ChunksRetriever(BaseRetriever): try: found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k) logger.info(f"Found {len(found_chunks)} chunks from vector search") + await update_node_access_timestamps(found_chunks) + except CollectionNotFoundError as error: logger.error("DocumentChunk_text collection not found in vector database") raise NoDataError("No data found in the system, please add data first.") from error chunk_payloads = [result.payload for result in found_chunks] logger.info(f"Returning {len(chunk_payloads)} chunk payloads") - return chunk_payloads async def get_completion( self, query: str, context: Optional[Any] = None, session_id: Optional[str] = None diff --git a/cognee/modules/retrieval/completion_retriever.py b/cognee/modules/retrieval/completion_retriever.py index bb568924d..fc8ef747f 100644 --- a/cognee/modules/retrieval/completion_retriever.py +++ b/cognee/modules/retrieval/completion_retriever.py @@ -8,6 +8,7 @@ from cognee.modules.retrieval.utils.session_cache import ( save_conversation_history, get_conversation_history, ) +from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.exceptions.exceptions import NoDataError from cognee.infrastructure.databases.vector.exceptions import CollectionNotFoundError @@ -65,7 +66,7 @@ class CompletionRetriever(BaseRetriever): if len(found_chunks) == 0: return "" - + await update_node_access_timestamps(found_chunks) # Combine all chunks text returned from vector search (number of chunks is determined by top_k chunks_payload = [found_chunk.payload["text"] for found_chunk in found_chunks] combined_context = "\n".join(chunks_payload) diff --git a/cognee/modules/retrieval/graph_completion_retriever.py b/cognee/modules/retrieval/graph_completion_retriever.py index b7ab4edae..122cc943f 100644 --- a/cognee/modules/retrieval/graph_completion_retriever.py +++ b/cognee/modules/retrieval/graph_completion_retriever.py @@ -16,11 +16,13 @@ from cognee.modules.retrieval.utils.session_cache import ( ) from cognee.shared.logging_utils import get_logger from cognee.modules.retrieval.utils.extract_uuid_from_node import extract_uuid_from_node +from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.modules.retrieval.utils.models import CogneeUserInteraction from cognee.modules.engine.models.node_set import NodeSet from cognee.infrastructure.databases.graph import get_graph_engine from cognee.context_global_variables import session_user from cognee.infrastructure.databases.cache.config import CacheConfig +from cognee.modules.graph.utils import get_entity_nodes_from_triplets logger = get_logger("GraphCompletionRetriever") @@ -139,6 +141,9 @@ class GraphCompletionRetriever(BaseGraphRetriever): # context = await self.resolve_edges_to_text(triplets) + entity_nodes = get_entity_nodes_from_triplets(triplets) + + await update_node_access_timestamps(entity_nodes) return triplets async def get_completion( diff --git a/cognee/modules/retrieval/summaries_retriever.py b/cognee/modules/retrieval/summaries_retriever.py index 87b224946..0df750d22 100644 --- a/cognee/modules/retrieval/summaries_retriever.py +++ b/cognee/modules/retrieval/summaries_retriever.py @@ -4,6 +4,7 @@ from cognee.shared.logging_utils import get_logger from 
cognee.infrastructure.databases.vector import get_vector_engine from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.exceptions.exceptions import NoDataError +from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError logger = get_logger("SummariesRetriever") @@ -54,6 +55,9 @@ class SummariesRetriever(BaseRetriever): "TextSummary_text", query, limit=self.top_k ) logger.info(f"Found {len(summaries_results)} summaries from vector search") + + await update_node_access_timestamps(summaries_results) + except CollectionNotFoundError as error: logger.error("TextSummary_text collection not found in vector database") raise NoDataError("No data found in the system, please add data first.") from error diff --git a/cognee/modules/retrieval/temporal_retriever.py b/cognee/modules/retrieval/temporal_retriever.py index 8ef5eed69..ec68d37bb 100644 --- a/cognee/modules/retrieval/temporal_retriever.py +++ b/cognee/modules/retrieval/temporal_retriever.py @@ -1,7 +1,7 @@ import os import asyncio from typing import Any, Optional, List, Type - +from datetime import datetime from operator import itemgetter from cognee.infrastructure.databases.vector import get_vector_engine @@ -79,7 +79,11 @@ class TemporalRetriever(GraphCompletionRetriever): else: base_directory = None - system_prompt = render_prompt(prompt_path, {}, base_directory=base_directory) + time_now = datetime.now().strftime("%d-%m-%Y") + + system_prompt = render_prompt( + prompt_path, {"time_now": time_now}, base_directory=base_directory + ) interval = await LLMGateway.acreate_structured_output(query, system_prompt, QueryInterval) @@ -108,8 +112,6 @@ class TemporalRetriever(GraphCompletionRetriever): graph_engine = await get_graph_engine() - triplets = [] - if time_from and time_to: ids = await graph_engine.collect_time_ids(time_from=time_from, time_to=time_to) elif time_from: diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py new file mode 100644 index 000000000..65d597a93 --- /dev/null +++ b/cognee/modules/retrieval/utils/access_tracking.py @@ -0,0 +1,95 @@ +"""Utilities for tracking data access in retrievers.""" + +import json +from datetime import datetime, timezone +from typing import List, Any +from uuid import UUID + +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.modules.data.models import Data +from cognee.shared.logging_utils import get_logger +from sqlalchemy import update + +logger = get_logger(__name__) + + +async def update_node_access_timestamps(items: List[Any]): + """ + Update last_accessed_at for nodes in graph database and corresponding Data records in SQL. + + This function: + 1. Updates last_accessed_at in the graph database nodes (in properties JSON) + 2. Traverses to find origin TextDocument nodes (without hardcoded relationship names) + 3. 
Updates last_accessed in the SQL Data table for those documents + + Parameters + ---------- + items : List[Any] + List of items with payload containing 'id' field (from vector search results) + """ + if not items: + return + + graph_engine = await get_graph_engine() + timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000) + timestamp_dt = datetime.now(timezone.utc) + + # Extract node IDs + node_ids = [] + for item in items: + item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id") + if item_id: + node_ids.append(str(item_id)) + + if not node_ids: + return + + try: + # Step 1: Batch update graph nodes + for node_id in node_ids: + result = await graph_engine.query( + "MATCH (n:Node {id: $id}) RETURN n.properties", + {"id": node_id} + ) + + if result and result[0]: + props = json.loads(result[0][0]) if result[0][0] else {} + props["last_accessed_at"] = timestamp_ms + + await graph_engine.query( + "MATCH (n:Node {id: $id}) SET n.properties = $props", + {"id": node_id, "props": json.dumps(props)} + ) + + logger.debug(f"Updated access timestamps for {len(node_ids)} graph nodes") + + # Step 2: Find origin TextDocument nodes (without hardcoded relationship names) + origin_query = """ + UNWIND $node_ids AS node_id + MATCH (chunk:Node {id: node_id})-[e:EDGE]-(doc:Node) + WHERE chunk.type = 'DocumentChunk' AND doc.type IN ['TextDocument', 'Document'] + RETURN DISTINCT doc.id + """ + + result = await graph_engine.query(origin_query, {"node_ids": node_ids}) + + # Extract and deduplicate document IDs + doc_ids = list(set([row[0] for row in result if row and row[0]])) if result else [] + + # Step 3: Update SQL Data table + if doc_ids: + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + stmt = update(Data).where( + Data.id.in_([UUID(doc_id) for doc_id in doc_ids]) + ).values(last_accessed=timestamp_dt) + + await session.execute(stmt) + await session.commit() + + logger.debug(f"Updated last_accessed for {len(doc_ids)} Data records in SQL") + + except Exception as e: + logger.error(f"Failed to update timestamps: {e}") + raise diff --git a/cognee/modules/search/methods/search.py b/cognee/modules/search/methods/search.py index 93c0ef5c8..aab004924 100644 --- a/cognee/modules/search/methods/search.py +++ b/cognee/modules/search/methods/search.py @@ -67,7 +67,10 @@ async def search( send_telemetry( "cognee.search EXECUTION STARTED", user.id, - additional_properties={"cognee_version": cognee_version}, + additional_properties={ + "cognee_version": cognee_version, + "tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant", + }, ) # Use search function filtered by permissions if access control is enabled @@ -108,7 +111,10 @@ async def search( send_telemetry( "cognee.search EXECUTION COMPLETED", user.id, - additional_properties={"cognee_version": cognee_version}, + additional_properties={ + "cognee_version": cognee_version, + "tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant", + }, ) await log_result( diff --git a/cognee/modules/visualization/cognee_network_visualization.py b/cognee/modules/visualization/cognee_network_visualization.py index c735e70f1..3bf5ea8e8 100644 --- a/cognee/modules/visualization/cognee_network_visualization.py +++ b/cognee/modules/visualization/cognee_network_visualization.py @@ -16,17 +16,17 @@ async def cognee_network_visualization(graph_data, destination_file_path: str = nodes_list = [] color_map = { - "Entity": "#f47710", - "EntityType": "#6510f4", - 
"DocumentChunk": "#801212", - "TextSummary": "#1077f4", - "TableRow": "#f47710", - "TableType": "#6510f4", - "ColumnValue": "#13613a", - "SchemaTable": "#f47710", - "DatabaseSchema": "#6510f4", - "SchemaRelationship": "#13613a", - "default": "#D3D3D3", + "Entity": "#5C10F4", + "EntityType": "#A550FF", + "DocumentChunk": "#0DFF00", + "TextSummary": "#5C10F4", + "TableRow": "#A550FF", + "TableType": "#5C10F4", + "ColumnValue": "#757470", + "SchemaTable": "#A550FF", + "DatabaseSchema": "#5C10F4", + "SchemaRelationship": "#323332", + "default": "#D8D8D8", } for node_id, node_info in nodes_data: @@ -98,16 +98,19 @@ async def cognee_network_visualization(graph_data, destination_file_path: str = +
+        [inline HTML/CSS/JS for the interactive page elided; the visible addition is the hint text "Hover a node or edge to inspect details"]
@@ -305,8 +678,12 @@ async def cognee_network_visualization(graph_data, destination_file_path: str = """ - html_content = html_template.replace("{nodes}", json.dumps(nodes_list)) - html_content = html_content.replace("{links}", json.dumps(links_list)) + # Safely embed JSON inside
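The replaced lines above show the intent of this last hunk: stop splicing raw `json.dumps` output straight into the HTML template and embed the JSON safely instead. A standard way to do that — sketched here with a hypothetical `embed_json` helper, not code from the PR — is to escape the few sequences that can break out of an inline `<script>` context:

```python
import json


def embed_json(obj) -> str:
    # Escape "</" so data containing "</script>" cannot terminate the
    # surrounding <script> block early. U+2028/U+2029 are valid in JSON but
    # have historically broken JavaScript string literals; escaping them
    # matters when json.dumps is called with ensure_ascii=False (with the
    # default ensure_ascii=True they are already escaped).
    return (
        json.dumps(obj)
        .replace("</", "<\\/")
        .replace("\u2028", "\\u2028")
        .replace("\u2029", "\\u2029")
    )


# Usage, mirroring the replaced lines:
# html_content = html_template.replace("{nodes}", embed_json(nodes_list))
# html_content = html_content.replace("{links}", embed_json(links_list))
```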