Merge pull request #123 from topoteretes/COG-206

added boilerplate for user management
Vasilije 2024-08-06 17:58:32 +02:00 committed by GitHub
commit 80e4f298fc
115 changed files with 2827 additions and 1415 deletions

View file

@ -17,28 +17,22 @@ jobs:
uses: actions/checkout@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
uses: docker/setup-buildx-action@v3
- name: Build Docker images
run: |
docker-compose -f docker-compose.yml build
docker compose -f docker-compose.yml build
- name: Run Docker Compose
run: |
docker-compose -f docker-compose.yml up -d
docker compose -f docker-compose.yml up -d
- name: Wait for services to be ready
run: |
# Add any necessary health checks or wait commands
sleep 30
# - name: Run tests
# run: |
# docker-compose -f docker-compose.yml run --rm <test-service> <test-command>
# # Replace <test-service> with the name of the service running the tests
# # Replace <test-command> with the actual command to run your tests
- name: Shut down Docker Compose
if: always()
run: |
docker-compose -f docker-compose.yml down
docker compose -f docker-compose.yml down
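
Note: this hunk migrates the workflow from the standalone docker-compose v1 binary to the Compose V2 plugin (docker compose). The "Wait for services to be ready" step above still sleeps a fixed 30 seconds; a readiness poll against the API's /health endpoint (the endpoint appears later in this PR) would fail faster and wait no longer than needed. A minimal Python sketch, assuming the backend is reachable on localhost:8000:

# A readiness poll (a sketch; assumes the API from this PR listens on
# localhost:8000 and exposes the /health endpoint shown further down).
import time
import urllib.request

def wait_for_health(url = "http://127.0.0.1:8000/health", timeout = 60):
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with urllib.request.urlopen(url, timeout = 2) as response:
                if response.status == 200:
                    return
        except OSError:
            pass  # service is not accepting connections yet
        time.sleep(1)
    raise TimeoutError(f"{url} did not become healthy within {timeout}s")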

View file

@ -18,6 +18,30 @@ jobs:
name: docs changes
uses: ./.github/workflows/get_docs_changes.yml
setup_docker:
name: Set up Docker Buildx
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
# os: ["ubuntu-latest", "macos-latest"]
os: ["ubuntu-latest"]
steps:
# - name: Install Docker
# run: |
# HOMEBREW_NO_AUTO_UPDATE=1 brew install --cask docker
# sudo /Applications/Docker.app/Contents/MacOS/Docker --unattended --install-privileged-components
# open -a /Applications/Docker.app --args --unattended --accept-license
# echo "We are waiting for Docker to be up and running. It can take over 2 minutes..."
# while ! /Applications/Docker.app/Contents/Resources/bin/docker info &>/dev/null; do sleep 1; done
# if: runner.os == 'macOS'
# - name: Set up QEMU
# uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
run_common:
name: test
needs: get_docs_changes
@ -25,7 +49,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: ["ubuntu-latest", "macos-latest"] #, "windows-latest"
os: ["ubuntu-latest"] #, "windows-latest", "macos-latest"
python-version: ["3.11.x"]
# Test all python versions on ubuntu only
include:
@ -40,6 +64,18 @@ jobs:
run:
shell: bash
services:
postgres:
image: postgres:latest
env:
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- 5432:5432
runs-on: ${{ matrix.os }}
steps:
@ -79,6 +115,12 @@ jobs:
mkdir .cognee_system
echo $(pwd)/.cognee_system
- name: Wait for PostgreSQL to be ready
run: |
echo "Waiting for PostgreSQL to be ready..."
until pg_isready -h localhost -p 5432 -U cognee; do
sleep 1
done
- name: Run tests
run: poetry run pytest tests/
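
The pg_isready loop above blocks the job until the Postgres service container accepts connections. The same check can be done from Python; a sketch using asyncpg (the driver this PR switches the relational layer to) with the credentials from the job env:

# Readiness check equivalent to the pg_isready loop; a sketch assuming
# asyncpg and the cognee/cognee/cognee_db credentials defined above.
import asyncio
import asyncpg

async def wait_for_postgres(retries = 30):
    for _ in range(retries):
        try:
            connection = await asyncpg.connect(
                host = "localhost", port = 5432,
                user = "cognee", password = "cognee", database = "cognee_db",
            )
            await connection.close()
            return
        except (OSError, asyncpg.PostgresError):
            await asyncio.sleep(1)  # Postgres is still starting up
    raise TimeoutError("PostgreSQL did not become ready in time")
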
@ -88,6 +130,15 @@ jobs:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
LLM_API_KEY: ${{ secrets.OPENAI_API_KEY }}
ENV: 'dev'
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
DESTINATION__POSTGRES__CREDENTIALS__HOST: 127.0.0.1
DESTINATION__POSTGRES__CREDENTIALS__USERNAME: cognee
DESTINATION__POSTGRES__CREDENTIALS__PASSWORD: cognee
DESTINATION__POSTGRES__CREDENTIALS__DATABASE: cognee_db
run: poetry run python ./cognee/tests/test_library.py
- name: Clean up disk space
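
The DESTINATION__POSTGRES__CREDENTIALS__* variables follow dlt's double-underscore convention for nested configuration and correspond to the explicit credentials dict that add.py passes later in this PR. A sketch of the equivalent in-code configuration:

# In-code equivalent of the env-based dlt credentials above, mirroring the
# dlt.destinations.postgres(...) call introduced in add.py.
import dlt

destination = dlt.destinations.postgres(
    credentials = {
        "host": "127.0.0.1",      # DESTINATION__POSTGRES__CREDENTIALS__HOST
        "port": 5432,
        "user": "cognee",         # ...__USERNAME in the env spelling
        "password": "cognee",
        "database": "cognee_db",  # ...__DATABASE
    },
)
pipeline = dlt.pipeline(
    pipeline_name = "file_load_from_filesystem",
    destination = destination,
)
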

View file

@ -18,16 +18,35 @@ jobs:
name: docs changes
uses: ./.github/workflows/get_docs_changes.yml
setup_docker:
name: Set up Docker Buildx
runs-on: ubuntu-latest
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
run_neo4j_integration_test:
name: test
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
runs-on: macos-latest
runs-on: ubuntu-latest
defaults:
run:
shell: bash
services:
postgres:
image: postgres:latest
env:
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- 5432:5432
steps:
- name: Check out
uses: actions/checkout@v2
@ -52,6 +71,13 @@ jobs:
mkdir .cognee_system
echo $(pwd)/.cognee_system
- name: Wait for PostgreSQL to be ready
run: |
echo "Waiting for PostgreSQL to be ready..."
until pg_isready -h localhost -p 5432 -U cognee; do
sleep 1
done
- name: Run default Neo4j
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
@ -60,4 +86,13 @@ jobs:
GRAPH_DATABASE_PASSWORD: ${{ secrets.NEO4J_API_KEY }}
GRAPH_DATABASE_USERNAME: "neo4j"
ENV: 'dev'
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
DESTINATION__POSTGRES__CREDENTIALS__HOST: 127.0.0.1
DESTINATION__POSTGRES__CREDENTIALS__USERNAME: cognee
DESTINATION__POSTGRES__CREDENTIALS__PASSWORD: cognee
DESTINATION__POSTGRES__CREDENTIALS__DATABASE: cognee_db
run: poetry run python ./cognee/tests/test_neo4j.py

View file

@ -18,16 +18,35 @@ jobs:
name: docs changes
uses: ./.github/workflows/get_docs_changes.yml
setup_docker:
name: Set up Docker Buildx
runs-on: ubuntu-latest
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
run_qdrant_integration_test:
name: test
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
runs-on: macos-latest
runs-on: ubuntu-latest
defaults:
run:
shell: bash
services:
postgres:
image: postgres:latest
env:
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- 5432:5432
steps:
- name: Check out
uses: actions/checkout@v2
@ -52,6 +71,13 @@ jobs:
mkdir .cognee_system
echo $(pwd)/.cognee_system
- name: Wait for PostgreSQL to be ready
run: |
echo "Waiting for PostgreSQL to be ready..."
until pg_isready -h localhost -p 5432 -U cognee; do
sleep 1
done
- name: Run default Qdrant
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
@ -59,4 +85,13 @@ jobs:
VECTOR_DB_URL: ${{ secrets.QDRANT_API_URL }}
VECTOR_DB_KEY: ${{ secrets.QDRANT_API_KEY }}
ENV: 'dev'
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
DESTINATION__POSTGRES__CREDENTIALS__HOST: 127.0.0.1
DESTINATION__POSTGRES__CREDENTIALS__USERNAME: cognee
DESTINATION__POSTGRES__CREDENTIALS__PASSWORD: cognee
DESTINATION__POSTGRES__CREDENTIALS__DATABASE: cognee_db
run: poetry run python ./cognee/tests/test_qdrant.py

View file

@ -18,16 +18,35 @@ jobs:
name: docs changes
uses: ./.github/workflows/get_docs_changes.yml
setup_docker:
name: Set up Docker Buildx
runs-on: ubuntu-latest
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
run_weaviate_integration_test:
name: test
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
runs-on: macos-latest
runs-on: ubuntu-latest
defaults:
run:
shell: bash
services:
postgres:
image: postgres:latest
env:
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- 5432:5432
steps:
- name: Check out
uses: actions/checkout@v2
@ -52,6 +71,13 @@ jobs:
mkdir .cognee_system
echo $(pwd)/.cognee_system
- name: Wait for PostgreSQL to be ready
run: |
echo "Waiting for PostgreSQL to be ready..."
until pg_isready -h localhost -p 5432 -U cognee; do
sleep 1
done
- name: Run default Weaviate
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
@ -59,4 +85,13 @@ jobs:
VECTOR_DB_URL: ${{ secrets.WEAVIATE_API_URL }}
VECTOR_DB_KEY: ${{ secrets.WEAVIATE_API_KEY }}
ENV: 'dev'
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
DESTINATION__POSTGRES__CREDENTIALS__HOST: 127.0.0.1
DESTINATION__POSTGRES__CREDENTIALS__USERNAME: cognee
DESTINATION__POSTGRES__CREDENTIALS__PASSWORD: cognee
DESTINATION__POSTGRES__CREDENTIALS__DATABASE: cognee_db
run: poetry run python ./cognee/tests/test_weaviate.py

View file

@ -433,8 +433,9 @@ disable=raw-checker-failed,
use-implicit-booleaness-not-comparison-to-zero,
missing-module-docstring,
missing-function-docstring,
missing-class-docstring
missing-class-docstring,
relative-beyond-top-level
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option

View file

@ -48,15 +48,15 @@ export default function Home() {
});
}, [showNotification])
const onDatasetCognify = useCallback((dataset: { id: string }) => {
showNotification(`Cognification started for dataset "${dataset.id}".`, 5000);
const onDatasetCognify = useCallback((dataset: { id: string, name: string }) => {
showNotification(`Cognification started for dataset "${dataset.name}".`, 5000);
return cognifyDataset(dataset)
.then(() => {
showNotification(`Dataset "${dataset.id}" cognified.`, 5000);
showNotification(`Dataset "${dataset.name}" cognified.`, 5000);
})
.catch(() => {
showNotification(`Dataset "${dataset.id}" cognification failed. Please try again.`, 5000);
showNotification(`Dataset "${dataset.name}" cognification failed. Please try again.`, 5000);
});
}, [showNotification]);

View file

@ -1,5 +1,5 @@
export default function cognifyDataset(dataset: { id: string }) {
return fetch('http://0.0.0.0:8000/cognify', {
export default function cognifyDataset(dataset: { id: string, name: string }) {
return fetch('http://127.0.0.1:8000/cognify', {
method: 'POST',
headers: {
'Content-Type': 'application/json',

View file

@ -1,5 +1,5 @@
export default function deleteDataset(dataset: { id: string }) {
return fetch(`http://0.0.0.0:8000/datasets/${dataset.id}`, {
return fetch(`http://127.0.0.1:8000/datasets/${dataset.id}`, {
method: 'DELETE',
})
}

View file

@ -1,4 +1,4 @@
export default function getDatasetData(dataset: { id: string }) {
return fetch(`http://0.0.0.0:8000/datasets/${dataset.id}/data`)
return fetch(`http://127.0.0.1:8000/datasets/${dataset.id}/data`)
.then((response) => response.json());
}

View file

@ -1,5 +1,5 @@
export default function getExplorationGraphUrl(dataset: { id: string }) {
return fetch(`http://0.0.0.0:8000/datasets/${dataset.id}/graph`)
return fetch(`http://127.0.0.1:8000/datasets/${dataset.id}/graph`)
.then(async (response) => {
if (response.status !== 200) {
throw new Error((await response.text()).replaceAll("\"", ""));

View file

@ -12,6 +12,7 @@
}
.dataTable {
color: white;
border-collapse: collapse;
}
.dataTable td, .dataTable th {

View file

@ -13,8 +13,9 @@ import RawDataPreview from './RawDataPreview';
export interface Data {
id: string;
name: string;
filePath: string;
mimeType: string;
extension: string;
rawDataLocation: string;
}
interface DatasetLike {
@ -36,7 +37,7 @@ export default function DataView({ datasetId, data, onClose, onDataAdd }: DataVi
const showRawData = useCallback((dataItem: Data) => {
setSelectedData(dataItem);
fetch(`http://0.0.0.0:8000/datasets/${datasetId}/data/${dataItem.id}/raw`)
fetch(`http://127.0.0.1:8000/datasets/${datasetId}/data/${dataItem.id}/raw`)
.then((response) => response.arrayBuffer())
.then(setRawData);
@ -80,7 +81,6 @@ export default function DataView({ datasetId, data, onClose, onDataAdd }: DataVi
<th>Name</th>
<th>File path</th>
<th>MIME type</th>
<th>Keywords</th>
</tr>
</thead>
<tbody>
@ -104,10 +104,10 @@ export default function DataView({ datasetId, data, onClose, onDataAdd }: DataVi
<Text>{dataItem.id}</Text>
</td>
<td>
<Text>{dataItem.name}</Text>
<Text>{dataItem.name}.{dataItem.extension}</Text>
</td>
<td>
<Text>{dataItem.filePath}</Text>
<Text>{dataItem.rawDataLocation}</Text>
</td>
<td>
<Text>{dataItem.mimeType}</Text>

View file

@ -40,7 +40,7 @@ export default function DatasetsView({
.finally(() => enableCognifyRun());
}
const [dataset, setExplorationDataset] = useState<{ id: string } | null>(null);
const [dataset, setExplorationDataset] = useState<{ id: string, name: string } | null>(null);
const {
value: isExplorationWindowShown,
setTrue: showExplorationWindow,
@ -97,7 +97,7 @@ export default function DatasetsView({
</Stack>
<Modal onClose={hideExplorationWindow} isOpen={isExplorationWindowShown} className={styles.explorerModal}>
<Spacer horizontal="2" vertical="3" wrap>
<Text>{dataset?.id}</Text>
<Text>{dataset?.name}</Text>
</Spacer>
<Explorer dataset={dataset!} />
</Modal>

View file

@ -5,7 +5,7 @@ export default function addData(dataset: { id: string }, files: File[]) {
})
formData.append('datasetId', dataset.id);
return fetch('http://0.0.0.0:8000/add', {
return fetch('http://127.0.0.1:8000/add', {
method: 'POST',
body: formData,
}).then((response) => response.json());

View file

@ -14,7 +14,7 @@ function useDatasets() {
const statusTimeout = useRef<any>(null);
const fetchDatasetStatuses = useCallback((datasets: Dataset[]) => {
fetch(`http://0.0.0.0:8000/datasets/status?dataset=${datasets.map(d => d.id).join('&dataset=')}`)
fetch(`http://127.0.0.1:8000/datasets/status?dataset=${datasets.map(d => d.id).join('&dataset=')}`)
.then((response) => response.json())
.then((statuses) => setDatasets(
(datasets) => (
@ -65,9 +65,8 @@ function useDatasets() {
}, []);
const fetchDatasets = useCallback(() => {
fetch('http://0.0.0.0:8000/datasets')
fetch('http://127.0.0.1:8000/datasets')
.then((response) => response.json())
.then((datasets) => datasets.map((dataset: string) => ({ id: dataset, name: dataset })))
.then((datasets) => {
setDatasets(datasets);

View file

@ -7,7 +7,7 @@ import styles from './SearchView.module.css';
interface Message {
id: string;
user: 'user' | 'system';
text: string;
text: any;
}
interface SelectOption {
@ -98,7 +98,9 @@ export default function SearchView() {
[styles.userMessage]: message.user === "user",
})}
>
{message.text}
{message?.text && (
typeof(message.text) == "string" ? message.text : JSON.stringify(message.text)
)}
</Text>
))}
</Stack>

View file

@ -75,7 +75,7 @@ export default function Settings({ onDone = () => {}, submitButtonText = 'Save'
startSaving();
fetch('http://0.0.0.0:8000/settings', {
fetch('http://127.0.0.1:8000/settings', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
@ -138,7 +138,7 @@ export default function Settings({ onDone = () => {}, submitButtonText = 'Save'
useEffect(() => {
const fetchConfig = async () => {
const response = await fetch('http://0.0.0.0:8000/settings');
const response = await fetch('http://127.0.0.1:8000/settings');
const settings = await response.json();
if (!settings.llm.model) {

View file

@ -3,7 +3,6 @@ import os
import aiohttp
import uvicorn
import json
import asyncio
import logging
import sentry_sdk
from typing import Dict, Any, List, Union, Optional, Literal
@ -13,6 +12,8 @@ from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from cognee.infrastructure.databases.relational import create_db_and_tables
# Set up logging
logging.basicConfig(
level=logging.INFO, # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)
@ -27,9 +28,18 @@ if os.getenv("ENV") == "prod":
profiles_sample_rate = 1.0,
)
app = FastAPI(debug = os.getenv("ENV") != "prod")
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# Not needed if you setup a migration system like Alembic
await create_db_and_tables()
yield
app = FastAPI(debug = os.getenv("ENV") != "prod", lifespan = lifespan)
origins = [
"http://127.0.0.1:3000",
"http://frontend:3000",
"http://localhost:3000",
"http://localhost:3001",
@ -43,6 +53,47 @@ app.add_middleware(
allow_headers=["*"],
)
from cognee.api.v1.users.routers import get_auth_router, get_register_router,\
get_reset_password_router, get_verify_router, get_users_router
from cognee.api.v1.permissions.get_permissions_router import get_permissions_router
app.include_router(
get_auth_router(),
prefix = "/auth/jwt",
tags = ["auth"]
)
app.include_router(
get_register_router(),
prefix = "/auth",
tags = ["auth"],
)
app.include_router(
get_reset_password_router(),
prefix = "/auth",
tags = ["auth"],
)
app.include_router(
get_verify_router(),
prefix = "/auth",
tags = ["auth"],
)
app.include_router(
get_users_router(),
prefix = "/users",
tags = ["users"],
)
app.include_router(
get_permissions_router(),
prefix = "/permissions",
tags = ["permissions"],
)
@app.get("/")
async def root():
"""
@ -57,21 +108,27 @@ def health_check():
"""
return {"status": "OK"}
class Payload(BaseModel):
payload: Dict[str, Any]
@app.get("/datasets", response_model=list)
@app.get("/datasets", response_model = list)
async def get_datasets():
from cognee.api.v1.datasets.datasets import datasets
return datasets.list_datasets()
try:
from cognee.api.v1.datasets.datasets import datasets
datasets = await datasets.list_datasets()
@app.delete("/datasets/{dataset_id}", response_model=dict)
return JSONResponse(
status_code = 200,
content = [dataset.to_json() for dataset in datasets],
)
except Exception as error:
raise HTTPException(status_code = 500, detail=f"Error retrieving datasets: {str(error)}") from error
@app.delete("/datasets/{dataset_id}", response_model = dict)
async def delete_dataset(dataset_id: str):
from cognee.api.v1.datasets.datasets import datasets
datasets.delete_dataset(dataset_id)
await datasets.delete_dataset(dataset_id)
return JSONResponse(
status_code=200,
content="OK",
status_code = 200,
content = "OK",
)
@app.get("/datasets/{dataset_id}/graph", response_model=list)
@ -96,37 +153,43 @@ async def get_dataset_graph(dataset_id: str):
@app.get("/datasets/{dataset_id}/data", response_model=list)
async def get_dataset_data(dataset_id: str):
from cognee.api.v1.datasets.datasets import datasets
dataset_data = datasets.list_data(dataset_id)
dataset_data = await datasets.list_data(dataset_id = dataset_id)
if dataset_data is None:
raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.")
raise HTTPException(status_code = 404, detail = f"Dataset ({dataset_id}) not found.")
return [
dict(
id=data["id"],
name=f"{data['name']}.{data['extension']}",
filePath=data["file_path"],
mimeType=data["mime_type"],
)
for data in dataset_data
data.to_json() for data in dataset_data
]
@app.get("/datasets/status", response_model=dict)
async def get_dataset_status(datasets: Annotated[List[str], Query(alias="dataset")] = None):
from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
datasets_statuses = cognee_datasets.get_status(datasets)
return JSONResponse(
status_code = 200,
content = datasets_statuses,
)
try:
datasets_statuses = await cognee_datasets.get_status(datasets)
return JSONResponse(
status_code = 200,
content = datasets_statuses,
)
except Exception as error:
return JSONResponse(
status_code = 409,
content = {"error": str(error)}
)
@app.get("/datasets/{dataset_id}/data/{data_id}/raw", response_class=FileResponse)
async def get_raw_data(dataset_id: str, data_id: str):
from cognee.api.v1.datasets.datasets import datasets
dataset_data = datasets.list_data(dataset_id)
dataset_data = await datasets.list_data(dataset_id)
if dataset_data is None:
raise HTTPException(status_code=404, detail=f"Dataset ({dataset_id}) not found.")
data = [data for data in dataset_data if data["id"] == data_id][0]
return data["file_path"]
raise HTTPException(status_code = 404, detail = f"Dataset ({dataset_id}) not found.")
data = [data for data in dataset_data if str(data.id) == data_id][0]
return data.raw_data_location
class AddPayload(BaseModel):
data: Union[str, UploadFile, List[Union[str, UploadFile]]]
@ -206,24 +269,27 @@ async def search(payload: SearchPayload):
from cognee.api.v1.search import search as cognee_search
try:
search_type = payload.query_params["searchType"]
params = {
"query": payload.query_params["query"],
}
results = await cognee_search(search_type, params)
return JSONResponse(
status_code=200,
content=json.dumps(results)
status_code = 200,
content = results,
)
except Exception as error:
return JSONResponse(
status_code=409,
content={"error": str(error)}
status_code = 409,
content = {"error": str(error)}
)
@app.get("/settings", response_model=dict)
async def get_settings():
from cognee.modules.settings import get_settings
return get_settings()
from cognee.modules.settings import get_settings as get_cognee_settings
return get_cognee_settings()
class LLMConfig(BaseModel):
provider: Union[Literal["openai"], Literal["ollama"], Literal["anthropic"]]
@ -262,10 +328,7 @@ def start_api_server(host: str = "0.0.0.0", port: int = 8000):
try:
logger.info("Starting server at %s:%s", host, port)
from cognee.infrastructure.databases.relational import get_relationaldb_config
relational_config = get_relationaldb_config()
relational_config.create_engine()
import asyncio
from cognee.modules.data.deletion import prune_system, prune_data
asyncio.run(prune_data())
asyncio.run(prune_system(metadata = True))
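
With the fastapi-users routers mounted above, registration and login become standard endpoints under /auth and /auth/jwt. A hedged client sketch; the paths and payload shapes are the fastapi-users defaults, a bearer/JWT transport is assumed for the auth backend, and the user is illustrative:

# Hedged client sketch against the mounted routers; example user only.
import requests

base = "http://127.0.0.1:8000"
requests.post(f"{base}/auth/register",
              json = {"email": "user@example.com", "password": "s3cretpass"})
login = requests.post(f"{base}/auth/jwt/login",
                      data = {"username": "user@example.com", "password": "s3cretpass"})
token = login.json()["access_token"]  # assumes the bearer/JWT transport
me = requests.get(f"{base}/users/me", headers = {"Authorization": f"Bearer {token}"})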

View file

@ -3,14 +3,21 @@ from os import path
import asyncio
import dlt
import duckdb
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.files.storage import LocalStorage
from cognee.modules.ingestion import get_matched_datasets, save_data_to_file
from cognee.shared.utils import send_telemetry
from cognee.base_config import get_base_config
from cognee.infrastructure.databases.relational.config import get_relationaldb_config
from cognee.infrastructure.databases.relational import get_relational_config, get_relational_engine, create_db_and_tables
from cognee.modules.users.methods import create_default_user, get_default_user
from cognee.modules.users.permissions.methods import give_permission_on_document
from cognee.modules.users.models import User
from cognee.modules.data.operations.ensure_dataset_exists import ensure_dataset_exists
async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_name: str = "main_dataset", user: User = None):
await create_db_and_tables()
async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_name: str = "main_dataset"):
if isinstance(data, str):
if "data://" in data:
# data is a data directory path
@ -44,11 +51,11 @@ async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_nam
file_paths.append(save_data_to_file(data_item, dataset_name))
if len(file_paths) > 0:
return await add_files(file_paths, dataset_name)
return await add_files(file_paths, dataset_name, user)
return []
async def add_files(file_paths: List[str], dataset_name: str):
async def add_files(file_paths: List[str], dataset_name: str, user):
base_config = get_base_config()
data_directory_path = base_config.data_root_directory
@ -69,20 +76,35 @@ async def add_files(file_paths: List[str], dataset_name: str):
else:
processed_file_paths.append(file_path)
relational_config = get_relationaldb_config()
db = duckdb.connect(relational_config.db_file_path)
relational_config = get_relational_config()
destination = dlt.destinations.duckdb(
credentials = db,
)
if relational_config.db_provider == "duckdb":
db = duckdb.connect(relational_config.db_file_path)
destination = dlt.destinations.duckdb(
credentials = db,
)
else:
destination = dlt.destinations.postgres(
credentials = {
"host": relational_config.db_host,
"port": relational_config.db_port,
"user": relational_config.db_user,
"password": relational_config.db_password,
"database": relational_config.db_name,
},
)
pipeline = dlt.pipeline(
pipeline_name = "file_load_from_filesystem",
destination = destination,
)
dataset_name = dataset_name.replace(" ", "_").replace(".", "_") if dataset_name is not None else "main_dataset"
dataset = await ensure_dataset_exists(dataset_name)
@dlt.resource(standalone = True, merge_key = "id")
def data_resources(file_paths: str):
async def data_resources(file_paths: str, user: User):
for file_path in file_paths:
with open(file_path.replace("file://", ""), mode = "rb") as file:
classified_data = ingestion.classify(file)
@ -91,6 +113,35 @@ async def add_files(file_paths: List[str], dataset_name: str):
file_metadata = classified_data.get_metadata()
from sqlalchemy import select
from cognee.modules.data.models import Data
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
data = (await session.execute(
select(Data).filter(Data.id == data_id)
)).scalar_one_or_none()
if data is not None:
data.name = file_metadata["name"]
data.raw_data_location = file_metadata["file_path"]
data.extension = file_metadata["extension"]
data.mime_type = file_metadata["mime_type"]
await session.merge(data)
else:
data = Data(
id = data_id,
name = file_metadata["name"],
raw_data_location = file_metadata["file_path"],
extension = file_metadata["extension"],
mime_type = file_metadata["mime_type"],
)
dataset.data.append(data)
await session.merge(dataset)
await session.commit()
yield {
"id": data_id,
"name": file_metadata["name"],
@ -99,10 +150,20 @@ async def add_files(file_paths: List[str], dataset_name: str):
"mime_type": file_metadata["mime_type"],
}
await give_permission_on_document(user, data_id, "read")
await give_permission_on_document(user, data_id, "write")
if user is None:
user = await get_default_user()
if user is None:
user = await create_default_user()
run_info = pipeline.run(
data_resources(processed_file_paths),
data_resources(processed_file_paths, user),
table_name = "file_metadata",
dataset_name = dataset_name.replace(" ", "_").replace(".", "_") if dataset_name is not None else "main_dataset",
dataset_name = dataset_name,
write_disposition = "merge",
)
send_telemetry("cognee.add")

View file

@ -0,0 +1 @@
from .authenticate_user import authenticate_user

View file

@ -0,0 +1,24 @@
from cognee.infrastructure.databases.relational.user_authentication.users import authenticate_user_method
async def authenticate_user(email: str, password: str):
"""
This function is used to authenticate a user.
"""
output = await authenticate_user_method(email=email, password=password)
return output
if __name__ == "__main__":
import asyncio
# Define an example user
example_email = "example@example.com"
example_password = "securepassword123"
example_is_superuser = False
# Create an event loop and run the create_user function
loop = asyncio.get_event_loop()
result = loop.run_until_complete(authenticate_user(example_email, example_password))
# Print the result
print(result)

View file

@ -30,9 +30,10 @@ from cognee.shared.utils import send_telemetry
from cognee.modules.tasks import create_task_status_table, update_task_status
from cognee.shared.SourceCodeGraph import SourceCodeGraph
from cognee.modules.tasks import get_task_status
from cognee.modules.data.operations.get_dataset_data import get_dataset_data
from cognee.infrastructure.data.chunking.config import get_chunk_config
from cognee.modules.cognify.config import get_cognify_config
from cognee.infrastructure.databases.relational.config import get_relationaldb_config
from cognee.infrastructure.databases.relational import get_relational_engine
USER_ID = "default_user"
@ -45,15 +46,14 @@ async def cognify(datasets: Union[str, List[str]] = None):
# Has to be loaded in advance, multithreading doesn't work without it.
nltk.download("stopwords", quiet=True)
stopwords.ensure_loaded()
create_task_status_table()
await create_task_status_table()
graph_client = await get_graph_engine()
relational_config = get_relationaldb_config()
db_engine = relational_config.database_engine
db_engine = get_relational_engine()
if datasets is None or len(datasets) == 0:
datasets = db_engine.get_datasets()
datasets = await db_engine.get_datasets()
awaitables = []
@ -83,7 +83,7 @@ async def cognify(datasets: Union[str, List[str]] = None):
graphs = await asyncio.gather(*awaitables)
return graphs[0]
added_datasets = db_engine.get_datasets()
added_datasets = await db_engine.get_datasets()
# datasets is a dataset name string
dataset_files = []
@ -91,7 +91,7 @@ async def cognify(datasets: Union[str, List[str]] = None):
for added_dataset in added_datasets:
if dataset_name in added_dataset:
dataset_files.append((added_dataset, db_engine.get_files_metadata(added_dataset)))
dataset_files.append((added_dataset, await get_dataset_data(dataset_name = added_dataset)))
chunk_config = get_chunk_config()
chunk_engine = get_chunk_engine()
@ -167,7 +167,7 @@ async def cognify(datasets: Union[str, List[str]] = None):
else:
document_id = await add_document_node(
graph_client,
parent_node_id = file_metadata['id'],
parent_node_id = file_metadata["id"],
document_metadata = file_metadata,
)
@ -226,7 +226,7 @@ async def process_text(chunk_collection: str, chunk_id: str, input_text: str, fi
if cognify_config.connect_documents is True:
db_engine = get_relationaldb_config().database_engine
db_engine = get_relational_engine()
relevant_documents_to_connect = db_engine.fetch_cognify_data(excluded_document_id = document_id)
list_of_nodes = []

View file

@ -4,7 +4,7 @@ from typing import Union
from cognee.infrastructure.databases.graph import get_graph_config
from cognee.modules.cognify.config import get_cognify_config
from cognee.infrastructure.databases.relational.config import get_relationaldb_config
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.processing.document_types.AudioDocument import AudioDocument
from cognee.modules.data.processing.document_types.ImageDocument import ImageDocument
from cognee.shared.data_models import KnowledgeGraph
@ -17,32 +17,72 @@ from cognee.modules.data.processing.filter_affected_chunks import filter_affecte
from cognee.modules.data.processing.remove_obsolete_chunks import remove_obsolete_chunks
from cognee.modules.data.extraction.knowledge_graph.expand_knowledge_graph import expand_knowledge_graph
from cognee.modules.data.extraction.knowledge_graph.establish_graph_topology import establish_graph_topology
from cognee.modules.data.models import Dataset, Data
from cognee.modules.data.operations.get_dataset_data import get_dataset_data
from cognee.modules.data.operations.retrieve_datasets import retrieve_datasets
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.pipelines import run_tasks, run_tasks_parallel
from cognee.modules.tasks import create_task_status_table, update_task_status, get_task_status
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.permissions.methods import check_permissions_on_documents
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status
logger = logging.getLogger("cognify.v2")
update_status_lock = asyncio.Lock()
async def cognify(datasets: Union[str, list[str]] = None, root_node_id: str = None):
relational_config = get_relationaldb_config()
db_engine = relational_config.database_engine
create_task_status_table()
class PermissionDeniedException(Exception):
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
async def cognify(datasets: Union[str, list[str]] = None, user: User = None):
db_engine = get_relational_engine()
if datasets is None or len(datasets) == 0:
return await cognify(db_engine.get_datasets())
return await cognify(await db_engine.get_datasets())
if type(datasets[0]) == str:
datasets = await retrieve_datasets(datasets)
if user is None:
user = await get_default_user()
async def run_cognify_pipeline(dataset: Dataset):
data: list[Data] = await get_dataset_data(dataset_id = dataset.id)
documents = [
PdfDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location) if data_item.extension == "pdf" else
AudioDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location) if data_item.extension == "audio" else
ImageDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location) if data_item.extension == "image" else
TextDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location)
for data_item in data
]
document_ids = [document.id for document in documents]
document_ids_str = list(map(str, document_ids))
await check_permissions_on_documents(
user,
"read",
document_ids,
)
dataset_id = dataset.id
dataset_name = generate_dataset_name(dataset.name)
async def run_cognify_pipeline(dataset_name: str, files: list[dict]):
async with update_status_lock:
task_status = get_task_status([dataset_name])
task_status = await get_pipeline_status([dataset_id])
if dataset_name in task_status and task_status[dataset_name] == "DATASET_PROCESSING_STARTED":
logger.info(f"Dataset {dataset_name} is being processed.")
if dataset_id in task_status and task_status[dataset_id] == "DATASET_PROCESSING_STARTED":
logger.info("Dataset %s is already being processed.", dataset_name)
return
update_task_status(dataset_name, "DATASET_PROCESSING_STARTED")
await log_pipeline_status(dataset_id, "DATASET_PROCESSING_STARTED", {
"dataset_name": dataset_name,
"files": document_ids_str,
})
try:
cognee_config = get_cognify_config()
graph_config = get_graph_config()
@ -51,7 +91,7 @@ async def cognify(datasets: Union[str, list[str]] = None, root_node_id: str = No
if graph_config.infer_graph_topology and graph_config.graph_topology_task:
from cognee.modules.topology.topology import TopologyEngine
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
root_node_id = await topology_engine.add_graph_topology(files = files)
root_node_id = await topology_engine.add_graph_topology(files = data)
elif graph_config.infer_graph_topology and not graph_config.infer_graph_topology:
from cognee.modules.topology.topology import TopologyEngine
topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
@ -82,58 +122,34 @@ async def cognify(datasets: Union[str, list[str]] = None, root_node_id: str = No
Task(remove_obsolete_chunks), # Remove the obsolete document chunks.
]
pipeline = run_tasks(tasks, [
PdfDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "pdf" else
AudioDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "audio" else
ImageDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"]) if file["extension"] == "image" else
TextDocument(title=f"{file['name']}.{file['extension']}", file_path=file["file_path"])
for file in files
])
pipeline = run_tasks(tasks, documents)
async for result in pipeline:
print(result)
update_task_status(dataset_name, "DATASET_PROCESSING_FINISHED")
await log_pipeline_status(dataset_id, "DATASET_PROCESSING_FINISHED", {
"dataset_name": dataset_name,
"files": document_ids_str,
})
except Exception as error:
update_task_status(dataset_name, "DATASET_PROCESSING_ERROR")
await log_pipeline_status(dataset_id, "DATASET_PROCESSING_ERROR", {
"dataset_name": dataset_name,
"files": document_ids_str,
})
raise error
existing_datasets = db_engine.get_datasets()
existing_datasets = [dataset.name for dataset in list(await db_engine.get_datasets())]
awaitables = []
# dataset_files = []
# dataset_name = datasets.replace(".", "_").replace(" ", "_")
# for added_dataset in existing_datasets:
# if dataset_name in added_dataset:
# dataset_files.append((added_dataset, db_engine.get_files_metadata(added_dataset)))
for dataset in datasets:
if dataset in existing_datasets:
# for file_metadata in files:
# if root_node_id is None:
# root_node_id=file_metadata['id']
awaitables.append(run_cognify_pipeline(dataset, db_engine.get_files_metadata(dataset)))
dataset_name = generate_dataset_name(dataset.name)
if dataset_name in existing_datasets:
awaitables.append(run_cognify_pipeline(dataset))
return await asyncio.gather(*awaitables)
#
# if __name__ == "__main__":
# from cognee.api.v1.add import add
# from cognee.api.v1.datasets.datasets import datasets
#
#
# async def aa():
# await add("TEXT ABOUT NLP AND MONKEYS")
#
# print(datasets.discover_datasets())
#
# return
# asyncio.run(cognify())
def generate_dataset_name(dataset_name: str) -> str:
return dataset_name.replace(".", "_").replace(" ", "_")
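
A usage sketch for the reworked cognify() entry point, which now resolves dataset names to Dataset rows and checks read permission on their documents before processing. The import path is assumed from the "cognify.v2" logger name:

# Usage sketch; module path assumed, user = None resolves the default user.
import asyncio
from cognee.api.v1.cognify.cognify_v2 import cognify

asyncio.run(cognify(["main_dataset"]))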

View file

@ -5,16 +5,17 @@ from cognee.modules.cognify.config import get_cognify_config
from cognee.infrastructure.data.chunking.config import get_chunk_config
from cognee.infrastructure.databases.vector import get_vectordb_config
from cognee.infrastructure.databases.graph.config import get_graph_config
from cognee.infrastructure.databases.relational import get_relationaldb_config
from cognee.infrastructure.databases.relational import get_relational_config
from cognee.infrastructure.files.storage import LocalStorage
class config():
@staticmethod
def system_root_directory(system_root_directory: str):
databases_directory_path = os.path.join(system_root_directory, "databases")
relational_config = get_relationaldb_config()
relational_config = get_relational_config()
relational_config.db_path = databases_directory_path
relational_config.create_engine()
LocalStorage.ensure_directory_exists(databases_directory_path)
graph_config = get_graph_config()
graph_config.graph_file_path = os.path.join(databases_directory_path, "cognee.graph")

View file

@ -1,40 +1,37 @@
from duckdb import CatalogException
from cognee.modules.ingestion import discover_directory_datasets
from cognee.modules.tasks import get_task_status
from cognee.infrastructure.databases.relational.config import get_relationaldb_config
from cognee.modules.data.operations.get_dataset_data import get_dataset_data
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.infrastructure.databases.relational import get_relational_engine
class datasets():
@staticmethod
def list_datasets():
relational_config = get_relationaldb_config()
db = relational_config.database_engine
return db.get_datasets()
async def list_datasets():
db = get_relational_engine()
return await db.get_datasets()
@staticmethod
def discover_datasets(directory_path: str):
return list(discover_directory_datasets(directory_path).keys())
@staticmethod
def list_data(dataset_name: str):
relational_config = get_relationaldb_config()
db = relational_config.database_engine
async def list_data(dataset_id: str, dataset_name: str = None):
try:
return db.get_files_metadata(dataset_name)
return await get_dataset_data(dataset_id = dataset_id, dataset_name = dataset_name)
except CatalogException:
return None
@staticmethod
def get_status(dataset_ids: list[str]) -> dict:
async def get_status(dataset_ids: list[str]) -> dict:
try:
return get_task_status(dataset_ids)
return await get_pipeline_status(dataset_ids)
except CatalogException:
return {}
@staticmethod
def delete_dataset(dataset_id: str):
relational_config = get_relationaldb_config()
db = relational_config.database_engine
async def delete_dataset(dataset_id: str):
db = get_relational_engine()
try:
return db.delete_table(dataset_id)
return await db.delete_table(dataset_id)
except CatalogException:
return {}

View file

@ -0,0 +1,43 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from cognee.modules.users import get_user_db
from cognee.modules.users.models import User, Group, Permission
def get_permissions_router() -> APIRouter:
permissions_router = APIRouter()
@permissions_router.post("/groups/{group_id}/permissions")
async def give_permission_to_group(group_id: int, permission: str, db: Session = Depends(get_user_db)):
group = db.query(Group).filter(Group.id == group_id).first()
if not group:
raise HTTPException(status_code = 404, detail = "Group not found")
permission = db.query(Permission).filter(Permission.name == permission).first()
if not permission:
permission = Permission(name = permission)
db.add(permission)
group.permissions.append(permission)
db.commit()
return JSONResponse(status_code = 200, content = {"message": "Permission assigned to group"})
@permissions_router.post("/users/{user_id}/groups")
async def add_user_to_group(user_id: int, group_id: int, db: Session = Depends(get_user_db)):
user = db.query(User).filter(User.id == user_id).first()
group = db.query(Group).filter(Group.id == group_id).first()
if not user or not group:
raise HTTPException(status_code = 404, detail = "User or group not found")
user.groups.append(group)
db.commit()
return JSONResponse(status_code = 200, content = {"message": "User added to group"})
return permissions_router
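
Note: in give_permission_to_group above, the incoming permission string is rebound to the ORM row by the query, so when no row exists, Permission(name = permission) is constructed with None. A hedged rewrite of that handler keeping the two names distinct (imports, models, and the get_user_db dependency as in the file above):

# Hedged rewrite; same models and dependency as above.
@permissions_router.post("/groups/{group_id}/permissions")
async def give_permission_to_group(group_id: int, permission: str, db: Session = Depends(get_user_db)):
    group = db.query(Group).filter(Group.id == group_id).first()
    if not group:
        raise HTTPException(status_code = 404, detail = "Group not found")
    permission_row = db.query(Permission).filter(Permission.name == permission).first()
    if not permission_row:
        permission_row = Permission(name = permission)  # name string still intact here
        db.add(permission_row)
    group.permissions.append(permission_row)
    db.commit()
    return JSONResponse(status_code = 200, content = {"message": "Permission assigned to group"})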

View file

@ -10,6 +10,9 @@ from cognee.modules.search.vector.search_traverse import search_traverse
from cognee.modules.search.graph.search_summary import search_summary
from cognee.modules.search.graph.search_similarity import search_similarity
from cognee.shared.utils import send_telemetry
from cognee.modules.users.permissions.methods import get_document_ids_for_user
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
class SearchType(Enum):
ADJACENT = "ADJACENT"
@ -25,23 +28,40 @@ class SearchType(Enum):
def from_str(name: str):
try:
return SearchType[name.upper()]
except KeyError:
raise ValueError(f"{name} is not a valid SearchType")
except KeyError as error:
raise ValueError(f"{name} is not a valid SearchType") from error
class SearchParameters(BaseModel):
search_type: SearchType
params: Dict[str, Any]
@field_validator("search_type", mode="before")
def convert_string_to_enum(cls, value):
def convert_string_to_enum(cls, value): # pylint: disable=no-self-argument
if isinstance(value, str):
return SearchType.from_str(value)
return value
async def search(search_type: str, params: Dict[str, Any]) -> List:
async def search(search_type: str, params: Dict[str, Any], user: User = None) -> List:
if user is None:
user = await get_default_user()
own_document_ids = await get_document_ids_for_user(user.id)
search_params = SearchParameters(search_type = search_type, params = params)
return await specific_search([search_params])
search_results = await specific_search([search_params])
from uuid import UUID
filtered_search_results = []
for search_result in search_results:
document_id = search_result["document_id"] if "document_id" in search_result else None
document_id = UUID(document_id) if type(document_id) == str else document_id
if document_id is None or document_id in own_document_ids:
filtered_search_results.append(search_result)
return filtered_search_results
async def specific_search(query_params: List[SearchParameters]) -> List:
@ -53,7 +73,6 @@ async def specific_search(query_params: List[SearchParameters]) -> List:
SearchType.SIMILARITY: search_similarity,
}
results = []
search_tasks = []
for search_param in query_params:
@ -66,38 +85,6 @@ async def specific_search(query_params: List[SearchParameters]) -> List:
# Use asyncio.gather to run all scheduled tasks concurrently
search_results = await asyncio.gather(*search_tasks)
# Update the results set with the results from all tasks
results.extend(search_results)
send_telemetry("cognee.search")
return results[0] if len(results) == 1 else results
if __name__ == "__main__":
async def main():
# Assuming 'graph' is your graph object, obtained from somewhere
search_type = 'CATEGORIES'
params = {'query': 'Ministarstvo', 'other_param': {"node_id": "LLM_LAYER_SUMMARY:DOCUMENT:881ecb36-2819-54c3-8147-ed80293084d6"}}
results = await search(search_type, params)
print(results)
# Run the async main function
asyncio.run(main())
# if __name__ == "__main__":
# import asyncio
# query_params = {
# SearchType.SIMILARITY: {'query': 'your search query here'}
# }
# async def main():
# graph_client = get_graph_engine()
# await graph_client.load_graph_from_file()
# graph = graph_client.graph
# results = await search(graph, query_params)
# print(results)
# asyncio.run(main())
return search_results[0] if len(search_results) == 1 else search_results
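
A usage sketch for the permission-filtered search; the search type and query are illustrative, and omitting the user argument applies the default user's document permissions:

# Usage sketch; search type and query are illustrative.
import asyncio
from cognee.api.v1.search import search

results = asyncio.run(search("SIMILARITY", {"query": "example query"}))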

View file

@ -0,0 +1 @@
from .create_user import create_user

View file

@ -0,0 +1,12 @@
from cognee.modules.users.methods import create_user as create_user_method
async def create_user(email: str, password: str, is_superuser: bool = False):
user = await create_user_method(
email = email,
password = password,
is_superuser = is_superuser,
is_verified = True,
)
return user

View file

@ -0,0 +1,5 @@
from .get_auth_router import get_auth_router
from .get_register_router import get_register_router
from .get_reset_password_router import get_reset_password_router
from .get_users_router import get_users_router
from .get_verify_router import get_verify_router

View file

@ -0,0 +1,6 @@
from cognee.modules.users.get_fastapi_users import get_fastapi_users
from cognee.modules.users.authentication.get_auth_backend import get_auth_backend
def get_auth_router():
auth_backend = get_auth_backend()
return get_fastapi_users().get_auth_router(auth_backend)

View file

@ -0,0 +1,5 @@
from cognee.modules.users.get_fastapi_users import get_fastapi_users
from cognee.modules.users.models.User import UserRead, UserCreate
def get_register_router():
return get_fastapi_users().get_register_router(UserRead, UserCreate)

View file

@ -0,0 +1,4 @@
from cognee.modules.users.get_fastapi_users import get_fastapi_users
def get_reset_password_router():
return get_fastapi_users().get_reset_password_router()

View file

@ -0,0 +1,5 @@
from cognee.modules.users.get_fastapi_users import get_fastapi_users
from cognee.modules.users.models.User import UserRead, UserUpdate
def get_users_router():
return get_fastapi_users().get_users_router(UserRead, UserUpdate)

View file

@ -0,0 +1,5 @@
from cognee.modules.users.get_fastapi_users import get_fastapi_users
from cognee.modules.users.models.User import UserRead
def get_verify_router():
return get_fastapi_users().get_verify_router(UserRead)

View file

@ -1,3 +0,0 @@
from .models.Data import Data
from .models.Dataset import Dataset
from .models.DatasetData import DatasetData

View file

@ -1,23 +0,0 @@
from typing import List
from datetime import datetime, timezone
from sqlalchemy.orm import relationship, MappedColumn, Mapped
from sqlalchemy import Column, String, DateTime, UUID, Text, JSON
from cognee.infrastructure.databases.relational import ModelBase
from .DatasetData import DatasetData
class Data(ModelBase):
__tablename__ = "data"
id = Column(UUID, primary_key = True)
name = Column(String, nullable = True)
description = Column(Text, nullable = True)
raw_data_location = Column(String)
meta_data: Mapped[dict] = MappedColumn(type_ = JSON) # metadata is reserved word
created_at = Column(DateTime, default = datetime.now(timezone.utc))
updated_at = Column(DateTime, onupdate = datetime.now(timezone.utc))
datasets: Mapped[List["Dataset"]] = relationship(
secondary = DatasetData.__tablename__,
back_populates = "data"
)

View file

@ -1,21 +0,0 @@
from typing import List
from datetime import datetime, timezone
from sqlalchemy.orm import relationship, Mapped
from sqlalchemy import Column, Text, DateTime, UUID
from cognee.infrastructure.databases.relational import ModelBase
from .DatasetData import DatasetData
class Dataset(ModelBase):
__tablename__ = "dataset"
id = Column(UUID, primary_key = True)
name = Column(Text)
description = Column(Text, nullable = True)
created_at = Column(DateTime, default = datetime.now(timezone.utc))
updated_at = Column(DateTime, onupdate = datetime.now(timezone.utc))
data: Mapped[List["Data"]] = relationship(
secondary = DatasetData.__tablename__,
back_populates = "datasets"
)

View file

@ -1,14 +0,0 @@
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, UUID, ForeignKey
from cognee.infrastructure.databases.relational import ModelBase
class DatasetData(ModelBase):
__tablename__ = "dataset_data"
id = Column(UUID, primary_key = True, default = uuid4())
created_at = Column(DateTime, default = datetime.now(timezone.utc))
dataset_id = Column("dataset", UUID, ForeignKey("dataset.id"), primary_key = True)
data_id = Column("data", UUID, ForeignKey("data.id"), primary_key = True)

View file

@ -21,7 +21,8 @@ class Neo4jAdapter(GraphDBInterface):
):
self.driver = driver or AsyncGraphDatabase.driver(
graph_database_url,
auth = (graph_database_username, graph_database_password)
auth = (graph_database_username, graph_database_password),
max_connection_lifetime = 120
)
async def close(self) -> None:

View file

@ -0,0 +1,29 @@
import inspect
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
class FakeAsyncSession:
def __init__(self, session: Session):
self.session = session
def run_sync(self, *args, **kwargs):
return self.execute(*args, **kwargs)
def __getattr__(self, name: str) -> Any:
"""
If the method being called is async in AsyncSession, create a fake async version
for Session so callers can `await` as usual. Think `commit`, `refresh`,
`delete`, etc.
"""
async_session_attr = getattr(AsyncSession, name, None)
session_attr = getattr(self.session, name)
if not inspect.iscoroutinefunction(async_session_attr):
return session_attr
async def async_wrapper(*args, **kwargs):
return session_attr(*args, **kwargs)
return async_wrapper
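
A usage sketch for FakeAsyncSession: call sites written against AsyncSession keep their awaits while a synchronous Session does the work, which is what the duckdb branch of SQLAlchemyAdapter relies on. The database file is illustrative:

# Usage sketch; sync methods become awaitable via __getattr__ above.
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

engine = create_engine("duckdb:///example.db")  # illustrative file
session = FakeAsyncSession(sessionmaker(bind = engine)())

async def demo():
    await session.execute(text("SELECT 1"))
    await session.commit()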

View file

@ -1,3 +1,4 @@
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import DeclarativeBase
ModelBase = declarative_base()
class Base(DeclarativeBase):
pass

View file

@ -1,5 +1,7 @@
from .ModelBase import ModelBase
from .ModelBase import Base
from .DatabaseEngine import DatabaseEngine
from .sqlite.SqliteEngine import SqliteEngine
from .duckdb.DuckDBAdapter import DuckDBAdapter
from .config import get_relationaldb_config
from .config import get_relational_config
from .create_db_and_tables import create_db_and_tables
from .get_relational_engine import get_relational_engine

View file

@ -2,23 +2,20 @@ import os
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
from cognee.root_dir import get_absolute_path
from .create_relational_engine import create_relational_engine
class RelationalConfig(BaseSettings):
db_path: str = os.path.join(get_absolute_path(".cognee_system"), "databases")
db_name: str = "cognee.db"
db_host: str = "localhost"
db_name: str = "cognee_db"
db_host: str = "127.0.0.1"
db_port: str = "5432"
db_user: str = "cognee"
db_password: str = "cognee"
database_engine: object = create_relational_engine(db_path, db_name)
db_provider: str = "postgresql+asyncpg"
# db_provider: str = "duckdb"
db_file_path: str = os.path.join(db_path, db_name)
model_config = SettingsConfigDict(env_file = ".env", extra = "allow")
def create_engine(self):
self.db_file_path = os.path.join(self.db_path, self.db_name)
self.database_engine = create_relational_engine(self.db_path, self.db_name)
model_config = SettingsConfigDict(env_file = ".env", extra = "allow")
def to_dict(self) -> dict:
return {
@ -28,9 +25,9 @@ class RelationalConfig(BaseSettings):
"db_port": self.db_port,
"db_user": self.db_user,
"db_password": self.db_password,
"db_engine": self.database_engine,
"db_provider": self.db_provider,
}
@lru_cache
def get_relationaldb_config():
def get_relational_config():
return RelationalConfig()
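
RelationalConfig is a pydantic BaseSettings, so each field maps to an environment variable (or .env entry) of the same name. Because get_relational_config() is wrapped in lru_cache, overrides must be in place before the first call; a sketch:

# Sketch of overriding the defaults via the environment; set these before
# the first (cached) call to get_relational_config().
import os

os.environ["DB_PROVIDER"] = "duckdb"
os.environ["DB_NAME"] = "cognee.db"

config = get_relational_config()
assert config.db_provider == "duckdb"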

View file

@ -0,0 +1,9 @@
from .ModelBase import Base
from .get_relational_engine import get_relational_engine
async def create_db_and_tables():
relational_engine = get_relational_engine()
async with relational_engine.engine.begin() as connection:
if len(Base.metadata.tables.keys()) > 0:
await connection.run_sync(Base.metadata.create_all)

View file

@ -1,10 +1,21 @@
from cognee.infrastructure.files.storage import LocalStorage
from cognee.infrastructure.databases.relational import DuckDBAdapter
from .sqlalchemy.SqlAlchemyAdapter import SQLAlchemyAdapter
def create_relational_engine(db_path: str, db_name: str):
LocalStorage.ensure_directory_exists(db_path)
return DuckDBAdapter(
def create_relational_engine(
db_path: str,
db_name: str,
db_provider: str,
db_host: str,
db_port: str,
db_user: str,
db_password: str,
):
return SQLAlchemyAdapter(
db_name = db_name,
db_path = db_path,
db_type = db_provider,
db_host = db_host,
db_port = db_port,
db_user = db_user,
db_password = db_password
)

View file

@ -79,7 +79,6 @@ class DuckDBAdapter():
connection.execute("""
CREATE TABLE IF NOT EXISTS cognify (
document_id STRING,
layer_id STRING,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT NULL,
processed BOOLEAN DEFAULT FALSE,
@ -89,8 +88,8 @@ class DuckDBAdapter():
# Prepare the insert statement
insert_query = """
INSERT INTO cognify (document_id, layer_id)
VALUES (?, ?);
INSERT INTO cognify (document_id)
VALUES (?);
"""
# Insert each record into the "cognify" table
@ -98,7 +97,6 @@ class DuckDBAdapter():
with self.get_connection() as connection:
connection.execute(insert_query, [
record.get("document_id"),
record.get("layer_id")
])
def fetch_cognify_data(self, excluded_document_id: str):
@ -106,7 +104,6 @@ class DuckDBAdapter():
create_table_sql = """
CREATE TABLE IF NOT EXISTS cognify (
document_id STRING,
layer_id STRING,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT NULL,
processed BOOLEAN DEFAULT FALSE,
@ -118,7 +115,7 @@ class DuckDBAdapter():
connection.execute(create_table_sql)
# SQL command to select data from the "cognify" table
select_data_sql = f"SELECT document_id, layer_id, created_at, updated_at, processed FROM cognify WHERE document_id != '{excluded_document_id}' AND processed = FALSE;"
select_data_sql = f"SELECT document_id, created_at, updated_at, processed FROM cognify WHERE document_id != '{excluded_document_id}' AND processed = FALSE;"
with self.get_connection() as connection:
# Execute the query and fetch the results
@ -144,7 +141,6 @@ class DuckDBAdapter():
create_table_sql = """
CREATE TABLE IF NOT EXISTS cognify (
document_id STRING,
layer_id STRING,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT NULL,
processed BOOLEAN DEFAULT FALSE,
@ -166,7 +162,8 @@ class DuckDBAdapter():
def delete_database(self):
from cognee.infrastructure.files.storage import LocalStorage
LocalStorage.remove(self.db_location)
if LocalStorage.file_exists(self.db_location):
LocalStorage.remove(self.db_location)
if LocalStorage.file_exists(self.db_location + ".wal"):
LocalStorage.remove(self.db_location + ".wal")

View file

@ -0,0 +1,8 @@
from .config import get_relational_config
from .create_relational_engine import create_relational_engine
def get_relational_engine():
relational_config = get_relational_config()
return create_relational_engine(**relational_config.to_dict())

View file

@ -0,0 +1,109 @@
import os
import asyncio
from typing import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy import create_engine, text, select
from sqlalchemy.orm import sessionmaker, joinedload
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from cognee.infrastructure.files.storage import LocalStorage
from cognee.infrastructure.databases.relational.FakeAsyncSession import FakeAsyncSession
from ..ModelBase import Base
def make_async_sessionmaker(sessionmaker):
@asynccontextmanager
async def async_session_maker():
await asyncio.sleep(0.1)
yield FakeAsyncSession(sessionmaker())
return async_session_maker
class SQLAlchemyAdapter():
def __init__(self, db_type: str, db_path: str, db_name: str, db_user: str, db_password: str, db_host: str, db_port: str):
self.db_location = os.path.abspath(os.path.join(db_path, db_name))
self.db_name = db_name
if db_type == "duckdb":
LocalStorage.ensure_directory_exists(db_path)
self.engine = create_engine(f"duckdb:///{self.db_location}")
self.sessionmaker = make_async_sessionmaker(sessionmaker(bind = self.engine))
else:
self.engine = create_async_engine(f"postgresql+asyncpg://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}")
self.sessionmaker = async_sessionmaker(bind = self.engine, expire_on_commit = False)
@asynccontextmanager
async def get_async_session(self) -> AsyncGenerator[AsyncSession, None]:
async_session_maker = self.sessionmaker
async with async_session_maker() as session:
yield session
def get_session(self):
session_maker = self.sessionmaker
with session_maker() as session:
yield session
async def get_datasets(self):
from cognee.modules.data.models import Dataset
async with self.get_async_session() as session:
datasets = (await session.execute(select(Dataset).options(joinedload(Dataset.data)))).unique().scalars().all()
return datasets
async def create_table(self, schema_name: str, table_name: str, table_config: list[dict]):
fields_query_parts = [f"{item['name']} {item['type']}" for item in table_config]
async with self.engine.begin() as connection:
await connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name};"))
await connection.execute(text(f"CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ({', '.join(fields_query_parts)});"))
async def delete_table(self, table_name: str):
async with self.engine.connect() as connection:
await connection.execute(text(f"DROP TABLE IF EXISTS {table_name};"))
async def insert_data(self, schema_name: str, table_name: str, data: list[dict]):
columns = ", ".join(data[0].keys())
values = ", ".join([f"({', '.join([f':{key}' for key in row.keys()])})" for row in data])
insert_query = text(f"INSERT INTO {schema_name}.{table_name} ({columns}) VALUES {values};")
async with self.engine.connect() as connection:
await connection.execute(insert_query, data)
async def get_data(self, table_name: str, filters: dict = None):
async with self.engine.connect() as connection:
query = f"SELECT * FROM {table_name}"
if filters:
filter_conditions = " AND ".join([
f"{key} IN ({', '.join([f':{key}{i}' for i in range(len(value))])})" if isinstance(value, list)
else f"{key} = :{key}" for key, value in filters.items()
])
query += f" WHERE {filter_conditions};"
query = text(query)
results = await connection.execute(query, filters)
else:
query += ";"
query = text(query)
results = await connection.execute(query)
return {result["data_id"]: result["status"] for result in results}
async def execute_query(self, query):
async with self.engine.connect() as connection:
result = await connection.execute(text(query))
return [dict(row._mapping) for row in result]
async def drop_tables(self, connection):
try:
await connection.execute(text("DROP TABLE IF EXISTS group_permission CASCADE"))
await connection.execute(text("DROP TABLE IF EXISTS permissions CASCADE"))
# Add more DROP TABLE statements for other tables as needed
print("Database tables dropped successfully.")
except Exception as e:
print(f"Error dropping database tables: {e}")
async def delete_database(self):
try:
async with self.engine.begin() as connection:
await connection.run_sync(Base.metadata.drop_all)
print("Database deleted successfully.")
except Exception as e:
print(f"Error deleting database: {e}")

View file

@ -6,7 +6,7 @@ from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, Asyn
from sqlalchemy.future import select
from cognee.infrastructure.files.storage.LocalStorage import LocalStorage
from ..DatabaseEngine import DatabaseEngine
from ..ModelBase import ModelBase
from ..ModelBase import Base
from ..utils import with_rollback
class SqliteEngine(DatabaseEngine):
@ -60,7 +60,7 @@ class SqliteEngine(DatabaseEngine):
async def create_tables(self):
async with self.engine.begin() as connection:
return await connection.run_sync(ModelBase.metadata.create_all)
return await connection.run_sync(Base.metadata.create_all)
async def create(self, data):
async with with_rollback(self.session_maker()) as session:

View file

@ -1,6 +1,6 @@
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine
from cognee.infrastructure.databases.relational import get_relationaldb_config
from cognee.infrastructure.databases.relational import get_relational_engine
async def prune_system(graph = True, vector = True, metadata = False):
if graph:
@ -12,6 +12,5 @@ async def prune_system(graph = True, vector = True, metadata = False):
await vector_engine.prune()
if metadata:
db_config = get_relationaldb_config()
db_engine = db_config.database_engine
db_engine.delete_database()
db_engine = get_relational_engine()
await db_engine.delete_database()

View file

@ -26,6 +26,7 @@ async def summarize_text_chunks(data_chunks: list[DocumentChunk], summarization_
id = str(chunk.chunk_id),
payload = dict(
chunk_id = str(chunk.chunk_id),
document_id = str(chunk.document_id),
text = chunk_summaries[chunk_index].summary,
),
embed_field = "text",

View file

@ -1,7 +1,7 @@
import json
import asyncio
from uuid import uuid5, NAMESPACE_OID
from datetime import datetime
from datetime import datetime, timezone
from typing import Type
from pydantic import BaseModel
from cognee.infrastructure.databases.graph import get_graph_engine
@ -91,8 +91,8 @@ async def expand_knowledge_graph(data_chunks: list[DocumentChunk], graph_model:
name = node_name,
type = node_name,
description = node.description,
created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
created_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
updated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
)
graph_nodes.append((
@ -145,8 +145,8 @@ async def expand_knowledge_graph(data_chunks: list[DocumentChunk], graph_model:
name = type_node_name,
type = type_node_id,
description = type_node_name,
created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
created_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
updated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
)
graph_nodes.append((type_node_id, dict(
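The substantive change in this hunk is timestamp hygiene: naive datetime.now() depends on the host's local timezone, while datetime.now(timezone.utc) is unambiguous. For illustration:

from datetime import datetime, timezone

datetime.now().strftime("%Y-%m-%d %H:%M:%S")              # naive local wall-clock time
datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")  # same format, but UTC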

View file

@ -0,0 +1,37 @@
from uuid import uuid4
from typing import List
from datetime import datetime, timezone
from sqlalchemy.orm import relationship, Mapped
from sqlalchemy import Column, String, DateTime, UUID
from cognee.infrastructure.databases.relational import Base
from .DatasetData import DatasetData
class Data(Base):
__tablename__ = "data"
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
name = Column(String)
extension = Column(String)
mime_type = Column(String)
raw_data_location = Column(String)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
datasets: Mapped[List["Dataset"]] = relationship(
secondary = DatasetData.__tablename__,
back_populates = "data"
)
def to_json(self) -> dict:
return {
"id": str(self.id),
"name": self.name,
"extension": self.extension,
"mimeType": self.mime_type,
"rawDataLocation": self.raw_data_location,
"createdAt": self.created_at.isoformat(),
"updatedAt": self.updated_at.isoformat() if self.updated_at else None,
# "datasets": [dataset.to_json() for dataset in self.datasets]
}

View file

@ -0,0 +1,31 @@
from uuid import uuid4
from typing import List
from datetime import datetime, timezone
from sqlalchemy.orm import relationship, Mapped
from sqlalchemy import Column, Text, DateTime, UUID
from cognee.infrastructure.databases.relational import Base
from .DatasetData import DatasetData
class Dataset(Base):
__tablename__ = "datasets"
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
name = Column(Text)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
data: Mapped[List["Data"]] = relationship(
secondary = DatasetData.__tablename__,
back_populates = "datasets"
)
def to_json(self) -> dict:
return {
"id": str(self.id),
"name": self.name,
"createdAt": self.created_at.isoformat(),
"updatedAt": self.updated_at.isoformat() if self.updated_at else None,
"data": [data.to_json() for data in self.data]
}

View file

@ -0,0 +1,11 @@
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, UUID, ForeignKey
from cognee.infrastructure.databases.relational import Base
class DatasetData(Base):
__tablename__ = "dataset_data"
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
dataset_id = Column(UUID(as_uuid = True), ForeignKey("datasets.id"), primary_key = True)
data_id = Column(UUID(as_uuid = True), ForeignKey("data.id"), primary_key = True)
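Data, Dataset, and the dataset_data association table form a standard SQLAlchemy many-to-many. A hypothetical write path (the file metadata values are illustrative):

from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data, Dataset

async def attach_file_to_dataset():
    db_engine = get_relational_engine()
    async with db_engine.get_async_session() as session:
        dataset = Dataset(name = "explanations", data = [])
        dataset.data.append(Data(
            name = "example",
            extension = "txt",
            mime_type = "text/plain",
            raw_data_location = "/tmp/example.txt",
        ))
        session.add(dataset)  # persists both rows plus the dataset_data link row
        await session.commit()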

View file

@ -0,0 +1,2 @@
from .Data import Data
from .Dataset import Dataset

View file

@ -0,0 +1,26 @@
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from cognee.modules.data.models import Dataset
from cognee.infrastructure.databases.relational import get_relational_engine
async def ensure_dataset_exists(dataset_name: str) -> Dataset:
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
dataset = (await session.scalars(
select(Dataset)\
.options(joinedload(Dataset.data))\
.filter(Dataset.name == dataset_name)
)).first()
if dataset is None:
dataset = Dataset(
name = dataset_name,
data = []
)
session.add(dataset)
await session.commit()
return dataset

View file

@ -0,0 +1,18 @@
from uuid import UUID
from sqlalchemy import select
from cognee.modules.data.models import Data, Dataset
from cognee.infrastructure.databases.relational import get_relational_engine
async def get_dataset_data(dataset_id: UUID = None, dataset_name: str = None):
if dataset_id is None and dataset_name is None:
raise ValueError("get_dataset_data: Either dataset_id or dataset_name must be provided.")
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
result = await session.execute(
select(Data).join(Data.datasets).filter((Dataset.id == dataset_id) | (Dataset.name == dataset_name))
)
data = result.scalars().all()
return data
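The two helpers compose naturally; a short sketch (the dataset name is illustrative, and import paths are omitted because the diff view hides the new file locations):

async def load_dataset_items():
    dataset = await ensure_dataset_exists("explanations")  # creates the row on first call
    return await get_dataset_data(dataset_id = dataset.id)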

View file

@ -0,0 +1,13 @@
from sqlalchemy import select
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models import Dataset
async def retrieve_datasets(dataset_names: list[str]) -> list[Dataset]:
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
datasets = (await session.scalars(
select(Dataset).filter(Dataset.name.in_(dataset_names))
)).all()
return datasets

View file

@ -1,5 +1,5 @@
from uuid import uuid5, NAMESPACE_OID
from typing import Optional, Generator
from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.modules.data.chunking import chunk_by_paragraph
@ -7,10 +7,10 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu
from cognee.modules.data.processing.document_types.Document import Document
class AudioReader:
id: str
id: UUID
file_path: str
def __init__(self, id: str, file_path: str):
def __init__(self, id: UUID, file_path: str):
self.id = id
self.file_path = file_path
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
@ -87,13 +87,11 @@ class AudioDocument(Document):
title: str
file_path: str
def __init__(self, title: str, file_path: str):
self.id = uuid5(NAMESPACE_OID, title)
def __init__(self, id: UUID, title: str, file_path: str):
self.id = id or uuid5(NAMESPACE_OID, title)
self.title = title
self.file_path = file_path
reader = AudioReader(self.id, self.file_path)
def get_reader(self) -> AudioReader:
reader = AudioReader(self.id, self.file_path)
return reader

View file

@ -1,5 +1,5 @@
from uuid import uuid5, NAMESPACE_OID
from typing import Optional, Generator
from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.modules.data.chunking import chunk_by_paragraph
@ -7,10 +7,10 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu
from cognee.modules.data.processing.document_types.Document import Document
class ImageReader:
id: str
id: UUID
file_path: str
def __init__(self, id: str, file_path: str):
def __init__(self, id: UUID, file_path: str):
self.id = id
self.file_path = file_path
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
@ -24,10 +24,8 @@ class ImageReader:
# Transcribe the image file
result = self.llm_client.transcribe_image(self.file_path)
print("Transcription result: ", result.choices[0].message.content)
text = result.choices[0].message.content
# Simulate reading text in chunks as done in TextReader
def read_text_chunks(text, chunk_size):
for i in range(0, len(text), chunk_size):
@ -89,13 +87,11 @@ class ImageDocument(Document):
title: str
file_path: str
def __init__(self, title: str, file_path: str):
self.id = uuid5(NAMESPACE_OID, title)
def __init__(self, id: UUID, title: str, file_path: str):
self.id = id or uuid5(NAMESPACE_OID, title)
self.title = title
self.file_path = file_path
reader = ImageReader(self.id, self.file_path)
def get_reader(self) -> ImageReader:
reader = ImageReader(self.id, self.file_path)
return reader

View file

@ -1,6 +1,6 @@
# import pdfplumber
import logging
from uuid import uuid5, NAMESPACE_OID
from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from pypdf import PdfReader as pypdf_PdfReader
from cognee.modules.data.chunking import chunk_by_paragraph
@ -8,10 +8,10 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu
from .Document import Document
class PdfReader():
id: str
id: UUID
file_path: str
def __init__(self, id: str, file_path: str):
def __init__(self, id: UUID, file_path: str):
self.id = id
self.file_path = file_path
@ -86,8 +86,8 @@ class PdfDocument(Document):
num_pages: int
file_path: str
def __init__(self, title: str, file_path: str):
self.id = uuid5(NAMESPACE_OID, title)
def __init__(self, id: UUID, title: str, file_path: str):
self.id = id or uuid5(NAMESPACE_OID, title)
self.title = title
self.file_path = file_path
logging.debug("file_path: %s", self.file_path)

View file

@ -1,14 +1,14 @@
from uuid import uuid5, NAMESPACE_OID
from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from cognee.modules.data.chunking import chunk_by_paragraph
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
from .Document import Document
class TextReader():
id: str
id: UUID
file_path: str
def __init__(self, id: str, file_path: str):
def __init__(self, id: UUID, file_path: str):
self.id = id
self.file_path = file_path
@ -90,8 +90,8 @@ class TextDocument(Document):
num_pages: int
file_path: str
def __init__(self, title: str, file_path: str):
self.id = uuid5(NAMESPACE_OID, title)
def __init__(self, id: UUID, title: str, file_path: str):
self.id = id or uuid5(NAMESPACE_OID, title)
self.title = title
self.file_path = file_path

View file

@ -1,7 +1,7 @@
from cognee.infrastructure.databases.graph import get_graph_engine
from .document_types import Document
async def process_documents(documents: list[Document], parent_node_id: str = None):
async def process_documents(documents: list[Document], parent_node_id: str = None, user: str = None, user_permissions: str = None):
graph_engine = await get_graph_engine()
nodes = []
@ -16,6 +16,9 @@ async def process_documents(documents: list[Document], parent_node_id: str = Non
document_node = document_nodes[document_index] if document_index in document_nodes else None
if document_node is None:
document_dict = document.to_dict()
document_dict["user"] = user
document_dict["user_permissions"] = user_permissions
nodes.append((str(document.id), document_dict))
if parent_node_id:

View file

@ -17,7 +17,7 @@ class BinaryData(IngestionData):
def get_identifier(self):
metadata = self.get_metadata()
return self.name + "_" + metadata["mime_type"]
return self.name + "." + metadata["extension"]
def get_metadata(self):
self.ensure_metadata()

View file

@ -1,9 +1,7 @@
from uuid import uuid5, UUID
from uuid import uuid5, NAMESPACE_OID
from .data_types import IngestionData
null_uuid: UUID = UUID("00000000-0000-0000-0000-000000000000")
def identify(data: IngestionData) -> str:
data_id: str = data.get_identifier()
return str(uuid5(null_uuid, data_id)).replace("-", "")
return uuid5(NAMESPACE_OID, data_id)
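Since uuid5 is a pure function of the namespace and the name, identify is now deterministic and returns a proper UUID instead of the old dash-stripped string; re-ingesting identical content maps to the same ID:

from uuid import uuid5, NAMESPACE_OID

assert uuid5(NAMESPACE_OID, "report.pdf") == uuid5(NAMESPACE_OID, "report.pdf")
assert uuid5(NAMESPACE_OID, "report.pdf") != uuid5(NAMESPACE_OID, "notes.txt")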

View file

@ -1,22 +1,22 @@
from typing import List
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, UUID, DateTime, String, Text
from sqlalchemy.orm import relationship, Mapped
from cognee.infrastructure.databases.relational import ModelBase
from cognee.infrastructure.databases.relational import Base
from .PipelineTask import PipelineTask
class Pipeline(ModelBase):
class Pipeline(Base):
__tablename__ = "pipelines"
id = Column(UUID, primary_key = True, default = uuid4())
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
name = Column(String)
description = Column(Text, nullable = True)
created_at = Column(DateTime, default = datetime.now(timezone.utc))
updated_at = Column(DateTime, onupdate = datetime.now(timezone.utc))
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
tasks: Mapped[List["Task"]] = relationship(
tasks: Mapped[list["Task"]] = relationship(
secondary = PipelineTask.__tablename__,
back_populates = "pipeline",
)

View file

@ -0,0 +1,16 @@
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, UUID, DateTime, String, JSON
from cognee.infrastructure.databases.relational import Base
class PipelineRun(Base):
__tablename__ = "pipeline_runs"
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
status = Column(String)
run_id = Column(UUID(as_uuid = True), index = True)
run_info = Column(JSON)

View file

@ -1,14 +1,11 @@
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, UUID, ForeignKey
from cognee.infrastructure.databases.relational import ModelBase
from cognee.infrastructure.databases.relational import Base
class PipelineTask(ModelBase):
class PipelineTask(Base):
__tablename__ = "pipeline_task"
id = Column(UUID, primary_key = True, default = uuid4())
created_at = Column(DateTime, default = datetime.now(timezone.utc))
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
pipeline_id = Column("pipeline", UUID, ForeignKey("pipeline.id"), primary_key = True)
task_id = Column("task", UUID, ForeignKey("task.id"), primary_key = True)
pipeline_id = Column("pipeline", UUID(as_uuid = True), ForeignKey("pipeline.id"), primary_key = True)
task_id = Column("task", UUID(as_uuid = True), ForeignKey("task.id"), primary_key = True)

View file

@ -1,24 +1,24 @@
from uuid import uuid4
from typing import List
from datetime import datetime, timezone
from sqlalchemy.orm import relationship, Mapped
from sqlalchemy import Column, String, DateTime, UUID, Text
from cognee.infrastructure.databases.relational import ModelBase
from cognee.infrastructure.databases.relational import Base
from .PipelineTask import PipelineTask
class Task(ModelBase):
class Task(Base):
__tablename__ = "tasks"
id = Column(UUID, primary_key = True, default = uuid4())
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
name = Column(String)
description = Column(Text, nullable = True)
executable = Column(Text)
created_at = Column(DateTime, default = datetime.now(timezone.utc))
updated_at = Column(DateTime, onupdate = datetime.now(timezone.utc))
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
datasets: Mapped[List["Pipeline"]] = relationship(
datasets: Mapped[list["Pipeline"]] = relationship(
secondary = PipelineTask.__tablename__,
back_populates = "task"
)

View file

@ -0,0 +1,17 @@
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, UUID, DateTime, String, JSON
from cognee.infrastructure.databases.relational import Base
class TaskRun(Base):
__tablename__ = "task_runs"
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
task_name = Column(String)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
status = Column(String)
run_info = Column(JSON)

View file

@ -0,0 +1 @@
from .PipelineRun import PipelineRun

View file

@ -1,4 +0,0 @@
from ..models import Pipeline, Task
def add_task(pipeline: Pipeline, task: Task):
pipeline.tasks.append(task)

View file

@ -0,0 +1,41 @@
from uuid import UUID
from sqlalchemy import func, select
from sqlalchemy.orm import aliased
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models import PipelineRun
async def get_pipeline_status(pipeline_ids: list[UUID]):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
query = select(
PipelineRun,
func.row_number().over(
partition_by = PipelineRun.run_id,
order_by = PipelineRun.created_at.desc(),
).label("rn")
).filter(PipelineRun.run_id.in_(pipeline_ids)).subquery()
aliased_pipeline_run = aliased(PipelineRun, query)
latest_runs = (
select(aliased_pipeline_run).filter(query.c.rn == 1)
)
runs = (await session.execute(latest_runs)).scalars().all()
pipeline_statuses = {
str(run.run_id): run.status for run in runs
}
return pipeline_statuses
# f"""SELECT data_id, status
# FROM (
# SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn
# FROM cognee.cognee.task_runs
# WHERE data_id IN ({formatted_data_ids})
# ) t
# WHERE rn = 1;"""
# return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }

View file

@ -0,0 +1,15 @@
from uuid import UUID
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models.PipelineRun import PipelineRun
async def log_pipeline_status(run_id: UUID, status: str, run_info: dict):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
session.add(PipelineRun(
run_id = run_id,
status = status,
run_info = run_info,
))
await session.commit()
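A hypothetical end-to-end use of the two run-tracking helpers; the status strings are illustrative, and get_pipeline_status keeps only the newest PipelineRun row per run_id via its ROW_NUMBER() window filter:

from uuid import uuid4

async def track_run():
    run_id = uuid4()
    await log_pipeline_status(run_id, "STARTED", {"dataset": "explanations"})
    await log_pipeline_status(run_id, "FINISHED", {"dataset": "explanations"})
    statuses = await get_pipeline_status([run_id])
    # only the newest row per run_id survives the window filter:
    # { str(run_id): "FINISHED" }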

View file

@ -12,6 +12,16 @@ async def search_similarity(query: str) -> list[str, str]:
similar_results = await vector_engine.search("chunks", query, limit = 5)
results = [result.payload for result in similar_results]
results = [
parse_payload(result.payload) for result in similar_results
]
return results
def parse_payload(payload: dict) -> dict:
return {
"text": payload["text"],
"chunk_id": payload["chunk_id"],
"document_id": payload["document_id"],
}

View file

@ -1,3 +0,0 @@
from .get_task_status import get_task_status
from .update_task_status import update_task_status
from .create_task_status_table import create_task_status_table

View file

@ -1,11 +0,0 @@
from cognee.infrastructure.databases.relational.config import get_relationaldb_config
def create_task_status_table():
config = get_relationaldb_config()
db_engine = config.database_engine
db_engine.create_table("cognee.cognee", "cognee_task_status", [
dict(name = "data_id", type = "STRING"),
dict(name = "status", type = "STRING"),
dict(name = "created_at", type = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
])

View file

@ -1,19 +0,0 @@
from cognee.infrastructure.databases.relational.config import get_relationaldb_config
def get_task_status(data_ids: [str]):
relational_config = get_relationaldb_config()
db_engine = relational_config.database_engine
formatted_data_ids = ", ".join([f"'{data_id}'" for data_id in data_ids])
datasets_statuses = db_engine.execute_query(
f"""SELECT data_id, status
FROM (
SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn
FROM cognee.cognee.cognee_task_status
WHERE data_id IN ({formatted_data_ids})
) t
WHERE rn = 1;"""
)
return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }

View file

@ -1,6 +0,0 @@
from cognee.infrastructure.databases.relational.config import get_relationaldb_config
def update_task_status(data_id: str, status: str):
config = get_relationaldb_config()
db_engine = config.database_engine
db_engine.insert_data("cognee.cognee", "cognee_task_status", [dict(data_id = data_id, status = status)])

View file

@ -3,10 +3,8 @@
import csv
import json
import logging
import os
from typing import Any, Dict, List, Optional, Union, Type
import asyncio
import aiofiles
import pandas as pd
from pydantic import BaseModel
@ -14,16 +12,10 @@ from pydantic import BaseModel
from cognee.infrastructure.data.chunking.config import get_chunk_config
from cognee.infrastructure.data.chunking.get_chunking_engine import get_chunk_engine
from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine
from cognee.infrastructure.databases.relational import get_relationaldb_config
from cognee.infrastructure.files.utils.extract_text_from_file import extract_text_from_file
from cognee.infrastructure.files.utils.guess_file_type import guess_file_type, FileTypeException
from cognee.modules.cognify.config import get_cognify_config
from cognee.base_config import get_base_config
from cognee.modules.topology.topology_data_models import NodeModel
cognify_config = get_cognify_config()
base_config = get_base_config()
logger = logging.getLogger("topology")
class TopologyEngine:
@ -136,53 +128,3 @@ class TopologyEngine:
return
except Exception as e:
raise RuntimeError(f"Failed to add graph topology from {file_path}: {e}") from e
async def main():
# text = """Conservative PP in the lead in Spain, according to estimate
# An estimate has been published for Spain:
#
# Opposition leader Alberto Núñez Feijóo's conservative People's party (PP): 32.4%
#
# Spanish prime minister Pedro Sánchez's Socialist party (PSOE): 30.2%
#
# The far-right Vox party: 10.4%
#
# In Spain, the right has sought to turn the European election into a referendum on Sánchez.
#
# Ahead of the vote, public attention has focused on a saga embroiling the prime minister's wife, Begoña Gómez, who is being investigated over allegations of corruption and influence-peddling, which Sánchez has dismissed as politically motivated and totally baseless."""
# text_two = """The far-right Vox party: 10.4%"""
from cognee.api.v1.add import add
dataset_name = "explanations"
print(os.getcwd())
data_dir = os.path.abspath("../../.data")
print(os.getcwd())
await add(f"data://{data_dir}", dataset_name="explanations")
relational_config = get_relationaldb_config()
db_engine = relational_config.database_engine
datasets = db_engine.get_datasets()
dataset_files =[]
for added_dataset in datasets:
if dataset_name in added_dataset:
dataset_files.append((added_dataset, db_engine.get_files_metadata(added_dataset)))
print(dataset_files)
topology_engine = TopologyEngine(infer=True)
file_path = "example_data.json" # or 'example_data.csv'
#
# # Adding graph topology
graph = await topology_engine.add_graph_topology(file_path, dataset_files=dataset_files)
print(graph)
# Run the main function
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1 @@
from .get_user_db import get_user_db

View file

@ -0,0 +1,24 @@
import os
from functools import lru_cache
from fastapi_users import models
from fastapi_users.authentication import (
AuthenticationBackend,
BearerTransport,
JWTStrategy,
)
@lru_cache
def get_auth_backend():
bearer_transport = BearerTransport(tokenUrl = "auth/jwt/login")
def get_jwt_strategy() -> JWTStrategy[models.UP, models.ID]:
secret = os.getenv("FASTAPI_USERS_JWT_SECRET", "super_secret")
return JWTStrategy(secret, lifetime_seconds = 3600)
auth_backend = AuthenticationBackend(
name = "jwt",
transport = bearer_transport,
get_strategy = get_jwt_strategy,
)
return auth_backend

View file

@ -0,0 +1,21 @@
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_users.exceptions import UserNotExists
from cognee.infrastructure.databases.relational import get_relational_engine
from ...get_user_manager import get_user_manager_context
from ...get_user_db import get_user_db_context
async def authenticate_user(email: str, password: str):
try:
relational_engine = get_relational_engine()
async with relational_engine.get_async_session() as session:
async with get_user_db_context(session) as user_db:
async with get_user_manager_context(user_db) as user_manager:
credentials = OAuth2PasswordRequestForm(username = email, password = password)
user = await user_manager.authenticate(credentials)
if user is None or not user.is_active:
return None
return user
except UserNotExists as error:
print(f"User {email} doesn't exist")
raise error

View file

@ -0,0 +1,15 @@
import uuid
from functools import lru_cache
from fastapi_users import FastAPIUsers
from .authentication.get_auth_backend import get_auth_backend
from .get_user_manager import get_user_manager
from .models.User import User
@lru_cache
def get_fastapi_users():
auth_backend = get_auth_backend()
fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [auth_backend])
return fastapi_users
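A minimal wiring sketch; the router prefix is an assumption, though "/auth/jwt" is consistent with the tokenUrl configured in get_auth_backend above:

from fastapi import FastAPI

# imports of get_fastapi_users / get_auth_backend omitted; their
# module paths are hidden in this diff view
app = FastAPI()
fastapi_users = get_fastapi_users()

app.include_router(
    fastapi_users.get_auth_router(get_auth_backend()),
    prefix = "/auth/jwt",
    tags = ["auth"],
)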

View file

@ -0,0 +1,10 @@
from contextlib import asynccontextmanager
# from fastapi import Depends
from fastapi_users.db import SQLAlchemyUserDatabase
# from cognee.infrastructure.databases.relational import get_relational_engine
from .models.User import User
async def get_user_db(session):
yield SQLAlchemyUserDatabase(session, User)
get_user_db_context = asynccontextmanager(get_user_db)

View file

@ -0,0 +1,32 @@
import os
import uuid
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import Depends, Request
from fastapi_users import BaseUserManager, UUIDIDMixin
from fastapi_users.db import SQLAlchemyUserDatabase
from .get_user_db import get_user_db
from .models import User
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = os.getenv("FASTAPI_USERS_RESET_PASSWORD_TOKEN_SECRET", "super_secret")
verification_token_secret = os.getenv("FASTAPI_USERS_VERIFICATION_TOKEN_SECRET", "super_secret")
async def on_after_register(self, user: User, request: Optional[Request] = None):
print(f"User {user.id} has registered.")
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
):
print(f"Verification requested for user {user.id}. Verification token: {token}")
async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
yield UserManager(user_db)
get_user_manager_context = asynccontextmanager(get_user_manager)

View file

@ -1,3 +0,0 @@
from .is_existing_memory import is_existing_memory
from .register_memory_for_user import register_memory_for_user
from .create_information_points import create_information_points

View file

@ -1,23 +0,0 @@
import uuid
from typing import List
from qdrant_client.models import PointStruct
from cognee.infrastructure.databases.vector.get_vector_database import get_vector_database
from cognee.infrastructure.llm.openai.openai_tools import async_get_embedding_with_backoff
async def create_information_points(memory_name: str, payload: List[str]):
vector_db = get_vector_database()
data_points = list()
for point in map(create_data_point, payload):
data_points.append(await point)
return await vector_db.create_data_points(memory_name, data_points)
async def create_data_point(data: str) -> PointStruct:
return PointStruct(
id = str(uuid.uuid4()),
vector = await async_get_embedding_with_backoff(data),
payload = {
"raw": data,
}
)

View file

@ -1,6 +0,0 @@
from cognee.infrastructure.databases.relational.get_database import get_database
async def is_existing_memory(memory_name: str):
memory = await (get_database().get_memory_by_name(memory_name))
return memory is not None

View file

@ -1,4 +0,0 @@
from cognee.infrastructure.databases.relational.get_database import get_database
def register_memory_for_user(user_id: str, memory_name: str):
return get_database().add_memory(user_id, memory_name)

View file

@ -0,0 +1,3 @@
from .create_user import create_user
from .get_default_user import get_default_user
from .create_default_user import create_default_user

View file

@ -0,0 +1,20 @@
import hashlib
from .create_user import create_user
async def create_default_user():
default_user_email = "default_user@example.com"
default_user_password = "default_password"
user = await create_user(
email = default_user_email,
password = await hash_password(default_user_password),
is_superuser = True,
is_active = True,
is_verified = True,
auto_login = True,
)
return user
async def hash_password(password: str) -> str:
return hashlib.sha256(password.encode()).hexdigest()
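Note that fastapi-users' UserManager.create hashes whatever password string it receives, so the SHA-256 pre-hash above makes the default user's effective password the hex digest rather than "default_password". Under that assumption, logging in would look like:

import hashlib

async def login_as_default_user():
    digest = hashlib.sha256("default_password".encode()).hexdigest()
    return await authenticate_user("default_user@example.com", digest)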

View file

@ -0,0 +1,38 @@
from fastapi_users.exceptions import UserAlreadyExists
from cognee.infrastructure.databases.relational import get_relational_engine
from ..get_user_manager import get_user_manager_context
from ..get_user_db import get_user_db_context
from ..models.User import UserCreate
async def create_user(
email: str,
password: str,
is_superuser: bool = False,
is_active: bool = True,
is_verified: bool = False,
auto_login: bool = False,
):
try:
relational_engine = get_relational_engine()
async with relational_engine.get_async_session() as session:
async with get_user_db_context(session) as user_db:
async with get_user_manager_context(user_db) as user_manager:
user = await user_manager.create(
UserCreate(
email = email,
password = password,
is_superuser = is_superuser,
is_active = is_active,
is_verified = is_verified,
)
)
if auto_login:
await session.refresh(user)
return user
print(f"User created: {user.email}")
except UserAlreadyExists as error:
print(f"User {email} already exists")
raise error

View file

@ -0,0 +1,16 @@
from sqlalchemy.orm import joinedload
from sqlalchemy.future import select
from cognee.modules.users.models import User
from cognee.infrastructure.databases.relational import get_relational_engine
async def get_default_user():
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
query = select(User).options(joinedload(User.groups))\
.where(User.email == "default_user@example.com")
result = await session.execute(query)
user = result.scalars().first()
return user

View file

@ -0,0 +1,25 @@
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy.orm import relationship, Mapped
from sqlalchemy import Column, ForeignKey, DateTime, UUID
from cognee.infrastructure.databases.relational import Base
from .ACLResources import ACLResources
class ACL(Base):
__tablename__ = "acls"
id = Column(UUID(as_uuid = True), primary_key = True, default = uuid4)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
principal_id = Column(UUID(as_uuid = True), ForeignKey("principals.id"))
permission_id = Column(UUID(as_uuid = True), ForeignKey("permissions.id"))
principal = relationship("Principal")
permission = relationship("Permission")
resources: Mapped[list["Resource"]] = relationship(
"Resource",
secondary = ACLResources.__tablename__,
back_populates = "acls",
)

View file

@ -0,0 +1,11 @@
from datetime import datetime, timezone
from sqlalchemy import Column, ForeignKey, UUID, DateTime
from cognee.infrastructure.databases.relational import Base
class ACLResources(Base):
__tablename__ = "acl_resources"
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
acl_id = Column(UUID(as_uuid = True), ForeignKey("acls.id"), primary_key = True)
resource_id = Column(UUID(as_uuid = True), ForeignKey("resources.id"), primary_key = True)

Some files were not shown because too many files have changed in this diff.