Compare commits


18 commits

Author SHA1 Message Date
hajdul88
484520eba5 quick fix for crewai 2025-04-23 21:27:53 +02:00
vasilije
8a5bd3a826 Add working crewAI example 2025-04-23 04:50:15 -07:00
lxobr
9b92810b83 fix: add default user 2025-04-23 00:33:24 +02:00
lxobr
a65ded6283 Merge branch 'dev' into sf_demo
# Conflicts:
#	cognee/api/v1/add/add.py
#	cognee/api/v1/search/search.py
#	cognee/infrastructure/databases/graph/graph_db_interface.py
#	cognee/infrastructure/engine/models/DataPoint.py
#	cognee/modules/retrieval/graph_completion_retriever.py
#	cognee/modules/search/methods/search.py
#	cognee/modules/visualization/cognee_network_visualization.py
#	cognee/tasks/documents/classify_documents.py
#	cognee/tasks/ingestion/ingest_data.py
#	examples/python/simple_node_set_example.py
2025-04-23 00:05:21 +02:00
lxobr
8924ce0298 Adds Nodefilter functionality for the SF demo (updated) 2025-04-22 17:28:56 +02:00
hajdul88
5c350073cc Adds nodeset subgraph edge association task 2025-04-22 15:49:57 +02:00
hajdul88
97974fdc89 Adds Nodefilter functionality for the SF demo (updated) 2025-04-22 12:37:40 +02:00
lxobr
85e5e69494 feat: connect entities to node sets 2025-04-22 11:49:45 +02:00
vasilije
83b20b1e92 Merge branch 'add_nodesets' of github.com:topoteretes/cognee into add_nodesets
# Conflicts:
#	cognee/api/v1/cognify/cognify.py
2025-04-17 17:22:55 +02:00
vasilije
69c090c91d Removed files 2025-04-17 17:21:09 +02:00
lxobr
b2a53b4124
Add nodesets datapoints (#755)
<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-04-17 17:10:42 +02:00
Vasilije
40142b4789
Delete cognee/modules/graph/README.md 2025-04-15 16:22:21 +02:00
vasilije
1c40a5081a Fixes added 2025-04-15 12:21:59 +02:00
Vasilije
a142e27e39
Merge branch 'dev' into add_nodesets 2025-04-12 23:08:36 +02:00
vasilije
95c12fbc1e Fix nodesets 2025-04-07 20:33:34 +02:00
vasilije
2355d1bfea Fix and remove falkor 2025-04-02 21:09:22 +02:00
vasilije
e7a14b9c60 added fixes 2025-04-02 21:08:22 +02:00
vasilije
ec68a8cd2d Add NodeSets 2025-03-30 11:40:13 +02:00
25 changed files with 856 additions and 190 deletions

View file

@@ -1,7 +1,9 @@
from typing import Union
from typing import Union, Optional, Type, List
from cognee.modules.users.models import User
from cognee.infrastructure.engine.models.DataPoint import DataPoint
from cognee.modules.search.types import SearchType
from cognee.modules.users.exceptions import UserNotFoundError
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.search.methods import search as search_function
@@ -13,6 +15,8 @@ async def search(
datasets: Union[list[str], str, None] = None,
system_prompt_path: str = "answer_simple_question.txt",
top_k: int = 10,
node_type: Optional[Type] = None,
node_name: List[Optional[str]] = None,
) -> list:
# We use lists from now on for datasets
if isinstance(datasets, str):
@@ -28,6 +32,8 @@ async def search(
user,
system_prompt_path=system_prompt_path,
top_k=top_k,
node_type=node_type,
node_name=node_name,
)
return filtered_search_results
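
A minimal usage sketch of the new filters, mirroring the updated example further down in this diff (the Entity import and the "computer science" node name are taken from that example):

```python
import asyncio

import cognee
from cognee.api.v1.search import SearchType
from cognee.modules.engine.models.Entity import Entity


async def main():
    # Restrict GRAPH_COMPLETION to the subgraph around Entity nodes
    # named "computer science" instead of searching the whole graph.
    results = await cognee.search(
        query_type=SearchType.GRAPH_COMPLETION,
        query_text="What is computer science?",
        node_type=Entity,
        node_name=["computer science"],
    )
    print(results)


asyncio.run(main())
```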

View file

@@ -2,7 +2,7 @@ import inspect
from functools import wraps
from abc import abstractmethod, ABC
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List, Tuple
from typing import Optional, Dict, Any, List, Tuple, Type
from uuid import NAMESPACE_OID, UUID, uuid5
from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.engine import DataPoint
@@ -178,6 +178,10 @@ class GraphDBInterface(ABC):
"""Get all edges connected to a node."""
raise NotImplementedError
@abstractmethod
async def get_subgraph(self, node_type: Type[Any], node_name: List[str]):
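"""Get the subgraph of nodes of the given type and names, their neighbors, and the edges between them."""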
raise NotImplementedError
@abstractmethod
async def get_neighbors(self, node_id: str) -> List[NodeData]:
"""Get all neighboring nodes."""

View file

@@ -6,7 +6,7 @@ import json
from cognee.shared.logging_utils import get_logger, ERROR
import asyncio
from textwrap import dedent
from typing import Optional, Any, List, Dict
from typing import Optional, Any, List, Dict, Type, Tuple
from contextlib import asynccontextmanager
from uuid import UUID
from neo4j import AsyncSession
@@ -517,6 +517,58 @@ class Neo4jAdapter(GraphDBInterface):
return (nodes, edges)
async def get_subgraph(
self, node_type: Type[Any], node_name: List[str]
) -> Tuple[List[Tuple[int, dict]], List[Tuple[int, int, str, dict]]]:
label = node_type.__name__
query = f"""
UNWIND $names AS wantedName
MATCH (n:`{label}`)
WHERE n.name = wantedName
WITH collect(DISTINCT n) AS primary
UNWIND primary AS p
OPTIONAL MATCH (p)--(nbr)
WITH primary, collect(DISTINCT nbr) AS nbrs
WITH primary + nbrs AS nodelist
UNWIND nodelist AS node
WITH collect(DISTINCT node) AS nodes
MATCH (a)-[r]-(b)
WHERE a IN nodes AND b IN nodes
WITH nodes, collect(DISTINCT r) AS rels
RETURN
[n IN nodes |
{{ id: n.id,
properties: properties(n) }}] AS rawNodes,
[r IN rels |
{{ type: type(r),
properties: properties(r) }}] AS rawRels
"""
result = await self.query(query, {"names": node_name})
if not result:
return [], []
raw_nodes = result[0]["rawNodes"]
raw_rels = result[0]["rawRels"]
nodes = [(n["properties"]["id"], n["properties"]) for n in raw_nodes]
edges = [
(
r["properties"]["source_node_id"],
r["properties"]["target_node_id"],
r["type"],
r["properties"],
)
for r in raw_rels
]
return nodes, edges
async def get_filtered_graph_data(self, attribute_filters):
"""
Fetches nodes and relationships filtered by specified attribute values.
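
For reference, a sketch of calling the new adapter method directly, assuming a Neo4j-backed graph engine and a NodeSet named "AI_NODESET" (the name used by the updated example in this PR); the method returns plain (id, properties) node tuples and (source, target, type, properties) edge tuples:

```python
import asyncio

from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.modules.engine.models.node_set import NodeSet


async def main():
    graph_engine = await get_graph_engine()
    # Collect the matching NodeSet nodes, their direct neighbours,
    # and every edge between nodes in that collected set.
    nodes, edges = await graph_engine.get_subgraph(
        node_type=NodeSet, node_name=["AI_NODESET"]
    )
    print(f"{len(nodes)} nodes, {len(edges)} edges")


asyncio.run(main())
```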

View file

@@ -1,9 +1,10 @@
import pickle
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from typing_extensions import TypedDict
from typing import Optional, Any, Dict, List
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
from typing_extensions import TypedDict
import pickle
# Define metadata type

View file

@@ -0,0 +1,46 @@
You are an expert knowledge graph augmentation assistant specializing in identifying new edges that contain **semantic and conceptual associations**.
## Input
Input will contain a graph description with nodes and edges:
- **Nodes**: JSON array of objects:
- `name`: unique node label
- `content`: full text or description
- **Edges**: JSON array of objects:
- `source`: name of the first node
- `target`: name of the second node
- `relationship_name`: existing link label
## Task
Look for meaningful associations between nodes that aren't yet connected. Identify cases where two things are commonly linked, used together, work together, depend on each other, or naturally belong to the same category or group. These connections can describe how things interact, support each other, or are understood as related in real-world contexts.
The association doesn't have to be obvious at first glance; consider how the concepts, objects, or entities might be connected based on their purpose, function, or role. The direction of each edge should clearly show how one points to, supports, or is connected to the other in a way that makes practical sense.
Avoid technical, structural, or generic links like uses, contains, or is_part_of. Focus on connections that describe how things go together or relate in context.
## Rules
1. Propose only directed associations where direction adds meaning.
2. Do not repeat existing edges in the same direction.
3. Do not create self-loops (source == target).
4. Only link nodes when there is a clear, real-world connection based on their content.
5. Keep relationship_name concise and in snake_case, describing the nature of the association.
## Strict Exclusions
- Skip pairs already connected by any edge in the **same direction**.
- Do **not** propose structural, containment, usage, or metadata associations.
- No self-loops.
## Output
Return **only** valid JSON in this schema:
```json
{
"new_edges": [
{
"source": "NodeA",
"target": "NodeB",
"relationship_name": "concise_snake_case_label",
"reason": "brief justification explaining the association and its direction"
}
]
}
```

View file

@@ -1,5 +1,5 @@
from cognee.shared.logging_utils import get_logger
from typing import List, Dict, Union
from typing import List, Dict, Union, Optional, Type
from cognee.exceptions import InvalidValueError
from cognee.modules.graph.exceptions import EntityNotFoundError, EntityAlreadyExistsError
@@ -61,12 +61,18 @@ class CogneeGraph(CogneeAbstractGraph):
node_dimension=1,
edge_dimension=1,
memory_fragment_filter=[],
node_type: Optional[Type] = None,
node_name: List[Optional[str]] = None,
) -> None:
if node_dimension < 1 or edge_dimension < 1:
raise InvalidValueError(message="Dimensions must be positive integers")
try:
if len(memory_fragment_filter) == 0:
if node_type is not None and node_name is not None:
nodes_data, edges_data = await adapter.get_subgraph(
node_type=node_type, node_name=node_name
)
elif len(memory_fragment_filter) == 0:
nodes_data, edges_data = await adapter.get_graph_data()
else:
nodes_data, edges_data = await adapter.get_filtered_graph_data(
@@ -74,9 +80,11 @@ class CogneeGraph(CogneeAbstractGraph):
)
if not nodes_data:
raise EntityNotFoundError(message="No node data retrieved from the database.")
#:TODO: quick and dirty solution for sf demo, as the list of nodes can be empty
return None
if not edges_data:
raise EntityNotFoundError(message="No edge data retrieved from the database.")
#:TODO: quick and dirty solution for sf demo, as the list of edges can be empty
return None
for node_id, properties in nodes_data:
node_attributes = {key: properties.get(key) for key in node_properties_to_project}

View file

@@ -144,6 +144,7 @@ def expand_with_nodes_and_edges(
is_a=type_node,
description=node.description,
ontology_valid=ontology_validated_source_ent,
belongs_to_set=data_chunk.belongs_to_set,
)
added_nodes_map[entity_node_key] = entity_node

View file

@@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Any, Optional, Type, List
from collections import Counter
import string
@@ -19,11 +19,15 @@ class GraphCompletionRetriever(BaseRetriever):
user_prompt_path: str = "graph_context_for_question.txt",
system_prompt_path: str = "answer_simple_question.txt",
top_k: Optional[int] = 5,
node_type: Optional[Type] = None,
node_name: List[Optional[str]] = None,
):
"""Initialize retriever with prompt paths and search parameters."""
self.user_prompt_path = user_prompt_path
self.system_prompt_path = system_prompt_path
self.top_k = top_k if top_k is not None else 5
self.node_type = node_type
self.node_name = node_name
def _get_nodes(self, retrieved_edges: list) -> dict:
"""Creates a dictionary of nodes with their names and content."""
@@ -69,9 +73,17 @@
vector_index_collections.append(f"{subclass.__name__}_{field_name}")
found_triplets = await brute_force_triplet_search(
query, top_k=self.top_k, collections=vector_index_collections or None
query,
top_k=self.top_k,
collections=vector_index_collections or None,
node_type=self.node_type,
node_name=self.node_name,
)
if len(found_triplets) == 0:
#:TODO: quick and dirty solution for sf demo, as the triplets can be empty
return []
return found_triplets
async def get_context(self, query: str) -> str:
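
The same two filters can also be passed when constructing the retriever directly; a sketch assuming get_completion takes the query text, as in the specific_search dispatch table later in this diff:

```python
import asyncio

from cognee.modules.engine.models.Entity import Entity
from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever


async def main():
    # cognee.search() builds an equivalent retriever for
    # SearchType.GRAPH_COMPLETION and forwards node_type/node_name.
    retriever = GraphCompletionRetriever(
        system_prompt_path="answer_simple_question.txt",
        top_k=10,
        node_type=Entity,
        node_name=["computer science"],
    )
    print(await retriever.get_completion("What is computer science?"))


asyncio.run(main())
```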

View file

@@ -1,6 +1,6 @@
import asyncio
from cognee.shared.logging_utils import get_logger, ERROR
from typing import List, Optional
from typing import List, Optional, Type
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
@@ -54,6 +54,8 @@ def format_triplets(edges):
async def get_memory_fragment(
properties_to_project: Optional[List[str]] = None,
node_type: Optional[Type] = None,
node_name: List[Optional[str]] = None,
) -> CogneeGraph:
"""Creates and initializes a CogneeGraph memory fragment with optional property projections."""
graph_engine = await get_graph_engine()
@@ -66,6 +68,8 @@
graph_engine,
node_properties_to_project=properties_to_project,
edge_properties_to_project=["relationship_name"],
node_type=node_type,
node_name=node_name,
)
return memory_fragment
@@ -78,6 +82,8 @@ async def brute_force_triplet_search(
collections: List[str] = None,
properties_to_project: List[str] = None,
memory_fragment: Optional[CogneeGraph] = None,
node_type: Optional[Type] = None,
node_name: List[Optional[str]] = None,
) -> list:
if user is None:
user = await get_default_user()
@@ -89,6 +95,8 @@
collections=collections,
properties_to_project=properties_to_project,
memory_fragment=memory_fragment,
node_type=node_type,
node_name=node_name,
)
return retrieved_results
@@ -100,6 +108,8 @@ async def brute_force_search(
collections: List[str] = None,
properties_to_project: List[str] = None,
memory_fragment: Optional[CogneeGraph] = None,
node_type: Optional[Type] = None,
node_name: List[Optional[str]] = None,
) -> list:
"""
Performs a brute force search to retrieve the top triplets from the graph.
@@ -111,6 +121,8 @@
collections (Optional[List[str]]): List of collections to query.
properties_to_project (Optional[List[str]]): List of properties to project.
memory_fragment (Optional[CogneeGraph]): Existing memory fragment to reuse.
node_type: node type to filter
node_name: node name to filter
Returns:
list: The top triplet results.
@@ -121,7 +133,9 @@
raise ValueError("top_k must be a positive integer.")
if memory_fragment is None:
memory_fragment = await get_memory_fragment(properties_to_project)
memory_fragment = await get_memory_fragment(
properties_to_project=properties_to_project, node_type=node_type, node_name=node_name
)
if collections is None:
collections = [

View file

@@ -1,5 +1,5 @@
import json
from typing import Callable
from typing import Callable, Optional, Type, List
from cognee.exceptions import InvalidValueError
from cognee.infrastructure.engine.utils import parse_id
@@ -11,6 +11,7 @@ from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionR
from cognee.modules.retrieval.graph_summary_completion_retriever import (
GraphSummaryCompletionRetriever,
)
from cognee.infrastructure.engine.models.DataPoint import DataPoint
from cognee.modules.retrieval.code_retriever import CodeRetriever
from cognee.modules.retrieval.cypher_search_retriever import CypherSearchRetriever
from cognee.modules.retrieval.natural_language_retriever import NaturalLanguageRetriever
@@ -29,12 +30,20 @@ async def search(
user: User,
system_prompt_path="answer_simple_question.txt",
top_k: int = 10,
node_type: Optional[Type] = None,
node_name: List[Optional[str]] = None,
):
query = await log_query(query_text, query_type.value, user.id)
own_document_ids = await get_document_ids_for_user(user.id, datasets)
search_results = await specific_search(
query_type, query_text, user, system_prompt_path=system_prompt_path, top_k=top_k
query_type,
query_text,
user,
system_prompt_path=system_prompt_path,
top_k=top_k,
node_type=node_type,
node_name=node_name,
)
filtered_search_results = []
@@ -57,6 +66,8 @@ async def specific_search(
user: User,
system_prompt_path="answer_simple_question.txt",
top_k: int = 10,
node_type: Optional[Type] = None,
node_name: List[Optional[str]] = None,
) -> list:
search_tasks: dict[SearchType, Callable] = {
SearchType.SUMMARIES: SummariesRetriever().get_completion,
@@ -69,6 +80,8 @@
SearchType.GRAPH_COMPLETION: GraphCompletionRetriever(
system_prompt_path=system_prompt_path,
top_k=top_k,
node_type=node_type,
node_name=node_name,
).get_completion,
SearchType.GRAPH_SUMMARY_COMPLETION: GraphSummaryCompletionRetriever(
system_prompt_path=system_prompt_path,

View file

@@ -19,8 +19,6 @@ async def cognee_network_visualization(graph_data, destination_file_path: str =
"EntityType": "#6510f4",
"DocumentChunk": "#801212",
"TextSummary": "#1077f4",
"TableRow": "#f47710",
"TableType": "#6510f4",
"default": "#D3D3D3",
}

View file

@@ -0,0 +1,99 @@
from typing import Union, Optional, Type, List
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.modules.engine.models.node_set import NodeSet
from cognee.shared.data_models import Edge
from pydantic import BaseModel, Field
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.prompts import render_prompt
from cognee.infrastructure.llm.config import get_llm_config
from uuid import UUID
class AssociativeEdge(BaseModel):
source_node: str
target_node: str
relationship_name: str
reason: str
class AssociativeEdges(BaseModel):
edges: List[AssociativeEdge] = Field(default_factory=list)  # default (...) and default_factory are mutually exclusive in pydantic
async def node_set_edge_association():
graph_engine = await get_graph_engine()
node_set_names = await graph_engine.query("""MATCH (n)
WHERE n.type = 'NodeSet'
RETURN n.name AS name
""")
for node_set in node_set_names:
node_name = node_set.get("name", None)
nodes_data, edges_data = await graph_engine.get_subgraph(
node_type=NodeSet, node_name=[node_name]  # get_subgraph expects a list of names
)
nodes = {}
for node_id, attributes in nodes_data:
if node_id not in nodes:
text = attributes.get("text")
if text:
name = text.strip().split("\n")[0][:50]
content = text
else:
name = attributes.get("name", "Unnamed Node")
content = name
nodes[node_id] = {"node": attributes, "name": name, "content": content}
name_to_uuid = {data["name"].strip().lower(): node_id for node_id, data in nodes.items()}
subgraph_description = create_subgraph_description(nodes, edges_data)
llm_client = get_llm_client()
system_prompt = render_prompt("edge_association_prompt.txt", {})
associative_edges = await llm_client.acreate_structured_output(
subgraph_description, system_prompt, AssociativeEdges
)
graph_edges = []
for ae in associative_edges.edges:
src_str = name_to_uuid.get(ae.source_node.strip().lower())  # keys were normalized with strip().lower()
tgt_str = name_to_uuid.get(ae.target_node.strip().lower())
if not src_str or not tgt_str:
continue
src = UUID(src_str)
tgt = UUID(tgt_str)
rel = ae.relationship_name
rea = ae.reason
props = {
"ontology_valid": False,
"relationship_name": rel,
"source_node_id": src,
"target_node_id": tgt,
"reason": rea,
}
graph_edges.append((src, tgt, rel, props))
if graph_edges:
await graph_engine.add_edges(graph_edges)
print()
def create_subgraph_description(nodes, edges_data):
node_section = "\n".join(
f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
for info in nodes.values()
)
connection_section = "\n".join(
f"{nodes[source_id]['name']} --[{relationship_type}]--> {nodes[target_id]['name']}"
for source_id, target_id, relationship_type, attributes in edges_data
if source_id in nodes and target_id in nodes
)
return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"

View file

@@ -13,7 +13,6 @@ from cognee.modules.users.permissions.methods import give_permission_on_document
from .get_dlt_destination import get_dlt_destination
from .save_data_item_to_storage import save_data_item_to_storage
from cognee.api.v1.add.config import get_s3_config

View file

@@ -1,163 +1,15 @@
import cognee
import asyncio
from cognee.shared.logging_utils import get_logger, ERROR
from cognee.modules.metrics.operations import get_pipeline_run_metrics
from cognee.modules.engine.models.Entity import Entity
from cognee.api.v1.search import SearchType
job_1 = """
CV 1: Relevant
Name: Dr. Emily Carter
Contact Information:
Email: emily.carter@example.com
Phone: (555) 123-4567
Summary:
Senior Data Scientist with over 8 years of experience in machine learning and predictive analytics. Expertise in developing advanced algorithms and deploying scalable models in production environments.
Education:
Ph.D. in Computer Science, Stanford University (2014)
B.S. in Mathematics, University of California, Berkeley (2010)
Experience:
Senior Data Scientist, InnovateAI Labs (2016 – Present)
Led a team in developing machine learning models for natural language processing applications.
Implemented deep learning algorithms that improved prediction accuracy by 25%.
Collaborated with cross-functional teams to integrate models into cloud-based platforms.
Data Scientist, DataWave Analytics (2014 – 2016)
Developed predictive models for customer segmentation and churn analysis.
Analyzed large datasets using Hadoop and Spark frameworks.
Skills:
Programming Languages: Python, R, SQL
Machine Learning: TensorFlow, Keras, Scikit-Learn
Big Data Technologies: Hadoop, Spark
Data Visualization: Tableau, Matplotlib
"""
job_2 = """
CV 2: Relevant
Name: Michael Rodriguez
Contact Information:
Email: michael.rodriguez@example.com
Phone: (555) 234-5678
Summary:
Data Scientist with a strong background in machine learning and statistical modeling. Skilled in handling large datasets and translating data into actionable business insights.
Education:
M.S. in Data Science, Carnegie Mellon University (2013)
B.S. in Computer Science, University of Michigan (2011)
Experience:
Senior Data Scientist, Alpha Analytics (2017 – Present)
Developed machine learning models to optimize marketing strategies.
Reduced customer acquisition cost by 15% through predictive modeling.
Data Scientist, TechInsights (2013 – 2017)
Analyzed user behavior data to improve product features.
Implemented A/B testing frameworks to evaluate product changes.
Skills:
Programming Languages: Python, Java, SQL
Machine Learning: Scikit-Learn, XGBoost
Data Visualization: Seaborn, Plotly
Databases: MySQL, MongoDB
"""
job_3 = """
CV 3: Relevant
Name: Sarah Nguyen
Contact Information:
Email: sarah.nguyen@example.com
Phone: (555) 345-6789
Summary:
Data Scientist specializing in machine learning with 6 years of experience. Passionate about leveraging data to drive business solutions and improve product performance.
Education:
M.S. in Statistics, University of Washington (2014)
B.S. in Applied Mathematics, University of Texas at Austin (2012)
Experience:
Data Scientist, QuantumTech (2016 – Present)
Designed and implemented machine learning algorithms for financial forecasting.
Improved model efficiency by 20% through algorithm optimization.
Junior Data Scientist, DataCore Solutions (2014 – 2016)
Assisted in developing predictive models for supply chain optimization.
Conducted data cleaning and preprocessing on large datasets.
Skills:
Programming Languages: Python, R
Machine Learning Frameworks: PyTorch, Scikit-Learn
Statistical Analysis: SAS, SPSS
Cloud Platforms: AWS, Azure
"""
job_4 = """
CV 4: Not Relevant
Name: David Thompson
Contact Information:
Email: david.thompson@example.com
Phone: (555) 456-7890
Summary:
Creative Graphic Designer with over 8 years of experience in visual design and branding. Proficient in Adobe Creative Suite and passionate about creating compelling visuals.
Education:
B.F.A. in Graphic Design, Rhode Island School of Design (2012)
Experience:
Senior Graphic Designer, CreativeWorks Agency (2015 – Present)
Led design projects for clients in various industries.
Created branding materials that increased client engagement by 30%.
Graphic Designer, Visual Innovations (2012 – 2015)
Designed marketing collateral, including brochures, logos, and websites.
Collaborated with the marketing team to develop cohesive brand strategies.
Skills:
Design Software: Adobe Photoshop, Illustrator, InDesign
Web Design: HTML, CSS
Specialties: Branding and Identity, Typography
"""
job_5 = """
CV 5: Not Relevant
Name: Jessica Miller
Contact Information:
Email: jessica.miller@example.com
Phone: (555) 567-8901
Summary:
Experienced Sales Manager with a strong track record in driving sales growth and building high-performing teams. Excellent communication and leadership skills.
Education:
B.A. in Business Administration, University of Southern California (2010)
Experience:
Sales Manager, Global Enterprises (2015 – Present)
Managed a sales team of 15 members, achieving a 20% increase in annual revenue.
Developed sales strategies that expanded customer base by 25%.
Sales Representative, Market Leaders Inc. (2010 – 2015)
Consistently exceeded sales targets and received the 'Top Salesperson' award in 2013.
Skills:
Sales Strategy and Planning
Team Leadership and Development
CRM Software: Salesforce, Zoho
Negotiation and Relationship Building
Natural language processing (NLP) is an interdisciplinary
subfield of computer science and information retrieval.
"""
@@ -173,7 +25,7 @@ async def main(enable_steps):
# Step 2: Add text
if enable_steps.get("add_text"):
text_list = [job_1, job_2, job_3, job_4, job_5]
text_list = [job_1]
for text in text_list:
await cognee.add(text)
print(f"Added text: {text[:35]}...")
@@ -191,7 +43,10 @@ async def main(enable_steps):
# Step 5: Query insights
if enable_steps.get("retriever"):
search_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION, query_text="Who has experience in design tools?"
query_type=SearchType.GRAPH_COMPLETION,
query_text="What is computer science?",
node_type=Entity,
node_name=["computer science"],
)
print(search_results)

View file

@@ -0,0 +1,26 @@
[project]
name = "latest_ai_development"
version = "0.1.0"
description = "latest-ai-development using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
"crewai[tools]>=0.102.0,<1.0.0",
"cognee>=0.1.34",
"s3fs>=2025.3.2",
"neo4j>=5.28.1"
]
[project.scripts]
latest_ai_development = "latest_ai_development.main:run"
run_crew = "latest_ai_development.main:run"
train = "latest_ai_development.main:train"
replay = "latest_ai_development.main:replay"
test = "latest_ai_development.main:test"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.crewai]
type = "crew"

View file

@@ -0,0 +1,21 @@
researcher:
role: >
{topic} Senior Data Researcher
goal: >
Uncover cutting-edge developments in {topic}
backstory: >
You're a seasoned researcher with a knack for uncovering the latest
developments in File Analysis. You can pass multiple file paths to documents_cognee_add.
You pass your reasoning observations to reasoning_cognee_add.
Known for your ability to find the most relevant
information and present it in a clear and concise manner.
reporting_analyst:
role: >
{topic} Reporting Analyst
goal: >
Create detailed reports based on {topic} data analysis and research findings
backstory: >
You're a meticulous analyst with a keen eye for detail. You're known for
your ability to turn complex data into clear and concise reports, making
it easy for others to understand and act on the information you provide.

View file

@@ -0,0 +1,22 @@
research_task:
description: >
Conduct a thorough research about filesystem files you have available.
Make sure you find any interesting and relevant information given
the current year is {current_year}.
Load the data in the multimedia folder and load the files using system tools.
Use the Cognee Add tool
to store important findings for later reference.
expected_output: >
A list of the 10 data points you loaded, along with the observations you made.
agent: researcher
reporting_task:
description: >
Review the context you got and expand each topic into a full section for a report.
Make sure the report is detailed and contains any and all relevant information.
Use Cognee Add to save your report sections during your work.
Use Cognee Search, and don't pass any parameters.
expected_output: >
A fully fledged report with the main topics, each with a full section of information.
Formatted as markdown without '```'
agent: reporting_analyst

View file

@@ -0,0 +1,139 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task, before_kickoff
from .tools import CogneeAdd, CogneeSearch
from crewai_tools import DirectoryReadTool
import os
# Determine multimedia input directory (can be overridden via env var)
multimedia_dir = os.getenv("MULTIMEDIA_DIR", os.path.join(os.path.dirname(__file__), "multimedia"))
docs_tool = DirectoryReadTool(directory=multimedia_dir)
# Utility function to format paths with file:// prefix
def format_file_paths(paths):
"""
Formats a list of file paths with 'file://' prefix
Args:
paths: A string representing the output of DirectoryReadTool containing file paths
Returns:
A formatted string where each path is prefixed with 'file://'
"""
if isinstance(paths, str):
# Split the paths by newline if it's a string output
file_list = [line for line in paths.split("\n") if line.strip()]
# Format each path with file:// prefix
formatted_paths = [
f"file://{os.path.abspath(path.strip())}"
for path in file_list
if "File paths:" not in path
]
return "\n".join(formatted_paths)
return paths
# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
# https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators
@CrewBase
class LatestAiDevelopment:
"""LatestAiDevelopment crew"""
# Learn more about YAML configuration files here:
# Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
# Tasks: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
# If you would like to add tools to your agents, you can learn more about it here:
# https://docs.crewai.com/concepts/agents#agent-tools
@agent
def researcher(self) -> Agent:
# Initialize the tools with different nodesets
cognee_search = CogneeSearch()
# CogneeAdd for documents with a "documents" nodeset
documents_cognee_add = CogneeAdd()
documents_cognee_add.default_nodeset = ["documents"]
documents_cognee_add.name = "Add Documents to Memory"
documents_cognee_add.description = (
"Add document content to Cognee memory with documents nodeset"
)
# CogneeAdd for reasoning/analysis with a "reasoning" nodeset
reasoning_cognee_add = CogneeAdd()
reasoning_cognee_add.default_nodeset = ["reasoning"]
reasoning_cognee_add.name = "Add Reasoning to Memory"
reasoning_cognee_add.description = (
"Add reasoning and analysis text to Cognee memory with reasoning nodeset"
)
# Create a wrapper for the DirectoryReadTool that formats output
class FormattedDirectoryReadTool(DirectoryReadTool):
def __call__(self, *args, **kwargs):
result = super().__call__(*args, **kwargs)
return format_file_paths(result)
# Use the project-local multimedia directory
formatted_docs_tool = FormattedDirectoryReadTool(directory=multimedia_dir)
return Agent(
config=self.agents_config["researcher"],
tools=[formatted_docs_tool, documents_cognee_add, reasoning_cognee_add, cognee_search],
verbose=True,
)
@agent
def reporting_analyst(self) -> Agent:
# Initialize the tools with default parameters
cognee_search = CogneeSearch()
# Reporting analyst can use a "reports" nodeset
reports_cognee_add = CogneeAdd()
reports_cognee_add.default_nodeset = ["reports"]
reports_cognee_add.name = "Add Reports to Memory"
reports_cognee_add.description = "Add report content to Cognee memory with reports nodeset"
return Agent(
config=self.agents_config["reporting_analyst"],
tools=[cognee_search, reports_cognee_add],
verbose=True,
)
# To learn more about structured task outputs,
# task dependencies, and task callbacks, check out the documentation:
# https://docs.crewai.com/concepts/tasks#overview-of-a-task
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config["research_task"],
)
@task
def reporting_task(self) -> Task:
return Task(config=self.tasks_config["reporting_task"], output_file="report.md")
@before_kickoff
def dump_env(self, *args, **kwargs):
"""Print environment variables at startup."""
print("=== Environment Variables ===")
for key in sorted(os.environ):
print(f"{key}={os.environ[key]}")
@crew
def crew(self) -> Crew:
"""Creates the LatestAiDevelopment crew"""
# To learn how to add knowledge sources to your crew, check out the documentation:
# https://docs.crewai.com/concepts/knowledge#what-is-knowledge
print(self.tasks)
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)

View file

@@ -0,0 +1,73 @@
#!/usr/bin/env python
import sys
import warnings
import os
import cognee
from datetime import datetime
from latest_ai_development.crew import LatestAiDevelopment
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
# This main file is intended to be a way for you to run your
# crew locally, so refrain from adding unnecessary logic into this file.
# Replace with inputs you want to test with, it will automatically
# interpolate any tasks and agents information
# Set LLM_API_KEY from OPENAI_API_KEY if not already set
if "LLM_API_KEY" not in os.environ:
openai_api_key = os.environ.get("OPENAI_API_KEY")
if openai_api_key:
os.environ["LLM_API_KEY"] = openai_api_key
def run():
"""
Run the crew.
"""
inputs = {"topic": "AI LLMs", "current_year": str(datetime.now().year)}
try:
LatestAiDevelopment().crew().kickoff(inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while running the crew: {e}")
def train():
"""
Train the crew for a given number of iterations.
"""
inputs = {"topic": "AI LLMs"}
try:
LatestAiDevelopment().crew().train(
n_iterations=int(sys.argv[1]), filename=sys.argv[2], inputs=inputs
)
except Exception as e:
raise Exception(f"An error occurred while training the crew: {e}")
def replay():
"""
Replay the crew execution from a specific task.
"""
try:
LatestAiDevelopment().crew().replay(task_id=sys.argv[1])
except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")
def test():
"""
Test the crew execution and return the results.
"""
inputs = {"topic": "AI LLMs"}
try:
LatestAiDevelopment().crew().test(
n_iterations=int(sys.argv[1]), openai_model_name=sys.argv[2], inputs=inputs
)
except Exception as e:
raise Exception(f"An error occurred while testing the crew: {e}")

View file

@@ -0,0 +1 @@
This is a dummy text file for testing DirectoryReadTool.

View file

@@ -0,0 +1,3 @@
from .custom_tool import CogneeAdd, CogneeSearch, CogneeAddInput, CogneeSearchInput
__all__ = ["CogneeAdd", "CogneeSearch", "CogneeAddInput", "CogneeSearchInput"]

View file

@@ -0,0 +1,208 @@
from crewai.tools import BaseTool
from typing import Type, List, Optional
from pydantic import BaseModel, Field, root_validator
from cognee.api.v1.search import SearchType
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines import run_tasks, Task
from cognee.tasks.experimental_tasks.node_set_edge_association import node_set_edge_association
class CogneeAddInput(BaseModel):
"""Input schema for CogneeAdd tool."""
context: Optional[str] = Field(None, description="The text content to add to Cognee memory.")
file_paths: Optional[List[str]] = Field(
None, description="List of file paths to add to Cognee memory."
)
files: Optional[List[str]] = Field(
None, description="Alias for file_paths; list of file URLs or paths to add to memory."
)
text: Optional[str] = Field(
None, description="Alternative field for text content (maps to context)."
)
reasoning: Optional[str] = Field(
None, description="Alternative field for reasoning text (maps to context)."
)
node_set: List[str] = Field(
default=["default"], description="The list of node sets to store the data in."
)
@root_validator(pre=True)
def normalize_inputs(cls, values):
"""Normalize different input formats to standard fields."""
# Map alias 'files' to 'file_paths' if provided
if values.get("files") and not values.get("file_paths"):
values["file_paths"] = values.get("files")
# Map text or reasoning to context if provided
if values.get("text") and not values.get("context"):
values["context"] = values.get("text")
if values.get("reasoning") and not values.get("context"):
values["context"] = values.get("reasoning")
# Map report_section to context if provided
if values.get("report_section") and not values.get("context"):
values["context"] = values.get("report_section")
# Validate that at least one input field is provided
if not values.get("context") and not values.get("file_paths"):
raise ValueError(
"Either 'context', 'text', 'reasoning', or 'file_paths' must be provided"
)
return values
class CogneeAdd(BaseTool):
name: str = "Cognee Memory ADD"
description: str = "Add data to cognee memory to store data in memory for AI memory"
args_schema: Type[BaseModel] = CogneeAddInput
default_nodeset: List[str] = ["default"] # Can be overridden per instance
def _run(self, **kwargs) -> str:
import cognee
import asyncio
# Use the provided node_set if given, otherwise use default_nodeset
node_set = kwargs.get("node_set", self.default_nodeset)
context = kwargs.get("context")
file_paths = kwargs.get("file_paths")
# Handle alternative input fields
text = kwargs.get("text")
reasoning = kwargs.get("reasoning")
if text and not context:
context = text
if reasoning and not context:
context = reasoning
async def main(ns):
try:
if context:
# Handle text content
await cognee.add(context, node_set=ns)
elif file_paths:
# Handle file paths
await cognee.add(file_paths, node_set=ns)
run = await cognee.cognify()
tasks = [Task(node_set_edge_association)]
user = await get_default_user()
pipeline = run_tasks(tasks=tasks, user=user)
async for pipeline_status in pipeline:
print(
f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}"
)
return run
except Exception as e:
return f"Error: {str(e)}"
# Get the current event loop or create a new one
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# If loop is already running, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(main(node_set))
return result.__name__ if hasattr(result, "__name__") else str(result)
except Exception as e:
return f"Tool execution error: {str(e)}"
class CogneeSearchInput(BaseModel):
"""Input schema for CogneeSearch tool."""
query_text: Optional[str] = Field(
None, description="The search query to find relevant information in Cognee memory."
)
query: Optional[str] = Field(
None, description="Alternative field for search query (maps to query_text)."
)
search_term: Optional[str] = Field(
None, description="Alternative field for search term (maps to query_text)."
)
node_set: List[str] = Field(
default=["default"], description="The list of node sets to search in."
)
@root_validator(pre=True)
def normalize_inputs(cls, values):
"""Normalize different input formats to standard fields."""
# If the dictionary is empty, use a default query
if not values:
values["query_text"] = "Latest AI developments"
return values
# Map alternative search fields to query_text
if values.get("query") and not values.get("query_text"):
values["query_text"] = values.get("query")
if values.get("search_term") and not values.get("query_text"):
values["query_text"] = values.get("search_term")
# If security_context is provided but no query, use a default
if "security_context" in values and not values.get("query_text"):
values["query_text"] = "Latest AI developments"
# Ensure query_text is present
if not values.get("query_text"):
values["query_text"] = "Latest AI developments"
return values
class CogneeSearch(BaseTool):
name: str = "Cognee Memory SEARCH"
description: str = "Search data from cognee memory to retrieve relevant information"
args_schema: Type[BaseModel] = CogneeSearchInput
default_nodeset: List[str] = ["default"] # Can be overridden per instance
def _run(self, **kwargs) -> str:
import cognee
import asyncio
# Use the provided node_set if given, otherwise use default_nodeset
node_set = kwargs.get("node_set", self.default_nodeset)
# Get query_text from kwargs; fall back to alternative fields, then a default
query_text = kwargs.get("query_text")
# Handle alternative input fields (only reachable when query_text is unset)
query = kwargs.get("query")
search_term = kwargs.get("search_term")
if query and not query_text:
query_text = query
if search_term and not query_text:
query_text = search_term
if not query_text:
query_text = "Latest AI developments"
async def main(query, ns):
try:
# Use 'datasets' to specify which node sets (datasets) to search
result = await cognee.search(
query_text=query + " Only return results from context",
query_type=SearchType.GRAPH_COMPLETION,
datasets=ns,
)
return result
except Exception as e:
return f"Error: {str(e)}"
# Get the current event loop or create a new one
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# If loop is already running, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(main(query_text, node_set))
return str(result)
except Exception as e:
return f"Tool execution error: {str(e)}"

View file

@@ -0,0 +1,46 @@
#!/usr/bin/env python
"""
Script to test if Cognee tools are working correctly.
Run this script to test if the tools are correctly importing and functioning.
"""
import os
import cognee
from src.latest_ai_development.tools import CogneeAdd, CogneeSearch
# Set LLM_API_KEY from OPENAI_API_KEY if not already set
if "LLM_API_KEY" not in os.environ:
openai_api_key = os.environ.get("OPENAI_API_KEY")
if openai_api_key:
os.environ["LLM_API_KEY"] = openai_api_key
def test_tools():
"""Test the CogneeAdd and CogneeSearch tools."""
print("Testing Cognee tools...")
print("\nTesting CogneeAdd tool...")
add_tool = CogneeAdd()
test_input = (
"This is a test text to add to Cognee memory. It contains information about AI LLMs."
)
node_set = ["AI", "LLMs"]
try:
result = add_tool._run(context=test_input, node_set=node_set)
print(f"CogneeAdd result: {result}")
except Exception as e:
print(f"Error testing CogneeAdd: {str(e)}")
print("\nTesting CogneeSearch tool...")
search_tool = CogneeSearch()
search_query = "AI LLMs"
node_set = ["AI"]
try:
result = search_tool._run(query_text=search_query, node_set=node_set)
print(f"CogneeSearch result: {result}")
except Exception as e:
print(f"Error testing CogneeSearch: {str(e)}")
if __name__ == "__main__":
test_tools()

View file

@@ -1,26 +1,40 @@
import os
import asyncio
import cognee
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.shared.logging_utils import get_logger, ERROR
from cognee.api.v1.search import SearchType
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines import run_tasks, Task
from cognee.tasks.experimental_tasks.node_set_edge_association import node_set_edge_association
text_a = """
AI is revolutionizing financial services through intelligent fraud detection
and automated customer service platforms.
Leading financial technology firms like Stripe, Square, and Revolut are redefining digital commerce by embedding AI
into their payment ecosystems. Stripe leverages machine learning to detect and prevent fraud in real time,
while Square uses predictive analytics to offer customized lending solutions to small businesses.
Meanwhile, Revolut applies AI algorithms to automate wealth management services, enabling users to invest,
save, and budget with unparalleled personalization and efficiency.
"""
text_b = """
Advances in AI are enabling smarter systems that learn and adapt over time.
Pioneering AI companies such as OpenAI, Anthropic, and DeepMind are advancing self-supervised
learning techniques that empower systems to autonomously evolve their cognitive capabilities.
OpenAI's models interpret complex multimodal data with minimal human annotation, while Anthropic's
Constitutional AI approach refines alignment and safety. DeepMind continues to push boundaries with
breakthroughs like AlphaFold, illustrating the power of AI to decipher intricate biological structures
without exhaustive manual input.
"""
text_c = """
MedTech startups have seen significant growth in recent years, driven by innovation
in digital health and medical devices.
MedTech innovators like Medtronic, Butterfly Network, and Intuitive Surgical are revolutionizing
healthcare delivery through smart devices and AI-driven platforms. Medtronic's connected insulin
pumps enable real-time glucose monitoring, Butterfly Network's portable ultrasound devices bring
diagnostic imaging to remote areas, and Intuitive Surgical's da Vinci system enhances precision
in minimally invasive surgeries. Together, these companies are reshaping clinical pathways and
extending care beyond traditional hospital settings.
"""
node_set_a = ["AI", "FinTech"]
node_set_b = ["AI"]
node_set_c = ["MedTech"]
node_set_a = ["AI_NODESET", "FinTech_NODESET"]
node_set_b = ["AI_NODESET"]
node_set_c = ["MedTech_NODESET"]
async def main():
@@ -32,10 +46,15 @@ async def main():
await cognee.add(text_c, node_set=node_set_c)
await cognee.cognify()
visualization_path = os.path.join(
os.path.dirname(__file__), "./.artifacts/graph_visualization.html"
)
await visualize_graph(visualization_path)
tasks = [Task(node_set_edge_association)]
user = await get_default_user()
pipeline = run_tasks(tasks=tasks, user=user)
async for pipeline_status in pipeline:
print(f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}")
print()
if __name__ == "__main__":