Merge remote-tracking branch 'origin/COG-206' into COG-206
This commit is contained in:
commit
0519986a3a
41 changed files with 422 additions and 359 deletions
|
|
@ -39,6 +39,7 @@ async def lifespan(app: FastAPI):
|
||||||
app = FastAPI(debug = os.getenv("ENV") != "prod", lifespan = lifespan)
|
app = FastAPI(debug = os.getenv("ENV") != "prod", lifespan = lifespan)
|
||||||
|
|
||||||
origins = [
|
origins = [
|
||||||
|
"http://127.0.0.1:3000",
|
||||||
"http://frontend:3000",
|
"http://frontend:3000",
|
||||||
"http://localhost:3000",
|
"http://localhost:3000",
|
||||||
"http://localhost:3001",
|
"http://localhost:3001",
|
||||||
|
|
@ -107,21 +108,33 @@ def health_check():
|
||||||
"""
|
"""
|
||||||
return {"status": "OK"}
|
return {"status": "OK"}
|
||||||
|
|
||||||
class Payload(BaseModel):
|
|
||||||
payload: Dict[str, Any]
|
|
||||||
|
|
||||||
@app.get("/datasets", response_model=list)
|
@app.get("/datasets", response_model=list)
|
||||||
async def get_datasets():
|
async def get_datasets():
|
||||||
from cognee.api.v1.datasets.datasets import datasets
|
try:
|
||||||
return await datasets.list_datasets()
|
from cognee.api.v1.datasets.datasets import datasets
|
||||||
|
datasets = await datasets.list_datasets()
|
||||||
|
|
||||||
|
return JSONResponse(
|
||||||
|
status_code = 200,
|
||||||
|
content = [{
|
||||||
|
"id": str(dataset.id),
|
||||||
|
"name": dataset.name,
|
||||||
|
"created_at": dataset.created_at,
|
||||||
|
"updated_at": dataset.updated_at,
|
||||||
|
"data": dataset.data,
|
||||||
|
} for dataset in datasets],
|
||||||
|
)
|
||||||
|
except Exception as error:
|
||||||
|
raise HTTPException(status_code = 500, detail=f"Error retrieving datasets: {str(error)}") from error
|
||||||
|
|
||||||
@app.delete("/datasets/{dataset_id}", response_model=dict)
|
@app.delete("/datasets/{dataset_id}", response_model=dict)
|
||||||
async def delete_dataset(dataset_id: str):
|
async def delete_dataset(dataset_id: str):
|
||||||
from cognee.api.v1.datasets.datasets import datasets
|
from cognee.api.v1.datasets.datasets import datasets
|
||||||
datasets.delete_dataset(dataset_id)
|
await datasets.delete_dataset(dataset_id)
|
||||||
|
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
status_code=200,
|
status_code = 200,
|
||||||
content="OK",
|
content = "OK",
|
||||||
)
|
)
|
||||||
|
|
||||||
@app.get("/datasets/{dataset_id}/graph", response_model=list)
|
@app.get("/datasets/{dataset_id}/graph", response_model=list)
|
||||||
|
|
@ -146,7 +159,7 @@ async def get_dataset_graph(dataset_id: str):
|
||||||
@app.get("/datasets/{dataset_id}/data", response_model=list)
|
@app.get("/datasets/{dataset_id}/data", response_model=list)
|
||||||
async def get_dataset_data(dataset_id: str):
|
async def get_dataset_data(dataset_id: str):
|
||||||
from cognee.api.v1.datasets.datasets import datasets
|
from cognee.api.v1.datasets.datasets import datasets
|
||||||
dataset_data = datasets.list_data(dataset_id)
|
dataset_data = await datasets.list_data(dataset_id)
|
||||||
if dataset_data is None:
|
if dataset_data is None:
|
||||||
raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.")
|
raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.")
|
||||||
return [
|
return [
|
||||||
|
|
@ -162,17 +175,24 @@ async def get_dataset_data(dataset_id: str):
|
||||||
@app.get("/datasets/status", response_model=dict)
|
@app.get("/datasets/status", response_model=dict)
|
||||||
async def get_dataset_status(datasets: Annotated[List[str], Query(alias="dataset")] = None):
|
async def get_dataset_status(datasets: Annotated[List[str], Query(alias="dataset")] = None):
|
||||||
from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
|
from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
|
||||||
datasets_statuses = cognee_datasets.get_status(datasets)
|
|
||||||
|
|
||||||
return JSONResponse(
|
try:
|
||||||
status_code = 200,
|
datasets_statuses = await cognee_datasets.get_status(datasets)
|
||||||
content = datasets_statuses,
|
|
||||||
)
|
return JSONResponse(
|
||||||
|
status_code = 200,
|
||||||
|
content = datasets_statuses,
|
||||||
|
)
|
||||||
|
except Exception as error:
|
||||||
|
return JSONResponse(
|
||||||
|
status_code = 409,
|
||||||
|
content = {"error": str(error)}
|
||||||
|
)
|
||||||
|
|
||||||
@app.get("/datasets/{dataset_id}/data/{data_id}/raw", response_class=FileResponse)
|
@app.get("/datasets/{dataset_id}/data/{data_id}/raw", response_class=FileResponse)
|
||||||
async def get_raw_data(dataset_id: str, data_id: str):
|
async def get_raw_data(dataset_id: str, data_id: str):
|
||||||
from cognee.api.v1.datasets.datasets import datasets
|
from cognee.api.v1.datasets.datasets import datasets
|
||||||
dataset_data = datasets.list_data(dataset_id)
|
dataset_data = await datasets.list_data(dataset_id)
|
||||||
if dataset_data is None:
|
if dataset_data is None:
|
||||||
raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.")
|
raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.")
|
||||||
data = [data for data in dataset_data if data["id"] == data_id][0]
|
data = [data for data in dataset_data if data["id"] == data_id][0]
|
||||||
|
|
@ -312,10 +332,10 @@ def start_api_server(host: str = "0.0.0.0", port: int = 8000):
|
||||||
try:
|
try:
|
||||||
logger.info("Starting server at %s:%s", host, port)
|
logger.info("Starting server at %s:%s", host, port)
|
||||||
|
|
||||||
import asyncio
|
# import asyncio
|
||||||
from cognee.modules.data.deletion import prune_system, prune_data
|
# from cognee.modules.data.deletion import prune_system, prune_data
|
||||||
asyncio.run(prune_data())
|
# asyncio.run(prune_data())
|
||||||
asyncio.run(prune_system(metadata = True))
|
# asyncio.run(prune_system(metadata = True))
|
||||||
|
|
||||||
uvicorn.run(app, host = host, port = port)
|
uvicorn.run(app, host = host, port = port)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
||||||
|
|
@ -9,10 +9,11 @@ from cognee.infrastructure.files.storage import LocalStorage
|
||||||
from cognee.modules.ingestion import get_matched_datasets, save_data_to_file
|
from cognee.modules.ingestion import get_matched_datasets, save_data_to_file
|
||||||
from cognee.shared.utils import send_telemetry
|
from cognee.shared.utils import send_telemetry
|
||||||
from cognee.base_config import get_base_config
|
from cognee.base_config import get_base_config
|
||||||
from cognee.infrastructure.databases.relational import get_relational_config, create_db_and_tables
|
from cognee.infrastructure.databases.relational import get_relational_config, get_relational_engine, create_db_and_tables
|
||||||
from cognee.modules.users.methods import create_default_user
|
from cognee.modules.users.methods import create_default_user, get_default_user
|
||||||
from cognee.modules.users.permissions.methods import give_permission_on_document
|
from cognee.modules.users.permissions.methods import give_permission_on_document
|
||||||
from cognee.modules.users.models import User
|
from cognee.modules.users.models import User
|
||||||
|
from cognee.modules.data.operations.ensure_dataset_exists import ensure_dataset_exists
|
||||||
|
|
||||||
async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_name: str = "main_dataset", user: User = None):
|
async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_name: str = "main_dataset", user: User = None):
|
||||||
await create_db_and_tables()
|
await create_db_and_tables()
|
||||||
|
|
@ -99,6 +100,9 @@ async def add_files(file_paths: List[str], dataset_name: str, user):
|
||||||
destination = destination,
|
destination = destination,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
dataset_name = dataset_name.replace(" ", "_").replace(".", "_") if dataset_name is not None else "main_dataset"
|
||||||
|
dataset = await ensure_dataset_exists(dataset_name)
|
||||||
|
|
||||||
@dlt.resource(standalone = True, merge_key = "id")
|
@dlt.resource(standalone = True, merge_key = "id")
|
||||||
async def data_resources(file_paths: str, user: User):
|
async def data_resources(file_paths: str, user: User):
|
||||||
for file_path in file_paths:
|
for file_path in file_paths:
|
||||||
|
|
@ -107,16 +111,34 @@ async def add_files(file_paths: List[str], dataset_name: str, user):
|
||||||
|
|
||||||
data_id = ingestion.identify(classified_data)
|
data_id = ingestion.identify(classified_data)
|
||||||
|
|
||||||
if user is None:
|
|
||||||
try:
|
|
||||||
user = await create_default_user()
|
|
||||||
|
|
||||||
await give_permission_on_document(user, data_id, "read")
|
|
||||||
await give_permission_on_document(user, data_id, "write")
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
file_metadata = classified_data.get_metadata()
|
file_metadata = classified_data.get_metadata()
|
||||||
|
|
||||||
|
from sqlalchemy import select
|
||||||
|
from cognee.modules.data.models import Data
|
||||||
|
db_engine = get_relational_engine()
|
||||||
|
async with db_engine.get_async_session() as session:
|
||||||
|
data = (await session.execute(
|
||||||
|
select(Data).filter(Data.id == data_id)
|
||||||
|
)).scalar_one_or_none()
|
||||||
|
|
||||||
|
if data is not None:
|
||||||
|
data.name = file_metadata["name"]
|
||||||
|
data.raw_data_location = file_metadata["file_path"]
|
||||||
|
data.extension = file_metadata["extension"]
|
||||||
|
data.mime_type = file_metadata["mime_type"]
|
||||||
|
|
||||||
|
await session.merge(data)
|
||||||
|
else:
|
||||||
|
data = Data(
|
||||||
|
name = file_metadata["name"],
|
||||||
|
raw_data_location = file_metadata["file_path"],
|
||||||
|
extension = file_metadata["extension"],
|
||||||
|
mime_type = file_metadata["mime_type"],
|
||||||
|
)
|
||||||
|
dataset.data.append(data)
|
||||||
|
|
||||||
|
await session.merge(dataset)
|
||||||
|
|
||||||
yield {
|
yield {
|
||||||
"id": data_id,
|
"id": data_id,
|
||||||
"name": file_metadata["name"],
|
"name": file_metadata["name"],
|
||||||
|
|
@ -125,10 +147,20 @@ async def add_files(file_paths: List[str], dataset_name: str, user):
|
||||||
"mime_type": file_metadata["mime_type"],
|
"mime_type": file_metadata["mime_type"],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await give_permission_on_document(user, data_id, "read")
|
||||||
|
await give_permission_on_document(user, data_id, "write")
|
||||||
|
|
||||||
|
|
||||||
|
if user is None:
|
||||||
|
user = await get_default_user()
|
||||||
|
|
||||||
|
if user is None:
|
||||||
|
user = await create_default_user()
|
||||||
|
|
||||||
run_info = pipeline.run(
|
run_info = pipeline.run(
|
||||||
data_resources(processed_file_paths, user),
|
data_resources(processed_file_paths, user),
|
||||||
table_name = "file_metadata",
|
table_name = "file_metadata",
|
||||||
dataset_name = dataset_name.replace(" ", "_").replace(".", "_") if dataset_name is not None else "main_dataset",
|
dataset_name = dataset_name,
|
||||||
write_disposition = "merge",
|
write_disposition = "merge",
|
||||||
)
|
)
|
||||||
send_telemetry("cognee.add")
|
send_telemetry("cognee.add")
|
||||||
|
|
|
||||||
|
|
@ -19,10 +19,11 @@ from cognee.modules.data.extraction.knowledge_graph.expand_knowledge_graph impor
|
||||||
from cognee.modules.data.extraction.knowledge_graph.establish_graph_topology import establish_graph_topology
|
from cognee.modules.data.extraction.knowledge_graph.establish_graph_topology import establish_graph_topology
|
||||||
from cognee.modules.pipelines.tasks.Task import Task
|
from cognee.modules.pipelines.tasks.Task import Task
|
||||||
from cognee.modules.pipelines import run_tasks, run_tasks_parallel
|
from cognee.modules.pipelines import run_tasks, run_tasks_parallel
|
||||||
from cognee.modules.tasks import create_task_status_table, update_task_status, get_task_status
|
|
||||||
from cognee.modules.users.models import User
|
from cognee.modules.users.models import User
|
||||||
from cognee.modules.users.methods import get_default_user
|
from cognee.modules.users.methods import get_default_user
|
||||||
from cognee.modules.users.permissions.methods import check_permissions_on_documents
|
from cognee.modules.users.permissions.methods import check_permissions_on_documents
|
||||||
|
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
|
||||||
|
from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status
|
||||||
|
|
||||||
logger = logging.getLogger("cognify.v2")
|
logger = logging.getLogger("cognify.v2")
|
||||||
|
|
||||||
|
|
@ -35,97 +36,109 @@ class PermissionDeniedException(Exception):
|
||||||
|
|
||||||
async def cognify(datasets: Union[str, list[str]] = None, user: User = None):
|
async def cognify(datasets: Union[str, list[str]] = None, user: User = None):
|
||||||
db_engine = get_relational_engine()
|
db_engine = get_relational_engine()
|
||||||
await create_task_status_table()
|
|
||||||
|
|
||||||
if datasets is None or len(datasets) == 0:
|
if datasets is None or len(datasets) == 0:
|
||||||
return await cognify(await db_engine.get_datasets())
|
return await cognify(await db_engine.get_datasets())
|
||||||
|
|
||||||
|
|
||||||
db_engine = get_relational_engine()
|
if user is None:
|
||||||
async with db_engine.get_async_session() as session:
|
user = await get_default_user()
|
||||||
|
|
||||||
if user is None:
|
async def run_cognify_pipeline(dataset_name: str, files: list[dict]):
|
||||||
user = await get_default_user(session= session)
|
documents = [
|
||||||
|
PdfDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "pdf" else
|
||||||
|
AudioDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "audio" else
|
||||||
|
ImageDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "image" else
|
||||||
|
TextDocument(id = file["id"], title=f"{file['name']}.{file['extension']}", file_path=file["file_path"])
|
||||||
|
for file in files
|
||||||
|
]
|
||||||
|
|
||||||
async def run_cognify_pipeline(dataset_name: str, files: list[dict]):
|
document_ids = [document.id for document in documents]
|
||||||
documents = [
|
|
||||||
PdfDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "pdf" else
|
await check_permissions_on_documents(
|
||||||
AudioDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "audio" else
|
user,
|
||||||
ImageDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "image" else
|
"read",
|
||||||
TextDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"])
|
document_ids,
|
||||||
for file in files
|
)
|
||||||
|
|
||||||
|
async with update_status_lock:
|
||||||
|
task_status = await get_pipeline_status([dataset_name])
|
||||||
|
|
||||||
|
if dataset_name in task_status and task_status[dataset_name] == "DATASET_PROCESSING_STARTED":
|
||||||
|
logger.info(f"Dataset {dataset_name} is being processed.")
|
||||||
|
return
|
||||||
|
|
||||||
|
await log_pipeline_status(dataset_name, "DATASET_PROCESSING_STARTED", {
|
||||||
|
"dataset_name": dataset_name,
|
||||||
|
"files": document_ids,
|
||||||
|
})
|
||||||
|
try:
|
||||||
|
cognee_config = get_cognify_config()
|
||||||
|
graph_config = get_graph_config()
|
||||||
|
root_node_id = None
|
||||||
|
|
||||||
|
if graph_config.infer_graph_topology and graph_config.graph_topology_task:
|
||||||
|
from cognee.modules.topology.topology import TopologyEngine
|
||||||
|
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
|
||||||
|
root_node_id = await topology_engine.add_graph_topology(files = files)
|
||||||
|
elif graph_config.infer_graph_topology and not graph_config.infer_graph_topology:
|
||||||
|
from cognee.modules.topology.topology import TopologyEngine
|
||||||
|
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
|
||||||
|
await topology_engine.add_graph_topology(graph_config.topology_file_path)
|
||||||
|
elif not graph_config.graph_topology_task:
|
||||||
|
root_node_id = "ROOT"
|
||||||
|
|
||||||
|
tasks = [
|
||||||
|
Task(process_documents, parent_node_id = root_node_id), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type
|
||||||
|
Task(establish_graph_topology, topology_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Set the graph topology for the document chunk data
|
||||||
|
Task(expand_knowledge_graph, graph_model = KnowledgeGraph, collection_name = "entities"), # Generate knowledge graphs from the document chunks and attach it to chunk nodes
|
||||||
|
Task(filter_affected_chunks, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks
|
||||||
|
Task(
|
||||||
|
save_data_chunks,
|
||||||
|
collection_name = "chunks",
|
||||||
|
), # Save the document chunks in vector db and as nodes in graph db (connected to the document node and between each other)
|
||||||
|
run_tasks_parallel([
|
||||||
|
Task(
|
||||||
|
summarize_text_chunks,
|
||||||
|
summarization_model = cognee_config.summarization_model,
|
||||||
|
collection_name = "chunk_summaries",
|
||||||
|
), # Summarize the document chunks
|
||||||
|
Task(
|
||||||
|
classify_text_chunks,
|
||||||
|
classification_model = cognee_config.classification_model,
|
||||||
|
),
|
||||||
|
]),
|
||||||
|
Task(remove_obsolete_chunks), # Remove the obsolete document chunks.
|
||||||
]
|
]
|
||||||
|
|
||||||
await check_permissions_on_documents(user, "read", [document.id for document in documents], session=session)
|
pipeline = run_tasks(tasks, documents)
|
||||||
|
|
||||||
async with update_status_lock:
|
async for result in pipeline:
|
||||||
task_status = get_task_status([dataset_name])
|
print(result)
|
||||||
|
|
||||||
if dataset_name in task_status and task_status[dataset_name] == "DATASET_PROCESSING_STARTED":
|
await log_pipeline_status(dataset_name, "DATASET_PROCESSING_FINISHED", {
|
||||||
logger.info(f"Dataset {dataset_name} is being processed.")
|
"dataset_name": dataset_name,
|
||||||
return
|
"files": document_ids,
|
||||||
|
})
|
||||||
update_task_status(dataset_name, "DATASET_PROCESSING_STARTED")
|
except Exception as error:
|
||||||
try:
|
await log_pipeline_status(dataset_name, "DATASET_PROCESSING_ERROR", {
|
||||||
cognee_config = get_cognify_config()
|
"dataset_name": dataset_name,
|
||||||
graph_config = get_graph_config()
|
"files": document_ids,
|
||||||
root_node_id = None
|
})
|
||||||
|
raise error
|
||||||
if graph_config.infer_graph_topology and graph_config.graph_topology_task:
|
|
||||||
from cognee.modules.topology.topology import TopologyEngine
|
|
||||||
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
|
|
||||||
root_node_id = await topology_engine.add_graph_topology(files = files)
|
|
||||||
elif graph_config.infer_graph_topology and not graph_config.infer_graph_topology:
|
|
||||||
from cognee.modules.topology.topology import TopologyEngine
|
|
||||||
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
|
|
||||||
await topology_engine.add_graph_topology(graph_config.topology_file_path)
|
|
||||||
elif not graph_config.graph_topology_task:
|
|
||||||
root_node_id = "ROOT"
|
|
||||||
|
|
||||||
tasks = [
|
|
||||||
Task(process_documents, parent_node_id = root_node_id), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type
|
|
||||||
Task(establish_graph_topology, topology_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Set the graph topology for the document chunk data
|
|
||||||
Task(expand_knowledge_graph, graph_model = KnowledgeGraph, collection_name = "entities"), # Generate knowledge graphs from the document chunks and attach it to chunk nodes
|
|
||||||
Task(filter_affected_chunks, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks
|
|
||||||
Task(
|
|
||||||
save_data_chunks,
|
|
||||||
collection_name = "chunks",
|
|
||||||
), # Save the document chunks in vector db and as nodes in graph db (connected to the document node and between each other)
|
|
||||||
run_tasks_parallel([
|
|
||||||
Task(
|
|
||||||
summarize_text_chunks,
|
|
||||||
summarization_model = cognee_config.summarization_model,
|
|
||||||
collection_name = "chunk_summaries",
|
|
||||||
), # Summarize the document chunks
|
|
||||||
Task(
|
|
||||||
classify_text_chunks,
|
|
||||||
classification_model = cognee_config.classification_model,
|
|
||||||
),
|
|
||||||
]),
|
|
||||||
Task(remove_obsolete_chunks), # Remove the obsolete document chunks.
|
|
||||||
]
|
|
||||||
|
|
||||||
pipeline = run_tasks(tasks, documents)
|
|
||||||
|
|
||||||
async for result in pipeline:
|
|
||||||
print(result)
|
|
||||||
|
|
||||||
update_task_status(dataset_name, "DATASET_PROCESSING_FINISHED")
|
|
||||||
except Exception as error:
|
|
||||||
update_task_status(dataset_name, "DATASET_PROCESSING_ERROR")
|
|
||||||
raise error
|
|
||||||
|
|
||||||
|
|
||||||
existing_datasets = await db_engine.get_datasets()
|
existing_datasets = [dataset.name for dataset in list(await db_engine.get_datasets())]
|
||||||
awaitables = []
|
awaitables = []
|
||||||
|
|
||||||
for dataset in datasets:
|
for dataset_name in datasets:
|
||||||
dataset_name = generate_dataset_name(dataset)
|
dataset_name = generate_dataset_name(dataset_name)
|
||||||
|
|
||||||
if dataset_name in existing_datasets:
|
if dataset_name in existing_datasets:
|
||||||
awaitables.append(run_cognify_pipeline(dataset, await db_engine.get_files_metadata(dataset_name)))
|
awaitables.append(run_cognify_pipeline(dataset_name, await db_engine.get_files_metadata(dataset_name)))
|
||||||
|
|
||||||
|
return await asyncio.gather(*awaitables)
|
||||||
|
|
||||||
return await asyncio.gather(*awaitables)
|
|
||||||
|
|
||||||
def generate_dataset_name(dataset_name: str) -> str:
|
def generate_dataset_name(dataset_name: str) -> str:
|
||||||
return dataset_name.replace(".", "_").replace(" ", "_")
|
return dataset_name.replace(".", "_").replace(" ", "_")
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
from duckdb import CatalogException
|
from duckdb import CatalogException
|
||||||
from cognee.modules.ingestion import discover_directory_datasets
|
from cognee.modules.ingestion import discover_directory_datasets
|
||||||
from cognee.modules.tasks import get_task_status
|
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
|
||||||
class datasets():
|
class datasets():
|
||||||
|
|
@ -14,24 +14,24 @@ class datasets():
|
||||||
return list(discover_directory_datasets(directory_path).keys())
|
return list(discover_directory_datasets(directory_path).keys())
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def list_data(dataset_name: str):
|
async def list_data(dataset_name: str):
|
||||||
db = get_relational_engine()
|
db = get_relational_engine()
|
||||||
try:
|
try:
|
||||||
return db.get_files_metadata(dataset_name)
|
return await db.get_files_metadata(dataset_name)
|
||||||
except CatalogException:
|
except CatalogException:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_status(dataset_ids: list[str]) -> dict:
|
async def get_status(dataset_ids: list[str]) -> dict:
|
||||||
try:
|
try:
|
||||||
return get_task_status(dataset_ids)
|
return await get_pipeline_status(dataset_ids)
|
||||||
except CatalogException:
|
except CatalogException:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def delete_dataset(dataset_id: str):
|
async def delete_dataset(dataset_id: str):
|
||||||
db = get_relational_engine()
|
db = get_relational_engine()
|
||||||
try:
|
try:
|
||||||
return db.delete_table(dataset_id)
|
return await db.delete_table(dataset_id)
|
||||||
except CatalogException:
|
except CatalogException:
|
||||||
return {}
|
return {}
|
||||||
|
|
|
||||||
|
|
@ -1,3 +0,0 @@
|
||||||
from .models.Data import Data
|
|
||||||
from .models.Dataset import Dataset
|
|
||||||
from .models.DatasetData import DatasetData
|
|
||||||
|
|
@ -1,23 +0,0 @@
|
||||||
from typing import List
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
from sqlalchemy.orm import relationship, MappedColumn, Mapped
|
|
||||||
from sqlalchemy import Column, String, DateTime, UUID, Text, JSON
|
|
||||||
from cognee.infrastructure.databases.relational import Base
|
|
||||||
from .DatasetData import DatasetData
|
|
||||||
|
|
||||||
class Data(Base):
|
|
||||||
__tablename__ = "data"
|
|
||||||
|
|
||||||
id = Column(UUID, primary_key = True)
|
|
||||||
name = Column(String, nullable = True)
|
|
||||||
description = Column(Text, nullable = True)
|
|
||||||
raw_data_location = Column(String)
|
|
||||||
meta_data: Mapped[dict] = MappedColumn(type_ = JSON) # metadata is reserved word
|
|
||||||
|
|
||||||
created_at = Column(DateTime, default = datetime.now(timezone.utc))
|
|
||||||
updated_at = Column(DateTime, onupdate = datetime.now(timezone.utc))
|
|
||||||
|
|
||||||
datasets: Mapped[List["Dataset"]] = relationship(
|
|
||||||
secondary = DatasetData.__tablename__,
|
|
||||||
back_populates = "data"
|
|
||||||
)
|
|
||||||
|
|
@ -1,12 +0,0 @@
|
||||||
from uuid import uuid4
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
from sqlalchemy import Column, DateTime, UUID, ForeignKey
|
|
||||||
from cognee.infrastructure.databases.relational import Base
|
|
||||||
|
|
||||||
class DatasetData(Base):
|
|
||||||
__tablename__ = "dataset_data"
|
|
||||||
|
|
||||||
id = Column(UUID, primary_key=True, default=uuid4)
|
|
||||||
|
|
||||||
dataset_id = Column(UUID, ForeignKey("dataset.id"), nullable=False)
|
|
||||||
data_id = Column(UUID, ForeignKey("data.id"), nullable=False)
|
|
||||||
|
|
@ -2,11 +2,12 @@ import os
|
||||||
import asyncio
|
import asyncio
|
||||||
from typing import AsyncGenerator
|
from typing import AsyncGenerator
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from sqlalchemy import create_engine, MetaData, Table, Column, String, Boolean, TIMESTAMP, text
|
from sqlalchemy import create_engine, text, select
|
||||||
from sqlalchemy.orm import sessionmaker
|
from sqlalchemy.orm import sessionmaker, joinedload
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
|
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
|
||||||
from cognee.infrastructure.files.storage import LocalStorage
|
from cognee.infrastructure.files.storage import LocalStorage
|
||||||
from cognee.infrastructure.databases.relational.FakeAsyncSession import FakeAsyncSession
|
from cognee.infrastructure.databases.relational.FakeAsyncSession import FakeAsyncSession
|
||||||
|
from ..ModelBase import Base
|
||||||
|
|
||||||
def make_async_sessionmaker(sessionmaker):
|
def make_async_sessionmaker(sessionmaker):
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
|
|
@ -19,6 +20,7 @@ def make_async_sessionmaker(sessionmaker):
|
||||||
class SQLAlchemyAdapter():
|
class SQLAlchemyAdapter():
|
||||||
def __init__(self, db_type: str, db_path: str, db_name: str, db_user: str, db_password: str, db_host: str, db_port: str):
|
def __init__(self, db_type: str, db_path: str, db_name: str, db_user: str, db_password: str, db_host: str, db_port: str):
|
||||||
self.db_location = os.path.abspath(os.path.join(db_path, db_name))
|
self.db_location = os.path.abspath(os.path.join(db_path, db_name))
|
||||||
|
self.db_name = db_name
|
||||||
|
|
||||||
if db_type == "duckdb":
|
if db_type == "duckdb":
|
||||||
LocalStorage.ensure_directory_exists(db_path)
|
LocalStorage.ensure_directory_exists(db_path)
|
||||||
|
|
@ -42,15 +44,11 @@ class SQLAlchemyAdapter():
|
||||||
yield session
|
yield session
|
||||||
|
|
||||||
async def get_datasets(self):
|
async def get_datasets(self):
|
||||||
async with self.engine.connect() as connection:
|
from cognee.modules.data.models import Dataset
|
||||||
result = await connection.execute(text("SELECT DISTINCT table_schema FROM information_schema.tables;"))
|
|
||||||
tables = [row[0] for row in result]
|
async with self.get_async_session() as session:
|
||||||
return list(
|
datasets = (await session.execute(select(Dataset).options(joinedload(Dataset.data)))).unique().scalars().all()
|
||||||
filter(
|
return datasets
|
||||||
lambda table_schema: not table_schema.endswith("staging") and table_schema != "cognee",
|
|
||||||
tables
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
async def create_table(self, schema_name: str, table_name: str, table_config: list[dict]):
|
async def create_table(self, schema_name: str, table_name: str, table_config: list[dict]):
|
||||||
fields_query_parts = [f"{item['name']} {item['type']}" for item in table_config]
|
fields_query_parts = [f"{item['name']} {item['type']}" for item in table_config]
|
||||||
|
|
@ -100,65 +98,6 @@ class SQLAlchemyAdapter():
|
||||||
result = await connection.execute(text(query))
|
result = await connection.execute(text(query))
|
||||||
return [dict(row) for row in result]
|
return [dict(row) for row in result]
|
||||||
|
|
||||||
async def load_cognify_data(self, data):
|
|
||||||
metadata = MetaData()
|
|
||||||
|
|
||||||
cognify_table = Table(
|
|
||||||
"cognify",
|
|
||||||
metadata,
|
|
||||||
Column("document_id", String),
|
|
||||||
Column("created_at", TIMESTAMP, server_default=text("CURRENT_TIMESTAMP")),
|
|
||||||
Column("updated_at", TIMESTAMP, nullable=True, default=None),
|
|
||||||
Column("processed", Boolean, default=False),
|
|
||||||
Column("document_id_target", String, nullable=True)
|
|
||||||
)
|
|
||||||
|
|
||||||
async with self.engine.begin() as connection:
|
|
||||||
await connection.run_sync(metadata.create_all)
|
|
||||||
|
|
||||||
insert_query = cognify_table.insert().values(document_id=text(":document_id"))
|
|
||||||
async with self.engine.connect() as connection:
|
|
||||||
await connection.execute(insert_query, data)
|
|
||||||
|
|
||||||
async def fetch_cognify_data(self, excluded_document_id: str):
|
|
||||||
async with self.engine.connect() as connection:
|
|
||||||
await connection.execute(text("""
|
|
||||||
CREATE TABLE IF NOT EXISTS cognify (
|
|
||||||
document_id VARCHAR,
|
|
||||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
updated_at TIMESTAMP DEFAULT NULL,
|
|
||||||
processed BOOLEAN DEFAULT FALSE,
|
|
||||||
document_id_target VARCHAR NULL
|
|
||||||
);
|
|
||||||
"""))
|
|
||||||
query = text("""
|
|
||||||
SELECT document_id, created_at, updated_at, processed
|
|
||||||
FROM cognify
|
|
||||||
WHERE document_id != :excluded_document_id AND processed = FALSE;
|
|
||||||
""")
|
|
||||||
records = await connection.execute(query, {"excluded_document_id": excluded_document_id})
|
|
||||||
records = records.fetchall()
|
|
||||||
|
|
||||||
if records:
|
|
||||||
document_ids = tuple(record["document_id"] for record in records)
|
|
||||||
update_query = text("UPDATE cognify SET processed = TRUE WHERE document_id IN :document_ids;")
|
|
||||||
await connection.execute(update_query, {"document_ids": document_ids})
|
|
||||||
return [dict(record) for record in records]
|
|
||||||
|
|
||||||
async def delete_cognify_data(self):
|
|
||||||
async with self.engine.connect() as connection:
|
|
||||||
await connection.execute(text("""
|
|
||||||
CREATE TABLE IF NOT EXISTS cognify (
|
|
||||||
document_id VARCHAR,
|
|
||||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
updated_at TIMESTAMP DEFAULT NULL,
|
|
||||||
processed BOOLEAN DEFAULT FALSE,
|
|
||||||
document_id_target VARCHAR NULL
|
|
||||||
);
|
|
||||||
"""))
|
|
||||||
await connection.execute(text("DELETE FROM cognify;"))
|
|
||||||
await connection.execute(text("DROP TABLE cognify;"))
|
|
||||||
|
|
||||||
async def drop_tables(self, connection):
|
async def drop_tables(self, connection):
|
||||||
try:
|
try:
|
||||||
await connection.execute(text("DROP TABLE IF EXISTS group_permission CASCADE"))
|
await connection.execute(text("DROP TABLE IF EXISTS group_permission CASCADE"))
|
||||||
|
|
@ -169,9 +108,11 @@ class SQLAlchemyAdapter():
|
||||||
print(f"Error dropping database tables: {e}")
|
print(f"Error dropping database tables: {e}")
|
||||||
|
|
||||||
async def delete_database(self):
|
async def delete_database(self):
|
||||||
async with self.engine.begin() as connection:
|
async with self.engine.connect() as connection:
|
||||||
try:
|
try:
|
||||||
await self.drop_tables(connection)
|
async with self.engine.begin() as connection:
|
||||||
print("Database tables dropped successfully.")
|
await connection.run_sync(Base.metadata.drop_all)
|
||||||
|
|
||||||
|
print("Database deleted successfully.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error dropping database tables: {e}")
|
print(f"Error deleting database: {e}")
|
||||||
|
|
|
||||||
25
cognee/modules/data/models/Data.py
Normal file
25
cognee/modules/data/models/Data.py
Normal file
|
|
@ -0,0 +1,25 @@
|
||||||
|
from uuid import uuid4
|
||||||
|
from typing import List
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from sqlalchemy.orm import relationship, Mapped
|
||||||
|
from sqlalchemy import Column, String, DateTime, UUID
|
||||||
|
from cognee.infrastructure.databases.relational import Base
|
||||||
|
from .DatasetData import DatasetData
|
||||||
|
|
||||||
|
class Data(Base):
|
||||||
|
__tablename__ = "data"
|
||||||
|
|
||||||
|
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
|
||||||
|
|
||||||
|
name = Column(String)
|
||||||
|
extension = Column(String)
|
||||||
|
mime_type = Column(String)
|
||||||
|
raw_data_location = Column(String)
|
||||||
|
|
||||||
|
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
|
||||||
|
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
|
||||||
|
|
||||||
|
datasets: Mapped[List["Dataset"]] = relationship(
|
||||||
|
secondary = DatasetData.__tablename__,
|
||||||
|
back_populates = "data"
|
||||||
|
)
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
from uuid import uuid4
|
||||||
from typing import List
|
from typing import List
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from sqlalchemy.orm import relationship, Mapped
|
from sqlalchemy.orm import relationship, Mapped
|
||||||
|
|
@ -6,14 +7,14 @@ from cognee.infrastructure.databases.relational import Base
|
||||||
from .DatasetData import DatasetData
|
from .DatasetData import DatasetData
|
||||||
|
|
||||||
class Dataset(Base):
|
class Dataset(Base):
|
||||||
__tablename__ = "dataset"
|
__tablename__ = "datasets"
|
||||||
|
|
||||||
|
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
|
||||||
|
|
||||||
id = Column(UUID, primary_key = True)
|
|
||||||
name = Column(Text)
|
name = Column(Text)
|
||||||
description = Column(Text, nullable = True)
|
|
||||||
|
|
||||||
created_at = Column(DateTime, default = datetime.now(timezone.utc))
|
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
|
||||||
updated_at = Column(DateTime, onupdate = datetime.now(timezone.utc))
|
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
|
||||||
|
|
||||||
data: Mapped[List["Data"]] = relationship(
|
data: Mapped[List["Data"]] = relationship(
|
||||||
secondary = DatasetData.__tablename__,
|
secondary = DatasetData.__tablename__,
|
||||||
11
cognee/modules/data/models/DatasetData.py
Normal file
11
cognee/modules/data/models/DatasetData.py
Normal file
|
|
@ -0,0 +1,11 @@
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from sqlalchemy import Column, DateTime, UUID, ForeignKey
|
||||||
|
from cognee.infrastructure.databases.relational import Base
|
||||||
|
|
||||||
|
class DatasetData(Base):
|
||||||
|
__tablename__ = "dataset_data"
|
||||||
|
|
||||||
|
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
|
||||||
|
|
||||||
|
dataset_id = Column(UUID(as_uuid = True), ForeignKey("datasets.id"), primary_key = True)
|
||||||
|
data_id = Column(UUID(as_uuid = True), ForeignKey("data.id"), primary_key = True)
|
||||||
2
cognee/modules/data/models/__init__.py
Normal file
2
cognee/modules/data/models/__init__.py
Normal file
|
|
@ -0,0 +1,2 @@
|
||||||
|
from .Data import Data
|
||||||
|
from .Dataset import Dataset
|
||||||
26
cognee/modules/data/operations/ensure_dataset_exists.py
Normal file
26
cognee/modules/data/operations/ensure_dataset_exists.py
Normal file
|
|
@ -0,0 +1,26 @@
|
||||||
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.orm import joinedload
|
||||||
|
from cognee.modules.data.models import Dataset
|
||||||
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
|
||||||
|
async def ensure_dataset_exists(dataset_name: str) -> Dataset:
|
||||||
|
db_engine = get_relational_engine()
|
||||||
|
|
||||||
|
async with db_engine.get_async_session() as session:
|
||||||
|
dataset = (await session.scalars(
|
||||||
|
select(Dataset)\
|
||||||
|
.options(joinedload(Dataset.data))\
|
||||||
|
.filter(Dataset.name == dataset_name)
|
||||||
|
)).first()
|
||||||
|
|
||||||
|
if dataset is None:
|
||||||
|
dataset = Dataset(
|
||||||
|
name = dataset_name,
|
||||||
|
data = []
|
||||||
|
)
|
||||||
|
|
||||||
|
session.add(dataset)
|
||||||
|
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
return dataset
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
from uuid import uuid5, NAMESPACE_OID
|
from uuid import UUID, uuid5, NAMESPACE_OID
|
||||||
from typing import Optional, Generator
|
from typing import Optional
|
||||||
|
|
||||||
from cognee.infrastructure.llm.get_llm_client import get_llm_client
|
from cognee.infrastructure.llm.get_llm_client import get_llm_client
|
||||||
from cognee.modules.data.chunking import chunk_by_paragraph
|
from cognee.modules.data.chunking import chunk_by_paragraph
|
||||||
|
|
@ -7,10 +7,10 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu
|
||||||
from cognee.modules.data.processing.document_types.Document import Document
|
from cognee.modules.data.processing.document_types.Document import Document
|
||||||
|
|
||||||
class AudioReader:
|
class AudioReader:
|
||||||
id: str
|
id: UUID
|
||||||
file_path: str
|
file_path: str
|
||||||
|
|
||||||
def __init__(self, id: str, file_path: str):
|
def __init__(self, id: UUID, file_path: str):
|
||||||
self.id = id
|
self.id = id
|
||||||
self.file_path = file_path
|
self.file_path = file_path
|
||||||
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
|
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
|
||||||
|
|
@ -87,13 +87,11 @@ class AudioDocument(Document):
|
||||||
title: str
|
title: str
|
||||||
file_path: str
|
file_path: str
|
||||||
|
|
||||||
def __init__(self, title: str, file_path: str):
|
def __init__(self, id: UUID, title: str, file_path: str):
|
||||||
self.id = uuid5(NAMESPACE_OID, title)
|
self.id = id or uuid5(NAMESPACE_OID, title)
|
||||||
self.title = title
|
self.title = title
|
||||||
self.file_path = file_path
|
self.file_path = file_path
|
||||||
|
|
||||||
reader = AudioReader(self.id, self.file_path)
|
|
||||||
|
|
||||||
def get_reader(self) -> AudioReader:
|
def get_reader(self) -> AudioReader:
|
||||||
reader = AudioReader(self.id, self.file_path)
|
reader = AudioReader(self.id, self.file_path)
|
||||||
return reader
|
return reader
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
from uuid import uuid5, NAMESPACE_OID
|
from uuid import UUID, uuid5, NAMESPACE_OID
|
||||||
from typing import Optional, Generator
|
from typing import Optional
|
||||||
|
|
||||||
from cognee.infrastructure.llm.get_llm_client import get_llm_client
|
from cognee.infrastructure.llm.get_llm_client import get_llm_client
|
||||||
from cognee.modules.data.chunking import chunk_by_paragraph
|
from cognee.modules.data.chunking import chunk_by_paragraph
|
||||||
|
|
@ -7,10 +7,10 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu
|
||||||
from cognee.modules.data.processing.document_types.Document import Document
|
from cognee.modules.data.processing.document_types.Document import Document
|
||||||
|
|
||||||
class ImageReader:
|
class ImageReader:
|
||||||
id: str
|
id: UUID
|
||||||
file_path: str
|
file_path: str
|
||||||
|
|
||||||
def __init__(self, id: str, file_path: str):
|
def __init__(self, id: UUID, file_path: str):
|
||||||
self.id = id
|
self.id = id
|
||||||
self.file_path = file_path
|
self.file_path = file_path
|
||||||
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
|
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
|
||||||
|
|
@ -24,10 +24,8 @@ class ImageReader:
|
||||||
|
|
||||||
# Transcribe the image file
|
# Transcribe the image file
|
||||||
result = self.llm_client.transcribe_image(self.file_path)
|
result = self.llm_client.transcribe_image(self.file_path)
|
||||||
print("Transcription result: ", result.choices[0].message.content)
|
|
||||||
text = result.choices[0].message.content
|
text = result.choices[0].message.content
|
||||||
|
|
||||||
|
|
||||||
# Simulate reading text in chunks as done in TextReader
|
# Simulate reading text in chunks as done in TextReader
|
||||||
def read_text_chunks(text, chunk_size):
|
def read_text_chunks(text, chunk_size):
|
||||||
for i in range(0, len(text), chunk_size):
|
for i in range(0, len(text), chunk_size):
|
||||||
|
|
@ -89,13 +87,11 @@ class ImageDocument(Document):
|
||||||
title: str
|
title: str
|
||||||
file_path: str
|
file_path: str
|
||||||
|
|
||||||
def __init__(self, title: str, file_path: str):
|
def __init__(self, id: UUID, title: str, file_path: str):
|
||||||
self.id = uuid5(NAMESPACE_OID, title)
|
self.id = id or uuid5(NAMESPACE_OID, title)
|
||||||
self.title = title
|
self.title = title
|
||||||
self.file_path = file_path
|
self.file_path = file_path
|
||||||
|
|
||||||
reader = ImageReader(self.id, self.file_path)
|
|
||||||
|
|
||||||
def get_reader(self) -> ImageReader:
|
def get_reader(self) -> ImageReader:
|
||||||
reader = ImageReader(self.id, self.file_path)
|
reader = ImageReader(self.id, self.file_path)
|
||||||
return reader
|
return reader
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
# import pdfplumber
|
# import pdfplumber
|
||||||
import logging
|
import logging
|
||||||
from uuid import uuid5, NAMESPACE_OID
|
from uuid import UUID, uuid5, NAMESPACE_OID
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from pypdf import PdfReader as pypdf_PdfReader
|
from pypdf import PdfReader as pypdf_PdfReader
|
||||||
from cognee.modules.data.chunking import chunk_by_paragraph
|
from cognee.modules.data.chunking import chunk_by_paragraph
|
||||||
|
|
@ -8,10 +8,10 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu
|
||||||
from .Document import Document
|
from .Document import Document
|
||||||
|
|
||||||
class PdfReader():
|
class PdfReader():
|
||||||
id: str
|
id: UUID
|
||||||
file_path: str
|
file_path: str
|
||||||
|
|
||||||
def __init__(self, id: str, file_path: str):
|
def __init__(self, id: UUID, file_path: str):
|
||||||
self.id = id
|
self.id = id
|
||||||
self.file_path = file_path
|
self.file_path = file_path
|
||||||
|
|
||||||
|
|
@ -86,8 +86,8 @@ class PdfDocument(Document):
|
||||||
num_pages: int
|
num_pages: int
|
||||||
file_path: str
|
file_path: str
|
||||||
|
|
||||||
def __init__(self, title: str, file_path: str):
|
def __init__(self, id: UUID, title: str, file_path: str):
|
||||||
self.id = uuid5(NAMESPACE_OID, title)
|
self.id = id or uuid5(NAMESPACE_OID, title)
|
||||||
self.title = title
|
self.title = title
|
||||||
self.file_path = file_path
|
self.file_path = file_path
|
||||||
logging.debug("file_path: %s", self.file_path)
|
logging.debug("file_path: %s", self.file_path)
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,14 @@
|
||||||
from uuid import uuid5, NAMESPACE_OID
|
from uuid import UUID, uuid5, NAMESPACE_OID
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from cognee.modules.data.chunking import chunk_by_paragraph
|
from cognee.modules.data.chunking import chunk_by_paragraph
|
||||||
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
|
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
|
||||||
from .Document import Document
|
from .Document import Document
|
||||||
|
|
||||||
class TextReader():
|
class TextReader():
|
||||||
id: str
|
id: UUID
|
||||||
file_path: str
|
file_path: str
|
||||||
|
|
||||||
def __init__(self, id: str, file_path: str):
|
def __init__(self, id: UUID, file_path: str):
|
||||||
self.id = id
|
self.id = id
|
||||||
self.file_path = file_path
|
self.file_path = file_path
|
||||||
|
|
||||||
|
|
@ -90,8 +90,8 @@ class TextDocument(Document):
|
||||||
num_pages: int
|
num_pages: int
|
||||||
file_path: str
|
file_path: str
|
||||||
|
|
||||||
def __init__(self, title: str, file_path: str):
|
def __init__(self, id: UUID, title: str, file_path: str):
|
||||||
self.id = uuid5(NAMESPACE_OID, title)
|
self.id = id or uuid5(NAMESPACE_OID, title)
|
||||||
self.title = title
|
self.title = title
|
||||||
self.file_path = file_path
|
self.file_path = file_path
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ class BinaryData(IngestionData):
|
||||||
def get_identifier(self):
|
def get_identifier(self):
|
||||||
metadata = self.get_metadata()
|
metadata = self.get_metadata()
|
||||||
|
|
||||||
return self.name + "_" + metadata["mime_type"]
|
return self.name + "." + metadata["extension"]
|
||||||
|
|
||||||
def get_metadata(self):
|
def get_metadata(self):
|
||||||
self.ensure_metadata()
|
self.ensure_metadata()
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,7 @@
|
||||||
from uuid import uuid5, UUID
|
from uuid import uuid5, NAMESPACE_OID
|
||||||
from .data_types import IngestionData
|
from .data_types import IngestionData
|
||||||
|
|
||||||
null_uuid: UUID = UUID("00000000-0000-0000-0000-000000000000")
|
|
||||||
|
|
||||||
def identify(data: IngestionData) -> str:
|
def identify(data: IngestionData) -> str:
|
||||||
data_id: str = data.get_identifier()
|
data_id: str = data.get_identifier()
|
||||||
|
|
||||||
return str(uuid5(null_uuid, data_id)).replace("-", "")
|
return uuid5(NAMESPACE_OID, data_id)
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ from .PipelineTask import PipelineTask
|
||||||
class Pipeline(Base):
|
class Pipeline(Base):
|
||||||
__tablename__ = "pipelines"
|
__tablename__ = "pipelines"
|
||||||
|
|
||||||
id = Column(UUID, primary_key = True, default = uuid4)
|
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
|
||||||
|
|
||||||
name = Column(String)
|
name = Column(String)
|
||||||
description = Column(Text, nullable = True)
|
description = Column(Text, nullable = True)
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,12 @@ from cognee.infrastructure.databases.relational import Base
|
||||||
class PipelineRun(Base):
|
class PipelineRun(Base):
|
||||||
__tablename__ = "pipeline_runs"
|
__tablename__ = "pipeline_runs"
|
||||||
|
|
||||||
id = Column(UUID, primary_key = True, default = uuid4)
|
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
|
||||||
|
|
||||||
dataset_name = Column(String)
|
|
||||||
|
|
||||||
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
|
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
|
||||||
|
|
||||||
|
run_name = Column(String, index = True)
|
||||||
|
|
||||||
|
status = Column(String)
|
||||||
|
|
||||||
run_info = Column(JSON)
|
run_info = Column(JSON)
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,3 @@
|
||||||
from uuid import uuid4
|
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from sqlalchemy import Column, DateTime, UUID, ForeignKey
|
from sqlalchemy import Column, DateTime, UUID, ForeignKey
|
||||||
from cognee.infrastructure.databases.relational import Base
|
from cognee.infrastructure.databases.relational import Base
|
||||||
|
|
@ -6,9 +5,7 @@ from cognee.infrastructure.databases.relational import Base
|
||||||
class PipelineTask(Base):
|
class PipelineTask(Base):
|
||||||
__tablename__ = "pipeline_task"
|
__tablename__ = "pipeline_task"
|
||||||
|
|
||||||
id = Column(UUID, primary_key = True, default = uuid4)
|
|
||||||
|
|
||||||
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
|
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
|
||||||
|
|
||||||
pipeline_id = Column("pipeline", UUID, ForeignKey("pipeline.id"), primary_key = True)
|
pipeline_id = Column("pipeline", UUID(as_uuid = True), ForeignKey("pipeline.id"), primary_key = True)
|
||||||
task_id = Column("task", UUID, ForeignKey("task.id"), primary_key = True)
|
task_id = Column("task", UUID(as_uuid = True), ForeignKey("task.id"), primary_key = True)
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ from .PipelineTask import PipelineTask
|
||||||
class Task(Base):
|
class Task(Base):
|
||||||
__tablename__ = "tasks"
|
__tablename__ = "tasks"
|
||||||
|
|
||||||
id = Column(UUID, primary_key = True, default = uuid4)
|
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
|
||||||
|
|
||||||
name = Column(String)
|
name = Column(String)
|
||||||
description = Column(Text, nullable = True)
|
description = Column(Text, nullable = True)
|
||||||
|
|
|
||||||
17
cognee/modules/pipelines/models/TaskRun.py
Normal file
17
cognee/modules/pipelines/models/TaskRun.py
Normal file
|
|
@ -0,0 +1,17 @@
|
||||||
|
from uuid import uuid4
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from sqlalchemy import Column, UUID, DateTime, String, JSON
|
||||||
|
from cognee.infrastructure.databases.relational import Base
|
||||||
|
|
||||||
|
class TaskRun(Base):
|
||||||
|
__tablename__ = "task_runs"
|
||||||
|
|
||||||
|
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
|
||||||
|
|
||||||
|
task_name = Column(String)
|
||||||
|
|
||||||
|
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
|
||||||
|
|
||||||
|
status = Column(String)
|
||||||
|
|
||||||
|
run_info = Column(JSON)
|
||||||
1
cognee/modules/pipelines/models/__init__.py
Normal file
1
cognee/modules/pipelines/models/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
from .PipelineRun import PipelineRun
|
||||||
|
|
@ -1,4 +0,0 @@
|
||||||
from ..models import Pipeline, Task
|
|
||||||
|
|
||||||
def add_task(pipeline: Pipeline, task: Task):
|
|
||||||
pipeline.tasks.append(task)
|
|
||||||
40
cognee/modules/pipelines/operations/get_pipeline_status.py
Normal file
40
cognee/modules/pipelines/operations/get_pipeline_status.py
Normal file
|
|
@ -0,0 +1,40 @@
|
||||||
|
from sqlalchemy import func, select
|
||||||
|
from sqlalchemy.orm import aliased
|
||||||
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
from ..models import PipelineRun
|
||||||
|
|
||||||
|
async def get_pipeline_status(pipeline_names: [str]):
|
||||||
|
db_engine = get_relational_engine()
|
||||||
|
|
||||||
|
async with db_engine.get_async_session() as session:
|
||||||
|
query = select(
|
||||||
|
PipelineRun,
|
||||||
|
func.row_number().over(
|
||||||
|
partition_by = PipelineRun.run_name,
|
||||||
|
order_by = PipelineRun.created_at.desc(),
|
||||||
|
).label("rn")
|
||||||
|
).filter(PipelineRun.run_name.in_(pipeline_names)).subquery()
|
||||||
|
|
||||||
|
aliased_pipeline_run = aliased(PipelineRun, query)
|
||||||
|
|
||||||
|
latest_runs = (
|
||||||
|
select(aliased_pipeline_run).filter(query.c.rn == 1)
|
||||||
|
)
|
||||||
|
|
||||||
|
runs = (await session.execute(latest_runs)).scalars().all()
|
||||||
|
|
||||||
|
pipeline_statuses = {
|
||||||
|
run.run_name: run.status for run in runs
|
||||||
|
}
|
||||||
|
|
||||||
|
return pipeline_statuses
|
||||||
|
|
||||||
|
# f"""SELECT data_id, status
|
||||||
|
# FROM (
|
||||||
|
# SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn
|
||||||
|
# FROM cognee.cognee.task_runs
|
||||||
|
# WHERE data_id IN ({formatted_data_ids})
|
||||||
|
# ) t
|
||||||
|
# WHERE rn = 1;"""
|
||||||
|
|
||||||
|
# return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }
|
||||||
14
cognee/modules/pipelines/operations/log_pipeline_status.py
Normal file
14
cognee/modules/pipelines/operations/log_pipeline_status.py
Normal file
|
|
@ -0,0 +1,14 @@
|
||||||
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
from ..models.PipelineRun import PipelineRun
|
||||||
|
|
||||||
|
async def log_pipeline_status(run_name: str, status: str, run_info: dict):
|
||||||
|
db_engine = get_relational_engine()
|
||||||
|
|
||||||
|
async with db_engine.get_async_session() as session:
|
||||||
|
session.add(PipelineRun(
|
||||||
|
run_name = run_name,
|
||||||
|
status = status,
|
||||||
|
run_info = run_info,
|
||||||
|
))
|
||||||
|
|
||||||
|
await session.commit()
|
||||||
|
|
@ -1,3 +0,0 @@
|
||||||
from .get_task_status import get_task_status
|
|
||||||
from .update_task_status import update_task_status
|
|
||||||
from .create_task_status_table import create_task_status_table
|
|
||||||
|
|
@ -1,10 +0,0 @@
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
|
||||||
|
|
||||||
async def create_task_status_table():
|
|
||||||
db_engine = get_relational_engine()
|
|
||||||
|
|
||||||
await db_engine.create_table("cognee", "cognee_task_status", [
|
|
||||||
dict(name="data_id", type="VARCHAR"),
|
|
||||||
dict(name="status", type="VARCHAR"),
|
|
||||||
dict(name="created_at", type="TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
|
|
||||||
])
|
|
||||||
|
|
@ -1,18 +0,0 @@
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
|
||||||
|
|
||||||
def get_task_status(data_ids: [str]):
|
|
||||||
db_engine = get_relational_engine()
|
|
||||||
|
|
||||||
formatted_data_ids = ", ".join([f"'{data_id}'" for data_id in data_ids])
|
|
||||||
|
|
||||||
datasets_statuses = db_engine.execute_query(
|
|
||||||
f"""SELECT data_id, status
|
|
||||||
FROM (
|
|
||||||
SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn
|
|
||||||
FROM cognee.cognee.cognee_task_status
|
|
||||||
WHERE data_id IN ({formatted_data_ids})
|
|
||||||
) t
|
|
||||||
WHERE rn = 1;"""
|
|
||||||
)
|
|
||||||
|
|
||||||
return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }
|
|
||||||
|
|
@ -1,5 +0,0 @@
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
|
||||||
|
|
||||||
def update_task_status(data_id: str, status: str):
|
|
||||||
db_engine = get_relational_engine()
|
|
||||||
db_engine.insert_data("cognee.cognee", "cognee_task_status", [dict(data_id = data_id, status = status)])
|
|
||||||
|
|
@ -1,5 +1,4 @@
|
||||||
import hashlib
|
import hashlib
|
||||||
# from cognee.infrastructure.databases.relational import get_relational_engine
|
|
||||||
from .create_user import create_user
|
from .create_user import create_user
|
||||||
|
|
||||||
async def create_default_user():
|
async def create_default_user():
|
||||||
|
|
@ -12,12 +11,9 @@ async def create_default_user():
|
||||||
is_superuser = True,
|
is_superuser = True,
|
||||||
is_active = True,
|
is_active = True,
|
||||||
is_verified = True,
|
is_verified = True,
|
||||||
|
auto_login = True,
|
||||||
)
|
)
|
||||||
|
|
||||||
# db_engine = get_relational_engine()
|
|
||||||
# async with db_engine.get_async_session() as session:
|
|
||||||
# await session.refresh(user)
|
|
||||||
|
|
||||||
return user
|
return user
|
||||||
|
|
||||||
async def hash_password(password: str) -> str:
|
async def hash_password(password: str) -> str:
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ async def create_user(
|
||||||
is_superuser: bool = False,
|
is_superuser: bool = False,
|
||||||
is_active: bool = True,
|
is_active: bool = True,
|
||||||
is_verified: bool = False,
|
is_verified: bool = False,
|
||||||
|
auto_login: bool = False,
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
relational_engine = get_relational_engine()
|
relational_engine = get_relational_engine()
|
||||||
|
|
@ -26,6 +27,10 @@ async def create_user(
|
||||||
is_verified = is_verified,
|
is_verified = is_verified,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if auto_login:
|
||||||
|
await session.refresh(user)
|
||||||
|
|
||||||
return user
|
return user
|
||||||
print(f"User created: {user.email}")
|
print(f"User created: {user.email}")
|
||||||
except UserAlreadyExists as error:
|
except UserAlreadyExists as error:
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,16 @@
|
||||||
from sqlalchemy.orm import joinedload
|
from sqlalchemy.orm import joinedload
|
||||||
|
from sqlalchemy.future import select
|
||||||
from cognee.modules.users.models import User
|
from cognee.modules.users.models import User
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
|
||||||
from sqlalchemy.future import select
|
async def get_default_user():
|
||||||
|
db_engine = get_relational_engine()
|
||||||
|
|
||||||
async def get_default_user(session):
|
async with db_engine.get_async_session() as session:
|
||||||
stmt = select(User).options(joinedload(User.groups)).where(User.email == "default_user@example.com")
|
query = select(User).options(joinedload(User.groups))\
|
||||||
result = await session.execute(stmt)
|
.where(User.email == "default_user@example.com")
|
||||||
user = result.scalars().first()
|
|
||||||
return user
|
result = await session.execute(query)
|
||||||
|
user = result.scalars().first()
|
||||||
|
|
||||||
|
return user
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.orm import joinedload
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
|
||||||
from ...models.User import User
|
from ...models.User import User
|
||||||
from ...models.ACL import ACL
|
from ...models.ACL import ACL
|
||||||
|
|
||||||
|
|
@ -14,24 +14,26 @@ class PermissionDeniedException(Exception):
|
||||||
super().__init__(self.message)
|
super().__init__(self.message)
|
||||||
|
|
||||||
|
|
||||||
async def check_permissions_on_documents(user: User, permission_type: str, document_ids: list[str], session):
|
async def check_permissions_on_documents(user: User, permission_type: str, document_ids: list[str]):
|
||||||
|
|
||||||
logging.info("This is the user: %s", user.__dict__)
|
|
||||||
try:
|
try:
|
||||||
user_group_ids = [group.id for group in user.groups]
|
user_group_ids = [group.id for group in user.groups]
|
||||||
|
|
||||||
acls = await session.execute(
|
db_engine = get_relational_engine()
|
||||||
select(ACL)
|
|
||||||
.join(ACL.permission)
|
|
||||||
.where(ACL.principal_id.in_([user.id, *user_group_ids]))
|
|
||||||
.where(ACL.permission.has(name=permission_type))
|
|
||||||
)
|
|
||||||
resource_ids = [resource.resource_id for acl in acls.scalars().all() for resource in acl.resources]
|
|
||||||
has_permissions = all(document_id in resource_ids for document_id in document_ids)
|
|
||||||
|
|
||||||
if not has_permissions:
|
async with db_engine.get_async_session() as session:
|
||||||
raise PermissionDeniedException(f"User {user.username} does not have {permission_type} permission on documents")
|
result = await session.execute(
|
||||||
|
select(ACL)
|
||||||
|
.join(ACL.permission)
|
||||||
|
.options(joinedload(ACL.resources))
|
||||||
|
.where(ACL.principal_id.in_([user.id, *user_group_ids]))
|
||||||
|
.where(ACL.permission.has(name = permission_type))
|
||||||
|
)
|
||||||
|
acls = result.unique().scalars().all()
|
||||||
|
resource_ids = [str(resource.resource_id) for acl in acls for resource in acl.resources]
|
||||||
|
has_permissions = all(document_id in resource_ids for document_id in document_ids)
|
||||||
|
|
||||||
|
if not has_permissions:
|
||||||
|
raise PermissionDeniedException(f"User {user.username} does not have {permission_type} permission on documents")
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
logger.error("Error checking permissions on documents: %s", str(error))
|
logger.error("Error checking permissions on documents: %s", str(error))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ async def get_document_ids_for_user(user_id: UUID) -> list[str]:
|
||||||
ACL.permission.name == "read",
|
ACL.permission.name == "read",
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
document_ids = [row[0] for row in result.fetchall()]
|
document_ids = [row[0] for row in result.scalars().all()]
|
||||||
return document_ids
|
return document_ids
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,18 +12,18 @@ async def give_permission_on_document(
|
||||||
document_resource = Resource(resource_id = document_id)
|
document_resource = Resource(resource_id = document_id)
|
||||||
|
|
||||||
async with db_engine.get_async_session() as session:
|
async with db_engine.get_async_session() as session:
|
||||||
permission = (await session.execute(select(Permission).filter(Permission.name == permission_name))).first()
|
permission = (await session.execute(select(Permission).filter(Permission.name == permission_name))).scalars().first()
|
||||||
|
|
||||||
if permission is None:
|
if permission is None:
|
||||||
permission = Permission(name = permission_name)
|
permission = Permission(name = permission_name)
|
||||||
|
|
||||||
acl = ACL(principal_id = user.id)
|
acl = ACL(principal_id = user.id)
|
||||||
acl.permission = permission
|
acl.permission = permission
|
||||||
acl.resources.append(document_resource)
|
acl.resources.append(document_resource)
|
||||||
|
|
||||||
session.add(acl)
|
session.add(acl)
|
||||||
|
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
|
|
||||||
# if user.is_superuser:
|
# if user.is_superuser:
|
||||||
|
|
|
||||||
|
|
@ -27,13 +27,13 @@ async def main():
|
||||||
# In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
|
# In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
|
||||||
# """
|
# """
|
||||||
|
|
||||||
text = """A large language model (LLM) is a language model notable for its ability to achieve general-purpose language generation and other natural language processing tasks such as classification. LLMs acquire these abilities by learning statistical relationships from text documents during a computationally intensive self-supervised and semi-supervised training process. LLMs can be used for text generation, a form of generative AI, by taking an input text and repeatedly predicting the next token or word.
|
# text = """A large language model (LLM) is a language model notable for its ability to achieve general-purpose language generation and other natural language processing tasks such as classification. LLMs acquire these abilities by learning statistical relationships from text documents during a computationally intensive self-supervised and semi-supervised training process. LLMs can be used for text generation, a form of generative AI, by taking an input text and repeatedly predicting the next token or word.
|
||||||
LLMs are artificial neural networks. The largest and most capable, as of March 2024, are built with a decoder-only transformer-based architecture while some recent implementations are based on other architectures, such as recurrent neural network variants and Mamba (a state space model).
|
# LLMs are artificial neural networks. The largest and most capable, as of March 2024, are built with a decoder-only transformer-based architecture while some recent implementations are based on other architectures, such as recurrent neural network variants and Mamba (a state space model).
|
||||||
Up to 2020, fine tuning was the only way a model could be adapted to be able to accomplish specific tasks. Larger sized models, such as GPT-3, however, can be prompt-engineered to achieve similar results.[6] They are thought to acquire knowledge about syntax, semantics and "ontology" inherent in human language corpora, but also inaccuracies and biases present in the corpora.
|
# Up to 2020, fine tuning was the only way a model could be adapted to be able to accomplish specific tasks. Larger sized models, such as GPT-3, however, can be prompt-engineered to achieve similar results.[6] They are thought to acquire knowledge about syntax, semantics and "ontology" inherent in human language corpora, but also inaccuracies and biases present in the corpora.
|
||||||
Some notable LLMs are OpenAI's GPT series of models (e.g., GPT-3.5 and GPT-4, used in ChatGPT and Microsoft Copilot), Google's PaLM and Gemini (the latter of which is currently used in the chatbot of the same name), xAI's Grok, Meta's LLaMA family of open-source models, Anthropic's Claude models, Mistral AI's open source models, and Databricks' open source DBRX.
|
# Some notable LLMs are OpenAI's GPT series of models (e.g., GPT-3.5 and GPT-4, used in ChatGPT and Microsoft Copilot), Google's PaLM and Gemini (the latter of which is currently used in the chatbot of the same name), xAI's Grok, Meta's LLaMA family of open-source models, Anthropic's Claude models, Mistral AI's open source models, and Databricks' open source DBRX.
|
||||||
"""
|
# """
|
||||||
|
|
||||||
await cognee.add([text], dataset_name)
|
# await cognee.add([text], dataset_name)
|
||||||
|
|
||||||
await cognee.cognify([dataset_name])
|
await cognee.cognify([dataset_name])
|
||||||
|
|
||||||
|
|
@ -42,27 +42,27 @@ async def main():
|
||||||
random_node = (await vector_engine.search("entities", "AI"))[0]
|
random_node = (await vector_engine.search("entities", "AI"))[0]
|
||||||
random_node_name = random_node.payload["name"]
|
random_node_name = random_node.payload["name"]
|
||||||
|
|
||||||
search_results = await cognee.search("SIMILARITY", { "query": random_node_name })
|
search_results = await cognee.search("SIMILARITY", params = { "query": random_node_name })
|
||||||
assert len(search_results) != 0, "The search results list is empty."
|
assert len(search_results) != 0, "The search results list is empty."
|
||||||
print("\n\nExtracted sentences are:\n")
|
print("\n\nExtracted sentences are:\n")
|
||||||
for result in search_results:
|
for result in search_results:
|
||||||
print(f"{result}\n")
|
print(f"{result}\n")
|
||||||
|
|
||||||
search_results = await cognee.search("TRAVERSE", { "query": random_node_name })
|
search_results = await cognee.search("TRAVERSE", params = { "query": random_node_name })
|
||||||
assert len(search_results) != 0, "The search results list is empty."
|
assert len(search_results) != 0, "The search results list is empty."
|
||||||
print("\n\nExtracted sentences are:\n")
|
print("\n\nExtracted sentences are:\n")
|
||||||
for result in search_results:
|
for result in search_results:
|
||||||
print(f"{result}\n")
|
print(f"{result}\n")
|
||||||
|
|
||||||
search_results = await cognee.search("SUMMARY", { "query": random_node_name })
|
search_results = await cognee.search("SUMMARY", params = { "query": random_node_name })
|
||||||
assert len(search_results) != 0, "Query related summaries don't exist."
|
assert len(search_results) != 0, "Query related summaries don't exist."
|
||||||
print("\n\nQuery related summaries exist:\n")
|
print("\n\nQuery related summaries exist:\n")
|
||||||
for result in search_results:
|
for result in search_results:
|
||||||
print(f"{result}\n")
|
print(f"{result}\n")
|
||||||
|
|
||||||
search_results = await cognee.search("ADJACENT", { "query": random_node_name })
|
search_results = await cognee.search("ADJACENT", params = { "query": random_node_name })
|
||||||
assert len(search_results) != 0, "Large language model query found no neighbours."
|
assert len(search_results) != 0, "Large language model query found no neighbours."
|
||||||
print("\n\Large language model query found neighbours.\n")
|
print("\n\nLarge language model query found neighbours.\n")
|
||||||
for result in search_results:
|
for result in search_results:
|
||||||
print(f"{result}\n")
|
print(f"{result}\n")
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue