Merge remote-tracking branch 'origin/dev' into dataset-permissions-auth-refactor-simplify-data-creation

This commit is contained in:
Boris Arzentar 2025-06-10 12:37:43 +02:00
commit 3d7457c594
No known key found for this signature in database
GPG key ID: D5CC274C784807B7
41 changed files with 1748 additions and 1169 deletions


@@ -1,6 +0,0 @@
# put your configuration values here
[runtime]
log_level = "WARNING" # the system log level of dlt
# use the dlthub_telemetry setting to enable/disable anonymous usage data reporting, see https://dlthub.com/docs/telemetry
dlthub_telemetry = false

.gitattributes vendored Normal file (+2)

@@ -0,0 +1,2 @@
# ignore jupyter notebooks in the language bar on github
notebooks/** linguist-vendored


@@ -11,30 +11,9 @@ jobs:
# notebook-location: notebooks/cognee_demo.ipynb
# secrets: inherit
run-llama-index-integration:
name: LlamaIndex Integration Notebook
uses: ./.github/workflows/reusable_notebook.yml
with:
notebook-location: notebooks/llama_index_cognee_integration.ipynb
secrets: inherit
run-cognee-llama-index:
name: Cognee LlamaIndex Notebook
uses: ./.github/workflows/reusable_notebook.yml
with:
notebook-location: notebooks/cognee_llama_index.ipynb
secrets: inherit
run-cognee-multimedia:
name: Cognee Multimedia Notebook
uses: ./.github/workflows/reusable_notebook.yml
with:
notebook-location: notebooks/cognee_multimedia_demo.ipynb
secrets: inherit
# run-graphrag-vs-rag:
# name: Graphrag vs Rag notebook
# uses: ./.github/workflows/reusable_notebook.yml
# with:
# notebook-location: notebooks/graphrag_vs_rag.ipynb
# secrets: inherit


@@ -13,6 +13,8 @@
<a href="https://cognee.ai">Learn more</a>
·
<a href="https://discord.gg/NQPKmU5CCg">Join Discord</a>
·
<a href="https://www.reddit.com/r/AIMemory/">Join r/AIMemory</a>
</p>
@@ -28,7 +30,7 @@
Build dynamic Agent memory using scalable, modular ECL (Extract, Cognify, Load) pipelines.
Build dynamic memory for Agents and replace RAG using scalable, modular ECL (Extract, Cognify, Load) pipelines.
More on [use-cases](https://docs.cognee.ai/use-cases) and [evals](https://github.com/topoteretes/cognee/tree/main/evals)
@@ -46,22 +48,20 @@ More on [use-cases](https://docs.cognee.ai/use-cases) and [evals](https://github
<div style="text-align: center">
<img src="https://raw.githubusercontent.com/topoteretes/cognee/refs/heads/main/assets/cognee_benefits.png" alt="Why cognee?" width="50%" />
</div>
</div>
## Features
- Interconnect and retrieve your past conversations, documents, images and audio transcriptions
- Reduce hallucinations, developer effort, and cost.
- Replaces RAG systems, reducing developer effort and cost
- Load data to graph and vector databases using only Pydantic
- Manipulate your data while ingesting from 30+ data sources
## Get Started
Get started quickly with a Google Colab <a href="https://colab.research.google.com/drive/1jHbWVypDgCLwjE71GSXhRL3YxYhCZzG1?usp=sharing">notebook</a> or <a href="https://github.com/topoteretes/cognee-starter">starter repo</a>
Get started quickly with a Google Colab <a href="https://colab.research.google.com/drive/1jHbWVypDgCLwjE71GSXhRL3YxYhCZzG1?usp=sharing">notebook</a>, <a href="https://deepnote.com/workspace/cognee-382213d0-0444-4c89-8265-13770e333c02/project/cognee-demo-78ffacb9-5832-4611-bb1a-560386068b30/notebook/Notebook-1-75b24cda566d4c24ab348f7150792601?utm_source=share-modal&utm_medium=product-shared-content&utm_campaign=notebook&utm_content=78ffacb9-5832-4611-bb1a-560386068b30">Deepnote notebook</a> or <a href="https://github.com/topoteretes/cognee-starter">starter repo</a>
## Contributing
@@ -141,7 +141,15 @@ Example output:
```
### cognee UI
## Our paper is out! <a href="https://arxiv.org/abs/2505.24478" target="_blank" rel="noopener noreferrer">Read here</a>
<div style="text-align: center">
<img src="assets/cognee-paper.png" alt="cognee paper" width="100%" />
</div>
</div>
## Cognee UI
You can also cognify your files and query them using the cognee UI.


@@ -12,6 +12,8 @@ from sqlalchemy.util import await_only
from cognee.modules.users.methods import create_default_user, delete_user
from fastapi_users.exceptions import UserAlreadyExists
# revision identifiers, used by Alembic.
revision: str = "482cd6517ce4"
@@ -21,7 +23,10 @@ depends_on: Union[str, Sequence[str], None] = "8057ae7329c2"
def upgrade() -> None:
await_only(create_default_user())
try:
await_only(create_default_user())
except UserAlreadyExists:
pass # It's fine if the default user already exists
def downgrade() -> None:
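The migration change above wraps `create_default_user` so a pre-existing default user is treated as success rather than a failure. That idempotent-creation pattern can be sketched in isolation; `UserAlreadyExists` and `create_default_user` below are stand-ins for the real cognee and fastapi-users symbols, not the actual implementations:

```python
class UserAlreadyExists(Exception):
    """Stand-in for fastapi_users.exceptions.UserAlreadyExists."""


_users = set()


def create_default_user(email="default_user@example.com"):
    # Raise if the user was already created by an earlier migration run.
    if email in _users:
        raise UserAlreadyExists(email)
    _users.add(email)
    return email


def ensure_default_user():
    # Idempotent wrapper: a second invocation is a no-op, not an error,
    # which is what makes re-running the migration safe.
    try:
        return create_default_user()
    except UserAlreadyExists:
        return None
```

Running it twice returns the user the first time and `None` the second, instead of aborting the migration.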

BIN assets/cognee-paper.png Normal file (new binary file, 88 KiB; not shown)


@@ -9,7 +9,7 @@
"version": "0.1.0",
"dependencies": {
"classnames": "^2.5.1",
"next": "14.2.3",
"next": "^14.2.26",
"ohmy-ui": "^0.0.6",
"react": "^18",
"react-dom": "^18",
@@ -199,7 +199,9 @@
}
},
"node_modules/@next/env": {
"version": "14.2.3",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/@next/env/-/env-14.2.26.tgz",
"integrity": "sha512-vO//GJ/YBco+H7xdQhzJxF7ub3SUwft76jwaeOyVVQFHCi5DCnkP16WHB+JBylo4vOKPoZBlR94Z8xBxNBdNJA==",
"license": "MIT"
},
"node_modules/@next/eslint-plugin-next": {
@@ -254,7 +256,9 @@
}
},
"node_modules/@next/swc-darwin-arm64": {
"version": "14.2.3",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/@next/swc-darwin-arm64/-/swc-darwin-arm64-14.2.26.tgz",
"integrity": "sha512-zDJY8gsKEseGAxG+C2hTMT0w9Nk9N1Sk1qV7vXYz9MEiyRoF5ogQX2+vplyUMIfygnjn9/A04I6yrUTRTuRiyQ==",
"cpu": [
"arm64"
],
@@ -268,12 +272,13 @@
}
},
"node_modules/@next/swc-darwin-x64": {
"version": "14.2.3",
"resolved": "https://registry.npmjs.org/@next/swc-darwin-x64/-/swc-darwin-x64-14.2.3.tgz",
"integrity": "sha512-6adp7waE6P1TYFSXpY366xwsOnEXM+y1kgRpjSRVI2CBDOcbRjsJ67Z6EgKIqWIue52d2q/Mx8g9MszARj8IEA==",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/@next/swc-darwin-x64/-/swc-darwin-x64-14.2.26.tgz",
"integrity": "sha512-U0adH5ryLfmTDkahLwG9sUQG2L0a9rYux8crQeC92rPhi3jGQEY47nByQHrVrt3prZigadwj/2HZ1LUUimuSbg==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"darwin"
@@ -283,12 +288,13 @@
}
},
"node_modules/@next/swc-linux-arm64-gnu": {
"version": "14.2.3",
"resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-gnu/-/swc-linux-arm64-gnu-14.2.3.tgz",
"integrity": "sha512-cuzCE/1G0ZSnTAHJPUT1rPgQx1w5tzSX7POXSLaS7w2nIUJUD+e25QoXD/hMfxbsT9rslEXugWypJMILBj/QsA==",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-gnu/-/swc-linux-arm64-gnu-14.2.26.tgz",
"integrity": "sha512-SINMl1I7UhfHGM7SoRiw0AbwnLEMUnJ/3XXVmhyptzriHbWvPPbbm0OEVG24uUKhuS1t0nvN/DBvm5kz6ZIqpg==",
"cpu": [
"arm64"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
@@ -298,12 +304,13 @@
}
},
"node_modules/@next/swc-linux-arm64-musl": {
"version": "14.2.3",
"resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-musl/-/swc-linux-arm64-musl-14.2.3.tgz",
"integrity": "sha512-0D4/oMM2Y9Ta3nGuCcQN8jjJjmDPYpHX9OJzqk42NZGJocU2MqhBq5tWkJrUQOQY9N+In9xOdymzapM09GeiZw==",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-musl/-/swc-linux-arm64-musl-14.2.26.tgz",
"integrity": "sha512-s6JaezoyJK2DxrwHWxLWtJKlqKqTdi/zaYigDXUJ/gmx/72CrzdVZfMvUc6VqnZ7YEvRijvYo+0o4Z9DencduA==",
"cpu": [
"arm64"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
@@ -313,12 +320,13 @@
}
},
"node_modules/@next/swc-linux-x64-gnu": {
"version": "14.2.3",
"resolved": "https://registry.npmjs.org/@next/swc-linux-x64-gnu/-/swc-linux-x64-gnu-14.2.3.tgz",
"integrity": "sha512-ENPiNnBNDInBLyUU5ii8PMQh+4XLr4pG51tOp6aJ9xqFQ2iRI6IH0Ds2yJkAzNV1CfyagcyzPfROMViS2wOZ9w==",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/@next/swc-linux-x64-gnu/-/swc-linux-x64-gnu-14.2.26.tgz",
"integrity": "sha512-FEXeUQi8/pLr/XI0hKbe0tgbLmHFRhgXOUiPScz2hk0hSmbGiU8aUqVslj/6C6KA38RzXnWoJXo4FMo6aBxjzg==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
@@ -328,12 +336,13 @@
}
},
"node_modules/@next/swc-linux-x64-musl": {
"version": "14.2.3",
"resolved": "https://registry.npmjs.org/@next/swc-linux-x64-musl/-/swc-linux-x64-musl-14.2.3.tgz",
"integrity": "sha512-BTAbq0LnCbF5MtoM7I/9UeUu/8ZBY0i8SFjUMCbPDOLv+un67e2JgyN4pmgfXBwy/I+RHu8q+k+MCkDN6P9ViQ==",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/@next/swc-linux-x64-musl/-/swc-linux-x64-musl-14.2.26.tgz",
"integrity": "sha512-BUsomaO4d2DuXhXhgQCVt2jjX4B4/Thts8nDoIruEJkhE5ifeQFtvW5c9JkdOtYvE5p2G0hcwQ0UbRaQmQwaVg==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
@@ -343,12 +352,13 @@
}
},
"node_modules/@next/swc-win32-arm64-msvc": {
"version": "14.2.3",
"resolved": "https://registry.npmjs.org/@next/swc-win32-arm64-msvc/-/swc-win32-arm64-msvc-14.2.3.tgz",
"integrity": "sha512-AEHIw/dhAMLNFJFJIJIyOFDzrzI5bAjI9J26gbO5xhAKHYTZ9Or04BesFPXiAYXDNdrwTP2dQceYA4dL1geu8A==",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/@next/swc-win32-arm64-msvc/-/swc-win32-arm64-msvc-14.2.26.tgz",
"integrity": "sha512-5auwsMVzT7wbB2CZXQxDctpWbdEnEW/e66DyXO1DcgHxIyhP06awu+rHKshZE+lPLIGiwtjo7bsyeuubewwxMw==",
"cpu": [
"arm64"
],
"license": "MIT",
"optional": true,
"os": [
"win32"
@@ -358,12 +368,13 @@
}
},
"node_modules/@next/swc-win32-ia32-msvc": {
"version": "14.2.3",
"resolved": "https://registry.npmjs.org/@next/swc-win32-ia32-msvc/-/swc-win32-ia32-msvc-14.2.3.tgz",
"integrity": "sha512-vga40n1q6aYb0CLrM+eEmisfKCR45ixQYXuBXxOOmmoV8sYST9k7E3US32FsY+CkkF7NtzdcebiFT4CHuMSyZw==",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/@next/swc-win32-ia32-msvc/-/swc-win32-ia32-msvc-14.2.26.tgz",
"integrity": "sha512-GQWg/Vbz9zUGi9X80lOeGsz1rMH/MtFO/XqigDznhhhTfDlDoynCM6982mPCbSlxJ/aveZcKtTlwfAjwhyxDpg==",
"cpu": [
"ia32"
],
"license": "MIT",
"optional": true,
"os": [
"win32"
@@ -373,12 +384,13 @@
}
},
"node_modules/@next/swc-win32-x64-msvc": {
"version": "14.2.3",
"resolved": "https://registry.npmjs.org/@next/swc-win32-x64-msvc/-/swc-win32-x64-msvc-14.2.3.tgz",
"integrity": "sha512-Q1/zm43RWynxrO7lW4ehciQVj+5ePBhOK+/K2P7pLFX3JaJ/IZVC69SHidrmZSOkqz7ECIOhhy7XhAFG4JYyHA==",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/@next/swc-win32-x64-msvc/-/swc-win32-x64-msvc-14.2.26.tgz",
"integrity": "sha512-2rdB3T1/Gp7bv1eQTTm9d1Y1sv9UuJ2LAwOE0Pe2prHKe32UNscj7YS13fRB37d0GAiGNR+Y7ZcW8YjDI8Ns0w==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"win32"
@@ -2728,10 +2740,12 @@
"license": "MIT"
},
"node_modules/next": {
"version": "14.2.3",
"version": "14.2.26",
"resolved": "https://registry.npmjs.org/next/-/next-14.2.26.tgz",
"integrity": "sha512-b81XSLihMwCfwiUVRRja3LphLo4uBBMZEzBBWMaISbKTwOmq3wPknIETy/8000tr7Gq4WmbuFYPS7jOYIf+ZJw==",
"license": "MIT",
"dependencies": {
"@next/env": "14.2.3",
"@next/env": "14.2.26",
"@swc/helpers": "0.5.5",
"busboy": "1.6.0",
"caniuse-lite": "^1.0.30001579",
@@ -2746,15 +2760,15 @@
"node": ">=18.17.0"
},
"optionalDependencies": {
"@next/swc-darwin-arm64": "14.2.3",
"@next/swc-darwin-x64": "14.2.3",
"@next/swc-linux-arm64-gnu": "14.2.3",
"@next/swc-linux-arm64-musl": "14.2.3",
"@next/swc-linux-x64-gnu": "14.2.3",
"@next/swc-linux-x64-musl": "14.2.3",
"@next/swc-win32-arm64-msvc": "14.2.3",
"@next/swc-win32-ia32-msvc": "14.2.3",
"@next/swc-win32-x64-msvc": "14.2.3"
"@next/swc-darwin-arm64": "14.2.26",
"@next/swc-darwin-x64": "14.2.26",
"@next/swc-linux-arm64-gnu": "14.2.26",
"@next/swc-linux-arm64-musl": "14.2.26",
"@next/swc-linux-x64-gnu": "14.2.26",
"@next/swc-linux-x64-musl": "14.2.26",
"@next/swc-win32-arm64-msvc": "14.2.26",
"@next/swc-win32-ia32-msvc": "14.2.26",
"@next/swc-win32-x64-msvc": "14.2.26"
},
"peerDependencies": {
"@opentelemetry/api": "^1.1.0",


@@ -10,11 +10,11 @@
},
"dependencies": {
"classnames": "^2.5.1",
"next": "14.2.3",
"ohmy-ui": "^0.0.6",
"react": "^18",
"react-dom": "^18",
"uuid": "^9.0.1"
"uuid": "^9.0.1",
"next": "^14.2.26"
},
"devDependencies": {
"@types/node": "^20",


@@ -53,7 +53,7 @@ export default function DatasetsView({
setExplorationDataset(dataset);
showExplorationWindow();
}
return (
<>
<Stack orientation="vertical" gap="4">
@@ -95,10 +95,10 @@
</DatasetItem>
))}
</Stack>
<Modal onClose={hideExplorationWindow} isOpen={isExplorationWindowShown} className={styles.explorerModal}>
<Modal closeOnBackdropClick={false} onClose={hideExplorationWindow} isOpen={isExplorationWindowShown} className={styles.explorerModal}>
<Spacer horizontal="2" vertical="3" wrap>
<Text>{dataset?.name}</Text>
</Spacer>
</Spacer>
<Explorer dataset={dataset!} />
</Modal>
</>


@@ -3,7 +3,7 @@
import { v4 } from 'uuid';
import classNames from 'classnames';
import { useCallback, useEffect, useState } from 'react';
import { CTAButton, Stack, Text, DropdownSelect, TextArea, useBoolean } from 'ohmy-ui';
import { CTAButton, Stack, Text, DropdownSelect, TextArea, useBoolean, Input } from 'ohmy-ui';
import { fetch } from '@/utils';
import styles from './SearchView.module.css';
import getHistory from '@/modules/chat/getHistory';
@@ -33,8 +33,15 @@ export default function SearchView() {
}, {
value: 'RAG_COMPLETION',
label: 'Completion using RAG',
}, {
value: 'GRAPH_COMPLETION_COT',
label: 'Cognee\'s Chain of Thought search',
}, {
value: 'GRAPH_COMPLETION_CONTEXT_EXTENSION',
label: 'Cognee\'s Multi-Hop search',
}];
const [searchType, setSearchType] = useState(searchOptions[0]);
const [rangeValue, setRangeValue] = useState(10);
const scrollToBottom = useCallback(() => {
setTimeout(() => {
@@ -90,6 +97,7 @@ export default function SearchView() {
body: JSON.stringify({
query: inputValue.trim(),
searchType: searchTypeValue,
topK: rangeValue,
}),
})
.then((response) => response.json())
@@ -108,7 +116,7 @@
.catch(() => {
setInputValue(inputValue);
});
}, [inputValue, scrollToBottom, searchType.value]);
}, [inputValue, rangeValue, scrollToBottom, searchType.value]);
const {
value: isInputExpanded,
@@ -122,6 +130,10 @@
}
};
const handleRangeValueChange = (event: React.ChangeEvent<HTMLInputElement>) => {
setRangeValue(parseInt(event.target.value, 10));
};
return (
<Stack className={styles.searchViewContainer}>
<DropdownSelect<SelectOption>
@@ -146,9 +158,15 @@
</Stack>
</div>
<form onSubmit={handleSearchSubmit}>
<Stack orientation="horizontal" align="end/" gap="2">
<Stack orientation="vertical" gap="2">
<TextArea onKeyUp={handleSubmitOnEnter} style={{ transition: 'height 0.3s ease', height: isInputExpanded ? '128px' : '38px' }} onFocus={expandInput} onBlur={contractInput} value={inputValue} onChange={handleInputChange} name="searchInput" placeholder="Search" />
<CTAButton hugContent type="submit">Search</CTAButton>
<Stack orientation="horizontal" gap="between">
<Stack orientation="horizontal" gap="2" align="center">
<label><Text>Search range: </Text></label>
<Input style={{ maxWidth: "90px" }} value={rangeValue} onChange={handleRangeValueChange} type="number" />
</Stack>
<CTAButton hugContent type="submit">Search</CTAButton>
</Stack>
</Stack>
</form>
</Stack>


@@ -1,6 +1,10 @@
{
"compilerOptions": {
"lib": ["dom", "dom.iterable", "esnext"],
"lib": [
"dom",
"dom.iterable",
"esnext"
],
"allowJs": true,
"skipLibCheck": true,
"strict": true,
@@ -18,9 +22,19 @@
}
],
"paths": {
"@/*": ["./src/*"]
}
"@/*": [
"./src/*"
]
},
"target": "ES2017"
},
"include": ["next-env.d.ts", "**/*.ts", "**/*.tsx", ".next/types/**/*.ts"],
"exclude": ["node_modules"]
"include": [
"next-env.d.ts",
"**/*.ts",
"**/*.tsx",
".next/types/**/*.ts"
],
"exclude": [
"node_modules"
]
}


@@ -1,5 +1,7 @@
# cognee MCP server
Please refer to our documentation [here](https://docs.cognee.ai/how-to-guides/deployment/mcp) for further information.
### Installing Manually
An MCP server project
=======
@@ -69,6 +71,19 @@ npx -y @smithery/cli install cognee --client claude
Define cognify tool in server.py
Restart your Claude desktop.
## Running the Server
### Standard stdio transport:
```bash
python src/server.py
```
### SSE transport:
```bash
python src/server.py --transport sse
```
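The two commands above imply a `--transport` flag defaulting to stdio. A minimal sketch of how such a switch could be wired up with `argparse` follows; the flag name mirrors the README, while the function and everything else here are hypothetical, not the server's actual code:

```python
import argparse


def parse_transport(argv):
    # --transport defaults to "stdio", matching the bare
    # `python src/server.py` invocation; `--transport sse`
    # selects the SSE transport instead.
    parser = argparse.ArgumentParser(description="cognee MCP server (sketch)")
    parser.add_argument("--transport", choices=["stdio", "sse"], default="stdio")
    return parser.parse_args(argv).transport
```

With this, `parse_transport([])` yields `"stdio"` and `parse_transport(["--transport", "sse"])` yields `"sse"`.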
## Development and Debugging
To use debugger, run:
```bash


@@ -0,0 +1,19 @@
# If you use the OpenAI provider, just set the model and api_key.
LLM_API_KEY=""
LLM_MODEL="openai/gpt-4o-mini"
LLM_PROVIDER="openai"
# Not needed if you use OpenAI
LLM_ENDPOINT=""
LLM_API_VERSION=""
# If you use the OpenAI provider, just set the model and api_key.
EMBEDDING_API_KEY=""
EMBEDDING_MODEL="openai/text-embedding-3-large"
EMBEDDING_PROVIDER="openai"
# Not needed if you use OpenAI
EMBEDDING_ENDPOINT=""
EMBEDDING_API_VERSION=""
GRAPHISTRY_USERNAME=""
GRAPHISTRY_PASSWORD=""

cognee-starter-kit/.gitignore vendored Normal file (+196)

@@ -0,0 +1,196 @@
.data
.env
.local.env
.prod.env
cognee/.data/
code_pipeline_output*/
*.lance/
.DS_Store
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
full_run.ipynb
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Cognee logs directory - keep directory, ignore contents
logs/*
!logs/.gitkeep
!logs/README.md
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.env.local
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
.vscode/
cognee/data/
cognee/cache/
# Default cognee system directory, used in development
.cognee_system/
.data_storage/
.artifacts/
.anon_id
node_modules/
# Evals
SWE-bench_testsample/
# ChromaDB Data
.chromadb_data/


@@ -0,0 +1,98 @@
# Cognee Starter Kit
Welcome to the <a href="https://github.com/topoteretes/cognee">cognee</a> Starter Repo! This repository is designed to help you get started quickly by providing a structured dataset and pre-built data pipelines using cognee to build powerful knowledge graphs.
You can use this repo to ingest, process, and visualize data in minutes.
By following this guide, you will:
- Load structured company and employee data
- Utilize pre-built pipelines for data processing
- Perform graph-based search and query operations
- Visualize entity relationships effortlessly on a graph
# How to Use This Repo 🛠
## Install uv if you don't have it on your system
```
pip install uv
```
## Install dependencies
```
uv sync
```
## Setup LLM
Add environment variables to `.env` file.
If you use the OpenAI provider, you only need to set the model and api_key.
```
LLM_PROVIDER=""
LLM_MODEL=""
LLM_ENDPOINT=""
LLM_API_KEY=""
LLM_API_VERSION=""
EMBEDDING_PROVIDER=""
EMBEDDING_MODEL=""
EMBEDDING_ENDPOINT=""
EMBEDDING_API_KEY=""
EMBEDDING_API_VERSION=""
```
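cognee reads these values through its own configuration layer, but the environment-variable convention itself can be illustrated with plain `os.environ`. This is only a sketch; the `llm_settings` helper is hypothetical:

```python
import os


def llm_settings(env=os.environ):
    # Pull the LLM_* variables the starter kit template expects from
    # the environment, falling back to empty strings like the template.
    keys = [
        "LLM_PROVIDER",
        "LLM_MODEL",
        "LLM_ENDPOINT",
        "LLM_API_KEY",
        "LLM_API_VERSION",
    ]
    return {key: env.get(key, "") for key in keys}
```

Passing a dict such as `{"LLM_MODEL": "openai/gpt-4o-mini"}` returns that model with every unset key as an empty string.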
Activate the Python environment:
```
source .venv/bin/activate
```
## Run the Default Pipeline
This script runs the cognify pipeline with default settings. It ingests text data, builds a knowledge graph, and allows you to run search queries.
```
python src/pipelines/default.py
```
## Run the Low-Level Pipeline
This script implements its own pipeline with a custom ingestion task. It processes the given JSON data about companies and employees, making it searchable via a graph.
```
python src/pipelines/low_level.py
```
## Run the Custom Model Pipeline
This pipeline uses a custom Pydantic model for graph extraction. As an example, the script categorizes programming languages and visualizes their relationships.
```
python src/pipelines/custom-model.py
```
## Graph preview
cognee provides a visualize_graph function that will render the graph for you.
```
graph_file_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".artifacts/graph_visualization.html")
).resolve()
)
await visualize_graph(graph_file_path)
```
If you want to use tools like Graphistry for graph visualization:
- create an account and API key from https://www.graphistry.com
- add the following environment variables to `.env` file:
```
GRAPHISTRY_USERNAME=""
GRAPHISTRY_PASSWORD=""
```
Note: `GRAPHISTRY_PASSWORD` is the API key.
# What will you build with cognee?
- Expand the dataset by adding more structured/unstructured data
- Customize the data model to fit your use case
- Use the search API to build an intelligent assistant
- Visualize knowledge graphs for better insights


@@ -0,0 +1,11 @@
[project]
name = "cognee-starter"
version = "0.1.1"
description = "Starter project which can be harvested for parts"
readme = "README.md"
requires-python = ">=3.10, <=3.13"
dependencies = [
"cognee>=0.1.38",
]


@@ -0,0 +1,38 @@
[
{
"name": "TechNova Inc.",
"departments": [
"Engineering",
"Marketing"
]
},
{
"name": "GreenFuture Solutions",
"departments": [
"Research & Development",
"Sales",
"Customer Support"
]
},
{
"name": "Skyline Financials",
"departments": [
"Accounting"
]
},
{
"name": "MediCare Plus",
"departments": [
"Healthcare",
"Administration"
]
},
{
"name": "NextGen Robotics",
"departments": [
"AI Development",
"Manufacturing",
"HR"
]
}
]


@@ -0,0 +1,52 @@
[
{
"name": "John Doe",
"company": "TechNova Inc.",
"department": "Engineering"
},
{
"name": "Jane Smith",
"company": "TechNova Inc.",
"department": "Marketing"
},
{
"name": "Alice Johnson",
"company": "GreenFuture Solutions",
"department": "Sales"
},
{
"name": "Bob Williams",
"company": "GreenFuture Solutions",
"department": "Customer Support"
},
{
"name": "Michael Brown",
"company": "Skyline Financials",
"department": "Accounting"
},
{
"name": "Emily Davis",
"company": "MediCare Plus",
"department": "Healthcare"
},
{
"name": "David Wilson",
"company": "MediCare Plus",
"department": "Administration"
},
{
"name": "Emma Thompson",
"company": "NextGen Robotics",
"department": "AI Development"
},
{
"name": "Chris Martin",
"company": "NextGen Robotics",
"department": "Manufacturing"
},
{
"name": "Sophia White",
"company": "NextGen Robotics",
"department": "HR"
}
]


@@ -0,0 +1,92 @@
import os
import asyncio
import pathlib
from cognee import config, add, cognify, search, SearchType, prune, visualize_graph
# from cognee.shared.utils import render_graph
from cognee.low_level import DataPoint
async def main():
data_directory_path = str(
pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".data_storage")).resolve()
)
# Set up the data directory. Cognee will store files here.
config.data_root_directory(data_directory_path)
cognee_directory_path = str(
pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system")).resolve()
)
# Set up the Cognee system directory. Cognee will store system files and databases here.
config.system_root_directory(cognee_directory_path)
# Prune data and system metadata before running, only if we want "fresh" state.
await prune.prune_data()
await prune.prune_system(metadata=True)
text = "The Python programming language is widely used in data analysis, web development, and machine learning."
# Add the text data to Cognee.
await add(text)
# Define a custom graph model for programming languages.
class FieldType(DataPoint):
name: str = "Field"
class Field(DataPoint):
name: str
is_type: FieldType
metadata: dict = {"index_fields": ["name"]}
class ProgrammingLanguageType(DataPoint):
name: str = "Programming Language"
class ProgrammingLanguage(DataPoint):
name: str
used_in: list[Field] = []
is_type: ProgrammingLanguageType
metadata: dict = {"index_fields": ["name"]}
# Cognify the text data.
await cognify(graph_model=ProgrammingLanguage)
# # Get a graphistry url (Register for a free account at https://www.graphistry.com)
# url = await render_graph()
# print(f"Graphistry URL: {url}")
# Or use our simple graph preview
graph_file_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".artifacts/graph_visualization.html")
).resolve()
)
await visualize_graph(graph_file_path)
# Completion query that uses graph data to form context.
graph_completion = await search(
query_text="What is python?", query_type=SearchType.GRAPH_COMPLETION
)
print("Graph completion result is:")
print(graph_completion)
# Completion query that uses document chunks to form context.
rag_completion = await search(
query_text="What is Python?", query_type=SearchType.RAG_COMPLETION
)
print("Completion result is:")
print(rag_completion)
# Query all summaries related to query.
summaries = await search(query_text="Python", query_type=SearchType.SUMMARIES)
print("Summary results are:")
for summary in summaries:
print(summary)
chunks = await search(query_text="Python", query_type=SearchType.CHUNKS)
print("Chunk results are:")
for chunk in chunks:
print(chunk)
if __name__ == "__main__":
asyncio.run(main())


@@ -0,0 +1,72 @@
import os
import asyncio
import pathlib
from cognee import config, add, cognify, search, SearchType, prune, visualize_graph
# from cognee.shared.utils import render_graph
async def main():
data_directory_path = str(
pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".data_storage")).resolve()
)
# Set up the data directory. Cognee will store files here.
config.data_root_directory(data_directory_path)
cognee_directory_path = str(
pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system")).resolve()
)
# Set up the Cognee system directory. Cognee will store system files and databases here.
config.system_root_directory(cognee_directory_path)
# Prune data and system metadata before running, only if we want "fresh" state.
await prune.prune_data()
await prune.prune_system(metadata=True)
text = "The Python programming language is widely used in data analysis, web development, and machine learning."
# Add the text data to Cognee.
await add(text)
# Cognify the text data.
await cognify()
# # Get a graphistry url (Register for a free account at https://www.graphistry.com)
# url = await render_graph()
# print(f"Graphistry URL: {url}")
# Or use our simple graph preview
graph_file_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".artifacts/graph_visualization.html")
).resolve()
)
await visualize_graph(graph_file_path)
# Completion query that uses graph data to form context.
graph_completion = await search(
query_text="What is python?", query_type=SearchType.GRAPH_COMPLETION
)
print("Graph completion result is:")
print(graph_completion)
# Completion query that uses document chunks to form context.
rag_completion = await search(
query_text="What is Python?", query_type=SearchType.RAG_COMPLETION
)
print("Completion result is:")
print(rag_completion)
# Query all summaries related to query.
summaries = await search(query_text="Python", query_type=SearchType.SUMMARIES)
print("Summary results are:")
for summary in summaries:
print(summary)
chunks = await search(query_text="Python", query_type=SearchType.CHUNKS)
print("Chunk results are:")
for chunk in chunks:
print(chunk)
if __name__ == "__main__":
asyncio.run(main())


@@ -0,0 +1,125 @@
import os
import uuid
import json
import asyncio
import pathlib
from cognee import config, prune, search, SearchType, visualize_graph
from cognee.low_level import setup, DataPoint
from cognee.pipelines import run_tasks, Task
from cognee.tasks.storage import add_data_points
from cognee.tasks.storage.index_graph_edges import index_graph_edges
from cognee.modules.users.methods import get_default_user
class Person(DataPoint):
name: str
metadata: dict = {"index_fields": ["name"]}
class Department(DataPoint):
name: str
employees: list[Person]
metadata: dict = {"index_fields": ["name"]}
class CompanyType(DataPoint):
name: str = "Company"
class Company(DataPoint):
name: str
departments: list[Department]
is_type: CompanyType
metadata: dict = {"index_fields": ["name"]}
def ingest_files():
companies_file_path = os.path.join(os.path.dirname(__file__), "../data/companies.json")
with open(companies_file_path, "r") as companies_file:
    companies = json.load(companies_file)
people_file_path = os.path.join(os.path.dirname(__file__), "../data/people.json")
with open(people_file_path, "r") as people_file:
    people = json.load(people_file)
people_data_points = {}
departments_data_points = {}
for person in people:
new_person = Person(name=person["name"])
people_data_points[person["name"]] = new_person
if person["department"] not in departments_data_points:
departments_data_points[person["department"]] = Department(
name=person["department"], employees=[new_person]
)
else:
departments_data_points[person["department"]].employees.append(new_person)
companies_data_points = {}
# Create a single CompanyType node, so we connect all companies to it.
companyType = CompanyType()
for company in companies:
new_company = Company(name=company["name"], departments=[], is_type=companyType)
companies_data_points[company["name"]] = new_company
for department_name in company["departments"]:
if department_name not in departments_data_points:
departments_data_points[department_name] = Department(
name=department_name, employees=[]
)
new_company.departments.append(departments_data_points[department_name])
return companies_data_points.values()
async def main():
cognee_directory_path = str(
pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system")).resolve()
)
# Set up the Cognee system directory. Cognee will store system files and databases here.
config.system_root_directory(cognee_directory_path)
# Prune system metadata before running, only if we want "fresh" state.
await prune.prune_system(metadata=True)
await setup()
# Generate a random dataset_id
dataset_id = uuid.uuid4()
user = await get_default_user()
pipeline = run_tasks(
[
Task(ingest_files),
Task(add_data_points),
],
dataset_id,
None,
user,
"demo_pipeline",
)
async for status in pipeline:
print(status)
await index_graph_edges()
# Or use our simple graph preview
graph_file_path = str(
os.path.join(os.path.dirname(__file__), ".artifacts/graph_visualization.html")
)
await visualize_graph(graph_file_path)
# Completion query that uses graph data to form context.
completion = await search(
query_text="Who works for GreenFuture Solutions?",
query_type=SearchType.GRAPH_COMPLETION,
)
print("Graph completion result is:")
print(completion)
if __name__ == "__main__":
asyncio.run(main())

View file

@ -1,5 +1,5 @@
from uuid import UUID
from typing import Optional, Union
from typing import Optional
from datetime import datetime
from fastapi import Depends, APIRouter
from fastapi.responses import JSONResponse
@ -17,6 +17,7 @@ class SearchPayloadDTO(InDTO):
datasets: Optional[list[str]] = None
dataset_ids: Optional[list[UUID]] = None
query: str
top_k: Optional[int] = 10
def get_search_router() -> APIRouter:
@ -49,6 +50,7 @@ def get_search_router() -> APIRouter:
user=user,
datasets=payload.datasets,
dataset_ids=payload.dataset_ids,
top_k=payload.top_k,
)
return results
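With the new `top_k` field, clients can cap how many results a single search returns. A sketch of the request body implied by `SearchPayloadDTO` (field names follow the DTO; how the payload is posted is left as an assumption):

```python
import json

def build_search_payload(query, top_k=10, datasets=None, dataset_ids=None):
    """Assemble a search request body matching SearchPayloadDTO.

    top_k defaults to 10 as in the DTO; posting the payload to the
    search route is left to the caller.
    """
    return {
        "query": query,
        "top_k": top_k,
        "datasets": datasets,
        "dataset_ids": dataset_ids,
    }

body = json.dumps(build_search_payload("Who works for GreenFuture Solutions?", top_k=5))
print(body)
```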

View file

@ -38,17 +38,19 @@ def extract_metrics_and_details(
for entry in data:
for metric, values in entry["metrics"].items():
score = values["score"]
metrics_data[metric].append(score)
if "reason" in values:
metric_details[metric].append(
{
"question": entry["question"],
"answer": entry["answer"],
"golden_answer": entry["golden_answer"],
"reason": values["reason"],
"score": score,
}
)
# Skip None scores from failed evaluations
if score is not None:
metrics_data[metric].append(score)
if "reason" in values:
metric_details[metric].append(
{
"question": entry["question"],
"answer": entry["answer"],
"golden_answer": entry["golden_answer"],
"reason": values["reason"],
"score": score,
}
)
return metrics_data, metric_details
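The guard above keeps a single failed evaluation (score `None`) from poisoning aggregate statistics. A stdlib sketch of the same filtering, with illustrative entries:

```python
from collections import defaultdict

def aggregate_scores(entries):
    """Average per-metric scores, skipping None values from failed evaluations."""
    metrics_data = defaultdict(list)
    for entry in entries:
        for metric, values in entry["metrics"].items():
            score = values["score"]
            if score is not None:  # failed evaluations report score=None
                metrics_data[metric].append(score)
    # Average only over the scores that actually succeeded
    return {m: sum(s) / len(s) for m, s in metrics_data.items() if s}

entries = [
    {"metrics": {"EM": {"score": 1.0}, "f1": {"score": 0.5}}},
    {"metrics": {"EM": {"score": None}, "f1": {"score": 0.7}}},  # failed EM evaluation
]
print(aggregate_scores(entries))
```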

View file

@ -7,10 +7,15 @@ from cognee.eval_framework.evaluation.metrics.f1 import F1ScoreMetric
from cognee.eval_framework.evaluation.metrics.context_coverage import ContextCoverageMetric
from typing import Any, Dict, List
from deepeval.metrics import ContextualRelevancyMetric
import time
from cognee.shared.logging_utils import get_logger
logger = get_logger()
class DeepEvalAdapter(BaseEvalAdapter):
def __init__(self):
self.n_retries = 5
self.g_eval_metrics = {
"correctness": self.g_eval_correctness(),
"EM": ExactMatchMetric(),
@ -19,6 +24,33 @@ class DeepEvalAdapter(BaseEvalAdapter):
"context_coverage": ContextCoverageMetric(),
}
def _calculate_metric(self, metric: str, test_case: LLMTestCase) -> Dict[str, Any]:
"""Calculate a single metric for a test case with retry logic."""
metric_to_calculate = self.g_eval_metrics[metric]
for attempt in range(self.n_retries):
try:
metric_to_calculate.measure(test_case)
return {
"score": metric_to_calculate.score,
"reason": metric_to_calculate.reason,
}
except Exception as e:
logger.warning(
f"Attempt {attempt + 1}/{self.n_retries} failed for metric '{metric}': {e}"
)
if attempt < self.n_retries - 1:
time.sleep(2**attempt) # Exponential backoff
else:
logger.error(
f"All {self.n_retries} attempts failed for metric '{metric}'. Returning None values."
)
return {
"score": None,
"reason": None,
}
async def evaluate_answers(
self, answers: List[Dict[str, Any]], evaluator_metrics: List[str]
) -> List[Dict[str, Any]]:
@ -40,12 +72,7 @@ class DeepEvalAdapter(BaseEvalAdapter):
)
metric_results = {}
for metric in evaluator_metrics:
metric_to_calculate = self.g_eval_metrics[metric]
metric_to_calculate.measure(test_case)
metric_results[metric] = {
"score": metric_to_calculate.score,
"reason": metric_to_calculate.reason,
}
metric_results[metric] = self._calculate_metric(metric, test_case)
results.append({**answer, "metrics": metric_results})
return results
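`_calculate_metric` wraps each metric call in retry-with-exponential-backoff. The same pattern as a generic, stdlib-only helper (the name `retry_with_backoff` and the `base_delay * 2**attempt` schedule are a slight generalization of the adapter's fixed `2**attempt`):

```python
import time
import logging

logger = logging.getLogger("retry_demo")

def retry_with_backoff(fn, n_retries=5, base_delay=2.0):
    """Call fn(); on failure sleep base_delay * 2**attempt and retry.

    Returns fn()'s result, or None once all attempts are exhausted,
    mirroring the adapter's score=None / reason=None fallback.
    """
    for attempt in range(n_retries):
        try:
            return fn()
        except Exception as exc:
            logger.warning("Attempt %d/%d failed: %s", attempt + 1, n_retries, exc)
            if attempt < n_retries - 1:
                time.sleep(base_delay * 2**attempt)  # exponential backoff
    logger.error("All %d attempts failed; returning None", n_retries)
    return None

calls = {"n": 0}

def flaky():
    # Fails twice, then succeeds: exercises the retry path.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(retry_with_backoff(flaky, base_delay=0.01))  # → ok
```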

View file

@ -16,6 +16,11 @@ async def get_graph_engine() -> GraphDBInterface:
# Async functions can't be cached. After creating and caching the graph engine
# handle all necessary async operations for different graph types below.
# Run any adapter-specific async initialization
if hasattr(graph_client, "initialize"):
await graph_client.initialize()
# Handle loading of graph for NetworkX
if config["graph_database_provider"].lower() == "networkx" and graph_client.graph is None:
await graph_client.load_graph_from_file()
@ -106,6 +111,18 @@ def create_graph_engine(
return KuzuAdapter(db_path=graph_file_path)
elif graph_database_provider == "kuzu-remote":
if not graph_database_url:
raise EnvironmentError("Missing required Kuzu remote URL.")
from .kuzu.remote_kuzu_adapter import RemoteKuzuAdapter
return RemoteKuzuAdapter(
api_url=graph_database_url,
username=graph_database_username,
password=graph_database_password,
)
elif graph_database_provider == "memgraph":
if not (graph_database_url and graph_database_username and graph_database_password):
raise EnvironmentError("Missing required Memgraph credentials.")

View file

@ -0,0 +1,197 @@
"""Adapter for remote Kuzu graph database via REST API."""
from cognee.shared.logging_utils import get_logger
import json
from typing import Dict, Any, List, Optional, Tuple
import aiohttp
from uuid import UUID
from cognee.infrastructure.databases.graph.kuzu.adapter import KuzuAdapter
logger = get_logger()
class UUIDEncoder(json.JSONEncoder):
"""Custom JSON encoder that handles UUID objects."""
def default(self, obj):
if isinstance(obj, UUID):
return str(obj)
return super().default(obj)
class RemoteKuzuAdapter(KuzuAdapter):
"""Adapter for remote Kuzu graph database operations via REST API."""
def __init__(self, api_url: str, username: Optional[str], password: Optional[str]):
"""Initialize remote Kuzu database connection.
Args:
api_url: URL of the Kuzu REST API
username: Optional username for API authentication
password: Optional password for API authentication
"""
# Initialize parent with a dummy path since we're using REST API
super().__init__("/tmp/kuzu_remote")
self.api_url = api_url
self.username = username
self.password = password
self._session = None
self._schema_initialized = False
async def _get_session(self) -> aiohttp.ClientSession:
"""Get or create an aiohttp session."""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session
async def close(self):
"""Close the adapter and its session."""
if self._session and not self._session.closed:
await self._session.close()
self._session = None
async def _make_request(self, endpoint: str, data: dict) -> dict:
"""Make a request to the Kuzu API."""
url = f"{self.api_url}{endpoint}"
session = await self._get_session()
try:
# Use custom encoder for UUID serialization
json_data = json.dumps(data, cls=UUIDEncoder)
async with session.post(
url, data=json_data, headers={"Content-Type": "application/json"}
) as response:
if response.status != 200:
error_detail = await response.text()
logger.error(
f"API request failed with status {response.status}: {error_detail}\n"
f"Request data: {data}"
)
raise aiohttp.ClientResponseError(
response.request_info,
response.history,
status=response.status,
message=error_detail,
)
return await response.json()
except aiohttp.ClientError as e:
logger.error(f"API request failed: {str(e)}")
logger.error(f"Request data: {data}")
raise
async def query(self, query: str, params: Optional[dict] = None) -> List[Tuple]:
"""Execute a Kuzu query via the REST API."""
try:
# Initialize schema if needed
if not self._schema_initialized:
await self._initialize_schema()
response = await self._make_request(
"/query", {"query": query, "parameters": params or {}}
)
# Convert response to list of tuples
results = []
if "data" in response:
for row in response["data"]:
processed_row = []
for val in row:
if isinstance(val, dict) and "properties" in val:
try:
props = json.loads(val["properties"])
val.update(props)
del val["properties"]
except json.JSONDecodeError:
pass
processed_row.append(val)
results.append(tuple(processed_row))
return results
except Exception as e:
logger.error(f"Query execution failed: {str(e)}")
logger.error(f"Query: {query}")
logger.error(f"Parameters: {params}")
raise
async def _check_schema_exists(self) -> bool:
"""Check if the required schema exists without causing recursion."""
try:
# Make a direct request to check schema using Cypher
response = await self._make_request(
"/query",
{"query": "MATCH (n:Node) RETURN COUNT(n) > 0", "parameters": {}},
)
return bool(response.get("data") and response["data"][0][0])
except Exception as e:
logger.error(f"Failed to check schema: {e}")
return False
async def _create_schema(self):
"""Create the required schema tables."""
try:
# Create Node table if it doesn't exist
try:
await self._make_request(
"/query",
{
"query": """
CREATE NODE TABLE IF NOT EXISTS Node (
id STRING,
name STRING,
type STRING,
properties STRING,
created_at TIMESTAMP,
updated_at TIMESTAMP,
PRIMARY KEY (id)
)
""",
"parameters": {},
},
)
except aiohttp.ClientResponseError as e:
if "already exists" not in str(e):
raise
# Create EDGE table if it doesn't exist
try:
await self._make_request(
"/query",
{
"query": """
CREATE REL TABLE IF NOT EXISTS EDGE (
FROM Node TO Node,
relationship_name STRING,
properties STRING,
created_at TIMESTAMP,
updated_at TIMESTAMP
)
""",
"parameters": {},
},
)
except aiohttp.ClientResponseError as e:
if "already exists" not in str(e):
raise
self._schema_initialized = True
logger.info("Schema initialized successfully")
except Exception as e:
logger.error(f"Failed to create schema: {e}")
raise
async def _initialize_schema(self):
"""Initialize the database schema if it doesn't exist."""
if self._schema_initialized:
return
try:
if not await self._check_schema_exists():
await self._create_schema()
else:
self._schema_initialized = True
logger.info("Schema already exists")
except Exception as e:
logger.error(f"Failed to initialize schema: {e}")
raise

View file

@ -0,0 +1,33 @@
import asyncio
from cognee.infrastructure.databases.graph.kuzu.remote_kuzu_adapter import RemoteKuzuAdapter
from cognee.infrastructure.databases.graph.config import get_graph_config
async def main():
config = get_graph_config()
adapter = RemoteKuzuAdapter(
config.graph_database_url, config.graph_database_username, config.graph_database_password
)
try:
print("Node Count:")
result = await adapter.query("MATCH (n) RETURN COUNT(n) as count")
print(result)
print("\nEdge Count:")
result = await adapter.query("MATCH ()-[r]->() RETURN COUNT(r) as count")
print(result)
print("\nSample Nodes with Properties:")
result = await adapter.query("MATCH (n) RETURN n LIMIT 5")
print(result)
print("\nSample Relationships with Properties:")
result = await adapter.query("MATCH (n1)-[r]->(n2) RETURN n1, r, n2 LIMIT 5")
print(result)
finally:
await adapter.close()
if __name__ == "__main__":
asyncio.run(main())

View file

@ -1,5 +1,3 @@
#
"""Neo4j Adapter for Graph Database"""
import json
@ -29,6 +27,8 @@ from .neo4j_metrics_utils import (
logger = get_logger("Neo4jAdapter", level=ERROR)
BASE_LABEL = "__Node__"
class Neo4jAdapter(GraphDBInterface):
"""
@ -48,6 +48,15 @@ class Neo4jAdapter(GraphDBInterface):
graph_database_url,
auth=(graph_database_username, graph_database_password),
max_connection_lifetime=120,
notifications_min_severity="OFF",
)
async def initialize(self) -> None:
"""
Initializes the database: adds uniqueness constraint on id and performs indexing
"""
await self.query(
(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:`{BASE_LABEL}`) REQUIRE n.id IS UNIQUE;")
)
@asynccontextmanager
@ -103,8 +112,8 @@ class Neo4jAdapter(GraphDBInterface):
- bool: True if the node exists, otherwise False.
"""
results = await self.query(
"""
MATCH (n)
f"""
MATCH (n:`{BASE_LABEL}`)
WHERE n.id = $node_id
RETURN COUNT(n) > 0 AS node_exists
""",
@ -129,7 +138,7 @@ class Neo4jAdapter(GraphDBInterface):
serialized_properties = self.serialize_properties(node.model_dump())
query = dedent(
"""MERGE (node {id: $node_id})
f"""MERGE (node: `{BASE_LABEL}`{{id: $node_id}})
ON CREATE SET node += $properties, node.updated_at = timestamp()
ON MATCH SET node += $properties, node.updated_at = timestamp()
WITH node, $node_label AS label
@ -161,9 +170,9 @@ class Neo4jAdapter(GraphDBInterface):
- None: None
"""
query = """
query = f"""
UNWIND $nodes AS node
MERGE (n {id: node.node_id})
MERGE (n: `{BASE_LABEL}`{{id: node.node_id}})
ON CREATE SET n += node.properties, n.updated_at = timestamp()
ON MATCH SET n += node.properties, n.updated_at = timestamp()
WITH n, node.label AS label
@ -215,9 +224,9 @@ class Neo4jAdapter(GraphDBInterface):
A list of nodes represented as dictionaries.
"""
query = """
query = f"""
UNWIND $node_ids AS id
MATCH (node {id: id})
MATCH (node: `{BASE_LABEL}`{{id: id}})
RETURN node"""
params = {"node_ids": node_ids}
@ -240,7 +249,7 @@ class Neo4jAdapter(GraphDBInterface):
The result of the query execution, typically indicating success or failure.
"""
query = "MATCH (node {id: $node_id}) DETACH DELETE node"
query = f"MATCH (node: `{BASE_LABEL}`{{id: $node_id}}) DETACH DELETE node"
params = {"node_id": node_id}
return await self.query(query, params)
@ -259,9 +268,9 @@ class Neo4jAdapter(GraphDBInterface):
- None: None
"""
query = """
query = f"""
UNWIND $node_ids AS id
MATCH (node {id: id})
MATCH (node: `{BASE_LABEL}`{{id: id}})
DETACH DELETE node"""
params = {"node_ids": node_ids}
@ -284,16 +293,15 @@ class Neo4jAdapter(GraphDBInterface):
- bool: True if the edge exists, otherwise False.
"""
query = """
MATCH (from_node)-[relationship]->(to_node)
WHERE from_node.id = $from_node_id AND to_node.id = $to_node_id AND type(relationship) = $edge_label
query = f"""
MATCH (from_node: `{BASE_LABEL}`)-[relationship:`{edge_label}`]->(to_node: `{BASE_LABEL}`)
WHERE from_node.id = $from_node_id AND to_node.id = $to_node_id
RETURN COUNT(relationship) > 0 AS edge_exists
"""
params = {
"from_node_id": str(from_node),
"to_node_id": str(to_node),
"edge_label": edge_label,
}
edge_exists = await self.query(query, params)
@ -366,9 +374,9 @@ class Neo4jAdapter(GraphDBInterface):
query = dedent(
f"""\
MATCH (from_node {{id: $from_node}}),
(to_node {{id: $to_node}})
MERGE (from_node)-[r:{relationship_name}]->(to_node)
MATCH (from_node :`{BASE_LABEL}`{{id: $from_node}}),
(to_node :`{BASE_LABEL}`{{id: $to_node}})
MERGE (from_node)-[r:`{relationship_name}`]->(to_node)
ON CREATE SET r += $properties, r.updated_at = timestamp()
ON MATCH SET r += $properties, r.updated_at = timestamp()
RETURN r
@ -400,17 +408,17 @@ class Neo4jAdapter(GraphDBInterface):
- None: None
"""
query = """
query = f"""
UNWIND $edges AS edge
MATCH (from_node {id: edge.from_node})
MATCH (to_node {id: edge.to_node})
MATCH (from_node: `{BASE_LABEL}`{{id: edge.from_node}})
MATCH (to_node: `{BASE_LABEL}`{{id: edge.to_node}})
CALL apoc.merge.relationship(
from_node,
edge.relationship_name,
{
{{
source_node_id: edge.from_node,
target_node_id: edge.to_node
},
}},
edge.properties,
to_node
) YIELD rel
@ -451,8 +459,8 @@ class Neo4jAdapter(GraphDBInterface):
A list of edges connecting to the specified node, represented as tuples of details.
"""
query = """
MATCH (n {id: $node_id})-[r]-(m)
query = f"""
MATCH (n: `{BASE_LABEL}`{{id: $node_id}})-[r]-(m)
RETURN n, r, m
"""
@ -525,9 +533,9 @@ class Neo4jAdapter(GraphDBInterface):
- list[str]: A list of predecessor node IDs.
"""
if edge_label is not None:
query = """
MATCH (node)<-[r]-(predecessor)
WHERE node.id = $node_id AND type(r) = $edge_label
query = f"""
MATCH (node: `{BASE_LABEL}`)<-[r:`{edge_label}`]-(predecessor)
WHERE node.id = $node_id
RETURN predecessor
"""
@ -535,14 +543,13 @@ class Neo4jAdapter(GraphDBInterface):
query,
dict(
node_id=node_id,
edge_label=edge_label,
),
)
return [result["predecessor"] for result in results]
else:
query = """
MATCH (node)<-[r]-(predecessor)
query = f"""
MATCH (node: `{BASE_LABEL}`)<-[r]-(predecessor)
WHERE node.id = $node_id
RETURN predecessor
"""
@ -572,9 +579,9 @@ class Neo4jAdapter(GraphDBInterface):
- list[str]: A list of successor node IDs.
"""
if edge_label is not None:
query = """
MATCH (node)-[r]->(successor)
WHERE node.id = $node_id AND type(r) = $edge_label
query = f"""
MATCH (node: `{BASE_LABEL}`)-[r:`{edge_label}`]->(successor)
WHERE node.id = $node_id
RETURN successor
"""
@ -588,8 +595,8 @@ class Neo4jAdapter(GraphDBInterface):
return [result["successor"] for result in results]
else:
query = """
MATCH (node)-[r]->(successor)
query = f"""
MATCH (node: `{BASE_LABEL}`)-[r]->(successor)
WHERE node.id = $node_id
RETURN successor
"""
@ -634,8 +641,8 @@ class Neo4jAdapter(GraphDBInterface):
- Optional[Dict[str, Any]]: The requested node as a dictionary, or None if it does
not exist.
"""
query = """
MATCH (node {id: $node_id})
query = f"""
MATCH (node: `{BASE_LABEL}`{{id: $node_id}})
RETURN node
"""
results = await self.query(query, {"node_id": node_id})
@ -655,9 +662,9 @@ class Neo4jAdapter(GraphDBInterface):
- List[Dict[str, Any]]: A list of nodes represented as dictionaries.
"""
query = """
query = f"""
UNWIND $node_ids AS id
MATCH (node {id: id})
MATCH (node:`{BASE_LABEL}` {{id: id}})
RETURN node
"""
results = await self.query(query, {"node_ids": node_ids})
@ -677,13 +684,13 @@ class Neo4jAdapter(GraphDBInterface):
- list: A list of connections represented as tuples of details.
"""
predecessors_query = """
MATCH (node)<-[relation]-(neighbour)
predecessors_query = f"""
MATCH (node:`{BASE_LABEL}`)<-[relation]-(neighbour)
WHERE node.id = $node_id
RETURN neighbour, relation, node
"""
successors_query = """
MATCH (node)-[relation]->(neighbour)
successors_query = f"""
MATCH (node:`{BASE_LABEL}`)-[relation]->(neighbour)
WHERE node.id = $node_id
RETURN node, relation, neighbour
"""
@ -723,6 +730,7 @@ class Neo4jAdapter(GraphDBInterface):
- None: None
"""
# TODO: review -- the {id} interpolation below resolves to the Python builtin id, which looks unintended
query = f"""
UNWIND $node_ids AS id
MATCH (node:`{id}`)-[r:{edge_label}]->(predecessor)
@ -751,6 +759,7 @@ class Neo4jAdapter(GraphDBInterface):
- None: None
"""
# TODO: review -- the {id} interpolation below resolves to the Python builtin id, which looks unintended
query = f"""
UNWIND $node_ids AS id
MATCH (node:`{id}`)<-[r:{edge_label}]-(successor)

View file

@ -57,9 +57,9 @@ async def get_num_connected_components(adapter: Neo4jAdapter, graph_name: str):
found.
"""
query = f"""
CALL gds.wcc.stream('{graph_name}')
YIELD componentId
RETURN count(DISTINCT componentId) AS num_connected_components;
CALL gds.wcc.stats('{graph_name}')
YIELD componentCount
RETURN componentCount AS num_connected_components;
"""
result = await adapter.query(query)
@ -181,9 +181,9 @@ async def get_avg_clustering(adapter: Neo4jAdapter, graph_name: str):
The average clustering coefficient as a float, or 0 if no results are available.
"""
query = f"""
CALL gds.localClusteringCoefficient.stream('{graph_name}')
YIELD localClusteringCoefficient
RETURN avg(localClusteringCoefficient) AS avg_clustering;
CALL gds.localClusteringCoefficient.stats('{graph_name}')
YIELD averageClusteringCoefficient
RETURN averageClusteringCoefficient AS avg_clustering;
"""
result = await adapter.query(query)

View file

@ -453,6 +453,8 @@ class SQLAlchemyAdapter:
from cognee.infrastructure.files.storage import LocalStorage
await self.engine.dispose(close=True)
db_directory = path.dirname(self.db_path)
LocalStorage.ensure_directory_exists(db_directory)
with open(self.db_path, "w") as file:
file.write("")
else:
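The `ensure_directory_exists` call added above guards the truncate path: if a prune removed the parent directory, reopening the database file would otherwise raise `FileNotFoundError`. A stdlib-only sketch of the same guard (helper name and paths are illustrative):

```python
import os
import tempfile

def truncate_db_file(db_path):
    """Recreate an empty database file, creating parent directories first.

    Without the makedirs step, open(..., "w") fails when the directory
    was removed earlier (for example by a prune).
    """
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    with open(db_path, "w") as file:
        file.write("")

with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "databases", "cognee.db")  # parent dir does not exist yet
    truncate_db_file(db_path)
    print(os.path.exists(db_path))  # → True
```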

View file

@ -50,7 +50,7 @@ class GraphCompletionRetriever(BaseRetriever):
content = text
else:
name = node.attributes.get("name", "Unnamed Node")
content = name
content = node.attributes.get("description", name)
nodes[node.id] = {"node": node, "name": name, "content": content}
return nodes

View file

@ -1,7 +1,6 @@
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.orm import joinedload
import sqlalchemy.exc
from sqlalchemy.orm import selectinload
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.infrastructure.databases.exceptions import EntityNotFoundError
from ..models import User
@ -14,7 +13,7 @@ async def get_user(user_id: UUID):
user = (
await session.execute(
select(User)
.options(joinedload(User.roles), joinedload(User.tenant))
.options(selectinload(User.roles), selectinload(User.tenant))
.where(User.id == user_id)
)
).scalar()

View file

@ -0,0 +1,115 @@
import os
import shutil
import cognee
import pathlib
from cognee.shared.logging_utils import get_logger
from cognee.modules.search.types import SearchType
from cognee.modules.search.operations import get_history
from cognee.modules.users.methods import get_default_user
from cognee.infrastructure.databases.graph.config import get_graph_config
logger = get_logger()
async def main():
# Clean up test directories before starting
data_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_remote_kuzu")
).resolve()
)
cognee_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_remote_kuzu")
).resolve()
)
try:
# Set Kuzu as the graph database provider
cognee.config.set_graph_database_provider("kuzu")
cognee.config.data_root_directory(data_directory_path)
cognee.config.system_root_directory(cognee_directory_path)
# Configure remote Kuzu database using environment variables
os.environ["KUZU_HOST"] = os.getenv("KUZU_HOST", "localhost")
os.environ["KUZU_PORT"] = os.getenv("KUZU_PORT", "8000")
os.environ["KUZU_USERNAME"] = os.getenv("KUZU_USERNAME", "kuzu")
os.environ["KUZU_PASSWORD"] = os.getenv("KUZU_PASSWORD", "kuzu")
os.environ["KUZU_DATABASE"] = os.getenv("KUZU_DATABASE", "cognee_test")
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
dataset_name = "cs_explanations"
explanation_file_path = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
)
await cognee.add([explanation_file_path], dataset_name)
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
await cognee.add([text], dataset_name)
await cognee.cognify([dataset_name])
from cognee.infrastructure.databases.vector import get_vector_engine
vector_engine = get_vector_engine()
random_node = (await vector_engine.search("Entity_name", "Quantum computer"))[0]
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")
for result in search_results:
print(f"{result}\n")
search_results = await cognee.search(
query_type=SearchType.CHUNKS, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted chunks are:\n")
for result in search_results:
print(f"{result}\n")
search_results = await cognee.search(
query_type=SearchType.SUMMARIES, query_text=random_node_name
)
assert len(search_results) != 0, "Query related summaries don't exist."
print("\nExtracted summaries are:\n")
for result in search_results:
print(f"{result}\n")
user = await get_default_user()
history = await get_history(user.id)
assert len(history) == 6, "Search history is not correct."
await cognee.prune.prune_data()
assert not os.path.isdir(data_directory_path), "Local data files are not deleted"
await cognee.prune.prune_system(metadata=True)
from cognee.infrastructure.databases.graph import get_graph_engine
graph_engine = await get_graph_engine()
nodes, edges = await graph_engine.get_graph_data()
assert len(nodes) == 0 and len(edges) == 0, "Remote Kuzu graph database is not empty"
finally:
# Ensure cleanup even if tests fail
for path in [data_directory_path, cognee_directory_path]:
if os.path.exists(path):
shutil.rmtree(path)
if __name__ == "__main__":
import asyncio
asyncio.run(main())

View file

@ -0,0 +1,159 @@
import asyncio
import random
import time
from cognee.infrastructure.databases.graph.kuzu.remote_kuzu_adapter import RemoteKuzuAdapter
from cognee.infrastructure.databases.graph.config import get_graph_config
from cognee.shared.logging_utils import get_logger
# Test configuration
BATCH_SIZE = 5000
NUM_BATCHES = 10
TOTAL_NODES = BATCH_SIZE * NUM_BATCHES
TOTAL_RELATIONSHIPS = TOTAL_NODES - 1
logger = get_logger()
async def create_node(adapter, node):
query = (
"CREATE (n:TestNode {"
f"id: '{node['id']}', "
f"name: '{node['name']}', "
f"value: {node['value']}"
"})"
)
await adapter.query(query)
async def create_relationship(adapter, source_id, target_id):
query = (
"MATCH (n1:TestNode {id: '" + str(source_id) + "'}), "
"(n2:TestNode {id: '" + str(target_id) + "'}) "
"CREATE (n1)-[r:CONNECTS_TO {weight: " + str(random.random()) + "}]->(n2)"
)
await adapter.query(query)
async def process_batch(adapter, start_id, batch_size):
batch_start = time.time()
batch_nodes = []
# Prepare batch data
logger.info(f"Preparing batch {start_id // batch_size + 1}/{NUM_BATCHES}...")
for j in range(batch_size):
node_id = start_id + j
properties = {
"id": str(node_id),
"name": f"TestNode_{node_id}",
"value": random.randint(1, 1000),
}
batch_nodes.append(properties)
# Create nodes concurrently
logger.info(
f"Creating {batch_size} nodes for batch {start_id // batch_size + 1}/{NUM_BATCHES}..."
)
nodes_start = time.time()
node_tasks = [create_node(adapter, node) for node in batch_nodes]
await asyncio.gather(*node_tasks)
nodes_time = time.time() - nodes_start
# Create relationships concurrently
logger.info(f"Creating relationships for batch {start_id // batch_size + 1}/{NUM_BATCHES}...")
rels_start = time.time()
rel_tasks = [
create_relationship(adapter, batch_nodes[j]["id"], batch_nodes[j + 1]["id"])
for j in range(len(batch_nodes) - 1)
]
await asyncio.gather(*rel_tasks)
rels_time = time.time() - rels_start
batch_time = time.time() - batch_start
logger.info(f"Batch {start_id // batch_size + 1}/{NUM_BATCHES} completed in {batch_time:.2f}s")
logger.info(f" - Nodes creation: {nodes_time:.2f}s")
logger.info(f" - Relationships creation: {rels_time:.2f}s")
return batch_time
async def create_test_data(adapter, batch_size=BATCH_SIZE):
tasks = []
# Create tasks for each batch
for i in range(0, TOTAL_NODES, batch_size):
task = asyncio.create_task(process_batch(adapter, i, batch_size))
tasks.append(task)
# Wait for all batches to complete
batch_times = await asyncio.gather(*tasks)
return sum(batch_times)
async def main():
config = get_graph_config()
adapter = RemoteKuzuAdapter(
config.graph_database_url, config.graph_database_username, config.graph_database_password
)
try:
logger.info("=== Starting Kuzu Stress Test ===")
logger.info(f"Configuration: {NUM_BATCHES} batches of {BATCH_SIZE} nodes each")
logger.info(f"Total nodes to create: {TOTAL_NODES}")
logger.info(f"Total relationships to create: {TOTAL_RELATIONSHIPS}")
start_time = time.time()
# Drop existing tables in correct order (relationships first, then nodes)
logger.info("[1/5] Dropping existing tables...")
await adapter.query("DROP TABLE IF EXISTS CONNECTS_TO")
await adapter.query("DROP TABLE IF EXISTS TestNode")
# Create node table
logger.info("[2/5] Creating node table structure...")
await adapter.query("""
CREATE NODE TABLE TestNode (
id STRING,
name STRING,
value INT64,
PRIMARY KEY (id)
)
""")
# Create relationship table
logger.info("[3/5] Creating relationship table structure...")
await adapter.query("""
CREATE REL TABLE CONNECTS_TO (
FROM TestNode TO TestNode,
weight DOUBLE
)
""")
# Clear existing test data
logger.info("[4/5] Clearing existing test data...")
await adapter.query("MATCH (n:TestNode) DETACH DELETE n")
# Create new test data
logger.info(
f"[5/5] Creating test data ({NUM_BATCHES} concurrent batches of {BATCH_SIZE} nodes each)..."
)
total_batch_time = await create_test_data(adapter)
end_time = time.time()
total_duration = end_time - start_time
# Verify the data
logger.info("Verifying data...")
result = await adapter.query("MATCH (n:TestNode) RETURN COUNT(n) as count")
logger.info(f"Total nodes created: {result}")
result = await adapter.query("MATCH ()-[r:CONNECTS_TO]->() RETURN COUNT(r) as count")
logger.info(f"Total relationships created: {result}")
logger.info("=== Test Summary ===")
logger.info(f"Total batch processing time: {total_batch_time:.2f} seconds")
logger.info(f"Total execution time: {total_duration:.2f} seconds")
finally:
await adapter.close()
if __name__ == "__main__":
asyncio.run(main())
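The stress test's structure (one task per batch, gathered concurrently) reduces to a small asyncio pattern. A stdlib sketch with illustrative sizes:

```python
import asyncio

async def process_batch(start_id, batch_size):
    """Stand-in for a batch of node/relationship creation calls."""
    await asyncio.sleep(0.01)  # simulate I/O-bound API requests
    return list(range(start_id, start_id + batch_size))

async def run_all(total=20, batch_size=5):
    # One task per batch; gather() runs them concurrently, as in the stress test
    tasks = [
        asyncio.create_task(process_batch(start, batch_size))
        for start in range(0, total, batch_size)
    ]
    batches = await asyncio.gather(*tasks)  # results come back in task order
    return [item for batch in batches for item in batch]

print(len(asyncio.run(run_all())))  # → 20
```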

View file

@ -0,0 +1,66 @@
import unittest
import subprocess
import os
import sys
class TestPipelines(unittest.TestCase):
"""Tests that all pipelines run successfully."""
def setUp(self):
# Ensure we're in the correct directory
self.project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
self.pipelines_dir = os.path.join(self.project_root, "src", "pipelines")
# Required environment variables
self.required_env_vars = ["LLM_API_KEY", "EMBEDDING_API_KEY"]
# Check if required environment variables are set
missing_vars = [var for var in self.required_env_vars if not os.environ.get(var)]
if missing_vars:
self.skipTest(f"Missing required environment variables: {', '.join(missing_vars)}")
def _run_pipeline(self, script_name):
"""Helper method to run a pipeline script and return the result."""
script_path = os.path.join(self.pipelines_dir, script_name)
# Use the Python executable from the virtual environment
python_exe = os.path.join(self.project_root, ".venv", "bin", "python")
if not os.path.exists(python_exe):
python_exe = sys.executable
try:
result = subprocess.run(
[python_exe, script_path],
check=True,
capture_output=True,
text=True,
timeout=300, # 5 minute timeout
)
return result
except subprocess.CalledProcessError as e:
self.fail(
f"Pipeline {script_name} failed with code {e.returncode}. "
f"Stdout: {e.stdout}, Stderr: {e.stderr}"
)
except subprocess.TimeoutExpired:
self.fail(f"Pipeline {script_name} timed out after 300 seconds")
def test_default_pipeline(self):
"""Test that the default pipeline runs successfully."""
result = self._run_pipeline("default.py")
self.assertEqual(result.returncode, 0)
def test_low_level_pipeline(self):
"""Test that the low-level pipeline runs successfully."""
result = self._run_pipeline("low_level.py")
self.assertEqual(result.returncode, 0)
def test_custom_model_pipeline(self):
"""Test that the custom model pipeline runs successfully."""
result = self._run_pipeline("custom-model.py")
self.assertEqual(result.returncode, 0)
if __name__ == "__main__":
unittest.main()
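The `_run_pipeline` helper above leans on two stdlib behaviors: `check=True` turns a non-zero exit status into `CalledProcessError` (carrying the captured streams), and `timeout` raises `TimeoutExpired`. A minimal sketch of the first behavior, with arbitrary stand-in commands rather than the real pipeline scripts:

```python
import subprocess
import sys

# check=True converts a non-zero exit status into CalledProcessError, which is
# what lets the test surface the child's stdout/stderr in its failure message.
ok = subprocess.run(
    [sys.executable, "-c", "print('ran')"],
    check=True,
    capture_output=True,
    text=True,
    timeout=10,
)
print(ok.stdout.strip())

# A failing child raises; the exception carries the exit code.
code = None
try:
    subprocess.run([sys.executable, "-c", "raise SystemExit(3)"], check=True)
except subprocess.CalledProcessError as exc:
    code = exc.returncode
print(code)
```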

@@ -14,7 +14,7 @@ echo "Environment: $ENVIRONMENT"
# smooth redeployments and container restarts while maintaining data integrity.
echo "Running database migrations..."
-MIGRATION_OUTPUT=$(alembic upgrade head 2>&1)
+MIGRATION_OUTPUT=$(alembic upgrade head)
MIGRATION_EXIT_CODE=$?
if [[ $MIGRATION_EXIT_CODE -ne 0 ]]; then
@@ -37,10 +37,10 @@ sleep 2
if [ "$ENVIRONMENT" = "dev" ] || [ "$ENVIRONMENT" = "local" ]; then
if [ "$DEBUG" = "true" ]; then
echo "Waiting for the debugger to attach..."
-debugpy --wait-for-client --listen 0.0.0.0:5678 -m gunicorn -w 3 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:8000 --log-level debug --reload cognee.api.client:app
+debugpy --wait-for-client --listen 127.0.0.1:5678 -m gunicorn -w 3 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:8000 --log-level debug --reload cognee.api.client:app
else
gunicorn -w 3 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:8000 --log-level debug --reload cognee.api.client:app
fi
else
gunicorn -w 3 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:8000 --log-level error cognee.api.client:app
fi
gunicorn -w 3 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:8000 --log-level error cognee.api.client:app
fi

@@ -0,0 +1,204 @@
import os
import logging
import cognee
import asyncio
import json
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from dotenv import load_dotenv
from cognee.api.v1.search import SearchType
from cognee.modules.engine.models import NodeSet
from cognee.shared.logging_utils import setup_logging
load_dotenv()
# Use the key from .env when present; the empty string is only a placeholder
os.environ.setdefault("LLM_API_KEY", "")
# Note: the Cognee NodeSet feature only works with the Kuzu and Neo4j graph databases
os.environ["GRAPH_DATABASE_PROVIDER"] = "kuzu"
class ProcurementMemorySystem:
"""Procurement system with persistent memory using Cognee"""
async def setup_memory_data(self):
"""Load and store procurement data in memory"""
# Procurement system dummy data
vendor_conversation_text_techsupply = """
Assistant: Hello! This is Sarah from TechSupply Solutions.
Thanks for reaching out for your IT procurement needs.
User: We're looking to procure 50 high-performance enterprise laptops.
Specs: Intel i7, 16GB RAM, 512GB SSD, dedicated graphics card.
Budget: $80,000. What models do you have?
Assistant: TechSupply Solutions can offer Dell Precision 5570 ($1,450) and Lenovo ThinkPad P1 ($1,550).
Both come with a 3-year warranty. Delivery: 2-3 weeks (Dell), 3-4 weeks (Lenovo).
User: Do you provide bulk discounts? We're planning another 200 units next quarter.
Assistant: Yes! Orders over $50,000 get 8% off.
So for your current order:
- Dell = $1,334 each ($66,700 total)
- Lenovo = $1,426 each ($71,300 total)
And for 200 units next quarter, we can offer 12% off with flexible delivery.
"""
vendor_conversation_text_office_solutions = """
Assistant: Hi, this is Martin from vendor Office Solutions. How can we assist you?
User: We need 50 laptops for our engineers.
Specs: i7 CPU, 16GB RAM, 512GB SSD, dedicated GPU.
We can spend up to $80,000. Can you meet this?
Assistant: Office Solutions can offer HP ZBook Power G9 for $1,600 each.
Comes with 2-year warranty, delivery time is 4-5 weeks.
User: That's a bit long — any options to speed it up?
Assistant: We can expedite for $75 per unit, bringing delivery to 3-4 weeks.
Also, for orders over $60,000 we give 6% off.
So:
- Base price = $1,600 -> $1,504 with discount
- Expedited price = $1,579
User: Understood. Any room for better warranty terms?
Assistant: We're working on adding a 3-year warranty option next quarter for enterprise clients.
"""
previous_purchases_text = """
Previous Purchase Records:
1. Vendor: TechSupply Solutions
Item: Desktop computers - 25 units
Amount: $35,000
Date: 2024-01-15
Performance: Excellent delivery, good quality, delivered 2 days early
Rating: 5/5
Notes: Responsive support team, competitive pricing
2. Vendor: Office Solutions
Item: Office furniture
Amount: $12,000
Date: 2024-02-20
Performance: Delayed delivery by 1 week, average quality
Rating: 2/5
Notes: Poor communication, but acceptable product quality
"""
procurement_preferences_text = """
Procurement Policies and Preferences:
1. Preferred vendors must have 3+ year warranty coverage
2. Maximum delivery time: 30 days for non-critical items
3. Bulk discount requirements: minimum 5% for orders over $50,000
4. Prioritize vendors with sustainable/green practices
5. Vendor rating system: require minimum 4/5 rating for new contracts
"""
# Initializing and pruning databases
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
# Store data in different memory categories
await cognee.add(
data=[vendor_conversation_text_techsupply, vendor_conversation_text_office_solutions],
node_set=["vendor_conversations"],
)
await cognee.add(data=previous_purchases_text, node_set=["purchase_history"])
await cognee.add(data=procurement_preferences_text, node_set=["procurement_policies"])
# Process all data through Cognee's knowledge graph
await cognee.cognify()
async def search_memory(self, query, search_categories=None):
"""Search across different memory layers"""
results = {}
for category in search_categories or []:  # tolerate the default of None
category_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION,
query_text=query,
node_type=NodeSet,
node_name=[category],
top_k=30,
)
results[category] = category_results
return results
async def run_procurement_example():
"""Main function demonstrating procurement memory system"""
print("Building AI Procurement System with Memory: Cognee Integration...\n")
# Initialize the procurement memory system
procurement_system = ProcurementMemorySystem()
# Setup memory with procurement data
print("Setting up procurement memory data...")
await procurement_system.setup_memory_data()
print("Memory successfully populated and processed.\n")
research_questions = {
"vendor_conversations": [
"What are the laptops that are discussed, together with their vendors?",
"What pricing was offered by each vendor before and after discounts?",
"What were the delivery time estimates for each product?",
],
"purchase_history": [
"Which vendors have we worked with in the past?",
"What were the satisfaction ratings for each vendor?",
"Were there any complaints or red flags associated with specific vendors?",
],
"procurement_policies": [
"What are our company's bulk discount requirements?",
"What is the maximum acceptable delivery time for non-critical items?",
"What is the minimum vendor rating for new contracts?",
],
}
research_notes = {}
print("Running contextual research questions...\n")
for category, questions in research_questions.items():
print(f"Category: {category}")
research_notes[category] = []
for q in questions:
print(f"Question: \n{q}")
results = await procurement_system.search_memory(q, search_categories=[category])
top_answer = results[category][0]
print(f"Answer: \n{top_answer.strip()}\n")
research_notes[category].append({"question": q, "answer": top_answer})
print("Contextual research complete.\n")
print("Compiling structured research information for decision-making...\n")
research_information = "\n\n".join(
f"Q: {note['question']}\nA: {note['answer'].strip()}"
for section in research_notes.values()
for note in section
)
print("Compiled Research Summary:\n")
print(research_information)
print("\nPassing research to LLM for final procurement recommendation...\n")
final_decision = await get_llm_client().acreate_structured_output(
text_input=research_information,
system_prompt="""You are a procurement decision assistant. Use the provided QA pairs that were collected through a research phase. Recommend the best vendor,
based on pricing, delivery, warranty, policy fit, and past performance. Be concise and justify your choice with evidence.
""",
response_model=str,
)
print("Final Decision:")
print(final_decision.strip())
# Run the example
if __name__ == "__main__":
setup_logging(logging.ERROR)
asyncio.run(run_procurement_example())
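The flattening step in `run_procurement_example` turns the per-category research notes into one Q/A transcript for the LLM. The same `"\n\n".join` pattern, run against dummy notes in place of real search results:

```python
# Dummy research notes standing in for real Cognee search results.
research_notes = {
    "vendor_conversations": [
        {"question": "Which laptops were discussed?", "answer": " Dell Precision 5570. "},
    ],
    "purchase_history": [
        {"question": "Which vendors have we used?", "answer": "TechSupply Solutions."},
    ],
}

# Flatten every category's Q/A pairs into one blank-line-separated transcript.
research_information = "\n\n".join(
    f"Q: {note['question']}\nA: {note['answer'].strip()}"
    for section in research_notes.values()
    for note in section
)
print(research_information)
```

Since Python dicts preserve insertion order, the transcript keeps the category ordering the questions were registered in.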

@@ -1,225 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Cognee Graphiti integration demo"
]
},
{
"cell_type": "markdown",
"metadata": {
"vscode": {
"languageId": "plaintext"
}
},
"source": [
"First we import the necessary libraries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"\n",
"import cognee\n",
"from cognee.shared.logging_utils import get_logger, ERROR\n",
"from cognee.modules.pipelines import Task, run_tasks\n",
"from cognee.tasks.temporal_awareness import build_graph_with_temporal_awareness\n",
"from cognee.infrastructure.databases.relational import (\n",
" create_db_and_tables as create_relational_db_and_tables,\n",
")\n",
"from cognee.tasks.temporal_awareness.index_graphiti_objects import (\n",
" index_and_transform_graphiti_nodes_and_edges,\n",
")\n",
"from cognee.modules.retrieval.utils.brute_force_triplet_search import brute_force_triplet_search\n",
"from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever\n",
"from cognee.infrastructure.llm.prompts import read_query_prompt, render_prompt\n",
"from cognee.infrastructure.llm.get_llm_client import get_llm_client\n",
"from cognee.modules.users.methods import get_default_user"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set environment variables"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2025-01-15T10:43:57.893763Z",
"start_time": "2025-01-15T10:43:57.891332Z"
}
},
"outputs": [],
"source": [
"import os\n",
"\n",
"import warnings\n",
"\n",
"# We ignore warnings for now\n",
"warnings.filterwarnings(\"ignore\")\n",
"\n",
"# API key for cognee\n",
"if \"LLM_API_KEY\" not in os.environ:\n",
" os.environ[\"LLM_API_KEY\"] = \"\"\n",
"\n",
"# API key for graphiti\n",
"if \"OPENAI_API_KEY\" not in os.environ:\n",
" os.environ[\"OPENAI_API_KEY\"] = \"\"\n",
"\n",
"GRAPH_DATABASE_PROVIDER = \"neo4j\"\n",
"GRAPH_DATABASE_USERNAME = \"neo4j\"\n",
"GRAPH_DATABASE_PASSWORD = \"pleaseletmein\"\n",
"GRAPH_DATABASE_URL = \"bolt://localhost:7687\"\n",
"\n",
"os.environ[\"GRAPH_DATABASE_PROVIDER\"] = GRAPH_DATABASE_PROVIDER\n",
"os.environ[\"GRAPH_DATABASE_USERNAME\"] = GRAPH_DATABASE_USERNAME\n",
"os.environ[\"GRAPH_DATABASE_PASSWORD\"] = GRAPH_DATABASE_PASSWORD\n",
"os.environ[\"GRAPH_DATABASE_URL\"] = GRAPH_DATABASE_URL\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Input texts with temporal information"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2025-01-15T10:43:57.928664Z",
"start_time": "2025-01-15T10:43:57.927105Z"
}
},
"outputs": [],
"source": [
"text_list = [\n",
" \"Kamala Harris is the Attorney General of California. She was previously \"\n",
" \"the district attorney for San Francisco.\",\n",
" \"As AG, Harris was in office from January 3, 2011 to January 3, 2017\",\n",
"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running graphiti + transforming its graph into cognee's core system (graph transformation + vector embeddings)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2025-01-15T10:44:25.008501Z",
"start_time": "2025-01-15T10:43:57.932240Z"
}
},
"outputs": [],
"source": [
"await cognee.prune.prune_data()\n",
"await cognee.prune.prune_system(metadata=True)\n",
"await create_relational_db_and_tables()\n",
"\n",
"# Initialize default user\n",
"user = await get_default_user()\n",
"\n",
"for text in text_list:\n",
" await cognee.add(text)\n",
"\n",
"tasks = [\n",
" Task(build_graph_with_temporal_awareness, text_list=text_list),\n",
" ]\n",
"\n",
"pipeline = run_tasks(tasks, user=user)\n",
"\n",
"async for result in pipeline:\n",
" print(result)\n",
"\n",
"await index_and_transform_graphiti_nodes_and_edges()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Retrieving and generating answer from graphiti graph with cognee retriever"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2025-01-15T10:44:27.844438Z",
"start_time": "2025-01-15T10:44:25.013325Z"
}
},
"outputs": [],
"source": [
"# Step 1: Formulating the Query 🔍\n",
"query = \"When was Kamala Harris in office?\"\n",
"\n",
"# Step 2: Searching for Relevant Triplets 📊\n",
"triplets = await brute_force_triplet_search(\n",
" query=query,\n",
" top_k=3,\n",
" collections=[\"graphitinode_content\", \"graphitinode_name\", \"graphitinode_summary\"],\n",
")\n",
"\n",
"# Step 3: Preparing the Context for the LLM\n",
"retriever = GraphCompletionRetriever()\n",
"context = await retriever.resolve_edges_to_text(triplets)\n",
"\n",
"args = {\"question\": query, \"context\": context}\n",
"\n",
"# Step 4: Generating Prompts ✍️\n",
"user_prompt = render_prompt(\"graph_context_for_question.txt\", args)\n",
"system_prompt = read_query_prompt(\"answer_simple_question_restricted.txt\")\n",
"\n",
"# Step 5: Interacting with the LLM 🤖\n",
"llm_client = get_llm_client()\n",
"computed_answer = await llm_client.acreate_structured_output(\n",
" text_input=user_prompt, # Input prompt for the user context\n",
" system_prompt=system_prompt, # System-level instructions for the model\n",
" response_model=str,\n",
")\n",
"\n",
"# Step 6: Displaying the Computed Answer ✨\n",
"print(f\"💡 Answer: {computed_answer}\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
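The deleted notebook's `render_prompt` call fills a prompt template with the question and graph context. A rough stand-in using `str.format` (the template text here is invented for illustration; the real `graph_context_for_question.txt` ships with cognee and is not reproduced):

```python
# Hypothetical template text standing in for graph_context_for_question.txt.
TEMPLATE = (
    "Answer the question using only the provided context.\n"
    "Context:\n{context}\n"
    "Question: {question}"
)

def render_prompt_sketch(template: str, args: dict) -> str:
    # Rough equivalent of render_prompt(name, args): substitute named fields.
    return template.format(**args)

prompt = render_prompt_sketch(
    TEMPLATE,
    {
        "question": "When was Kamala Harris in office?",
        "context": "Harris served as AG from 2011 to 2017.",
    },
)
print(prompt)
```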

@@ -1,239 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cognee GraphRAG with LlamaIndex Documents"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install llama-index-core\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load Data\n",
"\n",
"We will use a sample news article dataset retrieved from Diffbot, which Tomaz has conveniently made available on GitHub for easy access.\n",
"\n",
"The dataset contains 2,500 samples; for ease of experimentation, we will use 5 of these samples, which include the `title` and `text` of news articles."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from llama_index.core import Document\n",
"\n",
"news = pd.read_csv(\n",
" \"https://raw.githubusercontent.com/tomasonjo/blog-datasets/main/news_articles.csv\"\n",
")[:5]\n",
"\n",
"news.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prepare documents as required by LlamaIndex"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"documents = [Document(text=f\"{row['title']}: {row['text']}\") for i, row in news.iterrows()]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set environment variables"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Setting environment variables\n",
"if \"GRAPHISTRY_USERNAME\" not in os.environ:\n",
" os.environ[\"GRAPHISTRY_USERNAME\"] = \"\"\n",
"\n",
"if \"GRAPHISTRY_PASSWORD\" not in os.environ:\n",
" os.environ[\"GRAPHISTRY_PASSWORD\"] = \"\"\n",
"\n",
"if \"LLM_API_KEY\" not in os.environ:\n",
" os.environ[\"LLM_API_KEY\"] = \"\"\n",
"\n",
"# \"neo4j\" or \"networkx\"\n",
"os.environ[\"GRAPH_DATABASE_PROVIDER\"] = \"networkx\"\n",
"# Not needed if using networkx\n",
"# os.environ[\"GRAPH_DATABASE_URL\"]=\"\"\n",
"# os.environ[\"GRAPH_DATABASE_USERNAME\"]=\"\"\n",
"# os.environ[\"GRAPH_DATABASE_PASSWORD\"]=\"\"\n",
"\n",
"# \"pgvector\", \"qdrant\", \"weaviate\" or \"lancedb\"\n",
"os.environ[\"VECTOR_DB_PROVIDER\"] = \"lancedb\"\n",
"# Not needed if using \"lancedb\" or \"pgvector\"\n",
"# os.environ[\"VECTOR_DB_URL\"]=\"\"\n",
"# os.environ[\"VECTOR_DB_KEY\"]=\"\"\n",
"\n",
"# Relational Database provider \"sqlite\" or \"postgres\"\n",
"os.environ[\"DB_PROVIDER\"] = \"sqlite\"\n",
"\n",
"# Database name\n",
"os.environ[\"DB_NAME\"] = \"cognee_db\"\n",
"\n",
"# Postgres specific parameters (Only if Postgres or PGVector is used)\n",
"# os.environ[\"DB_HOST\"]=\"127.0.0.1\"\n",
"# os.environ[\"DB_PORT\"]=\"5432\"\n",
"# os.environ[\"DB_USERNAME\"]=\"cognee\"\n",
"# os.environ[\"DB_PASSWORD\"]=\"cognee\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run Cognee with LlamaIndex Documents"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import Union, BinaryIO\n",
"\n",
"from cognee.infrastructure.databases.vector.pgvector import (\n",
" create_db_and_tables as create_pgvector_db_and_tables,\n",
")\n",
"from cognee.infrastructure.databases.relational import (\n",
" create_db_and_tables as create_relational_db_and_tables,\n",
")\n",
"from cognee.modules.users.models import User\n",
"from cognee.modules.users.methods import get_default_user\n",
"from cognee.tasks.ingestion.ingest_data import ingest_data\n",
"import cognee\n",
"\n",
"# Create a clean slate for cognee -- reset data and system state\n",
"await cognee.prune.prune_data()\n",
"await cognee.prune.prune_system(metadata=True)\n",
"\n",
"\n",
"# Add the LlamaIndex documents, and make it available for cognify\n",
"async def add(\n",
" data: Union[BinaryIO, list[BinaryIO], str, list[str]],\n",
" dataset_name: str = \"main_dataset\",\n",
" user: User = None,\n",
"):\n",
" await create_relational_db_and_tables()\n",
" await create_pgvector_db_and_tables()\n",
"\n",
" if user is None:\n",
" user = await get_default_user()\n",
"\n",
" await ingest_data(data, dataset_name, user)\n",
"\n",
"\n",
"await add(documents)\n",
"\n",
"# Use LLMs and cognee to create knowledge graph\n",
"await cognee.cognify()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Query Cognee for summaries related to data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from cognee import SearchType\n",
"\n",
"# Query cognee for summaries\n",
"search_results = await cognee.search(\n",
" query_type=SearchType.SUMMARIES, query_text=\"What are the main news discussed in the document?\"\n",
")\n",
"# Display search results\n",
"print(\"\\n Summary of main news discussed:\\n\")\n",
"print(search_results[0][\"text\"])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Render Knowledge Graph generated from provided data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import graphistry\n",
"\n",
"from cognee.infrastructure.databases.graph import get_graph_engine\n",
"from cognee.shared.utils import render_graph\n",
"\n",
"# Get graph\n",
"graphistry.login(\n",
" username=os.getenv(\"GRAPHISTRY_USERNAME\"), password=os.getenv(\"GRAPHISTRY_PASSWORD\")\n",
")\n",
"graph_engine = await get_graph_engine()\n",
"\n",
"graph_url = await render_graph(graph_engine.graph)\n",
"print(graph_url)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
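The deleted LlamaIndex notebook wraps each news row in a `Document` whose payload is simply `"title: text"`. The row-formatting step in isolation, with a two-row stand-in for the 2,500-row dataset:

```python
import pandas as pd

# Two-row stand-in for the news dataset; the notebook loads the CSV from GitHub
# and keeps the first 5 rows before wrapping each in a LlamaIndex Document.
news = pd.DataFrame(
    {
        "title": ["Markets rally", "New chip ships"],
        "text": ["Stocks rose broadly.", "The vendor began shipping."],
    }
)

# The Document payload is simply "title: text" for each row.
doc_texts = [f"{row['title']}: {row['text']}" for _, row in news.iterrows()]
print(doc_texts)
```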

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long