Merge branch 'main' into COG-698
commit ea879b2882
48 changed files with 756 additions and 215 deletions
.github/workflows/test_deduplication.yml (new file, 69 lines, vendored)
@ -0,0 +1,69 @@
name: test | deduplication

on:
  workflow_dispatch:
  pull_request:
    branches:
      - main
    types: [labeled, synchronize]

concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

env:
  RUNTIME__LOG_LEVEL: ERROR

jobs:
  get_docs_changes:
    name: docs changes
    uses: ./.github/workflows/get_docs_changes.yml

  run_deduplication_test:
    name: test
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' && ${{ github.event.label.name == 'run-checks' }}
    runs-on: ubuntu-latest
    defaults:
      run:
        shell: bash
    services:
      postgres:
        image: pgvector/pgvector:pg17
        env:
          POSTGRES_USER: cognee
          POSTGRES_PASSWORD: cognee
          POSTGRES_DB: cognee_db
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432

    steps:
      - name: Check out
        uses: actions/checkout@master

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11.x'

      - name: Install Poetry
        uses: snok/install-poetry@v1.3.2
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Install dependencies
        run: poetry install -E postgres --no-interaction

      - name: Run deduplication test
        env:
          ENV: 'dev'
          LLM_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: poetry run python ./cognee/tests/test_deduplication.py
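The same flow can be exercised outside CI by pointing the test at a Postgres instance with the credentials configured above. A minimal sketch, assuming a pgvector-enabled Postgres is already reachable on 127.0.0.1:5432 with the cognee/cognee credentials and an LLM_API_KEY in the environment:

```python
# Minimal local sketch mirroring the CI job above; assumes the pgvector service
# from the workflow is running locally and the cognee_db database exists.
import asyncio

from cognee.tests.test_deduplication import test_deduplication_postgres

asyncio.run(test_deduplication_postgres())
```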
CODE_OF_CONDUCT.md (new file, 127 lines)
@ -0,0 +1,127 @@
# Contributor Covenant Code of Conduct

## Our Pledge

We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, religion, or sexual identity
and orientation.

We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.

## Our Standards

Examples of behavior that contributes to a positive environment for our
community include:

- Demonstrating empathy and kindness toward other people
- Being respectful of differing opinions, viewpoints, and experiences
- Giving and gracefully accepting constructive feedback
- Accepting responsibility and apologizing to those affected by our mistakes,
  and learning from the experience
- Focusing on what is best not just for us as individuals, but for the
  overall community

Examples of unacceptable behavior include:

- The use of sexualized language or imagery, and sexual attention or
  advances of any kind
- Trolling, insulting or derogatory comments, and personal or political attacks
- Public or private harassment
- Publishing others' private information, such as a physical or email
  address, without their explicit permission
- Other conduct which could reasonably be considered inappropriate in a
  professional setting

## Enforcement Responsibilities

Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.

Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.

## Scope

This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.

## Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement by emailing <NAME> at <EMAIL>.
All complaints will be reviewed and investigated promptly and fairly.

All community leaders are obligated to respect the privacy and security of the
reporter of any incident.

## Enforcement Guidelines

Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:

### 1. Correction

**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.

**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.

### 2. Warning

**Community Impact**: A violation through a single incident or series
of actions.

**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or
permanent ban.

### 3. Temporary Ban

**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.

**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.

### 4. Permanent Ban

**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.

**Consequence**: A permanent ban from any sort of public interaction within
the community.

## Attribution

This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.0, available at
https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.

Community Impact Guidelines were inspired by [Mozilla's code of conduct
enforcement ladder](https://github.com/mozilla/diversity).

[homepage]: https://www.contributor-covenant.org

For answers to common questions about this code of conduct, see the FAQ at
https://www.contributor-covenant.org/faq. Translations are available at
https://www.contributor-covenant.org/translations.
@ -49,10 +49,10 @@ python cognee/cognee/tests/test_library.py

```shell
# Add your changes to the staging area:
git add .
git add .

# Commit changes with an adequate description:
git commit -m "Describe your changes here"
# Commit changes with an adequate description:
git commit -m "Describe your changes here"

# Push your feature branch to your forked repository:
git push origin feature/your-feature-name

@ -73,7 +73,7 @@ The project maintainers will review your work, possibly suggest improvements, or

## 5. 📜 Code of Conduct

Ensure you adhere to the project's Code of Conduct throughout your participation.
Ensure you adhere to the project's [Code of Conduct](https://github.com/topoteretes/cognee/blob/main/CODE_OF_CONDUCT.md) throughout your participation.

## 6. 📫 Contact
NOTICE.md (new file, 10 lines)
@ -0,0 +1,10 @@
topoteretes - cognee
Copyright © 2024 Topoteretes UG. (haftungsbeschränkt), Schonehauser Allee 163, Berlin.

This project includes software developed at Topoteretes UG. (https://www.cognee.ai/).

<!-- Add software from external sources that you used here. e.g.

This project redistributes code originally from <website>.

-->
@ -33,11 +33,11 @@ async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_nam
    # data is text
    else:
        file_path = save_data_to_file(data, dataset_name)
        file_path = save_data_to_file(data)
        return await add([file_path], dataset_name)

    if hasattr(data, "file"):
        file_path = save_data_to_file(data.file, dataset_name, filename = data.filename)
        file_path = save_data_to_file(data.file, filename = data.filename)
        return await add([file_path], dataset_name)

    # data is a list of file paths or texts

@ -45,13 +45,13 @@ async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_nam
    for data_item in data:
        if hasattr(data_item, "file"):
            file_paths.append(save_data_to_file(data_item, dataset_name, filename = data_item.filename))
            file_paths.append(save_data_to_file(data_item, filename = data_item.filename))
        elif isinstance(data_item, str) and (
            data_item.startswith("/") or data_item.startswith("file://")
        ):
            file_paths.append(data_item)
        elif isinstance(data_item, str):
            file_paths.append(save_data_to_file(data_item, dataset_name))
            file_paths.append(save_data_to_file(data_item))

    if len(file_paths) > 0:
        return await add_files(file_paths, dataset_name, user)
@ -81,13 +81,13 @@ async def run_cognify_pipeline(dataset: Dataset, user: User):
            Task(classify_documents),
            Task(check_permissions_on_documents, user = user, permissions = ["write"]),
            Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
            Task(add_data_points, task_config = { "batch_size": 10 }),
            Task(extract_graph_from_data, graph_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks.
            Task(
                summarize_text,
                summarization_model = cognee_config.summarization_model,
                task_config = { "batch_size": 10 }
            ),
            Task(add_data_points, task_config = { "batch_size": 10 }),
        ]

        pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")
@ -29,7 +29,14 @@ class LiteLLMEmbeddingEngine(EmbeddingEngine):
        self.model = model
        self.dimensions = dimensions

    MAX_RETRIES = 5
    retry_count = 0

    async def embed_text(self, text: List[str]) -> List[List[float]]:
        async def exponential_backoff(attempt):
            wait_time = min(10 * (2 ** attempt), 60) # Max 60 seconds
            await asyncio.sleep(wait_time)

        try:
            response = await litellm.aembedding(
                self.model,

@ -38,11 +45,18 @@ class LiteLLMEmbeddingEngine(EmbeddingEngine):
                api_base = self.endpoint,
                api_version = self.api_version
            )

            self.retry_count = 0

            return [data["embedding"] for data in response.data]

        except litellm.exceptions.ContextWindowExceededError as error:
            if isinstance(text, list):
                parts = [text[0:math.ceil(len(text)/2)], text[math.ceil(len(text)/2):]]
                if len(text) == 1:
                    parts = [text]
                else:
                    parts = [text[0:math.ceil(len(text)/2)], text[math.ceil(len(text)/2):]]

                parts_futures = [self.embed_text(part) for part in parts]
                embeddings = await asyncio.gather(*parts_futures)

@ -50,11 +64,21 @@ class LiteLLMEmbeddingEngine(EmbeddingEngine):
                for embeddings_part in embeddings:
                    all_embeddings.extend(embeddings_part)

                return [data["embedding"] for data in all_embeddings]
                return all_embeddings

            logger.error("Context window exceeded for embedding text: %s", str(error))
            raise error

        except litellm.exceptions.RateLimitError:
            if self.retry_count >= self.MAX_RETRIES:
                raise Exception(f"Rate limit exceeded and no more retries left.")

            await exponential_backoff(self.retry_count)

            self.retry_count += 1

            return await self.embed_text(text)

        except Exception as error:
            logger.error("Error embedding text: %s", str(error))
            raise error
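For reference, a quick illustration of the retry schedule the new exponential_backoff helper produces; this simply evaluates the min(10 * 2 ** attempt, 60) formula from the hunk above:

```python
# Illustrative only: wait times (seconds) for attempts 0..4, capped at 60s.
waits = [min(10 * (2 ** attempt), 60) for attempt in range(5)]
print(waits)  # [10, 20, 40, 60, 60]
```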
@ -1,5 +1,7 @@
from typing import BinaryIO, TypedDict
import hashlib
from .guess_file_type import guess_file_type
from cognee.shared.utils import get_file_content_hash


class FileMetadata(TypedDict):

@ -7,10 +9,14 @@ class FileMetadata(TypedDict):
    file_path: str
    mime_type: str
    extension: str
    content_hash: str

def get_file_metadata(file: BinaryIO) -> FileMetadata:
    """Get metadata from a file"""
    file.seek(0)
    content_hash = get_file_content_hash(file)
    file.seek(0)

    file_type = guess_file_type(file)

    file_path = file.name

@ -21,4 +27,5 @@ def get_file_metadata(file: BinaryIO) -> FileMetadata:
        file_path = file_path,
        mime_type = file_type.mime,
        extension = file_type.extension,
        content_hash = content_hash,
    )
@ -35,6 +35,7 @@ class TextChunker():
                is_part_of = self.document,
                chunk_index = self.chunk_index,
                cut_type = chunk_data["cut_type"],
                contains = [],
                _metadata = {
                    "index_fields": ["text"],
                    "metadata_id": self.document.metadata_id

@ -52,6 +53,7 @@ class TextChunker():
                is_part_of = self.document,
                chunk_index = self.chunk_index,
                cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
                contains = [],
                _metadata = {
                    "index_fields": ["text"],
                    "metadata_id": self.document.metadata_id

@ -73,6 +75,7 @@ class TextChunker():
                is_part_of = self.document,
                chunk_index = self.chunk_index,
                cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
                contains = [],
                _metadata = {
                    "index_fields": ["text"],
                    "metadata_id": self.document.metadata_id
@ -1,6 +1,7 @@
from typing import Optional
from typing import List, Optional
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.processing.document_types import Document
from cognee.modules.engine.models import Entity

class DocumentChunk(DataPoint):
    __tablename__ = "document_chunk"

@ -9,6 +10,7 @@ class DocumentChunk(DataPoint):
    chunk_index: int
    cut_type: str
    is_part_of: Document
    contains: List[Entity] = None

    _metadata: Optional[dict] = {
        "index_fields": ["text"],
cognee/modules/chunking/models/__init__.py (new file, 1 line)
@ -0,0 +1 @@
from .DocumentChunk import DocumentChunk
@ -1,7 +1,6 @@
from datetime import datetime, timezone
from typing import List
from uuid import uuid4

from sqlalchemy import UUID, Column, DateTime, String
from sqlalchemy.orm import Mapped, relationship

@ -19,6 +18,8 @@ class Data(Base):
    extension = Column(String)
    mime_type = Column(String)
    raw_data_location = Column(String)
    owner_id = Column(UUID, index=True)
    content_hash = Column(String)
    created_at = Column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
@ -2,7 +2,6 @@ import inspect
import json
import re
import warnings
from typing import Any
from uuid import UUID
from sqlalchemy import select
from typing import Any, BinaryIO, Union
@ -1,5 +1,4 @@
from cognee.infrastructure.engine import DataPoint
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.modules.engine.models.EntityType import EntityType


@ -8,7 +7,6 @@ class Entity(DataPoint):
    name: str
    is_a: EntityType
    description: str
    mentioned_in: DocumentChunk

    _metadata: dict = {
        "index_fields": ["name"],
@ -1,13 +1,10 @@
from cognee.infrastructure.engine import DataPoint
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk


class EntityType(DataPoint):
    __tablename__ = "entity_type"
    name: str
    type: str
    description: str
    exists_in: DocumentChunk

    _metadata: dict = {
        "index_fields": ["name"],
@ -1,6 +1,6 @@
from typing import Optional

from cognee.infrastructure.engine import DataPoint
from cognee.modules.chunking.models import DocumentChunk
from cognee.modules.engine.models import Entity, EntityType
from cognee.modules.engine.utils import (
    generate_edge_name,

@ -11,7 +11,8 @@ from cognee.shared.data_models import KnowledgeGraph


def expand_with_nodes_and_edges(
    graph_node_index: list[tuple[DataPoint, KnowledgeGraph]],
    data_chunks: list[DocumentChunk],
    chunk_graphs: list[KnowledgeGraph],
    existing_edges_map: Optional[dict[str, bool]] = None,
):
    if existing_edges_map is None:

@ -19,9 +20,10 @@ def expand_with_nodes_and_edges(

    added_nodes_map = {}
    relationships = []
    data_points = []

    for graph_source, graph in graph_node_index:
    for index, data_chunk in enumerate(data_chunks):
        graph = chunk_graphs[index]

        if graph is None:
            continue

@ -38,7 +40,6 @@ def expand_with_nodes_and_edges(
                    name = type_node_name,
                    type = type_node_name,
                    description = type_node_name,
                    exists_in = graph_source,
                )
                added_nodes_map[f"{str(type_node_id)}_type"] = type_node
            else:

@ -50,9 +51,13 @@ def expand_with_nodes_and_edges(
                    name = node_name,
                    is_a = type_node,
                    description = node.description,
                    mentioned_in = graph_source,
                )
                data_points.append(entity_node)

                if data_chunk.contains is None:
                    data_chunk.contains = []

                data_chunk.contains.append(entity_node)

                added_nodes_map[f"{str(node_id)}_entity"] = entity_node

            # Add relationship that came from graphs.

@ -80,4 +85,4 @@ def expand_with_nodes_and_edges(
            )
            existing_edges_map[edge_key] = True

    return (data_points, relationships)
    return (data_chunks, relationships)
@ -1,154 +1,115 @@
from datetime import datetime, timezone

from cognee.infrastructure.engine import DataPoint
from cognee.modules.storage.utils import copy_model

async def get_graph_from_model(
    data_point: DataPoint,
    added_nodes: dict,
    added_edges: dict,
    visited_properties: dict = None,
    include_root = True,
    added_nodes = None,
    added_edges = None,
    visited_properties = None,
):
    if str(data_point.id) in added_nodes:
        return [], []

    nodes = []
    edges = []
    added_nodes = added_nodes or {}
    added_edges = added_edges or {}
    visited_properties = visited_properties or {}

    data_point_properties = {}
    excluded_properties = set()

    if str(data_point.id) in added_nodes:
        return nodes, edges
    properties_to_visit = set()

    for field_name, field_value in data_point:
        if field_name == "_metadata":
            continue

        if field_value is None:
            excluded_properties.add(field_name)
            continue

        if isinstance(field_value, DataPoint):
            excluded_properties.add(field_name)

            property_key = f"{str(data_point.id)}{field_name}{str(field_value.id)}"
            property_key = str(data_point.id) + field_name + str(field_value.id)

            if property_key in visited_properties:
                continue

            visited_properties[property_key] = True

            nodes, edges = await add_nodes_and_edges(
                data_point,
                field_name,
                field_value,
                nodes,
                edges,
                added_nodes,
                added_edges,
                visited_properties,
            )
            properties_to_visit.add(field_name)

            continue

        if isinstance(field_value, list) and len(field_value) > 0 and isinstance(field_value[0], DataPoint):
            excluded_properties.add(field_name)

            for field_value_item in field_value:
                property_key = f"{str(data_point.id)}{field_name}{str(field_value_item.id)}"
            for index, item in enumerate(field_value):
                property_key = str(data_point.id) + field_name + str(item.id)

                if property_key in visited_properties:
                    continue

                visited_properties[property_key] = True

                nodes, edges = await add_nodes_and_edges(
                    data_point,
                    field_name,
                    field_value_item,
                    nodes,
                    edges,
                    added_nodes,
                    added_edges,
                    visited_properties,
                )
                properties_to_visit.add(f"{field_name}.{index}")

            continue

        data_point_properties[field_name] = field_value

    if include_root:

    if include_root and str(data_point.id) not in added_nodes:
        SimpleDataPointModel = copy_model(
            type(data_point),
            include_fields = {
                "_metadata": (dict, data_point._metadata),
                "__tablename__": data_point.__tablename__,
                "__tablename__": (str, data_point.__tablename__),
            },
            exclude_fields = excluded_properties,
            exclude_fields = list(excluded_properties),
        )
        nodes.append(SimpleDataPointModel(**data_point_properties))
        added_nodes[str(data_point.id)] = True

    for field_name in properties_to_visit:
        index = None

        if "." in field_name:
            field_name, index = field_name.split(".")

        field_value = getattr(data_point, field_name)

        if index is not None:
            field_value = field_value[int(index)]

        edge_key = str(data_point.id) + str(field_value.id) + field_name

        if str(edge_key) not in added_edges:
            edges.append((data_point.id, field_value.id, field_name, {
                "source_node_id": data_point.id,
                "target_node_id": field_value.id,
                "relationship_name": field_name,
                "updated_at": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
            }))
            added_edges[str(edge_key)] = True

        if str(field_value.id) in added_nodes:
            continue

        property_nodes, property_edges = await get_graph_from_model(
            field_value,
            include_root = True,
            added_nodes = added_nodes,
            added_edges = added_edges,
            visited_properties = visited_properties,
        )

        for node in property_nodes:
            nodes.append(node)

        for edge in property_edges:
            edges.append(edge)

        property_key = str(data_point.id) + field_name + str(field_value.id)
        visited_properties[property_key] = True

    return nodes, edges


async def add_nodes_and_edges(
    data_point,
    field_name,
    field_value,
    nodes,
    edges,
    added_nodes,
    added_edges,
    visited_properties,
):
    property_nodes, property_edges = await get_graph_from_model(
        field_value,
        True,
        added_nodes,
        added_edges,
        visited_properties,
    )

    for node in property_nodes:
        if str(node.id) not in added_nodes:
            nodes.append(node)
            added_nodes[str(node.id)] = True

    for edge in property_edges:
        edge_key = str(edge[0]) + str(edge[1]) + edge[2]

        if str(edge_key) not in added_edges:
            edges.append(edge)
            added_edges[str(edge_key)] = True

    for property_node in get_own_properties(property_nodes, property_edges):
        edge_key = str(data_point.id) + str(property_node.id) + field_name

        if str(edge_key) not in added_edges:
            edges.append(
                (
                    data_point.id,
                    property_node.id,
                    field_name,
                    {
                        "source_node_id": data_point.id,
                        "target_node_id": property_node.id,
                        "relationship_name": field_name,
                        "updated_at": datetime.now(timezone.utc).strftime(
                            "%Y-%m-%d %H:%M:%S"
                        ),
                    },
                )
            )
            added_edges[str(edge_key)] = True

    return (nodes, edges)


def get_own_properties(property_nodes, property_edges):
def get_own_property_nodes(property_nodes, property_edges):
    own_properties = []

    destination_nodes = [str(property_edge[1]) for property_edge in property_edges]
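The refactored traversal expects the caller to share the bookkeeping dictionaries across invocations so nodes and edges are only emitted once per batch. A minimal usage sketch, assuming this runs inside an async function and `data_points` is a list of cognee DataPoint instances (it mirrors the enrich_dependency_graph and add_data_points changes further down):

```python
# Share the de-duplication maps across all get_graph_from_model calls in one batch.
added_nodes, added_edges, visited_properties = {}, {}, {}

nodes, edges = [], []
for data_point in data_points:
    point_nodes, point_edges = await get_graph_from_model(
        data_point,
        added_nodes = added_nodes,
        added_edges = added_edges,
        visited_properties = visited_properties,
    )
    nodes.extend(point_nodes)
    edges.extend(point_edges)
```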
@ -5,7 +5,8 @@ from cognee.shared.data_models import KnowledgeGraph


async def retrieve_existing_edges(
    graph_node_index: list[tuple[DataPoint, KnowledgeGraph]],
    data_chunks: list[DataPoint],
    chunk_graphs: list[KnowledgeGraph],
    graph_engine: GraphDBInterface,
) -> dict[str, bool]:
    processed_nodes = {}

@ -13,23 +14,25 @@ async def retrieve_existing_edges(
    entity_node_edges = []
    type_entity_edges = []

    for graph_source, graph in graph_node_index:
    for index, data_chunk in enumerate(data_chunks):
        graph = chunk_graphs[index]

        for node in graph.nodes:
            type_node_id = generate_node_id(node.type)
            entity_node_id = generate_node_id(node.id)

            if str(type_node_id) not in processed_nodes:
                type_node_edges.append(
                    (str(graph_source), str(type_node_id), "exists_in")
                    (data_chunk.id, type_node_id, "exists_in")
                )
                processed_nodes[str(type_node_id)] = True

            if str(entity_node_id) not in processed_nodes:
                entity_node_edges.append(
                    (str(graph_source), entity_node_id, "mentioned_in")
                    (data_chunk.id, entity_node_id, "mentioned_in")
                )
                type_entity_edges.append(
                    (str(entity_node_id), str(type_node_id), "is_a")
                    (entity_node_id, type_node_id, "is_a")
                )
                processed_nodes[str(entity_node_id)] = True
|
|||
def get_identifier(self):
|
||||
metadata = self.get_metadata()
|
||||
|
||||
return self.name + "." + metadata["extension"]
|
||||
return metadata["content_hash"]
|
||||
|
||||
def get_metadata(self):
|
||||
self.ensure_metadata()
|
||||
|
|
|
|||
|
|
@ -1,7 +1,11 @@
from uuid import uuid5, NAMESPACE_OID
from .data_types import IngestionData

def identify(data: IngestionData) -> str:
    data_id: str = data.get_identifier()
from cognee.modules.users.models import User

    return uuid5(NAMESPACE_OID, data_id)

def identify(data: IngestionData, user: User) -> str:
    data_content_hash: str = data.get_identifier()

    # return UUID hash of file contents + owner id
    return uuid5(NAMESPACE_OID, f"{data_content_hash}{user.id}")
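A quick sketch of the identity this produces: the same file contents ingested by the same owner always map to one deterministic UUID, so a second ingest resolves to the existing data row. The hash and owner id below are hypothetical placeholders:

```python
from uuid import uuid5, NAMESPACE_OID

# Hypothetical values: a content hash from get_identifier() and a user id.
content_hash = "9fdaacd2e4bb119ca4b7c1bc1e1b6bbf"
owner_id = "2f3a1c70-0000-4000-8000-000000000000"

data_id = uuid5(NAMESPACE_OID, f"{content_hash}{owner_id}")
reingested_id = uuid5(NAMESPACE_OID, f"{content_hash}{owner_id}")
assert data_id == reingested_id  # same contents + same owner => same Data id
```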
@ -1,25 +1,28 @@
import string
import random
import os.path
import hashlib
from typing import BinaryIO, Union
from cognee.base_config import get_base_config
from cognee.infrastructure.files.storage import LocalStorage
from .classify import classify

def save_data_to_file(data: Union[str, BinaryIO], dataset_name: str, filename: str = None):
def save_data_to_file(data: Union[str, BinaryIO], filename: str = None):
    base_config = get_base_config()
    data_directory_path = base_config.data_root_directory

    classified_data = classify(data, filename)

    storage_path = data_directory_path + "/" + dataset_name.replace(".", "/")
    storage_path = os.path.join(data_directory_path, "data")
    LocalStorage.ensure_directory_exists(storage_path)

    file_metadata = classified_data.get_metadata()
    if "name" not in file_metadata or file_metadata["name"] is None:
        letters = string.ascii_lowercase
        random_string = "".join(random.choice(letters) for _ in range(32))
        file_metadata["name"] = "text_" + random_string + ".txt"
        data_contents = classified_data.get_data().encode('utf-8')
        hash_contents = hashlib.md5(data_contents).hexdigest()
        file_metadata["name"] = "text_" + hash_contents + ".txt"
    file_name = file_metadata["name"]
    LocalStorage(storage_path).store(file_name, classified_data.get_data())

    # Don't save file if it already exists
    if not os.path.isfile(os.path.join(storage_path, file_name)):
        LocalStorage(storage_path).store(file_name, classified_data.get_data())

    return "file://" + storage_path + "/" + file_name
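Because the generated file name is now derived from an MD5 of the text itself rather than a random string, saving the same text twice resolves to the same path and the second write is skipped. A small illustration of the naming rule (the resulting path is only a sketch of what lands under the data root):

```python
import hashlib

text = "A quantum computer is a computer that takes advantage of quantum mechanical phenomena."
file_name = "text_" + hashlib.md5(text.encode("utf-8")).hexdigest() + ".txt"
# Saving the same text again yields the same name, so the existing file is reused.
print(file_name)  # e.g. text_<32-hex-digest>.txt stored under <data_root_directory>/data/
```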
@ -1,10 +1,12 @@
import json
import inspect
import json
import logging

from cognee.modules.settings import get_current_settings
from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.shared.utils import send_telemetry

from ..tasks.Task import Task

logger = logging.getLogger("run_tasks(tasks: [Task], data)")

@ -160,21 +162,28 @@ async def run_tasks_base(tasks: list[Task], data = None, user: User = None):
        raise error

async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str):
    user = await get_default_user()

    config = get_current_settings()

    logger.debug("\nRunning pipeline with configuration:\n%s\n", json.dumps(config, indent = 1))

    user = await get_default_user()

    try:
        logger.info("Pipeline run started: `%s`", pipeline_name)
        send_telemetry("Pipeline Run Started", user.id, {
            "pipeline_name": pipeline_name,
        })

        send_telemetry("Pipeline Run Started",
            user.id,
            additional_properties = {"pipeline_name": pipeline_name, } | config
        )

        async for result in run_tasks_base(tasks, data, user):
            yield result

        logger.info("Pipeline run completed: `%s`", pipeline_name)
        send_telemetry("Pipeline Run Completed", user.id, {
            "pipeline_name": pipeline_name,
        })
        send_telemetry("Pipeline Run Completed",
            user.id,
            additional_properties = {"pipeline_name": pipeline_name, }
        )
    except Exception as error:
        logger.error(
            "Pipeline run errored: `%s`\n%s\n",

@ -182,15 +191,14 @@ async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str):
            str(error),
            exc_info = True,
        )
        send_telemetry("Pipeline Run Errored", user.id, {
            "pipeline_name": pipeline_name,
        })
        send_telemetry("Pipeline Run Errored",
            user.id,
            additional_properties = {"pipeline_name": pipeline_name, } | config
        )

        raise error

async def run_tasks(tasks: list[Task], data = None, pipeline_name: str = "default_pipeline"):
    config = get_current_settings()
    logger.debug("\nRunning pipeline with configuration:\n%s\n", json.dumps(config, indent = 1))


    async for result in run_tasks_with_telemetry(tasks, data, pipeline_name):
        yield result
@ -7,7 +7,7 @@ class Repository(DataPoint):
    type: Optional[str] = "Repository"

class CodeFile(DataPoint):
    __tablename__ = "CodeFile"
    __tablename__ = "codefile"
    extracted_id: str  # actually file path
    type: Optional[str] = "CodeFile"
    source_code: Optional[str] = None

@ -21,7 +21,7 @@ class CodeFile(DataPoint):
    }

class CodePart(DataPoint):
    __tablename__ = "CodePart"
    __tablename__ = "codepart"
    # part_of: Optional[CodeFile]
    source_code: str
    type: Optional[str] = "CodePart"
cognee/shared/exceptions/__init__.py (new file, 9 lines)
@ -0,0 +1,9 @@
"""
Custom exceptions for the Cognee API.

This module defines a set of exceptions for handling various shared utility errors
"""

from .exceptions import (
    IngestionError,
)
cognee/shared/exceptions/exceptions.py (new file, 11 lines)
@ -0,0 +1,11 @@
from cognee.exceptions import CogneeApiError
from fastapi import status

class IngestionError(CogneeApiError):
    def __init__(
        self,
        message: str = "Failed to load data.",
        name: str = "IngestionError",
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
    ):
        super().__init__(message, name, status_code)
@ -1,6 +1,9 @@
""" This module contains utility functions for the cognee. """
import os
from typing import BinaryIO, Union

import requests
import hashlib
from datetime import datetime, timezone
import graphistry
import networkx as nx

@ -16,6 +19,8 @@ from cognee.infrastructure.databases.graph import get_graph_engine
from uuid import uuid4
import pathlib

from cognee.shared.exceptions import IngestionError

# Analytics Proxy Url, currently hosted by Vercel
proxy_url = "https://test.prometh.ai"

@ -70,6 +75,29 @@ def num_tokens_from_string(string: str, encoding_name: str) -> int:
    num_tokens = len(encoding.encode(string))
    return num_tokens

def get_file_content_hash(file_obj: Union[str, BinaryIO]) -> str:
    h = hashlib.md5()

    try:
        if isinstance(file_obj, str):
            with open(file_obj, 'rb') as file:
                while True:
                    # Reading is buffered, so we can read smaller chunks.
                    chunk = file.read(h.block_size)
                    if not chunk:
                        break
                    h.update(chunk)
        else:
            while True:
                # Reading is buffered, so we can read smaller chunks.
                chunk = file_obj.read(h.block_size)
                if not chunk:
                    break
                h.update(chunk)

        return h.hexdigest()
    except IOError as e:
        raise IngestionError(message=f"Failed to load data from {file}: {e}")

def trim_text_to_max_tokens(text: str, max_tokens: int, encoding_name: str) -> str:
    """
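As a quick sanity check of the hashing helper: identical bytes give identical hashes whether a path or an open file object is passed, which is what makes the content_hash usable as a deduplication key. The file names below are hypothetical stand-ins for the copied test fixtures:

```python
from cognee.shared.utils import get_file_content_hash

# Hypothetical copies of the same file, as in the deduplication test data.
hash_a = get_file_content_hash("test_data/example.png")
with open("test_data/example_copy.png", "rb") as file_obj:
    hash_b = get_file_content_hash(file_obj)

assert hash_a == hash_b  # same contents => same content_hash => one Data row
```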
@ -44,11 +44,11 @@ async def classify_documents(data_documents: list[Data]) -> list[Document]:
    for data_item in data_documents:
        metadata = await get_metadata(data_item.id)
        document = EXTENSION_TO_DOCUMENT_CLASS[data_item.extension](
            id=data_item.id,
            title=f"{data_item.name}.{data_item.extension}",
            raw_data_location=data_item.raw_data_location,
            name=data_item.name,
            metadata_id=metadata.id
            id = data_item.id,
            title = f"{data_item.name}.{data_item.extension}",
            raw_data_location = data_item.raw_data_location,
            name = data_item.name,
            metadata_id = metadata.id
        )
        documents.append(document)
@ -20,16 +20,16 @@ async def extract_graph_from_data(
        *[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks]
    )
    graph_engine = await get_graph_engine()
    chunk_and_chunk_graphs = [
        (chunk, chunk_graph) for chunk, chunk_graph in zip(data_chunks, chunk_graphs)
    ]

    existing_edges_map = await retrieve_existing_edges(
        chunk_and_chunk_graphs,
        data_chunks,
        chunk_graphs,
        graph_engine,
    )

    graph_nodes, graph_edges = expand_with_nodes_and_edges(
        chunk_and_chunk_graphs,
        data_chunks,
        chunk_graphs,
        existing_edges_map,
    )
@ -1,10 +1,10 @@
from typing import Any
from typing import Any, List

import dlt
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.methods import create_dataset
from cognee.modules.data.operations.delete_metadata import delete_metadata
from cognee.modules.data.models.DatasetData import DatasetData
from cognee.modules.users.models import User
from cognee.modules.users.permissions.methods import give_permission_on_document
from cognee.shared.utils import send_telemetry

@ -23,12 +23,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
        destination = destination,
    )

    @dlt.resource(standalone=True, merge_key="id")
    async def data_resources(file_paths: str):
    @dlt.resource(standalone=True, primary_key="id", merge_key="id")
    async def data_resources(file_paths: List[str], user: User):
        for file_path in file_paths:
            with open(file_path.replace("file://", ""), mode="rb") as file:
                classified_data = ingestion.classify(file)
                data_id = ingestion.identify(classified_data)
                data_id = ingestion.identify(classified_data, user)
                file_metadata = classified_data.get_metadata()
                yield {
                    "id": data_id,

@ -36,6 +36,8 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                    "file_path": file_metadata["file_path"],
                    "extension": file_metadata["extension"],
                    "mime_type": file_metadata["mime_type"],
                    "content_hash": file_metadata["content_hash"],
                    "owner_id": str(user.id),
                }

    async def data_storing(data: Any, dataset_name: str, user: User):

@ -57,7 +59,8 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
            with open(file_path.replace("file://", ""), mode = "rb") as file:
                classified_data = ingestion.classify(file)

                data_id = ingestion.identify(classified_data)
                # data_id is the hash of file contents + owner id to avoid duplicate data
                data_id = ingestion.identify(classified_data, user)

                file_metadata = classified_data.get_metadata()

@ -70,6 +73,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                async with db_engine.get_async_session() as session:
                    dataset = await create_dataset(dataset_name, user.id, session)

                    # Check to see if data should be updated
                    data_point = (
                        await session.execute(select(Data).filter(Data.id == data_id))
                    ).scalar_one_or_none()

@ -79,6 +83,8 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                        data_point.raw_data_location = file_metadata["file_path"]
                        data_point.extension = file_metadata["extension"]
                        data_point.mime_type = file_metadata["mime_type"]
                        data_point.owner_id = user.id
                        data_point.content_hash = file_metadata["content_hash"]
                        await session.merge(data_point)
                    else:
                        data_point = Data(

@ -86,10 +92,20 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                            name = file_metadata["name"],
                            raw_data_location = file_metadata["file_path"],
                            extension = file_metadata["extension"],
                            mime_type = file_metadata["mime_type"]
                            mime_type = file_metadata["mime_type"],
                            owner_id = user.id,
                            content_hash = file_metadata["content_hash"],
                        )

                    # Check if data is already in dataset
                    dataset_data = (
                        await session.execute(select(DatasetData).filter(DatasetData.data_id == data_id,
                                                                         DatasetData.dataset_id == dataset.id))
                    ).scalar_one_or_none()
                    # If data is not present in dataset add it
                    if dataset_data is None:
                        dataset.data.append(data_point)

                    await session.commit()
                    await write_metadata(data_item, data_point.id, file_metadata)

@ -109,16 +125,17 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
        # To use sqlite with dlt dataset_name must be set to "main".
        # Sqlite doesn't support schemas
        run_info = pipeline.run(
            data_resources(file_paths),
            data_resources(file_paths, user),
            table_name="file_metadata",
            dataset_name="main",
            write_disposition="merge",
        )
    else:
        # Data should be stored in the same schema to allow deduplication
        run_info = pipeline.run(
            data_resources(file_paths),
            data_resources(file_paths, user),
            table_name="file_metadata",
            dataset_name=dataset_name,
            dataset_name="public",
            write_disposition="merge",
        )
@ -7,7 +7,7 @@ def save_data_item_to_storage(data_item: Union[BinaryIO, str], dataset_name: str

    # data is a file object coming from upload.
    if hasattr(data_item, "file"):
        file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename)
        file_path = save_data_to_file(data_item.file, filename=data_item.filename)

    elif isinstance(data_item, str):
        # data is a file path

@ -15,7 +15,7 @@ def save_data_item_to_storage(data_item: Union[BinaryIO, str], dataset_name: str
            file_path = data_item.replace("file://", "")
        # data is text
        else:
            file_path = save_data_to_file(data_item, dataset_name)
            file_path = save_data_to_file(data_item)
    else:
        raise IngestionError(message=f"Data type not supported: {type(data_item)}")
|
@ -17,7 +17,7 @@ async def save_data_item_with_metadata_to_storage(
|
|||
# data is a file object coming from upload.
|
||||
elif hasattr(data_item, "file"):
|
||||
file_path = save_data_to_file(
|
||||
data_item.file, dataset_name, filename=data_item.filename
|
||||
data_item.file, filename=data_item.filename
|
||||
)
|
||||
|
||||
elif isinstance(data_item, str):
|
||||
|
|
@ -26,7 +26,7 @@ async def save_data_item_with_metadata_to_storage(
|
|||
file_path = data_item.replace("file://", "")
|
||||
# data is text
|
||||
else:
|
||||
file_path = save_data_to_file(data_item, dataset_name)
|
||||
file_path = save_data_to_file(data_item)
|
||||
else:
|
||||
raise IngestionError(message=f"Data type not supported: {type(data_item)}")
|
||||
|
||||
|
|
|
|||
|
|
@ -8,11 +8,11 @@ def get_data_from_llama_index(data_point: Union[Document, ImageDocument], datase
    if type(data_point) == Document:
        file_path = data_point.metadata.get("file_path")
        if file_path is None:
            file_path = save_data_to_file(data_point.text, dataset_name)
            file_path = save_data_to_file(data_point.text)
            return file_path
        return file_path
    elif type(data_point) == ImageDocument:
        if data_point.image_path is None:
            file_path = save_data_to_file(data_point.text, dataset_name)
            file_path = save_data_to_file(data_point.text)
            return file_path
        return data_point.image_path
|
@ -70,7 +70,7 @@ async def node_enrich_and_connect(
    if desc_id in data_points_map:
        desc = data_points_map[desc_id]
    else:
        node_data = await graph_engine.extract_node(desc_id)
        node_data = await graph_engine.extract_node(str(desc_id))
        try:
            desc = convert_node_to_data_point(node_data)
        except Exception:

@ -87,9 +87,17 @@ async def enrich_dependency_graph(data_points: list[DataPoint]) -> AsyncGenerato
    """Enriches the graph with topological ranks and 'depends_on' edges."""
    nodes = []
    edges = []
    added_nodes = {}
    added_edges = {}
    visited_properties = {}

    for data_point in data_points:
        graph_nodes, graph_edges = await get_graph_from_model(data_point)
        graph_nodes, graph_edges = await get_graph_from_model(
            data_point,
            added_nodes = added_nodes,
            added_edges = added_edges,
            visited_properties = visited_properties,
        )
        nodes.extend(graph_nodes)
        edges.extend(graph_edges)
|
|
|||
|
|
@ -11,12 +11,14 @@ async def add_data_points(data_points: list[DataPoint]):
|
|||
|
||||
added_nodes = {}
|
||||
added_edges = {}
|
||||
visited_properties = {}
|
||||
|
||||
results = await asyncio.gather(*[
|
||||
get_graph_from_model(
|
||||
data_point,
|
||||
added_nodes = added_nodes,
|
||||
added_edges = added_edges,
|
||||
visited_properties = visited_properties,
|
||||
) for data_point in data_points
|
||||
])
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
from cognee.infrastructure.engine import DataPoint
|
||||
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
|
||||
from cognee.modules.data.processing.document_types import Document
|
||||
from cognee.modules.chunking.models import DocumentChunk
|
||||
from cognee.shared.CodeGraphEntities import CodeFile
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ from uuid import uuid5
|
|||
from pydantic import BaseModel
|
||||
from cognee.modules.data.extraction.extract_summary import extract_summary
|
||||
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
|
||||
from cognee.tasks.storage import add_data_points
|
||||
from .models import TextSummary
|
||||
|
||||
async def summarize_text(data_chunks: list[DocumentChunk], summarization_model: Type[BaseModel]):
|
||||
|
|
@ -23,6 +22,4 @@ async def summarize_text(data_chunks: list[DocumentChunk], summarization_model:
|
|||
) for (chunk_index, chunk) in enumerate(data_chunks)
|
||||
]
|
||||
|
||||
await add_data_points(summaries)
|
||||
|
||||
return data_chunks
|
||||
return summaries
|
||||
|
|
|
|||
|
|
@ -0,0 +1,2 @@
Natural language processing (NLP) is an interdisciplinary subfield of computer science and information retrieval. It is primarily concerned with giving computers the ability to support and manipulate human language. It involves processing natural language datasets, such as text corpora or speech corpora, using either rule-based or probabilistic (i.e. statistical and, most recently, neural network-based) machine learning approaches. The goal is a computer capable of "understanding"[citation needed] the contents of documents, including the contextual nuances of the language within them. To this end, natural language processing often borrows ideas from theoretical linguistics. The technology can then accurately extract information and insights contained in the documents as well as categorize and organize the documents themselves.
Challenges in natural language processing frequently involve speech recognition, natural-language understanding, and natural-language generation.
BIN cognee/tests/test_data/example.png (new binary file, 10 KiB, not shown)
BIN cognee/tests/test_data/example_copy.png (new binary file, 10 KiB, not shown)
BIN cognee/tests/test_data/text_to_speech.mp3 (new binary file, not shown)
BIN cognee/tests/test_data/text_to_speech_copy.mp3 (new binary file, not shown)
cognee/tests/test_deduplication.py (new file, 160 lines)
@ -0,0 +1,160 @@
|
|||
import hashlib
|
||||
import os
|
||||
import logging
|
||||
import pathlib
|
||||
|
||||
import cognee
|
||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
async def test_deduplication():
|
||||
await cognee.prune.prune_data()
|
||||
await cognee.prune.prune_system(metadata=True)
|
||||
|
||||
relational_engine = get_relational_engine()
|
||||
|
||||
dataset_name = "test_deduplication"
|
||||
dataset_name2 = "test_deduplication2"
|
||||
|
||||
# Test deduplication of local files
|
||||
explanation_file_path = os.path.join(
|
||||
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
|
||||
)
|
||||
explanation_file_path2 = os.path.join(
|
||||
pathlib.Path(__file__).parent, "test_data/Natural_language_processing_copy.txt"
|
||||
)
|
||||
await cognee.add([explanation_file_path], dataset_name)
|
||||
await cognee.add([explanation_file_path2], dataset_name2)
|
||||
|
||||
result = await relational_engine.get_all_data_from_table("data")
|
||||
assert len(result) == 1, "More than one data entity was found."
|
||||
assert result[0]["name"] == "Natural_language_processing_copy", "Result name does not match expected value."
|
||||
|
||||
result = await relational_engine.get_all_data_from_table("datasets")
|
||||
assert len(result) == 2, "Unexpected number of datasets found."
|
||||
assert result[0]["name"] == dataset_name, "Result name does not match expected value."
|
||||
assert result[1]["name"] == dataset_name2, "Result name does not match expected value."
|
||||
|
||||
result = await relational_engine.get_all_data_from_table("dataset_data")
|
||||
assert len(result) == 2, "Unexpected number of dataset data relationships found."
|
||||
assert result[0]["data_id"] == result[1]["data_id"], "Data item is not reused between datasets."
|
||||
assert result[0]["dataset_id"] != result[1]["dataset_id"], "Dataset items are not different."
|
||||
|
||||
await cognee.prune.prune_data()
|
||||
await cognee.prune.prune_system(metadata=True)
|
||||
|
||||
# Test deduplication of text input
|
||||
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
|
||||
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
|
||||
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
|
||||
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
|
||||
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
|
||||
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
|
||||
"""
|
||||
|
||||
await cognee.add([text], dataset_name)
|
||||
await cognee.add([text], dataset_name2)
|
||||
|
||||
result = await relational_engine.get_all_data_from_table("data")
|
||||
assert len(result) == 1, "More than one data entity was found."
|
||||
assert hashlib.md5(text.encode('utf-8')).hexdigest() in result[0]["name"], "Content hash is not a part of file name."
|
||||
|
||||
await cognee.prune.prune_data()
|
||||
await cognee.prune.prune_system(metadata=True)
|
||||
|
||||
# Test deduplication of image files
|
||||
explanation_file_path = os.path.join(
|
||||
pathlib.Path(__file__).parent, "test_data/example.png"
|
||||
)
|
||||
explanation_file_path2 = os.path.join(
|
||||
pathlib.Path(__file__).parent, "test_data/example_copy.png"
|
||||
)
|
||||
|
||||
await cognee.add([explanation_file_path], dataset_name)
|
||||
await cognee.add([explanation_file_path2], dataset_name2)
|
||||
|
||||
result = await relational_engine.get_all_data_from_table("data")
|
||||
assert len(result) == 1, "More than one data entity was found."
|
||||
|
||||
await cognee.prune.prune_data()
|
||||
await cognee.prune.prune_system(metadata=True)
|
||||
|
||||
# Test deduplication of sound files
    explanation_file_path = os.path.join(
        pathlib.Path(__file__).parent, "test_data/text_to_speech.mp3"
    )
    explanation_file_path2 = os.path.join(
        pathlib.Path(__file__).parent, "test_data/text_to_speech_copy.mp3"
    )

    await cognee.add([explanation_file_path], dataset_name)
    await cognee.add([explanation_file_path2], dataset_name2)

    result = await relational_engine.get_all_data_from_table("data")
    assert len(result) == 1, "More than one data entity was found."

    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)


async def test_deduplication_postgres():
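    # Run the shared deduplication checks against Postgres as the relational store
    # and pgvector as the vector store.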
    cognee.config.set_vector_db_config(
        {
            "vector_db_url": "",
            "vector_db_key": "",
            "vector_db_provider": "pgvector"
        }
    )
    cognee.config.set_relational_db_config(
        {
            "db_name": "cognee_db",
            "db_host": "127.0.0.1",
            "db_port": "5432",
            "db_username": "cognee",
            "db_password": "cognee",
            "db_provider": "postgres",
        }
    )

    await test_deduplication()

async def test_deduplication_sqlite():
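    # Same checks against the default SQLite relational store with LanceDB
    # as the vector store.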
    cognee.config.set_vector_db_config(
        {
            "vector_db_url": "",
            "vector_db_key": "",
            "vector_db_provider": "lancedb"
        }
    )
    cognee.config.set_relational_db_config(
        {
            "db_provider": "sqlite",
        }
    )

    await test_deduplication()


async def main():
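    # Isolate test data and system metadata in dedicated directories for this run.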
    data_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_deduplication")
        ).resolve()
    )
    cognee.config.data_root_directory(data_directory_path)
    cognee_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_deduplication")
        ).resolve()
    )
    cognee.config.system_root_directory(cognee_directory_path)

    await test_deduplication_postgres()
    await test_deduplication_sqlite()

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

@@ -73,10 +73,13 @@ async def test_circular_reference_extraction():
    nodes = []
    edges = []

+   added_nodes = {}
+   added_edges = {}
+
    start = time.perf_counter_ns()

    results = await asyncio.gather(*[
-       get_graph_from_model(code_file) for code_file in code_files
+       get_graph_from_model(code_file, added_nodes = added_nodes, added_edges = added_edges) for code_file in code_files
    ])

    time_to_run = time.perf_counter_ns() - start
@@ -87,12 +90,6 @@
        nodes.extend(result_nodes)
        edges.extend(result_edges)

-   # for code_file in code_files:
-   #     model_nodes, model_edges = get_graph_from_model(code_file)
-
-   #     nodes.extend(model_nodes)
-   #     edges.extend(model_edges)
-
    assert len(nodes) == 1501
    assert len(edges) == 1501 * 20 + 1500 * 5

@@ -0,0 +1,69 @@
import asyncio
import random
from typing import List
from uuid import uuid5, NAMESPACE_OID

from cognee.infrastructure.engine import DataPoint
from cognee.modules.graph.utils import get_graph_from_model

class Document(DataPoint):
    path: str

class DocumentChunk(DataPoint):
    part_of: Document
    text: str
    contains: List["Entity"] = None

class EntityType(DataPoint):
    name: str

class Entity(DataPoint):
    name: str
    is_type: EntityType

DocumentChunk.model_rebuild()


async def get_graph_from_model_test():
    document = Document(path = "file_path")

    document_chunks = [DocumentChunk(
        id = uuid5(NAMESPACE_OID, f"file{file_index}"),
        text = "some text",
        part_of = document,
        contains = [],
    ) for file_index in range(1)]

    for document_chunk in document_chunks:
        document_chunk.contains.append(Entity(
            name = f"Entity",
            is_type = EntityType(
                name = "Type 1",
            ),
        ))

    nodes = []
    edges = []

    added_nodes = {}
    added_edges = {}
    visited_properties = {}

    results = await asyncio.gather(*[
        get_graph_from_model(
            document_chunk,
            added_nodes = added_nodes,
            added_edges = added_edges,
            visited_properties = visited_properties,
        ) for document_chunk in document_chunks
    ])

    for result_nodes, result_edges in results:
        nodes.extend(result_nodes)
        edges.extend(result_edges)

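    # One Document, one DocumentChunk, one Entity and one EntityType should be
    # extracted as four nodes, linked by the part_of, contains and is_type
    # relationships (three edges).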
    assert len(nodes) == 4
    assert len(edges) == 3

if __name__ == "__main__":
    asyncio.run(get_graph_from_model_test())

@@ -64,7 +64,6 @@ async def generate_patch_with_cognee(instance, llm_client, search_type=SearchTyp

    tasks = [
        Task(get_repo_file_dependencies),
        Task(add_data_points, task_config = { "batch_size": 50 }),
        Task(enrich_dependency_graph, task_config = { "batch_size": 50 }),
        Task(expand_dependency_graph, task_config = { "batch_size": 50 }),
        Task(add_data_points, task_config = { "batch_size": 50 }),
4
licenses/README.md
Normal file

@@ -0,0 +1,4 @@
# Third party licenses

This folder contains the licenses of third-party open-source software that has been redistributed in this project.
Details of included files and modifications can be found in [NOTICE](/NOTICE.md).

@@ -265,7 +265,7 @@
},
{
"cell_type": "code",
- "execution_count": 1,
+ "execution_count": null,
"id": "df16431d0f48b006",
"metadata": {
"ExecuteTime": {
@@ -304,7 +304,7 @@
},
{
"cell_type": "code",
- "execution_count": 2,
+ "execution_count": null,
"id": "9086abf3af077ab4",
"metadata": {
"ExecuteTime": {
@@ -349,7 +349,7 @@
},
{
"cell_type": "code",
- "execution_count": 3,
+ "execution_count": null,
"id": "a9de0cc07f798b7f",
"metadata": {
"ExecuteTime": {
@@ -393,7 +393,7 @@
},
{
"cell_type": "code",
- "execution_count": 4,
+ "execution_count": null,
"id": "185ff1c102d06111",
"metadata": {
"ExecuteTime": {
@@ -437,7 +437,7 @@
},
{
"cell_type": "code",
- "execution_count": 5,
+ "execution_count": null,
"id": "d55ce4c58f8efb67",
"metadata": {
"ExecuteTime": {
@@ -479,7 +479,7 @@
},
{
"cell_type": "code",
- "execution_count": 6,
+ "execution_count": null,
"id": "ca4ecc32721ad332",
"metadata": {
"ExecuteTime": {
@@ -529,7 +529,7 @@
},
{
"cell_type": "code",
- "execution_count": 7,
+ "execution_count": null,
"id": "bce39dc6",
"metadata": {},
"outputs": [],
@@ -622,7 +622,7 @@
},
{
"cell_type": "code",
- "execution_count": 10,
+ "execution_count": null,
"id": "7c431fdef4921ae0",
"metadata": {
"ExecuteTime": {
@@ -654,13 +654,13 @@
" Task(classify_documents),\n",
" Task(check_permissions_on_documents, user = user, permissions = [\"write\"]),\n",
" Task(extract_chunks_from_documents), # Extract text chunks based on the document type.\n",
" Task(add_data_points, task_config = { \"batch_size\": 10 }),\n",
" Task(extract_graph_from_data, graph_model = KnowledgeGraph, task_config = { \"batch_size\": 10 }), # Generate knowledge graphs from the document chunks.\n",
" Task(\n",
" summarize_text,\n",
" summarization_model = cognee_config.summarization_model,\n",
" task_config = { \"batch_size\": 10 }\n",
" ),\n",
" Task(add_data_points, task_config = { \"batch_size\": 10 }),\n",
" ]\n",
"\n",
" pipeline = run_tasks(tasks, data_documents)\n",
@@ -883,7 +883,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
- "version": "3.9.6"
+ "version": "3.11.8"
}
},
"nbformat": 4,
@@ -28,10 +28,27 @@ if __name__ == "__main__":
    society = create_organization_recursive(
        "society", "Society", PERSON_NAMES, args.recursive_depth
    )
-   nodes, edges = asyncio.run(get_graph_from_model(society))
+   added_nodes = {}
+   added_edges = {}
+   visited_properties = {}
+   nodes, edges = asyncio.run(get_graph_from_model(
+       society,
+       added_nodes = added_nodes,
+       added_edges = added_edges,
+       visited_properties = visited_properties,
+   ))

    def get_graph_from_model_sync(model):
-       return asyncio.run(get_graph_from_model(model))
+       added_nodes = {}
+       added_edges = {}
+       visited_properties = {}
+
+       return asyncio.run(get_graph_from_model(
+           model,
+           added_nodes = added_nodes,
+           added_edges = added_edges,
+           visited_properties = visited_properties,
+       ))

    results = benchmark_function(get_graph_from_model_sync, society, num_runs=args.runs)
    print("\nBenchmark Results:")