feat: cognify dataset from add step

Boris Arzentar 2024-03-12 22:55:56 +01:00
parent d20f1bf0ab
commit 9448e36201
12 changed files with 435 additions and 268 deletions


@@ -1,137 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "823c799a",
"metadata": {},
"outputs": [],
"source": [
"from os import listdir, path\n",
"from uuid import uuid5, UUID\n",
"from cognitive_architecture import add\n",
"\n",
"data_path = path.abspath(\".data\")\n",
"pdf_files = [file for file in listdir(data_path) if path.isfile(path.join(data_path, file))]\n",
"\n",
"await add(\n",
" list(map(\n",
" lambda file_path: f\"file://{path.join(data_path, file_path)}\",\n",
" pdf_files\n",
" ))[:3],\n",
" uuid5(UUID(\"00000000-0000-0000-0000-000000000000\"), \"pdf_files_cognee\"),\n",
" \"test-dataset\"\n",
")\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c4d5a399",
"metadata": {},
"outputs": [],
"source": [
"from os import listdir, path\n",
"from uuid import uuid5, UUID\n",
"from cognitive_architecture import add_dlt\n",
"\n",
"data_path = path.abspath(\".data\")\n",
"# pdf_files = [file for file in listdir(data_path) if path.isfile(path.join(data_path, file))]\n",
"\n",
"# await add_dlt(\n",
"# list(map(\n",
"# lambda file_path: f\"file://{path.join(data_path, file_path)}\",\n",
"# pdf_files\n",
"# ))[:5],\n",
"# \"pdf_files\"\n",
"# )\n",
"\n",
"results = await add_dlt(data_path, \"pravilnik.energetska_efikasnost\")\n",
"for result in results:\n",
" print(result)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "47edce91",
"metadata": {},
"outputs": [],
"source": [
"import duckdb\n",
"from cognitive_architecture.root_dir import get_absolute_path\n",
"\n",
"dataset_name = \"pdf_files\"\n",
"\n",
"db_path = get_absolute_path(\"./data/cognee\")\n",
"db_location = db_path + \"/cognee.duckdb\"\n",
"print(db_location)\n",
"\n",
"db = duckdb.connect(db_location)\n",
"\n",
"izmene = db.sql(f\"SELECT * FROM izmene.file_metadata;\")\n",
"\n",
"print(izmene)\n",
"\n",
"pravilnik = db.sql(f\"SELECT * FROM pravilnik.file_metadata;\")\n",
"\n",
"print(pravilnik)\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "607bf624",
"metadata": {},
"outputs": [],
"source": [
"from os import path, listdir\n",
"from cognitive_architecture import cognify\n",
"from unstructured.cleaners.core import clean\n",
"from unstructured.partition.pdf import partition_pdf\n",
"from cognitive_architecture.utils import render_graph\n",
"\n",
"data_path = path.abspath(\".data/izmene\")\n",
"pdf_files = [file for file in listdir(data_path) if path.isfile(path.join(data_path, file))]\n",
"\n",
"with open(path.join(data_path, pdf_files[0]), mode = \"rb\") as file:\n",
" # elements = partition_pdf(file = file, strategy = \"fast\")\n",
" # text = \"\\n\".join(map(lambda element: clean(element.text), elements))\n",
"\n",
" text = \"\"\"In the nicest possible way, Britons have always been a bit silly about animals. “Keeping pets, for the English, is not so much a leisure activity as it is an entire way of life,” wrote the anthropologist Kate Fox in Watching the English, nearly 20 years ago. Our dogs, in particular, have been an acceptable outlet for emotions and impulses we otherwise keep strictly controlled our latent desire to be demonstratively affectionate, to be silly and chat to strangers. If this seems like an exaggeration, consider the different reactions youd get if you struck up a conversation with someone in a park with a dog, versus someone on the train.\n",
"Indeed, British society has been set up to accommodate these four-legged ambassadors. In the UK unlike Australia, say, or New Zealand dogs are not just permitted on public transport but often openly encouraged. Many pubs and shops display waggish signs, reading, “Dogs welcome, people tolerated”, and have treat jars on their counters. The other day, as I was waiting outside a cafe with a friends dog, the barista urged me to bring her inside.\n",
"For years, Britons non-partisan passion for animals has been consistent amid dwindling common ground. But lately, rather than bringing out the best in us, our relationship with dogs is increasingly revealing us at our worst and our supposed “best friends” are paying the price.\n",
"As with so many latent traits in the national psyche, it all came unleashed with the pandemic, when many people thought they might as well make the most of all that time at home and in local parks with a dog. Between 2019 and 2022, the number of pet dogs in the UK rose from about nine million to 13 million. But theres long been a seasonal surge around this time of year, substantial enough for the Dogs Trust charity to coin its famous slogan back in 1978: “A dog is for life, not just for Christmas.”\"\"\"\n",
"\n",
" graph = await cognify(text)\n",
"\n",
" graph_url = await render_graph(graph, graph_type = \"networkx\")\n",
" print(graph_url)\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

cognee.ipynb (new file, 254 additions)

File diff suppressed because one or more lines are too long
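The new notebook itself is not rendered above, so what follows is only a notebook-style sketch of the end-to-end flow implied by the APIs changed in this commit (it is not the suppressed cognee.ipynb); the file paths and the "izmene" dataset name are placeholders.

from cognitive_architecture import add, cognify, list_datasets
from cognitive_architecture.utils import render_graph

# Ingest a few local PDFs into a named dataset (placeholder paths).
await add([
    "file:///path/to/izmene/document_1.pdf",
    "file:///path/to/izmene/document_2.pdf",
], dataset_name = "izmene")

# Datasets are now read back from DuckDB instead of being tracked by hand ...
print(list_datasets())

# ... and cognify() consumes a dataset name rather than raw text.
graph = await cognify("izmene")
graph_url = await render_graph(graph, graph_type = "networkx")
print(graph_url)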


@@ -1,3 +1,3 @@
 from .api.v1.add.add import add
-from .api.v1.add.add_dlt import add_dlt
 from .api.v1.cognify.cognify import cognify
+from .api.v1.list_datasets.list_datasets import list_datasets


@@ -1,53 +1,91 @@
+from typing import List, Union
+from os import path, listdir
 import asyncio
-from uuid import UUID, uuid4
-from typing import Union, BinaryIO, List
+import dlt
+import duckdb
+from unstructured.cleaners.core import clean
+from cognitive_architecture.root_dir import get_absolute_path
 import cognitive_architecture.modules.ingestion as ingestion
-from cognitive_architecture.infrastructure import infrastructure_config
+from cognitive_architecture.infrastructure.files import get_file_metadata
+from cognitive_architecture.infrastructure.files.storage import LocalStorage
-class DatasetException(Exception):
-    message: str
+async def add(file_paths: Union[str, List[str]], dataset_name: str = None):
+    if isinstance(file_paths, str):
+        # Directory path provided, we need to extract the file paths and dataset name
-    def __init__(self, message: str):
-        self.message = message
+        def list_dir_files(root_dir_path: str, parent_dir: str = "root"):
+            datasets = {}
+            for file_or_dir in listdir(root_dir_path):
+                if path.isdir(path.join(root_dir_path, file_or_dir)):
+                    dataset_name = file_or_dir if parent_dir == "root" else parent_dir + "." + file_or_dir
+                    dataset_name = clean(dataset_name.replace(" ", "_"))
+                    nested_datasets = list_dir_files(path.join(root_dir_path, file_or_dir), dataset_name)
+                    for dataset in nested_datasets:
+                        datasets[dataset] = nested_datasets[dataset]
+                else:
+                    if parent_dir not in datasets:
+                        datasets[parent_dir] = []
+                    datasets[parent_dir].append(path.join(root_dir_path, file_or_dir))
+            return datasets
+        datasets = list_dir_files(file_paths)
+        results = []
+        for key in datasets:
+            if dataset_name is not None and not key.startswith(dataset_name):
+                continue
+            results.append(add_dlt(datasets[key], dataset_name = key))
+        return await asyncio.gather(*results)
-async def add(
-    data: Union[str, BinaryIO, List[Union[str, BinaryIO]]],
-    dataset_id: UUID = uuid4(),
-    dataset_name: str = None
-):
-    db_engine = infrastructure_config.get_config()["database_engine"]
-    if db_engine.is_db_done is not True:
-        await db_engine.ensure_tables()
+    db_path = get_absolute_path("./data/cognee")
+    db_location = f"{db_path}/cognee.duckdb"
-    if not data:
-        raise DatasetException("Data must be provided to cognee.add(data: str)")
+    LocalStorage.ensure_directory_exists(db_path)
-    if isinstance(data, list):
-        promises = []
+    db = duckdb.connect(db_location)
-        for data_item in data:
-            promises.append(add(data_item, dataset_id, dataset_name))
+    destination = dlt.destinations.duckdb(
+        credentials = db,
+    )
-        results = await asyncio.gather(*promises)
+    pipeline = dlt.pipeline(
+        pipeline_name = "file_load_from_filesystem",
+        destination = destination,
+    )
-        return results
+    @dlt.resource(standalone = True, merge_key = "id")
+    def data_resources(file_paths: str):
+        for file_path in file_paths:
+            with open(file_path.replace("file://", ""), mode = "rb") as file:
+                classified_data = ingestion.classify(file)
+                data_id = ingestion.identify(classified_data)
-    if is_data_path(data):
-        with open(data.replace("file://", ""), "rb") as file:
-            return await add(file, dataset_id, dataset_name)
+                file_metadata = get_file_metadata(classified_data.get_data())
-    classified_data = ingestion.classify(data)
+                yield {
+                    "id": data_id,
+                    "name": file_metadata["name"],
+                    "file_path": file_metadata["file_path"],
+                    "extension": file_metadata["extension"],
+                    "mime_type": file_metadata["mime_type"],
+                    "keywords": "|".join(file_metadata["keywords"]),
+                }
-    data_id = ingestion.identify(classified_data)
+    run_info = pipeline.run(
+        data_resources(file_paths),
+        table_name = "file_metadata",
+        dataset_name = dataset_name,
+        write_disposition = "merge",
+    )
-    await ingestion.save(dataset_id, dataset_name, data_id, classified_data)
-    return dataset_id
-    # await ingestion.vectorize(dataset_id, dataset_name, data_id, classified_data)
-def is_data_path(data: str) -> bool:
-    return False if not isinstance(data, str) else data.startswith("file://")
+    return run_info
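A minimal call sketch for the reworked add() above, passing a list of "file://" paths straight to the dlt pipeline branch; the ".data/izmene" directory and the "izmene" dataset name are placeholders borrowed from the deleted notebook.

import asyncio
from os import listdir, path
from cognitive_architecture import add

async def main():
    data_path = path.abspath(".data/izmene")
    pdf_files = [
        f"file://{path.join(data_path, file)}"
        for file in listdir(data_path)
        if path.isfile(path.join(data_path, file))
    ]

    # Each file is classified, identified, and merged into the
    # izmene.file_metadata table in DuckDB by the pipeline above.
    run_info = await add(pdf_files, dataset_name = "izmene")
    print(run_info)

asyncio.run(main())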


@@ -1,91 +0,0 @@
from typing import List, Union
from os import path, listdir
import asyncio
import dlt
import duckdb
from unstructured.cleaners.core import clean
from cognitive_architecture.root_dir import get_absolute_path
import cognitive_architecture.modules.ingestion as ingestion
from cognitive_architecture.infrastructure.files import get_file_metadata
from cognitive_architecture.infrastructure.files.storage import LocalStorage
async def add_dlt(file_paths: Union[str, List[str]], dataset_name: str = None):
    if isinstance(file_paths, str):
        # Directory path provided, we need to extract the file paths and dataset name
        def list_dir_files(root_dir_path: str, parent_dir: str = "root"):
            datasets = {}

            for file_or_dir in listdir(root_dir_path):
                if path.isdir(path.join(root_dir_path, file_or_dir)):
                    dataset_name = file_or_dir if parent_dir == "root" else parent_dir + "." + file_or_dir
                    dataset_name = clean(dataset_name.replace(" ", "_"))
                    nested_datasets = list_dir_files(path.join(root_dir_path, file_or_dir), dataset_name)

                    for dataset in nested_datasets:
                        datasets[dataset] = nested_datasets[dataset]
                else:
                    if parent_dir not in datasets:
                        datasets[parent_dir] = []

                    datasets[parent_dir].append(path.join(root_dir_path, file_or_dir))

            return datasets

        datasets = list_dir_files(file_paths)

        results = []

        for key in datasets:
            if dataset_name is not None and not key.startswith(dataset_name):
                continue

            results.append(add_dlt(datasets[key], dataset_name = key))

        return await asyncio.gather(*results)

    db_path = get_absolute_path("./data/cognee")
    db_location = f"{db_path}/cognee.duckdb"

    LocalStorage.ensure_directory_exists(db_path)

    db = duckdb.connect(db_location)

    destination = dlt.destinations.duckdb(
        credentials = db,
    )

    pipeline = dlt.pipeline(
        pipeline_name = "file_load_from_filesystem",
        destination = destination,
    )

    @dlt.resource(standalone = True, merge_key = "id")
    def data_resources(file_paths: str):
        for file_path in file_paths:
            with open(file_path.replace("file://", ""), mode = "rb") as file:
                classified_data = ingestion.classify(file)
                data_id = ingestion.identify(classified_data)
                file_metadata = get_file_metadata(classified_data.get_data())

                yield {
                    "id": data_id,
                    "name": file_metadata["name"],
                    "file_path": file_metadata["file_path"],
                    "extension": file_metadata["extension"],
                    "mime_type": file_metadata["mime_type"],
                    "keywords": "|".join(file_metadata["keywords"]),
                }

    run_info = pipeline.run(
        data_resources(file_paths),
        table_name = "file_metadata",
        dataset_name = dataset_name,
        write_disposition = "merge",
    )

    return run_info


@@ -0,0 +1,53 @@
import asyncio
from uuid import UUID, uuid4
from typing import Union, BinaryIO, List
import cognitive_architecture.modules.ingestion as ingestion
from cognitive_architecture.infrastructure import infrastructure_config
class DatasetException(Exception):
    message: str

    def __init__(self, message: str):
        self.message = message

async def add_standalone(
    data: Union[str, BinaryIO, List[Union[str, BinaryIO]]],
    dataset_id: UUID = uuid4(),
    dataset_name: str = None
):
    db_engine = infrastructure_config.get_config()["database_engine"]

    if db_engine.is_db_done is not True:
        await db_engine.ensure_tables()

    if not data:
        raise DatasetException("Data must be provided to cognee.add(data: str)")

    if isinstance(data, list):
        promises = []

        for data_item in data:
            promises.append(add(data_item, dataset_id, dataset_name))

        results = await asyncio.gather(*promises)

        return results

    if is_data_path(data):
        with open(data.replace("file://", ""), "rb") as file:
            return await add(file, dataset_id, dataset_name)

    classified_data = ingestion.classify(data)

    data_id = ingestion.identify(classified_data)

    await ingestion.save(dataset_id, dataset_name, data_id, classified_data)

    return dataset_id
    # await ingestion.vectorize(dataset_id, dataset_name, data_id, classified_data)

def is_data_path(data: str) -> bool:
    return False if not isinstance(data, str) else data.startswith("file://")
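A minimal sketch of calling the standalone variant with raw text; the import path is an assumption (the diff does not show where this file lives), and the UUID seeding mirrors the deleted notebook above. Lists and "file://" inputs are re-dispatched through the branches shown.

import asyncio
from uuid import UUID, uuid5
# Assumed module location; adjust to wherever add_standalone actually lives.
from cognitive_architecture.api.v1.add.add_standalone import add_standalone

async def main():
    dataset_id = uuid5(UUID("00000000-0000-0000-0000-000000000000"), "pdf_files_cognee")

    # Raw text is classified, identified, and saved through the ingestion module.
    result = await add_standalone("Some raw text to ingest.", dataset_id, "test-dataset")
    print(result)  # the dataset_id that was written

asyncio.run(main())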


@@ -4,6 +4,8 @@ from typing import List
 from qdrant_client import models
 import instructor
 from openai import OpenAI
+from unstructured.cleaners.core import clean
+from unstructured.partition.pdf import partition_pdf
 from cognitive_architecture.infrastructure.databases.vector.qdrant.QDrantAdapter import CollectionConfig
 from cognitive_architecture.infrastructure.llm.get_llm_client import get_llm_client
 from cognitive_architecture.modules.cognify.graph.add_classification_nodes import add_classification_nodes
@@ -23,15 +25,34 @@ from cognitive_architecture.modules.cognify.graph.create import create_semantic_
 from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client
 from cognitive_architecture.shared.data_models import GraphDBType
 from cognitive_architecture.infrastructure.databases.vector.get_vector_database import get_vector_database
+from cognitive_architecture.infrastructure.databases.relational import DuckDBAdapter
 config = Config()
 config.load()
 aclient = instructor.patch(OpenAI())
-async def cognify(input_text : str):
+async def cognify(dataset_name: str):
     """This function is responsible for the cognitive processing of the content."""
+    db = DuckDBAdapter()
+    files_metadata = db.get_files_metadata(dataset_name)
+    files = list(files_metadata["file_path"].values())
+    awaitables = []
+    for file in files:
+        with open(file, "rb") as file:
+            elements = partition_pdf(file = file, strategy = "fast")
+            text = "\n".join(map(lambda element: clean(element.text), elements))
+            awaitables.append(process_text(text))
+    graphs = await asyncio.gather(*awaitables)
+    return graphs[0]
+async def process_text(input_text: str):
     classified_categories = None
     try:
@@ -55,7 +76,7 @@ async def cognify(input_text : str):
 async def generate_graph_per_layer(text_input: str, layers: List[str], response_model: KnowledgeGraph = KnowledgeGraph):
     generate_graphs_awaitables = [generate_graph(text_input, "generate_graph_prompt.txt", {"layer": layer}, response_model) for layer in
-        layers]
+                                  layers]
     return await asyncio.gather(*generate_graphs_awaitables)
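A short usage sketch for the new cognify() entry point; the dataset name is a placeholder and must already have been ingested via add(). Note that the code above builds one graph per file but returns only graphs[0].

import asyncio
from cognitive_architecture import cognify

async def main():
    # Reads the dataset's file_metadata table from DuckDB, partitions each PDF,
    # and runs the cognitive processing per file.
    graph = await cognify("izmene")  # placeholder dataset name
    print(graph)

asyncio.run(main())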


@@ -0,0 +1,6 @@
from cognitive_architecture.infrastructure.databases.relational import DuckDBAdapter
def list_datasets():
    db = DuckDBAdapter()
    return db.get_datasets()


@@ -1,3 +1,4 @@
from .ModelBase import ModelBase
from .DatabaseEngine import DatabaseEngine
from .sqlite.SqliteEngine import SqliteEngine
from .duckdb.DuckDBAdapter import DuckDBAdapter


@@ -0,0 +1,22 @@
import duckdb
from cognitive_architecture.root_dir import get_absolute_path
class DuckDBAdapter():
    def __init__(self):
        db_path = get_absolute_path("./data/cognee")
        db_location = db_path + "/cognee.duckdb"

        self.db_client = duckdb.connect(db_location)

    def get_datasets(self):
        tables = self.db_client.sql("SELECT DISTINCT schema_name FROM duckdb_tables();").to_df().to_dict()

        return list(
            filter(
                lambda table_name: table_name.endswith('staging') is False,
                tables["schema_name"].values()
            )
        )

    def get_files_metadata(self, dataset_name: str):
        return self.db_client.sql(f"SELECT * FROM {dataset_name}.file_metadata;").to_df().to_dict()
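For completeness, a hedged sketch of querying the new adapter the same way list_datasets() and cognify() do above; "izmene" is a placeholder dataset name.

from cognitive_architecture.infrastructure.databases.relational import DuckDBAdapter

db = DuckDBAdapter()

# Non-staging DuckDB schema names written by the dlt pipeline.
print(db.get_datasets())

# Column-oriented dict of the dataset's file_metadata table.
metadata = db.get_files_metadata("izmene")
print(list(metadata["file_path"].values()))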