diff --git a/examples/python/latest_ai_development/src/latest_ai_development/crew.py b/examples/python/latest_ai_development/src/latest_ai_development/crew.py index da297e274..e63f73d23 100644 --- a/examples/python/latest_ai_development/src/latest_ai_development/crew.py +++ b/examples/python/latest_ai_development/src/latest_ai_development/crew.py @@ -1,31 +1,36 @@ from crewai import Agent, Crew, Process, Task -from crewai.project import CrewBase, agent, crew, task +from crewai.project import CrewBase, agent, crew, task, before_kickoff from .tools import CogneeAdd, CogneeSearch -from crewai_tools import ( - DirectoryReadTool -) +from crewai_tools import DirectoryReadTool import os -docs_tool = DirectoryReadTool(directory='/Users/vasilije/cognee/examples/python/latest_ai_development/src/latest_ai_development/multimedia') +# Determine multimedia input directory (can be overridden via env var) +multimedia_dir = os.getenv("MULTIMEDIA_DIR", os.path.join(os.path.dirname(__file__), "multimedia")) +docs_tool = DirectoryReadTool(directory=multimedia_dir) + # Utility function to format paths with file:// prefix def format_file_paths(paths): """ Formats a list of file paths with 'file://' prefix - + Args: paths: A string representing the output of DirectoryReadTool containing file paths - + Returns: A formatted string where each path is prefixed with 'file://' """ if isinstance(paths, str): # Split the paths by newline if it's a string output - file_list = [line for line in paths.split('\n') if line.strip()] + file_list = [line for line in paths.split("\n") if line.strip()] # Format each path with file:// prefix - formatted_paths = [f"file://{os.path.abspath(path.strip())}" for path in file_list if "File paths:" not in path] - return '\n'.join(formatted_paths) + formatted_paths = [ + f"file://{os.path.abspath(path.strip())}" + for path in file_list + if "File paths:" not in path + ] + return "\n".join(formatted_paths) return paths @@ -50,38 +55,43 @@ class LatestAiDevelopment: def researcher(self) -> Agent: # Initialize the tools with different nodesets cognee_search = CogneeSearch() - + # CogneeAdd for documents with a "documents" nodeset documents_cognee_add = CogneeAdd() documents_cognee_add.default_nodeset = ["documents"] documents_cognee_add.name = "Add Documents to Memory" - documents_cognee_add.description = "Add document content to Cognee memory with documents nodeset" - + documents_cognee_add.description = ( + "Add document content to Cognee memory with documents nodeset" + ) + # CogneeAdd for reasoning/analysis with a "reasoning" nodeset reasoning_cognee_add = CogneeAdd() reasoning_cognee_add.default_nodeset = ["reasoning"] reasoning_cognee_add.name = "Add Reasoning to Memory" - reasoning_cognee_add.description = "Add reasoning and analysis text to Cognee memory with reasoning nodeset" - + reasoning_cognee_add.description = ( + "Add reasoning and analysis text to Cognee memory with reasoning nodeset" + ) + # Create a wrapper for the DirectoryReadTool that formats output class FormattedDirectoryReadTool(DirectoryReadTool): def __call__(self, *args, **kwargs): result = super().__call__(*args, **kwargs) return format_file_paths(result) - - formatted_docs_tool = FormattedDirectoryReadTool(directory='/Users/vasilije/cognee/examples/python/latest_ai_development/src/latest_ai_development/multimedia') + + # Use the project-local multimedia directory + formatted_docs_tool = FormattedDirectoryReadTool(directory=multimedia_dir) return Agent( - config=self.agents_config["researcher"], + config=self.agents_config["researcher"], tools=[formatted_docs_tool, documents_cognee_add, reasoning_cognee_add, cognee_search], - verbose=True + verbose=True, ) @agent def reporting_analyst(self) -> Agent: # Initialize the tools with default parameters cognee_search = CogneeSearch() - + # Reporting analyst can use a "reports" nodeset reports_cognee_add = CogneeAdd() reports_cognee_add.default_nodeset = ["reports"] @@ -107,16 +117,23 @@ class LatestAiDevelopment: def reporting_task(self) -> Task: return Task(config=self.tasks_config["reporting_task"], output_file="report.md") + @before_kickoff + def dump_env(self, *args, **kwargs): + """Print environment variables at startup.""" + print("=== Environment Variables ===") + for key in sorted(os.environ): + print(f"{key}={os.environ[key]}") + @crew def crew(self) -> Crew: """Creates the LatestAiDevelopment crew""" # To learn how to add knowledge sources to your crew, check out the documentation: # https://docs.crewai.com/concepts/knowledge#what-is-knowledge - + print(self.tasks) return Crew( agents=self.agents, # Automatically created by the @agent decorator tasks=self.tasks, # Automatically created by the @task decorator process=Process.sequential, verbose=True, # process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/ - ) \ No newline at end of file + ) diff --git a/examples/python/latest_ai_development/src/latest_ai_development/multimedia/dummy.txt b/examples/python/latest_ai_development/src/latest_ai_development/multimedia/dummy.txt new file mode 100644 index 000000000..02f6de818 --- /dev/null +++ b/examples/python/latest_ai_development/src/latest_ai_development/multimedia/dummy.txt @@ -0,0 +1 @@ +This is a dummy text file for testing DirectoryReadTool. diff --git a/examples/python/latest_ai_development/src/latest_ai_development/tools/custom_tool.py b/examples/python/latest_ai_development/src/latest_ai_development/tools/custom_tool.py index 766d076a0..59aba1d83 100644 --- a/examples/python/latest_ai_development/src/latest_ai_development/tools/custom_tool.py +++ b/examples/python/latest_ai_development/src/latest_ai_development/tools/custom_tool.py @@ -1,9 +1,7 @@ from crewai.tools import BaseTool -from typing import Type, Dict, Any, List, Optional, Union +from typing import Type, List, Optional from pydantic import BaseModel, Field, root_validator from cognee.api.v1.search import SearchType -from cognee.modules.engine.models.Entity import Entity -from cognee.api.v1.search import SearchType from cognee.modules.users.methods import get_default_user from cognee.modules.pipelines import run_tasks, Task from cognee.tasks.experimental_tasks.node_set_edge_association import node_set_edge_association @@ -13,27 +11,44 @@ class CogneeAddInput(BaseModel): """Input schema for CogneeAdd tool.""" context: Optional[str] = Field(None, description="The text content to add to Cognee memory.") - file_paths: Optional[List[str]] = Field(None, description="List of file paths to add to Cognee memory.") - text: Optional[str] = Field(None, description="Alternative field for text content (maps to context).") - reasoning: Optional[str] = Field(None, description="Alternative field for reasoning text (maps to context).") + file_paths: Optional[List[str]] = Field( + None, description="List of file paths to add to Cognee memory." + ) + files: Optional[List[str]] = Field( + None, description="Alias for file_paths; list of file URLs or paths to add to memory." + ) + text: Optional[str] = Field( + None, description="Alternative field for text content (maps to context)." + ) + reasoning: Optional[str] = Field( + None, description="Alternative field for reasoning text (maps to context)." + ) node_set: List[str] = Field( default=["default"], description="The list of node sets to store the data in." ) - + @root_validator(pre=True) def normalize_inputs(cls, values): """Normalize different input formats to standard fields.""" + # Map alias 'files' to 'file_paths' if provided + if values.get("files") and not values.get("file_paths"): + values["file_paths"] = values.get("files") # Map text or reasoning to context if provided - if values.get('text') and not values.get('context'): - values['context'] = values.get('text') - - if values.get('reasoning') and not values.get('context'): - values['context'] = values.get('reasoning') - + if values.get("text") and not values.get("context"): + values["context"] = values.get("text") + + if values.get("reasoning") and not values.get("context"): + values["context"] = values.get("reasoning") + # Map report_section to context if provided + if values.get("report_section") and not values.get("context"): + values["context"] = values.get("report_section") + # Validate that at least one input field is provided - if not values.get('context') and not values.get('file_paths'): - raise ValueError("Either 'context', 'text', 'reasoning', or 'file_paths' must be provided") - + if not values.get("context") and not values.get("file_paths"): + raise ValueError( + "Either 'context', 'text', 'reasoning', or 'file_paths' must be provided" + ) + return values @@ -51,14 +66,14 @@ class CogneeAdd(BaseTool): node_set = kwargs.get("node_set", self.default_nodeset) context = kwargs.get("context") file_paths = kwargs.get("file_paths") - + # Handle alternative input fields text = kwargs.get("text") reasoning = kwargs.get("reasoning") - + if text and not context: context = text - + if reasoning and not context: context = reasoning @@ -70,7 +85,7 @@ class CogneeAdd(BaseTool): elif file_paths: # Handle file paths await cognee.add(file_paths, node_set=ns) - + run = await cognee.cognify() tasks = [Task(node_set_edge_association)] @@ -78,7 +93,9 @@ class CogneeAdd(BaseTool): pipeline = run_tasks(tasks=tasks, user=user) async for pipeline_status in pipeline: - print(f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}") + print( + f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}" + ) return run except Exception as e: @@ -91,7 +108,6 @@ class CogneeAdd(BaseTool): # If loop is already running, create a new one loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - result = loop.run_until_complete(main(node_set)) return result.__name__ if hasattr(result, "__name__") else str(result) except Exception as e: @@ -113,30 +129,30 @@ class CogneeSearchInput(BaseModel): node_set: List[str] = Field( default=["default"], description="The list of node sets to search in." ) - + @root_validator(pre=True) def normalize_inputs(cls, values): """Normalize different input formats to standard fields.""" # If the dictionary is empty, use a default query if not values: - values['query_text'] = "Latest AI developments" + values["query_text"] = "Latest AI developments" return values - + # Map alternative search fields to query_text - if values.get('query') and not values.get('query_text'): - values['query_text'] = values.get('query') - - if values.get('search_term') and not values.get('query_text'): - values['query_text'] = values.get('search_term') - + if values.get("query") and not values.get("query_text"): + values["query_text"] = values.get("query") + + if values.get("search_term") and not values.get("query_text"): + values["query_text"] = values.get("search_term") + # If security_context is provided but no query, use a default - if 'security_context' in values and not values.get('query_text'): - values['query_text'] = "Latest AI developments" - + if "security_context" in values and not values.get("query_text"): + values["query_text"] = "Latest AI developments" + # Ensure query_text is present - if not values.get('query_text'): - values['query_text'] = "Latest AI developments" - + if not values.get("query_text"): + values["query_text"] = "Latest AI developments" + return values @@ -152,26 +168,27 @@ class CogneeSearch(BaseTool): # Use the provided node_set if given, otherwise use default_nodeset node_set = kwargs.get("node_set", self.default_nodeset) - + # Get query_text from kwargs or use a default query_text = kwargs.get("query_text", "Latest AI developments") - + # Handle alternative input fields query = kwargs.get("query") search_term = kwargs.get("search_term") - + if query and not query_text: query_text = query - + if search_term and not query_text: query_text = search_term async def main(query, ns): try: + # Use 'datasets' to specify which node sets (datasets) to search result = await cognee.search( - query_type=SearchType.GRAPH_COMPLETION, query_text=query + " Only return results from context", - node_set=ns # Pass the node_set to the search + query_type=SearchType.GRAPH_COMPLETION, + datasets=ns, ) return result except Exception as e: @@ -188,4 +205,4 @@ class CogneeSearch(BaseTool): result = loop.run_until_complete(main(query_text, node_set)) return str(result) except Exception as e: - return f"Tool execution error: {str(e)}" \ No newline at end of file + return f"Tool execution error: {str(e)}"