feat: adds event and timestamp pydantic to datapoint methods

This commit is contained in:
hajdul88 2025-08-27 15:16:35 +02:00
parent a3cc1ebe2d
commit f5489f2027
3 changed files with 59 additions and 0 deletions

View file

@ -1,3 +1,5 @@
from .generate_node_id import generate_node_id
from .generate_node_name import generate_node_name
from .generate_edge_name import generate_edge_name
from .generate_event_datapoint import generate_event_datapoint
from .generate_timestamp_datapoint import generate_timestamp_datapoint

View file

@ -0,0 +1,30 @@
from cognee.modules.engine.models import Interval, Event
from cognee.modules.engine.utils.generate_timestamp_datapoint import generate_timestamp_datapoint
def generate_event_datapoint(event) -> Event:
"""Create an Event datapoint from an event model."""
# Base event data
event_data = {
"name": event.name,
"description": event.description,
"location": event.location,
}
# Create timestamps if they exist
time_from = generate_timestamp_datapoint(event.time_from) if event.time_from else None
time_to = generate_timestamp_datapoint(event.time_to) if event.time_to else None
# Add temporal information
if time_from and time_to:
event_data["during"] = Interval(time_from=time_from, time_to=time_to)
# Enrich description with temporal info
temporal_info = f"\n---\nTime data: {time_from.timestamp_str} to {time_to.timestamp_str}"
event_data["description"] = (event_data["description"] or "Event") + temporal_info
elif time_from or time_to:
timestamp = time_from or time_to
event_data["at"] = timestamp
# Enrich description with temporal info
temporal_info = f"\n---\nTime data: {timestamp.timestamp_str}"
event_data["description"] = (event_data["description"] or "Event") + temporal_info
return Event(**event_data)

View file

@ -0,0 +1,27 @@
from datetime import datetime, timezone
from cognee.modules.engine.models import Interval, Timestamp, Event
from cognee.modules.engine.utils import generate_node_id
def generate_timestamp_datapoint(ts: Timestamp) -> Timestamp:
"""Create a Timestamp datapoint from a Timestamp model."""
time_at = date_to_int(ts)
timestamp_str = (
f"{ts.year:04d}-{ts.month:02d}-{ts.day:02d} {ts.hour:02d}:{ts.minute:02d}:{ts.second:02d}"
)
return Timestamp(
id=generate_node_id(str(time_at)),
time_at=time_at,
year=ts.year,
month=ts.month,
day=ts.day,
hour=ts.hour,
minute=ts.minute,
second=ts.second,
timestamp_str=timestamp_str,
)
def date_to_int(ts: Timestamp) -> int:
"""Convert timestamp to integer milliseconds."""
dt = datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second, tzinfo=timezone.utc)
time = int(dt.timestamp() * 1000)
return time