Merge remote-tracking branch 'origin/feat/code-knowledge-graph' into feature/cognee-frontend

This commit is contained in:
Boris Arzentar 2024-05-22 10:17:33 +02:00
commit 9bb30bc43a
62 changed files with 1605 additions and 257 deletions

28
.data/code/example.txt Normal file
View file

@ -0,0 +1,28 @@
'''
Given a string, find the length of the longest substring without repeating characters.
Examples:
Given "abcabcbb", the answer is "abc", which the length is 3.
Given "bbbbb", the answer is "b", with the length of 1.
Given "pwwkew", the answer is "wke", with the length of 3. Note that the answer must be a substring, "pwke" is a subsequence and not a substring.
'''
class Solution(object):
    def lengthOfLongestSubstring(self, s):
        """
        Return the length of the longest substring of `s` that contains
        no repeating characters.

        :type s: str
        :rtype: int
        """
        # Sliding window. last_seen[ch] stores the index *after* the most
        # recent occurrence of ch, so the window start can jump past it.
        last_seen = {}
        window_start = 0
        longest = 0
        for idx, ch in enumerate(s):
            if ch in last_seen:
                # Never move the window start backwards.
                window_start = max(last_seen[ch], window_start)
            longest = max(longest, idx - window_start + 1)
            last_seen[ch] = idx + 1
        return longest

View file

@ -72,13 +72,13 @@ jobs:
- name: Install dependencies
run: poetry install --no-interaction
# - name: Build with Poetry
# run: poetry build
# - name: Install Package
# run: |
# cd dist
# pip install *.whl
# - name: Build with Poetry
# run: poetry build
#
# - name: Install Package
# run: |
# cd dist
# pip install *.whl
# - name: Download NLTK Punkt Tokenizer Models
# run: |
@ -92,6 +92,7 @@ jobs:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
QDRANT_API_KEY: ${{ secrets.QDRANT_API_KEY }}
QDRANT_URL: ${{ secrets.QDRANT_API_URL }}
ENV: 'dev'
run: poetry run python ./cognee/tests/test_library.py
# - run: |

View file

