Merge branch 'main' into feat/COG-553-graph-memory-projection

authored by hajdul88 on 2024-11-13 16:45:13 +01:00, committed by GitHub
commit bf4eedd20e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 359 additions and 183 deletions

View file

@@ -50,8 +50,11 @@ jobs:
      - name: Install dependencies
        run: poetry install --no-interaction
-      - name: Run tests
-        run: poetry run pytest tests/
+      - name: Run unit tests
+        run: poetry run pytest cognee/tests/unit/
+      - name: Run integration tests
+        run: poetry run pytest cognee/tests/integration/
      - name: Run default basic pipeline
        env:

View file

@@ -50,8 +50,11 @@ jobs:
      - name: Install dependencies
        run: poetry install --no-interaction
-      - name: Run tests
-        run: poetry run pytest tests/
+      - name: Run unit tests
+        run: poetry run pytest cognee/tests/unit/
+      - name: Run integration tests
+        run: poetry run pytest cognee/tests/integration/
      - name: Run default basic pipeline
        env:

View file

@@ -50,8 +50,11 @@ jobs:
      - name: Install dependencies
        run: poetry install --no-interaction
-      - name: Run tests
-        run: poetry run pytest tests/
+      - name: Run unit tests
+        run: poetry run pytest cognee/tests/unit/
+      - name: Run integration tests
+        run: poetry run pytest cognee/tests/integration/
      - name: Run default basic pipeline
        env:

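The same split can be reproduced locally through pytest's Python entry point. A minimal sketch — the suite paths are taken from the workflow steps above, everything else is illustrative:

import sys

import pytest

# Run the two suites in sequence, mirroring the CI steps above.
# pytest.main returns an exit code; non-zero means failures.
unit_rc = pytest.main(["cognee/tests/unit/"])
integration_rc = pytest.main(["cognee/tests/integration/"])
sys.exit(unit_rc or integration_rc)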
View file

@@ -1,72 +0,0 @@
from enum import Enum
from typing import Optional

from cognee.infrastructure.engine import DataPoint
from cognee.modules.graph.utils import get_graph_from_model, get_model_instance_from_graph

if __name__ == "__main__":

    class CarTypeName(Enum):
        Pickup = "Pickup"
        Sedan = "Sedan"
        SUV = "SUV"
        Coupe = "Coupe"
        Convertible = "Convertible"
        Hatchback = "Hatchback"
        Wagon = "Wagon"
        Minivan = "Minivan"
        Van = "Van"

    class CarType(DataPoint):
        id: str
        name: CarTypeName
        _metadata: dict = dict(index_fields = ["name"])

    class Car(DataPoint):
        id: str
        brand: str
        model: str
        year: int
        color: str
        is_type: CarType

    class Person(DataPoint):
        id: str
        name: str
        age: int
        owns_car: list[Car]
        driving_licence: Optional[dict]
        _metadata: dict = dict(index_fields = ["name"])

    boris = Person(
        id = "boris",
        name = "Boris",
        age = 30,
        owns_car = [
            Car(
                id = "car1",
                brand = "Toyota",
                model = "Camry",
                year = 2020,
                color = "Blue",
                is_type = CarType(id = "sedan", name = CarTypeName.Sedan),
            ),
        ],
        driving_licence = {
            "issued_by": "PU Vrsac",
            "issued_on": "2025-11-06",
            "number": "1234567890",
            "expires_on": "2025-11-06",
        },
    )

    nodes, edges = get_graph_from_model(boris)

    print(nodes)
    print(edges)

    person_data = nodes[len(nodes) - 1]

    parsed_person = get_model_instance_from_graph(nodes, edges, 'boris')

    print(parsed_person)

View file

@@ -1,13 +0,0 @@
import os

from cognee.modules.data.processing.document_types.PdfDocument import PdfDocument

if __name__ == "__main__":
    test_file_path = os.path.join(os.path.dirname(__file__), "artificial-inteligence.pdf")

    pdf_doc = PdfDocument("Test document.pdf", test_file_path, chunking_strategy="paragraph")
    reader = pdf_doc.get_reader()

    for paragraph_data in reader.read():
        print(paragraph_data["word_count"])
        print(paragraph_data["text"])
        print(paragraph_data["cut_type"])
        print("\n")

View file

@@ -1,14 +0,0 @@
import asyncio

from cognee.shared.utils import render_graph
from cognee.infrastructure.databases.graph import get_graph_engine

if __name__ == "__main__":

    async def main():
        graph_client = await get_graph_engine()
        graph = graph_client.graph

        graph_url = await render_graph(graph)
        print(graph_url)

    asyncio.run(main())

View file

@@ -1,53 +0,0 @@
from cognee.tasks.chunks import chunk_by_paragraph

if __name__ == "__main__":

    def test_chunking_on_whole_text():
        test_text = """This is example text. It contains multiple sentences.
This is a second paragraph. First two paragraphs are whole.
Third paragraph is a bit longer and is finished with a dot."""

        chunks = []
        for chunk_data in chunk_by_paragraph(test_text, 12, batch_paragraphs = False):
            chunks.append(chunk_data)

        assert len(chunks) == 3

        assert chunks[0]["text"] == "This is example text. It contains multiple sentences."
        assert chunks[0]["word_count"] == 8
        assert chunks[0]["cut_type"] == "paragraph_end"

        assert chunks[1]["text"] == "This is a second paragraph. First two paragraphs are whole."
        assert chunks[1]["word_count"] == 10
        assert chunks[1]["cut_type"] == "paragraph_end"

        assert chunks[2]["text"] == "Third paragraph is a bit longer and is finished with a dot."
        assert chunks[2]["word_count"] == 12
        assert chunks[2]["cut_type"] == "sentence_end"

    def test_chunking_on_cut_text():
        test_text = """This is example text. It contains multiple sentences.
This is a second paragraph. First two paragraphs are whole.
Third paragraph is cut and is missing the dot at the end"""

        chunks = []
        for chunk_data in chunk_by_paragraph(test_text, 12, batch_paragraphs = False):
            chunks.append(chunk_data)

        assert len(chunks) == 3

        assert chunks[0]["text"] == "This is example text. It contains multiple sentences."
        assert chunks[0]["word_count"] == 8
        assert chunks[0]["cut_type"] == "paragraph_end"

        assert chunks[1]["text"] == "This is a second paragraph. First two paragraphs are whole."
        assert chunks[1]["word_count"] == 10
        assert chunks[1]["cut_type"] == "paragraph_end"

        assert chunks[2]["text"] == "Third paragraph is cut and is missing the dot at the end"
        assert chunks[2]["word_count"] == 12
        assert chunks[2]["cut_type"] == "sentence_cut"

    test_chunking_on_whole_text()
    test_chunking_on_cut_text()

View file

@@ -13,5 +13,4 @@ def classify_documents(data_documents: list[Data]) -> list[Document]:
        EXTENSION_TO_DOCUMENT_CLASS[data_item.extension](id = data_item.id, title=f"{data_item.name}.{data_item.extension}", raw_data_location=data_item.raw_data_location, name=data_item.name)
        for data_item in data_documents
    ]

    return documents

View file

@@ -0,0 +1,11 @@
import os

import pytest

@pytest.fixture(autouse=True, scope="session")
def copy_cognee_db_to_target_location():
    os.makedirs("cognee/.cognee_system/databases/", exist_ok=True)
    os.system(
        "cp cognee/tests/integration/run_toy_tasks/data/cognee_db cognee/.cognee_system/databases/cognee_db"
    )
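A note on this fixture: it shells out to cp, which assumes a Unix-like environment. A portable equivalent could use shutil from the standard library instead — a sketch only, not part of this commit:

import os
import shutil

import pytest

@pytest.fixture(autouse=True, scope="session")
def copy_cognee_db_to_target_location():
    # Same behavior as the fixture above, without depending on the `cp` binary.
    target_dir = "cognee/.cognee_system/databases/"
    os.makedirs(target_dir, exist_ok=True)
    shutil.copy(
        "cognee/tests/integration/run_toy_tasks/data/cognee_db",
        os.path.join(target_dir, "cognee_db"),
    )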

Binary file not shown.

View file

@ -1,8 +1,10 @@
import asyncio
from queue import Queue
from cognee.modules.pipelines.operations.run_tasks import run_tasks
from cognee.modules.pipelines.tasks.Task import Task
async def pipeline(data_queue):
async def queue_consumer():
while not data_queue.is_closed:
@ -17,20 +19,25 @@ async def pipeline(data_queue):
async def multiply_by_two(num):
yield num * 2
tasks_run = run_tasks([
Task(queue_consumer),
Task(add_one),
Task(multiply_by_two),
])
tasks_run = run_tasks(
[
Task(queue_consumer),
Task(add_one),
Task(multiply_by_two),
],
pipeline_name="test_run_tasks_from_queue",
)
results = [2, 4, 6, 8, 10, 12, 14, 16, 18]
results = [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
index = 0
async for result in tasks_run:
print(result)
assert result == results[index]
assert (
result == results[index]
), f"at {index = }: {result = } != {results[index] = }"
index += 1
async def main():
async def run_queue():
data_queue = Queue()
data_queue.is_closed = False
@ -42,5 +49,6 @@ async def main():
await asyncio.gather(pipeline(data_queue), queue_producer())
if __name__ == "__main__":
asyncio.run(main())
def test_run_tasks_from_queue():
asyncio.run(run_queue())
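For reference, the updated expectations follow from the task chain itself: assuming the producer enqueues 0 through 9, each item passes through add_one and then multiply_by_two, so n becomes (n + 1) * 2. A pure-Python check of that arithmetic:

# Reproduces the new `results` ground truth, assuming inputs 0..9.
expected = [(n + 1) * 2 for n in range(10)]
assert expected == [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]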

View file

@ -1,9 +1,10 @@
import asyncio
from cognee.modules.pipelines.operations.run_tasks import run_tasks
from cognee.modules.pipelines.tasks.Task import Task
async def main():
async def run_and_check_tasks():
def number_generator(num):
for i in range(num):
yield i + 1
@ -18,19 +19,25 @@ async def main():
async def add_one_single(num):
yield num + 1
pipeline = run_tasks([
Task(number_generator),
Task(add_one, task_config = {"batch_size": 5}),
Task(multiply_by_two, task_config = {"batch_size": 1}),
Task(add_one_single),
], 10)
pipeline = run_tasks(
[
Task(number_generator),
Task(add_one, task_config={"batch_size": 5}),
Task(multiply_by_two, task_config={"batch_size": 1}),
Task(add_one_single),
],
10,
pipeline_name="test_run_tasks",
)
results = [5, 7, 9, 11, 13, 15, 17, 19, 21, 23]
index = 0
async for result in pipeline:
print(result)
assert result == results[index]
assert (
result == results[index]
), f"at {index = }: {result = } != {results[index] = }"
index += 1
if __name__ == "__main__":
asyncio.run(main())
def test_run_tasks():
asyncio.run(run_and_check_tasks())
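Here too the ground truth mirrors the chain: number_generator yields 1 through 10, add_one and multiply_by_two map n to (n + 1) * 2 (the batch_size settings should only change how items are grouped, not the values), and add_one_single adds the final 1. A quick check of the arithmetic:

# Reproduces the `results` ground truth for inputs 1..10.
expected = [(n + 1) * 2 + 1 for n in range(1, 11)]
assert expected == [5, 7, 9, 11, 13, 15, 17, 19, 21, 23]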

View file

@@ -0,0 +1,34 @@
import os
import uuid

from cognee.modules.data.processing.document_types.PdfDocument import PdfDocument

GROUND_TRUTH = [
    {"word_count": 879, "len_text": 5622, "cut_type": "sentence_end"},
    {"word_count": 951, "len_text": 6384, "cut_type": "sentence_end"},
]

def test_PdfDocument():
    test_file_path = os.path.join(
        os.sep,
        *(os.path.dirname(__file__).split(os.sep)[:-2]),
        "test_data",
        "artificial-intelligence.pdf",
    )
    pdf_doc = PdfDocument(
        id=uuid.uuid4(), name="Test document.pdf", raw_data_location=test_file_path
    )

    for ground_truth, paragraph_data in zip(
        GROUND_TRUTH, pdf_doc.read(chunk_size=1024)
    ):
        assert (
            ground_truth["word_count"] == paragraph_data.word_count
        ), f'{ground_truth["word_count"] = } != {paragraph_data.word_count = }'
        assert ground_truth["len_text"] == len(
            paragraph_data.text
        ), f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
        assert (
            ground_truth["cut_type"] == paragraph_data.cut_type
        ), f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'
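The test_file_path construction above climbs two directories up from the test module before descending into test_data. A pathlib equivalent, shown only as a readability sketch:

from pathlib import Path

# parents[2] is two directories above the test module's directory,
# matching os.path.dirname(__file__).split(os.sep)[:-2] above.
test_file_path = str(
    Path(__file__).resolve().parents[2] / "test_data" / "artificial-intelligence.pdf"
)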

View file

@@ -0,0 +1,80 @@
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

import pytest

from cognee.infrastructure.engine import DataPoint
from cognee.modules.graph.utils import (
    get_graph_from_model,
    get_model_instance_from_graph,
)

class CarTypeName(Enum):
    Pickup = "Pickup"
    Sedan = "Sedan"
    SUV = "SUV"
    Coupe = "Coupe"
    Convertible = "Convertible"
    Hatchback = "Hatchback"
    Wagon = "Wagon"
    Minivan = "Minivan"
    Van = "Van"

class CarType(DataPoint):
    id: str
    name: CarTypeName
    _metadata: dict = dict(index_fields=["name"])

class Car(DataPoint):
    id: str
    brand: str
    model: str
    year: int
    color: str
    is_type: CarType

class Person(DataPoint):
    id: str
    name: str
    age: int
    owns_car: list[Car]
    driving_license: Optional[dict]
    _metadata: dict = dict(index_fields=["name"])

@pytest.fixture(scope="session")
def graph_outputs():
    boris = Person(
        id="boris",
        name="Boris",
        age=30,
        owns_car=[
            Car(
                id="car1",
                brand="Toyota",
                model="Camry",
                year=2020,
                color="Blue",
                is_type=CarType(id="sedan", name=CarTypeName.Sedan),
            )
        ],
        driving_license={
            "issued_by": "PU Vrsac",
            "issued_on": "2025-11-06",
            "number": "1234567890",
            "expires_on": "2025-11-06",
        },
    )

    nodes, edges = get_graph_from_model(boris)
    car, person = nodes[0], nodes[1]
    edge = edges[0]
    parsed_person = get_model_instance_from_graph(nodes, edges, "boris")

    return (car, person, edge, parsed_person)

View file

@@ -0,0 +1,54 @@
from cognee.tests.unit.interfaces.graph.util import run_test_against_ground_truth

EDGE_GROUND_TRUTH = (
    "boris",
    "car1",
    "owns_car",
    {
        "source_node_id": "boris",
        "target_node_id": "car1",
        "relationship_name": "owns_car",
        "metadata": {"type": "list"},
    },
)

CAR_GROUND_TRUTH = {
    "id": "car1",
    "brand": "Toyota",
    "model": "Camry",
    "year": 2020,
    "color": "Blue",
}

PERSON_GROUND_TRUTH = {
    "id": "boris",
    "name": "Boris",
    "age": 30,
    "driving_license": {
        "issued_by": "PU Vrsac",
        "issued_on": "2025-11-06",
        "number": "1234567890",
        "expires_on": "2025-11-06",
    },
}

def test_extracted_person(graph_outputs):
    (_, person, _, _) = graph_outputs

    run_test_against_ground_truth("person", person, PERSON_GROUND_TRUTH)

def test_extracted_car(graph_outputs):
    (car, _, _, _) = graph_outputs

    run_test_against_ground_truth("car", car, CAR_GROUND_TRUTH)

def test_extracted_edge(graph_outputs):
    (_, _, edge, _) = graph_outputs

    assert (
        EDGE_GROUND_TRUTH[:3] == edge[:3]
    ), f"{EDGE_GROUND_TRUTH[:3] = } != {edge[:3] = }"
    for key, ground_truth in EDGE_GROUND_TRUTH[3].items():
        assert ground_truth == edge[3][key], f"{ground_truth = } != {edge[3][key] = }"

View file

@@ -0,0 +1,29 @@
from cognee.tests.unit.interfaces.graph.util import run_test_against_ground_truth

PARSED_PERSON_GROUND_TRUTH = {
    "id": "boris",
    "name": "Boris",
    "age": 30,
    "driving_license": {
        "issued_by": "PU Vrsac",
        "issued_on": "2025-11-06",
        "number": "1234567890",
        "expires_on": "2025-11-06",
    },
}

CAR_GROUND_TRUTH = {
    "id": "car1",
    "brand": "Toyota",
    "model": "Camry",
    "year": 2020,
    "color": "Blue",
}

def test_parsed_person(graph_outputs):
    (_, _, _, parsed_person) = graph_outputs

    run_test_against_ground_truth(
        "parsed_person", parsed_person, PARSED_PERSON_GROUND_TRUTH
    )
    run_test_against_ground_truth("car", parsed_person.owns_car[0], CAR_GROUND_TRUTH)

View file

@@ -0,0 +1,30 @@
from datetime import datetime, timezone
from typing import Any, Dict

def run_test_against_ground_truth(
    test_target_item_name: str, test_target_item: Any, ground_truth_dict: Dict[str, Any]
):
    """Validates test target item attributes against ground truth values.

    Args:
        test_target_item_name: Name of the item being tested (for error messages)
        test_target_item: Object whose attributes are being validated
        ground_truth_dict: Dictionary containing expected values

    Raises:
        AssertionError: If any attribute doesn't match ground truth or if update timestamp is too old
    """
    for key, ground_truth in ground_truth_dict.items():
        if isinstance(ground_truth, dict):
            for key2, ground_truth2 in ground_truth.items():
                assert (
                    ground_truth2 == getattr(test_target_item, key)[key2]
                ), f"{test_target_item_name}/{key = }/{key2 = }: {ground_truth2 = } != {getattr(test_target_item, key)[key2] = }"
        else:
            assert ground_truth == getattr(
                test_target_item, key
            ), f"{test_target_item_name}/{key = }: {ground_truth = } != {getattr(test_target_item, key) = }"

    time_delta = datetime.now(timezone.utc) - getattr(test_target_item, "updated_at")
    assert time_delta.total_seconds() < 60, f"{ time_delta.total_seconds() = }"

View file

@ -0,0 +1,69 @@
from cognee.tasks.chunks import chunk_by_paragraph
GROUND_TRUTH = {
"whole_text": [
{
"text": "This is example text. It contains multiple sentences.",
"word_count": 8,
"cut_type": "paragraph_end",
},
{
"text": "This is a second paragraph. First two paragraphs are whole.",
"word_count": 10,
"cut_type": "paragraph_end",
},
{
"text": "Third paragraph is a bit longer and is finished with a dot.",
"word_count": 12,
"cut_type": "sentence_end",
},
],
"cut_text": [
{
"text": "This is example text. It contains multiple sentences.",
"word_count": 8,
"cut_type": "paragraph_end",
},
{
"text": "This is a second paragraph. First two paragraphs are whole.",
"word_count": 10,
"cut_type": "paragraph_end",
},
{
"text": "Third paragraph is cut and is missing the dot at the end",
"word_count": 12,
"cut_type": "sentence_cut",
},
],
}
INPUT_TEXT = {
"whole_text": """This is example text. It contains multiple sentences.
This is a second paragraph. First two paragraphs are whole.
Third paragraph is a bit longer and is finished with a dot.""",
"cut_text": """This is example text. It contains multiple sentences.
This is a second paragraph. First two paragraphs are whole.
Third paragraph is cut and is missing the dot at the end""",
}
def run_chunking_test(test_text, expected_chunks):
chunks = []
for chunk_data in chunk_by_paragraph(test_text, 12, batch_paragraphs=False):
chunks.append(chunk_data)
assert len(chunks) == 3
for expected_chunks_item, chunk in zip(expected_chunks, chunks):
for key in ["text", "word_count", "cut_type"]:
assert (
chunk[key] == expected_chunks_item[key]
), f"{key = }: {chunk[key] = } != {expected_chunks_item[key] = }"
def test_chunking_whole_text():
run_chunking_test(INPUT_TEXT["whole_text"], GROUND_TRUTH["whole_text"])
def test_chunking_cut_text():
run_chunking_test(INPUT_TEXT["cut_text"], GROUND_TRUTH["cut_text"])

View file

@@ -1,2 +0,0 @@
[pytest]
addopts = tests/