feat: extract entities from events
This commit is contained in:
parent
6119ac08de
commit
0abfe6bba0
4 changed files with 266 additions and 118 deletions
126
cognee/temporal_poc/event_extraction.py
Normal file
126
cognee/temporal_poc/event_extraction.py
Normal file
|
|
@ -0,0 +1,126 @@
|
|||
import asyncio
|
||||
import uuid
|
||||
|
||||
from pydantic import BaseModel
|
||||
from typing import Type, List
|
||||
from uuid import uuid5
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from cognee.infrastructure.llm.get_llm_client import get_llm_client
|
||||
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
|
||||
from cognee.temporal_poc.models.models import EventList
|
||||
from cognee.temporal_poc.datapoints.datapoints import Interval, Timestamp, Event
|
||||
|
||||
|
||||
# Global system prompt for event extraction.
# NOTE: the schema shown in the "Output Format" section must stay in sync with
# the `EventList` / `Event` pydantic models this prompt's output is parsed into.
# Fixes vs. previous revision: closed the broken bold marker on the
# "Timestamped first" bullet and corrected the "atleast" typo.
EVENT_EXTRACTION_SYSTEM_PROMPT = """
For the purposes of building event-based knowledge graphs, you are tasked with extracting highly granular stream events from a text. The events are defined as follows:
## Event Definition
- Anything with a date or a timestamp is an event
- Anything that took place in time (even if the time is unknown) is an event
- Anything that lasted over a period of time, or happened in an instant is an event: from historical milestones (wars, presidencies, olympiads) to personal milestones (birth, death, employment, etc.), to mundane actions (a walk, a conversation, etc.)
- **ANY action or verb represents an event** - this is the most important rule
- Every single verb in the text corresponds to an event that must be extracted
- This includes: thinking, feeling, seeing, hearing, moving, speaking, writing, reading, eating, sleeping, working, playing, studying, traveling, meeting, calling, texting, buying, selling, creating, destroying, building, breaking, starting, stopping, beginning, ending, etc.
- Even the most mundane or obvious actions are events: "he walked", "she sat", "they talked", "I thought", "we waited"
## Requirements
- **Be extremely thorough** - extract EVERY event mentioned, no matter how small or obvious
- **Timestamped first** - every time stamp, or date should have at least one event
- **Verbs/actions = one event** - After you are done with timestamped events -- every verb that is an action should have a corresponding event.
- We expect long streams of events from any piece of text, easily reaching a hundred events
- Granularity and richness of the stream is key to our success and is of utmost importance
- Not all events will have timestamps, add timestamps only to known events
- For events that were instantaneous, just attach the time_from or time_to property don't create both
- **Do not skip any events** - if you're unsure whether something is an event, extract it anyway
- **Quantity over filtering** - it's better to extract too many events than to miss any
- **Descriptions** - Always include the event description together with entities (Who did what, what happened? What is the event?). If you can include the corresponding part from the text.
## Output Format
Your reply should be a JSON: list of dictionaries with the following structure:
```python
class Event(BaseModel):
    name: str [concise]
    description: Optional[str] = None
    time_from: Optional[Timestamp] = None
    time_to: Optional[Timestamp] = None
    location: Optional[str] = None
```
"""
|
||||
|
||||
|
||||
def date_to_int(ts: Timestamp) -> int:
    """Convert a calendar-field Timestamp to integer epoch milliseconds (UTC).

    Reads ts.year/month/day/hour/minute/second and interprets them as a UTC
    wall-clock moment.
    """
    moment = datetime(
        ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second, tzinfo=timezone.utc
    )
    return int(moment.timestamp() * 1000)
|
||||
|
||||
|
||||
async def extract_event_graph(
    content: str, response_model: Type[BaseModel], system_prompt: str = None
):
    """Ask the LLM to extract a structured event graph from raw text.

    Args:
        content: The source text to extract events from.
        response_model: Pydantic model the structured output is parsed into.
        system_prompt: Optional prompt override; defaults to the module-level
            EVENT_EXTRACTION_SYSTEM_PROMPT when None.

    Returns:
        An instance of `response_model` produced by the LLM client.
    """
    prompt = EVENT_EXTRACTION_SYSTEM_PROMPT if system_prompt is None else system_prompt
    client = get_llm_client()
    return await client.acreate_structured_output(content, prompt, response_model)
