Add working crewAI example

This commit is contained in:
vasilije 2025-04-23 04:50:15 -07:00
parent 9b92810b83
commit 8a5bd3a826
9 changed files with 504 additions and 0 deletions

View file

@ -0,0 +1,26 @@
[project]
name = "latest_ai_development"
version = "0.1.0"
description = "latest-ai-development using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
"crewai[tools]>=0.102.0,<1.0.0",
"cognee>=0.1.34",
"s3fs>=2025.3.2",
"neo4j>=5.28.1"
]
[project.scripts]
latest_ai_development = "latest_ai_development.main:run"
run_crew = "latest_ai_development.main:run"
train = "latest_ai_development.main:train"
replay = "latest_ai_development.main:replay"
test = "latest_ai_development.main:test"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.crewai]
type = "crew"

View file

@ -0,0 +1,21 @@
researcher:
role: >
{topic} Senior Data Researcher
goal: >
Uncover cutting-edge developments in {topic}
backstory: >
You're a seasoned researcher with a knack for uncovering the latest
developments in File Analysis. You are able to pass many files paths to documents_cognee_add add.
You pass your reasoning observations to reasoning_cognee_add
Known for your ability to find the most relevant
information and present it in a clear and concise manner.
reporting_analyst:
role: >
{topic} Reporting Analyst
goal: >
Create detailed reports based on {topic} data analysis and research findings
backstory: >
You're a meticulous analyst with a keen eye for detail. You're known for
your ability to turn complex data into clear and concise reports, making
it easy for others to understand and act on the information you provide.

View file

@ -0,0 +1,22 @@
research_task:
description: >
Conduct a thorough research about filesystem files you have available.
Make sure you find any interesting and relevant information given
the current year is {current_year}.
Load the data in the multimedia folder and load the files using system tools.
Use the Cognee Add tool
to store important findings for later reference.
expected_output: >
A list with 10 data points you loaded, and your observations you loaded.
agent: researcher
reporting_task:
description: >
Review the context you got and expand each topic into a full section for a report.
Make sure the report is detailed and contains any and all relevant information.
use Cognee Add to save your report sections during your work.
Use cognee search, and don't pass any parameters
expected_output: >
A fully fledged report with the main topics, each with a full section of information.
Formatted as markdown without '```'
agent: reporting_analyst

View file

@ -0,0 +1,122 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from .tools import CogneeAdd, CogneeSearch
from crewai_tools import (
DirectoryReadTool
)
import os
docs_tool = DirectoryReadTool(directory='/Users/vasilije/cognee/examples/python/latest_ai_development/src/latest_ai_development/multimedia')
# Utility function to format paths with file:// prefix
def format_file_paths(paths):
"""
Formats a list of file paths with 'file://' prefix
Args:
paths: A string representing the output of DirectoryReadTool containing file paths
Returns:
A formatted string where each path is prefixed with 'file://'
"""
if isinstance(paths, str):
# Split the paths by newline if it's a string output
file_list = [line for line in paths.split('\n') if line.strip()]
# Format each path with file:// prefix
formatted_paths = [f"file://{os.path.abspath(path.strip())}" for path in file_list if "File paths:" not in path]
return '\n'.join(formatted_paths)
return paths
# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
# https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators
@CrewBase
class LatestAiDevelopment:
"""LatestAiDevelopment crew"""
# Learn more about YAML configuration files here:
# Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
# Tasks: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
# If you would like to add tools to your agents, you can learn more about it here:
# https://docs.crewai.com/concepts/agents#agent-tools
@agent
def researcher(self) -> Agent:
# Initialize the tools with different nodesets
cognee_search = CogneeSearch()
# CogneeAdd for documents with a "documents" nodeset
documents_cognee_add = CogneeAdd()
documents_cognee_add.default_nodeset = ["documents"]
documents_cognee_add.name = "Add Documents to Memory"
documents_cognee_add.description = "Add document content to Cognee memory with documents nodeset"
# CogneeAdd for reasoning/analysis with a "reasoning" nodeset
reasoning_cognee_add = CogneeAdd()
reasoning_cognee_add.default_nodeset = ["reasoning"]
reasoning_cognee_add.name = "Add Reasoning to Memory"
reasoning_cognee_add.description = "Add reasoning and analysis text to Cognee memory with reasoning nodeset"
# Create a wrapper for the DirectoryReadTool that formats output
class FormattedDirectoryReadTool(DirectoryReadTool):
def __call__(self, *args, **kwargs):
result = super().__call__(*args, **kwargs)
return format_file_paths(result)
formatted_docs_tool = FormattedDirectoryReadTool(directory='/Users/vasilije/cognee/examples/python/latest_ai_development/src/latest_ai_development/multimedia')
return Agent(
config=self.agents_config["researcher"],
tools=[formatted_docs_tool, documents_cognee_add, reasoning_cognee_add, cognee_search],
verbose=True
)
@agent
def reporting_analyst(self) -> Agent:
# Initialize the tools with default parameters
cognee_search = CogneeSearch()
# Reporting analyst can use a "reports" nodeset
reports_cognee_add = CogneeAdd()
reports_cognee_add.default_nodeset = ["reports"]
reports_cognee_add.name = "Add Reports to Memory"
reports_cognee_add.description = "Add report content to Cognee memory with reports nodeset"
return Agent(
config=self.agents_config["reporting_analyst"],
tools=[cognee_search, reports_cognee_add],
verbose=True,
)
# To learn more about structured task outputs,
# task dependencies, and task callbacks, check out the documentation:
# https://docs.crewai.com/concepts/tasks#overview-of-a-task
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config["research_task"],
)
@task
def reporting_task(self) -> Task:
return Task(config=self.tasks_config["reporting_task"], output_file="report.md")
@crew
def crew(self) -> Crew:
"""Creates the LatestAiDevelopment crew"""
# To learn how to add knowledge sources to your crew, check out the documentation:
# https://docs.crewai.com/concepts/knowledge#what-is-knowledge
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)

