refactor: Add routers for datasets and add endpoints

Added routers for datasets and add endpoints

Refactor #COG-334
This commit is contained in:
Igor Ilic 2024-10-24 17:56:15 +02:00
parent f8f2746603
commit 4fd4651471
5 changed files with 221 additions and 189 deletions

View file

@ -1,23 +1,11 @@
""" FastAPI server for the Cognee API. """
from datetime import datetime
import os
from uuid import UUID
import aiohttp
import uvicorn
import logging
import sentry_sdk
from typing import List, Optional
from typing_extensions import Annotated
from fastapi import FastAPI, HTTPException, Form, UploadFile, Query, Depends
from fastapi.responses import JSONResponse, FileResponse, Response
from fastapi import FastAPI
from fastapi.responses import JSONResponse, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from cognee.api.DTO import OutDTO
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.modules.pipelines.models import PipelineRunStatus
# Set up logging
logging.basicConfig(
@ -66,8 +54,10 @@ from cognee.api.v1.users.routers import get_auth_router, get_register_router,\
get_reset_password_router, get_verify_router, get_users_router
from cognee.api.v1.permissions.routers import get_permissions_router
from cognee.api.v1.settings.routers import get_settings_router
from cognee.api.v1.datasets.routers import get_datasets_router
from cognee.api.v1.cognify.routers import get_cognify_router
from cognee.api.v1.search.routers import get_search_router
from cognee.api.v1.add.routers import get_add_router
from fastapi import Request
from fastapi.encoders import jsonable_encoder
@ -137,182 +127,17 @@ def health_check():
"""
return Response(status_code = 200)
app.include_router(
get_datasets_router(),
prefix="/api/v1/datasets",
tags=["datasets"]
)
class ErrorResponseDTO(BaseModel):
message: str
class DatasetDTO(OutDTO):
id: UUID
name: str
created_at: datetime
updated_at: Optional[datetime]
owner_id: UUID
@app.get("/api/v1/datasets", response_model = list[DatasetDTO])
async def get_datasets(user: User = Depends(get_authenticated_user)):
try:
from cognee.modules.data.methods import get_datasets
datasets = await get_datasets(user.id)
return datasets
except Exception as error:
logger.error(f"Error retrieving datasets: {str(error)}")
raise HTTPException(status_code = 500, detail = f"Error retrieving datasets: {str(error)}") from error
@app.delete("/api/v1/datasets/{dataset_id}", response_model = None, responses = { 404: { "model": ErrorResponseDTO }})
async def delete_dataset(dataset_id: str, user: User = Depends(get_authenticated_user)):
from cognee.modules.data.methods import get_dataset, delete_dataset
dataset = await get_dataset(user.id, dataset_id)
if dataset is None:
raise HTTPException(
status_code = 404,
detail = f"Dataset ({dataset_id}) not found."
)
await delete_dataset(dataset)
@app.get("/api/v1/datasets/{dataset_id}/graph", response_model = str)
async def get_dataset_graph(dataset_id: str, user: User = Depends(get_authenticated_user)):
from cognee.shared.utils import render_graph
from cognee.infrastructure.databases.graph import get_graph_engine
try:
graph_client = await get_graph_engine()
graph_url = await render_graph(graph_client.graph)
return JSONResponse(
status_code = 200,
content = str(graph_url),
)
except:
return JSONResponse(
status_code = 409,
content = "Graphistry credentials are not set. Please set them in your .env file.",
)
class DataDTO(OutDTO):
id: UUID
name: str
created_at: datetime
updated_at: Optional[datetime]
extension: str
mime_type: str
raw_data_location: str
@app.get("/api/v1/datasets/{dataset_id}/data", response_model = list[DataDTO], responses = { 404: { "model": ErrorResponseDTO }})
async def get_dataset_data(dataset_id: str, user: User = Depends(get_authenticated_user)):
from cognee.modules.data.methods import get_dataset_data, get_dataset
dataset = await get_dataset(user.id, dataset_id)
if dataset is None:
return JSONResponse(
status_code = 404,
content = ErrorResponseDTO(f"Dataset ({dataset_id}) not found."),
)
dataset_data = await get_dataset_data(dataset_id = dataset.id)
if dataset_data is None:
return []
return dataset_data
@app.get("/api/v1/datasets/status", response_model = dict[str, PipelineRunStatus])
async def get_dataset_status(datasets: Annotated[List[str], Query(alias="dataset")] = None, user: User = Depends(get_authenticated_user)):
from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
try:
datasets_statuses = await cognee_datasets.get_status(datasets)
return datasets_statuses
except Exception as error:
return JSONResponse(
status_code = 409,
content = {"error": str(error)}
)
@app.get("/api/v1/datasets/{dataset_id}/data/{data_id}/raw", response_class = FileResponse)
async def get_raw_data(dataset_id: str, data_id: str, user: User = Depends(get_authenticated_user)):
from cognee.modules.data.methods import get_dataset, get_dataset_data
dataset = await get_dataset(user.id, dataset_id)
if dataset is None:
return JSONResponse(
status_code = 404,
content = {
"detail": f"Dataset ({dataset_id}) not found."
}
)
dataset_data = await get_dataset_data(dataset.id)
if dataset_data is None:
raise HTTPException(status_code = 404, detail = f"Dataset ({dataset_id}) not found.")
data = [data for data in dataset_data if str(data.id) == data_id][0]
if data is None:
return JSONResponse(
status_code = 404,
content = {
"detail": f"Data ({data_id}) not found in dataset ({dataset_id})."
}
)
return data.raw_data_location
@app.post("/api/v1/add", response_model = None)
async def add(
data: List[UploadFile],
datasetId: str = Form(...),
user: User = Depends(get_authenticated_user),
):
""" This endpoint is responsible for adding data to the graph."""
from cognee.api.v1.add import add as cognee_add
try:
if isinstance(data, str) and data.startswith("http"):
if "github" in data:
# Perform git clone if the URL is from GitHub
repo_name = data.split("/")[-1].replace(".git", "")
os.system(f"git clone {data} .data/{repo_name}")
await cognee_add(
"data://.data/",
f"{repo_name}",
)
else:
# Fetch and store the data from other types of URL using curl
async with aiohttp.ClientSession() as session:
async with session.get(data) as resp:
if resp.status == 200:
file_data = await resp.read()
with open(f".data/{data.split('/')[-1]}", "wb") as f:
f.write(file_data)
await cognee_add(
"data://.data/",
f"{data.split('/')[-1]}",
)
else:
await cognee_add(
data,
datasetId,
user = user,
)
except Exception as error:
return JSONResponse(
status_code = 409,
content = {"error": str(error)}
)
app.include_router(
get_add_router(),
prefix="/api/v1/add",
tags=["add"]
)
app.include_router(
get_cognify_router(),

View file

@ -0,0 +1 @@
from .get_add_router import get_add_router

View file

@ -0,0 +1,56 @@
from fastapi import Form, UploadFile, Depends
from fastapi.responses import JSONResponse
from fastapi import APIRouter
from typing import List
import aiohttp
import os
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
def get_add_router():
router = APIRouter()
@router.post("/", response_model=None)
async def add(
data: List[UploadFile],
datasetId: str = Form(...),
user: User = Depends(get_authenticated_user),
):
""" This endpoint is responsible for adding data to the graph."""
from cognee.api.v1.add import add as cognee_add
try:
if isinstance(data, str) and data.startswith("http"):
if "github" in data:
# Perform git clone if the URL is from GitHub
repo_name = data.split("/")[-1].replace(".git", "")
os.system(f"git clone {data} .data/{repo_name}")
await cognee_add(
"data://.data/",
f"{repo_name}",
)
else:
# Fetch and store the data from other types of URL using curl
async with aiohttp.ClientSession() as session:
async with session.get(data) as resp:
if resp.status == 200:
file_data = await resp.read()
with open(f".data/{data.split('/')[-1]}", "wb") as f:
f.write(file_data)
await cognee_add(
"data://.data/",
f"{data.split('/')[-1]}",
)
else:
await cognee_add(
data,
datasetId,
user=user,
)
except Exception as error:
return JSONResponse(
status_code=409,
content={"error": str(error)}
)
return router

View file

@ -0,0 +1 @@
from .get_datasets_router import get_datasets_router

View file

@ -0,0 +1,149 @@
import logging
from fastapi import APIRouter
from datetime import datetime
from uuid import UUID
from typing import List, Optional
from typing_extensions import Annotated
from fastapi import HTTPException, Query, Depends
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel
from cognee.api.DTO import OutDTO
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.modules.pipelines.models import PipelineRunStatus
def get_datasets_router():
logger = logging.getLogger(__name__)
router = APIRouter()
class ErrorResponseDTO(BaseModel):
message: str
class DatasetDTO(OutDTO):
id: UUID
name: str
created_at: datetime
updated_at: Optional[datetime]
owner_id: UUID
@router.get("/", response_model=list[DatasetDTO])
async def get_datasets(user: User = Depends(get_authenticated_user)):
try:
from cognee.modules.data.methods import get_datasets
datasets = await get_datasets(user.id)
return datasets
except Exception as error:
logger.error(f"Error retrieving datasets: {str(error)}")
raise HTTPException(status_code=500, detail=f"Error retrieving datasets: {str(error)}") from error
@router.delete("/{dataset_id}", response_model=None, responses={404: {"model": ErrorResponseDTO}})
async def delete_dataset(dataset_id: str, user: User = Depends(get_authenticated_user)):
from cognee.modules.data.methods import get_dataset, delete_dataset
dataset = await get_dataset(user.id, dataset_id)
if dataset is None:
raise HTTPException(
status_code=404,
detail=f"Dataset ({dataset_id}) not found."
)
await delete_dataset(dataset)
@router.get("/{dataset_id}/graph", response_model=str)
async def get_dataset_graph(dataset_id: str, user: User = Depends(get_authenticated_user)):
from cognee.shared.utils import render_graph
from cognee.infrastructure.databases.graph import get_graph_engine
try:
graph_client = await get_graph_engine()
graph_url = await render_graph(graph_client.graph)
return JSONResponse(
status_code=200,
content=str(graph_url),
)
except:
return JSONResponse(
status_code=409,
content="Graphistry credentials are not set. Please set them in your .env file.",
)
class DataDTO(OutDTO):
id: UUID
name: str
created_at: datetime
updated_at: Optional[datetime]
extension: str
mime_type: str
raw_data_location: str
@router.get("/{dataset_id}/data", response_model=list[DataDTO],
responses={404: {"model": ErrorResponseDTO}})
async def get_dataset_data(dataset_id: str, user: User = Depends(get_authenticated_user)):
from cognee.modules.data.methods import get_dataset_data, get_dataset
dataset = await get_dataset(user.id, dataset_id)
if dataset is None:
return JSONResponse(
status_code=404,
content=ErrorResponseDTO(f"Dataset ({dataset_id}) not found."),
)
dataset_data = await get_dataset_data(dataset_id=dataset.id)
if dataset_data is None:
return []
return dataset_data
@router.get("/status", response_model=dict[str, PipelineRunStatus])
async def get_dataset_status(datasets: Annotated[List[str], Query(alias="dataset")] = None,
user: User = Depends(get_authenticated_user)):
from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
try:
datasets_statuses = await cognee_datasets.get_status(datasets)
return datasets_statuses
except Exception as error:
return JSONResponse(
status_code=409,
content={"error": str(error)}
)
@router.get("/{dataset_id}/data/{data_id}/raw", response_class=FileResponse)
async def get_raw_data(dataset_id: str, data_id: str, user: User = Depends(get_authenticated_user)):
from cognee.modules.data.methods import get_dataset, get_dataset_data
dataset = await get_dataset(user.id, dataset_id)
if dataset is None:
return JSONResponse(
status_code=404,
content={
"detail": f"Dataset ({dataset_id}) not found."
}
)
dataset_data = await get_dataset_data(dataset.id)
if dataset_data is None:
raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.")
data = [data for data in dataset_data if str(data.id) == data_id][0]
if data is None:
return JSONResponse(
status_code=404,
content={
"detail": f"Data ({data_id}) not found in dataset ({dataset_id})."
}
)
return data.raw_data_location
return router