adds dataset subset cognification to the demo

This commit is contained in:
hajdul88 2025-06-02 14:42:08 +02:00
parent 4d7c07e483
commit ae0b6c5cae
6 changed files with 96 additions and 17 deletions

View file

@ -16,9 +16,17 @@ async def add(
pipeline_run_info = None
data_packets = {dataset_name: []}
async for run_info in cognee_pipeline(
tasks=tasks, datasets=dataset_name, data=data, user=user, pipeline_name="add_pipeline"
):
if run_info.status == "PipelineRunYield":
for data_yielded in run_info.payload:
data_packets[dataset_name].append(data_yielded.id)
pipeline_run_info = run_info
if hasattr(pipeline_run_info, "packets"):
pipeline_run_info.packets = data_packets
return pipeline_run_info

View file

@ -1,6 +1,6 @@
import asyncio
from pydantic import BaseModel
from typing import Union, Optional
from typing import Union, Optional, Any
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
@ -32,6 +32,7 @@ update_status_lock = asyncio.Lock()
async def cognify(
datasets: Union[str, list[str]] = None,
datapoints: dict[str, Any] = None,
user: User = None,
graph_model: BaseModel = KnowledgeGraph,
chunker=TextChunker,
@ -46,16 +47,24 @@ async def cognify(
user = await get_default_user()
if run_in_background:
return await run_cognify_as_background_process(tasks, user, datasets)
return await run_cognify_as_background_process(tasks, user, datasets, datapoints=datapoints)
else:
return await run_cognify_blocking(tasks, user, datasets, is_stream_info_enabled)
return await run_cognify_blocking(
tasks, user, datasets, is_stream_info_enabled, datapoints=datapoints
)
async def run_cognify_blocking(tasks, user, datasets, is_stream_info_enabled=False):
async def run_cognify_blocking(
tasks, user, datasets, is_stream_info_enabled=False, datapoints=None
):
pipeline_run_info = None
async for run_info in cognee_pipeline(
tasks=tasks, datasets=datasets, user=user, pipeline_name="cognify_pipeline"
tasks=tasks,
datasets=datasets,
user=user,
pipeline_name="cognify_pipeline",
datapoints=datapoints,
):
pipeline_run_info = run_info
@ -71,9 +80,14 @@ async def run_cognify_blocking(tasks, user, datasets, is_stream_info_enabled=Fal
return pipeline_run_info
async def run_cognify_as_background_process(tasks, user, datasets):
async def run_cognify_as_background_process(tasks, user, datasets, datapoints=None):
pipeline_run = cognee_pipeline(
tasks=tasks, user=user, datasets=datasets, pipeline_name="cognify_pipeline"
tasks=tasks,
user=user,
data=None,
datasets=datasets,
pipeline_name="cognify_pipeline",
datapoints=datapoints,
)
pipeline_run_started_info = await anext(pipeline_run)

View file

@ -23,6 +23,7 @@ class PipelineRunYield(PipelineRunInfo):
class PipelineRunCompleted(PipelineRunInfo):
status: str = "PipelineRunCompleted"
packets: dict = {}
class PipelineRunErrored(PipelineRunInfo):

View file

@ -1,11 +1,12 @@
import asyncio
from typing import Union
from typing import Union, Any
from uuid import NAMESPACE_OID, uuid5
from cognee.shared.logging_utils import get_logger
from cognee.modules.data.methods import get_datasets
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
from cognee.modules.data.methods.get_data import get_data
from cognee.modules.data.models import Data, Dataset
from cognee.modules.pipelines.operations.run_tasks import run_tasks
from cognee.modules.pipelines.models import PipelineRunStatus
@ -33,6 +34,7 @@ async def cognee_pipeline(
datasets: Union[str, list[str]] = None,
user: User = None,
pipeline_name: str = "custom_pipeline",
datapoints: dict[str, Any] = None,
):
# Create tables for databases
await create_relational_db_and_tables()
@ -92,8 +94,21 @@ async def cognee_pipeline(
datasets = dataset_instances
for dataset in datasets:
if datapoints and datapoints.get(dataset.name, None):
data_to_pass = []
dataset_specific_datapoints = datapoints.get(dataset.name)
for data_id_to_cognify in dataset_specific_datapoints:
data_element_to_pass = await get_data(
user_id=dataset.owner_id, data_id=data_id_to_cognify
)
if data_element_to_pass:
data_to_pass.append(data_element_to_pass)
else:
data_to_pass = data
async for run_info in run_pipeline(
dataset=dataset, user=user, tasks=tasks, data=data, pipeline_name=pipeline_name
dataset=dataset, user=user, tasks=tasks, data=data_to_pass, pipeline_name=pipeline_name
):
yield run_info

View file

@ -20,6 +20,7 @@ from cognee.api.v1.add.config import get_s3_config
async def ingest_data(
data: Any, dataset_name: str, user: User, node_set: Optional[List[str]] = None
):
added_datapoints = []
destination = get_dlt_destination()
if not user:
@ -138,6 +139,7 @@ async def ingest_data(
node_set=json.dumps(node_set) if node_set else None,
token_count=-1,
)
added_datapoints.append(data_point)
# Check if data is already in dataset
dataset_data = (
@ -182,11 +184,4 @@ async def ingest_data(
write_disposition="merge",
)
datasets = await get_datasets_by_name(dataset_name, user.id)
# In case no files were processed no dataset will be created
if datasets:
dataset = datasets[0]
data_documents = await get_dataset_data(dataset_id=dataset.id)
return data_documents
return []
return added_datapoints

View file

@ -0,0 +1,46 @@
import cognee
import asyncio
from cognee.shared.logging_utils import get_logger, ERROR
from cognee.modules.metrics.operations import get_pipeline_run_metrics
from cognee.api.v1.search import SearchType
text_1_to_cognify = """Germany is located next to the Netherlands."""
text_2_to_cognify = (
"""Skoda is a czech car manufacturer which is the part of the Volkswagen group"""
)
text_3_not_to_cognify = "This text should not be cognified with cognee which is an AI memory engine"
async def main():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
text_1 = await cognee.add(text_1_to_cognify)
await cognee.cognify(datapoints=text_1.packets)
text_2 = await cognee.add(text_2_to_cognify)
await cognee.cognify(datapoints=text_2.packets)
await cognee.add(text_3_not_to_cognify)
search_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION, query_text="What is cognee?"
)
print(search_results)
if __name__ == "__main__":
logger = get_logger(level=ERROR)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())