|
||||
|
||||
|
||||
def _timestamp_from_millis(millis: int) -> Timestamp:
    """Build a Timestamp datapoint with a deterministic UUID derived from its epoch-millis value."""
    # uuid5 over NAMESPACE_OID makes equal instants share one Timestamp node id.
    return Timestamp(id=uuid5(uuid.NAMESPACE_OID, name=str(millis)), time_at=millis)


async def extract_events_and_entities(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
    """Extracts events and entities from a chunk of documents.

    Every chunk's text is sent to the LLM concurrently to obtain an EventList.
    Each extracted event becomes an Event datapoint appended to the chunk's
    `contains` list:
      - both time bounds known -> Event with a `during` Interval
      - exactly one bound known -> Event with an `at` Timestamp (instantaneous)
      - no time information     -> bare Event
    Returns the same chunk list, enriched in place.
    """
    events = await asyncio.gather(
        *[extract_event_graph(chunk.text, EventList) for chunk in data_chunks]
    )

    # asyncio.gather preserves input order, so zip keeps chunks aligned with results.
    for data_chunk, event_list in zip(data_chunks, events):
        for event in event_list.events:
            base_kwargs = {
                "name": event.name,
                "description": event.description,
                "location": event.location,
            }
            if event.time_from and event.time_to:
                interval = Interval(
                    time_from=_timestamp_from_millis(date_to_int(event.time_from)),
                    time_to=_timestamp_from_millis(date_to_int(event.time_to)),
                )
                event_datapoint = Event(during=interval, **base_kwargs)
            elif event.time_from or event.time_to:
                # Instantaneous event: the extractor supplied only one bound.
                bound = event.time_from if event.time_from else event.time_to
                event_datapoint = Event(
                    at=_timestamp_from_millis(date_to_int(bound)), **base_kwargs
                )
            else:
                event_datapoint = Event(**base_kwargs)

            data_chunk.contains.append(event_datapoint)

    return data_chunks
|
||||
119
cognee/temporal_poc/event_knowledge_graph.py
Normal file
119
cognee/temporal_poc/event_knowledge_graph.py
Normal file
|
|
@ -0,0 +1,119 @@
|
|||
from typing import List, Type
|
||||
from pydantic import BaseModel
|
||||
|
||||
from cognee.infrastructure.llm.get_llm_client import get_llm_client
|
||||
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
|
||||
from cognee.modules.engine.models.Entity import Entity
|
||||
from cognee.modules.engine.models.EntityType import EntityType
|
||||
from cognee.temporal_poc.models.models import EventEntityList
|
||||
|
||||
# System prompt for per-event entity extraction; output is parsed into
# `EventEntityList` (see cognee.temporal_poc.models.models), so the
# `attributes` schema below must stay in sync with `EntityAttribute`.
# Fixes vs. previous revision: "a dozens" grammar, and the contradictory
# "underscore camelcase style" now reads snake_case, matching the examples.
ENTITY_EXTRACTION_SYSTEM_PROMPT = """For the purposes of building event-based knowledge graphs, you are tasked with extracting highly granular entities from events text. An entity is any distinct, identifiable thing, person, place, object, organization, concept, or phenomenon that can be named, referenced, or described in the event context. This includes but is not limited to: people, places, objects, organizations, concepts, events, processes, states, conditions, properties, attributes, roles, functions, and any other meaningful referents that contribute to understanding the event.
**Temporal Entity Exclusion**: Do not extract timestamp-like entities (dates, times, durations) as these are handled separately. However, extract named temporal periods, eras, historical epochs, and culturally significant time references
## Input Format
The input will be a list of dictionaries, each containing:
- `event_name`: The name of the event
- `description`: The description of the event

## Task
For each event, extract all entities mentioned in the event description and determine their relationship to the event.

## Output Format
Return the same enriched JSON with an additional key in each dictionary: `attributes`.

The `attributes` should be a list of dictionaries, each containing:
- `entity`: The name of the entity
- `entity_type`: The type/category of the entity (person, place, organization, object, concept, etc.)
- `relationship`: A concise description of how the entity relates to the event

## Requirements
- **Be extremely thorough** - extract EVERY non-temporal entity mentioned, no matter how small, obvious, or seemingly insignificant
- **After you are done with obvious entities, every noun, pronoun, proper noun, and named reference = one entity**
- We expect rich entity networks from any event, easily reaching dozens of entities per event
- Granularity and richness of the entity extraction is key to our success and is of utmost importance
- **Do not skip any entities** - if you're unsure whether something is an entity, extract it anyway
- Use the event name for context when determining relationships
- Relationships should be technical with one or at most two words. If two words, use underscore (snake_case) style
- Relationships could imply general meaning like: subject, object, participant, recipient, agent, instrument, tool, source, cause, effect, purpose, manner, resource, etc.
- You can combine two words to form a relationship name: subject_role, previous_owner, etc.
- Focus on how the entity specifically relates to the event
"""
|
||||
|
||||
|
||||
async def extract_event_entities(
    content: str, response_model: Type[BaseModel], system_prompt: str = None
):
    """Run LLM structured extraction of entities for the given events JSON.

    Args:
        content: JSON-encoded list of {event_name, description} dictionaries.
        response_model: Pydantic model the structured output is parsed into.
        system_prompt: Optional prompt override; defaults to the module-level
            ENTITY_EXTRACTION_SYSTEM_PROMPT when None.

    Returns:
        An instance of `response_model` produced by the LLM client.
    """
    prompt = ENTITY_EXTRACTION_SYSTEM_PROMPT if system_prompt is None else system_prompt
    client = get_llm_client()
    return await client.acreate_structured_output(content, prompt, response_model)
|
||||
|
||||
|
||||
async def extract_event_knowledge_graph(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
    """Extract events from chunks and enrich each chunk with LLM-extracted entities.

    For every chunk, collects the Event datapoints already present in
    `chunk.contains`, serializes them to JSON, asks the LLM for the entities of
    each event (all chunks concurrently), then appends the resulting Entity
    datapoints back onto the chunk. Returns the same list, enriched in place.
    """
    # Function-local imports — presumably to avoid circular imports at module
    # load time; TODO confirm before hoisting to the top of the file.
    from cognee.temporal_poc.datapoints.datapoints import Event
    import asyncio
    import json

    # One list of {event_name, description} dicts per chunk, same order as data_chunks.
    chunk_events = [
        [
            {"event_name": item.name, "description": item.description or ""}
            for item in chunk.contains
            if isinstance(item, Event)
        ]
        for chunk in data_chunks
    ]

    # json.dumps([]) already yields "[]", so empty chunks need no special case.
    events_jsons = [json.dumps(events) for events in chunk_events]

    # Extract entities from all chunks concurrently; gather preserves order.
    entity_results = await asyncio.gather(
        *[extract_event_entities(events_json, EventEntityList) for events_json in events_jsons]
    )

    # Enrich each chunk with its entity results (zip keeps 1:1 alignment).
    for chunk, entity_result in zip(data_chunks, entity_results):
        add_entities_to_chunk(chunk, entity_result)

    return data_chunks
|
||||
|
||||
|
||||
def add_entities_to_chunk(chunk: DocumentChunk, entity_result) -> None:
    """Append an Entity to the chunk for every attribute of every extracted event.

    EntityType instances are cached per call so repeated type names across
    events reuse one EntityType object.
    """
    type_cache: dict = {}  # entity_type name -> EntityType, shared across events
    for enriched_event in entity_result.events:
        for attr in enriched_event.attributes:
            resolved_type = get_or_create_entity_type(type_cache, attr.entity_type)
            create_entity(chunk, attr, resolved_type)
|
||||
|
||||
|
||||
def get_or_create_entity_type(entity_types: dict, entity_type_name: str) -> EntityType:
    """Return the cached EntityType for the name, creating and caching it on first use.

    The cache dict is owned by the caller and mutated in place.
    """
    cached = entity_types.get(entity_type_name)
    if cached is None:
        cached = EntityType(
            name=entity_type_name, description=f"Type for {entity_type_name}"
        )
        entity_types[entity_type_name] = cached
    return cached
|
||||
|
||||
|
||||
def create_entity(chunk: DocumentChunk, attribute, entity_type: EntityType) -> None:
    """Build an Entity datapoint from an extracted attribute and attach it to the chunk."""
    chunk.contains.append(
        Entity(
            name=attribute.entity,
            is_a=entity_type,
            description=f"Entity {attribute.entity} of type {attribute.entity_type}",
        )
    )
|
||||
|
||||
|
||||
async def process_event_knowledge_graph(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
    """Pipeline entry point: delegate chunk enrichment to extract_event_knowledge_graph."""
    enriched_chunks = await extract_event_knowledge_graph(data_chunks)
    return enriched_chunks
|
||||
|
|
@ -27,3 +27,19 @@ class Event(BaseModel):
|
|||
|
||||
class EventList(BaseModel):
    """Structured-output wrapper: the flat list of events extracted from one chunk."""

    events: List[Event]
|
||||
|
||||
|
||||
class EntityAttribute(BaseModel):
    """One entity mentioned in an event, plus how it relates to that event."""

    # Entity name as it appears in the event text.
    entity: str
    # Category of the entity (person, place, organization, object, concept, ...).
    entity_type: str
    # Concise relation of the entity to the event, e.g. "subject" or "previous_owner".
    relationship: str
|
||||
|
||||
|
||||
class EventWithEntities(BaseModel):
    """An extracted event enriched with its entity attributes."""

    event_name: str
    description: Optional[str] = None
    # NOTE(review): mutable default is safe here because pydantic copies field
    # defaults per instance — confirm if this model is ever used outside pydantic.
    attributes: List[EntityAttribute] = []
|
||||
|
||||
|
||||
class EventEntityList(BaseModel):
    """Structured-output wrapper: entity-enriched events for one chunk."""

    events: List[EventWithEntities]
|
||||
|
|
|
|||
|
|
@ -1,11 +1,5 @@
|
|||
import asyncio
|
||||
import uuid
|
||||
|
||||
from pydantic import BaseModel
|
||||
from typing import Union, Optional, List, Type
|
||||
from uuid import UUID, uuid5
|
||||
from datetime import datetime, timezone
|
||||
from cognee.infrastructure.llm.get_llm_client import get_llm_client
|
||||
from typing import Union, Optional, List
|
||||
from uuid import UUID
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from cognee.shared.data_models import KnowledgeGraph
|
||||
from cognee.infrastructure.llm import get_max_chunk_tokens, get_llm_config
|
||||
|
|
@ -14,7 +8,6 @@ from cognee.api.v1.cognify.cognify import run_cognify_blocking
|
|||
from cognee.modules.pipelines.tasks.task import Task
|
||||
from cognee.modules.chunking.TextChunker import TextChunker
|
||||
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
|
||||
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
|
||||
|
||||
from cognee.modules.users.models import User
|
||||
from cognee.tasks.documents import (
|
||||
|
|
@ -25,119 +18,12 @@ from cognee.tasks.documents import (
|
|||
from cognee.tasks.graph import extract_graph_from_data
|
||||
from cognee.tasks.storage import add_data_points
|
||||
from cognee.tasks.summarization import summarize_text
|
||||
from cognee.temporal_poc.models.models import EventList
|
||||
from cognee.temporal_poc.datapoints.datapoints import Interval, Timestamp, Event
|
||||
from cognee.temporal_poc.event_extraction import extract_events_and_entities
|
||||
from cognee.temporal_poc.event_knowledge_graph import process_event_knowledge_graph
|
||||
|
||||
logger = get_logger("temporal_cognify")
|
||||
|
||||
|
||||
def date_to_int(ts: Timestamp) -> int:
    """Convert a calendar-field Timestamp to integer epoch milliseconds (UTC)."""
    # Calendar fields are interpreted as a UTC wall-clock moment.
    dt = datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second, tzinfo=timezone.utc)

    time = int(dt.timestamp() * 1000)
    return time
|
||||
|
||||
|
||||
async def extract_event_graph(content: str, response_model: Type[BaseModel]):
    """Extract a granular event stream from text via LLM structured output.

    NOTE(review): this inline prompt duplicates the module-level
    EVENT_EXTRACTION_SYSTEM_PROMPT in cognee/temporal_poc/event_extraction.py
    (including its typos); consider importing that constant instead.
    """
    llm_client = get_llm_client()

    system_prompt = """
For the purposes of building event-based knowledge graphs, you are tasked with extracting highly granular stream events from a text. The events are defined as follows:
## Event Definition
- Anything with a date or a timestamp is an event
- Anything that took place in time (even if the time is unknown) is an event
- Anything that lasted over a period of time, or happened in an instant is an event: from historical milestones (wars, presidencies, olympiads) to personal milestones (birth, death, employment, etc.), to mundane actions (a walk, a conversation, etc.)
- **ANY action or verb represents an event** - this is the most important rule
- Every single verb in the text corresponds to an event that must be extracted
- This includes: thinking, feeling, seeing, hearing, moving, speaking, writing, reading, eating, sleeping, working, playing, studying, traveling, meeting, calling, texting, buying, selling, creating, destroying, building, breaking, starting, stopping, beginning, ending, etc.
- Even the most mundane or obvious actions are events: "he walked", "she sat", "they talked", "I thought", "we waited"
## Requirements
- **Be extremely thorough** - extract EVERY event mentioned, no matter how small or obvious
- **Timestamped first" - every time stamp, or date should have atleast one event
- **Verbs/actions = one event** - After you are done with timestamped events -- every verb that is an action should have a corresponding event.
- We expect long streams of events from any piece of text, easily reaching a hundred events
- Granularity and richness of the stream is key to our success and is of utmost importance
- Not all events will have timestamps, add timestamps only to known events
- For events that were instantaneous, just attach the time_from or time_to property don't create both
- **Do not skip any events** - if you're unsure whether something is an event, extract it anyway
- **Quantity over filtering** - it's better to extract too many events than to miss any
- **Descriptions** - Always include the event description together with entities (Who did what, what happened? What is the event?). If you can include the corresponding part from the text.
## Output Format
Your reply should be a JSON: list of dictionaries with the following structure:
```python
class Event(BaseModel):
    name: str [concise]
    description: Optional[str] = None
    time_from: Optional[Timestamp] = None
    time_to: Optional[Timestamp] = None
    location: Optional[str] = None
```
"""

    content_graph = await llm_client.acreate_structured_output(
        content, system_prompt, response_model
    )

    return content_graph
|
||||
|
||||
|
||||
async def extract_events_and_entities(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
    """Extracts events and entities from a chunk of documents.

    Each chunk's text is sent to the LLM concurrently; every extracted event
    is converted into an Event datapoint (with an Interval when both bounds
    are known, a single Timestamp when one is known) and appended to the
    chunk's `contains` list. Returns the same list, enriched in place.
    """
    events = await asyncio.gather(
        *[extract_event_graph(chunk.text, EventList) for chunk in data_chunks]
    )
    # asyncio.gather preserves order, so zip keeps chunks aligned with results.
    for data_chunk, event_list in zip(data_chunks, events):
        for event in event_list.events:
            if event.time_from and event.time_to:
                # Both bounds known: attach a during-Interval.
                event_time_from = date_to_int(event.time_from)
                event_time_to = date_to_int(event.time_to)
                # uuid5 over NAMESPACE_OID: equal instants share one Timestamp id.
                timestamp_time_from = Timestamp(
                    id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_from)), time_at=event_time_from
                )
                timestamp_time_to = Timestamp(
                    id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_to)), time_at=event_time_to
                )
                event_interval = Interval(time_from=timestamp_time_from, time_to=timestamp_time_to)
                event_datapoint = Event(
                    name=event.name,
                    description=event.description,
                    during=event_interval,
                    location=event.location,
                )
            elif event.time_from:
                # Only a start bound: instantaneous event at that moment.
                event_time_from = date_to_int(event.time_from)
                event_time_at = Timestamp(
                    id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_from)), time_at=event_time_from
                )
                event_datapoint = Event(
                    name=event.name,
                    description=event.description,
                    at=event_time_at,
                    location=event.location,
                )
            elif event.time_to:
                # Only an end bound: instantaneous event at that moment.
                event_time_to = date_to_int(event.time_to)
                event_time_at = Timestamp(
                    id=uuid5(uuid.NAMESPACE_OID, name=str(event_time_to)), time_at=event_time_to
                )
                event_datapoint = Event(
                    name=event.name,
                    description=event.description,
                    at=event_time_at,
                    location=event.location,
                )
            else:
                # No time information extracted: bare Event.
                event_datapoint = Event(
                    name=event.name, description=event.description, location=event.location
                )

            data_chunk.contains.append(event_datapoint)

    return data_chunks
|
||||
|
||||
|
||||
async def get_temporal_tasks(
|
||||
user: User = None, chunker=TextChunker, chunk_size: int = None
|
||||
) -> list[Task]:
|
||||
|
|
@ -150,6 +36,7 @@ async def get_temporal_tasks(
|
|||
chunker=chunker,
|
||||
),
|
||||
Task(extract_events_and_entities, task_config={"chunk_size": 10}),
|
||||
Task(process_event_knowledge_graph),
|
||||
Task(add_data_points, task_config={"batch_size": 10}),
|
||||
]
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue