feat: Add custom label by contributor: apenade (#1913)

<!-- .github/pull_request_template.md -->

## Description
Add ability to define custom labels for Data in Cognee. Initial PR by
contributor: apenade

## Acceptance Criteria
<!--
* Key requirements to the new feature or modification;
* Proof that the changes work and meet the requirements;
* Include instructions on how to verify the changes. Describe how to
test it locally;
* Proof that it's sufficiently tested.
-->

## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [ ] **I have tested my changes thoroughly before submitting this PR**
- [ ] **This PR contains minimal changes necessary to address the
issue/feature**
- [ ] My code follows the project's coding standards and style
guidelines
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added necessary documentation (if applicable)
- [ ] All new and existing tests pass
- [ ] I have searched existing PRs to ensure this change hasn't been
submitted already
- [ ] I have linked any relevant issues in the description
- [ ] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **New Features**
  * Added support for labeling individual data items during ingestion workflows
  * Expanded the add API to accept data items with optional custom labels for better organization
  * Labels are persisted and retrievable when accessing dataset information
  * Enhanced data retrieval to include label information in API responses

* **Tests**
  * Added comprehensive end-to-end tests validating custom data labeling functionality

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
Vasilije 2025-12-17 21:21:40 +01:00 committed by GitHub
commit 2ef8094666
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 3166 additions and 2794 deletions

View file

@ -315,6 +315,31 @@ jobs:
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
run: uv run python ./cognee/tests/test_multi_tenancy.py
test-data-label:
name: Test adding of label for data in Cognee
runs-on: ubuntu-22.04
steps:
- name: Check out repository
uses: actions/checkout@v4
- name: Cognee Setup
uses: ./.github/actions/cognee_setup
with:
python-version: '3.11.x'
- name: Run custom data label test
env:
ENV: 'dev'
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
run: uv run python ./cognee/tests/test_custom_data_label.py
test-graph-edges:
name: Test graph edge ingestion
runs-on: ubuntu-22.04

View file

@ -0,0 +1,38 @@
"""Add label column to data table
Revision ID: a1b2c3d4e5f6
Revises: 211ab850ef3d
Create Date: 2025-11-17 17:54:32.123456
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "a1b2c3d4e5f6"
# NOTE(review): the module docstring above says "Revises: 211ab850ef3d" but
# the parent configured here is "46a6ce2bd2b2" — confirm which revision this
# migration is meant to follow and make the two agree.
down_revision: Union[str, None] = "46a6ce2bd2b2"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def _get_column(inspector, table, name, schema=None):
for col in inspector.get_columns(table, schema=schema):
if col["name"] == name:
return col
return None
def upgrade() -> None:
    """Add a nullable ``label`` column to the ``data`` table.

    Idempotent: the column is only added when it is not already present,
    so a partially-applied or manually-patched schema does not break the run.
    """
    inspector = sa.inspect(op.get_bind())
    if _get_column(inspector, "data", "label") is None:
        op.add_column("data", sa.Column("label", sa.String(), nullable=True))
def downgrade() -> None:
    """Drop the ``label`` column from the ``data`` table.

    Mirrors the idempotence of ``upgrade``: the drop is skipped when the
    column is already absent, so re-running the downgrade cannot fail on a
    schema where it was never added (``upgrade`` may legitimately no-op).
    """
    conn = op.get_bind()
    insp = sa.inspect(conn)
    if _get_column(insp, "data", "label") is not None:
        op.drop_column("data", "label")

4183
cognee-mcp/uv.lock generated

File diff suppressed because it is too large Load diff

View file

@ -10,13 +10,14 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
)
from cognee.modules.engine.operations.setup import setup
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
from cognee.tasks.ingestion.data_item import DataItem
from cognee.shared.logging_utils import get_logger
logger = get_logger()
async def add(
data: Union[BinaryIO, list[BinaryIO], str, list[str]],
data: Union[BinaryIO, list[BinaryIO], str, list[str], DataItem, list[DataItem]],
dataset_name: str = "main_dataset",
user: User = None,
node_set: Optional[List[str]] = None,

View file

@ -44,6 +44,7 @@ class DatasetDTO(OutDTO):
class DataDTO(OutDTO):
id: UUID
name: str
label: Optional[str] = None
created_at: datetime
updated_at: Optional[datetime] = None
extension: str

View file

@ -13,7 +13,7 @@ class Data(Base):
__tablename__ = "data"
id = Column(UUID, primary_key=True, default=uuid4)
label = Column(String, nullable=True)
name = Column(String)
extension = Column(String)
mime_type = Column(String)
@ -49,6 +49,7 @@ class Data(Base):
return {
"id": str(self.id),
"name": self.name,
"label": self.label,
"extension": self.extension,
"mimeType": self.mime_type,
"rawDataLocation": self.raw_data_location,

View file

@ -0,0 +1,8 @@
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class DataItem:
    """Wrapper pairing an ingestable payload with an optional custom label.

    ``cognee.add`` accepts a ``DataItem`` (or a list of them) in place of raw
    data; during ingestion the wrapper is unpacked and ``label`` is persisted
    on the resulting ``Data`` row.
    """

    # The underlying payload passed through to storage (e.g. text, a binary
    # file handle, or a path/URL string).
    data: Any
    # Optional user-defined label stored alongside the data record.
    label: Optional[str] = None

View file

@ -20,6 +20,7 @@ from cognee.modules.data.methods import (
from .save_data_item_to_storage import save_data_item_to_storage
from .data_item_to_text_file import data_item_to_text_file
from .data_item import DataItem
async def ingest_data(
@ -78,8 +79,16 @@ async def ingest_data(
dataset_data_map = {str(data.id): True for data in dataset_data}
for data_item in data:
# Support for DataItem (custom label + data wrapper)
current_label = None
underlying_data = data_item
if isinstance(data_item, DataItem):
underlying_data = data_item.data
current_label = data_item.label
# Get file path of data item or create a file if it doesn't exist
original_file_path = await save_data_item_to_storage(data_item)
original_file_path = await save_data_item_to_storage(underlying_data)
# Transform file path to be OS usable
actual_file_path = get_data_file_path(original_file_path)
@ -139,6 +148,7 @@ async def ingest_data(
data_point.external_metadata = ext_metadata
data_point.node_set = json.dumps(node_set) if node_set else None
data_point.tenant_id = user.tenant_id if user.tenant_id else None
data_point.label = current_label
# Check if data is already in dataset
if str(data_point.id) in dataset_data_map:
@ -169,6 +179,7 @@ async def ingest_data(
tenant_id=user.tenant_id if user.tenant_id else None,
pipeline_status={},
token_count=-1,
label=current_label,
)
new_datapoints.append(data_point)

View file

@ -9,6 +9,7 @@ from cognee.shared.logging_utils import get_logger
from pydantic_settings import BaseSettings, SettingsConfigDict
from cognee.tasks.web_scraper.utils import fetch_page_content
from cognee.tasks.ingestion.data_item import DataItem
logger = get_logger()
@ -95,5 +96,9 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str
# data is text, save it to data storage and return the file path
return await save_data_to_file(data_item)
if isinstance(data_item, DataItem):
# If instance is DataItem use the underlying data
return await save_data_item_to_storage(data_item.data)
# data is not a supported type
raise IngestionError(message=f"Data type not supported: {type(data_item)}")

View file

@ -0,0 +1,68 @@
import asyncio
import cognee
from cognee.shared.logging_utils import setup_logging, ERROR
from cognee.api.v1.search import SearchType
async def main():
    """End-to-end check that a custom label supplied via DataItem is persisted.

    Resets cognee state, ingests a labelled text snippet, runs cognify and a
    graph-completion search, then verifies that the stored data row carries
    the custom label.
    """
    # Project imports stay function-local (as in the original script) but are
    # hoisted here so none of them executes inside a loop body.
    from fastapi.encoders import jsonable_encoder

    from cognee.modules.data.methods.get_dataset_data import get_dataset_data
    from cognee.tasks.ingestion.data_item import DataItem

    # Create a clean slate for cognee -- reset data and system state
    print("Resetting cognee data...")
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    print("Data reset complete.\n")

    # cognee knowledge graph will be created based on this text
    text = """
    Natural language processing (NLP) is an interdisciplinary
    subfield of computer science and information retrieval.
    """

    test_item = DataItem(text, "test_item")

    # Add the labelled text, and make it available for cognify
    await cognee.add(test_item)

    # Use LLMs and cognee to create knowledge graph
    ret_val = await cognee.cognify()

    query_text = "Tell me about NLP"
    print(f"Searching cognee for insights with query: '{query_text}'")
    # Query cognee for insights on the added text
    search_results = await cognee.search(
        query_type=SearchType.GRAPH_COMPLETION, query_text=query_text
    )

    print("Search results:")
    # Display results
    for result_text in search_results:
        print(result_text)

    # Check that the label round-tripped for every pipeline's dataset.
    # (The original only asserted against the last pipeline iterated.)
    for pipeline in ret_val.values():
        dataset_id = pipeline.dataset_id
        dataset_data = await get_dataset_data(dataset_id=dataset_id)
        rows = [
            dict(
                **jsonable_encoder(row),
                dataset_id=dataset_id,
            )
            for row in dataset_data
        ]
        # Guard against a silently-empty dataset before indexing into it.
        assert rows, f"No data rows found for dataset {dataset_id}"
        # Check if label is properly added and stored
        assert rows[0]["label"] == "test_item"
if __name__ == "__main__":
    logger = setup_logging(log_level=ERROR)
    # asyncio.run() creates the loop, runs main(), shuts down async
    # generators, and closes the loop on exit. The previous manual
    # new_event_loop/run_until_complete sequence never called loop.close(),
    # leaking the event loop.
    asyncio.run(main())

1613
uv.lock generated

File diff suppressed because it is too large Load diff