Merge branch 'main' into clean_dspy

This commit is contained in:
Vasilije 2024-11-14 14:50:43 +01:00 committed by GitHub
commit bc2e17592d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 710 additions and 7 deletions

View file

@ -62,3 +62,8 @@ class GraphDBInterface(Protocol):
async def delete_graph(self):
    """Interface stub (graph deletion, judging by its name).

    Raises:
        NotImplementedError: Always; this body carries no implementation.
    """
    raise NotImplementedError
@abstractmethod
async def get_graph_data(self):
    """Interface stub for fetching the graph's contents.

    NOTE(review): CogneeGraph.project_graph_from_db unpacks the awaited
    result as ``(nodes_data, edges_data)`` - confirm adapters return that.

    Raises:
        NotImplementedError: Always; this body carries no implementation.
    """
    raise NotImplementedError

View file

@ -27,9 +27,6 @@ class Neo4jAdapter(GraphDBInterface):
max_connection_lifetime = 120
)
async def close(self) -> None:
    """Await ``close()`` on the driver stored in ``self.driver``."""
    driver = self.driver
    await driver.close()
@asynccontextmanager
async def get_session(self) -> AsyncSession:
async with self.driver.session() as session:

View file

@ -112,10 +112,18 @@ class LanceDBAdapter(VectorDBInterface):
for (data_point_index, data_point) in enumerate(data_points)
]
await collection.merge_insert("id") \
.when_matched_update_all() \
.when_not_matched_insert_all() \
.execute(lance_data_points)
# TODO: This enables us to work with pydantic version but shouldn't
# stay like this, existing rows should be updated
await collection.delete("id IS NOT NULL")
original_size = await collection.count_rows()
await collection.add(lance_data_points)
new_size = await collection.count_rows()
if new_size <= original_size:
raise ValueError(
"LanceDB create_datapoints error: data points did not get added.")
async def retrieve(self, collection_name: str, data_point_ids: list[str]):

View file

