Compare commits


11 commits

Author SHA1 Message Date
Vasilije
8909f788f5
Merge branch 'dev' into feat/configurable-path-exclusion 2025-08-02 19:58:44 +02:00
EricXiao
fc7a91d991
feature: implement FEELING_LUCKY search type (#1178)
<!-- .github/pull_request_template.md -->

## Description
This PR implements the 'FEELING_LUCKY' search type, which intelligently
routes user queries to the most appropriate search retriever, addressing
[#1162](https://github.com/topoteretes/cognee/issues/1162).

- Implement new search type FEELING_LUCKY
- Add the select_search_type function to analyze queries and choose the
proper search type
- Integrate with an LLM for intelligent search type determination
- Add logging for the search type selection process
- Support fallback to RAG_COMPLETION when the LLM selection fails
- Add tests for the new search type

## How it works
When a user selects the 'FEELING_LUCKY' search type, the system first
sends their natural language query to an LLM-based classifier. This
classifier analyzes the query's intent (e.g., is it asking for a
relationship, a summary, or a factual answer?) and selects the optimal
SearchType, such as 'INSIGHTS' or 'GRAPH_COMPLETION'. The main search
function then proceeds using this dynamically selected type. If the
classification process fails, it gracefully falls back to the default
'RAG_COMPLETION' type.
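The routing described above can be sketched as follows. This is a minimal illustration, not the PR's actual implementation: the `classify` callable stands in for the LLM call, and the enum is trimmed to a few members.

```python
from enum import Enum

class SearchType(str, Enum):
    CHUNKS = "CHUNKS"
    INSIGHTS = "INSIGHTS"
    GRAPH_COMPLETION = "GRAPH_COMPLETION"
    RAG_COMPLETION = "RAG_COMPLETION"

def select_search_type(query: str, classify=None) -> SearchType:
    """Route a natural-language query to a SearchType.

    Falls back to RAG_COMPLETION whenever the classifier raises
    or returns something that is not a valid SearchType name.
    """
    try:
        label = classify(query)  # stand-in for the LLM-based classifier
        return SearchType[label.strip().upper()]
    except Exception:
        # Graceful fallback, mirroring the behavior described above
        return SearchType.RAG_COMPLETION
```

A failing or absent classifier simply yields the default type, so the main search function can always proceed.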

## Testing
Tests can be run with:
```bash
python -m pytest cognee/tests/unit/modules/search/search_methods_test.py -k "feeling_lucky" -v
```

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

Signed-off-by: EricXiao <taoiaox@gmail.com>
2025-08-02 16:30:08 +02:00
Igor Ilic
9faa47fc5a
feat: add default tokenizer in case hugging face is not available (#1177)

## Description
Add default tokenizer for custom models not available on HuggingFace

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-08-01 16:37:53 +02:00
Igor Ilic
5b6e946c43
fix: Add async lock for dynamic vector table creation (#1175)

## Description
Add async lock for dynamic table creation

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-08-01 15:12:04 +02:00
Boris Arzentar
cd930ed646
Merge remote-tracking branch 'origin/main' into dev 2025-07-30 11:30:38 +02:00
Igor Ilic
14ba3e8829
feat: Enable async execution of data items for incremental loading (#1092)

## Description
Attempt at making incremental loading run async

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-07-29 10:39:31 -04:00
hajdul88
f78af0cec3
feature: solve edge embedding duplicates in edge collection + retriever optimization (#1151)

## Description
feature: solve edge embedding duplicates in edge collection + retriever
optimization

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

---------

Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
2025-07-29 12:35:38 +02:00
Boris
4ea4b100ab
fix: datasets status (#1166)
## Description

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-07-28 23:32:40 +02:00
Boris Arzentar
961fa5ec45
chore: update uv.lock file 2025-07-28 23:23:35 +02:00
Boris Arzentar
9793cd56ad
version: 0.2.2.dev0 2025-07-28 23:20:21 +02:00
Boris Arzentar
6773121904
fix: datasets status without datasets parameter 2025-07-28 23:19:36 +02:00
35 changed files with 598 additions and 80 deletions

View file

```diff
@@ -15,6 +15,7 @@ async def add(
     vector_db_config: dict = None,
     graph_db_config: dict = None,
     dataset_id: Optional[UUID] = None,
+    incremental_loading: bool = True,
 ):
     """
     Add data to Cognee for knowledge graph processing.
@@ -153,6 +154,7 @@ async def add(
         pipeline_name="add_pipeline",
         vector_db_config=vector_db_config,
         graph_db_config=graph_db_config,
+        incremental_loading=incremental_loading,
     ):
         pipeline_run_info = run_info
```

View file

```diff
@@ -11,6 +11,7 @@ from typing import List, Optional, Union, Literal
 from cognee.modules.users.models import User
 from cognee.modules.users.methods import get_authenticated_user
 from cognee.shared.utils import send_telemetry
+from cognee.modules.pipelines.models import PipelineRunErrored
 from cognee.shared.logging_utils import get_logger

 logger = get_logger()
@@ -100,6 +101,8 @@ def get_add_router() -> APIRouter:
             else:
                 add_run = await cognee_add(data, datasetName, user=user, dataset_id=datasetId)

+            if isinstance(add_run, PipelineRunErrored):
+                return JSONResponse(status_code=420, content=add_run.model_dump(mode="json"))
             return add_run.model_dump()
         except Exception as error:
             return JSONResponse(status_code=409, content={"error": str(error)})
```

View file

```diff
@@ -103,7 +103,9 @@ async def run_code_graph_pipeline(repo_path, include_docs=False, excluded_paths=
         async for run_status in non_code_pipeline_run:
             yield run_status

-    async for run_status in run_tasks(tasks, dataset.id, repo_path, user, "cognify_code_pipeline"):
+    async for run_status in run_tasks(
+        tasks, dataset.id, repo_path, user, "cognify_code_pipeline", incremental_loading=False
+    ):
         yield run_status
```

View file

```diff
@@ -39,6 +39,7 @@ async def cognify(
     vector_db_config: dict = None,
     graph_db_config: dict = None,
     run_in_background: bool = False,
+    incremental_loading: bool = True,
 ):
     """
     Transform ingested data into a structured knowledge graph.
@@ -194,6 +195,7 @@ async def cognify(
             datasets=datasets,
             vector_db_config=vector_db_config,
             graph_db_config=graph_db_config,
+            incremental_loading=incremental_loading,
         )
     else:
         return await run_cognify_blocking(
@@ -202,6 +204,7 @@ async def cognify(
             datasets=datasets,
             vector_db_config=vector_db_config,
             graph_db_config=graph_db_config,
+            incremental_loading=incremental_loading,
         )
@@ -211,6 +214,7 @@ async def run_cognify_blocking(
     datasets,
     graph_db_config: dict = None,
     vector_db_config: dict = False,
+    incremental_loading: bool = True,
 ):
     total_run_info = {}
@@ -221,6 +225,7 @@ async def run_cognify_blocking(
         pipeline_name="cognify_pipeline",
         graph_db_config=graph_db_config,
         vector_db_config=vector_db_config,
+        incremental_loading=incremental_loading,
     ):
         if run_info.dataset_id:
             total_run_info[run_info.dataset_id] = run_info
@@ -236,6 +241,7 @@ async def run_cognify_as_background_process(
     datasets,
     graph_db_config: dict = None,
     vector_db_config: dict = False,
+    incremental_loading: bool = True,
 ):
     # Convert dataset to list if it's a string
     if isinstance(datasets, str):
@@ -246,6 +252,7 @@ async def run_cognify_as_background_process(
     async def handle_rest_of_the_run(pipeline_list):
         # Execute all provided pipelines one by one to avoid database write conflicts
+        # TODO: Convert to async gather task instead of for loop when Queue mechanism for database is created
         for pipeline in pipeline_list:
             while True:
                 try:
@@ -270,6 +277,7 @@ async def run_cognify_as_background_process(
         pipeline_name="cognify_pipeline",
         graph_db_config=graph_db_config,
         vector_db_config=vector_db_config,
+        incremental_loading=incremental_loading,
     )
     # Save dataset Pipeline run started info
```
# Save dataset Pipeline run started info # Save dataset Pipeline run started info

View file

```diff
@@ -16,7 +16,11 @@ from cognee.modules.graph.methods import get_formatted_graph_data
 from cognee.modules.users.get_user_manager import get_user_manager_context
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.users.authentication.default.default_jwt_strategy import DefaultJWTStrategy
-from cognee.modules.pipelines.models.PipelineRunInfo import PipelineRunCompleted, PipelineRunInfo
+from cognee.modules.pipelines.models.PipelineRunInfo import (
+    PipelineRunCompleted,
+    PipelineRunInfo,
+    PipelineRunErrored,
+)
 from cognee.modules.pipelines.queues.pipeline_run_info_queues import (
     get_from_queue,
     initialize_queue,
@@ -105,6 +109,9 @@ def get_cognify_router() -> APIRouter:
                 datasets, user, run_in_background=payload.run_in_background
             )

+            # If any cognify run errored return JSONResponse with proper error status code
+            if any(isinstance(v, PipelineRunErrored) for v in cognify_run.values()):
+                return JSONResponse(status_code=420, content=cognify_run)
             return cognify_run
         except Exception as error:
             return JSONResponse(status_code=409, content={"error": str(error)})
```

View file

```diff
@@ -353,7 +353,7 @@ def get_datasets_router() -> APIRouter:
     @router.get("/status", response_model=dict[str, PipelineRunStatus])
     async def get_dataset_status(
-        datasets: Annotated[List[UUID], Query(alias="dataset")] = None,
+        datasets: Annotated[List[UUID], Query(alias="dataset")] = [],
         user: User = Depends(get_authenticated_user),
     ):
         """
```

View file

```diff
@@ -71,6 +71,12 @@ async def search(
             Best for: Advanced users, specific graph traversals, debugging.
             Returns: Raw graph query results.

+        **FEELING_LUCKY**:
+            Intelligently selects and runs the most appropriate search type.
+            Best for: General-purpose queries or when you're unsure which search type is best.
+            Returns: The results from the automatically selected search type.
+
     Args:
         query_text: Your question or search query in natural language.
             Examples:
@@ -119,6 +125,9 @@ async def search(
         **CODE**:
             [List of structured code information with context]

+        **FEELING_LUCKY**:
+            [List of results in the format of the search type that is automatically selected]
+
@@ -130,6 +139,7 @@ async def search(
         - **CHUNKS**: Fastest, pure vector similarity search without LLM
         - **SUMMARIES**: Fast, returns pre-computed summaries
         - **CODE**: Medium speed, specialized for code understanding
+        - **FEELING_LUCKY**: Variable speed, uses LLM + search type selection intelligently
         - **top_k**: Start with 10, increase for comprehensive analysis (max 100)
         - **datasets**: Specify datasets to improve speed and relevance
```

View file

```diff
@@ -177,7 +177,12 @@ class LiteLLMEmbeddingEngine(EmbeddingEngine):
         elif "mistral" in self.provider.lower():
             tokenizer = MistralTokenizer(model=model, max_tokens=self.max_tokens)
         else:
-            tokenizer = HuggingFaceTokenizer(model=self.model, max_tokens=self.max_tokens)
+            try:
+                tokenizer = HuggingFaceTokenizer(model=self.model, max_tokens=self.max_tokens)
+            except Exception as e:
+                logger.warning(f"Could not get tokenizer from HuggingFace due to: {e}")
+                logger.info("Switching to TikToken default tokenizer.")
+                tokenizer = TikTokenTokenizer(model=None, max_tokens=self.max_tokens)

         logger.debug(f"Tokenizer loaded for model: {self.model}")
         return tokenizer
```

View file

@@ -0,0 +1,130 @@
You are an expert query analyzer for a **GraphRAG system**. Your primary goal is to analyze a user's query and select the single most appropriate `SearchType` tool to answer it.
Here are the available `SearchType` tools and their specific functions:
- **`SUMMARIES`**: The `SUMMARIES` search type retrieves summarized information from the knowledge graph.
**Best for:**
- Getting concise overviews of topics
- Summarizing large amounts of information
- Quick understanding of complex subjects
* **`INSIGHTS`**: The `INSIGHTS` search type discovers connections and relationships between entities in the knowledge graph.
**Best for:**
- Discovering how entities are connected
- Understanding relationships between concepts
- Exploring the structure of your knowledge graph
* **`CHUNKS`**: The `CHUNKS` search type retrieves specific facts and information chunks from the knowledge graph.
**Best for:**
- Finding specific facts
- Getting direct answers to questions
- Retrieving precise information
* **`RAG_COMPLETION`**: Use for direct factual questions that can likely be answered by retrieving a specific text passage from a document. It does not use the graph's relationship structure.
**Best for:**
- Getting detailed explanations or comprehensive answers
- Combining multiple pieces of information
- Getting a single, coherent answer that is generated from relevant text passages
* **`GRAPH_COMPLETION`**: The `GRAPH_COMPLETION` search type leverages the graph structure to provide more contextually aware completions.
**Best for:**
- Complex queries requiring graph traversal
- Questions that benefit from understanding relationships
- Queries where context from connected entities matters
* **`GRAPH_SUMMARY_COMPLETION`**: The `GRAPH_SUMMARY_COMPLETION` search type combines graph traversal with summarization to provide concise but comprehensive answers.
**Best for:**
- Getting summarized information that requires understanding relationships
- Complex topics that need concise explanations
- Queries that benefit from both graph structure and summarization
* **`GRAPH_COMPLETION_COT`**: The `GRAPH_COMPLETION_COT` search type combines graph traversal with chain of thought to provide answers to complex multi-hop questions.
**Best for:**
- Multi-hop questions that require following several linked concepts or entities
- Tracing relational paths in a knowledge graph while also getting clear step-by-step reasoning
- Summarizing complex linkages into a concise, human-readable answer once all hops have been explored
* **`GRAPH_COMPLETION_CONTEXT_EXTENSION`**: The `GRAPH_COMPLETION_CONTEXT_EXTENSION` search type combines graph traversal with multi-round context extension.
**Best for:**
- Iterative, multi-hop queries where intermediate facts aren't all present upfront
- Complex linkages that benefit from multi-round “search → extend context → reason” loops to uncover deep connections.
- Sparse or evolving graphs that require on-the-fly expansion—issuing follow-up searches to discover missing nodes or properties
* **`CODE`**: The `CODE` search type is specialized for retrieving and understanding code-related information from the knowledge graph.
**Best for:**
- Code-related queries
- Programming examples and patterns
- Technical documentation searches
* **`CYPHER`**: The `CYPHER` search type allows users to execute raw Cypher queries directly against your graph database.
**Best for:**
- Executing precise graph queries with full control
- Leveraging Cypher features and functions
- Getting raw data directly from the graph database
* **`NATURAL_LANGUAGE`**: The `NATURAL_LANGUAGE` search type translates a natural language question into a precise Cypher query that is executed directly against the graph database.
**Best for:**
- Getting precise, structured answers from the graph using natural language.
- Performing advanced graph operations like filtering and aggregating data using natural language.
- Asking precise, database-style questions without needing to write Cypher.
**Examples:**
Query: "Summarize the key findings from these research papers"
Response: `SUMMARIES`
Query: "What is the relationship between the methodologies used in these papers?"
Response: `INSIGHTS`
Query: "When was Einstein born?"
Response: `CHUNKS`
Query: "Explain Einstein's contributions to physics"
Response: `RAG_COMPLETION`
Query: "Provide a comprehensive analysis of how these papers contribute to the field"
Response: `GRAPH_COMPLETION`
Query: "Explain the overall architecture of this codebase"
Response: `GRAPH_SUMMARY_COMPLETION`
Query: "Who was the father of the person who invented the lightbulb"
Response: `GRAPH_COMPLETION_COT`
Query: "What county was XY born in"
Response: `GRAPH_COMPLETION_CONTEXT_EXTENSION`
Query: "How to implement authentication in this codebase"
Response: `CODE`
Query: "MATCH (n) RETURN labels(n) as types, n.name as name LIMIT 10"
Response: `CYPHER`
Query: "Get all nodes connected to John"
Response: `NATURAL_LANGUAGE`
Your response MUST be a single word, consisting of only the chosen `SearchType` name. Do not provide any explanation.
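A classifier prompt like this is typically paired with strict parsing of the single-word reply, since models sometimes add backticks or extra words despite the instruction. A hedged sketch of such a parser (the names and the default are illustrative):

```python
VALID_SEARCH_TYPES = {
    "SUMMARIES", "INSIGHTS", "CHUNKS", "RAG_COMPLETION", "GRAPH_COMPLETION",
    "GRAPH_SUMMARY_COMPLETION", "GRAPH_COMPLETION_COT",
    "GRAPH_COMPLETION_CONTEXT_EXTENSION", "CODE", "CYPHER", "NATURAL_LANGUAGE",
}

def parse_search_type(reply: str, default: str = "RAG_COMPLETION") -> str:
    # Strip whitespace and backticks the model may add despite instructions,
    # then fall back to the default on anything that is not a known type.
    candidate = reply.strip().strip("`").upper()
    return candidate if candidate in VALID_SEARCH_TYPES else default
```

Any reply that is not exactly one known `SearchType` name degrades safely to the default.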

View file

```diff
@@ -1,4 +1,4 @@
-from typing import List, Any
+from typing import List, Any, Optional

 import tiktoken

 from ..tokenizer_interface import TokenizerInterface
@@ -12,13 +12,17 @@ class TikTokenTokenizer(TokenizerInterface):
     def __init__(
         self,
-        model: str,
+        model: Optional[str] = None,
         max_tokens: int = 8191,
     ):
         self.model = model
         self.max_tokens = max_tokens
         # Initialize TikToken for GPT based on model
-        self.tokenizer = tiktoken.encoding_for_model(self.model)
+        if model:
+            self.tokenizer = tiktoken.encoding_for_model(self.model)
+        else:
+            # Use default if model not provided
+            self.tokenizer = tiktoken.get_encoding("cl100k_base")

     def extract_tokens(self, text: str) -> List[Any]:
         """
```
View file

```diff
@@ -1,6 +1,7 @@
 from datetime import datetime, timezone
 from uuid import uuid4
 from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer
+from sqlalchemy.ext.mutable import MutableDict
 from sqlalchemy.orm import relationship

 from cognee.infrastructure.databases.relational import Base
@@ -21,7 +22,11 @@ class Data(Base):
     tenant_id = Column(UUID, index=True, nullable=True)
     content_hash = Column(String)
     external_metadata = Column(JSON)
-    node_set = Column(JSON, nullable=True)  # Store NodeSet as JSON list of strings
+    # Store NodeSet as JSON list of strings
+    node_set = Column(JSON, nullable=True)
+    # MutableDict allows SQLAlchemy to notice key-value pair changes; without it, changing a value
+    # for a key wouldn't be noticed when committing a database session
+    pipeline_status = Column(MutableDict.as_mutable(JSON))
     token_count = Column(Integer)
     data_size = Column(Integer, nullable=True)  # File size in bytes
     created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
```

View file

```diff
@@ -5,7 +5,6 @@ from cognee.modules.chunking.Chunker import Chunker
 from cognee.infrastructure.files.utils.open_data_file import open_data_file

 from .Document import Document
-from .exceptions.exceptions import PyPdfInternalError

 logger = get_logger("PDFDocument")
@@ -17,18 +16,12 @@ class PdfDocument(Document):
         async with open_data_file(self.raw_data_location, mode="rb") as stream:
             logger.info(f"Reading PDF: {self.raw_data_location}")

-            try:
-                file = PdfReader(stream, strict=False)
-            except Exception:
-                raise PyPdfInternalError()
+            file = PdfReader(stream, strict=False)

             async def get_text():
-                try:
-                    for page in file.pages:
-                        page_text = page.extract_text()
-                        yield page_text
-                except Exception:
-                    raise PyPdfInternalError()
+                for page in file.pages:
+                    page_text = page.extract_text()
+                    yield page_text

             chunker = chunker_cls(self, get_text=get_text, max_chunk_size=max_chunk_size)
```

View file

@@ -0,0 +1,5 @@
from uuid import NAMESPACE_OID, uuid5
def generate_edge_id(edge_id: str) -> str:
return uuid5(NAMESPACE_OID, edge_id.lower().replace(" ", "_").replace("'", ""))
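The helper above derives a deterministic id from a normalized edge string, so superficially different spellings of the same edge hash to the same value and duplicate embeddings are avoided. A self-contained sketch of that behavior (the `str()` cast is added here for illustration; the helper itself returns the `UUID`):

```python
from uuid import NAMESPACE_OID, uuid5

def generate_edge_id(edge_id: str) -> str:
    # Normalize case, spaces, and apostrophes before hashing so that
    # variant spellings of the same edge collide on one id.
    return str(uuid5(NAMESPACE_OID, edge_id.lower().replace(" ", "_").replace("'", "")))
```

`uuid5` is a pure function of its inputs, so the same normalized string always yields the same id across runs and processes.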

View file

```diff
@@ -170,28 +170,19 @@ class CogneeGraph(CogneeAbstractGraph):
             for edge in self.edges:
                 relationship_type = edge.attributes.get("relationship_type")
-                if relationship_type and relationship_type in embedding_map:
-                    edge.attributes["vector_distance"] = embedding_map[relationship_type]
+                distance = embedding_map.get(relationship_type, None)
+                if distance is not None:
+                    edge.attributes["vector_distance"] = distance

         except Exception as ex:
             logger.error(f"Error mapping vector distances to edges: {str(ex)}")
             raise ex

     async def calculate_top_triplet_importances(self, k: int) -> List:
-        min_heap = []
+        def score(edge):
+            n1 = edge.node1.attributes.get("vector_distance", 1)
+            n2 = edge.node2.attributes.get("vector_distance", 1)
+            e = edge.attributes.get("vector_distance", 1)
+            return n1 + n2 + e

-        for i, edge in enumerate(self.edges):
-            source_node = self.get_node(edge.node1.id)
-            target_node = self.get_node(edge.node2.id)
-            source_distance = source_node.attributes.get("vector_distance", 1) if source_node else 1
-            target_distance = target_node.attributes.get("vector_distance", 1) if target_node else 1
-            edge_distance = edge.attributes.get("vector_distance", 1)
-            total_distance = source_distance + target_distance + edge_distance
-            heapq.heappush(min_heap, (-total_distance, i, edge))
-            if len(min_heap) > k:
-                heapq.heappop(min_heap)
-
-        return [edge for _, _, edge in sorted(min_heap)]
+        return heapq.nsmallest(k, self.edges, key=score)
```
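The retriever optimization replaces a hand-rolled bounded heap with `heapq.nsmallest`, which keeps only `k` candidates internally (O(n log k)) and returns them sorted by score. A simplified sketch, where each edge is reduced to a tuple of the three distances the real code reads from node and edge attributes:

```python
import heapq

def top_k_triplets(edges, k):
    """Return the k triplets with the smallest combined vector distance.

    Each edge is assumed here to be a (node1_distance, node2_distance,
    edge_distance) tuple; in the real graph, missing distances default to 1.
    """
    def score(edge):
        n1, n2, e = edge
        return n1 + n2 + e

    # nsmallest maintains a bounded heap internally, so there is no need
    # for manual heappush/heappop bookkeeping.
    return heapq.nsmallest(k, edges, key=score)
```

Results come back in ascending score order, smallest total distance first.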

View file

@@ -0,0 +1 @@
from .exceptions import PipelineRunFailedError

View file

@@ -0,0 +1,12 @@
from cognee.exceptions import CogneeApiError
from fastapi import status
class PipelineRunFailedError(CogneeApiError):
def __init__(
self,
message: str = "Pipeline run failed.",
name: str = "PipelineRunFailedError",
status_code: int = status.HTTP_422_UNPROCESSABLE_ENTITY,
):
super().__init__(message, name, status_code)

View file

@@ -0,0 +1,5 @@
import enum
class DataItemStatus(str, enum.Enum):
DATA_ITEM_PROCESSING_COMPLETED = "DATA_ITEM_PROCESSING_COMPLETED"
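Because the enum subclasses `str`, its members compare equal to plain strings and serialize to JSON without a custom encoder, which is presumably why it is stored directly in the `pipeline_status` JSON column. A minimal demonstration of that property:

```python
import enum
import json

class DataItemStatus(str, enum.Enum):
    DATA_ITEM_PROCESSING_COMPLETED = "DATA_ITEM_PROCESSING_COMPLETED"

# str-backed members behave as their string value in comparisons
# and in json.dumps, with no special handling required.
status = DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
```

This lets status checks like `pipeline_status.get(...) == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED` work whether the stored value is the enum member or the round-tripped JSON string.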

View file

```diff
@@ -9,6 +9,7 @@ class PipelineRunInfo(BaseModel):
     dataset_id: UUID
     dataset_name: str
     payload: Optional[Any] = None
+    data_ingestion_info: Optional[list] = None

     model_config = {
         "arbitrary_types_allowed": True,
@@ -30,6 +31,11 @@ class PipelineRunCompleted(PipelineRunInfo):
     pass

+class PipelineRunAlreadyCompleted(PipelineRunInfo):
+    status: str = "PipelineRunAlreadyCompleted"
+    pass
+
 class PipelineRunErrored(PipelineRunInfo):
     status: str = "PipelineRunErrored"
     pass
```

View file

```diff
@@ -6,3 +6,4 @@ from .PipelineRunInfo import (
     PipelineRunCompleted,
     PipelineRunErrored,
 )
+from .DataItemStatus import DataItemStatus
```

View file

```diff
@@ -52,6 +52,7 @@ async def cognee_pipeline(
     pipeline_name: str = "custom_pipeline",
     vector_db_config: dict = None,
     graph_db_config: dict = None,
+    incremental_loading: bool = True,
 ):
     # Note: These context variables allow different value assignment for databases in Cognee
     # per async task, thread, process and etc.
@@ -106,6 +107,7 @@ async def cognee_pipeline(
             data=data,
             pipeline_name=pipeline_name,
             context={"dataset": dataset},
+            incremental_loading=incremental_loading,
         ):
             yield run_info
@@ -117,6 +119,7 @@ async def run_pipeline(
     data=None,
     pipeline_name: str = "custom_pipeline",
     context: dict = None,
+    incremental_loading=True,
 ):
     check_dataset_name(dataset.name)
@@ -184,7 +187,9 @@ async def run_pipeline(
         if not isinstance(task, Task):
             raise ValueError(f"Task {task} is not an instance of Task")

-    pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name, context)
+    pipeline_run = run_tasks(
+        tasks, dataset_id, data, user, pipeline_name, context, incremental_loading
+    )

     async for pipeline_run_info in pipeline_run:
         yield pipeline_run_info
```

View file

```diff
@@ -1,21 +1,31 @@
 import os
+import asyncio
 from uuid import UUID
 from typing import Any
 from functools import wraps

+from sqlalchemy import select
+
+import cognee.modules.ingestion as ingestion
 from cognee.infrastructure.databases.graph import get_graph_engine
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
 from cognee.modules.users.models import User
+from cognee.modules.data.models import Data
+from cognee.infrastructure.files.utils.open_data_file import open_data_file
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.pipelines.utils import generate_pipeline_id
+from cognee.modules.pipelines.exceptions import PipelineRunFailedError
+from cognee.tasks.ingestion import save_data_item_to_storage, resolve_data_directories
 from cognee.modules.pipelines.models.PipelineRunInfo import (
     PipelineRunCompleted,
     PipelineRunErrored,
     PipelineRunStarted,
     PipelineRunYield,
+    PipelineRunAlreadyCompleted,
 )
+from cognee.modules.pipelines.models.DataItemStatus import DataItemStatus

 from cognee.modules.pipelines.operations import (
     log_pipeline_run_start,
@@ -56,34 +66,116 @@ async def run_tasks(
     user: User = None,
     pipeline_name: str = "unknown_pipeline",
     context: dict = None,
+    incremental_loading: bool = True,
 ):
-    if not user:
-        user = await get_default_user()
-
-    # Get Dataset object
-    db_engine = get_relational_engine()
-    async with db_engine.get_async_session() as session:
-        from cognee.modules.data.models import Dataset
-
-        dataset = await session.get(Dataset, dataset_id)
-
-    pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name)
-
-    pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data)
-
-    pipeline_run_id = pipeline_run.pipeline_run_id
-
-    yield PipelineRunStarted(
-        pipeline_run_id=pipeline_run_id,
-        dataset_id=dataset.id,
-        dataset_name=dataset.name,
-        payload=data,
-    )
-
-    try:
+    async def _run_tasks_data_item_incremental(
+        data_item,
+        dataset,
+        tasks,
+        pipeline_name,
+        pipeline_id,
+        pipeline_run_id,
+        context,
+        user,
+    ):
+        db_engine = get_relational_engine()
+
+        # If incremental_loading of data is set to True don't process documents already processed by pipeline
+        # If data is being added to Cognee for the first time calculate the id of the data
+        if not isinstance(data_item, Data):
+            file_path = await save_data_item_to_storage(data_item)
+
+            # Ingest data and add metadata
+            async with open_data_file(file_path) as file:
+                classified_data = ingestion.classify(file)
+
+                # data_id is the hash of file contents + owner id to avoid duplicate data
+                data_id = ingestion.identify(classified_data, user)
+        else:
+            # If data was already processed by Cognee get data id
+            data_id = data_item.id
+
+        # Check pipeline status, if Data already processed for pipeline before skip current processing
+        async with db_engine.get_async_session() as session:
+            data_point = (
+                await session.execute(select(Data).filter(Data.id == data_id))
+            ).scalar_one_or_none()
+            if data_point:
+                if (
+                    data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id))
+                    == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
+                ):
+                    yield {
+                        "run_info": PipelineRunAlreadyCompleted(
+                            pipeline_run_id=pipeline_run_id,
+                            dataset_id=dataset.id,
+                            dataset_name=dataset.name,
+                        ),
+                        "data_id": data_id,
+                    }
+                    return
+
+        try:
+            # Process data based on data_item and list of tasks
+            async for result in run_tasks_with_telemetry(
+                tasks=tasks,
+                data=[data_item],
+                user=user,
+                pipeline_name=pipeline_id,
+                context=context,
+            ):
+                yield PipelineRunYield(
+                    pipeline_run_id=pipeline_run_id,
+                    dataset_id=dataset.id,
+                    dataset_name=dataset.name,
+                    payload=result,
+                )
+
+            # Update pipeline status for Data element
+            async with db_engine.get_async_session() as session:
+                data_point = (
+                    await session.execute(select(Data).filter(Data.id == data_id))
+                ).scalar_one_or_none()
+                data_point.pipeline_status[pipeline_name] = {
+                    str(dataset.id): DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
+                }
+                await session.merge(data_point)
+                await session.commit()
+
+            yield {
+                "run_info": PipelineRunCompleted(
+                    pipeline_run_id=pipeline_run_id,
+                    dataset_id=dataset.id,
+                    dataset_name=dataset.name,
+                ),
+                "data_id": data_id,
+            }
+
+        except Exception as error:
+            # Temporarily swallow error and try to process rest of documents first, then re-raise error at end of data ingestion pipeline
+            logger.error(
+                f"Exception caught while processing data: {error}.\n Data processing failed for data item: {data_item}."
+            )
+            yield {
+                "run_info": PipelineRunErrored(
+                    pipeline_run_id=pipeline_run_id,
+                    payload=repr(error),
+                    dataset_id=dataset.id,
+                    dataset_name=dataset.name,
+                ),
+                "data_id": data_id,
+            }
+
+    async def _run_tasks_data_item_regular(
+        data_item,
+        dataset,
+        tasks,
+        pipeline_id,
+        pipeline_run_id,
+        context,
+        user,
+    ):
+        # Process data based on data_item and list of tasks
         async for result in run_tasks_with_telemetry(
             tasks=tasks,
-            data=data,
+            data=[data_item],
             user=user,
             pipeline_name=pipeline_id,
             context=context,
@@ -95,6 +187,112 @@ async def run_tasks(
```
payload=result, payload=result,
) )
yield {
"run_info": PipelineRunCompleted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
)
}
async def _run_tasks_data_item(
data_item,
dataset,
tasks,
pipeline_name,
pipeline_id,
pipeline_run_id,
context,
user,
incremental_loading,
):
# Go through async generator and return data item processing result. Result can be PipelineRunAlreadyCompleted when data item is skipped,
# PipelineRunCompleted when processing was successful and PipelineRunErrored if there were issues
result = None
if incremental_loading:
async for result in _run_tasks_data_item_incremental(
data_item=data_item,
dataset=dataset,
tasks=tasks,
pipeline_name=pipeline_name,
pipeline_id=pipeline_id,
pipeline_run_id=pipeline_run_id,
context=context,
user=user,
):
pass
else:
async for result in _run_tasks_data_item_regular(
data_item=data_item,
dataset=dataset,
tasks=tasks,
pipeline_id=pipeline_id,
pipeline_run_id=pipeline_run_id,
context=context,
user=user,
):
pass
return result
if not user:
user = await get_default_user()
# Get Dataset object
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
from cognee.modules.data.models import Dataset
dataset = await session.get(Dataset, dataset_id)
pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name)
pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data)
pipeline_run_id = pipeline_run.pipeline_run_id
yield PipelineRunStarted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
payload=data,
)
try:
if not isinstance(data, list):
data = [data]
if incremental_loading:
data = await resolve_data_directories(data)
# Create async tasks per data item that will run the pipeline for the data item
data_item_tasks = [
asyncio.create_task(
_run_tasks_data_item(
data_item,
dataset,
tasks,
pipeline_name,
pipeline_id,
pipeline_run_id,
context,
user,
incremental_loading,
)
)
for data_item in data
]
results = await asyncio.gather(*data_item_tasks)
# Remove skipped data items from results
results = [result for result in results if result]
# If any data item could not be processed propagate error
errored_results = [
result for result in results if isinstance(result["run_info"], PipelineRunErrored)
]
if errored_results:
raise PipelineRunFailedError(
message="Pipeline run failed. Data item could not be processed."
)
await log_pipeline_run_complete( await log_pipeline_run_complete(
pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data
) )
@ -103,6 +301,7 @@ async def run_tasks(
pipeline_run_id=pipeline_run_id, pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id, dataset_id=dataset.id,
dataset_name=dataset.name, dataset_name=dataset.name,
data_ingestion_info=results,
) )
graph_engine = await get_graph_engine() graph_engine = await get_graph_engine()
@ -120,9 +319,14 @@ async def run_tasks(
yield PipelineRunErrored( yield PipelineRunErrored(
pipeline_run_id=pipeline_run_id, pipeline_run_id=pipeline_run_id,
payload=error, payload=repr(error),
dataset_id=dataset.id, dataset_id=dataset.id,
dataset_name=dataset.name, dataset_name=dataset.name,
data_ingestion_info=locals().get(
"results"
), # Returns results if they exist or returns None
) )
raise error # In case of error during incremental loading of data just let the user know the pipeline Errored, don't raise error
if not isinstance(error, PipelineRunFailedError):
raise error

View file

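The per-item flow above (skip if already completed, record completion, defer errors until all items have run) can be sketched in isolation. This is a minimal stand-in, not the real pipeline: `process_item`, the `processed` dict, and the status strings are hypothetical substitutes for `_run_tasks_data_item`, the `pipeline_status` column, and `DataItemStatus`.

```python
import asyncio

# Simulated status store: maps data_id -> completion status (stand-in for the
# relational pipeline_status bookkeeping in the real pipeline).
processed = {"doc-1": "COMPLETED"}
errors = []

async def process_item(data_id: str) -> dict:
    """Process one data item; skip if already completed, swallow errors for later."""
    if processed.get(data_id) == "COMPLETED":
        return {"data_id": data_id, "run_info": "already_completed"}
    try:
        await asyncio.sleep(0)  # placeholder for the real task pipeline
        if data_id == "doc-bad":
            raise ValueError("unreadable file")
        processed[data_id] = "COMPLETED"
        return {"data_id": data_id, "run_info": "completed"}
    except Exception as error:
        # Defer the failure so the remaining items still get processed.
        return {"data_id": data_id, "run_info": "errored", "error": repr(error)}

async def run_all(data_ids):
    # One task per data item, gathered concurrently, mirroring the gather above.
    results = await asyncio.gather(*(process_item(d) for d in data_ids))
    errored = [r for r in results if r["run_info"] == "errored"]
    return results, errored

results, errored = asyncio.run(run_all(["doc-1", "doc-2", "doc-bad"]))
```

Only after all items finish does the caller decide whether to surface the collected errors, which is why one bad document no longer aborts the whole ingestion run.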
```diff
@@ -27,7 +27,7 @@ from cognee.modules.users.models import User
 from cognee.modules.data.models import Dataset
 from cognee.shared.utils import send_telemetry
 from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
-from cognee.modules.search.operations import log_query, log_result
+from cognee.modules.search.operations import log_query, log_result, select_search_type
 
 
 async def search(
@@ -129,6 +129,10 @@ async def specific_search(
         SearchType.NATURAL_LANGUAGE: NaturalLanguageRetriever().get_completion,
     }
 
+    # If the query type is FEELING_LUCKY, select the search type intelligently
+    if query_type is SearchType.FEELING_LUCKY:
+        query_type = await select_search_type(query)
+
     search_task = search_tasks.get(query_type)
 
     if search_task is None:
```
```diff
@@ -1,3 +1,4 @@
 from .log_query import log_query
 from .log_result import log_result
 from .get_history import get_history
+from .select_search_type import select_search_type
```
```diff
@@ -0,0 +1,43 @@
+from cognee.infrastructure.llm.get_llm_client import get_llm_client
+from cognee.infrastructure.llm.prompts import read_query_prompt
+from cognee.modules.search.types import SearchType
+from cognee.shared.logging_utils import get_logger
+
+logger = get_logger("SearchTypeSelector")
+
+
+async def select_search_type(
+    query: str,
+    system_prompt_path: str = "search_type_selector_prompt.txt",
+) -> SearchType:
+    """
+    Analyzes the query and selects the best search type.
+
+    Args:
+        query: The query to analyze.
+        system_prompt_path: The path to the system prompt.
+
+    Returns:
+        The best search type given by the LLM.
+    """
+    default_search_type = SearchType.RAG_COMPLETION
+    system_prompt = read_query_prompt(system_prompt_path)
+    llm_client = get_llm_client()
+
+    try:
+        response = await llm_client.acreate_structured_output(
+            text_input=query,
+            system_prompt=system_prompt,
+            response_model=str,
+        )
+        if response.upper() in SearchType.__members__:
+            logger.info(f"Selected lucky search type: {response.upper()}")
+            return SearchType(response.upper())
+
+        # If the response is not a valid search type, return the default search type
+        logger.info(f"LLM gives an invalid search type: {response.upper()}")
+        return default_search_type
+    except Exception as e:
+        logger.error(f"Failed to select search type intelligently from LLM: {str(e)}")
+        return default_search_type
```
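The validate-then-fallback step of `select_search_type` can be shown without an LLM in the loop: the reply is uppercased, checked against the enum's members, and replaced by the default when it doesn't match. A minimal sketch; the trimmed `SearchType` copy and the `resolve_search_type` helper are illustrative stand-ins for the real enum and for the post-processing inside the `try` block.

```python
from enum import Enum

class SearchType(Enum):
    # Trimmed copy of the real enum, for illustration only.
    RAG_COMPLETION = "RAG_COMPLETION"
    CHUNKS = "CHUNKS"
    INSIGHTS = "INSIGHTS"

def resolve_search_type(
    llm_response: str, default: SearchType = SearchType.RAG_COMPLETION
) -> SearchType:
    """Map a free-text LLM reply onto a SearchType, falling back to the default."""
    candidate = llm_response.strip().upper()
    if candidate in SearchType.__members__:
        # Member values equal member names here, so value lookup works.
        return SearchType(candidate)
    return default

resolved = resolve_search_type("insights")        # valid member name
fallback = resolve_search_type("something else")  # unknown -> default
```

Because the LLM output is treated as untrusted text, any hallucinated or malformed label degrades gracefully to `RAG_COMPLETION` rather than raising a `KeyError` deep in the search dispatch.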
```diff
@@ -13,3 +13,4 @@ class SearchType(Enum):
     NATURAL_LANGUAGE = "NATURAL_LANGUAGE"
     GRAPH_COMPLETION_COT = "GRAPH_COMPLETION_COT"
     GRAPH_COMPLETION_CONTEXT_EXTENSION = "GRAPH_COMPLETION_CONTEXT_EXTENSION"
+    FEELING_LUCKY = "FEELING_LUCKY"
```
```diff
@@ -8,7 +8,6 @@ from cognee.modules.data.models import Data
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.chunking.TextChunker import TextChunker
 from cognee.modules.chunking.Chunker import Chunker
-from cognee.modules.data.processing.document_types.exceptions.exceptions import PyPdfInternalError
 
 
 async def update_document_token_count(document_id: UUID, token_count: int) -> None:
@@ -40,15 +39,14 @@ async def extract_chunks_from_documents(
     """
     for document in documents:
         document_token_count = 0
-        try:
-            async for document_chunk in document.read(
-                max_chunk_size=max_chunk_size, chunker_cls=chunker
-            ):
-                document_token_count += document_chunk.chunk_size
-                document_chunk.belongs_to_set = document.belongs_to_set
-                yield document_chunk
-
-            await update_document_token_count(document.id, document_token_count)
-        except PyPdfInternalError:
-            pass
+        async for document_chunk in document.read(
+            max_chunk_size=max_chunk_size, chunker_cls=chunker
+        ):
+            document_token_count += document_chunk.chunk_size
+            document_chunk.belongs_to_set = document.belongs_to_set
+            yield document_chunk
+
+        await update_document_token_count(document.id, document_token_count)
 
     # todo rita
```
```diff
@@ -5,12 +5,12 @@ from uuid import UUID
 from typing import Union, BinaryIO, Any, List, Optional
 
 import cognee.modules.ingestion as ingestion
-from cognee.infrastructure.files.utils.open_data_file import open_data_file
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.data.models import Data
 from cognee.modules.users.models import User
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
+from cognee.infrastructure.files.utils.open_data_file import open_data_file
 from cognee.modules.data.methods import (
     get_authorized_existing_datasets,
     get_dataset_data,
@@ -134,6 +134,7 @@ async def ingest_data(
                 node_set=json.dumps(node_set) if node_set else None,
                 data_size=file_metadata["file_size"],
                 tenant_id=user.tenant_id if user.tenant_id else None,
+                pipeline_status={},
                 token_count=-1,
             )
```
```diff
@@ -40,6 +40,9 @@ async def resolve_data_directories(
             if include_subdirectories:
                 base_path = item if item.endswith("/") else item + "/"
                 s3_keys = fs.glob(base_path + "**")
+                # If path is not directory attempt to add item directly
+                if not s3_keys:
+                    s3_keys = fs.ls(item)
             else:
                 s3_keys = fs.ls(item)
             # Filter out keys that represent directories using fs.isdir
```
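The glob-then-`ls` fallback added here follows a general pattern: try a recursive listing first, and if it comes back empty (typically because the item is a single file rather than a directory) fall back to listing the item itself. A local-filesystem sketch with `pathlib`; the `resolve_keys` helper is hypothetical and only mirrors the shape of the S3 branch, not cognee's actual code.

```python
import pathlib
import tempfile

def resolve_keys(path: str) -> list:
    """Recursively list files under path; if nothing matches, fall back to the path itself."""
    base = pathlib.Path(path)
    keys = [str(p) for p in base.glob("**/*") if p.is_file()]
    if not keys:
        # Globbing relative to a plain file yields nothing, so add the file directly.
        keys = [str(base)] if base.is_file() else []
    return keys

root = pathlib.Path(tempfile.mkdtemp())
(root / "a.txt").write_text("hello")

dir_keys = resolve_keys(str(root))            # directory: found via recursive glob
file_keys = resolve_keys(str(root / "a.txt")) # file: found via the fallback branch
```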
```diff
@@ -107,6 +107,9 @@ async def get_repo_file_dependencies(
     - excluded_paths: Optional custom exclusion list
     """
+    if isinstance(repo_path, list) and len(repo_path) == 1:
+        repo_path = repo_path[0]
+
     if not os.path.exists(repo_path):
         raise FileNotFoundError(f"Repository path {repo_path} does not exist.")
```

@ -1,3 +1,5 @@
import asyncio
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.exceptions.EmbeddingException import EmbeddingException from cognee.infrastructure.databases.exceptions.EmbeddingException import EmbeddingException
@ -6,6 +8,9 @@ from cognee.infrastructure.engine import DataPoint
logger = get_logger("index_data_points") logger = get_logger("index_data_points")
# A single lock shared by all coroutines
vector_index_lock = asyncio.Lock()
async def index_data_points(data_points: list[DataPoint]): async def index_data_points(data_points: list[DataPoint]):
created_indexes = {} created_indexes = {}
@ -22,9 +27,11 @@ async def index_data_points(data_points: list[DataPoint]):
index_name = f"{data_point_type.__name__}_{field_name}" index_name = f"{data_point_type.__name__}_{field_name}"
if index_name not in created_indexes: # Add async lock to make sure two different coroutines won't create a table at the same time
await vector_engine.create_vector_index(data_point_type.__name__, field_name) async with vector_index_lock:
created_indexes[index_name] = True if index_name not in created_indexes:
await vector_engine.create_vector_index(data_point_type.__name__, field_name)
created_indexes[index_name] = True
if index_name not in index_points: if index_name not in index_points:
index_points[index_name] = [] index_points[index_name] = []

View file

```diff
@@ -1,3 +1,4 @@
+from cognee.modules.engine.utils.generate_edge_id import generate_edge_id
 from cognee.shared.logging_utils import get_logger, ERROR
 from collections import Counter
@@ -49,7 +50,9 @@ async def index_graph_edges(batch_size: int = 1024):
         )
 
         for text, count in edge_types.items():
-            edge = EdgeType(relationship_name=text, number_of_edges=count)
+            edge = EdgeType(
+                id=generate_edge_id(edge_id=text), relationship_name=text, number_of_edges=count
+            )
             data_point_type = type(edge)
 
             for field_name in edge.metadata["index_fields"]:
```
```diff
@@ -26,8 +26,8 @@ async def test_deduplication():
     explanation_file_path2 = os.path.join(
         pathlib.Path(__file__).parent, "test_data/Natural_language_processing_copy.txt"
     )
-    await cognee.add([explanation_file_path], dataset_name)
-    await cognee.add([explanation_file_path2], dataset_name2)
+    await cognee.add([explanation_file_path], dataset_name, incremental_loading=False)
+    await cognee.add([explanation_file_path2], dataset_name2, incremental_loading=False)
 
     result = await relational_engine.get_all_data_from_table("data")
     assert len(result) == 1, "More than one data entity was found."
```
```diff
@@ -155,6 +155,61 @@ async def test_specific_search_chunks(mock_send_telemetry, mock_chunks_retriever
         assert results[0]["content"] == "Chunk result"
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "selected_type, retriever_name, expected_content, top_k",
+    [
+        (SearchType.RAG_COMPLETION, "CompletionRetriever", "RAG result from lucky search", 10),
+        (SearchType.CHUNKS, "ChunksRetriever", "Chunk result from lucky search", 5),
+        (SearchType.SUMMARIES, "SummariesRetriever", "Summary from lucky search", 15),
+        (SearchType.INSIGHTS, "InsightsRetriever", "Insight result from lucky search", 20),
+    ],
+)
+@patch.object(search_module, "select_search_type")
+@patch.object(search_module, "send_telemetry")
+async def test_specific_search_feeling_lucky(
+    mock_send_telemetry,
+    mock_select_search_type,
+    selected_type,
+    retriever_name,
+    expected_content,
+    top_k,
+    mock_user,
+):
+    with patch.object(search_module, retriever_name) as mock_retriever_class:
+        # Setup
+        query = f"test query for {retriever_name}"
+        query_type = SearchType.FEELING_LUCKY
+
+        # Mock the intelligent search type selection
+        mock_select_search_type.return_value = selected_type
+
+        # Mock the retriever
+        mock_retriever_instance = MagicMock()
+        mock_retriever_instance.get_completion = AsyncMock(
+            return_value=[{"content": expected_content}]
+        )
+        mock_retriever_class.return_value = mock_retriever_instance
+
+        # Execute
+        results = await specific_search(query_type, query, mock_user, top_k=top_k)
+
+        # Verify
+        mock_select_search_type.assert_called_once_with(query)
+        if retriever_name == "CompletionRetriever":
+            mock_retriever_class.assert_called_once_with(
+                system_prompt_path="answer_simple_question.txt", top_k=top_k
+            )
+        else:
+            mock_retriever_class.assert_called_once_with(top_k=top_k)
+        mock_retriever_instance.get_completion.assert_called_once_with(query)
+        mock_send_telemetry.assert_called()
+        assert len(results) == 1
+        assert results[0]["content"] == expected_content
+
+
 @pytest.mark.asyncio
 async def test_specific_search_invalid_type(mock_user):
     # Setup
```
```diff
@@ -1,6 +1,6 @@
 [project]
 name = "cognee"
-version = "0.2.1"
+version = "0.2.2.dev0"
 description = "Cognee - is a library for enriching LLM context with a semantic layer for better understanding and reasoning."
 authors = [
     { name = "Vasilije Markovic" },
```
uv.lock generated

```diff
@@ -863,7 +863,7 @@ wheels = [
 
 [[package]]
 name = "cognee"
-version = "0.2.1"
+version = "0.2.2.dev0"
 source = { editable = "." }
 dependencies = [
     { name = "aiofiles" },
```