feature: extract entities from events (#1217)

<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
Commit 2149bb5601 (parent 74acf029a9), authored by lxobr on 2025-08-12 10:15:36 +02:00, committed via GitHub.
7 changed files with 322 additions and 126 deletions

View file

@@ -1,11 +1,20 @@
from cognee.infrastructure.engine import DataPoint
from cognee.modules.engine.models.EntityType import EntityType
from typing import Optional
from pydantic import BaseModel, Field, ConfigDict
from typing import Optional, List, Any
from pydantic import BaseModel, Field, ConfigDict, SkipValidation
from cognee.infrastructure.engine.models.Edge import Edge
from cognee.modules.engine.models.Entity import Entity
class Timestamp(DataPoint):
time_at: int = Field(...)
year: int = Field(...)
month: int = Field(...)
day: int = Field(...)
hour: int = Field(...)
minute: int = Field(...)
second: int = Field(...)
timestamp_str: str = Field(...)
class Interval(DataPoint):
@@ -19,5 +28,6 @@ class Event(DataPoint):
at: Optional[Timestamp] = None
during: Optional[Interval] = None
location: Optional[str] = None
attributes: SkipValidation[Any] = None # (Edge, list[Entity])
metadata: dict = {"index_fields": ["name"]}
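
For illustration, a minimal sketch of the shape `attributes` is expected to hold: a list of `(Edge, list[Entity])` tuples, which is why the field is wrapped in `SkipValidation` (pydantic cannot validate this heterogeneous tuple shape). Names and values below are hypothetical; the constructors follow the usage shown later in this PR.

```python
from cognee.infrastructure.engine.models.Edge import Edge
from cognee.modules.engine.models.Entity import Entity
from cognee.modules.engine.models.EntityType import EntityType
from cognee.modules.engine.utils import generate_node_id, generate_node_name

# Hypothetical entity attached to an existing Event instance `event`.
person = EntityType(
    id=generate_node_id("person"),
    name=generate_node_name("person"),
    type=generate_node_name("person"),
    description="Type for person",
    ontology_valid=False,
)
aldrin = Entity(
    id=generate_node_id("Buzz Aldrin"),
    name=generate_node_name("Buzz Aldrin"),
    is_a=person,
    description="Entity Buzz Aldrin of type person",
    ontology_valid=False,
    belongs_to_set=None,
)
event.attributes = [(Edge(relationship_type="subject"), [aldrin])]
```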

View file

@@ -0,0 +1,130 @@
import asyncio
from pydantic import BaseModel
from typing import Type, List
from datetime import datetime, timezone
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.modules.engine.utils import generate_node_id
from cognee.temporal_poc.models.models import EventList
from cognee.temporal_poc.datapoints.datapoints import Interval, Timestamp, Event
# Global system prompt for event extraction
EVENT_EXTRACTION_SYSTEM_PROMPT = """
For the purposes of building event-based knowledge graphs, you are tasked with extracting a highly granular stream of events from a text. The events are defined as follows:
## Event Definition
- Anything with a date or a timestamp is an event
- Anything that took place in time (even if the time is unknown) is an event
- Anything that lasted over a period of time, or happened in an instant is an event: from historical milestones (wars, presidencies, olympiads) to personal milestones (birth, death, employment, etc.), to mundane actions (a walk, a conversation, etc.)
- **ANY action or verb represents an event** - this is the most important rule
- Every single verb in the text corresponds to an event that must be extracted
- This includes: thinking, feeling, seeing, hearing, moving, speaking, writing, reading, eating, sleeping, working, playing, studying, traveling, meeting, calling, texting, buying, selling, creating, destroying, building, breaking, starting, stopping, beginning, ending, etc.
- Even the most mundane or obvious actions are events: "he walked", "she sat", "they talked", "I thought", "we waited"
## Requirements
- **Be extremely thorough** - extract EVERY event mentioned, no matter how small or obvious
- **Timestamped first** - every timestamp or date should have at least one event
- **Verbs/actions = one event** - after you are done with timestamped events, every verb that describes an action should have a corresponding event
- We expect long streams of events from any piece of text, easily reaching a hundred events
- Granularity and richness of the stream is key to our success and is of utmost importance
- Not all events will have timestamps; add timestamps only when they are known
- For events that were instantaneous, attach only the time_from or time_to property, not both
- **Do not skip any events** - if you're unsure whether something is an event, extract it anyway
- **Quantity over filtering** - it's better to extract too many events than to miss any
- **Descriptions** - Always include the event description together with entities (Who did what? What happened? What is the event?). If you can, include the corresponding part from the text.
## Output Format
Your reply should be JSON: a list of dictionaries with the following structure:
```python
class Event(BaseModel):
name: str  # concise
description: Optional[str] = None
time_from: Optional[Timestamp] = None
time_to: Optional[Timestamp] = None
location: Optional[str] = None
```
"""
def date_to_int(ts: Timestamp) -> int:
"""Convert timestamp to integer milliseconds."""
dt = datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second, tzinfo=timezone.utc)
time = int(dt.timestamp() * 1000)
return time
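
A quick sanity check of the epoch-milliseconds convention, using only the standard library:

```python
from datetime import datetime, timezone

# 2015-01-01T00:00:00Z is 1,420,070,400 seconds after the Unix epoch;
# date_to_int performs the same computation scaled to milliseconds.
dt = datetime(2015, 1, 1, tzinfo=timezone.utc)
assert int(dt.timestamp() * 1000) == 1_420_070_400_000
```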
def create_timestamp_datapoint(ts: Timestamp) -> Timestamp:
"""Create a Timestamp datapoint from a Timestamp model."""
time_at = date_to_int(ts)
timestamp_str = (
f"{ts.year:04d}-{ts.month:02d}-{ts.day:02d} {ts.hour:02d}:{ts.minute:02d}:{ts.second:02d}"
)
return Timestamp(
id=generate_node_id(str(time_at)),
time_at=time_at,
year=ts.year,
month=ts.month,
day=ts.day,
hour=ts.hour,
minute=ts.minute,
second=ts.second,
timestamp_str=timestamp_str,
)
def create_event_datapoint(event) -> Event:
"""Create an Event datapoint from an event model."""
# Base event data
event_data = {
"name": event.name,
"description": event.description,
"location": event.location,
}
# Create timestamps if they exist
time_from = create_timestamp_datapoint(event.time_from) if event.time_from else None
time_to = create_timestamp_datapoint(event.time_to) if event.time_to else None
# Add temporal information
if time_from and time_to:
event_data["during"] = Interval(time_from=time_from, time_to=time_to)
# Enrich description with temporal info
temporal_info = f"\n---\nTime data: {time_from.timestamp_str} to {time_to.timestamp_str}"
event_data["description"] = (event_data["description"] or "Event") + temporal_info
elif time_from or time_to:
timestamp = time_from or time_to
event_data["at"] = timestamp
# Enrich description with temporal info
temporal_info = f"\n---\nTime data: {timestamp.timestamp_str}"
event_data["description"] = (event_data["description"] or "Event") + temporal_info
return Event(**event_data)
async def extract_event_graph(
content: str, response_model: Type[BaseModel], system_prompt: str = None
):
"""Extract event graph from content using LLM."""
if system_prompt is None:
system_prompt = EVENT_EXTRACTION_SYSTEM_PROMPT
content_graph = await LLMGateway.acreate_structured_output(
content, system_prompt, response_model
)
return content_graph
async def extract_events_and_entities(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
"""Extracts events and entities from a chunk of documents."""
events = await asyncio.gather(
*[extract_event_graph(chunk.text, EventList) for chunk in data_chunks]
)
for data_chunk, event_list in zip(data_chunks, events):
for event in event_list.events:
event_datapoint = create_event_datapoint(event)
data_chunk.contains.append(event_datapoint)
return data_chunks
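
A hypothetical invocation of the extractor on a single text (requires a configured LLM provider; the sample sentence is invented):

```python
import asyncio

async def demo():
    event_list = await extract_event_graph(
        "Buzz Aldrin (born January 20, 1930) is an American former astronaut.",
        EventList,
    )
    for event in event_list.events:
        print(event.name, event.time_from)

# asyncio.run(demo())
```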

View file

@@ -0,0 +1,153 @@
from typing import List, Type
from pydantic import BaseModel
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.modules.engine.models.Entity import Entity
from cognee.modules.engine.models.EntityType import EntityType
from cognee.infrastructure.engine.models.Edge import Edge
from cognee.modules.engine.utils import generate_node_id, generate_node_name
from cognee.temporal_poc.models.models import EventEntityList
from cognee.temporal_poc.datapoints.datapoints import Event
from cognee.temporal_poc.models.models import EventWithEntities
ENTITY_EXTRACTION_SYSTEM_PROMPT = """For the purposes of building event-based knowledge graphs, you are tasked with extracting highly granular entities from event text. An entity is any distinct, identifiable thing, person, place, object, organization, concept, or phenomenon that can be named, referenced, or described in the event context. This includes but is not limited to: people, places, objects, organizations, concepts, events, processes, states, conditions, properties, attributes, roles, functions, and any other meaningful referents that contribute to understanding the event.
**Temporal Entity Exclusion**: Do not extract timestamp-like entities (dates, times, durations), as these are handled separately. However, extract named temporal periods, eras, historical epochs, and culturally significant time references.
## Input Format
The input will be a list of dictionaries, each containing:
- `event_name`: The name of the event
- `description`: The description of the event
## Task
For each event, extract all entities mentioned in the event description and determine their relationship to the event.
## Output Format
Return the same enriched JSON with an additional key in each dictionary: `attributes`.
The `attributes` should be a list of dictionaries, each containing:
- `entity`: The name of the entity
- `entity_type`: The type/category of the entity (person, place, organization, object, concept, etc.)
- `relationship`: A concise description of how the entity relates to the event
## Requirements
- **Be extremely thorough** - extract EVERY non-temporal entity mentioned, no matter how small, obvious, or seemingly insignificant
- **After you are done with obvious entities, every noun, pronoun, proper noun, and named reference = one entity**
- We expect rich entity networks from any event, easily reaching dozens of entities per event
- Granularity and richness of the entity extraction is key to our success and is of utmost importance
- **Do not skip any entities** - if you're unsure whether something is an entity, extract it anyway
- Use the event name for context when determining relationships
- Relationships should be technical, with one or at most two words. If two words, join them with an underscore (snake_case)
- Relationships could imply general meaning like: subject, object, participant, recipient, agent, instrument, tool, source, cause, effect, purpose, manner, resource, etc.
- You can combine two words to form a relationship name: subject_role, previous_owner, etc.
- Focus on how the entity specifically relates to the event
"""
async def extract_event_entities(
content: str, response_model: Type[BaseModel], system_prompt: str = None
):
"""Extract event entities from content using LLM."""
if system_prompt is None:
system_prompt = ENTITY_EXTRACTION_SYSTEM_PROMPT
content_graph = await LLMGateway.acreate_structured_output(
content, system_prompt, response_model
)
return content_graph
async def enrich_events(events: List[Event]) -> List[EventWithEntities]:
"""Extract entities from events and return enriched events."""
import json
# Convert events to JSON format for LLM processing
events_json = [
{"event_name": event.name, "description": event.description or ""} for event in events
]
events_json_str = json.dumps(events_json)
# Extract entities from events
entity_result = await extract_event_entities(events_json_str, EventEntityList)
return entity_result.events
def add_entities_to_event(event: Event, event_with_entities: EventWithEntities) -> None:
"""Add entities to event via attributes field."""
if not event_with_entities.attributes:
return
# Create entity types cache
entity_types = {}
# Process each attribute
for attribute in event_with_entities.attributes:
# Get or create entity type
entity_type = get_or_create_entity_type(entity_types, attribute.entity_type)
# Create entity
entity_id = generate_node_id(attribute.entity)
entity_name = generate_node_name(attribute.entity)
entity = Entity(
id=entity_id,
name=entity_name,
is_a=entity_type,
description=f"Entity {attribute.entity} of type {attribute.entity_type}",
ontology_valid=False,
belongs_to_set=None,
)
# Create edge
edge = Edge(relationship_type=attribute.relationship)
# Add to event attributes
if event.attributes is None:
event.attributes = []
event.attributes.append((edge, [entity]))
def get_or_create_entity_type(entity_types: dict, entity_type_name: str) -> EntityType:
"""Get existing entity type or create new one."""
if entity_type_name not in entity_types:
type_id = generate_node_id(entity_type_name)
type_name = generate_node_name(entity_type_name)
entity_type = EntityType(
id=type_id,
name=type_name,
type=type_name,
description=f"Type for {entity_type_name}",
ontology_valid=False,
)
entity_types[entity_type_name] = entity_type
return entity_types[entity_type_name]
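
A small sketch of the caching behavior; note the cache dict is created per `add_entities_to_event` call, so deduplication is per event:

```python
# Repeated lookups return the same cached EntityType instance.
cache = {}
person_a = get_or_create_entity_type(cache, "person")
person_b = get_or_create_entity_type(cache, "person")
assert person_a is person_b
```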
async def extract_event_knowledge_graph(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
"""Extract events from chunks and enrich them with entities."""
# Extract events from chunks
all_events = []
for chunk in data_chunks:
for item in chunk.contains:
if isinstance(item, Event):
all_events.append(item)
if not all_events:
return data_chunks
# Enrich events with entities
enriched_events = await enrich_events(all_events)
# Add entities to events
for event, enriched_event in zip(all_events, enriched_events):
add_entities_to_event(event, enriched_event)
return data_chunks
async def process_event_knowledge_graph(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
"""Process document chunks for event knowledge graph construction."""
return await extract_event_knowledge_graph(data_chunks)
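
A hypothetical two-stage run over existing chunks, showing how the two tasks compose (import path as declared elsewhere in this PR):

```python
from cognee.temporal_poc.event_extraction import extract_events_and_entities

async def demo(chunks):
    # Stage 1: attach Event datapoints to each chunk's `contains` list.
    chunks = await extract_events_and_entities(chunks)
    # Stage 2: enrich those events with (Edge, [Entity]) attributes.
    return await process_event_knowledge_graph(chunks)
```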

View file

@@ -32,3 +32,19 @@ class Event(BaseModel):
class EventList(BaseModel):
events: List[Event]
class EntityAttribute(BaseModel):
entity: str
entity_type: str
relationship: str
class EventWithEntities(BaseModel):
event_name: str
description: Optional[str] = None
attributes: List[EntityAttribute] = []
class EventEntityList(BaseModel):
events: List[EventWithEntities]
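
A hypothetical round-trip showing that a structured LLM response parses directly into these models (pydantic v2 `model_validate`; payload invented):

```python
payload = {
    "events": [
        {
            "event_name": "Moon landing",
            "description": "Apollo 11 landed on the Moon.",
            "attributes": [
                {"entity": "Apollo 11", "entity_type": "mission", "relationship": "agent"}
            ],
        }
    ]
}
parsed = EventEntityList.model_validate(payload)
assert parsed.events[0].attributes[0].entity == "Apollo 11"
```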

View file

@@ -1,11 +1,5 @@
import asyncio
import uuid
from pydantic import BaseModel
from typing import Union, Optional, List, Type
from uuid import UUID, uuid5
from datetime import datetime, timezone
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from typing import Union, Optional, List
from uuid import UUID
from cognee.shared.logging_utils import get_logger
from cognee.shared.data_models import KnowledgeGraph
from cognee.infrastructure.llm import get_max_chunk_tokens, get_llm_config
@@ -14,7 +8,6 @@ from cognee.api.v1.cognify.cognify import run_cognify_blocking
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.modules.users.models import User
from cognee.tasks.documents import (
@@ -25,119 +18,12 @@ from cognee.tasks.documents import (
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text
from cognee.temporal_poc.models.models import EventList
from cognee.temporal_poc.datapoints.datapoints import Interval, Timestamp, Event
from cognee.temporal_poc.event_extraction import extract_events_and_entities
from cognee.temporal_poc.event_knowledge_graph import process_event_knowledge_graph
logger = get_logger("temporal_cognify")
def date_to_int(ts: Timestamp) -> int:
dt = datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second, tzinfo=timezone.utc)
time = int(dt.timestamp() * 1000)
return time
async def extract_event_graph(content: str, response_model: Type[BaseModel]):
llm_client = get_llm_client()
system_prompt = """
For the purposes of building event-based knowledge graphs, you are tasked with extracting a highly granular stream of events from a text. The events are defined as follows:
## Event Definition
- Anything with a date or a timestamp is an event
- Anything that took place in time (even if the time is unknown) is an event
- Anything that lasted over a period of time, or happened in an instant is an event: from historical milestones (wars, presidencies, olympiads) to personal milestones (birth, death, employment, etc.), to mundane actions (a walk, a conversation, etc.)
- **ANY action or verb represents an event** - this is the most important rule
- Every single verb in the text corresponds to an event that must be extracted
- This includes: thinking, feeling, seeing, hearing, moving, speaking, writing, reading, eating, sleeping, working, playing, studying, traveling, meeting, calling, texting, buying, selling, creating, destroying, building, breaking, starting, stopping, beginning, ending, etc.
- Even the most mundane or obvious actions are events: "he walked", "she sat", "they talked", "I thought", "we waited"
## Requirements
- **Be extremely thorough** - extract EVERY event mentioned, no matter how small or obvious
- **Timestamped first** - every timestamp or date should have at least one event
- **Verbs/actions = one event** - after you are done with timestamped events, every verb that describes an action should have a corresponding event
- We expect long streams of events from any piece of text, easily reaching a hundred events
- Granularity and richness of the stream is key to our success and is of utmost importance
- Not all events will have timestamps; add timestamps only when they are known
- For events that were instantaneous, attach only the time_from or time_to property, not both
- **Do not skip any events** - if you're unsure whether something is an event, extract it anyway
- **Quantity over filtering** - it's better to extract too many events than to miss any
- **Descriptions** - Always include the event description together with entities (Who did what? What happened? What is the event?). If you can, include the corresponding part from the text.
## Output Format
Your reply should be JSON: a list of dictionaries with the following structure:
```python
class Event(BaseModel):
name: str  # concise
description: Optional[str] = None
time_from: Optional[Timestamp] = None
time_to: Optional[Timestamp] = None
location: Optional[str] = None
```
"""
content_graph = await llm_client.acreate_structured_output(
content, system_prompt, response_model
)
return content_graph
async def extract_events_and_entities(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
"""Extracts events and entities from a chunk of documents."""
# data_chunks = data_chunks + data_chunks
events = await asyncio.gather(
*[extract_event_graph(chunk.text, EventList) for chunk in data_chunks]
)
for data_chunk, event_list in zip(data_chunks, events):
for event in event_list.events:
if event.time_from and event.time_to:
event_time_from = date_to_int(event.time_from)
event_time_to = date_to_int(event.time_to)
timestamp_time_from = Timestamp(
id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_from)), time_at=event_time_from
)
timestamp_time_to = Timestamp(
id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_to)), time_at=event_time_to
)
event_interval = Interval(time_from=timestamp_time_from, time_to=timestamp_time_to)
event_datapoint = Event(
name=event.name,
description=event.description,
during=event_interval,
location=event.location,
)
elif event.time_from:
event_time_from = date_to_int(event.time_from)
event_time_at = Timestamp(
id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_from)), time_at=event_time_from
)
event_datapoint = Event(
name=event.name,
description=event.description,
at=event_time_at,
location=event.location,
)
elif event.time_to:
event_time_to = date_to_int(event.time_to)
event_time_at = Timestamp(
id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_to)), time_at=event_time_to
)
event_datapoint = Event(
name=event.name,
description=event.description,
at=event_time_at,
location=event.location,
)
else:
event_datapoint = Event(
name=event.name, description=event.description, location=event.location
)
data_chunk.contains.append(event_datapoint)
return data_chunks
async def get_temporal_tasks(
user: User = None, chunker=TextChunker, chunk_size: int = None
) -> list[Task]:
@@ -150,6 +36,7 @@ async def get_temporal_tasks(
chunker=chunker,
),
Task(extract_events_and_entities, task_config={"chunk_size": 10}),
Task(process_event_knowledge_graph),
Task(add_data_points, task_config={"batch_size": 10}),
]

View file

@@ -33,11 +33,13 @@ async def main():
texts = await reading_temporal_data()
texts = texts[:5]
# texts = ["Buzz Aldrin (born January 20, 1930) is an American former astronaut."]
await cognee.add(texts)
await temporal_cognify()
search_results = await cognee.search(
query_type=SearchType.TEMPORAL, query_text="What happened in 2015"
query_type=SearchType.TEMPORAL, query_text="What happened in the 1930s?"
)
print(search_results)

View file

@@ -4,7 +4,7 @@ import string
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.engine import DataPoint
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.modules.graph.utils.convert_node_to_data_point import get_all_subclasses
from cognee.modules.retrieval.base_retriever import BaseRetriever
from cognee.modules.retrieval.utils.brute_force_triplet_search import brute_force_triplet_search
@@ -12,7 +12,7 @@ from cognee.modules.retrieval.utils.completion import generate_completion
from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS
from cognee.shared.logging_utils import get_logger
from cognee.temporal_poc.models.models import QueryInterval
from cognee.temporal_poc.temporal_cognify import date_to_int
from cognee.temporal_poc.event_extraction import date_to_int
logger = get_logger("TemporalRetriever")
@@ -98,8 +98,6 @@ class TemporalRetriever(BaseRetriever):
return found_triplets
async def extract_time_from_query(self, query: str):
llm_client = get_llm_client()
system_prompt = """
For the purposes of identifying timestamps in a query, you are tasked with extracting relevant timestamps from the query.
## Timestamp requirements
@@ -117,7 +115,7 @@ class TemporalRetriever(BaseRetriever):
```
"""
interval = await llm_client.acreate_structured_output(query, system_prompt, QueryInterval)
interval = await LLMGateway.acreate_structured_output(query, system_prompt, QueryInterval)
return interval
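
A hypothetical call, assuming `retriever` is a constructed `TemporalRetriever` and that `QueryInterval` carries the parsed bounds:

```python
# Hypothetical usage: extract the time window implied by a user query.
interval = await retriever.extract_time_from_query("What happened in the 1930s?")
print(interval)  # a QueryInterval with the extracted time bounds
```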