@ -17,6 +17,7 @@ class TextChunker():
self.get_text = get_text
def read(self):
self.paragraph_chunks = []
for content_text in self.get_text():
for chunk_data in chunk_by_paragraph(
content_text,

View file

@ -0,0 +1,35 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Union
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Node, Edge
from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface
class CogneeAbstractGraph(ABC):
    """
    Abstract base class for representing a graph structure.

    Subclasses provide node/edge bookkeeping and an asynchronous projection
    step that fills the graph from a graph-database adapter.
    """

    @abstractmethod
    def add_node(self, node: Node) -> None:
        """Add a node to the graph."""

    @abstractmethod
    def add_edge(self, edge: Edge) -> None:
        """Add an edge to the graph."""

    @abstractmethod
    def get_node(self, node_id: str) -> Union[Node, None]:
        """Retrieve a node by its ID (``None`` when the ID is unknown)."""

    @abstractmethod
    def get_edges(self, node_id: str) -> List[Edge]:
        """Retrieve edges connected to a specific node."""

    @abstractmethod
    async def project_graph_from_db(
        self,
        adapter: GraphDBInterface,
        node_properties_to_project: List[str],
        edge_properties_to_project: List[str],
        directed: bool = True,
        node_dimension: int = 1,
        edge_dimension: int = 1,
    ) -> None:
        """Project the graph structure from a database using the provided adapter.

        The signature mirrors the concrete ``CogneeGraph`` implementation so
        code written against this abstraction can be called the same way.

        Args:
            adapter: Source of the graph data.
            node_properties_to_project: Property names to copy onto each node.
            edge_properties_to_project: Property names to copy onto each edge.
            directed: Whether the projected edges are directed.
            node_dimension: Length of each node's status vector.
            edge_dimension: Length of each edge's status vector.
        """

View file

@ -0,0 +1,91 @@
from typing import List, Dict, Union
from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Node, Edge
from cognee.modules.graph.cognee_graph.CogneeAbstractGraph import CogneeAbstractGraph
from cognee.infrastructure.databases.graph import get_graph_engine
class CogneeGraph(CogneeAbstractGraph):
    """
    Concrete implementation of the AbstractGraph class for Cognee.

    This class provides the functionality to manage nodes and edges,
    and project a graph from a database using adapters.

    Attributes:
        nodes: Nodes keyed by their id.
        edges: Edges in insertion order.
        directed: Flag stored from the constructor argument.
    """

    nodes: Dict[str, Node]
    edges: List[Edge]
    directed: bool

    def __init__(self, directed: bool = True):
        self.nodes = {}
        self.edges = []
        self.directed = directed

    def add_node(self, node: Node) -> None:
        """Register ``node`` under its id.

        Raises:
            ValueError: If a node with the same id is already stored.
        """
        if node.id in self.nodes:
            raise ValueError(f"Node with id {node.id} already exists.")
        self.nodes[node.id] = node

    def add_edge(self, edge: Edge) -> None:
        """Store ``edge`` and register it on both of its endpoint nodes.

        Raises:
            ValueError: If an equal edge is already stored.
        """
        if edge in self.edges:
            raise ValueError(f"Edge {edge} already exists in the graph.")
        self.edges.append(edge)
        edge.node1.add_skeleton_edge(edge)
        edge.node2.add_skeleton_edge(edge)

    def get_node(self, node_id: str) -> Union[Node, None]:
        """Return the node stored under ``node_id``, or ``None`` if absent."""
        return self.nodes.get(node_id)

    def get_edges(self, node_id: str) -> List[Edge]:
        """Return the skeleton edges of the node stored under ``node_id``.

        Raises:
            ValueError: If no node is stored under ``node_id``.
        """
        node = self.get_node(node_id)
        if node is None:
            raise ValueError(f"Node with id {node_id} does not exist.")
        return node.skeleton_edges

    async def project_graph_from_db(self,
                                    adapter: GraphDBInterface,
                                    node_properties_to_project: List[str],
                                    edge_properties_to_project: List[str],
                                    directed = True,
                                    node_dimension = 1,
                                    edge_dimension = 1) -> None:
        """Populate this graph from ``adapter.get_graph_data()``.

        Args:
            adapter: Object whose awaited ``get_graph_data()`` result unpacks
                into ``(nodes_data, edges_data)``.
            node_properties_to_project: Keys copied from each node's
                properties; keys that are missing map to ``None``.
            edge_properties_to_project: Keys copied from each edge's
                properties; ``relationship_type`` is always added on top.
            directed: Passed to every created ``Edge``; ``self.directed`` is
                not modified.
            node_dimension: Status-vector length for created nodes.
            edge_dimension: Status-vector length for created edges.

        Raises:
            ValueError: If either dimension is below 1. This check runs
                before the ``try`` block, so it propagates to the caller.

        Any error raised while reading or projecting the data is printed and
        NOT re-raised, so the graph may be left partially populated.
        """
        if node_dimension < 1 or edge_dimension < 1:
            raise ValueError("Dimensions must be positive integers")

        try:
            nodes_data, edges_data = await adapter.get_graph_data()

            if not nodes_data:
                raise ValueError("No node data retrieved from the database.")
            if not edges_data:
                raise ValueError("No edge data retrieved from the database.")

            for node_id, properties in nodes_data:
                node_attributes = {key: properties.get(key) for key in node_properties_to_project}
                self.add_node(Node(str(node_id), node_attributes, dimension=node_dimension))

            for source_id, target_id, relationship_type, properties in edges_data:
                source_node = self.get_node(str(source_id))
                target_node = self.get_node(str(target_id))
                if source_node is None or target_node is None:
                    raise ValueError(f"Edge references nonexistent nodes: {source_id} -> {target_id}")

                edge_attributes = {key: properties.get(key) for key in edge_properties_to_project}
                edge_attributes['relationship_type'] = relationship_type
                edge = Edge(source_node, target_node, attributes=edge_attributes, directed=directed, dimension=edge_dimension)
                # add_edge already registers the edge on both endpoint nodes,
                # so no extra add_skeleton_edge calls are needed here.
                self.add_edge(edge)

        except (ValueError, TypeError) as e:
            print(f"Error projecting graph: {e}")
        except Exception as ex:
            print(f"Unexpected error: {ex}")

View file

@ -0,0 +1,114 @@
import numpy as np
from typing import List, Dict, Optional, Any
class Node:
    """
    Represents a node in a graph.

    Attributes:
        id (str): A unique identifier for the node.
        attributes (Dict[str, Any]): A dictionary of attributes associated with the node.
        skeleton_neighbours (List[Node]): Neighbouring nodes, maintained by
            the ``*_skeleton_neighbor`` methods and as a side effect of the
            ``*_skeleton_edge`` methods.
        skeleton_edges (List[Edge]): Edges registered on this node.
        status (np.ndarray): Integer vector with one entry per dimension,
            initialised to 1; an entry equal to 1 means "alive".

    Equality and hashing are based solely on ``id``.
    """
    id: str
    attributes: Dict[str, Any]
    skeleton_neighbours: List["Node"]
    skeleton_edges: List["Edge"]
    status: np.ndarray

    def __init__(self, node_id: str, attributes: Optional[Dict[str, Any]] = None, dimension: int = 1):
        """Create a node with ``dimension`` status entries, all set to 1.

        Raises:
            ValueError: If ``dimension`` is not positive.
        """
        if dimension <= 0:
            raise ValueError("Dimension must be a positive integer")
        self.id = node_id
        self.attributes = attributes if attributes is not None else {}
        self.skeleton_neighbours = []
        self.skeleton_edges = []
        self.status = np.ones(dimension, dtype=int)

    def add_skeleton_neighbor(self, neighbor: "Node") -> None:
        """Record ``neighbor`` unless an equal node is already recorded."""
        if neighbor not in self.skeleton_neighbours:
            self.skeleton_neighbours.append(neighbor)

    def remove_skeleton_neighbor(self, neighbor: "Node") -> None:
        """Forget ``neighbor``; a no-op when it is not recorded."""
        if neighbor in self.skeleton_neighbours:
            self.skeleton_neighbours.remove(neighbor)

    def add_skeleton_edge(self, edge: "Edge") -> None:
        """Register ``edge`` and record the node at its other end as a neighbour.

        Nothing happens when an equal edge is already registered.
        """
        if edge in self.skeleton_edges:
            return
        self.skeleton_edges.append(edge)
        # Add neighbor
        if edge.node1 == self:
            self.add_skeleton_neighbor(edge.node2)
        elif edge.node2 == self:
            self.add_skeleton_neighbor(edge.node1)

    def remove_skeleton_edge(self, edge: "Edge") -> None:
        """Unregister ``edge`` and drop its far-end node from the neighbours
        when no remaining edge still links this node to it.
        """
        if edge not in self.skeleton_edges:
            return
        self.skeleton_edges.remove(edge)
        neighbor = edge.node2 if edge.node1 == self else edge.node1
        # An edge keeps the neighbour only if it really runs between this
        # node and that neighbour. Merely testing whether the neighbour
        # appears on an edge is wrong for self-loops: every remaining edge
        # touches this node, so the self-neighbour would never be removed.
        still_connected = any(
            (e.node1 == self and e.node2 == neighbor)
            or (e.node2 == self and e.node1 == neighbor)
            for e in self.skeleton_edges
        )
        if not still_connected:
            self.remove_skeleton_neighbor(neighbor)

    def is_node_alive_in_dimension(self, dimension: int) -> bool:
        """Return whether the status entry at index ``dimension`` equals 1.

        Raises:
            ValueError: If ``dimension`` is outside ``0 .. len(status) - 1``.
        """
        if dimension < 0 or dimension >= len(self.status):
            raise ValueError(f"Dimension {dimension} is out of range. Valid range is 0 to {len(self.status) - 1}.")
        # Convert the numpy scalar so callers get the annotated builtin bool.
        return bool(self.status[dimension] == 1)

    def __repr__(self) -> str:
        return f"Node({self.id}, attributes={self.attributes})"

    def __hash__(self) -> int:
        return hash(self.id)

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Node) and self.id == other.id
class Edge:
    """
    Represents an edge in a graph, connecting two nodes.

    Attributes:
        node1 (Node): The starting node of the edge.
        node2 (Node): The ending node of the edge.
        attributes (Dict[str, Any]): A dictionary of attributes associated with the edge.
        directed (bool): A flag indicating whether the edge is directed or undirected.
        status (np.ndarray): Integer vector with one entry per dimension,
            initialised to 1; an entry equal to 1 means "alive".

    Equality and hashing depend only on the endpoints and on ``directed``;
    ``attributes`` and ``status`` are ignored.
    """
    node1: "Node"
    node2: "Node"
    attributes: Dict[str, Any]
    directed: bool
    status: np.ndarray

    def __init__(self, node1: "Node", node2: "Node", attributes: Optional[Dict[str, Any]] = None, directed: bool = True, dimension: int = 1):
        """Create an edge with ``dimension`` status entries, all set to 1.

        Raises:
            ValueError: If ``dimension`` is not positive.
        """
        if dimension <= 0:
            raise ValueError("Dimensions must be a positive integer.")
        self.node1 = node1
        self.node2 = node2
        self.attributes = attributes if attributes is not None else {}
        self.directed = directed
        self.status = np.ones(dimension, dtype=int)

    def is_edge_alive_in_dimension(self, dimension: int) -> bool:
        """Return whether the status entry at index ``dimension`` equals 1.

        Raises:
            ValueError: If ``dimension`` is outside ``0 .. len(status) - 1``.
        """
        if dimension < 0 or dimension >= len(self.status):
            raise ValueError(f"Dimension {dimension} is out of range. Valid range is 0 to {len(self.status) - 1}.")
        # Convert the numpy scalar so callers get the annotated builtin bool.
        return bool(self.status[dimension] == 1)

    def __repr__(self) -> str:
        direction = "->" if self.directed else "--"
        return f"Edge({self.node1.id} {direction} {self.node2.id}, attributes={self.attributes})"

    def __hash__(self) -> int:
        # Directed edges hash the ordered pair, undirected ones the unordered pair.
        if self.directed:
            return hash((self.node1, self.node2))
        return hash(frozenset({self.node1, self.node2}))

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Edge):
            return False
        # A directed and an undirected edge are never equal. Without this
        # check they could compare equal while hashing differently, and
        # a == b could disagree with b == a.
        if self.directed != other.directed:
            return False
        if self.directed:
            return self.node1 == other.node1 and self.node2 == other.node2
        return {self.node1, self.node2} == {other.node1, other.node2}