View file

@ -0,0 +1,73 @@
#!/usr/bin/env python
import sys
import warnings
import os
import cognee
from datetime import datetime
from latest_ai_development.crew import LatestAiDevelopment
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
# This main file is intended to be a way for you to run your
# crew locally, so refrain from adding unnecessary logic into this file.
# Replace with inputs you want to test with, it will automatically
# interpolate any tasks and agents information
# Set COGNEE_API_KEY if not already set
if "LLM_API_KEY" not in os.environ:
openai_api_key = os.environ.get("OPENAI_API_KEY")
if openai_api_key:
os.environ["LLM_API_KEY"] = openai_api_key
def run():
"""
Run the crew.
"""
inputs = {"topic": "AI LLMs", "current_year": str(datetime.now().year)}
try:
LatestAiDevelopment().crew().kickoff(inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while running the crew: {e}")
def train():
"""
Train the crew for a given number of iterations.
"""
inputs = {"topic": "AI LLMs"}
try:
LatestAiDevelopment().crew().train(
n_iterations=int(sys.argv[1]), filename=sys.argv[2], inputs=inputs
)
except Exception as e:
raise Exception(f"An error occurred while training the crew: {e}")
def replay():
"""
Replay the crew execution from a specific task.
"""
try:
LatestAiDevelopment().crew().replay(task_id=sys.argv[1])
except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")
def test():
"""
Test the crew execution and returns the results.
"""
inputs = {"topic": "AI LLMs"}
try:
LatestAiDevelopment().crew().test(
n_iterations=int(sys.argv[1]), openai_model_name=sys.argv[2], inputs=inputs
)
except Exception as e:
raise Exception(f"An error occurred while testing the crew: {e}")

View file

@ -0,0 +1,3 @@
from .custom_tool import CogneeAdd, CogneeSearch, CogneeAddInput, CogneeSearchInput
__all__ = ["CogneeAdd", "CogneeSearch", "CogneeAddInput", "CogneeSearchInput"]

View file

@ -0,0 +1,191 @@
from crewai.tools import BaseTool
from typing import Type, Dict, Any, List, Optional, Union
from pydantic import BaseModel, Field, root_validator
from cognee.api.v1.search import SearchType
from cognee.modules.engine.models.Entity import Entity
from cognee.api.v1.search import SearchType
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines import run_tasks, Task
from cognee.tasks.experimental_tasks.node_set_edge_association import node_set_edge_association
class CogneeAddInput(BaseModel):
"""Input schema for CogneeAdd tool."""
context: Optional[str] = Field(None, description="The text content to add to Cognee memory.")
file_paths: Optional[List[str]] = Field(None, description="List of file paths to add to Cognee memory.")
text: Optional[str] = Field(None, description="Alternative field for text content (maps to context).")
reasoning: Optional[str] = Field(None, description="Alternative field for reasoning text (maps to context).")
node_set: List[str] = Field(
default=["default"], description="The list of node sets to store the data in."
)
@root_validator(pre=True)
def normalize_inputs(cls, values):
"""Normalize different input formats to standard fields."""
# Map text or reasoning to context if provided
if values.get('text') and not values.get('context'):
values['context'] = values.get('text')
if values.get('reasoning') and not values.get('context'):
values['context'] = values.get('reasoning')
# Validate that at least one input field is provided
if not values.get('context') and not values.get('file_paths'):
raise ValueError("Either 'context', 'text', 'reasoning', or 'file_paths' must be provided")
return values
class CogneeAdd(BaseTool):
name: str = "Cognee Memory ADD"
description: str = "Add data to cognee memory to store data in memory for AI memory"
args_schema: Type[BaseModel] = CogneeAddInput
default_nodeset: List[str] = ["default"] # Can be overridden per instance
def _run(self, **kwargs) -> str:
import cognee
import asyncio
# Use the provided node_set if given, otherwise use default_nodeset
node_set = kwargs.get("node_set", self.default_nodeset)
context = kwargs.get("context")
file_paths = kwargs.get("file_paths")
# Handle alternative input fields
text = kwargs.get("text")
reasoning = kwargs.get("reasoning")
if text and not context:
context = text
if reasoning and not context:
context = reasoning
async def main(ns):
try:
if context:
# Handle text content
await cognee.add(context, node_set=ns)
elif file_paths:
# Handle file paths
await cognee.add(file_paths, node_set=ns)
run = await cognee.cognify()
tasks = [Task(node_set_edge_association)]
user = await get_default_user()
pipeline = run_tasks(tasks=tasks, user=user)
async for pipeline_status in pipeline:
print(f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}")
return run
except Exception as e:
return f"Error: {str(e)}"
# Get the current event loop or create a new one
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# If loop is already running, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(main(node_set))
return result.__name__ if hasattr(result, "__name__") else str(result)
except Exception as e:
return f"Tool execution error: {str(e)}"
class CogneeSearchInput(BaseModel):
"""Input schema for CogneeSearch tool."""
query_text: Optional[str] = Field(
None, description="The search query to find relevant information in Cognee memory."
)
query: Optional[str] = Field(
None, description="Alternative field for search query (maps to query_text)."
)
search_term: Optional[str] = Field(
None, description="Alternative field for search term (maps to query_text)."
)
node_set: List[str] = Field(
default=["default"], description="The list of node sets to search in."
)
@root_validator(pre=True)
def normalize_inputs(cls, values):
"""Normalize different input formats to standard fields."""
# If the dictionary is empty, use a default query
if not values:
values['query_text'] = "Latest AI developments"
return values
# Map alternative search fields to query_text
if values.get('query') and not values.get('query_text'):
values['query_text'] = values.get('query')
if values.get('search_term') and not values.get('query_text'):
values['query_text'] = values.get('search_term')
# If security_context is provided but no query, use a default
if 'security_context' in values and not values.get('query_text'):
values['query_text'] = "Latest AI developments"
# Ensure query_text is present
if not values.get('query_text'):
values['query_text'] = "Latest AI developments"
return values
class CogneeSearch(BaseTool):
name: str = "Cognee Memory SEARCH"
description: str = "Search data from cognee memory to retrieve relevant information"
args_schema: Type[BaseModel] = CogneeSearchInput
default_nodeset: List[str] = ["default"] # Can be overridden per instance
def _run(self, **kwargs) -> str:
import cognee
import asyncio
# Use the provided node_set if given, otherwise use default_nodeset
node_set = kwargs.get("node_set", self.default_nodeset)
# Get query_text from kwargs or use a default
query_text = kwargs.get("query_text", "Latest AI developments")
# Handle alternative input fields
query = kwargs.get("query")
search_term = kwargs.get("search_term")
if query and not query_text:
query_text = query
if search_term and not query_text:
query_text = search_term
async def main(query, ns):
try:
result = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION,
query_text=query + " Only return results from context",
node_set=ns # Pass the node_set to the search
)
return result
except Exception as e:
return f"Error: {str(e)}"
# Get the current event loop or create a new one
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# If loop is already running, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(main(query_text, node_set))
return str(result)
except Exception as e:
return f"Tool execution error: {str(e)}"

