Merge branch 'dev' into feature/cog-3564-get-raw-data-endpoint-fails-when-trying-to-read-text-files
This commit is contained in:
commit
5dc6d4da48
13 changed files with 3185 additions and 2798 deletions
25
.github/workflows/e2e_tests.yml
vendored
25
.github/workflows/e2e_tests.yml
vendored
|
|
@ -315,6 +315,31 @@ jobs:
|
||||||
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
|
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
|
||||||
run: uv run python ./cognee/tests/test_multi_tenancy.py
|
run: uv run python ./cognee/tests/test_multi_tenancy.py
|
||||||
|
|
||||||
|
test-data-label:
|
||||||
|
name: Test adding of label for data in Cognee
|
||||||
|
runs-on: ubuntu-22.04
|
||||||
|
steps:
|
||||||
|
- name: Check out repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Cognee Setup
|
||||||
|
uses: ./.github/actions/cognee_setup
|
||||||
|
with:
|
||||||
|
python-version: '3.11.x'
|
||||||
|
|
||||||
|
- name: Run custom data label test
|
||||||
|
env:
|
||||||
|
ENV: 'dev'
|
||||||
|
LLM_MODEL: ${{ secrets.LLM_MODEL }}
|
||||||
|
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
|
||||||
|
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
|
||||||
|
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
|
||||||
|
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
|
||||||
|
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
|
||||||
|
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
|
||||||
|
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
|
||||||
|
run: uv run python ./cognee/tests/test_custom_data_label.py
|
||||||
|
|
||||||
test-graph-edges:
|
test-graph-edges:
|
||||||
name: Test graph edge ingestion
|
name: Test graph edge ingestion
|
||||||
runs-on: ubuntu-22.04
|
runs-on: ubuntu-22.04
|
||||||
|
|
|
||||||
38
alembic/versions/a1b2c3d4e5f6_add_label_column_to_data.py
Normal file
38
alembic/versions/a1b2c3d4e5f6_add_label_column_to_data.py
Normal file
|
|
@ -0,0 +1,38 @@
|
||||||
|
"""Add label column to data table
|
||||||
|
|
||||||
|
Revision ID: a1b2c3d4e5f6
|
||||||
|
Revises: 211ab850ef3d
|
||||||
|
Create Date: 2025-11-17 17:54:32.123456
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = "a1b2c3d4e5f6"
|
||||||
|
down_revision: Union[str, None] = "46a6ce2bd2b2"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_column(inspector, table, name, schema=None):
|
||||||
|
for col in inspector.get_columns(table, schema=schema):
|
||||||
|
if col["name"] == name:
|
||||||
|
return col
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
conn = op.get_bind()
|
||||||
|
insp = sa.inspect(conn)
|
||||||
|
|
||||||
|
label_column = _get_column(insp, "data", "label")
|
||||||
|
if not label_column:
|
||||||
|
op.add_column("data", sa.Column("label", sa.String(), nullable=True))
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
op.drop_column("data", "label")
|
||||||
4183
cognee-mcp/uv.lock
generated
4183
cognee-mcp/uv.lock
generated
File diff suppressed because it is too large
Load diff
|
|
@ -10,13 +10,14 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
|
||||||
)
|
)
|
||||||
from cognee.modules.engine.operations.setup import setup
|
from cognee.modules.engine.operations.setup import setup
|
||||||
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
|
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
|
||||||
|
from cognee.tasks.ingestion.data_item import DataItem
|
||||||
from cognee.shared.logging_utils import get_logger
|
from cognee.shared.logging_utils import get_logger
|
||||||
|
|
||||||
logger = get_logger()
|
logger = get_logger()
|
||||||
|
|
||||||
|
|
||||||
async def add(
|
async def add(
|
||||||
data: Union[BinaryIO, list[BinaryIO], str, list[str]],
|
data: Union[BinaryIO, list[BinaryIO], str, list[str], DataItem, list[DataItem]],
|
||||||
dataset_name: str = "main_dataset",
|
dataset_name: str = "main_dataset",
|
||||||
user: User = None,
|
user: User = None,
|
||||||
node_set: Optional[List[str]] = None,
|
node_set: Optional[List[str]] = None,
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,7 @@ class DatasetDTO(OutDTO):
|
||||||
class DataDTO(OutDTO):
|
class DataDTO(OutDTO):
|
||||||
id: UUID
|
id: UUID
|
||||||
name: str
|
name: str
|
||||||
|
label: Optional[str] = None
|
||||||
created_at: datetime
|
created_at: datetime
|
||||||
updated_at: Optional[datetime] = None
|
updated_at: Optional[datetime] = None
|
||||||
extension: str
|
extension: str
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
from sqlalchemy import URL
|
||||||
from .sqlalchemy.SqlAlchemyAdapter import SQLAlchemyAdapter
|
from .sqlalchemy.SqlAlchemyAdapter import SQLAlchemyAdapter
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
|
|
||||||
|
|
@ -45,9 +46,16 @@ def create_relational_engine(
|
||||||
# Test if asyncpg is available
|
# Test if asyncpg is available
|
||||||
import asyncpg
|
import asyncpg
|
||||||
|
|
||||||
connection_string = (
|
# Handle special characters in username and password like # or @
|
||||||
f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}"
|
connection_string = URL.create(
|
||||||
|
"postgresql+asyncpg",
|
||||||
|
username=db_username,
|
||||||
|
password=db_password,
|
||||||
|
host=db_host,
|
||||||
|
port=int(db_port),
|
||||||
|
database=db_name,
|
||||||
)
|
)
|
||||||
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
raise ImportError(
|
raise ImportError(
|
||||||
"PostgreSQL dependencies are not installed. Please install with 'pip install cognee\"[postgres]\"' or 'pip install cognee\"[postgres-binary]\"' to use PostgreSQL functionality."
|
"PostgreSQL dependencies are not installed. Please install with 'pip install cognee\"[postgres]\"' or 'pip install cognee\"[postgres-binary]\"' to use PostgreSQL functionality."
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
from sqlalchemy import URL
|
||||||
|
|
||||||
from .supported_databases import supported_databases
|
from .supported_databases import supported_databases
|
||||||
from .embeddings import get_embedding_engine
|
from .embeddings import get_embedding_engine
|
||||||
from cognee.infrastructure.databases.graph.config import get_graph_context_config
|
from cognee.infrastructure.databases.graph.config import get_graph_context_config
|
||||||
|
|
@ -66,8 +68,13 @@ def create_vector_engine(
|
||||||
if not (db_host and db_port and db_name and db_username and db_password):
|
if not (db_host and db_port and db_name and db_username and db_password):
|
||||||
raise EnvironmentError("Missing requred pgvector credentials!")
|
raise EnvironmentError("Missing requred pgvector credentials!")
|
||||||
|
|
||||||
connection_string: str = (
|
connection_string = URL.create(
|
||||||
f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}"
|
"postgresql+asyncpg",
|
||||||
|
username=db_username,
|
||||||
|
password=db_password,
|
||||||
|
host=db_host,
|
||||||
|
port=int(db_port),
|
||||||
|
database=db_name,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ class Data(Base):
|
||||||
__tablename__ = "data"
|
__tablename__ = "data"
|
||||||
|
|
||||||
id = Column(UUID, primary_key=True, default=uuid4)
|
id = Column(UUID, primary_key=True, default=uuid4)
|
||||||
|
label = Column(String, nullable=True)
|
||||||
name = Column(String)
|
name = Column(String)
|
||||||
extension = Column(String)
|
extension = Column(String)
|
||||||
mime_type = Column(String)
|
mime_type = Column(String)
|
||||||
|
|
@ -49,6 +49,7 @@ class Data(Base):
|
||||||
return {
|
return {
|
||||||
"id": str(self.id),
|
"id": str(self.id),
|
||||||
"name": self.name,
|
"name": self.name,
|
||||||
|
"label": self.label,
|
||||||
"extension": self.extension,
|
"extension": self.extension,
|
||||||
"mimeType": self.mime_type,
|
"mimeType": self.mime_type,
|
||||||
"rawDataLocation": self.raw_data_location,
|
"rawDataLocation": self.raw_data_location,
|
||||||
|
|
|
||||||
8
cognee/tasks/ingestion/data_item.py
Normal file
8
cognee/tasks/ingestion/data_item.py
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class DataItem:
|
||||||
|
data: Any
|
||||||
|
label: Optional[str] = None
|
||||||
|
|
@ -20,6 +20,7 @@ from cognee.modules.data.methods import (
|
||||||
|
|
||||||
from .save_data_item_to_storage import save_data_item_to_storage
|
from .save_data_item_to_storage import save_data_item_to_storage
|
||||||
from .data_item_to_text_file import data_item_to_text_file
|
from .data_item_to_text_file import data_item_to_text_file
|
||||||
|
from .data_item import DataItem
|
||||||
|
|
||||||
|
|
||||||
async def ingest_data(
|
async def ingest_data(
|
||||||
|
|
@ -78,8 +79,16 @@ async def ingest_data(
|
||||||
dataset_data_map = {str(data.id): True for data in dataset_data}
|
dataset_data_map = {str(data.id): True for data in dataset_data}
|
||||||
|
|
||||||
for data_item in data:
|
for data_item in data:
|
||||||
|
# Support for DataItem (custom label + data wrapper)
|
||||||
|
current_label = None
|
||||||
|
underlying_data = data_item
|
||||||
|
|
||||||
|
if isinstance(data_item, DataItem):
|
||||||
|
underlying_data = data_item.data
|
||||||
|
current_label = data_item.label
|
||||||
|
|
||||||
# Get file path of data item or create a file if it doesn't exist
|
# Get file path of data item or create a file if it doesn't exist
|
||||||
original_file_path = await save_data_item_to_storage(data_item)
|
original_file_path = await save_data_item_to_storage(underlying_data)
|
||||||
# Transform file path to be OS usable
|
# Transform file path to be OS usable
|
||||||
actual_file_path = get_data_file_path(original_file_path)
|
actual_file_path = get_data_file_path(original_file_path)
|
||||||
|
|
||||||
|
|
@ -139,6 +148,7 @@ async def ingest_data(
|
||||||
data_point.external_metadata = ext_metadata
|
data_point.external_metadata = ext_metadata
|
||||||
data_point.node_set = json.dumps(node_set) if node_set else None
|
data_point.node_set = json.dumps(node_set) if node_set else None
|
||||||
data_point.tenant_id = user.tenant_id if user.tenant_id else None
|
data_point.tenant_id = user.tenant_id if user.tenant_id else None
|
||||||
|
data_point.label = current_label
|
||||||
|
|
||||||
# Check if data is already in dataset
|
# Check if data is already in dataset
|
||||||
if str(data_point.id) in dataset_data_map:
|
if str(data_point.id) in dataset_data_map:
|
||||||
|
|
@ -169,6 +179,7 @@ async def ingest_data(
|
||||||
tenant_id=user.tenant_id if user.tenant_id else None,
|
tenant_id=user.tenant_id if user.tenant_id else None,
|
||||||
pipeline_status={},
|
pipeline_status={},
|
||||||
token_count=-1,
|
token_count=-1,
|
||||||
|
label=current_label,
|
||||||
)
|
)
|
||||||
|
|
||||||
new_datapoints.append(data_point)
|
new_datapoints.append(data_point)
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ from cognee.shared.logging_utils import get_logger
|
||||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
from cognee.tasks.web_scraper.utils import fetch_page_content
|
from cognee.tasks.web_scraper.utils import fetch_page_content
|
||||||
|
from cognee.tasks.ingestion.data_item import DataItem
|
||||||
|
|
||||||
|
|
||||||
logger = get_logger()
|
logger = get_logger()
|
||||||
|
|
@ -95,5 +96,9 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str
|
||||||
# data is text, save it to data storage and return the file path
|
# data is text, save it to data storage and return the file path
|
||||||
return await save_data_to_file(data_item)
|
return await save_data_to_file(data_item)
|
||||||
|
|
||||||
|
if isinstance(data_item, DataItem):
|
||||||
|
# If instance is DataItem use the underlying data
|
||||||
|
return await save_data_item_to_storage(data_item.data)
|
||||||
|
|
||||||
# data is not a supported type
|
# data is not a supported type
|
||||||
raise IngestionError(message=f"Data type not supported: {type(data_item)}")
|
raise IngestionError(message=f"Data type not supported: {type(data_item)}")
|
||||||
|
|
|
||||||
68
cognee/tests/test_custom_data_label.py
Normal file
68
cognee/tests/test_custom_data_label.py
Normal file
|
|
@ -0,0 +1,68 @@
|
||||||
|
import asyncio
|
||||||
|
import cognee
|
||||||
|
from cognee.shared.logging_utils import setup_logging, ERROR
|
||||||
|
from cognee.api.v1.search import SearchType
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
# Create a clean slate for cognee -- reset data and system state
|
||||||
|
print("Resetting cognee data...")
|
||||||
|
await cognee.prune.prune_data()
|
||||||
|
await cognee.prune.prune_system(metadata=True)
|
||||||
|
print("Data reset complete.\n")
|
||||||
|
|
||||||
|
# cognee knowledge graph will be created based on this text
|
||||||
|
text = """
|
||||||
|
Natural language processing (NLP) is an interdisciplinary
|
||||||
|
subfield of computer science and information retrieval.
|
||||||
|
"""
|
||||||
|
from cognee.tasks.ingestion.data_item import DataItem
|
||||||
|
|
||||||
|
test_item = DataItem(text, "test_item")
|
||||||
|
# Add the text, and make it available for cognify
|
||||||
|
await cognee.add(test_item)
|
||||||
|
|
||||||
|
# Use LLMs and cognee to create knowledge graph
|
||||||
|
ret_val = await cognee.cognify()
|
||||||
|
|
||||||
|
query_text = "Tell me about NLP"
|
||||||
|
print(f"Searching cognee for insights with query: '{query_text}'")
|
||||||
|
# Query cognee for insights on the added text
|
||||||
|
search_results = await cognee.search(
|
||||||
|
query_type=SearchType.GRAPH_COMPLETION, query_text=query_text
|
||||||
|
)
|
||||||
|
|
||||||
|
print("Search results:")
|
||||||
|
# Display results
|
||||||
|
for result_text in search_results:
|
||||||
|
print(result_text)
|
||||||
|
|
||||||
|
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
|
||||||
|
|
||||||
|
for pipeline in ret_val.values():
|
||||||
|
dataset_id = pipeline.dataset_id
|
||||||
|
|
||||||
|
dataset_data = await get_dataset_data(dataset_id=dataset_id)
|
||||||
|
|
||||||
|
from fastapi.encoders import jsonable_encoder
|
||||||
|
|
||||||
|
data = [
|
||||||
|
dict(
|
||||||
|
**jsonable_encoder(data),
|
||||||
|
dataset_id=dataset_id,
|
||||||
|
)
|
||||||
|
for data in dataset_data
|
||||||
|
]
|
||||||
|
|
||||||
|
# Check if label is properly added and stored
|
||||||
|
assert data[0]["label"] == "test_item"
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
logger = setup_logging(log_level=ERROR)
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(main())
|
||||||
|
finally:
|
||||||
|
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||||
Loading…
Add table
Reference in a new issue