cognee/cognee/tasks/storage/index_data_points.py
lxobr 6223ecf05b
feat: optimize repeated entity extraction (#1682)

## Description

- Added an `edge_text` field to edges that auto-fills from `relationship_type` when not provided (see the sketch below).
- `contains` edges now store descriptions for better embedding.
- Updated and refactored indexing so that `edge_text` is embedded and exposed to retrieval.
- Updated retrieval to use the new `edge_text` embeddings.
- Added a test verifying that `edge_text` exists in the graph with the correct format.
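
A minimal sketch of the auto-fill behavior, assuming a Pydantic-style edge model (the `Edge` class and validator below are illustrative, not cognee's actual implementation):

```python
from typing import Optional

from pydantic import BaseModel, model_validator


class Edge(BaseModel):
    """Illustrative edge model, not cognee's actual class."""

    relationship_type: str
    edge_text: Optional[str] = None

    @model_validator(mode="after")
    def fill_edge_text(self):
        # Fall back to relationship_type so every edge has text to embed.
        if self.edge_text is None:
            self.edge_text = self.relationship_type
        return self


edge = Edge(relationship_type="contains")
assert edge.edge_text == "contains"
```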

## Type of Change
- [ ] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [x] Code refactoring
- [x] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)

## Pre-submission Checklist
- [x] **I have tested my changes thoroughly before submitting this PR**
- [x] **This PR contains minimal changes necessary to address the
issue/feature**
- [x] My code follows the project's coding standards and style
guidelines
- [x] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added necessary documentation (if applicable)
- [ ] All new and existing tests pass
- [x] I have searched existing PRs to ensure this change hasn't been
submitted already
- [ ] I have linked any relevant issues in the description
- [ ] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-10-30 13:56:06 +01:00

import asyncio

from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.engine import DataPoint

logger = get_logger("index_data_points")


async def index_data_points(data_points: list[DataPoint]):
"""Index data points in the vector engine by creating embeddings for specified fields.
Process:
1. Groups data points into a nested dict: {type_name: {field_name: [points]}}
2. Creates vector indexes for each (type, field) combination on first encounter
3. Batches points per (type, field) and creates async indexing tasks
4. Executes all indexing tasks in parallel for efficient embedding generation
Args:
data_points: List of DataPoint objects to index. Each DataPoint's metadata must
contain an 'index_fields' list specifying which fields to embed.
Returns:
The original data_points list.
"""
    data_points_by_type = {}
    vector_engine = get_vector_engine()

for data_point in data_points:
data_point_type = type(data_point)
type_name = data_point_type.__name__
for field_name in data_point.metadata["index_fields"]:
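            # Nothing to embed if the field is unset on this instance.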
if getattr(data_point, field_name, None) is None:
continue
if type_name not in data_points_by_type:
data_points_by_type[type_name] = {}
if field_name not in data_points_by_type[type_name]:
await vector_engine.create_vector_index(type_name, field_name)
data_points_by_type[type_name][field_name] = []
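            # Copy the point and narrow index_fields to this one field so the
            # vector engine embeds exactly one attribute per entry.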
indexed_data_point = data_point.model_copy()
indexed_data_point.metadata["index_fields"] = [field_name]
data_points_by_type[type_name][field_name].append(indexed_data_point)

    batch_size = vector_engine.embedding_engine.get_batch_size()
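    # Flatten the nested {type: {field: [points]}} map into fixed-size batches
    # so each embedding request stays within the engine's batch limit.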
batches = (
(type_name, field_name, points[i : i + batch_size])
for type_name, fields in data_points_by_type.items()
for field_name, points in fields.items()
for i in range(0, len(points), batch_size)
    )

tasks = [
asyncio.create_task(vector_engine.index_data_points(type_name, field_name, batch_points))
for type_name, field_name, batch_points in batches
]
await asyncio.gather(*tasks)
    return data_points


async def get_data_points_from_model(
data_point: DataPoint, added_data_points=None, visited_properties=None
) -> list[DataPoint]:
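    """Recursively collect a DataPoint and all nested DataPoints it references.

    De-duplicates points by id via added_data_points and records visited
    (parent, field, child) property edges in visited_properties to cut off
    cycles between DataPoints.
    """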
data_points = []
added_data_points = added_data_points or {}
visited_properties = visited_properties or {}
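    # Pydantic models iterate as (field_name, field_value) pairs.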
for field_name, field_value in data_point:
if isinstance(field_value, DataPoint):
property_key = f"{str(data_point.id)}{field_name}{str(field_value.id)}"
if property_key in visited_properties:
return []
visited_properties[property_key] = True
new_data_points = await get_data_points_from_model(
field_value, added_data_points, visited_properties
)
for new_point in new_data_points:
if str(new_point.id) not in added_data_points:
added_data_points[str(new_point.id)] = True
                    data_points.append(new_point)

if (
isinstance(field_value, list)
and len(field_value) > 0
and isinstance(field_value[0], DataPoint)
):
for field_value_item in field_value:
property_key = f"{str(data_point.id)}{field_name}{str(field_value_item.id)}"
if property_key in visited_properties:
return []
visited_properties[property_key] = True
new_data_points = await get_data_points_from_model(
field_value_item, added_data_points, visited_properties
)
for new_point in new_data_points:
if str(new_point.id) not in added_data_points:
added_data_points[str(new_point.id)] = True
data_points.append(new_point)
if str(data_point.id) not in added_data_points:
data_points.append(data_point)
    return data_points


if __name__ == "__main__":
class Car(DataPoint):
model: str
color: str
metadata: dict = {"index_fields": ["name"]}
class Person(DataPoint):
name: str
age: int
owns_car: list[Car]
metadata: dict = {"index_fields": ["name"]}
car1 = Car(model="Tesla Model S", color="Blue")
car2 = Car(model="Toyota Camry", color="Red")
person = Person(name="John", age=30, owns_car=[car1, car2])
data_points = get_data_points_from_model(person)
print(data_points)