View file

@ -0,0 +1,144 @@
import pytest
import numpy as np
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Node, Edge
def test_node_initialization():
    """A new Node keeps its id and attributes and starts with an all-ones status."""
    attrs = {"attr1": "value1"}
    created = Node("node1", attrs, dimension=2)

    assert created.id == "node1"
    assert created.attributes == {"attr1": "value1"}
    assert len(created.status) == 2
    assert np.all(created.status == 1)
def test_node_invalid_dimension():
    """Building a Node with dimension=0 raises ValueError."""
    expected_message = "Dimension must be a positive integer"
    with pytest.raises(ValueError, match=expected_message):
        Node("node1", dimension=0)
def test_add_skeleton_neighbor():
    """add_skeleton_neighbor records the given node in skeleton_neighbours."""
    owner, other = Node("node1"), Node("node2")
    owner.add_skeleton_neighbor(other)
    assert other in owner.skeleton_neighbours
def test_remove_skeleton_neighbor():
    """A neighbour that was added and then removed is no longer listed."""
    owner, other = Node("node1"), Node("node2")
    owner.add_skeleton_neighbor(other)
    owner.remove_skeleton_neighbor(other)
    assert other not in owner.skeleton_neighbours
def test_add_skeleton_edge():
    """Registering an edge fills both skeleton_edges and skeleton_neighbours."""
    source, target = Node("node1"), Node("node2")
    link = Edge(source, target)

    source.add_skeleton_edge(link)

    assert link in source.skeleton_edges
    assert target in source.skeleton_neighbours