View file

@ -0,0 +1,46 @@
#!/usr/bin/env python
"""
Script to test if Cognee tools are working correctly.
Run this script to test if the tools are correctly importing and functioning.
"""
import os
import cognee
from src.latest_ai_development.tools import CogneeAdd, CogneeSearch
# Set COGNEE_API_KEY if not already set
if "LLM_API_KEY" not in os.environ:
openai_api_key = os.environ.get("OPENAI_API_KEY")
if openai_api_key:
os.environ["LLM_API_KEY"] = openai_api_key
def test_tools():
"""Test the CogneeAdd and CogneeSearch tools."""
print("Testing Cognee tools...")
print("\nTesting CogneeAdd tool...")
add_tool = CogneeAdd()
test_input = (
"This is a test text to add to Cognee memory. It contains information about AI LLMs."
)
node_set = ["AI", "LLMs"]
try:
result = add_tool._run(context=test_input, node_set=node_set)
print(f"CogneeAdd result: {result}")
except Exception as e:
print(f"Error testing CogneeAdd: {str(e)}")
print("\nTesting CogneeSearch tool...")
search_tool = CogneeSearch()
search_query = "AI LLMs"
node_set = ["AI"]
try:
result = search_tool._run(query_text=search_query, node_set=node_set)
print(f"CogneeSearch result: {result}")
except Exception as e:
print(f"Error testing CogneeSearch: {str(e)}")
if __name__ == "__main__":
test_tools()