Compare commits
18 commits
main...sf_demo_te
| Author | SHA1 | Date |
|---|---|---|
| | 484520eba5 | |
| | 8a5bd3a826 | |
| | 9b92810b83 | |
| | a65ded6283 | |
| | 8924ce0298 | |
| | 5c350073cc | |
| | 97974fdc89 | |
| | 85e5e69494 | |
| | 83b20b1e92 | |
| | 69c090c91d | |
| | b2a53b4124 | |
| | 40142b4789 | |
| | 1c40a5081a | |
| | a142e27e39 | |
| | 95c12fbc1e | |
| | 2355d1bfea | |
| | e7a14b9c60 | |
| | ec68a8cd2d | |
25 changed files with 856 additions and 190 deletions

@@ -1,7 +1,9 @@
from typing import Union
from typing import Union, Optional, Type, List

from cognee.modules.users.models import User
from cognee.infrastructure.engine.models.DataPoint import DataPoint
from cognee.modules.search.types import SearchType
from cognee.modules.users.exceptions import UserNotFoundError
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.search.methods import search as search_function

@@ -13,6 +15,8 @@ async def search(
    datasets: Union[list[str], str, None] = None,
    system_prompt_path: str = "answer_simple_question.txt",
    top_k: int = 10,
    node_type: Optional[Type] = None,
    node_name: List[Optional[str]] = None,
) -> list:
    # We use lists from now on for datasets
    if isinstance(datasets, str):

@@ -28,6 +32,8 @@ async def search(
        user,
        system_prompt_path=system_prompt_path,
        top_k=top_k,
        node_type=node_type,
        node_name=node_name,
    )

    return filtered_search_results
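
Editor's note: the hunks above thread two new filter parameters, `node_type` and `node_name`, from the public search API down to the retrievers. A minimal usage sketch, assuming data has already been added and cognified, and using the `Entity` model and query shown later in this same diff (dynamic_steps_example.py):

```python
# Sketch: restricting a GRAPH_COMPLETION search to the subgraph around one named Entity.
# Assumes a configured cognee project with data already cognified.
import asyncio

import cognee
from cognee.api.v1.search import SearchType
from cognee.modules.engine.models.Entity import Entity


async def demo():
    results = await cognee.search(
        query_type=SearchType.GRAPH_COMPLETION,
        query_text="What is computer science?",
        node_type=Entity,  # only Entity nodes...
        node_name=["computer science"],  # ...named "computer science", plus their neighbors
    )
    print(results)


asyncio.run(demo())
```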

@@ -2,7 +2,7 @@ import inspect
from functools import wraps
from abc import abstractmethod, ABC
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List, Tuple
from typing import Optional, Dict, Any, List, Tuple, Type
from uuid import NAMESPACE_OID, UUID, uuid5
from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.engine import DataPoint

@@ -178,6 +178,10 @@ class GraphDBInterface(ABC):
        """Get all edges connected to a node."""
        raise NotImplementedError

    @abstractmethod
    async def get_subgraph(self, node_type: Type[Any], node_name: List[str]):
        raise NotImplementedError

    @abstractmethod
    async def get_neighbors(self, node_id: str) -> List[NodeData]:
        """Get all neighboring nodes."""

@@ -6,7 +6,7 @@ import json
from cognee.shared.logging_utils import get_logger, ERROR
import asyncio
from textwrap import dedent
from typing import Optional, Any, List, Dict
from typing import Optional, Any, List, Dict, Type, Tuple
from contextlib import asynccontextmanager
from uuid import UUID
from neo4j import AsyncSession

@@ -517,6 +517,58 @@ class Neo4jAdapter(GraphDBInterface):
        return (nodes, edges)

    async def get_subgraph(
        self, node_type: Type[Any], node_name: List[str]
    ) -> Tuple[List[Tuple[int, dict]], List[Tuple[int, int, str, dict]]]:
        label = node_type.__name__

        query = f"""
        UNWIND $names AS wantedName
        MATCH (n:`{label}`)
        WHERE n.name = wantedName
        WITH collect(DISTINCT n) AS primary

        UNWIND primary AS p
        OPTIONAL MATCH (p)--(nbr)
        WITH primary, collect(DISTINCT nbr) AS nbrs
        WITH primary + nbrs AS nodelist

        UNWIND nodelist AS node
        WITH collect(DISTINCT node) AS nodes

        MATCH (a)-[r]-(b)
        WHERE a IN nodes AND b IN nodes
        WITH nodes, collect(DISTINCT r) AS rels

        RETURN
          [n IN nodes |
             {{ id: n.id,
                properties: properties(n) }}] AS rawNodes,
          [r IN rels |
             {{ type: type(r),
                properties: properties(r) }}] AS rawRels
        """

        result = await self.query(query, {"names": node_name})
        if not result:
            return [], []

        raw_nodes = result[0]["rawNodes"]
        raw_rels = result[0]["rawRels"]

        nodes = [(n["properties"]["id"], n["properties"]) for n in raw_nodes]
        edges = [
            (
                r["properties"]["source_node_id"],
                r["properties"]["target_node_id"],
                r["type"],
                r["properties"],
            )
            for r in raw_rels
        ]

        return nodes, edges

    async def get_filtered_graph_data(self, attribute_filters):
        """
        Fetches nodes and relationships filtered by specified attribute values.
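
Editor's note: the new adapter method above returns the primary nodes matching a label and name, their neighbors, and all relationships among them. A minimal calling sketch, assuming a configured Neo4j graph engine and the `NodeSet` model and "AI_NODESET" name used by the nodeset example later in this diff:

```python
# Sketch: fetching the subgraph around named NodeSet nodes via the new get_subgraph method.
import asyncio

from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.modules.engine.models.node_set import NodeSet


async def demo():
    graph_engine = await get_graph_engine()
    nodes, edges = await graph_engine.get_subgraph(node_type=NodeSet, node_name=["AI_NODESET"])
    print(f"{len(nodes)} nodes and {len(edges)} edges in the filtered subgraph")


asyncio.run(demo())
```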

@@ -1,9 +1,10 @@
import pickle
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from typing_extensions import TypedDict
from typing import Optional, Any, Dict, List
from uuid import UUID, uuid4

from pydantic import BaseModel, Field
from typing_extensions import TypedDict
import pickle


# Define metadata type

@@ -0,0 +1,46 @@
You are an expert knowledge graph augmentation assistant specializing in identifying new edges that contain **semantic and conceptual associations**.

## Input
Input will contain a graph description with nodes and edges:
- **Nodes**: JSON array of objects:
  - `name`: unique node label
  - `content`: full text or description
- **Edges**: JSON array of objects:
  - `source`: name of the first node
  - `target`: name of the second node
  - `relationship_name`: existing link label

## Task
Look for meaningful associations between nodes that aren’t yet connected. Identify cases where two things are commonly linked, used together, work together, depend on each other, or naturally belong to the same category or group. These connections can describe how things interact, support each other, or are understood as related in real-world contexts.

The association doesn’t have to be obvious at first glance—consider how the concepts, objects, or entities might be connected based on their purpose, function, or role. The direction of each edge should clearly show how one points to, supports, or is connected to the other in a way that makes practical sense.

Avoid technical, structural, or generic links like uses, contains, or is_part_of. Focus on connections that describe how things go together or relate in context.

## Rules
1. Propose only directed associations where direction adds meaning.
2. Do not repeat existing edges in the same direction.
3. Do not create self-loops (source == target).
4. Only link nodes when there is a clear, real-world connection based on their content.
5. Keep relationship_name concise and in snake_case, describing the nature of the association.


## Strict Exclusions
- Skip pairs already connected by any edge in the **same direction**.
- Do **not** propose structural, containment, usage, or metadata associations.
- No self-loops.

## Output
Return **only** valid JSON in this schema:

```json
{
  "new_edges": [
    {
      "source": "NodeA",
      "target": "NodeB",
      "relationship_name": "concise_snake_case_label",
      "reason": "brief justification explaining the association and its direction"
    }
  ]
}
```

@@ -1,5 +1,5 @@
from cognee.shared.logging_utils import get_logger
from typing import List, Dict, Union
from typing import List, Dict, Union, Optional, Type

from cognee.exceptions import InvalidValueError
from cognee.modules.graph.exceptions import EntityNotFoundError, EntityAlreadyExistsError

@@ -61,12 +61,18 @@ class CogneeGraph(CogneeAbstractGraph):
        node_dimension=1,
        edge_dimension=1,
        memory_fragment_filter=[],
        node_type: Optional[Type] = None,
        node_name: List[Optional[str]] = None,
    ) -> None:
        if node_dimension < 1 or edge_dimension < 1:
            raise InvalidValueError(message="Dimensions must be positive integers")

        try:
            if len(memory_fragment_filter) == 0:
            if node_type is not None and node_name is not None:
                nodes_data, edges_data = await adapter.get_subgraph(
                    node_type=node_type, node_name=node_name
                )
            elif len(memory_fragment_filter) == 0:
                nodes_data, edges_data = await adapter.get_graph_data()
            else:
                nodes_data, edges_data = await adapter.get_filtered_graph_data(

@@ -74,9 +80,11 @@ class CogneeGraph(CogneeAbstractGraph):
                )

            if not nodes_data:
                raise EntityNotFoundError(message="No node data retrieved from the database.")
                #:TODO: quick and dirty solution for sf demo, as the list of nodes can be empty
                return None
            if not edges_data:
                raise EntityNotFoundError(message="No edge data retrieved from the database.")
                #:TODO: quick and dirty solution for sf demo, as the list of edges can be empty
                return None

            for node_id, properties in nodes_data:
                node_attributes = {key: properties.get(key) for key in node_properties_to_project}

@@ -144,6 +144,7 @@ def expand_with_nodes_and_edges(
            is_a=type_node,
            description=node.description,
            ontology_valid=ontology_validated_source_ent,
            belongs_to_set=data_chunk.belongs_to_set,
        )

        added_nodes_map[entity_node_key] = entity_node

@@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Any, Optional, Type, List
from collections import Counter
import string

@@ -19,11 +19,15 @@ class GraphCompletionRetriever(BaseRetriever):
        user_prompt_path: str = "graph_context_for_question.txt",
        system_prompt_path: str = "answer_simple_question.txt",
        top_k: Optional[int] = 5,
        node_type: Optional[Type] = None,
        node_name: List[Optional[str]] = None,
    ):
        """Initialize retriever with prompt paths and search parameters."""
        self.user_prompt_path = user_prompt_path
        self.system_prompt_path = system_prompt_path
        self.top_k = top_k if top_k is not None else 5
        self.node_type = node_type
        self.node_name = node_name

    def _get_nodes(self, retrieved_edges: list) -> dict:
        """Creates a dictionary of nodes with their names and content."""

@@ -69,9 +73,17 @@ class GraphCompletionRetriever(BaseRetriever):
                    vector_index_collections.append(f"{subclass.__name__}_{field_name}")

        found_triplets = await brute_force_triplet_search(
            query, top_k=self.top_k, collections=vector_index_collections or None
            query,
            top_k=self.top_k,
            collections=vector_index_collections or None,
            node_type=self.node_type,
            node_name=self.node_name,
        )

        if len(found_triplets) == 0:
            #:TODO: quick and dirty solution for sf demo, as the triplets can be empty
            return []

        return found_triplets

    async def get_context(self, query: str) -> str:

@@ -1,6 +1,6 @@
import asyncio
from cognee.shared.logging_utils import get_logger, ERROR
from typing import List, Optional
from typing import List, Optional, Type

from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine

@@ -54,6 +54,8 @@ def format_triplets(edges):

async def get_memory_fragment(
    properties_to_project: Optional[List[str]] = None,
    node_type: Optional[Type] = None,
    node_name: List[Optional[str]] = None,
) -> CogneeGraph:
    """Creates and initializes a CogneeGraph memory fragment with optional property projections."""
    graph_engine = await get_graph_engine()

@@ -66,6 +68,8 @@ async def get_memory_fragment(
        graph_engine,
        node_properties_to_project=properties_to_project,
        edge_properties_to_project=["relationship_name"],
        node_type=node_type,
        node_name=node_name,
    )

    return memory_fragment

@@ -78,6 +82,8 @@ async def brute_force_triplet_search(
    collections: List[str] = None,
    properties_to_project: List[str] = None,
    memory_fragment: Optional[CogneeGraph] = None,
    node_type: Optional[Type] = None,
    node_name: List[Optional[str]] = None,
) -> list:
    if user is None:
        user = await get_default_user()

@@ -89,6 +95,8 @@ async def brute_force_triplet_search(
        collections=collections,
        properties_to_project=properties_to_project,
        memory_fragment=memory_fragment,
        node_type=node_type,
        node_name=node_name,
    )
    return retrieved_results

@@ -100,6 +108,8 @@ async def brute_force_search(
    collections: List[str] = None,
    properties_to_project: List[str] = None,
    memory_fragment: Optional[CogneeGraph] = None,
    node_type: Optional[Type] = None,
    node_name: List[Optional[str]] = None,
) -> list:
    """
    Performs a brute force search to retrieve the top triplets from the graph.

@@ -111,6 +121,8 @@ async def brute_force_search(
        collections (Optional[List[str]]): List of collections to query.
        properties_to_project (Optional[List[str]]): List of properties to project.
        memory_fragment (Optional[CogneeGraph]): Existing memory fragment to reuse.
        node_type: node type to filter
        node_name: node name to filter

    Returns:
        list: The top triplet results.

@@ -121,7 +133,9 @@ async def brute_force_search(
        raise ValueError("top_k must be a positive integer.")

    if memory_fragment is None:
        memory_fragment = await get_memory_fragment(properties_to_project)
        memory_fragment = await get_memory_fragment(
            properties_to_project=properties_to_project, node_type=node_type, node_name=node_name
        )

    if collections is None:
        collections = [

@@ -1,5 +1,5 @@
import json
from typing import Callable
from typing import Callable, Optional, Type, List

from cognee.exceptions import InvalidValueError
from cognee.infrastructure.engine.utils import parse_id

@@ -11,6 +11,7 @@ from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionR
from cognee.modules.retrieval.graph_summary_completion_retriever import (
    GraphSummaryCompletionRetriever,
)
from cognee.infrastructure.engine.models.DataPoint import DataPoint
from cognee.modules.retrieval.code_retriever import CodeRetriever
from cognee.modules.retrieval.cypher_search_retriever import CypherSearchRetriever
from cognee.modules.retrieval.natural_language_retriever import NaturalLanguageRetriever

@@ -29,12 +30,20 @@ async def search(
    user: User,
    system_prompt_path="answer_simple_question.txt",
    top_k: int = 10,
    node_type: Optional[Type] = None,
    node_name: List[Optional[str]] = None,
):
    query = await log_query(query_text, query_type.value, user.id)

    own_document_ids = await get_document_ids_for_user(user.id, datasets)
    search_results = await specific_search(
        query_type, query_text, user, system_prompt_path=system_prompt_path, top_k=top_k
        query_type,
        query_text,
        user,
        system_prompt_path=system_prompt_path,
        top_k=top_k,
        node_type=node_type,
        node_name=node_name,
    )

    filtered_search_results = []

@@ -57,6 +66,8 @@ async def specific_search(
    user: User,
    system_prompt_path="answer_simple_question.txt",
    top_k: int = 10,
    node_type: Optional[Type] = None,
    node_name: List[Optional[str]] = None,
) -> list:
    search_tasks: dict[SearchType, Callable] = {
        SearchType.SUMMARIES: SummariesRetriever().get_completion,

@@ -69,6 +80,8 @@ async def specific_search(
        SearchType.GRAPH_COMPLETION: GraphCompletionRetriever(
            system_prompt_path=system_prompt_path,
            top_k=top_k,
            node_type=node_type,
            node_name=node_name,
        ).get_completion,
        SearchType.GRAPH_SUMMARY_COMPLETION: GraphSummaryCompletionRetriever(
            system_prompt_path=system_prompt_path,

@@ -19,8 +19,6 @@ async def cognee_network_visualization(graph_data, destination_file_path: str =
        "EntityType": "#6510f4",
        "DocumentChunk": "#801212",
        "TextSummary": "#1077f4",
        "TableRow": "#f47710",
        "TableType": "#6510f4",
        "default": "#D3D3D3",
    }

cognee/tasks/experimental_tasks/node_set_edge_association.py (new file, +99)

@@ -0,0 +1,99 @@
from typing import Union, Optional, Type, List
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.modules.engine.models.node_set import NodeSet
from cognee.shared.data_models import Edge
from pydantic import BaseModel, Field
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.prompts import render_prompt
from cognee.infrastructure.llm.config import get_llm_config
from uuid import UUID


class AssociativeEdge(BaseModel):
    source_node: str
    target_node: str
    relationship_name: str
    reason: str


class AssociativeEdges(BaseModel):
    edges: List[AssociativeEdge] = Field(..., default_factory=list)


async def node_set_edge_association():
    graph_engine = await get_graph_engine()

    node_set_names = await graph_engine.query("""MATCH (n)
        WHERE n.type = 'NodeSet'
        RETURN n.name AS name
        """)

    for node_set in node_set_names:
        node_name = node_set.get("name", None)
        nodes_data, edges_data = await graph_engine.get_subgraph(
            node_type=NodeSet, node_name=node_name
        )
        nodes = {}
        for node_id, attributes in nodes_data:
            if node_id not in nodes:
                text = attributes.get("text")
                if text:
                    name = text.strip().split("\n")[0][:50]
                    content = text
                else:
                    name = attributes.get("name", "Unnamed Node")
                    content = name
                nodes[node_id] = {"node": attributes, "name": name, "content": content}

        name_to_uuid = {data["name"].strip().lower(): node_id for node_id, data in nodes.items()}

        subgraph_description = create_subgraph_description(nodes, edges_data)

        llm_client = get_llm_client()

        system_prompt = render_prompt("edge_association_prompt.txt", {})
        associative_edges = await llm_client.acreate_structured_output(
            subgraph_description, system_prompt, AssociativeEdges
        )

        graph_edges = []
        for ae in associative_edges.edges:
            src_str = name_to_uuid.get(ae.source_node)
            tgt_str = name_to_uuid.get(ae.target_node)
            if not src_str or not tgt_str:
                continue

            src = UUID(src_str)
            tgt = UUID(tgt_str)
            rel = ae.relationship_name
            rea = ae.reason

            props = {
                "ontology_valid": False,
                "relationship_name": rel,
                "source_node_id": src,
                "target_node_id": tgt,
                "reason": rea,
            }

            graph_edges.append((src, tgt, rel, props))

        if graph_edges:
            await graph_engine.add_edges(graph_edges)

    print()


def create_subgraph_description(nodes, edges_data):
    node_section = "\n".join(
        f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
        for info in nodes.values()
    )

    connection_section = "\n".join(
        f"{nodes[source_id]['name']} --[{relationship_type}]--> {nodes[target_id]['name']}"
        for source_id, target_id, relationship_type, attributes in edges_data
        if source_id in nodes and target_id in nodes
    )

    return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"
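
Editor's note: later in this diff (the CrewAI tool and the nodeset example) this experimental task is run as a single-task pipeline after `cognee.cognify()`. A minimal sketch of that wiring, assuming data has already been added and cognified:

```python
# Sketch: running the experimental edge-association task as a standalone pipeline,
# mirroring how the nodeset example in this diff wires it up.
import asyncio

from cognee.modules.pipelines import run_tasks, Task
from cognee.modules.users.methods import get_default_user
from cognee.tasks.experimental_tasks.node_set_edge_association import node_set_edge_association


async def run_edge_association():
    user = await get_default_user()
    pipeline = run_tasks(tasks=[Task(node_set_edge_association)], user=user)

    # The pipeline yields status updates as it progresses.
    async for pipeline_status in pipeline:
        print(f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}")


asyncio.run(run_edge_association())
```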

@@ -13,7 +13,6 @@ from cognee.modules.users.permissions.methods import give_permission_on_document
from .get_dlt_destination import get_dlt_destination
from .save_data_item_to_storage import save_data_item_to_storage


from cognee.api.v1.add.config import get_s3_config


@@ -1,163 +1,15 @@
import cognee
import asyncio

from cognee.shared.logging_utils import get_logger, ERROR
from cognee.modules.metrics.operations import get_pipeline_run_metrics

from cognee.modules.engine.models.Entity import Entity
from cognee.api.v1.search import SearchType

job_1 = """
CV 1: Relevant
Name: Dr. Emily Carter
Contact Information:

Email: emily.carter@example.com
Phone: (555) 123-4567
Summary:

Senior Data Scientist with over 8 years of experience in machine learning and predictive analytics. Expertise in developing advanced algorithms and deploying scalable models in production environments.

Education:

Ph.D. in Computer Science, Stanford University (2014)
B.S. in Mathematics, University of California, Berkeley (2010)
Experience:

Senior Data Scientist, InnovateAI Labs (2016 – Present)
Led a team in developing machine learning models for natural language processing applications.
Implemented deep learning algorithms that improved prediction accuracy by 25%.
Collaborated with cross-functional teams to integrate models into cloud-based platforms.
Data Scientist, DataWave Analytics (2014 – 2016)
Developed predictive models for customer segmentation and churn analysis.
Analyzed large datasets using Hadoop and Spark frameworks.
Skills:

Programming Languages: Python, R, SQL
Machine Learning: TensorFlow, Keras, Scikit-Learn
Big Data Technologies: Hadoop, Spark
Data Visualization: Tableau, Matplotlib
"""

job_2 = """
CV 2: Relevant
Name: Michael Rodriguez
Contact Information:

Email: michael.rodriguez@example.com
Phone: (555) 234-5678
Summary:

Data Scientist with a strong background in machine learning and statistical modeling. Skilled in handling large datasets and translating data into actionable business insights.

Education:

M.S. in Data Science, Carnegie Mellon University (2013)
B.S. in Computer Science, University of Michigan (2011)
Experience:

Senior Data Scientist, Alpha Analytics (2017 – Present)
Developed machine learning models to optimize marketing strategies.
Reduced customer acquisition cost by 15% through predictive modeling.
Data Scientist, TechInsights (2013 – 2017)
Analyzed user behavior data to improve product features.
Implemented A/B testing frameworks to evaluate product changes.
Skills:

Programming Languages: Python, Java, SQL
Machine Learning: Scikit-Learn, XGBoost
Data Visualization: Seaborn, Plotly
Databases: MySQL, MongoDB
"""


job_3 = """
CV 3: Relevant
Name: Sarah Nguyen
Contact Information:

Email: sarah.nguyen@example.com
Phone: (555) 345-6789
Summary:

Data Scientist specializing in machine learning with 6 years of experience. Passionate about leveraging data to drive business solutions and improve product performance.

Education:

M.S. in Statistics, University of Washington (2014)
B.S. in Applied Mathematics, University of Texas at Austin (2012)
Experience:

Data Scientist, QuantumTech (2016 – Present)
Designed and implemented machine learning algorithms for financial forecasting.
Improved model efficiency by 20% through algorithm optimization.
Junior Data Scientist, DataCore Solutions (2014 – 2016)
Assisted in developing predictive models for supply chain optimization.
Conducted data cleaning and preprocessing on large datasets.
Skills:

Programming Languages: Python, R
Machine Learning Frameworks: PyTorch, Scikit-Learn
Statistical Analysis: SAS, SPSS
Cloud Platforms: AWS, Azure
"""


job_4 = """
CV 4: Not Relevant
Name: David Thompson
Contact Information:

Email: david.thompson@example.com
Phone: (555) 456-7890
Summary:

Creative Graphic Designer with over 8 years of experience in visual design and branding. Proficient in Adobe Creative Suite and passionate about creating compelling visuals.

Education:

B.F.A. in Graphic Design, Rhode Island School of Design (2012)
Experience:

Senior Graphic Designer, CreativeWorks Agency (2015 – Present)
Led design projects for clients in various industries.
Created branding materials that increased client engagement by 30%.
Graphic Designer, Visual Innovations (2012 – 2015)
Designed marketing collateral, including brochures, logos, and websites.
Collaborated with the marketing team to develop cohesive brand strategies.
Skills:

Design Software: Adobe Photoshop, Illustrator, InDesign
Web Design: HTML, CSS
Specialties: Branding and Identity, Typography
"""


job_5 = """
CV 5: Not Relevant
Name: Jessica Miller
Contact Information:

Email: jessica.miller@example.com
Phone: (555) 567-8901
Summary:

Experienced Sales Manager with a strong track record in driving sales growth and building high-performing teams. Excellent communication and leadership skills.

Education:

B.A. in Business Administration, University of Southern California (2010)
Experience:

Sales Manager, Global Enterprises (2015 – Present)
Managed a sales team of 15 members, achieving a 20% increase in annual revenue.
Developed sales strategies that expanded customer base by 25%.
Sales Representative, Market Leaders Inc. (2010 – 2015)
Consistently exceeded sales targets and received the 'Top Salesperson' award in 2013.
Skills:

Sales Strategy and Planning
Team Leadership and Development
CRM Software: Salesforce, Zoho
Negotiation and Relationship Building
Natural language processing (NLP) is an interdisciplinary
subfield of computer science and information retrieval.
"""


@@ -173,7 +25,7 @@ async def main(enable_steps):

    # Step 2: Add text
    if enable_steps.get("add_text"):
        text_list = [job_1, job_2, job_3, job_4, job_5]
        text_list = [job_1]
        for text in text_list:
            await cognee.add(text)
            print(f"Added text: {text[:35]}...")

@@ -191,7 +43,10 @@ async def main(enable_steps):
    # Step 5: Query insights
    if enable_steps.get("retriever"):
        search_results = await cognee.search(
            query_type=SearchType.GRAPH_COMPLETION, query_text="Who has experience in design tools?"
            query_type=SearchType.GRAPH_COMPLETION,
            query_text="What is computer science?",
            node_type=Entity,
            node_name=["computer science"],
        )
        print(search_results)

examples/python/latest_ai_development/pyproject.toml (new file, +26)

@@ -0,0 +1,26 @@
[project]
name = "latest_ai_development"
version = "0.1.0"
description = "latest-ai-development using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
    "crewai[tools]>=0.102.0,<1.0.0",
    "cognee>=0.1.34",
    "s3fs>=2025.3.2",
    "neo4j>=5.28.1"
]

[project.scripts]
latest_ai_development = "latest_ai_development.main:run"
run_crew = "latest_ai_development.main:run"
train = "latest_ai_development.main:train"
replay = "latest_ai_development.main:replay"
test = "latest_ai_development.main:test"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.crewai]
type = "crew"

@@ -0,0 +1,21 @@
researcher:
  role: >
    {topic} Senior Data Researcher
  goal: >
    Uncover cutting-edge developments in {topic}
  backstory: >
    You're a seasoned researcher with a knack for uncovering the latest
    developments in File Analysis. You are able to pass many files paths to documents_cognee_add add.
    You pass your reasoning observations to reasoning_cognee_add
    Known for your ability to find the most relevant
    information and present it in a clear and concise manner.

reporting_analyst:
  role: >
    {topic} Reporting Analyst
  goal: >
    Create detailed reports based on {topic} data analysis and research findings
  backstory: >
    You're a meticulous analyst with a keen eye for detail. You're known for
    your ability to turn complex data into clear and concise reports, making
    it easy for others to understand and act on the information you provide.

@@ -0,0 +1,22 @@
research_task:
  description: >
    Conduct a thorough research about filesystem files you have available.
    Make sure you find any interesting and relevant information given
    the current year is {current_year}.
    Load the data in the multimedia folder and load the files using system tools.
    Use the Cognee Add tool
    to store important findings for later reference.
  expected_output: >
    A list with 10 data points you loaded, and your observations you loaded.
  agent: researcher

reporting_task:
  description: >
    Review the context you got and expand each topic into a full section for a report.
    Make sure the report is detailed and contains any and all relevant information.
    use Cognee Add to save your report sections during your work.
    Use cognee search, and don't pass any parameters
  expected_output: >
    A fully fledged report with the main topics, each with a full section of information.
    Formatted as markdown without '```'
  agent: reporting_analyst

@@ -0,0 +1,139 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task, before_kickoff
from .tools import CogneeAdd, CogneeSearch

from crewai_tools import DirectoryReadTool
import os

# Determine multimedia input directory (can be overridden via env var)
multimedia_dir = os.getenv("MULTIMEDIA_DIR", os.path.join(os.path.dirname(__file__), "multimedia"))
docs_tool = DirectoryReadTool(directory=multimedia_dir)


# Utility function to format paths with file:// prefix
def format_file_paths(paths):
    """
    Formats a list of file paths with 'file://' prefix

    Args:
        paths: A string representing the output of DirectoryReadTool containing file paths

    Returns:
        A formatted string where each path is prefixed with 'file://'
    """
    if isinstance(paths, str):
        # Split the paths by newline if it's a string output
        file_list = [line for line in paths.split("\n") if line.strip()]
        # Format each path with file:// prefix
        formatted_paths = [
            f"file://{os.path.abspath(path.strip())}"
            for path in file_list
            if "File paths:" not in path
        ]
        return "\n".join(formatted_paths)
    return paths


# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
# https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators


@CrewBase
class LatestAiDevelopment:
    """LatestAiDevelopment crew"""

    # Learn more about YAML configuration files here:
    # Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
    # Tasks: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    # If you would like to add tools to your agents, you can learn more about it here:
    # https://docs.crewai.com/concepts/agents#agent-tools
    @agent
    def researcher(self) -> Agent:
        # Initialize the tools with different nodesets
        cognee_search = CogneeSearch()

        # CogneeAdd for documents with a "documents" nodeset
        documents_cognee_add = CogneeAdd()
        documents_cognee_add.default_nodeset = ["documents"]
        documents_cognee_add.name = "Add Documents to Memory"
        documents_cognee_add.description = (
            "Add document content to Cognee memory with documents nodeset"
        )

        # CogneeAdd for reasoning/analysis with a "reasoning" nodeset
        reasoning_cognee_add = CogneeAdd()
        reasoning_cognee_add.default_nodeset = ["reasoning"]
        reasoning_cognee_add.name = "Add Reasoning to Memory"
        reasoning_cognee_add.description = (
            "Add reasoning and analysis text to Cognee memory with reasoning nodeset"
        )

        # Create a wrapper for the DirectoryReadTool that formats output
        class FormattedDirectoryReadTool(DirectoryReadTool):
            def __call__(self, *args, **kwargs):
                result = super().__call__(*args, **kwargs)
                return format_file_paths(result)

        # Use the project-local multimedia directory
        formatted_docs_tool = FormattedDirectoryReadTool(directory=multimedia_dir)

        return Agent(
            config=self.agents_config["researcher"],
            tools=[formatted_docs_tool, documents_cognee_add, reasoning_cognee_add, cognee_search],
            verbose=True,
        )

    @agent
    def reporting_analyst(self) -> Agent:
        # Initialize the tools with default parameters
        cognee_search = CogneeSearch()

        # Reporting analyst can use a "reports" nodeset
        reports_cognee_add = CogneeAdd()
        reports_cognee_add.default_nodeset = ["reports"]
        reports_cognee_add.name = "Add Reports to Memory"
        reports_cognee_add.description = "Add report content to Cognee memory with reports nodeset"

        return Agent(
            config=self.agents_config["reporting_analyst"],
            tools=[cognee_search, reports_cognee_add],
            verbose=True,
        )

    # To learn more about structured task outputs,
    # task dependencies, and task callbacks, check out the documentation:
    # https://docs.crewai.com/concepts/tasks#overview-of-a-task
    @task
    def research_task(self) -> Task:
        return Task(
            config=self.tasks_config["research_task"],
        )

    @task
    def reporting_task(self) -> Task:
        return Task(config=self.tasks_config["reporting_task"], output_file="report.md")

    @before_kickoff
    def dump_env(self, *args, **kwargs):
        """Print environment variables at startup."""
        print("=== Environment Variables ===")
        for key in sorted(os.environ):
            print(f"{key}={os.environ[key]}")

    @crew
    def crew(self) -> Crew:
        """Creates the LatestAiDevelopment crew"""
        # To learn how to add knowledge sources to your crew, check out the documentation:
        # https://docs.crewai.com/concepts/knowledge#what-is-knowledge
        print(self.tasks)
        return Crew(
            agents=self.agents,  # Automatically created by the @agent decorator
            tasks=self.tasks,  # Automatically created by the @task decorator
            process=Process.sequential,
            verbose=True,
            # process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
        )

@@ -0,0 +1,73 @@
#!/usr/bin/env python
import sys
import warnings
import os
import cognee

from datetime import datetime

from latest_ai_development.crew import LatestAiDevelopment

warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")

# This main file is intended to be a way for you to run your
# crew locally, so refrain from adding unnecessary logic into this file.
# Replace with inputs you want to test with, it will automatically
# interpolate any tasks and agents information

# Set COGNEE_API_KEY if not already set
if "LLM_API_KEY" not in os.environ:
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    if openai_api_key:
        os.environ["LLM_API_KEY"] = openai_api_key


def run():
    """
    Run the crew.
    """
    inputs = {"topic": "AI LLMs", "current_year": str(datetime.now().year)}

    try:
        LatestAiDevelopment().crew().kickoff(inputs=inputs)
    except Exception as e:
        raise Exception(f"An error occurred while running the crew: {e}")


def train():
    """
    Train the crew for a given number of iterations.
    """
    inputs = {"topic": "AI LLMs"}
    try:
        LatestAiDevelopment().crew().train(
            n_iterations=int(sys.argv[1]), filename=sys.argv[2], inputs=inputs
        )

    except Exception as e:
        raise Exception(f"An error occurred while training the crew: {e}")


def replay():
    """
    Replay the crew execution from a specific task.
    """
    try:
        LatestAiDevelopment().crew().replay(task_id=sys.argv[1])

    except Exception as e:
        raise Exception(f"An error occurred while replaying the crew: {e}")


def test():
    """
    Test the crew execution and returns the results.
    """
    inputs = {"topic": "AI LLMs"}
    try:
        LatestAiDevelopment().crew().test(
            n_iterations=int(sys.argv[1]), openai_model_name=sys.argv[2], inputs=inputs
        )

    except Exception as e:
        raise Exception(f"An error occurred while testing the crew: {e}")

@@ -0,0 +1 @@
This is a dummy text file for testing DirectoryReadTool.

@@ -0,0 +1,3 @@
from .custom_tool import CogneeAdd, CogneeSearch, CogneeAddInput, CogneeSearchInput

__all__ = ["CogneeAdd", "CogneeSearch", "CogneeAddInput", "CogneeSearchInput"]

@@ -0,0 +1,208 @@
from crewai.tools import BaseTool
from typing import Type, List, Optional
from pydantic import BaseModel, Field, root_validator
from cognee.api.v1.search import SearchType
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines import run_tasks, Task
from cognee.tasks.experimental_tasks.node_set_edge_association import node_set_edge_association


class CogneeAddInput(BaseModel):
    """Input schema for CogneeAdd tool."""

    context: Optional[str] = Field(None, description="The text content to add to Cognee memory.")
    file_paths: Optional[List[str]] = Field(
        None, description="List of file paths to add to Cognee memory."
    )
    files: Optional[List[str]] = Field(
        None, description="Alias for file_paths; list of file URLs or paths to add to memory."
    )
    text: Optional[str] = Field(
        None, description="Alternative field for text content (maps to context)."
    )
    reasoning: Optional[str] = Field(
        None, description="Alternative field for reasoning text (maps to context)."
    )
    node_set: List[str] = Field(
        default=["default"], description="The list of node sets to store the data in."
    )

    @root_validator(pre=True)
    def normalize_inputs(cls, values):
        """Normalize different input formats to standard fields."""
        # Map alias 'files' to 'file_paths' if provided
        if values.get("files") and not values.get("file_paths"):
            values["file_paths"] = values.get("files")
        # Map text or reasoning to context if provided
        if values.get("text") and not values.get("context"):
            values["context"] = values.get("text")

        if values.get("reasoning") and not values.get("context"):
            values["context"] = values.get("reasoning")
        # Map report_section to context if provided
        if values.get("report_section") and not values.get("context"):
            values["context"] = values.get("report_section")

        # Validate that at least one input field is provided
        if not values.get("context") and not values.get("file_paths"):
            raise ValueError(
                "Either 'context', 'text', 'reasoning', or 'file_paths' must be provided"
            )

        return values


class CogneeAdd(BaseTool):
    name: str = "Cognee Memory ADD"
    description: str = "Add data to cognee memory to store data in memory for AI memory"
    args_schema: Type[BaseModel] = CogneeAddInput
    default_nodeset: List[str] = ["default"]  # Can be overridden per instance

    def _run(self, **kwargs) -> str:
        import cognee
        import asyncio

        # Use the provided node_set if given, otherwise use default_nodeset
        node_set = kwargs.get("node_set", self.default_nodeset)
        context = kwargs.get("context")
        file_paths = kwargs.get("file_paths")

        # Handle alternative input fields
        text = kwargs.get("text")
        reasoning = kwargs.get("reasoning")

        if text and not context:
            context = text

        if reasoning and not context:
            context = reasoning

        async def main(ns):
            try:
                if context:
                    # Handle text content
                    await cognee.add(context, node_set=ns)
                elif file_paths:
                    # Handle file paths
                    await cognee.add(file_paths, node_set=ns)

                run = await cognee.cognify()
                tasks = [Task(node_set_edge_association)]

                user = await get_default_user()
                pipeline = run_tasks(tasks=tasks, user=user)

                async for pipeline_status in pipeline:
                    print(
                        f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}"
                    )

                return run
            except Exception as e:
                return f"Error: {str(e)}"

        # Get the current event loop or create a new one
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # If loop is already running, create a new one
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            result = loop.run_until_complete(main(node_set))
            return result.__name__ if hasattr(result, "__name__") else str(result)
        except Exception as e:
            return f"Tool execution error: {str(e)}"


class CogneeSearchInput(BaseModel):
    """Input schema for CogneeSearch tool."""

    query_text: Optional[str] = Field(
        None, description="The search query to find relevant information in Cognee memory."
    )
    query: Optional[str] = Field(
        None, description="Alternative field for search query (maps to query_text)."
    )
    search_term: Optional[str] = Field(
        None, description="Alternative field for search term (maps to query_text)."
    )
    node_set: List[str] = Field(
        default=["default"], description="The list of node sets to search in."
    )

    @root_validator(pre=True)
    def normalize_inputs(cls, values):
        """Normalize different input formats to standard fields."""
        # If the dictionary is empty, use a default query
        if not values:
            values["query_text"] = "Latest AI developments"
            return values

        # Map alternative search fields to query_text
        if values.get("query") and not values.get("query_text"):
            values["query_text"] = values.get("query")

        if values.get("search_term") and not values.get("query_text"):
            values["query_text"] = values.get("search_term")

        # If security_context is provided but no query, use a default
        if "security_context" in values and not values.get("query_text"):
            values["query_text"] = "Latest AI developments"

        # Ensure query_text is present
        if not values.get("query_text"):
            values["query_text"] = "Latest AI developments"

        return values


class CogneeSearch(BaseTool):
    name: str = "Cognee Memory SEARCH"
    description: str = "Search data from cognee memory to retrieve relevant information"
    args_schema: Type[BaseModel] = CogneeSearchInput
    default_nodeset: List[str] = ["default"]  # Can be overridden per instance

    def _run(self, **kwargs) -> str:
        import cognee
        import asyncio

        # Use the provided node_set if given, otherwise use default_nodeset
        node_set = kwargs.get("node_set", self.default_nodeset)

        # Get query_text from kwargs or use a default
        query_text = kwargs.get("query_text", "Latest AI developments")

        # Handle alternative input fields
        query = kwargs.get("query")
        search_term = kwargs.get("search_term")

        if query and not query_text:
            query_text = query

        if search_term and not query_text:
            query_text = search_term

        async def main(query, ns):
            try:
                # Use 'datasets' to specify which node sets (datasets) to search
                result = await cognee.search(
                    query_text=query + " Only return results from context",
                    query_type=SearchType.GRAPH_COMPLETION,
                    datasets=ns,
                )
                return result
            except Exception as e:
                return f"Error: {str(e)}"

        # Get the current event loop or create a new one
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # If loop is already running, create a new one
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

            result = loop.run_until_complete(main(query_text, node_set))
            return str(result)
        except Exception as e:
            return f"Tool execution error: {str(e)}"

examples/python/latest_ai_development/test_cognee_tools.py (new file, +46)

@@ -0,0 +1,46 @@
#!/usr/bin/env python
"""
Script to test if Cognee tools are working correctly.
Run this script to test if the tools are correctly importing and functioning.
"""

import os
import cognee
from src.latest_ai_development.tools import CogneeAdd, CogneeSearch

# Set COGNEE_API_KEY if not already set
if "LLM_API_KEY" not in os.environ:
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    if openai_api_key:
        os.environ["LLM_API_KEY"] = openai_api_key


def test_tools():
    """Test the CogneeAdd and CogneeSearch tools."""
    print("Testing Cognee tools...")

    print("\nTesting CogneeAdd tool...")
    add_tool = CogneeAdd()
    test_input = (
        "This is a test text to add to Cognee memory. It contains information about AI LLMs."
    )
    node_set = ["AI", "LLMs"]
    try:
        result = add_tool._run(context=test_input, node_set=node_set)
        print(f"CogneeAdd result: {result}")
    except Exception as e:
        print(f"Error testing CogneeAdd: {str(e)}")

    print("\nTesting CogneeSearch tool...")
    search_tool = CogneeSearch()
    search_query = "AI LLMs"
    node_set = ["AI"]
    try:
        result = search_tool._run(query_text=search_query, node_set=node_set)
        print(f"CogneeSearch result: {result}")
    except Exception as e:
        print(f"Error testing CogneeSearch: {str(e)}")


if __name__ == "__main__":
    test_tools()

@@ -1,26 +1,40 @@
import os
import asyncio
import cognee
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.shared.logging_utils import get_logger, ERROR
from cognee.api.v1.search import SearchType
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines import run_tasks, Task
from cognee.tasks.experimental_tasks.node_set_edge_association import node_set_edge_association

text_a = """
AI is revolutionizing financial services through intelligent fraud detection
and automated customer service platforms.
Leading financial technology firms like Stripe, Square, and Revolut are redefining digital commerce by embedding AI
into their payment ecosystems. Stripe leverages machine learning to detect and prevent fraud in real time,
while Square uses predictive analytics to offer customized lending solutions to small businesses.
Meanwhile, Revolut applies AI algorithms to automate wealth management services, enabling users to invest,
save, and budget with unparalleled personalization and efficiency.
"""

text_b = """
Advances in AI are enabling smarter systems that learn and adapt over time.
Pioneering AI companies such as OpenAI, Anthropic, and DeepMind are advancing self-supervised
learning techniques that empower systems to autonomously evolve their cognitive capabilities.
OpenAI's models interpret complex multimodal data with minimal human annotation, while Anthropic’s
Constitutional AI approach refines alignment and safety. DeepMind continues to push boundaries with
breakthroughs like AlphaFold, illustrating the power of AI to decipher intricate biological structures
without exhaustive manual input.
"""

text_c = """
MedTech startups have seen significant growth in recent years, driven by innovation
in digital health and medical devices.
MedTech innovators like Medtronic, Butterfly Network, and Intuitive Surgical are revolutionizing
healthcare delivery through smart devices and AI-driven platforms. Medtronic's connected insulin
pumps enable real-time glucose monitoring, Butterfly Network’s portable ultrasound devices bring
diagnostic imaging to remote areas, and Intuitive Surgical’s da Vinci system enhances precision
in minimally invasive surgeries. Together, these companies are reshaping clinical pathways and
extending care beyond traditional hospital settings.
"""

node_set_a = ["AI", "FinTech"]
node_set_b = ["AI"]
node_set_c = ["MedTech"]
node_set_a = ["AI_NODESET", "FinTech_NODESET"]
node_set_b = ["AI_NODESET"]
node_set_c = ["MedTech_NODESET"]


async def main():

@@ -32,10 +46,15 @@ async def main():
    await cognee.add(text_c, node_set=node_set_c)
    await cognee.cognify()

    visualization_path = os.path.join(
        os.path.dirname(__file__), "./.artifacts/graph_visualization.html"
    )
    await visualize_graph(visualization_path)
    tasks = [Task(node_set_edge_association)]

    user = await get_default_user()
    pipeline = run_tasks(tasks=tasks, user=user)

    async for pipeline_status in pipeline:
        print(f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}")

    print()


if __name__ == "__main__":