def test_remove_skeleton_edge():
    """Unregistering the only edge clears both skeleton_edges and skeleton_neighbours."""
    source, target = Node("node1"), Node("node2")
    link = Edge(source, target)

    source.add_skeleton_edge(link)
    source.remove_skeleton_edge(link)

    assert link not in source.skeleton_edges
    assert target not in source.skeleton_neighbours
def test_is_node_alive_in_dimension():
    """The alive check follows the status entry of the queried dimension."""
    probe = Node("node1", dimension=2)
    assert probe.is_node_alive_in_dimension(1)

    # Zeroing the entry must flip the answer.
    probe.status[1] = 0
    assert not probe.is_node_alive_in_dimension(1)
def test_node_alive_invalid_dimension():
    """Querying index 1 on a one-dimensional Node raises ValueError."""
    probe = Node("node1", dimension=1)
    expected_message = "Dimension 1 is out of range"
    with pytest.raises(ValueError, match=expected_message):
        probe.is_node_alive_in_dimension(1)
def test_node_equality():
    """Two separately built Nodes with the same id compare equal."""
    first, second = Node("node1"), Node("node1")
    assert first == second
def test_node_hash():
    """A Node hashes like its id string."""
    probe = Node("node1")
    expected = hash("node1")
    assert hash(probe) == expected
