fix: add dataset and data models

Boris Arzentar 2024-08-05 23:42:36 +02:00
parent 6035010036
commit 709a10c50c
41 changed files with 422 additions and 359 deletions

View file

@ -39,6 +39,7 @@ async def lifespan(app: FastAPI):
app = FastAPI(debug = os.getenv("ENV") != "prod", lifespan = lifespan)
origins = [
"http://127.0.0.1:3000",
"http://frontend:3000",
"http://localhost:3000",
"http://localhost:3001",
@ -107,21 +108,33 @@ def health_check():
"""
return {"status": "OK"}
class Payload(BaseModel):
payload: Dict[str, Any]
@app.get("/datasets", response_model=list)
async def get_datasets():
from cognee.api.v1.datasets.datasets import datasets
return await datasets.list_datasets()
try:
from cognee.api.v1.datasets.datasets import datasets
datasets = await datasets.list_datasets()
return JSONResponse(
status_code = 200,
content = [{
"id": str(dataset.id),
"name": dataset.name,
"created_at": dataset.created_at,
"updated_at": dataset.updated_at,
"data": dataset.data,
} for dataset in datasets],
)
except Exception as error:
raise HTTPException(status_code = 500, detail=f"Error retrieving datasets: {str(error)}") from error
@app.delete("/datasets/{dataset_id}", response_model=dict)
async def delete_dataset(dataset_id: str):
from cognee.api.v1.datasets.datasets import datasets
datasets.delete_dataset(dataset_id)
await datasets.delete_dataset(dataset_id)
return JSONResponse(
status_code=200,
content="OK",
status_code = 200,
content = "OK",
)
@app.get("/datasets/{dataset_id}/graph", response_model=list)
@ -146,7 +159,7 @@ async def get_dataset_graph(dataset_id: str):
@app.get("/datasets/{dataset_id}/data", response_model=list)
async def get_dataset_data(dataset_id: str):
from cognee.api.v1.datasets.datasets import datasets
dataset_data = datasets.list_data(dataset_id)
dataset_data = await datasets.list_data(dataset_id)
if dataset_data is None:
raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.")
return [
@ -162,17 +175,24 @@ async def get_dataset_data(dataset_id: str):
@app.get("/datasets/status", response_model=dict)
async def get_dataset_status(datasets: Annotated[List[str], Query(alias="dataset")] = None):
from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
datasets_statuses = cognee_datasets.get_status(datasets)
return JSONResponse(
status_code = 200,
content = datasets_statuses,
)
try:
datasets_statuses = await cognee_datasets.get_status(datasets)
return JSONResponse(
status_code = 200,
content = datasets_statuses,
)
except Exception as error:
return JSONResponse(
status_code = 409,
content = {"error": str(error)}
)
@app.get("/datasets/{dataset_id}/data/{data_id}/raw", response_class=FileResponse)
async def get_raw_data(dataset_id: str, data_id: str):
from cognee.api.v1.datasets.datasets import datasets
dataset_data = datasets.list_data(dataset_id)
dataset_data = await datasets.list_data(dataset_id)
if dataset_data is None:
raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.")
data = [data for data in dataset_data if data["id"] == data_id][0]
@ -312,10 +332,10 @@ def start_api_server(host: str = "0.0.0.0", port: int = 8000):
try:
logger.info("Starting server at %s:%s", host, port)
import asyncio
from cognee.modules.data.deletion import prune_system, prune_data
asyncio.run(prune_data())
asyncio.run(prune_system(metadata = True))
# import asyncio
# from cognee.modules.data.deletion import prune_system, prune_data
# asyncio.run(prune_data())
# asyncio.run(prune_system(metadata = True))
uvicorn.run(app, host = host, port = port)
except Exception as e:
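A minimal client-side sketch of the reworked dataset endpoints, assuming the API runs locally on port 8000 and the httpx package is available (both are assumptions, not part of this change):

import httpx

async def list_datasets_and_statuses(base_url = "http://127.0.0.1:8000"):
    async with httpx.AsyncClient(base_url = base_url) as client:
        # GET /datasets returns a list of {id, name, created_at, updated_at, data} objects.
        datasets = (await client.get("/datasets")).json()
        # GET /datasets/status accepts repeated ?dataset=<name> query parameters.
        response = await client.get("/datasets/status", params = {"dataset": [d["name"] for d in datasets]})
        return datasets, response.json()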

View file

@ -9,10 +9,11 @@ from cognee.infrastructure.files.storage import LocalStorage
from cognee.modules.ingestion import get_matched_datasets, save_data_to_file
from cognee.shared.utils import send_telemetry
from cognee.base_config import get_base_config
from cognee.infrastructure.databases.relational import get_relational_config, create_db_and_tables
from cognee.modules.users.methods import create_default_user
from cognee.infrastructure.databases.relational import get_relational_config, get_relational_engine, create_db_and_tables
from cognee.modules.users.methods import create_default_user, get_default_user
from cognee.modules.users.permissions.methods import give_permission_on_document
from cognee.modules.users.models import User
from cognee.modules.data.operations.ensure_dataset_exists import ensure_dataset_exists
async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_name: str = "main_dataset", user: User = None):
await create_db_and_tables()
@ -99,6 +100,9 @@ async def add_files(file_paths: List[str], dataset_name: str, user):
destination = destination,
)
dataset_name = dataset_name.replace(" ", "_").replace(".", "_") if dataset_name is not None else "main_dataset"
dataset = await ensure_dataset_exists(dataset_name)
@dlt.resource(standalone = True, merge_key = "id")
async def data_resources(file_paths: str, user: User):
for file_path in file_paths:
@ -107,16 +111,34 @@ async def add_files(file_paths: List[str], dataset_name: str, user):
data_id = ingestion.identify(classified_data)
if user is None:
try:
user = await create_default_user()
await give_permission_on_document(user, data_id, "read")
await give_permission_on_document(user, data_id, "write")
except:
pass
file_metadata = classified_data.get_metadata()
from sqlalchemy import select
from cognee.modules.data.models import Data
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
data = (await session.execute(
select(Data).filter(Data.id == data_id)
)).scalar_one_or_none()
if data is not None:
data.name = file_metadata["name"]
data.raw_data_location = file_metadata["file_path"]
data.extension = file_metadata["extension"]
data.mime_type = file_metadata["mime_type"]
await session.merge(data)
else:
data = Data(
name = file_metadata["name"],
raw_data_location = file_metadata["file_path"],
extension = file_metadata["extension"],
mime_type = file_metadata["mime_type"],
)
dataset.data.append(data)
await session.merge(dataset)
yield {
"id": data_id,
"name": file_metadata["name"],
@ -125,10 +147,20 @@ async def add_files(file_paths: List[str], dataset_name: str, user):
"mime_type": file_metadata["mime_type"],
}
await give_permission_on_document(user, data_id, "read")
await give_permission_on_document(user, data_id, "write")
if user is None:
user = await get_default_user()
if user is None:
user = await create_default_user()
run_info = pipeline.run(
data_resources(processed_file_paths, user),
table_name = "file_metadata",
dataset_name = dataset_name.replace(" ", "_").replace(".", "_") if dataset_name is not None else "main_dataset",
dataset_name = dataset_name,
write_disposition = "merge",
)
send_telemetry("cognee.add")
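A standalone sketch of the upsert pattern used above: look up the Data row by id, refresh its metadata if it exists, otherwise create it and attach it to the dataset. The dataset, data_id and file_metadata names mirror the values produced earlier in add_files:

from sqlalchemy import select
from cognee.modules.data.models import Data
from cognee.infrastructure.databases.relational import get_relational_engine

async def upsert_file_metadata(dataset, data_id, file_metadata: dict):
    db_engine = get_relational_engine()
    async with db_engine.get_async_session() as session:
        data = (await session.execute(
            select(Data).filter(Data.id == data_id)
        )).scalar_one_or_none()
        if data is not None:
            # Re-ingesting a known file only refreshes its metadata.
            data.name = file_metadata["name"]
            data.raw_data_location = file_metadata["file_path"]
            data.extension = file_metadata["extension"]
            data.mime_type = file_metadata["mime_type"]
            await session.merge(data)
        else:
            # New file: create the row and link it to the dataset through dataset.data.
            data = Data(
                name = file_metadata["name"],
                raw_data_location = file_metadata["file_path"],
                extension = file_metadata["extension"],
                mime_type = file_metadata["mime_type"],
            )
            dataset.data.append(data)
            await session.merge(dataset)
        await session.commit()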

View file

@ -19,10 +19,11 @@ from cognee.modules.data.extraction.knowledge_graph.expand_knowledge_graph impor
from cognee.modules.data.extraction.knowledge_graph.establish_graph_topology import establish_graph_topology
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.pipelines import run_tasks, run_tasks_parallel
from cognee.modules.tasks import create_task_status_table, update_task_status, get_task_status
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.permissions.methods import check_permissions_on_documents
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status
logger = logging.getLogger("cognify.v2")
@ -35,97 +36,109 @@ class PermissionDeniedException(Exception):
async def cognify(datasets: Union[str, list[str]] = None, user: User = None):
db_engine = get_relational_engine()
await create_task_status_table()
if datasets is None or len(datasets) == 0:
return await cognify(await db_engine.get_datasets())
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
if user is None:
user = await get_default_user()
if user is None:
user = await get_default_user(session= session)
async def run_cognify_pipeline(dataset_name: str, files: list[dict]):
documents = [
PdfDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "pdf" else
AudioDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "audio" else
ImageDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "image" else
TextDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"])
for file in files
]
async def run_cognify_pipeline(dataset_name: str, files: list[dict]):
documents = [
PdfDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "pdf" else
AudioDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "audio" else
ImageDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "image" else
TextDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"])
for file in files
document_ids = [document.id for document in documents]
await check_permissions_on_documents(
user,
"read",
document_ids,
)
async with update_status_lock:
task_status = await get_pipeline_status([dataset_name])
if dataset_name in task_status and task_status[dataset_name] == "DATASET_PROCESSING_STARTED":
logger.info(f"Dataset {dataset_name} is being processed.")
return
await log_pipeline_status(dataset_name, "DATASET_PROCESSING_STARTED", {
"dataset_name": dataset_name,
"files": document_ids,
})
try:
cognee_config = get_cognify_config()
graph_config = get_graph_config()
root_node_id = None
if graph_config.infer_graph_topology and graph_config.graph_topology_task:
from cognee.modules.topology.topology import TopologyEngine
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
root_node_id = await topology_engine.add_graph_topology(files = files)
elif graph_config.infer_graph_topology and not graph_config.infer_graph_topology:
from cognee.modules.topology.topology import TopologyEngine
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
await topology_engine.add_graph_topology(graph_config.topology_file_path)
elif not graph_config.graph_topology_task:
root_node_id = "ROOT"
tasks = [
Task(process_documents, parent_node_id = root_node_id), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type
Task(establish_graph_topology, topology_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Set the graph topology for the document chunk data
Task(expand_knowledge_graph, graph_model = KnowledgeGraph, collection_name = "entities"), # Generate knowledge graphs from the document chunks and attach it to chunk nodes
Task(filter_affected_chunks, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks
Task(
save_data_chunks,
collection_name = "chunks",
), # Save the document chunks in vector db and as nodes in graph db (connected to the document node and between each other)
run_tasks_parallel([
Task(
summarize_text_chunks,
summarization_model = cognee_config.summarization_model,
collection_name = "chunk_summaries",
), # Summarize the document chunks
Task(
classify_text_chunks,
classification_model = cognee_config.classification_model,
),
]),
Task(remove_obsolete_chunks), # Remove the obsolete document chunks.
]
await check_permissions_on_documents(user, "read", [document.id for document in documents], session=session)
pipeline = run_tasks(tasks, documents)
async with update_status_lock:
task_status = get_task_status([dataset_name])
async for result in pipeline:
print(result)
if dataset_name in task_status and task_status[dataset_name] == "DATASET_PROCESSING_STARTED":
logger.info(f"Dataset {dataset_name} is being processed.")
return
update_task_status(dataset_name, "DATASET_PROCESSING_STARTED")
try:
cognee_config = get_cognify_config()
graph_config = get_graph_config()
root_node_id = None
if graph_config.infer_graph_topology and graph_config.graph_topology_task:
from cognee.modules.topology.topology import TopologyEngine
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
root_node_id = await topology_engine.add_graph_topology(files = files)
elif graph_config.infer_graph_topology and not graph_config.infer_graph_topology:
from cognee.modules.topology.topology import TopologyEngine
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
await topology_engine.add_graph_topology(graph_config.topology_file_path)
elif not graph_config.graph_topology_task:
root_node_id = "ROOT"
tasks = [
Task(process_documents, parent_node_id = root_node_id), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type
Task(establish_graph_topology, topology_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Set the graph topology for the document chunk data
Task(expand_knowledge_graph, graph_model = KnowledgeGraph, collection_name = "entities"), # Generate knowledge graphs from the document chunks and attach it to chunk nodes
Task(filter_affected_chunks, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks
Task(
save_data_chunks,
collection_name = "chunks",
), # Save the document chunks in vector db and as nodes in graph db (connected to the document node and between each other)
run_tasks_parallel([
Task(
summarize_text_chunks,
summarization_model = cognee_config.summarization_model,
collection_name = "chunk_summaries",
), # Summarize the document chunks
Task(
classify_text_chunks,
classification_model = cognee_config.classification_model,
),
]),
Task(remove_obsolete_chunks), # Remove the obsolete document chunks.
]
pipeline = run_tasks(tasks, documents)
async for result in pipeline:
print(result)
update_task_status(dataset_name, "DATASET_PROCESSING_FINISHED")
except Exception as error:
update_task_status(dataset_name, "DATASET_PROCESSING_ERROR")
raise error
await log_pipeline_status(dataset_name, "DATASET_PROCESSING_FINISHED", {
"dataset_name": dataset_name,
"files": document_ids,
})
except Exception as error:
await log_pipeline_status(dataset_name, "DATASET_PROCESSING_ERROR", {
"dataset_name": dataset_name,
"files": document_ids,
})
raise error
existing_datasets = await db_engine.get_datasets()
awaitables = []
existing_datasets = [dataset.name for dataset in list(await db_engine.get_datasets())]
awaitables = []
for dataset in datasets:
dataset_name = generate_dataset_name(dataset)
for dataset_name in datasets:
dataset_name = generate_dataset_name(dataset_name)
if dataset_name in existing_datasets:
awaitables.append(run_cognify_pipeline(dataset, await db_engine.get_files_metadata(dataset_name)))
if dataset_name in existing_datasets:
awaitables.append(run_cognify_pipeline(dataset_name, await db_engine.get_files_metadata(dataset_name)))
return await asyncio.gather(*awaitables)
return await asyncio.gather(*awaitables)
def generate_dataset_name(dataset_name: str) -> str:
return dataset_name.replace(".", "_").replace(" ", "_")
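For reference, generate_dataset_name only normalizes separators, and cognify only processes datasets that already exist in the relational store; a small illustrative check:

assert generate_dataset_name("my dataset.v1") == "my_dataset_v1"
# await cognify(["my dataset.v1"], user = some_user)  # would run the pipeline for the matching dataset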

View file

@ -1,6 +1,6 @@
from duckdb import CatalogException
from cognee.modules.ingestion import discover_directory_datasets
from cognee.modules.tasks import get_task_status
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.infrastructure.databases.relational import get_relational_engine
class datasets():
@ -14,24 +14,24 @@ class datasets():
return list(discover_directory_datasets(directory_path).keys())
@staticmethod
def list_data(dataset_name: str):
async def list_data(dataset_name: str):
db = get_relational_engine()
try:
return db.get_files_metadata(dataset_name)
return await db.get_files_metadata(dataset_name)
except CatalogException:
return None
@staticmethod
def get_status(dataset_ids: list[str]) -> dict:
async def get_status(dataset_ids: list[str]) -> dict:
try:
return get_task_status(dataset_ids)
return await get_pipeline_status(dataset_ids)
except CatalogException:
return {}
@staticmethod
def delete_dataset(dataset_id: str):
async def delete_dataset(dataset_id: str):
db = get_relational_engine()
try:
return db.delete_table(dataset_id)
return await db.delete_table(dataset_id)
except CatalogException:
return {}
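Since the facade methods are now coroutines, callers have to await them; a minimal usage sketch (dataset name illustrative):

from cognee.api.v1.datasets.datasets import datasets

async def inspect_dataset():
    data = await datasets.list_data("main_dataset")
    statuses = await datasets.get_status(["main_dataset"])
    return data, statuses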

View file

@ -1,3 +0,0 @@
from .models.Data import Data
from .models.Dataset import Dataset
from .models.DatasetData import DatasetData

View file

@ -1,23 +0,0 @@
from typing import List
from datetime import datetime, timezone
from sqlalchemy.orm import relationship, MappedColumn, Mapped
from sqlalchemy import Column, String, DateTime, UUID, Text, JSON
from cognee.infrastructure.databases.relational import Base
from .DatasetData import DatasetData
class Data(Base):
__tablename__ = "data"
id = Column(UUID, primary_key = True)
name = Column(String, nullable = True)
description = Column(Text, nullable = True)
raw_data_location = Column(String)
meta_data: Mapped[dict] = MappedColumn(type_ = JSON) # metadata is reserved word
created_at = Column(DateTime, default = datetime.now(timezone.utc))
updated_at = Column(DateTime, onupdate = datetime.now(timezone.utc))
datasets: Mapped[List["Dataset"]] = relationship(
secondary = DatasetData.__tablename__,
back_populates = "data"
)

View file

@ -1,12 +0,0 @@
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, UUID, ForeignKey
from cognee.infrastructure.databases.relational import Base
class DatasetData(Base):
__tablename__ = "dataset_data"
id = Column(UUID, primary_key=True, default=uuid4)
dataset_id = Column(UUID, ForeignKey("dataset.id"), nullable=False)
data_id = Column(UUID, ForeignKey("data.id"), nullable=False)

View file

@ -2,11 +2,12 @@ import os
import asyncio
from typing import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy import create_engine, MetaData, Table, Column, String, Boolean, TIMESTAMP, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, text, select
from sqlalchemy.orm import sessionmaker, joinedload
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from cognee.infrastructure.files.storage import LocalStorage
from cognee.infrastructure.databases.relational.FakeAsyncSession import FakeAsyncSession
from ..ModelBase import Base
def make_async_sessionmaker(sessionmaker):
@asynccontextmanager
@ -19,6 +20,7 @@ def make_async_sessionmaker(sessionmaker):
class SQLAlchemyAdapter():
def __init__(self, db_type: str, db_path: str, db_name: str, db_user: str, db_password: str, db_host: str, db_port: str):
self.db_location = os.path.abspath(os.path.join(db_path, db_name))
self.db_name = db_name
if db_type == "duckdb":
LocalStorage.ensure_directory_exists(db_path)
@ -42,15 +44,11 @@ class SQLAlchemyAdapter():
yield session
async def get_datasets(self):
async with self.engine.connect() as connection:
result = await connection.execute(text("SELECT DISTINCT table_schema FROM information_schema.tables;"))
tables = [row[0] for row in result]
return list(
filter(
lambda table_schema: not table_schema.endswith("staging") and table_schema != "cognee",
tables
)
)
from cognee.modules.data.models import Dataset
async with self.get_async_session() as session:
datasets = (await session.execute(select(Dataset).options(joinedload(Dataset.data)))).unique().scalars().all()
return datasets
async def create_table(self, schema_name: str, table_name: str, table_config: list[dict]):
fields_query_parts = [f"{item['name']} {item['type']}" for item in table_config]
@ -100,65 +98,6 @@ class SQLAlchemyAdapter():
result = await connection.execute(text(query))
return [dict(row) for row in result]
async def load_cognify_data(self, data):
metadata = MetaData()
cognify_table = Table(
"cognify",
metadata,
Column("document_id", String),
Column("created_at", TIMESTAMP, server_default=text("CURRENT_TIMESTAMP")),
Column("updated_at", TIMESTAMP, nullable=True, default=None),
Column("processed", Boolean, default=False),
Column("document_id_target", String, nullable=True)
)
async with self.engine.begin() as connection:
await connection.run_sync(metadata.create_all)
insert_query = cognify_table.insert().values(document_id=text(":document_id"))
async with self.engine.connect() as connection:
await connection.execute(insert_query, data)
async def fetch_cognify_data(self, excluded_document_id: str):
async with self.engine.connect() as connection:
await connection.execute(text("""
CREATE TABLE IF NOT EXISTS cognify (
document_id VARCHAR,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT NULL,
processed BOOLEAN DEFAULT FALSE,
document_id_target VARCHAR NULL
);
"""))
query = text("""
SELECT document_id, created_at, updated_at, processed
FROM cognify
WHERE document_id != :excluded_document_id AND processed = FALSE;
""")
records = await connection.execute(query, {"excluded_document_id": excluded_document_id})
records = records.fetchall()
if records:
document_ids = tuple(record["document_id"] for record in records)
update_query = text("UPDATE cognify SET processed = TRUE WHERE document_id IN :document_ids;")
await connection.execute(update_query, {"document_ids": document_ids})
return [dict(record) for record in records]
async def delete_cognify_data(self):
async with self.engine.connect() as connection:
await connection.execute(text("""
CREATE TABLE IF NOT EXISTS cognify (
document_id VARCHAR,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT NULL,
processed BOOLEAN DEFAULT FALSE,
document_id_target VARCHAR NULL
);
"""))
await connection.execute(text("DELETE FROM cognify;"))
await connection.execute(text("DROP TABLE cognify;"))
async def drop_tables(self, connection):
try:
await connection.execute(text("DROP TABLE IF EXISTS group_permission CASCADE"))
@ -169,9 +108,11 @@ class SQLAlchemyAdapter():
print(f"Error dropping database tables: {e}")
async def delete_database(self):
async with self.engine.begin() as connection:
async with self.engine.connect() as connection:
try:
await self.drop_tables(connection)
print("Database tables dropped successfully.")
async with self.engine.begin() as connection:
await connection.run_sync(Base.metadata.drop_all)
print("Database deleted successfully.")
except Exception as e:
print(f"Error dropping database tables: {e}")
print(f"Error deleting database: {e}")

View file

@ -0,0 +1,25 @@
from uuid import uuid4
from typing import List
from datetime import datetime, timezone
from sqlalchemy.orm import relationship, Mapped
from sqlalchemy import Column, String, DateTime, UUID
from cognee.infrastructure.databases.relational import Base
from .DatasetData import DatasetData
class Data(Base):
__tablename__ = "data"
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
name = Column(String)
extension = Column(String)
mime_type = Column(String)
raw_data_location = Column(String)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
datasets: Mapped[List["Dataset"]] = relationship(
secondary = DatasetData.__tablename__,
back_populates = "data"
)

View file

@ -1,3 +1,4 @@
from uuid import uuid4
from typing import List
from datetime import datetime, timezone
from sqlalchemy.orm import relationship, Mapped
@ -6,14 +7,14 @@ from cognee.infrastructure.databases.relational import Base
from .DatasetData import DatasetData
class Dataset(Base):
__tablename__ = "dataset"
__tablename__ = "datasets"
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
id = Column(UUID, primary_key = True)
name = Column(Text)
description = Column(Text, nullable = True)
created_at = Column(DateTime, default = datetime.now(timezone.utc))
updated_at = Column(DateTime, onupdate = datetime.now(timezone.utc))
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
data: Mapped[List["Data"]] = relationship(
secondary = DatasetData.__tablename__,

View file

@ -0,0 +1,11 @@
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, UUID, ForeignKey
from cognee.infrastructure.databases.relational import Base
class DatasetData(Base):
__tablename__ = "dataset_data"
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
dataset_id = Column(UUID(as_uuid = True), ForeignKey("datasets.id"), primary_key = True)
data_id = Column(UUID(as_uuid = True), ForeignKey("data.id"), primary_key = True)
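Data and Dataset are linked through the dataset_data association table above; a minimal sketch of wiring them together, assuming an async session from the relational engine:

from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data, Dataset

async def create_dataset_with_file():
    db_engine = get_relational_engine()
    async with db_engine.get_async_session() as session:
        dataset = Dataset(name = "main_dataset", data = [])
        dataset.data.append(Data(name = "report", extension = "pdf", mime_type = "application/pdf"))
        session.add(dataset)
        await session.commit()  # writes to the datasets, data and dataset_data tables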

View file

@ -0,0 +1,2 @@
from .Data import Data
from .Dataset import Dataset

View file

@ -0,0 +1,26 @@
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from cognee.modules.data.models import Dataset
from cognee.infrastructure.databases.relational import get_relational_engine
async def ensure_dataset_exists(dataset_name: str) -> Dataset:
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
dataset = (await session.scalars(
select(Dataset)\
.options(joinedload(Dataset.data))\
.filter(Dataset.name == dataset_name)
)).first()
if dataset is None:
dataset = Dataset(
name = dataset_name,
data = []
)
session.add(dataset)
await session.commit()
return dataset
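Usage is a plain get-or-create call; repeating it with the same name returns the existing row instead of creating a duplicate:

from cognee.modules.data.operations.ensure_dataset_exists import ensure_dataset_exists

async def get_main_dataset():
    return await ensure_dataset_exists("main_dataset")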

View file

@ -1,5 +1,5 @@
from uuid import uuid5, NAMESPACE_OID
from typing import Optional, Generator
from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.modules.data.chunking import chunk_by_paragraph
@ -7,10 +7,10 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu
from cognee.modules.data.processing.document_types.Document import Document
class AudioReader:
id: str
id: UUID
file_path: str
def __init__(self, id: str, file_path: str):
def __init__(self, id: UUID, file_path: str):
self.id = id
self.file_path = file_path
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
@ -87,13 +87,11 @@ class AudioDocument(Document):
title: str
file_path: str
def __init__(self, title: str, file_path: str):
self.id = uuid5(NAMESPACE_OID, title)
def __init__(self, id: UUID, title: str, file_path: str):
self.id = id or uuid5(NAMESPACE_OID, title)
self.title = title
self.file_path = file_path
reader = AudioReader(self.id, self.file_path)
def get_reader(self) -> AudioReader:
reader = AudioReader(self.id, self.file_path)
return reader

View file

@ -1,5 +1,5 @@
from uuid import uuid5, NAMESPACE_OID
from typing import Optional, Generator
from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.modules.data.chunking import chunk_by_paragraph
@ -7,10 +7,10 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu
from cognee.modules.data.processing.document_types.Document import Document
class ImageReader:
id: str
id: UUID
file_path: str
def __init__(self, id: str, file_path: str):
def __init__(self, id: UUID, file_path: str):
self.id = id
self.file_path = file_path
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
@ -24,10 +24,8 @@ class ImageReader:
# Transcribe the image file
result = self.llm_client.transcribe_image(self.file_path)
print("Transcription result: ", result.choices[0].message.content)
text = result.choices[0].message.content
# Simulate reading text in chunks as done in TextReader
def read_text_chunks(text, chunk_size):
for i in range(0, len(text), chunk_size):
@ -89,13 +87,11 @@ class ImageDocument(Document):
title: str
file_path: str
def __init__(self, title: str, file_path: str):
self.id = uuid5(NAMESPACE_OID, title)
def __init__(self, id: UUID, title: str, file_path: str):
self.id = id or uuid5(NAMESPACE_OID, title)
self.title = title
self.file_path = file_path
reader = ImageReader(self.id, self.file_path)
def get_reader(self) -> ImageReader:
reader = ImageReader(self.id, self.file_path)
return reader

View file

@ -1,6 +1,6 @@
# import pdfplumber
import logging
from uuid import uuid5, NAMESPACE_OID
from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from pypdf import PdfReader as pypdf_PdfReader
from cognee.modules.data.chunking import chunk_by_paragraph
@ -8,10 +8,10 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu
from .Document import Document
class PdfReader():
id: str
id: UUID
file_path: str
def __init__(self, id: str, file_path: str):
def __init__(self, id: UUID, file_path: str):
self.id = id
self.file_path = file_path
@ -86,8 +86,8 @@ class PdfDocument(Document):
num_pages: int
file_path: str
def __init__(self, title: str, file_path: str):
self.id = uuid5(NAMESPACE_OID, title)
def __init__(self, id: UUID, title: str, file_path: str):
self.id = id or uuid5(NAMESPACE_OID, title)
self.title = title
self.file_path = file_path
logging.debug("file_path: %s", self.file_path)

View file

@ -1,14 +1,14 @@
from uuid import uuid5, NAMESPACE_OID
from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from cognee.modules.data.chunking import chunk_by_paragraph
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
from .Document import Document
class TextReader():
id: str
id: UUID
file_path: str
def __init__(self, id: str, file_path: str):
def __init__(self, id: UUID, file_path: str):
self.id = id
self.file_path = file_path
@ -90,8 +90,8 @@ class TextDocument(Document):
num_pages: int
file_path: str
def __init__(self, title: str, file_path: str):
self.id = uuid5(NAMESPACE_OID, title)
def __init__(self, id: UUID, title: str, file_path: str):
self.id = id or uuid5(NAMESPACE_OID, title)
self.title = title
self.file_path = file_path

View file

@ -17,7 +17,7 @@ class BinaryData(IngestionData):
def get_identifier(self):
metadata = self.get_metadata()
return self.name + "_" + metadata["mime_type"]
return self.name + "." + metadata["extension"]
def get_metadata(self):
self.ensure_metadata()

View file

@ -1,9 +1,7 @@
from uuid import uuid5, UUID
from uuid import uuid5, NAMESPACE_OID
from .data_types import IngestionData
null_uuid: UUID = UUID("00000000-0000-0000-0000-000000000000")
def identify(data: IngestionData) -> str:
data_id: str = data.get_identifier()
return str(uuid5(null_uuid, data_id)).replace("-", "")
return uuid5(NAMESPACE_OID, data_id)
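identify now derives a deterministic UUID from the content identifier, so the same file always maps to the same Data id across runs; uuid5 with a fixed namespace is stable, e.g.:

from uuid import uuid5, NAMESPACE_OID

# Illustrative identifier: the same input always produces the same UUID.
assert uuid5(NAMESPACE_OID, "report.pdf") == uuid5(NAMESPACE_OID, "report.pdf")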

View file

@ -8,7 +8,7 @@ from .PipelineTask import PipelineTask
class Pipeline(Base):
__tablename__ = "pipelines"
id = Column(UUID, primary_key = True, default = uuid4)
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
name = Column(String)
description = Column(Text, nullable = True)

View file

@ -6,10 +6,12 @@ from cognee.infrastructure.databases.relational import Base
class PipelineRun(Base):
__tablename__ = "pipeline_runs"
id = Column(UUID, primary_key = True, default = uuid4)
dataset_name = Column(String)
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
run_name = Column(String, index = True)
status = Column(String)
run_info = Column(JSON)

View file

@ -1,4 +1,3 @@
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, UUID, ForeignKey
from cognee.infrastructure.databases.relational import Base
@ -6,9 +5,7 @@ from cognee.infrastructure.databases.relational import Base
class PipelineTask(Base):
__tablename__ = "pipeline_task"
id = Column(UUID, primary_key = True, default = uuid4)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
pipeline_id = Column("pipeline", UUID, ForeignKey("pipeline.id"), primary_key = True)
task_id = Column("task", UUID, ForeignKey("task.id"), primary_key = True)
pipeline_id = Column("pipeline", UUID(as_uuid = True), ForeignKey("pipeline.id"), primary_key = True)
task_id = Column("task", UUID(as_uuid = True), ForeignKey("task.id"), primary_key = True)

View file

@ -8,7 +8,7 @@ from .PipelineTask import PipelineTask
class Task(Base):
__tablename__ = "tasks"
id = Column(UUID, primary_key = True, default = uuid4)
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
name = Column(String)
description = Column(Text, nullable = True)

View file

@ -0,0 +1,17 @@
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, UUID, DateTime, String, JSON
from cognee.infrastructure.databases.relational import Base
class TaskRun(Base):
__tablename__ = "task_runs"
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
task_name = Column(String)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
status = Column(String)
run_info = Column(JSON)

View file

@ -0,0 +1 @@
from .PipelineRun import PipelineRun

View file

@ -1,4 +0,0 @@
from ..models import Pipeline, Task
def add_task(pipeline: Pipeline, task: Task):
pipeline.tasks.append(task)

View file

@ -0,0 +1,40 @@
from sqlalchemy import func, select
from sqlalchemy.orm import aliased
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models import PipelineRun
async def get_pipeline_status(pipeline_names: [str]):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
query = select(
PipelineRun,
func.row_number().over(
partition_by = PipelineRun.run_name,
order_by = PipelineRun.created_at.desc(),
).label("rn")
).filter(PipelineRun.run_name.in_(pipeline_names)).subquery()
aliased_pipeline_run = aliased(PipelineRun, query)
latest_runs = (
select(aliased_pipeline_run).filter(query.c.rn == 1)
)
runs = (await session.execute(latest_runs)).scalars().all()
pipeline_statuses = {
run.run_name: run.status for run in runs
}
return pipeline_statuses
# f"""SELECT data_id, status
# FROM (
# SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn
# FROM cognee.cognee.task_runs
# WHERE data_id IN ({formatted_data_ids})
# ) t
# WHERE rn = 1;"""
# return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }

View file

@ -0,0 +1,14 @@
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models.PipelineRun import PipelineRun
async def log_pipeline_status(run_name: str, status: str, run_info: dict):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
session.add(PipelineRun(
run_name = run_name,
status = status,
run_info = run_info,
))
await session.commit()
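A minimal sketch of how the two pipeline-status helpers fit together (run name and payload are illustrative):

from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status

async def demo_pipeline_status():
    await log_pipeline_status("main_dataset", "DATASET_PROCESSING_STARTED", {"files": []})
    # Only the latest run per run_name is returned, e.g. {"main_dataset": "DATASET_PROCESSING_STARTED"}.
    return await get_pipeline_status(["main_dataset"])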

View file

@ -1,3 +0,0 @@
from .get_task_status import get_task_status
from .update_task_status import update_task_status
from .create_task_status_table import create_task_status_table

View file

@ -1,10 +0,0 @@
from cognee.infrastructure.databases.relational import get_relational_engine
async def create_task_status_table():
db_engine = get_relational_engine()
await db_engine.create_table("cognee", "cognee_task_status", [
dict(name="data_id", type="VARCHAR"),
dict(name="status", type="VARCHAR"),
dict(name="created_at", type="TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
])

View file

@ -1,18 +0,0 @@
from cognee.infrastructure.databases.relational import get_relational_engine
def get_task_status(data_ids: [str]):
db_engine = get_relational_engine()
formatted_data_ids = ", ".join([f"'{data_id}'" for data_id in data_ids])
datasets_statuses = db_engine.execute_query(
f"""SELECT data_id, status
FROM (
SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn
FROM cognee.cognee.cognee_task_status
WHERE data_id IN ({formatted_data_ids})
) t
WHERE rn = 1;"""
)
return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }

View file

@ -1,5 +0,0 @@
from cognee.infrastructure.databases.relational import get_relational_engine
def update_task_status(data_id: str, status: str):
db_engine = get_relational_engine()
db_engine.insert_data("cognee.cognee", "cognee_task_status", [dict(data_id = data_id, status = status)])

View file

@ -1,5 +1,4 @@
import hashlib
# from cognee.infrastructure.databases.relational import get_relational_engine
from .create_user import create_user
async def create_default_user():
@ -12,12 +11,9 @@ async def create_default_user():
is_superuser = True,
is_active = True,
is_verified = True,
auto_login = True,
)
# db_engine = get_relational_engine()
# async with db_engine.get_async_session() as session:
# await session.refresh(user)
return user
async def hash_password(password: str) -> str:

View file

@ -10,6 +10,7 @@ async def create_user(
is_superuser: bool = False,
is_active: bool = True,
is_verified: bool = False,
auto_login: bool = False,
):
try:
relational_engine = get_relational_engine()
@ -26,6 +27,10 @@ async def create_user(
is_verified = is_verified,
)
)
if auto_login:
await session.refresh(user)
return user
print(f"User created: {user.email}")
except UserAlreadyExists as error:

View file

@ -1,12 +1,16 @@
from sqlalchemy.orm import joinedload
from sqlalchemy.future import select
from cognee.modules.users.models import User
from cognee.infrastructure.databases.relational import get_relational_engine
from sqlalchemy.future import select
async def get_default_user():
db_engine = get_relational_engine()
async def get_default_user(session):
stmt = select(User).options(joinedload(User.groups)).where(User.email == "default_user@example.com")
result = await session.execute(stmt)
user = result.scalars().first()
return user
async with db_engine.get_async_session() as session:
query = select(User).options(joinedload(User.groups))\
.where(User.email == "default_user@example.com")
result = await session.execute(query)
user = result.scalars().first()
return user

View file

@ -1,8 +1,8 @@
import logging
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from cognee.infrastructure.databases.relational import get_relational_engine
from ...models.User import User
from ...models.ACL import ACL
@ -14,24 +14,26 @@ class PermissionDeniedException(Exception):
super().__init__(self.message)
async def check_permissions_on_documents(user: User, permission_type: str, document_ids: list[str], session):
logging.info("This is the user: %s", user.__dict__)
async def check_permissions_on_documents(user: User, permission_type: str, document_ids: list[str]):
try:
user_group_ids = [group.id for group in user.groups]
acls = await session.execute(
select(ACL)
.join(ACL.permission)
.where(ACL.principal_id.in_([user.id, *user_group_ids]))
.where(ACL.permission.has(name=permission_type))
)
resource_ids = [resource.resource_id for acl in acls.scalars().all() for resource in acl.resources]
has_permissions = all(document_id in resource_ids for document_id in document_ids)
db_engine = get_relational_engine()
if not has_permissions:
raise PermissionDeniedException(f"User {user.username} does not have {permission_type} permission on documents")
async with db_engine.get_async_session() as session:
result = await session.execute(
select(ACL)
.join(ACL.permission)
.options(joinedload(ACL.resources))
.where(ACL.principal_id.in_([user.id, *user_group_ids]))
.where(ACL.permission.has(name = permission_type))
)
acls = result.unique().scalars().all()
resource_ids = [str(resource.resource_id) for acl in acls for resource in acl.resources]
has_permissions = all(document_id in resource_ids for document_id in document_ids)
if not has_permissions:
raise PermissionDeniedException(f"User {user.username} does not have {permission_type} permission on documents")
except Exception as error:
logger.error("Error checking permissions on documents: %s", str(error))
raise
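Inside the helper the resource ids are compared as strings, so a caller would pass stringified document ids; a usage sketch (user and documents illustrative):

from cognee.modules.users.permissions.methods import check_permissions_on_documents

async def assert_read_access(user, documents):
    await check_permissions_on_documents(user, "read", [str(document.id) for document in documents])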

View file

@ -16,7 +16,7 @@ async def get_document_ids_for_user(user_id: UUID) -> list[str]:
ACL.permission.name == "read",
)
)
document_ids = [row[0] for row in result.fetchall()]
document_ids = [row[0] for row in result.scalars().all()]
return document_ids

View file

@ -12,18 +12,18 @@ async def give_permission_on_document(
document_resource = Resource(resource_id = document_id)
async with db_engine.get_async_session() as session:
permission = (await session.execute(select(Permission).filter(Permission.name == permission_name))).first()
permission = (await session.execute(select(Permission).filter(Permission.name == permission_name))).scalars().first()
if permission is None:
permission = Permission(name = permission_name)
acl = ACL(principal_id = user.id)
acl.permission = permission
acl.resources.append(document_resource)
acl = ACL(principal_id = user.id)
acl.permission = permission
acl.resources.append(document_resource)
session.add(acl)
session.add(acl)
await session.commit()
await session.commit()
# if user.is_superuser:

View file

@ -27,13 +27,13 @@ async def main():
# In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
# """
text = """A large language model (LLM) is a language model notable for its ability to achieve general-purpose language generation and other natural language processing tasks such as classification. LLMs acquire these abilities by learning statistical relationships from text documents during a computationally intensive self-supervised and semi-supervised training process. LLMs can be used for text generation, a form of generative AI, by taking an input text and repeatedly predicting the next token or word.
LLMs are artificial neural networks. The largest and most capable, as of March 2024, are built with a decoder-only transformer-based architecture while some recent implementations are based on other architectures, such as recurrent neural network variants and Mamba (a state space model).
Up to 2020, fine tuning was the only way a model could be adapted to be able to accomplish specific tasks. Larger sized models, such as GPT-3, however, can be prompt-engineered to achieve similar results.[6] They are thought to acquire knowledge about syntax, semantics and "ontology" inherent in human language corpora, but also inaccuracies and biases present in the corpora.
Some notable LLMs are OpenAI's GPT series of models (e.g., GPT-3.5 and GPT-4, used in ChatGPT and Microsoft Copilot), Google's PaLM and Gemini (the latter of which is currently used in the chatbot of the same name), xAI's Grok, Meta's LLaMA family of open-source models, Anthropic's Claude models, Mistral AI's open source models, and Databricks' open source DBRX.
"""
# text = """A large language model (LLM) is a language model notable for its ability to achieve general-purpose language generation and other natural language processing tasks such as classification. LLMs acquire these abilities by learning statistical relationships from text documents during a computationally intensive self-supervised and semi-supervised training process. LLMs can be used for text generation, a form of generative AI, by taking an input text and repeatedly predicting the next token or word.
# LLMs are artificial neural networks. The largest and most capable, as of March 2024, are built with a decoder-only transformer-based architecture while some recent implementations are based on other architectures, such as recurrent neural network variants and Mamba (a state space model).
# Up to 2020, fine tuning was the only way a model could be adapted to be able to accomplish specific tasks. Larger sized models, such as GPT-3, however, can be prompt-engineered to achieve similar results.[6] They are thought to acquire knowledge about syntax, semantics and "ontology" inherent in human language corpora, but also inaccuracies and biases present in the corpora.
# Some notable LLMs are OpenAI's GPT series of models (e.g., GPT-3.5 and GPT-4, used in ChatGPT and Microsoft Copilot), Google's PaLM and Gemini (the latter of which is currently used in the chatbot of the same name), xAI's Grok, Meta's LLaMA family of open-source models, Anthropic's Claude models, Mistral AI's open source models, and Databricks' open source DBRX.
# """
await cognee.add([text], dataset_name)
# await cognee.add([text], dataset_name)
await cognee.cognify([dataset_name])
@ -42,27 +42,27 @@ async def main():
random_node = (await vector_engine.search("entities", "AI"))[0]
random_node_name = random_node.payload["name"]
search_results = await cognee.search("SIMILARITY", { "query": random_node_name })
search_results = await cognee.search("SIMILARITY", params = { "query": random_node_name })
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")
for result in search_results:
print(f"{result}\n")
search_results = await cognee.search("TRAVERSE", { "query": random_node_name })
search_results = await cognee.search("TRAVERSE", params = { "query": random_node_name })
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")
for result in search_results:
print(f"{result}\n")
search_results = await cognee.search("SUMMARY", { "query": random_node_name })
search_results = await cognee.search("SUMMARY", params = { "query": random_node_name })
assert len(search_results) != 0, "Query related summaries don't exist."
print("\n\nQuery related summaries exist:\n")
for result in search_results:
print(f"{result}\n")
search_results = await cognee.search("ADJACENT", { "query": random_node_name })
search_results = await cognee.search("ADJACENT", params = { "query": random_node_name })
assert len(search_results) != 0, "Large language model query found no neighbours."
print("\n\Large language model query found neighbours.\n")
print("\n\nLarge language model query found neighbours.\n")
for result in search_results:
print(f"{result}\n")