@ -2,18 +2,18 @@ import asyncio
from uuid import uuid4
from typing import List, Union
import logging
# import instructor
import nltk
# from openai import OpenAI
from nltk.corpus import stopwords
from cognee.config import Config
from cognee.modules.cognify.graph.add_data_chunks import add_data_chunks
from cognee.infrastructure.data.chunking.LangchainChunkingEngine import LangchainChunkEngine
from cognee.infrastructure.databases.vector.embeddings.DefaultEmbeddingEngine import LiteLLMEmbeddingEngine
from cognee.modules.cognify.graph.add_node_connections import group_nodes_by_layer, \
graph_ready_output, connect_nodes_in_graph
from cognee.modules.cognify.graph.add_data_chunks import add_data_chunks, add_data_chunks_basic_rag
from cognee.modules.cognify.graph.add_document_node import add_document_node
from cognee.modules.cognify.graph.add_classification_nodes import add_classification_nodes
from cognee.modules.cognify.graph.add_cognitive_layer_graphs import add_cognitive_layer_graphs
from cognee.modules.cognify.graph.add_summary_nodes import add_summary_nodes
from cognee.modules.cognify.graph.add_node_connections import group_nodes_by_layer, \
graph_ready_output, connect_nodes_in_graph
from cognee.modules.cognify.llm.resolve_cross_graph_references import resolve_cross_graph_references
from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client
from cognee.modules.cognify.graph.add_cognitive_layers import add_cognitive_layers
@ -25,9 +25,10 @@ from cognee.modules.data.get_content_categories import get_content_categories
from cognee.modules.data.get_content_summary import get_content_summary
from cognee.modules.data.get_cognitive_layers import get_cognitive_layers
from cognee.modules.data.get_layer_graphs import get_layer_graphs
from cognee.shared.data_models import ChunkStrategy, KnowledgeGraph
from cognee.utils import send_telemetry
from cognee.modules.tasks import create_task_status_table, update_task_status
from cognee.shared.SourceCodeGraph import SourceCodeGraph
config = Config()
config.load()
@ -74,28 +75,22 @@ async def cognify(datasets: Union[str, List[str]] = None):
if dataset_name in added_dataset:
dataset_files.append((added_dataset, db_engine.get_files_metadata(added_dataset)))
# await initialize_graph(USER_ID, graph_data_model, graph_client)
data_chunks = {}
chunk_engine = infrastructure_config.get_config()["chunk_engine"]
chunk_strategy = infrastructure_config.get_config()["chunk_strategy"]
for (dataset_name, files) in dataset_files:
update_task_status(dataset_name, "DATASET_PROCESSING_STARTED")
async def process_batch(files_batch):
data_chunks = {}
for file_metadata in files:
for dataset_name, file_metadata, document_id in files_batch:
with open(file_metadata["file_path"], "rb") as file:
try:
document_id = await add_document_node(
graph_client,
parent_node_id = f"DefaultGraphModel__{USER_ID}",
document_metadata = file_metadata,
)
update_task_status(document_id, "DOCUMENT_PROCESSING_STARTED")
file_type = guess_file_type(file)
text = extract_text_from_file(file, file_type)
if text is None:
text = "empty file"
if text == "":
text = "empty file"
subchunks = chunk_engine.chunk_data(chunk_strategy, text, config.chunk_size, config.chunk_overlap)
if dataset_name not in data_chunks:
@ -106,68 +101,104 @@ async def cognify(datasets: Union[str, List[str]] = None):
document_id = document_id,
chunk_id = str(uuid4()),
text = subchunk,
file_metadata = file_metadata,
))
except FileTypeException:
logger.warning("File (%s) has an unknown file type. We are skipping it.", file_metadata["id"])
added_chunks: list[tuple[str, str, dict]] = await add_data_chunks(data_chunks)
added_chunks = await add_data_chunks(data_chunks)
await add_data_chunks_basic_rag(data_chunks)
chunks_by_document = {}
await asyncio.gather(
*[process_text(
chunk["collection"],
chunk["chunk_id"],
chunk["text"],
chunk["file_metadata"],
chunk["document_id"]
) for chunk in added_chunks]
)
for chunk in added_chunks:
if chunk["document_id"] not in chunks_by_document:
chunks_by_document[chunk["document_id"]] = []
chunks_by_document[chunk["document_id"]].append(chunk)
batch_size = 20
file_count = 0
files_batch = []
for document_id, chunks in chunks_by_document.items():
try:
await asyncio.gather(
*[process_text(
chunk["document_id"],
chunk["chunk_id"],
chunk["collection"],
chunk["text"],
) for chunk in chunks]
update_task_status(dataset_name, "DATASET_PROCESSING_STARTED")
for (dataset_name, files) in dataset_files:
for file_metadata in files:
graph_topology = infrastructure_config.get_config()["graph_model"]
if graph_topology == SourceCodeGraph:
parent_node_id = f"{file_metadata['name']}.{file_metadata['extension']}"
else:
parent_node_id = f"DefaultGraphModel__{USER_ID}"
document_id = await add_document_node(
graph_client,
parent_node_id=parent_node_id,
document_metadata=file_metadata,
)
update_task_status(document_id, "DOCUMENT_PROCESSING_FINISHED")
except Exception as e:
logger.exception(e)
update_task_status(document_id, "DOCUMENT_PROCESSING_FAILED")
files_batch.append((dataset_name, file_metadata, document_id))
file_count += 1
if file_count >= batch_size:
await process_batch(files_batch)
files_batch = []
file_count = 0
# Process any remaining files in the last batch
if len(files_batch) > 0:
await process_batch(files_batch)
update_task_status(dataset_name, "DATASET_PROCESSING_FINISHED")
return graph_client.graph
async def process_text(document_id: str, chunk_id: str, chunk_collection: str, input_text: str):
raw_document_id = document_id.split("__")[-1]
print(f"Processing chunk ({chunk_id}) from document ({raw_document_id}).")
async def process_text(chunk_collection: str, chunk_id: str, input_text: str, file_metadata: dict, document_id: str):
print(f"Processing chunk ({chunk_id}) from document ({file_metadata['id']}).")
graph_client = await get_graph_client(infrastructure_config.get_config()["graph_engine"])
classified_categories = await get_content_categories(input_text)
graph_topology = infrastructure_config.get_config()["graph_model"]
if graph_topology == SourceCodeGraph:
classified_categories = [{"data_type": "text", "category_name": "Code and functions"}]
elif graph_topology == KnowledgeGraph:
classified_categories = await get_content_categories(input_text)
else:
classified_categories = [{"data_type": "text", "category_name": "Unclassified text"}]
# await add_label_nodes(graph_client, document_id, chunk_id, file_metadata["keywords"].split("|"))
await add_classification_nodes(
graph_client,
parent_node_id = document_id,
categories = classified_categories,
)
print(f"Chunk ({chunk_id}) classified.")
content_summary = await get_content_summary(input_text)
await add_summary_nodes(graph_client, document_id, content_summary)
print(f"Chunk ({chunk_id}) summarized.")
cognitive_layers = await get_cognitive_layers(input_text, classified_categories)
cognitive_layers = (await add_cognitive_layers(graph_client, document_id, cognitive_layers))[:2]
cognitive_layers = cognitive_layers[:config.cognitive_layers_limit]
try:
cognitive_layers = (await add_cognitive_layers(graph_client, document_id, cognitive_layers))[:2]
print("cognitive_layers", cognitive_layers)
layer_graphs = await get_layer_graphs(input_text, cognitive_layers)
await add_cognitive_layer_graphs(graph_client, chunk_collection, chunk_id, layer_graphs)
except:
pass
layer_graphs = await get_layer_graphs(input_text, cognitive_layers)
await add_cognitive_layer_graphs(graph_client, chunk_collection, chunk_id, layer_graphs)
if infrastructure_config.get_config()["connect_documents"] is True:
db_engine = infrastructure_config.get_config()["database_engine"]
relevant_documents_to_connect = db_engine.fetch_cognify_data(excluded_document_id = raw_document_id)
relevant_documents_to_connect = db_engine.fetch_cognify_data(excluded_document_id = document_id)
list_of_nodes = []
@ -196,19 +227,53 @@ async def process_text(document_id: str, chunk_id: str, chunk_collection: str, i
print(f"Chunk ({chunk_id}) cognified.")
if __name__ == "__main__":
async def test():
# await prune.prune_system()
# #
# from cognee.api.v1.add import add
# data_directory_path = os.path.abspath("../../../.data")
# # print(data_directory_path)
# # config.data_root_directory(data_directory_path)
# # cognee_directory_path = os.path.abspath("../.cognee_system")
# # config.system_root_directory(cognee_directory_path)
#
# await add("data://" +data_directory_path, "example")
text = """import subprocess
def show_all_processes():
process = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE)
output, error = process.communicate()
if error:
print(f"Error: {error}")
else:
print(output.decode())
show_all_processes()"""
from cognee.api.v1.add import add
await add(["A large language model (LLM) is a language model notable for its ability to achieve general-purpose language generation and other natural language processing tasks such as classification"], "test")
await add([text], "example_dataset")
graph = await cognify()
infrastructure_config.set_config( {"chunk_engine": LangchainChunkEngine() , "chunk_strategy": ChunkStrategy.CODE,'embedding_engine': LiteLLMEmbeddingEngine() })
from cognee.shared.SourceCodeGraph import SourceCodeGraph
from cognee.api.v1.config import config
from cognee.utils import render_graph
# config.set_graph_model(SourceCodeGraph)
# config.set_classification_model(CodeContentPrediction)
# graph = await cognify()
vector_client = infrastructure_config.get_config("vector_engine")
await render_graph(graph, include_color=True, include_nodes=True, include_size=True)
out = await vector_client.search(collection_name ="basic_rag", query_text="show_all_processes", limit=10)
print("results", out)
#
# from cognee.utils import render_graph
#
# await render_graph(graph, include_color=True, include_nodes=False, include_size=False)
import asyncio
asyncio.run(test())

View file

@ -72,3 +72,9 @@ class config():
infrastructure_config.set_config({
"chunk_strategy": chunk_strategy
})
@staticmethod
def set_graph_topology(graph_topology: object):
infrastructure_config.set_config({
"graph_topology": graph_topology
})

View file

@ -3,6 +3,8 @@ import asyncio
from enum import Enum
from typing import Dict, Any, Callable, List
from pydantic import BaseModel, field_validator
from cognee.modules.search.graph import search_cypher
from cognee.modules.search.graph.search_adjacent import search_adjacent
from cognee.modules.search.vector.search_similarity import search_similarity
from cognee.modules.search.graph.search_categories import search_categories
@ -18,6 +20,10 @@ class SearchType(Enum):
CATEGORIES = 'CATEGORIES'
NEIGHBOR = 'NEIGHBOR'
SUMMARY = 'SUMMARY'
SUMMARY_CLASSIFICATION = 'SUMMARY_CLASSIFICATION'
NODE_CLASSIFICATION = 'NODE_CLASSIFICATION'
DOCUMENT_CLASSIFICATION = 'DOCUMENT_CLASSIFICATION',
CYPHER = 'CYPHER'
@staticmethod
def from_str(name: str):
@ -51,7 +57,9 @@ async def specific_search(query_params: List[SearchParameters]) -> List:
SearchType.SIMILARITY: search_similarity,
SearchType.CATEGORIES: search_categories,
SearchType.NEIGHBOR: search_neighbour,
SearchType.SUMMARY: search_summary
SearchType.SUMMARY: search_summary,
SearchType.CYPHER: search_cypher
}
results = []

View file

View file

@ -0,0 +1,91 @@
from typing import List, Dict, Any, Union, Optional
from cognee.infrastructure import infrastructure_config
from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client
from cognee.modules.topology.topology import TopologyEngine, GitHubRepositoryModel
import pandas as pd
from pydantic import BaseModel
USER_ID = "default_user"
async def add_topology(directory="example", model=GitHubRepositoryModel):
    """Infer a topology graph from a directory structure and load it into
    the configured graph database.

    Flattens the inferred Pydantic model tree into rows, adds one node per
    row, and adds an edge wherever a row carries relationship data.
    Returns the underlying graph object.
    """
    graph_db_type = infrastructure_config.get_config()["graph_engine"]
    graph_client = await get_graph_client(graph_db_type)
    # NOTE(review): fetched but never used below — presumably reserved for
    # topology-dependent behavior; confirm before removing.
    graph_topology = infrastructure_config.get_config()["graph_topology"]

    engine = TopologyEngine()
    topology = await engine.infer_from_directory_structure(
        node_id = USER_ID,
        repository = directory,
        model = model,
    )

    def flatten_model(model: BaseModel, parent_id: Optional[str] = None) -> Dict[str, Any]:
        """Flatten a single Pydantic model to a dict, lifting relationship fields."""
        row = {**model.dict(), "parent_id": parent_id}
        relationship = getattr(model, "default_relationship", None)
        if relationship:
            row["relationship_type"] = relationship.type
            row["relationship_source"] = relationship.source
            row["relationship_target"] = relationship.target
        return row

    def recursive_flatten(items: Union[List[Any], BaseModel], parent_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Recursively flatten nested models or lists of models into rows."""
        if isinstance(items, list):
            rows = []
            for item in items:
                rows.extend(recursive_flatten(item, parent_id))
            return rows
        if isinstance(items, BaseModel):
            rows = [flatten_model(items, parent_id)]
            own_id = items.dict().get("node_id", None)
            for _field_name, value in items:
                if isinstance(value, (BaseModel, list)):
                    rows.extend(recursive_flatten(value, own_id))
            return rows
        return []

    def flatten_repository(repo_model):
        """Flatten the entire repository model, starting at the top level."""
        return recursive_flatten(repo_model)

    flt_topology = flatten_repository(topology)
    df = pd.DataFrame(flt_topology)
    print(df.head(10))

    for _, row in df.iterrows():
        node_data = row.to_dict()
        # Pop 'node_id' so it is not duplicated inside the properties dict.
        node_id = node_data.pop("node_id")
        await graph_client.add_node(node_id, node_data)
        if pd.notna(row["relationship_source"]) and pd.notna(row["relationship_target"]):
            await graph_client.add_edge(
                row["relationship_source"],
                row["relationship_target"],
                relationship_name = row["relationship_type"],
            )

    return graph_client.graph
if __name__ == "__main__":
async def test():
# await prune.prune_system()
# #
# from cognee.api.v1.add import add
# data_directory_path = os.path.abspath("../../../.data")
# # print(data_directory_path)
# # config.data_root_directory(data_directory_path)
# # cognee_directory_path = os.path.abspath("../.cognee_system")
# # config.system_root_directory(cognee_directory_path)
#
# await add("data://" +data_directory_path, "example")
# graph = await add_topology()
graph_db_type = infrastructure_config.get_config()["graph_engine"]
graph_client = await get_graph_client(graph_db_type)
#
from cognee.utils import render_graph
await render_graph(graph_client.graph, include_color=True, include_nodes=False, include_size=False)
import asyncio
asyncio.run(test())

View file

@ -8,7 +8,7 @@ from dataclasses import dataclass, field
from pathlib import Path
from dotenv import load_dotenv
from cognee.root_dir import get_absolute_path
from cognee.shared.data_models import ChunkStrategy
from cognee.shared.data_models import ChunkStrategy, DefaultGraphModel
base_dir = Path(__file__).resolve().parent.parent
# Load the .env file from the base directory
@ -51,18 +51,20 @@ class Config:
# Model parameters
llm_provider: str = os.getenv("LLM_PROVIDER","openai") #openai, or custom or ollama
custom_model: str = os.getenv("CUSTOM_LLM_MODEL", "mistralai/Mixtral-8x7B-Instruct-v0.1") #"mistralai/Mixtral-8x7B-Instruct-v0.1"
custom_model: str = os.getenv("CUSTOM_LLM_MODEL", "llama3-70b-8192") #"mistralai/Mixtral-8x7B-Instruct-v0.1"
custom_endpoint: str = os.getenv("CUSTOM_ENDPOINT", "https://api.endpoints.anyscale.com/v1") #"https://api.endpoints.anyscale.com/v1" # pass claude endpoint
custom_key: Optional[str] = os.getenv("CUSTOM_LLM_API_KEY")
ollama_endpoint: str = os.getenv("CUSTOM_OLLAMA_ENDPOINT", "http://localhost:11434/v1") #"http://localhost:11434/v1"
ollama_key: Optional[str] = "ollama"
ollama_model: str = os.getenv("CUSTOM_OLLAMA_MODEL", "mistral:instruct") #"mistral:instruct"
openai_model: str = os.getenv("OPENAI_MODEL", "gpt-4-1106-preview" ) #"gpt-4-1106-preview"
openai_model: str = os.getenv("OPENAI_MODEL", "gpt-4o" ) #"gpt-4o"
model_endpoint: str = "openai"
openai_key: Optional[str] = os.getenv("OPENAI_API_KEY")
openai_temperature: float = float(os.getenv("OPENAI_TEMPERATURE", 0.0))
openai_embedding_model = "text-embedding-3-large"
openai_embedding_dimensions = 3072
litellm_embedding_model = "text-embedding-3-large"
litellm_embedding_dimensions = 3072
graphistry_username = os.getenv("GRAPHISTRY_USERNAME")
graphistry_password = os.getenv("GRAPHISTRY_PASSWORD")
@ -74,6 +76,13 @@ class Config:
# Database parameters
graph_database_provider: str = os.getenv("GRAPH_DB_PROVIDER", "NETWORKX")
graph_topology:str = DefaultGraphModel
cognitive_layers_limit: int = 2
from cognee.shared.data_models import MonitoringTool
# Monitoring tool
monitoring_tool: str = os.getenv("MONITORING_TOOL", MonitoringTool.LANGFUSE)
if (
os.getenv("ENV") == "prod"

View file

@ -33,6 +33,8 @@ class InfrastructureConfig():
database_file_path: str = None
chunk_strategy = config.chunk_strategy
chunk_engine = None
graph_topology = config.graph_topology
monitoring_tool = config.monitoring_tool
def get_config(self, config_entity: str = None) -> dict:
if (config_entity is None or config_entity == "database_engine") and self.database_engine is None:
@ -78,12 +80,17 @@ class InfrastructureConfig():
if self.chunk_engine is None:
self.chunk_engine = DefaultChunkEngine()
if self.graph_topology is None:
self.graph_topology = config.graph_topology
if (config_entity is None or config_entity == "llm_engine") and self.llm_engine is None:
self.llm_engine = OpenAIAdapter(config.openai_key, config.openai_model)
if (config_entity is None or config_entity == "database_directory_path") and self.database_directory_path is None:
self.database_directory_path = self.system_root_directory + "/" + config.db_path
if self.database_directory_path is None:
self.database_directory_path = self.system_root_directory + "/" + config.db_path
if (config_entity is None or config_entity == "database_file_path") and self.database_file_path is None:
self.database_file_path = self.system_root_directory + "/" + config.db_path + "/" + config.db_name
@ -151,6 +158,7 @@ class InfrastructureConfig():
"database_path": self.database_file_path,
"chunk_strategy": self.chunk_strategy,
"chunk_engine": self.chunk_engine,
"graph_topology": self.graph_topology
}
def set_config(self, new_config: dict):
@ -205,4 +213,7 @@ class InfrastructureConfig():
if "chunk_engine" in new_config:
self.chunk_engine = new_config["chunk_engine"]
if "graph_topology" in new_config:
self.graph_topology = new_config["graph_topology"]
infrastructure_config = InfrastructureConfig()

View file

@ -52,6 +52,7 @@ class DefaultChunkEngine():
elif chunk_strategy == ChunkStrategy.EXACT:
chunked_data = DefaultChunkEngine.chunk_data_exact(source_data, chunk_size, chunk_overlap)
return chunked_data

View file

@ -0,0 +1,52 @@
from __future__ import annotations
import re
from cognee.infrastructure.data.chunking.DefaultChunkEngine import DefaultChunkEngine
from cognee.shared.data_models import ChunkStrategy
class LangchainChunkEngine():
    @staticmethod
    def chunk_data(
        chunk_strategy = None,
        source_data = None,
        chunk_size = None,
        chunk_overlap = None,
    ):
        """
        Chunk data based on the specified strategy.

        Parameters:
        - chunk_strategy: The strategy to use for chunking.
        - source_data: The data to be chunked.
        - chunk_size: The size of each chunk.
        - chunk_overlap: The overlap between chunks.

        Returns:
        - The chunked data.
        """
        # CODE strategy uses the Langchain splitter; every other strategy
        # falls back to the default paragraph chunker.
        if chunk_strategy == ChunkStrategy.CODE:
            return LangchainChunkEngine.chunk_data_by_code(
                source_data, chunk_size, chunk_overlap
            )
        return DefaultChunkEngine.chunk_data_by_paragraph(
            source_data, chunk_size, chunk_overlap
        )

    @staticmethod
    def chunk_data_by_code(data_chunks, chunk_size, chunk_overlap, language=None):
        """Split source code into chunks using Langchain's language-aware splitter.

        Defaults to Python splitting rules when no language is given.
        Returns a list of chunk strings (page_content only).
        """
        # Imported lazily so the module loads without langchain installed.
        from langchain_text_splitters import (
            Language,
            RecursiveCharacterTextSplitter,
        )

        if language is None:
            language = Language.PYTHON
        splitter = RecursiveCharacterTextSplitter.from_language(
            language = language, chunk_size = chunk_size, chunk_overlap = chunk_overlap
        )
        documents = splitter.create_documents([data_chunks])
        return [document.page_content for document in documents]

View file

@ -0,0 +1,191 @@
""" FalcorDB Adapter for Graph Database"""
import json
import logging
from typing import Optional, Any, List, Dict
from contextlib import asynccontextmanager
from falkordb.asyncio import FalkorDB
from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface
logger = logging.getLogger("FalcorDBAdapter")
class FalcorDBAdapter(GraphDBInterface):
    """Graph database adapter backed by FalkorDB (async client).

    Node labels and ids have ':' replaced with '_' because ':' is the
    Cypher label separator and cannot appear inside a label name.
    """

    def __init__(
        self,
        graph_database_url: str,
        graph_database_username: str,
        graph_database_password: str,
        graph_database_port: int,
        driver: Optional[Any] = None,
        graph_name: str = "DefaultGraph",
    ):
        # NOTE(review): username/password/driver parameters are accepted but
        # unused — kept for interface compatibility; confirm whether FalkorDB
        # auth should be wired in here.
        self.driver = FalkorDB(
            host = graph_database_url,
            port = graph_database_port)
        self.graph_name = graph_name

    async def query(
        self,
        query: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Run a Cypher query against the selected graph and return its result set.

        Raises: re-raises any driver error after logging it.
        """
        try:
            selected_graph = self.driver.select_graph(self.graph_name)
            result = await selected_graph.query(query)
            return result.result_set
        except Exception as error:
            logger.error("Falkor query error: %s", error, exc_info = True)
            raise error

    async def graph(self):
        """Return the underlying FalkorDB driver."""
        return self.driver

    async def add_node(self, node_id: str, node_properties: Dict[str, Any] = None):
        """MERGE a node labeled with (sanitized) node_id, setting properties on create."""
        node_id = node_id.replace(":", "_")

        serialized_properties = self.serialize_properties(node_properties)

        if "name" not in serialized_properties:
            serialized_properties["name"] = node_id

        # serialized_properties["created_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # serialized_properties["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        query = f"""MERGE (node:`{node_id}` {{id: $node_id}})
        ON CREATE SET node += $properties
        RETURN ID(node) AS internal_id, node.id AS nodeId"""

        params = {
            "node_id": node_id,
            "properties": serialized_properties,
        }

        return await self.query(query, params)

    async def add_nodes(self, nodes: list[tuple[str, dict[str, Any]]]) -> None:
        """Add each (node_id, properties) pair, one at a time."""
        for node in nodes:
            node_id, node_properties = node
            node_id = node_id.replace(":", "_")

            await self.add_node(
                node_id = node_id,
                node_properties = node_properties,
            )

    async def extract_node_description(self, node_id: str):
        """Return id/layer_id/description dicts for neighbors of node_id,
        excluding nodes whose id contains 'DefaultGraphModel'."""
        query = """MATCH (n)-[r]->(m)
        WHERE n.id = $node_id
        AND NOT m.id CONTAINS 'DefaultGraphModel'
        RETURN m
        """

        result = await self.query(query, dict(node_id = node_id))

        descriptions = []
        for node in result:
            attributes = node.get("m", {})
            # Only keep neighbors carrying the full description triple.
            if all(key in attributes for key in ["id", "layer_id", "description"]):
                descriptions.append({
                    "id": attributes["id"],
                    "layer_id": attributes["layer_id"],
                    "description": attributes["description"],
                })

        return descriptions

    async def get_layer_nodes(self):
        """Return all nodes that have a layer_id property."""
        query = """MATCH (node) WHERE node.layer_id IS NOT NULL
        RETURN node"""

        return [result["node"] for result in (await self.query(query))]

    async def extract_node(self, node_id: str):
        """Return the node with the given id, or None if absent."""
        query = """
        MATCH(node {id: $node_id})
        RETURN node
        """

        results = [node["node"] for node in (await self.query(query, dict(node_id = node_id)))]
        return results[0] if len(results) > 0 else None

    async def delete_node(self, node_id: str):
        """Detach-delete the node with the given (sanitized) id."""
        # Bug fix: originally called `id.replace(...)` on the builtin `id`,
        # which raised AttributeError; sanitize the parameter instead.
        node_id = node_id.replace(":", "_")
        # Bug fix: the original query deleted undefined variable `n`;
        # the matched variable is `node`.
        query = f"MATCH (node:`{node_id}` {{id: $node_id}}) DETACH DELETE node"
        params = { "node_id": node_id }

        return await self.query(query, params)

    async def add_edge(self, from_node: str, to_node: str, relationship_name: str, edge_properties: Optional[Dict[str, Any]] = {}):
        """MERGE a relationship between two labeled nodes, setting properties."""
        serialized_properties = self.serialize_properties(edge_properties)
        from_node = from_node.replace(":", "_")
        to_node = to_node.replace(":", "_")

        query = f"""MATCH (from_node:`{from_node}` {{id: $from_node}}), (to_node:`{to_node}` {{id: $to_node}})
        MERGE (from_node)-[r:`{relationship_name}`]->(to_node)
        SET r += $properties
        RETURN r"""

        params = {
            "from_node": from_node,
            "to_node": to_node,
            "properties": serialized_properties
        }

        return await self.query(query, params)

    async def add_edges(self, edges: list[tuple[str, str, str, dict[str, Any]]]) -> None:
        """Add each (from, to, relationship, properties) edge, one at a time."""
        for edge in edges:
            from_node, to_node, relationship_name, edge_properties = edge
            from_node = from_node.replace(":", "_")
            to_node = to_node.replace(":", "_")

            await self.add_edge(
                from_node = from_node,
                to_node = to_node,
                relationship_name = relationship_name,
                edge_properties = edge_properties
            )

    async def filter_nodes(self, search_criteria):
        """Return nodes whose id contains the search string.

        NOTE(review): search_criteria is interpolated directly into the
        query string — safe only for trusted input; parameterize if this
        ever receives user-supplied text.
        """
        query = f"""MATCH (node)
        WHERE node.id CONTAINS '{search_criteria}'
        RETURN node"""

        return await self.query(query)

    async def delete_graph(self):
        """Detach-delete every node in the graph."""
        query = """MATCH (node)
        DETACH DELETE node;"""

        return await self.query(query)

    def serialize_properties(self, properties = dict()):
        """JSON-encode dict/list property values so they can be stored as strings."""
        return {
            property_key: json.dumps(property_value)
            if isinstance(property_value, (dict, list))
            else property_value for property_key, property_value in properties.items()
        }

View file

@ -41,6 +41,9 @@ class NetworkXAdapter(GraphDBInterface):
) -> None:
self.graph.add_nodes_from(nodes)
await self.save_graph_to_file(self.filename)
async def get_graph(self):
return self.graph
async def add_edge(
self,

View file

@ -1,3 +1,4 @@
import asyncio
from typing import List
import instructor
@ -6,8 +7,11 @@ from fastembed import TextEmbedding
from cognee.config import Config
from cognee.root_dir import get_absolute_path
from .EmbeddingEngine import EmbeddingEngine
from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine
from litellm import aembedding
import litellm
litellm.set_verbose = True
config = Config()
config.load()
@ -22,19 +26,39 @@ class DefaultEmbeddingEngine(EmbeddingEngine):
return config.embedding_dimensions
class OpenAIEmbeddingEngine(EmbeddingEngine):
async def embed_text(self, text: List[str]) -> List[float]:
class LiteLLMEmbeddingEngine(EmbeddingEngine):
import asyncio
from typing import List
OPENAI_API_KEY = config.openai_key
async def embed_text(self, text: List[str]) -> List[List[float]]:
async def get_embedding(text_):
response = await aembedding(config.litellm_embedding_model, input=text_)
return response.data[0]['embedding']
aclient = instructor.patch(AsyncOpenAI())
text = text.replace("\n", " ")
response = await aclient.embeddings.create(input = text, model = config.openai_embedding_model)
embedding = response.data[0].embedding
# embeddings_list = list(map(lambda embedding: embedding.tolist(), embedding_model.embed(text)))
return embedding
tasks = [get_embedding(text_) for text_ in text]
result = await asyncio.gather(*tasks)
return result
# embedding = response.data[0].embedding
# # embeddings_list = list(map(lambda embedding: embedding.tolist(), embedding_model.embed(text)))
# print("response", type(response.data[0]['embedding']))
# print("response", response.data[0])
# return [response.data[0]['embedding']]
def get_vector_size(self) -> int:
return config.openai_embedding_dimensions
return config.litellm_embedding_dimensions
if __name__ == "__main__":
async def gg():
openai_embedding_engine = LiteLLMEmbeddingEngine()
# print(openai_embedding_engine.embed_text(["Hello, how are you?"]))
# print(openai_embedding_engine.get_vector_size())
# default_embedding_engine = DefaultEmbeddingEngine()
sds = await openai_embedding_engine.embed_text(["Hello, sadasdas are you?"])
print(sds)
# print(default_embedding_engine.get_vector_size())
asyncio.run(gg())

View file

@ -37,10 +37,11 @@ class LanceDBAdapter(VectorDBInterface):
async def create_collection(self, collection_name: str, payload_schema: BaseModel):
data_point_types = get_type_hints(DataPoint)
vector_size = self.embedding_engine.get_vector_size()
class LanceDataPoint(LanceModel):
id: data_point_types["id"] = Field(...)
vector: Vector(self.embedding_engine.get_vector_size())
vector: Vector(vector_size)
payload: payload_schema
if not await self.collection_exists(collection_name):
@ -68,10 +69,11 @@ class LanceDBAdapter(VectorDBInterface):
IdType = TypeVar("IdType")
PayloadSchema = TypeVar("PayloadSchema")
vector_size = self.embedding_engine.get_vector_size()
class LanceDataPoint(LanceModel, Generic[IdType, PayloadSchema]):
id: IdType
vector: Vector(self.embedding_engine.get_vector_size())
vector: Vector(vector_size)
payload: PayloadSchema
lance_data_points = [

View file

@ -16,7 +16,14 @@ def get_file_metadata(file: BinaryIO) -> FileMetadata:
file.seek(0)
file_text = extract_text_from_file(file, file_type)
keywords = extract_keywords(file_text)
import uuid
try:
keywords = extract_keywords(file_text)
except:
keywords = ["no keywords detected" + str(uuid.uuid4())]
file_path = file.name
file_name = file_path.split("/")[-1].split(".")[0] if file_path else None

View file

@ -1,25 +1,51 @@
import asyncio
import os
from typing import List, Type
from pydantic import BaseModel
import instructor
from tenacity import retry, stop_after_attempt
from openai import AsyncOpenAI
import openai
from cognee.config import Config
from cognee.infrastructure import infrastructure_config
from cognee.infrastructure.llm.llm_interface import LLMInterface
from cognee.infrastructure.llm.prompts import read_query_prompt
from cognee.shared.data_models import MonitoringTool
config = Config()
config.load()
if config.monitoring_tool == MonitoringTool.LANGFUSE:
from langfuse.openai import AsyncOpenAI, OpenAI
elif config.monitoring_tool == MonitoringTool.LANGSMITH:
from langsmith import wrap_openai
from openai import AsyncOpenAI
AsyncOpenAI = wrap_openai(AsyncOpenAI())
else:
from openai import AsyncOpenAI, OpenAI
class GenericAPIAdapter(LLMInterface):
"""Adapter for Ollama's API"""
"""Adapter for Generic API LLM provider API """
def __init__(self, api_endpoint, api_key: str, model: str):
self.aclient = instructor.patch(
AsyncOpenAI(
base_url = api_endpoint,
api_key = api_key, # required, but unused
),
mode = instructor.Mode.JSON,
)
if infrastructure_config.get_config()["llm_provider"] == 'groq':
from groq import groq
self.aclient = instructor.from_openai(client = groq.Groq(
api_key=api_key,
), mode=instructor.Mode.MD_JSON)
else:
self.aclient = instructor.patch(
AsyncOpenAI(
base_url = api_endpoint,
api_key = api_key, # required, but unused
),
mode = instructor.Mode.JSON,
)
self.model = model
@retry(stop = stop_after_attempt(5))
@ -75,20 +101,21 @@ class GenericAPIAdapter(LLMInterface):
return embeddings
@retry(stop=stop_after_attempt(5))
async def acreate_structured_output(self, text_input: str, system_prompt: str,
response_model: Type[BaseModel]) -> BaseModel:
@retry(stop = stop_after_attempt(5))
async def acreate_structured_output(self, text_input: str, system_prompt: str, response_model: Type[BaseModel]) -> BaseModel:
"""Generate a response from a user query."""
return await self.aclient.chat.completions.create(
model=self.model,
messages=[
model = self.model,
messages = [
{
"role": "user",
"content": f"""Use the given format to
extract information from the following input: {text_input}. {system_prompt} """,
}
extract information from the following input: {text_input}. """,
},
{"role": "system", "content": system_prompt},
],
response_model=response_model,
response_model = response_model,
)
def show_prompt(self, text_input: str, system_prompt: str) -> str:

View file

@ -2,11 +2,25 @@ import asyncio
from typing import List, Type
import openai
import instructor
from openai import AsyncOpenAI, OpenAI
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt
from cognee.config import Config
from cognee.infrastructure.llm.llm_interface import LLMInterface
from cognee.infrastructure.llm.prompts import read_query_prompt
from cognee.shared.data_models import MonitoringTool
config = Config()
config.load()
if config.monitoring_tool == MonitoringTool.LANGFUSE:
from langfuse.openai import AsyncOpenAI, OpenAI
elif config.monitoring_tool == MonitoringTool.LANGSMITH:
from langsmith import wrap_openai
from openai import AsyncOpenAI
AsyncOpenAI = wrap_openai(AsyncOpenAI())
else:
from openai import AsyncOpenAI, OpenAI
class OpenAIAdapter(LLMInterface):
"""Adapter for OpenAI's GPT-3, GPT=4 API"""

View file

@ -0,0 +1,2 @@
Choose the category that is the most relevant to the query`{{ query }}`
Here are the categories:`{{ categories }}`

View file

@ -0,0 +1,2 @@
Choose the summary that is the most relevant to the query`{{ query }}`
Here are the summaries:`{{ summaries }}`

View file

@ -0,0 +1,4 @@
You are a topology master and need to extract the following topology information from the text provided to you.
The relationship parts can't be empty, and they have to be logical, connecting elements of the topology.
The source is the parent of the target. And the target is the child of the source.
Have in mind this model needs to become a graph later.

View file

@ -5,20 +5,20 @@ async def add_classification_nodes(graph_client, parent_node_id: str, categories
data_type = category["data_type"].upper().replace(" ", "_")
category_name = category["category_name"].upper().replace(" ", "_").replace("'", "").replace("/", "_")
data_type_node_id = f"DATA_TYPE__{data_type}"
data_type_node_id = data_type
data_type_node = await graph_client.extract_node(data_type_node_id)
if not data_type_node:
data_type_node = await graph_client.add_node(data_type_node_id, dict(name = data_type, entity_type = "DataType"))
data_type_node = await graph_client.add_node(data_type_node_id, dict(name = data_type, type = "DataType"))
await graph_client.add_edge(data_type_node_id, parent_node_id, "classified_as", dict(relationship_name = "classified_as"))
category_node_id = f"DATA_CATEGORY__{category_name}"
category_node_id = category_name
category_node = await graph_client.extract_node(category_node_id)
if not category_node:
category_node = await graph_client.add_node(category_node_id, dict(name = category_name, entity_type = "DataCategory"))
category_node = await graph_client.add_node(category_node_id, dict(name = category_name, type = "DataCategory"))
await graph_client.add_edge(category_node_id, parent_node_id, "classified_as", dict(relationship_name = "classified_as"))

View file

@ -4,7 +4,6 @@ from typing import List, Tuple, TypedDict
from pydantic import BaseModel
from cognee.infrastructure import infrastructure_config
from cognee.infrastructure.databases.vector import DataPoint
from cognee.shared.data_models import KnowledgeGraph
from cognee.utils import extract_pos_tags, extract_named_entities, extract_sentiment_vader
class GraphLike(TypedDict):
@ -19,47 +18,54 @@ async def add_cognitive_layer_graphs(
layer_graphs: List[Tuple[str, GraphLike]],
):
vector_client = infrastructure_config.get_config("vector_engine")
graph_model = infrastructure_config.get_config("graph_model")
for (layer_id, layer_graph) in layer_graphs:
graph_nodes = []
graph_edges = []
if not isinstance(layer_graph, KnowledgeGraph):
layer_graph = KnowledgeGraph.parse_obj(layer_graph)
if not isinstance(layer_graph, graph_model):
layer_graph = graph_model.parse_obj(layer_graph)
for node in layer_graph.nodes:
node_id = generate_proposition_node_id(node.id)
node_id = generate_node_id(node.id)
entity_type_node_id = generate_type_node_id(node.entity_type)
entity_type_node = await graph_client.extract_node(entity_type_node_id)
type_node_id = generate_node_id(node.type)
type_node = await graph_client.extract_node(type_node_id)
if not entity_type_node:
node_name = node.entity_type.lower().capitalize()
if not type_node:
node_name = node.type.lower().capitalize()
entity_type_node = (
entity_type_node_id,
type_node = (
type_node_id,
dict(
id = entity_type_node_id,
id = type_node_id,
name = node_name,
entity_type = node_name,
type = node_name,
created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
)
)
graph_nodes.append(entity_type_node)
graph_nodes.append(type_node)
# Add relationship between document and entity type: "Document contains Person"
graph_edges.append((
layer_id,
entity_type_node_id,
type_node_id,
"contains",
dict(relationship_name = "contains"),
))
pos_tags = extract_pos_tags(node.entity_description)
named_entities = extract_named_entities(node.entity_description)
sentiment = extract_sentiment_vader(node.entity_description)
# pos_tags = extract_pos_tags(node.description)
# named_entities = extract_named_entities(node.description)
# sentiment = extract_sentiment_vader(node.description)
id, type, name, description, *node_properties = node
print("Node properties: ", node_properties)
node_properties = dict(node_properties)
graph_nodes.append((
node_id,
@ -68,21 +74,22 @@ async def add_cognitive_layer_graphs(
layer_id = layer_id,
chunk_id = chunk_id,
chunk_collection = chunk_collection,
name = node.entity_name,
entity_type = node.entity_type.lower().capitalize(),
description = node.entity_description,
pos_tags = pos_tags,
sentiment = sentiment,
named_entities = named_entities,
name = node.name,
type = node.type.lower().capitalize(),
description = node.description,
# pos_tags = pos_tags,
# sentiment = sentiment,
# named_entities = named_entities,
created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
**node_properties,
)
))
# Add relationship between entity type and entity itself: "Jake is Person"
graph_edges.append((
node_id,
entity_type_node_id,
type_node_id,
"is",
dict(relationship_name = "is"),
))
@ -97,8 +104,8 @@ async def add_cognitive_layer_graphs(
# Add relationship that came from graphs.
for edge in layer_graph.edges:
graph_edges.append((
generate_proposition_node_id(edge.source_node_id),
generate_proposition_node_id(edge.target_node_id),
generate_node_id(edge.source_node_id),
generate_node_id(edge.target_node_id),
edge.relationship_name,
dict(relationship_name = edge.relationship_name),
))
@ -138,8 +145,5 @@ async def add_cognitive_layer_graphs(
await vector_client.create_data_points(layer_id, data_points)
def generate_proposition_node_id(node_id: str) -> str:
return f"PROPOSITION_NODE__{node_id.upper().replace(' ', '_')}".replace("'", "")
def generate_type_node_id(node_id: str) -> str:
return f"PROPOSITION_TYPE_NODE__{node_id.upper().replace(' ', '_')}".replace("'", "")
def generate_node_id(node_id: str) -> str:
    """Normalize a raw id into a graph node id: uppercase, spaces -> underscores, apostrophes dropped."""
    # A single translate() pass is equivalent to the chained str.replace calls.
    translation = str.maketrans({" ": "_", "'": None})
    return node_id.upper().translate(translation)

View file

@ -29,5 +29,7 @@ def fix_layer_name(layer_name):
def generate_cognitive_layer_id(layer_id: str) -> str:
layer = f"COGNITIVE_LAYER__{layer_id.upper().replace(' ', '_')}".replace("'", "").replace("/", "_")
return fix_layer_name(layer)

View file

@ -1,3 +1,4 @@
from typing import TypedDict
from pydantic import BaseModel, Field
from cognee.infrastructure import infrastructure_config
@ -30,6 +31,7 @@ async def add_data_chunks(dataset_data_chunks: dict[str, list[TextChunk]]):
collection = dataset_name,
text = chunk["text"],
document_id = chunk["document_id"],
file_metadata = chunk["file_metadata"],
) for chunk in chunks
]
@ -47,3 +49,44 @@ async def add_data_chunks(dataset_data_chunks: dict[str, list[TextChunk]]):
)
return identified_chunks
async def add_data_chunks_basic_rag(dataset_data_chunks: dict[str, list[TextChunk]]):
vector_client = infrastructure_config.get_config("vector_engine")
identified_chunks = []
class PayloadSchema(BaseModel):
text: str = Field(...)
for (dataset_name, chunks) in dataset_data_chunks.items():
try:
await vector_client.create_collection("basic_rag", payload_schema = PayloadSchema)
except Exception as error:
print(error)
dataset_chunks = [
dict(
chunk_id = chunk["chunk_id"],
collection = "basic_rag",
text = chunk["text"],
document_id = chunk["document_id"],
file_metadata = chunk["file_metadata"],
) for chunk in chunks
]
identified_chunks.extend(dataset_chunks)
await vector_client.create_data_points(
"basic_rag",
[
DataPoint[PayloadSchema](
id = chunk["chunk_id"],
payload = PayloadSchema.parse_obj(dict(text = chunk["text"])),
embed_field = "text",
) for chunk in dataset_chunks
],
)
return identified_chunks

View file

@ -1,5 +1,5 @@
from cognee.shared.data_models import Document
from cognee.modules.cognify.graph.add_label_nodes import add_label_nodes
# from cognee.modules.cognify.graph.add_label_nodes import add_label_nodes
from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface
async def add_document_node(graph_client: GraphDBInterface, parent_node_id, document_metadata):
@ -14,9 +14,10 @@ async def add_document_node(graph_client: GraphDBInterface, parent_node_id, docu
file_path = document_metadata["file_path"],
).model_dump()
document["entity_type"] = "Document"
document["type"] = "Document"
await graph_client.add_node(document_id, document)
print(f"Added document node: {document_id}")
await graph_client.add_edge(
parent_node_id,
@ -25,6 +26,7 @@ async def add_document_node(graph_client: GraphDBInterface, parent_node_id, docu
dict(relationship_name = "has_document"),
)
await add_label_nodes(graph_client, document_id, document_metadata["keywords"].split("|"))
#
# await add_label_nodes(graph_client, document_id, document_metadata["keywords"].split("|"))
return document_id

View file

@ -61,8 +61,8 @@ async def add_label_nodes(graph_client, parent_node_id: str, keywords: List[str]
try:
await vector_client.create_collection(parent_node_id, payload_schema = PayloadSchema)
except Exception:
except Exception as e:
# It's ok if the collection already exists.
pass
print(e)
await vector_client.create_data_points(parent_node_id, keyword_data_points)
await vector_client.create_data_points(parent_node_id, keyword_data_points)

View file

@ -7,6 +7,7 @@ async def add_summary_nodes(graph_client, document_id, summary):
summary_node_id,
dict(
name = "Summary",
document_id = document_id,
summary = summary["summary"],
),
)
@ -20,7 +21,8 @@ async def add_summary_nodes(graph_client, document_id, summary):
description_node_id,
dict(
name = "Description",
summary = summary["description"],
document_id= document_id,
description = summary["description"],
),
)

View file

@ -40,11 +40,20 @@ async def add_node(client, parent_id: Optional[str], node_id: str, node_data: di
if node_id != "Relationship_default":
try:
# Attempt to add the node to the graph database
# print('NODE ID', node_id)
# print('NODE DATA', node_data)
result = await client.add_node(node_id, node_properties = node_data)
print("added node", result)
# Add an edge if a parent ID is provided and the graph engine is NETWORKX
if parent_id and "default_relationship" in node_data and infrastructure_config.get_config()["graph_engine"] == GraphDBType.NETWORKX:
await client.add_edge(parent_id, node_id, relationship_name = node_data["default_relationship"]["type"], edge_properties = node_data)
try:
await client.add_edge(parent_id, node_id, relationship_name = node_data["default_relationship"]["type"], edge_properties = node_data)
print("added edge")
except Exception as e:
print(f"Error adding edge: {e}")
pass
except Exception as e:
# Log the exception; consider a logging framework for production use
print(f"Error adding node or edge: {e}")
@ -103,6 +112,7 @@ async def add_node(client, parent_id: Optional[str], node_id: str, node_data: di
async def add_edge(client, parent_id: Optional[str], node_id: str, node_data: dict, created_node_ids):
print('NODE ID', node_data)
if node_id == "Relationship_default" and parent_id:
# Initialize source and target variables outside the loop
@ -137,39 +147,67 @@ async def add_edge(client, parent_id: Optional[str], node_id: str, node_data: di
await client.add_edge(relationship_details['source'], relationship_details['target'], relationship_name = relationship_details['type'])
async def process_attribute(graph_client, parent_id: Optional[str], attribute: str, value: Any, created_node_ids=None):
if created_node_ids is None:
created_node_ids = []
if isinstance(value, BaseModel):
node_id = await generate_node_id(value)
node_data = value.model_dump()
# Use the specified default relationship for the edge between the parent node and the current node
# Add node to the graph
created_node_id = await add_node(graph_client, parent_id, node_id, node_data)
created_node_ids.append(node_id)
created_node_ids.append(created_node_id)
# await add_edge(graph_client, parent_id, node_id, node_data, relationship_data,created_node_ids)
# Add an edge if a default_relationship exists
if hasattr(value, 'default_relationship') and value.default_relationship:
await add_edge(graph_client, parent_id, node_id, value.default_relationship.dict())
# Recursively process nested attributes to ensure all nodes and relationships are added to the graph
for sub_attr, sub_val in value.__dict__.items(): # Access attributes and their values directly
out = await process_attribute(graph_client, node_id, sub_attr, sub_val)
created_node_ids.extend(out)
for sub_attr, sub_val in value.dict().items():
if isinstance(sub_val, (BaseModel, list)): # Check if the value is a model or list
await process_attribute(graph_client, node_id, sub_attr, sub_val, created_node_ids)
elif isinstance(value, list) and all(isinstance(item, BaseModel) for item in value):
# For lists of BaseModel instances, process each item in the list
for item in value:
out = await process_attribute(graph_client, parent_id, attribute, item, created_node_ids)
created_node_ids.extend(out)
await process_attribute(graph_client, parent_id, attribute, item, created_node_ids)
return created_node_ids
# async def process_attribute(graph_client, parent_id: Optional[str], attribute: str, value: Any, created_node_ids=None):
#
# if created_node_ids is None:
# created_node_ids = []
# if isinstance(value, BaseModel):
# node_id = await generate_node_id(value)
# node_data = value.model_dump()
#
# # Use the specified default relationship for the edge between the parent node and the current node
#
# created_node_id = await add_node(graph_client, parent_id, node_id, node_data)
#
# created_node_ids.append(created_node_id)
#
# # await add_edge(graph_client, parent_id, node_id, node_data, relationship_data,created_node_ids)
#
# # Recursively process nested attributes to ensure all nodes and relationships are added to the graph
# # for sub_attr, sub_val in value.__dict__.items(): # Access attributes and their values directly
# # print('SUB ATTR', sub_attr)
# # print('SUB VAL', sub_val)
# #
# # out = await process_attribute(graph_client, node_id, sub_attr, sub_val)
# #
# # created_node_ids.extend(out)
#
# elif isinstance(value, list) and all(isinstance(item, BaseModel) for item in value):
# # For lists of BaseModel instances, process each item in the list
# for item in value:
# out = await process_attribute(graph_client, parent_id, attribute, item, created_node_ids)
# created_node_ids.extend(out)
#
# return created_node_ids
async def process_attribute_edge(graph_client, parent_id: Optional[str], attribute: str, value: Any, created_node_ids=None):
@ -210,20 +248,22 @@ async def create_dynamic(graph_model, graph_client) :
created_node_ids.append(out)
print('CREATED NODE IDS', created_node_ids)
for attribute_name, attribute_value in graph_model:
ids = await process_attribute(graph_client, root_id, attribute_name, attribute_value)
created_node_ids.extend(ids)
flattened_and_deduplicated = list({
item["nodeId"]: item
# Use the 'nodeId' as the unique key in the dictionary comprehension
for sublist in created_node_ids if sublist # Ensure sublist is not None
for item in sublist # Iterate over items in the sublist
}.values())
for attribute_name, attribute_value in graph_model:
ids = await process_attribute_edge(graph_client, root_id, attribute_name, attribute_value, flattened_and_deduplicated)
# created_node_ids.extend(ids)
#
# flattened_and_deduplicated = list({
# item["nodeId"]: item
# # Use the 'nodeId' as the unique key in the dictionary comprehension
# for sublist in created_node_ids if sublist # Ensure sublist is not None
# for item in sublist # Iterate over items in the sublist
# }.values())
#
# for attribute_name, attribute_value in graph_model:
# ids = await process_attribute_edge(graph_client, root_id, attribute_name, attribute_value, flattened_and_deduplicated)
#
return graph_client

View file

@ -1,43 +0,0 @@
from datetime import datetime
from cognee.shared.data_models import DefaultGraphModel, Relationship, UserProperties, UserLocation
from cognee.modules.cognify.graph.create import create_semantic_graph
async def initialize_graph(root_id: str, graphdatamodel, graph_client):
if graphdatamodel:
graph = graphdatamodel(id = root_id)
graph_ = await create_semantic_graph(graph, graph_client)
return graph_
else:
print("Creating default graph")
graph = DefaultGraphModel(
node_id = root_id,
user_properties = UserProperties(
custom_properties = { "age": "30" },
location = UserLocation(
location_id = "ny",
description = "New York",
default_relationship = Relationship(
type = "located_in",
source = "UserProperties",
target = "ny",
)
),
default_relationship = Relationship(
type = "has_properties",
source = root_id,
target = "UserProperties",
)
),
default_relationship = Relationship(
type = "has_properties",
source = root_id,
target = "UserProperties"
),
default_fields = {
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
)
return await create_semantic_graph(graph, graph_client)

View file

@ -21,4 +21,4 @@ async def extract_knowledge_graph(text: str, cognitive_layer, graph_model):
except Exception as error:
# TODO: Log error to Sentry
return await extract_content_graph(text, cognitive_layer, graph_model)
return await extract_content_graph(text, cognitive_layer, graph_model)

View file

@ -29,8 +29,8 @@ class GraphFromText(dspy.Signature):
def are_all_nodes_and_edges_valid(graph: KnowledgeGraph) -> bool:
return all([getattr(node, "entity_type", "").strip() != "" for node in graph.nodes]) and \
all([getattr(node, "entity_name", "").strip() != "" for node in graph.nodes]) and \
return all([getattr(node, "type", "").strip() != "" for node in graph.nodes]) and \
all([getattr(node, "name", "").strip() != "" for node in graph.nodes]) and \
all([getattr(edge, "relationship_name", "").strip() != "" for edge in graph.edges])
def is_node_connected(node: Node, edges: List[Edge]) -> bool:
@ -56,7 +56,7 @@ class ExtractKnowledgeGraph(dspy.Module):
graph = self.generate_graph(text = context).graph
not_valid_nodes_or_edges_message = """
All nodes must contain "entity_name".
All nodes must contain "name".
All edges must contain "relationship_name".
Please add mandatory fields to nodes and edges."""

View file

@ -22,7 +22,7 @@ async def search_adjacent(graph: Union[nx.Graph, any], query: str, infrastructur
if node_id is None:
return {}
from cognee.infrastructure import infrastructure_config
if infrastructure_config.get_config()["graph_engine"] == GraphDBType.NETWORKX:
if node_id not in graph:
return {}

View file

@ -1,11 +1,26 @@
from typing import Union, Dict
import re
from pydantic import BaseModel
from cognee.modules.search.llm.extraction.categorize_relevant_category import categorize_relevant_category
""" Search categories in the graph and return their summary attributes. """
from cognee.shared.data_models import GraphDBType
from cognee.shared.data_models import GraphDBType, DefaultContentPrediction
import networkx as nx
async def search_categories(graph: Union[nx.Graph, any], query_label: str, infrastructure_config: Dict):
def strip_exact_regex(s, substring):
    """Remove at most one leading and one trailing occurrence of *substring* from *s*."""
    # str.removeprefix/removesuffix strip a single occurrence at each end,
    # matching the anchored-regex substitution (^pat|pat$) this replaces.
    return s.removeprefix(substring).removesuffix(substring)
class DefaultResponseModel(BaseModel):
    """Structured-output schema for the LLM category selection: the id of the chosen document."""
    # document_id: graph node id of the category/document the LLM judged most relevant.
    document_id: str
async def search_categories(query:str, graph: Union[nx.Graph, any], query_label: str=None, infrastructure_config: Dict=None):
"""
Filter nodes in the graph that contain the specified label and return their summary attributes.
This function supports both NetworkX graphs and Neo4j graph databases.
@ -21,9 +36,22 @@ async def search_categories(graph: Union[nx.Graph, any], query_label: str, infra
each representing a node with 'nodeId' and 'summary'.
"""
# Determine which client is in use based on the configuration
from cognee.infrastructure import infrastructure_config
if infrastructure_config.get_config()["graph_engine"] == GraphDBType.NETWORKX:
# Logic for NetworkX
return {node: data.get('content_labels') for node, data in graph.nodes(data=True) if query_label in node and 'content_labels' in data}
categories_and_ids = [
{'document_id': strip_exact_regex(_, "DATA_SUMMARY__"), 'Summary': data['summary']}
for _, data in graph.nodes(data=True)
if 'summary' in data
]
connected_nodes = []
for id in categories_and_ids:
print("id", id)
connected_nodes.append(list(graph.neighbors(id['document_id'])))
check_relevant_category = await categorize_relevant_category(query, categories_and_ids, response_model=DefaultResponseModel )
connected_nodes = list(graph.neighbors(check_relevant_category['document_id']))
descriptions = {node: graph.nodes[node].get('description', 'No desc available') for node in connected_nodes}
return descriptions
elif infrastructure_config.get_config()["graph_engine"] == GraphDBType.NEO4J:
# Logic for Neo4j

View file

@ -0,0 +1,24 @@
from typing import Union, Dict
import re
import networkx as nx
from pydantic import BaseModel
from cognee.modules.search.llm.extraction.categorize_relevant_category import categorize_relevant_category
from cognee.shared.data_models import GraphDBType
async def search_cypher(query:str, graph: Union[nx.Graph, any]):
    """
    Use a Cypher query to search the graph and return the results.
    """
    from cognee.infrastructure import infrastructure_config

    # Cypher is only meaningful against Neo4j; reject any other engine up front.
    engine = infrastructure_config.get_config()["graph_engine"]
    if engine != GraphDBType.NEO4J:
        raise ValueError("Unsupported graph engine type.")

    return await graph.run(query)

View file

@ -1,11 +1,13 @@
""" Fetches the context of a given node in the graph"""
from typing import Union, Dict
from neo4j import AsyncSession
from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client
import networkx as nx
from cognee.shared.data_models import GraphDBType
async def search_neighbour(graph: Union[nx.Graph, any], id: str, infrastructure_config: Dict,
async def search_neighbour(graph: Union[nx.Graph, any], node_id: str,
other_param: dict = None):
"""
Search for nodes that share the same 'layer_uuid' as the specified node and return their descriptions.
@ -20,26 +22,23 @@ async def search_neighbour(graph: Union[nx.Graph, any], id: str, infrastructure_
Returns:
- List[str]: A list of 'description' attributes of nodes that share the same 'layer_uuid' with the specified node.
"""
node_id = other_param.get('node_id') if other_param else None
from cognee.infrastructure import infrastructure_config
if node_id is None:
node_id = other_param.get('node_id') if other_param else None
if node_id is None:
return []
if infrastructure_config.get_config()["graph_engine"] == GraphDBType.NETWORKX:
if isinstance(graph, nx.Graph):
if node_id not in graph:
return []
relevant_context = []
target_layer_uuid = graph.nodes[node_id].get('layer_uuid')
relevant_context = []
target_layer_uuid = graph.nodes[node_id].get('layer_uuid')
for n, attr in graph.nodes(data=True):
if attr.get('layer_uuid') == target_layer_uuid and 'description' in attr:
relevant_context.append(attr['description'])
for n, attr in graph.nodes(data=True):
if attr.get('layer_uuid') == target_layer_uuid and 'description' in attr:
relevant_context.append(attr['description'])
return relevant_context
return relevant_context
else:
raise ValueError("Graph object does not match the specified graph engine type in the configuration.")
elif infrastructure_config.get_config()["graph_engine"] == GraphDBType.NEO4J:
if isinstance(graph, AsyncSession):

View file

@ -3,9 +3,19 @@
from typing import Union, Dict
import networkx as nx
from cognee.shared.data_models import GraphDBType
from cognee.infrastructure import infrastructure_config
async def search_summary(graph: Union[nx.Graph, any], query: str, infrastructure_config: Dict, other_param: str = None) -> Dict[str, str]:
from cognee.modules.search.llm.extraction.categorize_relevant_summary import categorize_relevant_summary
from cognee.shared.data_models import GraphDBType, ResponseSummaryModel
import re
def strip_exact_regex(s, substring):
    """Remove at most one leading and one trailing occurrence of *substring* from *s*."""
    # Equivalent to the anchored-regex substitution (^pat|pat$): one strip per end.
    return s.removeprefix(substring).removesuffix(substring)
async def search_summary( query: str, graph: Union[nx.Graph, any]) -> Dict[str, str]:
"""
Filter nodes based on a condition (such as containing 'SUMMARY' in their identifiers) and return their summary attributes.
Supports both NetworkX graphs and Neo4j graph databases based on the configuration.
@ -19,8 +29,24 @@ async def search_summary(graph: Union[nx.Graph, any], query: str, infrastructure
Returns:
- Dict[str, str]: A dictionary where keys are node identifiers containing the query string, and values are their 'summary' attributes.
"""
if infrastructure_config.get_config()["graph_engine"] == GraphDBType.NETWORKX:
return {node: data.get('summary') for node, data in graph.nodes(data=True) if query in node and 'summary' in data}
print("graph", graph)
summaries_and_ids = [
{'document_id': strip_exact_regex(_, "DATA_SUMMARY__"), 'Summary': data['summary']}
for _, data in graph.nodes(data=True)
if 'summary' in data
]
print("summaries_and_ids", summaries_and_ids)
check_relevant_summary = await categorize_relevant_summary(query, summaries_and_ids, response_model=ResponseSummaryModel)
print("check_relevant_summary", check_relevant_summary)
connected_nodes = list(graph.neighbors(check_relevant_summary['document_id']))
print("connected_nodes", connected_nodes)
descriptions = {node: graph.nodes[node].get('description', 'No desc available') for node in connected_nodes}
print("descs", descriptions)
return descriptions
elif infrastructure_config.get_config()["graph_engine"] == GraphDBType.NEO4J:
cypher_query = f"""

View file

@ -0,0 +1,16 @@
from typing import Type
from pydantic import BaseModel
from cognee.infrastructure.llm.prompts import render_prompt
from cognee.infrastructure.llm.get_llm_client import get_llm_client
async def categorize_relevant_category(query: str, summary, response_model: Type[BaseModel]):
    """Ask the LLM to pick the category most relevant to *query*.

    Renders the categorize_categories prompt with the query and candidate
    categories, requests structured output, and returns it as a plain dict.
    """
    client = get_llm_client()
    user_prompt = render_prompt(
        "categorize_categories.txt",
        {"query": query, "categories": summary},
    )
    system_prompt = " Choose the relevant categories and return appropriate output based on the model"
    response = await client.acreate_structured_output(user_prompt, system_prompt, response_model)
    return response.model_dump()

View file

@ -0,0 +1,17 @@
from typing import Type
from pydantic import BaseModel
from cognee.infrastructure.llm.prompts import render_prompt
from cognee.infrastructure.llm.get_llm_client import get_llm_client
async def categorize_relevant_summary(query: str, summary, response_model: Type[BaseModel]):
    """Ask the LLM to pick the summary most relevant to *query*.

    Renders the categorize_summary prompt with the query and candidate
    summaries, requests structured output, and returns it as a plain dict.
    """
    llm_client = get_llm_client()
    enriched_query = render_prompt("categorize_summary.txt", {"query": query, "summaries": summary})
    # Removed a leftover debug print of the rendered prompt (noisy on stdout,
    # and prompts may contain user content that should not be dumped).
    system_prompt = " Choose the relevant summary and return appropriate output based on the model"
    llm_output = await llm_client.acreate_structured_output(enriched_query, system_prompt, response_model)
    return llm_output.model_dump()

View file

@ -0,0 +1,17 @@
import logging
from typing import List, Dict
from cognee.infrastructure import infrastructure_config
from.extraction.categorize_relevant_summary import categorize_relevant_summary
logger = logging.getLogger(__name__)
async def get_cognitive_layers(content: str, categories: List[Dict]):
    """Derive cognitive layers for *content* from its top-ranked category.

    Only `categories[0]` is consulted; the configured
    "categorize_summary_model" drives the structured LLM output.

    Raises: whatever the underlying LLM call raises (logged before re-raising).
    """
    try:
        return (await categorize_relevant_summary(
            content,
            categories[0],  # only the first (top-ranked) category set is used
            infrastructure_config.get_config()["categorize_summary_model"],
        )).cognitive_layers
    except Exception as error:
        logger.error("Error extracting cognitive layers from content: %s", error, exc_info = True)
        raise  # bare raise preserves the original traceback (was `raise error`)

View file

@ -0,0 +1 @@
""" Placeholder for BM25 implementation"""

View file

@ -0,0 +1 @@
"""Placeholder for fusions search implementation"""

View file

@ -11,6 +11,8 @@ async def search_similarity(query: str, graph):
layer_nodes = await graph_client.get_layer_nodes()
unique_layer_uuids = set(node["layer_id"] for node in layer_nodes)
print("unique_layer_uuids", unique_layer_uuids)
graph_nodes = []
@ -18,6 +20,8 @@ async def search_similarity(query: str, graph):
vector_engine = infrastructure_config.get_config()["vector_engine"]
results = await vector_engine.search(layer_id, query_text = query, limit = 10)
print("results", results)
print("len_rs", len(results))
if len(results) > 0:
graph_nodes.extend([
@ -25,25 +29,33 @@ async def search_similarity(query: str, graph):
layer_id = result.payload["references"]["cognitive_layer"],
node_id = result.payload["references"]["node_id"],
score = result.score,
) for result in results if result.score > 0.8
) for result in results if result.score > 0.3
])
if len(graph_nodes) == 0:
return []
relevant_context = []
for graph_node_data in graph_nodes:
graph_node = await graph_client.extract_node(graph_node_data["node_id"])
return graph_nodes
if "chunk_collection" not in graph_node and "chunk_id" not in graph_node:
continue
vector_point = await vector_engine.retrieve(
graph_node["chunk_collection"],
graph_node["chunk_id"],
)
relevant_context.append(vector_point.payload["text"])
return deduplicate(relevant_context)
# for graph_node_data in graph_nodes:
# if graph_node_data['score'] >0.8:
# graph_node = await graph_client.extract_node(graph_node_data["node_id"])
#
# if "chunk_collection" not in graph_node and "chunk_id" not in graph_node:
# continue
#
# vector_point = await vector_engine.retrieve(
# graph_node["chunk_collection"],
# graph_node["chunk_id"],
# )
#
# print("vector_point", vector_point.payload["text"])
#
# relevant_context.append(vector_point.payload["text"])
#
# print(relevant_context)
#
# return deduplicate(relevant_context)

View file

View file

@ -0,0 +1,14 @@
from typing import Type, List
from pydantic import BaseModel
from cognee.infrastructure.llm.prompts import read_query_prompt
from cognee.infrastructure.llm.get_llm_client import get_llm_client
async def extract_categories(content: str, response_model: Type[BaseModel]):
    """Run structured topology extraction over *content* and return it as a dict.

    Uses the extract_topology system prompt and the given response model for
    the LLM's structured output.
    """
    client = get_llm_client()
    topology_prompt = read_query_prompt("extract_topology.txt")
    result = await client.acreate_structured_output(content, topology_prompt, response_model)
    return result.model_dump()

View file

@ -0,0 +1,19 @@
import logging
from typing import List, Dict
from cognee.infrastructure import infrastructure_config
from cognee.modules.topology.extraction.extract_topology import extract_categories
logger = logging.getLogger(__name__)
async def infer_data_topology(content: str, graph_topology=None):
    """Infer the data topology of `content` using the configured graph-topology model.

    Args:
        content: Raw text to analyze.
        graph_topology: Optional topology model; when None, the one from
            `infrastructure_config` is used.

    Raises:
        Exception: re-raised from `extract_categories` after logging.
    """
    if graph_topology is None:
        graph_topology = infrastructure_config.get_config()["graph_topology"]

    try:
        return await extract_categories(content, graph_topology)
    except Exception as error:
        # Fixed log message: this module infers data topology; the old text was
        # copied from the cognitive-layer extraction module. Bare `raise`
        # preserves the original traceback.
        logger.error("Error inferring data topology from content: %s", error, exc_info = True)
        raise

View file

@ -0,0 +1,178 @@
import os
import glob
from pydantic import BaseModel, create_model
from typing import Dict, Type, Any
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Union
from datetime import datetime
from cognee import config
from cognee.infrastructure import infrastructure_config
from cognee.modules.topology.infer_data_topology import infer_data_topology
# class UserLocation(BaseModel):
# location_id: str
# description: str
# default_relationship: Relationship = Relationship(type = "located_in")
#
# class UserProperties(BaseModel):
# custom_properties: Optional[Dict[str, Any]] = None
# location: Optional[UserLocation] = None
#
# class DefaultGraphModel(BaseModel):
# node_id: str
# user_properties: UserProperties = UserProperties()
# documents: List[Document] = []
# default_fields: Optional[Dict[str, Any]] = {}
# default_relationship: Relationship = Relationship(type = "has_properties")
#
class Relationship(BaseModel):
    """A directed, typed edge between two nodes of the repository topology graph."""
    type: str = Field(..., description="The type of relationship, e.g., 'belongs_to'.")
    source: Optional[str] = Field(None, description="The identifier of the source id of in the relationship being a directory or subdirectory")
    target: Optional[str] = Field(None, description="The identifier of the target id in the relationship being the directory, subdirectory or file")
    properties: Optional[Dict[str, Any]] = Field(None, description="A dictionary of additional properties and values related to the relationship.")
class Document(BaseModel):
    """A file node in the topology graph, linked to its parent via `default_relationship`."""
    node_id: str
    title: str
    description: Optional[str] = None
    default_relationship: Relationship
class DirectoryModel(BaseModel):
    """A directory node holding documents and nested directories (recursive model)."""
    node_id: str
    path: str
    summary: str
    documents: List[Document] = []
    subdirectories: List['DirectoryModel'] = []
    default_relationship: Relationship

# Resolve the self-referential 'DirectoryModel' string annotation above.
DirectoryModel.update_forward_refs()
class DirMetadata(BaseModel):
    """Repository-level metadata: owner, summary, and top-level directory/document lists."""
    node_id: str
    summary: str
    owner: str
    description: Optional[str] = None
    directories: List[DirectoryModel] = []
    documents: List[Document] = []
    default_relationship: Relationship
class GitHubRepositoryModel(BaseModel):
    """Top-level model for a GitHub repository: metadata plus the root directory tree."""
    node_id: str
    metadata: DirMetadata
    root_directory: DirectoryModel
class TopologyEngine:
    """Builds a GitHubRepositoryModel graph from a repository's on-disk file structure."""

    def __init__(self):
        # Registry of named topology models; populated externally, read back via `load`.
        self.models: Dict[str, Type[BaseModel]] = {}

    async def populate_model(self, directory_path, file_structure, parent_id=None):
        """Recursively convert {name: subdir_dict | ("file", ...)} into a DirectoryModel tree.

        Args:
            directory_path: Filesystem path of the directory being modeled.
            file_structure: Nested dict produced by `infer_from_directory_structure`.
            parent_id: node_id of the containing directory, or None for the root.
        """
        directory_id = os.path.basename(directory_path) or "root"
        directory = DirectoryModel(
            node_id=directory_id,
            path=directory_path,
            summary=f"Contents of {directory_id}",
            default_relationship=Relationship(type="contains", source=parent_id, target=directory_id)
        )
        for key, value in file_structure.items():
            if isinstance(value, dict):
                # Recurse into subdirectory
                subdirectory_path = os.path.join(directory_path, key)
                subdirectory = await self.populate_model(subdirectory_path, value, parent_id=directory_id)
                directory.subdirectories.append(subdirectory)
            elif isinstance(value, tuple) and value[0] == 'file':
                # Leaf entry: a ("file", ...) placeholder from infer_from_directory_structure.
                document = Document(
                    node_id=key,
                    title=key,
                    default_relationship=Relationship(type="contained_by", source=key, target=directory_id)
                )
                directory.documents.append(document)
        return directory

    async def infer_from_directory_structure(self, node_id: str, repository: str, model):
        """ Infer the topology of a repository from its file structure.

        Walks `<data_root_directory>/<repository>` and returns a GitHubRepositoryModel.
        `model` is currently unused (kept for interface compatibility).

        Raises:
            FileNotFoundError: if the repository directory does not exist.
        """
        root_path = infrastructure_config.get_config()["data_root_directory"]
        # Fixed: join with os.path.join instead of manual "/" concatenation;
        # removed stray debug print of the path.
        path = os.path.join(root_path, str(repository))

        if not os.path.exists(path):
            raise FileNotFoundError(f"No such directory: {path}")

        # Build a nested dict mirroring the directory tree.
        root = {}
        for filename in glob.glob(f"{path}/**", recursive=True):
            parts = os.path.relpath(filename, start=path).split(os.path.sep)
            current = root
            for part in parts[:-1]:  # Traverse/create to the last directory
                if part not in current:
                    current[part] = {}
                current = current[part]
            last_part = parts[-1]
            if os.path.isfile(filename):
                current[last_part] = ("file", ...)  # Placeholder for file content or metadata
            elif os.path.isdir(filename):
                if last_part not in current:  # Only create a new directory entry if it doesn't exist
                    current[last_part] = {}

        root_directory = await self.populate_model('/', root)

        # NOTE(review): metadata is hard-coded; the LLM-based inference
        # (infer_data_topology) is disabled for now.
        # repository_metadata = await infer_data_topology(str(root), DirMetadata)
        repository_metadata = DirMetadata(
            node_id="repo1",
            summary="Example repository",
            owner="user1",
            directories=[root_directory],
            documents=[],
            default_relationship=Relationship(type="contained_by", source="repo1", target=node_id)
        )

        active_model = GitHubRepositoryModel(
            node_id=node_id,
            metadata=repository_metadata,
            root_directory=root_directory
        )
        return active_model

    def load(self, model_name: str):
        """Return a previously registered model by name, or None if unknown."""
        return self.models.get(model_name)

    def extrapolate(self, model_name: str):
        # This method would be implementation-specific depending on what "extrapolate" means
        pass
if __name__ == "__main__":
    # Ad-hoc smoke test: point cognee at local data/system directories, then run the engine.
    data_directory_path = os.path.abspath("../../../.data")
    print(data_directory_path)
    config.data_root_directory(data_directory_path)
    cognee_directory_path = os.path.abspath("../.cognee_system")
    config.system_root_directory(cognee_directory_path)

    async def main():
        engine = TopologyEngine()
        # model = engine.load("GitHubRepositoryModel")
        # if model is None:
        #     raise ValueError("Model not found")
        # NOTE(review): TopologyEngine defines no `infer` method — this call raises
        # AttributeError at runtime; presumably `infer_from_directory_structure`
        # was intended. Confirm before relying on this script.
        result = await engine.infer("example")
        print(result)

    import asyncio
    asyncio.run(main())

    # result = engine.extrapolate("GitHubRepositoryModel")
    # print(result)

View file

@ -0,0 +1,25 @@
from enum import Enum
from typing import List
from pydantic import BaseModel
class TextSubclass(str, Enum):
    """Closed set of text-content categories used to label code-related documents."""
    SOURCE_CODE = "Source code in various programming languages"
    SHELL_SCRIPTS = "Shell commands and scripts"
    MARKUP_LANGUAGES = "Markup languages (HTML, XML)"
    STYLESHEETS = "Stylesheets (CSS) and configuration files (YAML, JSON, INI)"
    OTHER = "Other that does not fit into any of the above categories"
class ContentType(BaseModel):
    """Base class for content type, storing type of content as string."""
    # Default label; subclasses narrow the meaning with extra fields.
    type: str = "TEXT"
class TextContent(ContentType):
    """Textual content class for more specific text categories."""
    # One or more TextSubclass labels predicted for the document.
    subclass: List[TextSubclass]
class CodeContentPrediction(BaseModel):
    """Model to predict the type of content."""
    # The predicted text classification, including its subclass labels.
    label: TextContent

View file

@ -0,0 +1,36 @@
from pydantic import BaseModel
from typing import List, Optional, Dict, Any, Union
class Relationship(BaseModel):
    """A typed relationship between repository nodes with optional free-form attributes."""
    type: str
    # NOTE: pydantic copies field defaults per instance, so the shared {} literal
    # is safe here (unlike a plain-function mutable default).
    attributes: Optional[Dict[str, Any]] = {}
class Document(BaseModel):
    """A single file in the repository: name, raw content, and file type."""
    name: str
    content: str
    filetype: str
class Directory(BaseModel):
    """A directory holding documents and nested directories (recursive model)."""
    name: str
    documents: List[Document] = []
    directories: List['Directory'] = []

# Allows recursive Directory Model
Directory.update_forward_refs()
class RepositoryProperties(BaseModel):
    """Optional free-form properties attached to a repository node."""
    custom_properties: Optional[Dict[str, Any]] = None
    location: Optional[str] = None  # Simplified location reference
class RepositoryNode(BaseModel):
    """A graph node wrapping either a Document or a Directory, plus its relationships."""
    node_id: str
    node_type: str  # 'document' or 'directory'
    properties: RepositoryProperties = RepositoryProperties()
    content: Union[Document, Directory, None] = None
    relationships: List[Relationship] = []
class RepositoryGraphModel(BaseModel):
    """Top-level repository graph: a root node and graph-wide default relationships."""
    root: RepositoryNode
    default_relationships: List[Relationship] = []

View file

@ -0,0 +1,84 @@
from typing import List, Union, Literal, Optional
from pydantic import BaseModel
class BaseClass(BaseModel):
    """A class definition that other classes may inherit from."""
    # NOTE(review): Optional fields without "= None" are required-but-nullable under
    # pydantic v2, but default to None under v1 — confirm the pydantic version in use.
    id: str
    name: str
    type: Literal["Class"] = "Class"
    description: str
    constructor_parameters: Optional[List[str]]
class Class(BaseModel):
    """A class definition, optionally derived from a BaseClass."""
    id: str
    name: str
    type: Literal["Class"] = "Class"
    description: str
    constructor_parameters: Optional[List[str]]
    from_class: Optional[BaseClass]
class ClassInstance(BaseModel):
    """A concrete instantiation of a Class."""
    id: str
    name: str
    type: Literal["ClassInstance"] = "ClassInstance"
    description: str
    from_class: Class
class Function(BaseModel):
    """A function or method definition in the analyzed source code."""
    id: str
    name: str
    type: Literal["Function"] = "Function"
    description: str
    parameters: Optional[List[str]]
    return_type: str
    is_static: Optional[bool] = False
class Variable(BaseModel):
    """A variable declaration, optionally with a default value."""
    id: str
    name: str
    type: Literal["Variable"] = "Variable"
    description: str
    is_static: Optional[bool] = False
    default_value: Optional[str]
class Operator(BaseModel):
    """An operator applied within an expression."""
    id: str
    name: str
    type: Literal["Operator"] = "Operator"
    description: str
    return_type: str
class ExpressionPart(BaseModel):
    """A sub-expression nested inside an Expression (no further nesting)."""
    id: str
    name: str
    # NOTE(review): shares the "Expression" literal with the Expression class; if these
    # are ever used in a discriminated union on `type`, they cannot be told apart — confirm.
    type: Literal["Expression"] = "Expression"
    description: str
    expression: str
    members: List[Union[Variable, Function, Operator]]
class Expression(BaseModel):
    """An expression composed of variables, functions, operators, and sub-expressions."""
    id: str
    name: str
    type: Literal["Expression"] = "Expression"
    description: str
    expression: str
    members: List[Union[Variable, Function, Operator, ExpressionPart]]
class Edge(BaseModel):
    """A directed edge between two source-code graph nodes."""
    source_node_id: str
    target_node_id: str
    # Closed vocabulary of relationship names between code entities.
    relationship_name: Literal["called in", "stored in", "defined in", "returned by", "instantiated in", "uses", "updates"]
class SourceCodeGraph(BaseModel):
    """A full source-code knowledge graph: typed nodes plus directed edges."""
    id: str
    name: str
    description: str
    language: str
    # ExpressionPart is deliberately absent here: it only appears nested inside
    # Expression.members, never as a top-level node.
    nodes: List[Union[
        Class,
        Function,
        Variable,
        Operator,
        Expression,
        ClassInstance,
    ]]
    edges: List[Edge]

View file

@ -7,9 +7,9 @@ from pydantic import BaseModel, Field
class Node(BaseModel):
"""Node in a knowledge graph."""
id: str
entity_name: str
entity_type: str
entity_description: str
name: str
type: str
description: str
class Edge(BaseModel):
"""Edge in a knowledge graph."""
@ -26,8 +26,6 @@ class GraphQLQuery(BaseModel):
"""GraphQL query."""
query: str
class Answer(BaseModel):
"""Answer."""
answer: str
@ -36,13 +34,13 @@ class ChunkStrategy(Enum):
EXACT = "exact"
PARAGRAPH = "paragraph"
SENTENCE = "sentence"
CODE = "code"
class MemorySummary(BaseModel):
""" Memory summary. """
nodes: List[Node] = Field(..., default_factory=list)
edges: List[Edge] = Field(..., default_factory=list)
class TextSubclass(str, Enum):
ARTICLES = "Articles, essays, and reports"
BOOKS = "Books and manuscripts"
@ -107,7 +105,6 @@ class ImageSubclass(str, Enum):
SCREENSHOTS = "Screenshots and graphical user interfaces"
OTHER_IMAGES = "Other types of images"
class VideoSubclass(str, Enum):
MOVIES = "Movies and short films"
DOCUMENTARIES = "Documentaries and educational videos"
@ -183,7 +180,6 @@ class DefaultContentPrediction(BaseModel):
ProceduralContent,
]
class SummarizedContent(BaseModel):
"""Class for a single class label summary and description."""
summary: str
@ -194,7 +190,6 @@ class LabeledContent(BaseModel):
content_labels: str
class CognitiveLayerSubgroup(BaseModel):
""" CognitiveLayerSubgroup in a general layer """
id: int
@ -250,3 +245,17 @@ class DefaultGraphModel(BaseModel):
documents: List[Document] = []
default_fields: Optional[Dict[str, Any]] = {}
default_relationship: Relationship = Relationship(type = "has_properties")
class ResponseSummaryModel(BaseModel):
    """ Response summary model and existing document id """
    # Identifier of the document the summary belongs to.
    document_id: str
    # The generated summary text for that document.
    response_summary: str
class MonitoringTool(str, Enum):
    """ Monitoring tools """
    # Supported LLM observability/monitoring backends.
    LANGFUSE = "langfuse"
    # NOTE(review): "llmlite" may be a misspelling of "litellm" — the value is read at
    # runtime, so it is left unchanged here; confirm against the config that consumes it.
    LLMLITE = "llmlite"
    LANGSMITH = "langsmith"

View file

@ -1,5 +1,5 @@
""" This module contains utility functions for the cognee. """
import logging
import os
import uuid
import datetime
@ -20,6 +20,8 @@ config.load()
def send_telemetry(event_name: str):
if os.getenv("TELEMETRY_DISABLED"):
print("Telemetry is disabled.")
logging.info("Telemetry is disabled.")
return
env = os.getenv("ENV")

View file

@ -69,6 +69,26 @@ services:
- cognee_backend
ports:
- "5432:5432"
litellm:
build:
context: .
args:
target: runtime
image: ghcr.io/berriai/litellm:main-latest
ports:
- "4000:4000" # Map the container port to the host, change the host port if necessary
volumes:
- ./litellm-config.yaml:/app/config.yaml # Mount the local configuration file
# You can change the port or number of workers as per your requirements or pass any new supported CLI argument. Make sure the port passed here matches the container port defined above in `ports` value
command: [ "--config", "/app/config.yaml", "--port", "4000", "--num_workers", "8" ]
falkordb:
image: falkordb/falkordb:edge
container_name: falkordb
ports:
- "6379:6379"
- "3001:3000"
networks:
- cognee_backend
networks:
cognee_backend:

View file

@ -0,0 +1,48 @@
from deepeval.dataset import EvaluationDataset
from deepeval.synthesizer import Synthesizer
import dotenv
from deepeval.test_case import LLMTestCase

# Load API keys (e.g. OPENAI_API_KEY) from a local .env file before calling deepeval.
dotenv.load_dotenv()

# Earlier experiment: generate and persist synthetic goldens via an explicit Synthesizer.
# synthesizer = Synthesizer()
# synthesizer.generate_goldens_from_docs(
#     document_paths=['natural_language_processing.txt', 'soldiers_home.pdf', 'trump.txt'],
#     max_goldens_per_document=5,
#     num_evolutions=5,
#     include_expected_output=True,
#     enable_breadth_evolve=True,
# )
#
# synthesizer.save_as(
#     file_type='json',  # or 'csv'
#     directory="./synthetic_data"
# )

# Generate evaluation "goldens" directly from the source documents.
dataset = EvaluationDataset()
dataset.generate_goldens_from_docs(
    document_paths=['natural_language_processing.txt', 'soldiers_home.pdf', 'trump.txt'],
    max_goldens_per_document=10,
    num_evolutions=5,
    enable_breadth_evolve=True,
)

print(dataset.goldens)
print(dataset)

import pytest
from deepeval import assert_test
from deepeval.metrics import AnswerRelevancyMetric

# Relevancy threshold of 0.5; the evaluate call is currently disabled.
answer_relevancy_metric = AnswerRelevancyMetric(threshold=0.5)
from deepeval import evaluate
# evaluate(dataset, [answer_relevancy_metric])

View file

@ -0,0 +1,147 @@
from deepeval.dataset import EvaluationDataset
from pydantic import BaseModel
from typing import List, Type
from deepeval.test_case import LLMTestCase
from deepeval.dataset import Golden
import dotenv
dotenv.load_dotenv()
from cognee.infrastructure.llm.get_llm_client import get_llm_client
# Load previously generated synthetic test cases (goldens) from disk.
dataset = EvaluationDataset()
dataset.add_test_cases_from_json_file(
    # file_path is the absolute path to your .json file
    file_path="./synthetic_data/20240519_185842.json",
    input_key_name="input",
    actual_output_key_name="actual_output",
    expected_output_key_name="expected_output",
    context_key_name="context"
)
print(dataset)

# Alternative: regenerate goldens from the documents instead of loading from JSON.
# from deepeval.synthesizer import Synthesizer
#
# synthesizer = Synthesizer(model="gpt-3.5-turbo")
#
# dataset = EvaluationDataset()
# dataset.generate_goldens_from_docs(
#     synthesizer=synthesizer,
#     document_paths=['natural_language_processing.txt', 'soldiers_home.pdf', 'trump.txt'],
#     max_goldens_per_document=10,
#     num_evolutions=5,
#     enable_breadth_evolve=True,
# )

print(dataset.goldens)
print(dataset)
import logging
from typing import List, Dict
from cognee.infrastructure import infrastructure_config
logger = logging.getLogger(__name__)
class AnswerModel(BaseModel):
    """Structured LLM output: a single free-text answer."""
    response: str
def get_answer_base(content: str, context: str, response_model: Type[BaseModel]):
    """Request a structured answer from the configured LLM, seeding the system prompt with `context`."""
    system_prompt = "THIS IS YOUR CONTEXT:" + str(context)
    client = get_llm_client()
    return client.create_structured_output(content, system_prompt, response_model)
def get_answer(content: str, context, model: Type[BaseModel] = AnswerModel):
    """Ask the LLM to answer `content` given `context`, returning a `model` instance.

    Args:
        content: The question or prompt text.
        context: Context material injected into the system prompt.
        model: pydantic model for the structured response (defaults to AnswerModel).

    Raises:
        Exception: re-raised from the LLM client after logging.
    """
    try:
        return get_answer_base(content, context, model)
    except Exception as error:
        # Fixed log message: this helper generates answers; the old text was copied
        # from the cognitive-layer extraction module. Bare `raise` keeps the traceback.
        logger.error("Error getting LLM answer for content: %s", error, exc_info = True)
        raise
async def run_cognify_base_rag():
    """Reset the cognee system, ingest the test datasets, and build the knowledge graph.

    Side effects only (prunes storage, adds data, runs cognify); the returned graph
    is currently unused.
    """
    from cognee.api.v1.add import add
    from cognee.api.v1.prune import prune
    from cognee.api.v1.cognify.cognify import cognify

    await prune.prune_system()
    await add("data://test_datasets", "initial_test")
    graph = await cognify("initial_test")
    pass
async def cognify_search_base_rag(content: str, context: str):
    """Run a plain vector search over the "basic_rag" collection for `content`.

    `context` is currently unused.
    NOTE(review): the database path below is a hard-coded absolute path to a
    developer machine — parameterize before running anywhere else.
    """
    infrastructure_config.set_config({"database_directory_path": "/Users/vasa/Projects/cognee/cognee/.cognee_system/databases/cognee.lancedb"})
    vector_client = infrastructure_config.get_config("vector_engine")

    return_ = await vector_client.search(collection_name="basic_rag", query_text=content, limit=10)

    print("results", return_)
    return return_
async def cognify_search_graph(content: str, context: str):
    """Run a graph-backed SIMILARITY search for `content` via the cognee search API.

    Args:
        content: The query text to search for.
        context: Unused; kept for signature parity with the other search helpers.

    Returns:
        The results returned by the cognee `search` endpoint.
    """
    from cognee.api.v1.search.search import search

    search_type = 'SIMILARITY'
    # Fix: use the caller-supplied query instead of the hard-coded 'Donald Trump'
    # string, which silently ignored the `content` parameter.
    params = {'query': content}

    results = await search(search_type, params)
    print("results", results)
    return results
def convert_goldens_to_test_cases(test_cases_raw: List[LLMTestCase]) -> List[LLMTestCase]:
    """Materialize raw goldens into LLMTestCases by generating actual outputs via `get_answer`.

    NOTE(review): `retrieval_context` is a literal placeholder list, not real retrieved
    context — replace it before trusting retrieval-based metrics.
    """
    test_cases = []
    for case in test_cases_raw:
        test_case = LLMTestCase(
            input=case.input,
            # Generate actual output using the 'input' and 'additional_metadata'
            actual_output= str(get_answer(case.input, case.context).model_dump()['response']),
            expected_output=case.expected_output,
            context=case.context,
            retrieval_context=["retrieval_context"],
        )
        test_cases.append(test_case)
    return test_cases
# # Data preprocessing before setting the dataset test cases
# dataset.test_cases = convert_goldens_to_test_cases(dataset.test_cases)
#
#
# from deepeval.metrics import HallucinationMetric
#
#
# metric = HallucinationMetric()
# dataset.evaluate([metric])
if __name__ == "__main__":
    import asyncio

    async def main():
        # Each helper exercises a different retrieval path; enable as needed.
        # await run_cognify_base_rag()
        # await cognify_search_base_rag("show_all_processes", "context")
        await cognify_search_graph("show_all_processes", "context")

    asyncio.run(main())
    # run_cognify_base_rag_and_search()

    # # Data preprocessing before setting the dataset test cases
    # dataset.test_cases = convert_goldens_to_test_cases(dataset.test_cases)
    # from deepeval.metrics import HallucinationMetric
    # metric = HallucinationMetric()
    # dataset.evaluate([metric])
    pass

View file

@ -0,0 +1,2 @@
Natural language processing (NLP) is an interdisciplinary subfield of computer science and information retrieval. It is primarily concerned with giving computers the ability to support and manipulate human language. It involves processing natural language datasets, such as text corpora or speech corpora, using either rule-based or probabilistic (i.e. statistical and, most recently, neural network-based) machine learning approaches. The goal is a computer capable of "understanding"[citation needed] the contents of documents, including the contextual nuances of the language within them. To this end, natural language processing often borrows ideas from theoretical linguistics. The technology can then accurately extract information and insights contained in the documents as well as categorize and organize the documents themselves.
Challenges in natural language processing frequently involve speech recognition, natural-language understanding, and natural-language generation.

Binary file not shown.

View file

@ -0,0 +1,15 @@
Donald Trump flirted with the idea of being president for three terms a clear violation of the US constitution during a bombastic speech for the National Rifle Association in which he vowed to reverse gun safety measures green-lighted during the Biden administration.
“You know, FDR — 16 years — almost 16 years — he was four terms. I don't know, are we going to be considered three-term? Or two-term?” The ex-president and GOP presidential frontrunner said to the organization's annual convention in Dallas, prompting some in the crowd to yell “three!” Politico reported.
Trump has floated a third term in past comments, even mentioning a prolonged presidency while campaigning in 2020. He has also tried distancing himself from this idea, telling Time magazine in April: “I wouldn't be in favor of it at all. I intend to serve four years and do a great job.”
The 22nd amendment, which was enacted following Franklin Delano Roosevelt's fourth term, limits the presidency to two terms.
In his speech to the NRA, Trump spoke on abortion, immigration and criticized Robert F Kennedy Jr as being part of the “radical left”. He also complained about the multiple criminal cases against him, including a gag order that bars him from commenting about witnesses in his ongoing New York City criminal trial.
Trump has the NRAs endorsement, but the organization has recently been reeling from legal and financial woe and is not quite the force in US politics it once was.
The NRA is holding its convention less than three months after its former long-serving leader Wayne LaPierre as well as other executives of the group were held liable in a lawsuit centered on the organizations lavish spending.
Trump, who said he heard that gun owners “don't vote,” pushed NRA members to hit the polls in November: “Let's be rebellious and vote this time, OK?”

View file

@ -38,7 +38,7 @@ greenlet = "^3.0.3"
ruff = "^0.2.2"
filetype = "^1.2.0"
nltk = "^3.8.1"
dlt = "^0.4.7"
dlt = "0.4.10"
duckdb = {version = "^0.10.0", extras = ["dlt"]}
overrides = "^7.7.0"
aiofiles = "^23.2.1"
@ -60,8 +60,18 @@ tiktoken = "^0.6.0"
dspy-ai = "2.4.3"
posthog = "^3.5.0"
lancedb = "^0.6.10"
importlib-metadata = "6.8.0"
litellm = "^1.37.3"
groq = "^0.5.0"
tantivy = "^0.21.0"
python-multipart = "^0.0.9"
langfuse = "^2.32.0"
spacy = "^3.7.4"
protobuf = "<5.0.0"
langchain-community = "0.0.38"
deepeval = "^0.21.42"
falkordb = "^1.0.4"
[tool.poetry.extras]