### Tests for Edge ###
def test_edge_initialization():
    """A new Edge keeps its endpoints, attributes and flag, with an all-ones status."""
    source, target = Node("node1"), Node("node2")
    link = Edge(source, target, {"weight": 10}, directed=False, dimension=2)

    assert link.node1 == source
    assert link.node2 == target
    assert link.attributes == {"weight": 10}
    assert link.directed is False
    assert len(link.status) == 2
    assert np.all(link.status == 1)
def test_edge_invalid_dimension():
    """Building an Edge with dimension=0 raises ValueError."""
    source, target = Node("node1"), Node("node2")
    expected_message = "Dimensions must be a positive integer."
    with pytest.raises(ValueError, match=expected_message):
        Edge(source, target, dimension=0)
def test_is_edge_alive_in_dimension():
    """The alive check follows the status entry of the queried dimension."""
    source, target = Node("node1"), Node("node2")
    link = Edge(source, target, dimension=2)
    assert link.is_edge_alive_in_dimension(1)

    # Zeroing the entry must flip the answer.
    link.status[1] = 0
    assert not link.is_edge_alive_in_dimension(1)
def test_edge_alive_invalid_dimension():
    """Querying index 1 on a one-dimensional Edge raises ValueError."""
    source, target = Node("node1"), Node("node2")
    link = Edge(source, target, dimension=1)
    expected_message = "Dimension 1 is out of range"
    with pytest.raises(ValueError, match=expected_message):
        link.is_edge_alive_in_dimension(1)
def test_edge_equality_directed():
    """Two directed edges over the same ordered endpoints compare equal."""
    source, target = Node("node1"), Node("node2")
    first = Edge(source, target, directed=True)
    second = Edge(source, target, directed=True)
    assert first == second
def test_edge_equality_undirected():
    """Undirected edges compare equal regardless of endpoint order."""
    left, right = Node("node1"), Node("node2")
    forward = Edge(left, right, directed=False)
    backward = Edge(right, left, directed=False)
    assert forward == backward
def test_edge_hash_directed():
    """A directed Edge hashes like the ordered tuple of its endpoints."""
    source, target = Node("node1"), Node("node2")
    link = Edge(source, target, directed=True)
    expected = hash((source, target))
    assert hash(link) == expected
def test_edge_hash_undirected():
    """An undirected Edge hashes like the frozenset of its endpoints."""
    left, right = Node("node1"), Node("node2")
    link = Edge(left, right, directed=False)
    expected = hash(frozenset({left, right}))
    assert hash(link) == expected

View file

@ -0,0 +1,79 @@
import pytest
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Node, Edge
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
@pytest.fixture
def setup_graph():
    """Provide a fresh CogneeGraph built with default arguments."""
    fresh_graph = CogneeGraph()
    return fresh_graph
def test_add_node_success(setup_graph):
    """A node that was added can be fetched back by its id."""
    added = Node("node1")
    setup_graph.add_node(added)
    assert setup_graph.get_node("node1") == added
def test_add_duplicate_node(setup_graph):
    """Adding the same node a second time raises ValueError."""
    added = Node("node1")
    setup_graph.add_node(added)

    expected_message = "Node with id node1 already exists."
    with pytest.raises(ValueError, match=expected_message):
        setup_graph.add_node(added)
def test_add_edge_success(setup_graph):
    """An added edge appears in the graph and on both of its endpoint nodes."""
    source, target = Node("node1"), Node("node2")
    for endpoint in (source, target):
        setup_graph.add_node(endpoint)

    link = Edge(source, target)
    setup_graph.add_edge(link)

    assert link in setup_graph.edges
    assert link in source.skeleton_edges
    assert link in target.skeleton_edges
def test_add_duplicate_edge(setup_graph):
    """Adding the same edge a second time raises ValueError."""
    source, target = Node("node1"), Node("node2")
    for endpoint in (source, target):
        setup_graph.add_node(endpoint)

    link = Edge(source, target)
    setup_graph.add_edge(link)

    expected_pattern = "Edge .* already exists in the graph."
    with pytest.raises(ValueError, match=expected_pattern):
        setup_graph.add_edge(link)
def test_get_node_success(setup_graph):
    """get_node returns the node stored under the requested id."""
    stored = Node("node1")
    setup_graph.add_node(stored)
    fetched = setup_graph.get_node("node1")
    assert fetched == stored
