feat: add entity knowledge-graph extraction from events

This commit is contained in:
hajdul88 2025-08-27 18:04:10 +02:00
parent 7468ef6e53
commit 97abdeeb2a
5 changed files with 105 additions and 2 deletions

View file

@ -1,2 +1,3 @@
from .extract_events_and_entities import extract_events_and_entities
from .extract_events_and_entities import extract_events_and_timestamps
from .extract_knowledge_graph_from_events import extract_knowledge_graph_from_events

View file

@ -0,0 +1,55 @@
from cognee.modules.engine.models import Event
from cognee.tasks.temporal_graph.models import EventWithEntities
from cognee.modules.engine.models.Entity import Entity
from cognee.modules.engine.models.EntityType import EntityType
from cognee.infrastructure.engine.models.Edge import Edge
from cognee.modules.engine.utils import generate_node_id, generate_node_name
def add_entities_to_event(event: Event, event_with_entities: EventWithEntities) -> None:
    """Attach the entities extracted for an event to that event's ``attributes``.

    Every attribute found on ``event_with_entities`` is turned into an
    ``(Edge, [Entity])`` pair and appended to ``event.attributes``; the list
    is created on first use when the event has none. The function is a no-op
    when ``event_with_entities.attributes`` is empty or ``None``.
    """
    extracted = event_with_entities.attributes
    if not extracted:
        return

    # Entity types are cached by name so attributes of this event that share a
    # type name reuse the same EntityType instance.
    type_cache = {}

    for attr in extracted:
        node_type = get_or_create_entity_type(type_cache, attr.entity_type)

        node = Entity(
            id=generate_node_id(attr.entity),
            name=generate_node_name(attr.entity),
            is_a=node_type,
            description=f"Entity {attr.entity} of type {attr.entity_type}",
            ontology_valid=False,
            belongs_to_set=None,
        )
        link = Edge(relationship_type=attr.relationship)

        # Lazily initialise the container only once there is something to add.
        if event.attributes is None:
            event.attributes = []
        event.attributes.append((link, [node]))
def get_or_create_entity_type(entity_types: dict, entity_type_name: str) -> EntityType:
    """Return the ``EntityType`` cached under a name, creating it on a miss.

    ``entity_types`` is the cache, keyed by the raw ``entity_type_name``; a
    newly built type is stored in it (the dict is mutated in place) before
    being returned.
    """
    # Cache hit: hand back whatever is stored without rebuilding anything.
    if entity_type_name in entity_types:
        return entity_types[entity_type_name]

    node_id = generate_node_id(entity_type_name)
    node_name = generate_node_name(entity_type_name)
    created = EntityType(
        id=node_id,
        name=node_name,
        type=node_name,
        description=f"Type for {entity_type_name}",
        ontology_valid=False,
    )
    entity_types[entity_type_name] = created
    return created

View file

@ -0,0 +1,21 @@
from typing import List
from cognee.infrastructure.llm import LLMGateway
from cognee.modules.engine.models import Event
from cognee.tasks.temporal_graph.models import EventWithEntities,EventEntityList
async def enrich_events(events: List[Event]) -> List[EventWithEntities]:
    """Extract entities for a batch of events with a single LLM call.

    Each event is reduced to its name and description (a missing description
    is sent as an empty string), the batch is serialised to JSON, and the
    string is passed to ``LLMGateway.extract_event_entities`` with
    ``EventEntityList`` as the response model.

    Args:
        events: Events to enrich.

    Returns:
        The ``events`` list of the LLM response, or an empty list when
        ``events`` is empty (no LLM call is made in that case).
    """
    import json

    # Nothing to enrich: avoid a pointless LLM round-trip on an empty batch.
    if not events:
        return []

    # Only the fields the prompt needs are sent, not the whole datapoint.
    events_json_str = json.dumps(
        [{"event_name": event.name, "description": event.description or ""} for event in events]
    )

    entity_result = await LLMGateway.extract_event_entities(events_json_str, EventEntityList)
    return entity_result.events

View file

@ -6,7 +6,7 @@ from cognee.tasks.temporal_graph.models import EventList
from cognee.modules.engine.utils.generate_event_datapoint import generate_event_datapoint
async def extract_events_and_entities(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
async def extract_events_and_timestamps(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
"""Extracts events and entities from a chunk of documents."""
events = await asyncio.gather(
*[LLMGateway.extract_event_graph(chunk.text, EventList) for chunk in data_chunks]

View file

@ -0,0 +1,26 @@
from typing import List
from cognee.modules.chunking.models import DocumentChunk
from cognee.modules.engine.models import Event
from cognee.tasks.temporal_graph.enrich_events import enrich_events
from cognee.tasks.temporal_graph.add_entities_to_event import add_entities_to_event
async def extract_knowledge_graph_from_events(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
    """Enrich the events already attached to chunks with extracted entities.

    Collects every ``Event`` found in the chunks' ``contains`` lists, sends
    them to ``enrich_events`` in one batch, and attaches the resulting
    entities to each event in place via ``add_entities_to_event``.

    Args:
        data_chunks: Chunks whose ``contains`` may hold ``Event`` items.

    Returns:
        The same ``data_chunks`` list; the events inside are mutated in place.
        Returned untouched when no events are found.
    """
    all_events = []
    for chunk in data_chunks:
        # A chunk whose `contains` is unset (None) simply contributes no events
        # instead of raising TypeError during iteration.
        all_events.extend(item for item in (chunk.contains or ()) if isinstance(item, Event))

    if not all_events:
        return data_chunks

    enriched_events = await enrich_events(all_events)

    # NOTE(review): pairing is positional and zip() stops at the shorter input,
    # so this assumes the LLM returns exactly one result per event, in the
    # same order — confirm, or match on the event name instead.
    for event, enriched_event in zip(all_events, enriched_events):
        add_entities_to_event(event, enriched_event)

    return data_chunks