feat: adds event extraction and timestamp and interval definitions

This commit is contained in:
hajdul88 2025-08-02 14:28:32 +02:00
parent e90cbc43dd
commit 6119ac08de
3 changed files with 64 additions and 25 deletions

View file

@ -4,13 +4,13 @@ from typing import Optional
from pydantic import BaseModel, Field, ConfigDict
class Interval(DataPoint):
time_from: int = Field(..., ge=0)
time_to: int = Field(..., ge=0)
class Timestamp(DataPoint):
time_at: int = Field(..., ge=0)
time_at: int = Field(...)
class Interval(DataPoint):
time_from: Timestamp = Field(...)
time_to: Timestamp = Field(...)
class Event(DataPoint):

View file

@ -1,8 +1,10 @@
import asyncio
import uuid
from pydantic import BaseModel
from typing import Union, Optional, List, Type
from uuid import UUID
from uuid import UUID, uuid5
from datetime import datetime, timezone
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.shared.logging_utils import get_logger
from cognee.shared.data_models import KnowledgeGraph
@ -29,22 +31,47 @@ from cognee.temporal_poc.datapoints.datapoints import Interval, Timestamp, Event
logger = get_logger("temporal_cognify")
def date_to_int(ts: Timestamp) -> int:
    """Return the instant described by *ts* as epoch milliseconds.

    The ``year``, ``month``, ``day``, ``hour``, ``minute`` and ``second``
    attributes of *ts* are read and interpreted as a UTC wall-clock time.

    Args:
        ts: Object exposing the six calendar/clock attributes listed above.

    Returns:
        Milliseconds elapsed since the Unix epoch, truncated to an ``int``.
    """
    moment = datetime(
        year=ts.year,
        month=ts.month,
        day=ts.day,
        hour=ts.hour,
        minute=ts.minute,
        second=ts.second,
        tzinfo=timezone.utc,  # pin to UTC so the result is host-timezone independent
    )
    epoch_seconds = moment.timestamp()
    return int(epoch_seconds * 1000)
async def extract_event_graph(content: str, response_model: Type[BaseModel]):
llm_client = get_llm_client()
system_prompt = """
You are an extractor. From input text, pull out:
Timestamps: concrete points (year, month, day, hour, minute, second).
Intervals: spans with explicit start and end times; resolve relative durations if anchored.
Entities: people, organizations, topics, etc., with name, short description, and with their type (person/org/location/topic/other). Always attach the type.
Events: include name, brief description, subject (actor), object (target), time as either a point (at) or span (during), and location. Prefer during if its a multi-hour span; use at for a point. Omit ambiguous times rather than guessing.
Output JSON. Reuse entity names when repeated. Use null for missing optional fields.
For the purposes of building event-based knowledge graphs, you are tasked with extracting highly granular stream events from a text. The events are defined as follows:
## Event Definition
- Anything with a date or a timestamp is an event
- Anything that took place in time (even if the time is unknown) is an event
- Anything that lasted over a period of time, or happened in an instant is an event: from historical milestones (wars, presidencies, olympiads) to personal milestones (birth, death, employment, etc.), to mundane actions (a walk, a conversation, etc.)
- **ANY action or verb represents an event** - this is the most important rule
- Every single verb in the text corresponds to an event that must be extracted
- This includes: thinking, feeling, seeing, hearing, moving, speaking, writing, reading, eating, sleeping, working, playing, studying, traveling, meeting, calling, texting, buying, selling, creating, destroying, building, breaking, starting, stopping, beginning, ending, etc.
- Even the most mundane or obvious actions are events: "he walked", "she sat", "they talked", "I thought", "we waited"
## Requirements
- **Be extremely thorough** - extract EVERY event mentioned, no matter how small or obvious
- **Timestamped first" - every time stamp, or date should have atleast one event
- **Verbs/actions = one event** - After you are done with timestamped events -- every verb that is an action should have a corresponding event.
- We expect long streams of events from any piece of text, easily reaching a hundred events
- Granularity and richness of the stream is key to our success and is of utmost importance
- Not all events will have timestamps, add timestamps only to known events
- For events that were instantaneous, just attach the time_from or time_to property don't create both
- **Do not skip any events** - if you're unsure whether something is an event, extract it anyway
- **Quantity over filtering** - it's better to extract too many events than to miss any
- **Descriptions** - Always include the event description together with entities (Who did what, what happened? What is the event?). If you can include the corresponding part from the text.
## Output Format
Your reply should be a JSON: list of dictionaries with the following structure:
```python
class Event(BaseModel):
name: str [concise]
description: Optional[str] = None
time_from: Optional[Timestamp] = None
time_to: Optional[Timestamp] = None
location: Optional[str] = None
```
"""
content_graph = await llm_client.acreate_structured_output(
@ -64,9 +91,15 @@ async def extract_events_and_entities(data_chunks: List[DocumentChunk]) -> List[
for data_chunk, event_list in zip(data_chunks, events):
for event in event_list.events:
if event.time_from and event.time_to:
event_interval = Interval(
time_from=int(event.time_from), time_to=int(event.time_to)
event_time_from = date_to_int(event.time_from)
event_time_to = date_to_int(event.time_to)
timestamp_time_from = Timestamp(
id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_from)), time_at=event_time_from
)
timestamp_time_to = Timestamp(
id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_to)), time_at=event_time_to
)
event_interval = Interval(time_from=timestamp_time_from, time_to=timestamp_time_to)
event_datapoint = Event(
name=event.name,
description=event.description,
@ -74,7 +107,10 @@ async def extract_events_and_entities(data_chunks: List[DocumentChunk]) -> List[
location=event.location,
)
elif event.time_from:
event_time_at = Timestamp(time_at=int(event.time_from))
event_time_from = date_to_int(event.time_from)
event_time_at = Timestamp(
id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_from)), time_at=event_time_from
)
event_datapoint = Event(
name=event.name,
description=event.description,
@ -82,7 +118,10 @@ async def extract_events_and_entities(data_chunks: List[DocumentChunk]) -> List[
location=event.location,
)
elif event.time_to:
event_time_at = Timestamp(time_at=int(event.time_to))
event_time_to = date_to_int(event.time_to)
event_time_at = Timestamp(
id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_to)), time_at=event_time_to
)
event_datapoint = Event(
name=event.name,
description=event.description,

View file

@ -29,7 +29,7 @@ async def main():
await cognee.prune.prune_system(metadata=True)
texts = await reading_temporal_data()
texts = texts[:10]
texts = texts[:5]
await cognee.add(texts)
await temporal_cognify()