def test_get_node_nonexistent(setup_graph):
    """get_node returns None for an id that was never added."""
    missing = setup_graph.get_node("nonexistent")
    assert missing is None
def test_get_edges_success(setup_graph):
    """get_edges lists an edge that was added for the queried node."""
    source, target = Node("node1"), Node("node2")
    for endpoint in (source, target):
        setup_graph.add_node(endpoint)

    link = Edge(source, target)
    setup_graph.add_edge(link)

    assert link in setup_graph.get_edges("node1")
def test_get_edges_nonexistent_node(setup_graph):
    """get_edges raises ValueError for an id that was never added."""
    expected_message = "Node with id nonexistent does not exist."
    with pytest.raises(ValueError, match=expected_message):
        setup_graph.get_edges("nonexistent")

View file

@ -0,0 +1,229 @@
import cognee
import asyncio
from cognee.api.v1.search import SearchType
# Job posting text; the first of the documents main() passes to cognee.add().
job_position = """0:Senior Data Scientist (Machine Learning)
Company: TechNova Solutions
Location: San Francisco, CA
Job Description:
TechNova Solutions is seeking a Senior Data Scientist specializing in Machine Learning to join our dynamic analytics team. The ideal candidate will have a strong background in developing and deploying machine learning models, working with large datasets, and translating complex data into actionable insights.
Responsibilities:
Develop and implement advanced machine learning algorithms and models.
Analyze large, complex datasets to extract meaningful patterns and insights.
Collaborate with cross-functional teams to integrate predictive models into products.
Stay updated with the latest advancements in machine learning and data science.
Mentor junior data scientists and provide technical guidance.
Qualifications:
Masters or Ph.D. in Data Science, Computer Science, Statistics, or a related field.
5+ years of experience in data science and machine learning.
Proficient in Python, R, and SQL.
Experience with deep learning frameworks (e.g., TensorFlow, PyTorch).
Strong problem-solving skills and attention to detail.
Candidate CVs
"""
# Sample CV text (labelled "Relevant" in its own first line); passed to cognee.add() by main().
job_1 = """
CV 1: Relevant
Name: Dr. Emily Carter
Contact Information:
Email: emily.carter@example.com
Phone: (555) 123-4567
Summary:
Senior Data Scientist with over 8 years of experience in machine learning and predictive analytics. Expertise in developing advanced algorithms and deploying scalable models in production environments.
Education:
Ph.D. in Computer Science, Stanford University (2014)
B.S. in Mathematics, University of California, Berkeley (2010)
Experience:
Senior Data Scientist, InnovateAI Labs (2016 Present)
Led a team in developing machine learning models for natural language processing applications.
Implemented deep learning algorithms that improved prediction accuracy by 25%.
Collaborated with cross-functional teams to integrate models into cloud-based platforms.
Data Scientist, DataWave Analytics (2014 2016)
Developed predictive models for customer segmentation and churn analysis.
Analyzed large datasets using Hadoop and Spark frameworks.
Skills:
Programming Languages: Python, R, SQL
Machine Learning: TensorFlow, Keras, Scikit-Learn
Big Data Technologies: Hadoop, Spark
Data Visualization: Tableau, Matplotlib
"""
# Sample CV text (labelled "Relevant" in its own first line); passed to cognee.add() by main().
job_2 = """
CV 2: Relevant
Name: Michael Rodriguez
Contact Information:
Email: michael.rodriguez@example.com
Phone: (555) 234-5678
Summary:
Data Scientist with a strong background in machine learning and statistical modeling. Skilled in handling large datasets and translating data into actionable business insights.
Education:
M.S. in Data Science, Carnegie Mellon University (2013)
B.S. in Computer Science, University of Michigan (2011)
Experience:
Senior Data Scientist, Alpha Analytics (2017 Present)
Developed machine learning models to optimize marketing strategies.
Reduced customer acquisition cost by 15% through predictive modeling.
Data Scientist, TechInsights (2013 2017)
Analyzed user behavior data to improve product features.
Implemented A/B testing frameworks to evaluate product changes.
Skills:
Programming Languages: Python, Java, SQL
Machine Learning: Scikit-Learn, XGBoost
Data Visualization: Seaborn, Plotly
Databases: MySQL, MongoDB
"""
# Sample CV text (labelled "Relevant" in its own first line); passed to cognee.add() by main().
job_3 = """
CV 3: Relevant
Name: Sarah Nguyen
Contact Information:
Email: sarah.nguyen@example.com
Phone: (555) 345-6789
Summary:
Data Scientist specializing in machine learning with 6 years of experience. Passionate about leveraging data to drive business solutions and improve product performance.
Education:
M.S. in Statistics, University of Washington (2014)
B.S. in Applied Mathematics, University of Texas at Austin (2012)
Experience:
Data Scientist, QuantumTech (2016 Present)
Designed and implemented machine learning algorithms for financial forecasting.
Improved model efficiency by 20% through algorithm optimization.
Junior Data Scientist, DataCore Solutions (2014 2016)
Assisted in developing predictive models for supply chain optimization.
Conducted data cleaning and preprocessing on large datasets.
Skills:
Programming Languages: Python, R
Machine Learning Frameworks: PyTorch, Scikit-Learn
Statistical Analysis: SAS, SPSS
Cloud Platforms: AWS, Azure
"""
# Sample CV text (labelled "Not Relevant" in its own first line); passed to cognee.add() by main().
job_4 = """
CV 4: Not Relevant
Name: David Thompson
Contact Information:
Email: david.thompson@example.com
Phone: (555) 456-7890
Summary:
Creative Graphic Designer with over 8 years of experience in visual design and branding. Proficient in Adobe Creative Suite and passionate about creating compelling visuals.
Education:
B.F.A. in Graphic Design, Rhode Island School of Design (2012)
Experience:
Senior Graphic Designer, CreativeWorks Agency (2015 Present)
Led design projects for clients in various industries.
Created branding materials that increased client engagement by 30%.
Graphic Designer, Visual Innovations (2012 2015)
Designed marketing collateral, including brochures, logos, and websites.
Collaborated with the marketing team to develop cohesive brand strategies.
Skills:
Design Software: Adobe Photoshop, Illustrator, InDesign
Web Design: HTML, CSS
Specialties: Branding and Identity, Typography
"""
# Sample CV text (labelled "Not Relevant" in its own first line); passed to cognee.add() by main().
job_5 = """
CV 5: Not Relevant
Name: Jessica Miller
Contact Information:
Email: jessica.miller@example.com
Phone: (555) 567-8901
Summary:
Experienced Sales Manager with a strong track record in driving sales growth and building high-performing teams. Excellent communication and leadership skills.
Education:
B.A. in Business Administration, University of Southern California (2010)
Experience:
Sales Manager, Global Enterprises (2015 Present)
Managed a sales team of 15 members, achieving a 20% increase in annual revenue.
Developed sales strategies that expanded customer base by 25%.
Sales Representative, Market Leaders Inc. (2010 2015)
Consistently exceeded sales targets and received the 'Top Salesperson' award in 2013.
Skills:
Sales Strategy and Planning
Team Leadership and Development
CRM Software: Salesforce, Zoho
Negotiation and Relationship Building
"""
async def main(enable_steps):
    """Run the demo pipeline, executing only the steps flagged in ``enable_steps``.

    Args:
        enable_steps: Mapping of step name to a truthy/falsy flag. The keys
            read are "prune_data", "prune_system", "add_text", "cognify" and
            "search_insights"; a missing key leaves that step disabled.
    """
    step_enabled = enable_steps.get

    # Step 1: Reset data and system state
    if step_enabled("prune_data"):
        await cognee.prune.prune_data()
        print("Data pruned.")

    if step_enabled("prune_system"):
        await cognee.prune.prune_system(metadata=True)
        print("System pruned.")

    # Step 2: Add text
    if step_enabled("add_text"):
        for document in (job_position, job_1, job_2, job_3, job_4, job_5):
            await cognee.add(document)
            print(f"Added text: {document[:35]}...")

    # Step 3: Create knowledge graph
    if step_enabled("cognify"):
        await cognee.cognify()
        print("Knowledge graph created.")

    # Step 4: Query insights
    if step_enabled("search_insights"):
        query = {'query': 'Which applicant has the most relevant experience in data science?'}
        insights = await cognee.search(SearchType.INSIGHTS, query)
        print("Search results:")
        for insight in insights:
            print(insight)
if __name__ == '__main__':
    # Every pipeline step is switched on; set a flag to False to skip that step.
    steps_to_enable = dict(
        prune_data=True,
        prune_system=True,
        add_text=True,
        cognify=True,
        search_insights=True,
    )

    asyncio.run(main(steps_to_enable))