feature: adds time graph option to cognify (#1307)

<!-- .github/pull_request_template.md -->

## Description
feature: adds time graph option to cognify

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
Vasilije 2025-09-02 09:17:29 +02:00 committed by GitHub
commit e52f9c6a20
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 1752 additions and 10 deletions

View file

@ -0,0 +1,224 @@
name: Temporal Graph Tests
permissions:
contents: read
on:
workflow_call:
inputs:
databases:
required: false
type: string
default: "all"
description: "Which vector databases to test (comma-separated list or 'all')"
jobs:
run_temporal_graph_kuzu_lance_sqlite:
name: Temporal Graph test Kuzu (lancedb + sqlite)
runs-on: ubuntu-22.04
if: ${{ inputs.databases == 'all' || contains(inputs.databases, 'kuzu/lance/sqlite') }}
steps:
- name: Check out
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Cognee Setup
uses: ./.github/actions/cognee_setup
with:
python-version: ${{ inputs.python-version }}
- name: Dependencies already installed
run: echo "Dependencies already installed in setup"
- name: Run Temporal Graph with Kuzu (lancedb + sqlite)
env:
ENV: 'dev'
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
GRAPH_DATABASE_PROVIDER: 'kuzu'
VECTOR_DB_PROVIDER: 'lancedb'
DB_PROVIDER: 'sqlite'
run: uv run python ./cognee/tests/test_temporal_graph.py
run_temporal_graph_neo4j_lance_sqlite:
name: Temporal Graph test Neo4j (lancedb + sqlite)
runs-on: ubuntu-22.04
if: ${{ inputs.databases == 'all' || contains(inputs.databases, 'neo4j/lance/sqlite') }}
services:
neo4j:
image: neo4j:5.11
env:
NEO4J_AUTH: neo4j/pleaseletmein
NEO4J_PLUGINS: '["apoc","graph-data-science"]'
ports:
- 7474:7474
- 7687:7687
options: >-
--health-cmd="cypher-shell -u neo4j -p pleaseletmein 'RETURN 1'"
--health-interval=10s
--health-timeout=5s
--health-retries=5
steps:
- name: Check out
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Cognee Setup
uses: ./.github/actions/cognee_setup
with:
python-version: ${{ inputs.python-version }}
- name: Dependencies already installed
run: echo "Dependencies already installed in setup"
- name: Run Temporal Graph with Neo4j (lancedb + sqlite)
env:
ENV: 'dev'
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
GRAPH_DATABASE_PROVIDER: 'neo4j'
VECTOR_DB_PROVIDER: 'lancedb'
DB_PROVIDER: 'sqlite'
GRAPH_DATABASE_URL: bolt://localhost:7687
GRAPH_DATABASE_USERNAME: neo4j
GRAPH_DATABASE_PASSWORD: pleaseletmein
run: uv run python ./cognee/tests/test_temporal_graph.py
run_temporal_graph_kuzu_postgres_pgvector:
name: Temporal Graph test Kuzu (postgres + pgvector)
runs-on: ubuntu-22.04
if: ${{ inputs.databases == 'all' || contains(inputs.databases, 'kuzu/pgvector/postgres') }}
services:
postgres:
image: pgvector/pgvector:pg17
env:
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- name: Check out
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Cognee Setup
uses: ./.github/actions/cognee_setup
with:
python-version: ${{ inputs.python-version }}
extra-dependencies: "postgres"
- name: Dependencies already installed
run: echo "Dependencies already installed in setup"
- name: Run Temporal Graph with Kuzu (postgres + pgvector)
env:
ENV: dev
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
GRAPH_DATABASE_PROVIDER: 'kuzu'
VECTOR_DB_PROVIDER: 'pgvector'
DB_PROVIDER: 'postgres'
DB_NAME: 'cognee_db'
DB_HOST: '127.0.0.1'
DB_PORT: 5432
DB_USERNAME: cognee
DB_PASSWORD: cognee
run: uv run python ./cognee/tests/test_temporal_graph.py
run_temporal_graph_neo4j_postgres_pgvector:
name: Temporal Graph test Neo4j (postgres + pgvector)
runs-on: ubuntu-22.04
if: ${{ inputs.databases == 'all' || contains(inputs.databases, 'neo4j/pgvector/postgres') }}
services:
neo4j:
image: neo4j:5.11
env:
NEO4J_AUTH: neo4j/pleaseletmein
NEO4J_PLUGINS: '["apoc","graph-data-science"]'
ports:
- 7474:7474
- 7687:7687
options: >-
--health-cmd="cypher-shell -u neo4j -p pleaseletmein 'RETURN 1'"
--health-interval=10s
--health-timeout=5s
--health-retries=5
postgres:
image: pgvector/pgvector:pg17
env:
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries=5
steps:
- name: Check out
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Cognee Setup
uses: ./.github/actions/cognee_setup
with:
python-version: ${{ inputs.python-version }}
extra-dependencies: "postgres"
- name: Dependencies already installed
run: echo "Dependencies already installed in setup"
- name: Run Temporal Graph with Neo4j (postgres + pgvector)
env:
ENV: dev
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
GRAPH_DATABASE_PROVIDER: 'neo4j'
VECTOR_DB_PROVIDER: 'pgvector'
DB_PROVIDER: 'postgres'
GRAPH_DATABASE_URL: bolt://localhost:7687
GRAPH_DATABASE_USERNAME: neo4j
GRAPH_DATABASE_PASSWORD: pleaseletmein
DB_NAME: cognee_db
DB_HOST: 127.0.0.1
DB_PORT: 5432
DB_USERNAME: cognee
DB_PASSWORD: cognee
run: uv run python ./cognee/tests/test_temporal_graph.py

View file

@ -50,6 +50,12 @@ jobs:
uses: ./.github/workflows/graph_db_tests.yml
secrets: inherit
temporal-graph-tests:
name: Temporal Graph Test
needs: [ basic-tests, e2e-tests, cli-tests, graph-db-tests ]
uses: ./.github/workflows/temporal_graph_tests.yml
secrets: inherit
search-db-tests:
name: Search Test on Different DBs
needs: [basic-tests, e2e-tests, cli-tests, graph-db-tests]

View file

@ -22,6 +22,11 @@ from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text
from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
from cognee.tasks.temporal_graph.extract_events_and_entities import extract_events_and_timestamps
from cognee.tasks.temporal_graph.extract_knowledge_graph_from_events import (
extract_knowledge_graph_from_events,
)
logger = get_logger("cognify")
@ -40,6 +45,7 @@ async def cognify(
run_in_background: bool = False,
incremental_loading: bool = True,
custom_prompt: Optional[str] = None,
temporal_cognify: bool = False,
):
"""
Transform ingested data into a structured knowledge graph.
@ -182,9 +188,12 @@ async def cognify(
- LLM_RATE_LIMIT_ENABLED: Enable rate limiting (default: False)
- LLM_RATE_LIMIT_REQUESTS: Max requests per interval (default: 60)
"""
tasks = await get_default_tasks(
user, graph_model, chunker, chunk_size, ontology_file_path, custom_prompt
)
if temporal_cognify:
tasks = await get_temporal_tasks(user, chunker, chunk_size)
else:
tasks = await get_default_tasks(
user, graph_model, chunker, chunk_size, ontology_file_path, custom_prompt
)
# By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for
pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background)
@ -233,3 +242,41 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
]
return default_tasks
async def get_temporal_tasks(
user: User = None, chunker=TextChunker, chunk_size: int = None
) -> list[Task]:
"""
Builds and returns a list of temporal processing tasks to be executed in sequence.
The pipeline includes:
1. Document classification.
2. Dataset permission checks (requires "write" access).
3. Document chunking with a specified or default chunk size.
4. Event and timestamp extraction from chunks.
5. Knowledge graph extraction from events.
6. Batched insertion of data points.
Args:
user (User, optional): The user requesting task execution, used for permission checks.
chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker.
chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default.
Returns:
list[Task]: A list of Task objects representing the temporal processing pipeline.
"""
temporal_tasks = [
Task(classify_documents),
Task(check_permissions_on_dataset, user=user, permissions=["write"]),
Task(
extract_chunks_from_documents,
max_chunk_size=chunk_size or get_max_chunk_tokens(),
chunker=chunker,
),
Task(extract_events_and_timestamps, task_config={"chunk_size": 10}),
Task(extract_knowledge_graph_from_events),
Task(add_data_points, task_config={"batch_size": 10}),
]
return temporal_tasks

View file

@ -21,6 +21,8 @@ from cognee.infrastructure.databases.graph.graph_db_interface import (
)
from cognee.infrastructure.engine import DataPoint
from cognee.modules.storage.utils import JSONEncoder
from cognee.modules.engine.utils.generate_timestamp_datapoint import date_to_int
from cognee.tasks.temporal_graph.models import Timestamp
logger = get_logger()
@ -106,6 +108,18 @@ class KuzuAdapter(GraphDBInterface):
self.db.init_database()
self.connection = Connection(self.db)
try:
self.connection.execute("INSTALL JSON;")
except Exception as e:
logger.info(f"JSON extension already installed or not needed: {e}")
try:
self.connection.execute("LOAD EXTENSION JSON;")
logger.info("Loaded JSON extension")
except Exception as e:
logger.info(f"JSON extension already loaded or unavailable: {e}")
# Create node table with essential fields and timestamp
self.connection.execute("""
CREATE NODE TABLE IF NOT EXISTS Node(
@ -1693,3 +1707,124 @@ class KuzuAdapter(GraphDBInterface):
SET r.properties = $props
"""
await self.query(update_query, {"node_id": node_id, "props": new_props})
async def collect_events(self, ids: List[str]) -> Any:
"""
Collect all Event-type nodes reachable within 1..2 hops
from the given node IDs.
Args:
graph_engine: Object exposing an async .query(str) -> Any
ids: List of node IDs (strings)
Returns:
List of events
"""
event_collection_cypher = """UNWIND [{quoted}] AS uid
MATCH (start {{id: uid}})
MATCH (start)-[*1..2]-(event)
WHERE event.type = 'Event'
WITH DISTINCT event
RETURN collect(event) AS events;
"""
query = event_collection_cypher.format(quoted=ids)
result = await self.query(query)
events = []
for node in result[0][0]:
props = json.loads(node["properties"])
event = {
"id": node["id"],
"name": node["name"],
"description": props.get("description"),
}
if props.get("location"):
event["location"] = props["location"]
events.append(event)
return [{"events": events}]
async def collect_time_ids(
self,
time_from: Optional[Timestamp] = None,
time_to: Optional[Timestamp] = None,
) -> str:
"""
Collect IDs of Timestamp nodes between time_from and time_to.
Args:
graph_engine: Object exposing an async .query(query, params) -> list[dict]
time_from: Lower bound int (inclusive), optional
time_to: Upper bound int (inclusive), optional
Returns:
A string of quoted IDs: "'id1', 'id2', 'id3'"
(ready for use in a Cypher UNWIND clause).
"""
ids: List[str] = []
if time_from and time_to:
time_from = date_to_int(time_from)
time_to = date_to_int(time_to)
cypher = f"""
MATCH (n:Node)
WHERE n.type = 'Timestamp'
// Extract time_at from the JSON string and cast to INT64
WITH n, json_extract(n.properties, '$.time_at') AS t_str
WITH n,
CASE
WHEN t_str IS NULL OR t_str = '' THEN NULL
ELSE CAST(t_str AS INT64)
END AS t
WHERE t >= {time_from}
AND t <= {time_to}
RETURN n.id as id
"""
elif time_from:
time_from = date_to_int(time_from)
cypher = f"""
MATCH (n:Node)
WHERE n.type = 'Timestamp'
// Extract time_at from the JSON string and cast to INT64
WITH n, json_extract(n.properties, '$.time_at') AS t_str
WITH n,
CASE
WHEN t_str IS NULL OR t_str = '' THEN NULL
ELSE CAST(t_str AS INT64)
END AS t
WHERE t >= {time_from}
RETURN n.id as id
"""
elif time_to:
time_to = date_to_int(time_to)
cypher = f"""
MATCH (n:Node)
WHERE n.type = 'Timestamp'
// Extract time_at from the JSON string and cast to INT64
WITH n, json_extract(n.properties, '$.time_at') AS t_str
WITH n,
CASE
WHEN t_str IS NULL OR t_str = '' THEN NULL
ELSE CAST(t_str AS INT64)
END AS t
WHERE t <= {time_to}
RETURN n.id as id
"""
else:
return ids
time_nodes = await self.query(cypher)
time_ids_list = [item[0] for item in time_nodes]
return ", ".join(f"'{uid}'" for uid in time_ids_list)

View file

@ -11,6 +11,8 @@ from contextlib import asynccontextmanager
from typing import Optional, Any, List, Dict, Type, Tuple
from cognee.infrastructure.engine import DataPoint
from cognee.modules.engine.utils.generate_timestamp_datapoint import date_to_int
from cognee.tasks.temporal_graph.models import Timestamp
from cognee.shared.logging_utils import get_logger, ERROR
from cognee.infrastructure.databases.graph.graph_db_interface import (
GraphDBInterface,
@ -1371,3 +1373,90 @@ class Neo4jAdapter(GraphDBInterface):
query,
params={"weight": float(weight), "node_ids": list(node_ids)},
)
async def collect_events(self, ids: List[str]) -> Any:
"""
Collect all Event-type nodes reachable within 1..2 hops
from the given node IDs.
Args:
graph_engine: Object exposing an async .query(str) -> Any
ids: List of node IDs (strings)
Returns:
List of events
"""
event_collection_cypher = """UNWIND [{quoted}] AS uid
MATCH (start {{id: uid}})
MATCH (start)-[*1..2]-(event)
WHERE event.type = 'Event'
WITH DISTINCT event
RETURN collect(event) AS events;
"""
query = event_collection_cypher.format(quoted=ids)
return await self.query(query)
async def collect_time_ids(
self,
time_from: Optional[Timestamp] = None,
time_to: Optional[Timestamp] = None,
) -> str:
"""
Collect IDs of Timestamp nodes between time_from and time_to.
Args:
graph_engine: Object exposing an async .query(query, params) -> list[dict]
time_from: Lower bound int (inclusive), optional
time_to: Upper bound int (inclusive), optional
Returns:
A string of quoted IDs: "'id1', 'id2', 'id3'"
(ready for use in a Cypher UNWIND clause).
"""
ids: List[str] = []
if time_from and time_to:
time_from = date_to_int(time_from)
time_to = date_to_int(time_to)
cypher = """
MATCH (n)
WHERE n.type = 'Timestamp'
AND n.time_at >= $time_from
AND n.time_at <= $time_to
RETURN n.id AS id
"""
params = {"time_from": time_from, "time_to": time_to}
elif time_from:
time_from = date_to_int(time_from)
cypher = """
MATCH (n)
WHERE n.type = 'Timestamp'
AND n.time_at >= $time_from
RETURN n.id AS id
"""
params = {"time_from": time_from}
elif time_to:
time_to = date_to_int(time_to)
cypher = """
MATCH (n)
WHERE n.type = 'Timestamp'
AND n.time_at <= $time_to
RETURN n.id AS id
"""
params = {"time_to": time_to}
else:
return ids
time_nodes = await self.query(cypher, params)
time_ids_list = [item["id"] for item in time_nodes if "id" in item]
return ", ".join(f"'{uid}'" for uid in time_ids_list)

View file

@ -144,3 +144,21 @@ class LLMGateway:
)
return extract_summary(content=content, response_model=response_model)
@staticmethod
def extract_event_graph(content: str, response_model: Type[BaseModel]) -> Coroutine:
# TODO: Add BAML version of category and extraction and update function (consulted with Igor)
from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.extraction import (
extract_event_graph,
)
return extract_event_graph(content=content, response_model=response_model)
@staticmethod
def extract_event_entities(content: str, response_model: Type[BaseModel]) -> Coroutine:
# TODO: Add BAML version of category and extraction and update function (consulted with Igor)
from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.extraction import (
extract_event_entities,
)
return extract_event_entities(content=content, response_model=response_model)

View file

@ -52,6 +52,8 @@ class LLMConfig(BaseSettings):
transcription_model: str = "whisper-1"
graph_prompt_path: str = "generate_graph_prompt.txt"
temporal_graph_prompt_path: str = "generate_event_graph_prompt.txt"
event_entity_prompt_path: str = "generate_event_entity_prompt.txt"
llm_rate_limit_enabled: bool = False
llm_rate_limit_requests: int = 60
llm_rate_limit_interval: int = 60 # in seconds (default is 60 requests per minute)

View file

@ -0,0 +1,15 @@
For the purposes of identifying timestamps in a query, you are tasked with extracting relevant timestamps from the query.
## Timestamp requirements
- If the query contains interval extrack both starts_at and ends_at properties
- If the query contains an instantaneous timestamp, starts_at and ends_at should be the same
- If the query its open-ended (before 2009 or after 2009), the corresponding non defined end of the time should be none
-For example: "before 2009" -- starts_at: None, ends_at: 2009 or "after 2009" -- starts_at: 2009, ends_at: None
- Put always the data that comes first in time as starts_at and the timestamps that comes second in time as ends_at
- If starts_at or ends_at cannot be extracted both of them has to be None
## Output Format
Your reply should be a JSON: list of dictionaries with the following structure:
```python
class QueryInterval(BaseModel):
starts_at: Optional[Timestamp] = None
ends_at: Optional[Timestamp] = None
```

View file

@ -0,0 +1,25 @@
For the purposes of building event-based knowledge graphs, you are tasked with extracting highly granular entities from events text. An entity is any distinct, identifiable thing, person, place, object, organization, concept, or phenomenon that can be named, referenced, or described in the event context. This includes but is not limited to: people, places, objects, organizations, concepts, events, processes, states, conditions, properties, attributes, roles, functions, and any other meaningful referents that contribute to understanding the event.
**Temporal Entity Exclusion**: Do not extract timestamp-like entities (dates, times, durations) as these are handled separately. However, extract named temporal periods, eras, historical epochs, and culturally significant time references
## Input Format
The input will be a list of dictionaries, each containing:
- `event_name`: The name of the event
- `description`: The description of the event
## Task
For each event, extract all entities mentioned in the event description and determine their relationship to the event.
## Output Format
Return the same enriched JSON with an additional key in each dictionary: `attributes`.
The `attributes` should be a list of dictionaries, each containing:
- `entity`: The name of the entity
- `entity_type`: The type/category of the entity (person, place, organization, object, concept, etc.)
- `relationship`: A concise description of how the entity relates to the event
## Requirements
- **Be extremely thorough** - extract EVERY non-temporal entity mentioned, no matter how small, obvious, or seemingly insignificant
- **After you are done with obvious entities, every noun, pronoun, proper noun, and named reference = one entity**
- We expect rich entity networks from any event, easily reaching a dozens of entities per event
- Granularity and richness of the entity extraction is key to our success and is of utmost importance
- **Do not skip any entities** - if you're unsure whether something is an entity, extract it anyway
- Use the event name for context when determining relationships
- Relationships should be technical with one or at most two words. If two words, use underscore camelcase style
- Relationships could imply general meaning like: subject, object, participant, recipient, agent, instrument, tool, source, cause, effect, purpose, manner, resource, etc.
- You can combine two words to form a relationship name: subject_role, previous_owner, etc.
- Focus on how the entity specifically relates to the event

View file

@ -0,0 +1,30 @@
For the purposes of building event-based knowledge graphs, you are tasked with extracting highly granular stream events from a text. The events are defined as follows:
## Event Definition
- Anything with a date or a timestamp is an event
- Anything that took place in time (even if the time is unknown) is an event
- Anything that lasted over a period of time, or happened in an instant is an event: from historical milestones (wars, presidencies, olympiads) to personal milestones (birth, death, employment, etc.), to mundane actions (a walk, a conversation, etc.)
- **ANY action or verb represents an event** - this is the most important rule
- Every single verb in the text corresponds to an event that must be extracted
- This includes: thinking, feeling, seeing, hearing, moving, speaking, writing, reading, eating, sleeping, working, playing, studying, traveling, meeting, calling, texting, buying, selling, creating, destroying, building, breaking, starting, stopping, beginning, ending, etc.
- Even the most mundane or obvious actions are events: "he walked", "she sat", "they talked", "I thought", "we waited"
## Requirements
- **Be extremely thorough** - extract EVERY event mentioned, no matter how small or obvious
- **Timestamped first" - every time stamp, or date should have atleast one event
- **Verbs/actions = one event** - After you are done with timestamped events -- every verb that is an action should have a corresponding event.
- We expect long streams of events from any piece of text, easily reaching a hundred events
- Granularity and richness of the stream is key to our success and is of utmost importance
- Not all events will have timestamps, add timestamps only to known events
- For events that were instantaneous, just attach the time_from or time_to property don't create both
- **Do not skip any events** - if you're unsure whether something is an event, extract it anyway
- **Quantity over filtering** - it's better to extract too many events than to miss any
- **Descriptions** - Always include the event description together with entities (Who did what, what happened? What is the event?). If you can include the corresponding part from the text.
## Output Format
Your reply should be a JSON: list of dictionaries with the following structure:
```python
class Event(BaseModel):
name: str [concise]
description: Optional[str] = None
time_from: Optional[Timestamp] = None
time_to: Optional[Timestamp] = None
location: Optional[str] = None
```

View file

@ -1,3 +1,5 @@
from .knowledge_graph.extract_content_graph import extract_content_graph
from .knowledge_graph.extract_event_graph import extract_event_graph
from .extract_categories import extract_categories
from .extract_summary import extract_summary, extract_code_summary
from .extract_event_entities import extract_event_entities

View file

@ -0,0 +1,44 @@
import os
from typing import List, Type
from pydantic import BaseModel
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.llm.config import (
get_llm_config,
)
async def extract_event_entities(content: str, response_model: Type[BaseModel]):
"""
Extracts event-related entities from the given content using an LLM with structured output.
This function loads an event entity extraction prompt from the LLM configuration,
renders it into a system prompt, and queries the LLM to produce structured entities
that conform to the specified response model.
Args:
content (str): The input text from which to extract event entities.
response_model (Type[BaseModel]): A Pydantic model defining the structure of the expected output.
Returns:
BaseModel: An instance of the response_model populated with extracted event entities.
"""
llm_config = get_llm_config()
prompt_path = llm_config.event_entity_prompt_path
# Check if the prompt path is an absolute path or just a filename
if os.path.isabs(prompt_path):
# directory containing the file
base_directory = os.path.dirname(prompt_path)
# just the filename itself
prompt_path = os.path.basename(prompt_path)
else:
base_directory = None
system_prompt = LLMGateway.render_prompt(prompt_path, {}, base_directory=base_directory)
content_graph = await LLMGateway.acreate_structured_output(
content, system_prompt, response_model
)
return content_graph

View file

@ -1 +1,2 @@
from .extract_content_graph import extract_content_graph
from .extract_event_graph import extract_event_graph

View file

@ -0,0 +1,46 @@
import os
from pydantic import BaseModel
from typing import Type
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.llm.config import (
get_llm_config,
)
async def extract_event_graph(content: str, response_model: Type[BaseModel]):
"""
Extracts an event graph from the given content using an LLM with a structured output format.
This function loads a temporal graph extraction prompt from the LLM configuration,
renders it as a system prompt, and queries the LLM to produce a structured event
graph matching the specified response model.
Args:
content (str): The input text from which to extract the event graph.
response_model (Type[BaseModel]): A Pydantic model defining the structure of the expected output.
Returns:
BaseModel: An instance of the response_model populated with the extracted event graph.
"""
llm_config = get_llm_config()
prompt_path = llm_config.temporal_graph_prompt_path
# Check if the prompt path is an absolute path or just a filename
if os.path.isabs(prompt_path):
# directory containing the file
base_directory = os.path.dirname(prompt_path)
# just the filename itself
prompt_path = os.path.basename(prompt_path)
else:
base_directory = None
system_prompt = LLMGateway.render_prompt(prompt_path, {}, base_directory=base_directory)
content_graph = await LLMGateway.acreate_structured_output(
content, system_prompt, response_model
)
return content_graph

View file

@ -1,8 +1,9 @@
from typing import List
from typing import List, Union
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.processing.document_types import Document
from cognee.modules.engine.models import Entity
from cognee.tasks.temporal_graph.models import Event
class DocumentChunk(DataPoint):
@ -20,7 +21,7 @@ class DocumentChunk(DataPoint):
- chunk_index: The index of the chunk in the original document.
- cut_type: The type of cut that defined this chunk.
- is_part_of: The document to which this chunk belongs.
- contains: A list of entities contained within the chunk (default is None).
- contains: A list of entities or events contained within the chunk (default is None).
- metadata: A dictionary to hold meta information related to the chunk, including index
fields.
"""
@ -30,6 +31,6 @@ class DocumentChunk(DataPoint):
chunk_index: int
cut_type: str
is_part_of: Document
contains: List[Entity] = None
contains: List[Union[Entity, Event]] = None
metadata: dict = {"index_fields": ["text"]}

View file

@ -0,0 +1,16 @@
from typing import Optional, Any
from pydantic import SkipValidation
from cognee.infrastructure.engine import DataPoint
from cognee.modules.engine.models.Timestamp import Timestamp
from cognee.modules.engine.models.Interval import Interval
class Event(DataPoint):
name: str
description: Optional[str] = None
at: Optional[Timestamp] = None
during: Optional[Interval] = None
location: Optional[str] = None
attributes: SkipValidation[Any] = None
metadata: dict = {"index_fields": ["name"]}

View file

@ -0,0 +1,8 @@
from pydantic import Field
from cognee.infrastructure.engine import DataPoint
from cognee.modules.engine.models.Timestamp import Timestamp
class Interval(DataPoint):
time_from: Timestamp = Field(...)
time_to: Timestamp = Field(...)

View file

@ -0,0 +1,13 @@
from pydantic import Field
from cognee.infrastructure.engine import DataPoint
class Timestamp(DataPoint):
time_at: int = Field(...)
year: int = Field(...)
month: int = Field(...)
day: int = Field(...)
hour: int = Field(...)
minute: int = Field(...)
second: int = Field(...)
timestamp_str: str = Field(...)

View file

@ -4,3 +4,6 @@ from .TableRow import TableRow
from .TableType import TableType
from .node_set import NodeSet
from .ColumnValue import ColumnValue
from .Timestamp import Timestamp
from .Interval import Interval
from .Event import Event

View file

@ -1,3 +1,5 @@
from .generate_node_id import generate_node_id
from .generate_node_name import generate_node_name
from .generate_edge_name import generate_edge_name
from .generate_event_datapoint import generate_event_datapoint
from .generate_timestamp_datapoint import generate_timestamp_datapoint

View file

@ -0,0 +1,46 @@
from cognee.modules.engine.models import Interval, Event
from cognee.modules.engine.utils.generate_timestamp_datapoint import generate_timestamp_datapoint
def generate_event_datapoint(event) -> Event:
"""
Generates an Event datapoint from a given event model, including temporal metadata if available.
The function maps the basic attributes (name, description, location) from the input event
and enriches them with temporal information. If start and end times are provided, an
Interval is created. If only one timestamp is available, it is added directly. Temporal
information is also appended to the event description for context.
Args:
event: An event model instance containing attributes such as name, description,
location, time_from, and time_to.
Returns:
Event: A structured Event object with name, description, location, and enriched
temporal details.
"""
# Base event data
event_data = {
"name": event.name,
"description": event.description,
"location": event.location,
}
# Create timestamps if they exist
time_from = generate_timestamp_datapoint(event.time_from) if event.time_from else None
time_to = generate_timestamp_datapoint(event.time_to) if event.time_to else None
# Add temporal information
if time_from and time_to:
event_data["during"] = Interval(time_from=time_from, time_to=time_to)
# Enrich description with temporal info
temporal_info = f"\n---\nTime data: {time_from.timestamp_str} to {time_to.timestamp_str}"
event_data["description"] = (event_data["description"] or "Event") + temporal_info
elif time_from or time_to:
timestamp = time_from or time_to
event_data["at"] = timestamp
# Enrich description with temporal info
temporal_info = f"\n---\nTime data: {timestamp.timestamp_str}"
event_data["description"] = (event_data["description"] or "Event") + temporal_info
return Event(**event_data)

View file

@ -0,0 +1,51 @@
from datetime import datetime, timezone
from cognee.modules.engine.models import Interval, Timestamp, Event
from cognee.modules.engine.utils import generate_node_id
def generate_timestamp_datapoint(ts: Timestamp) -> Timestamp:
"""
Generates a normalized Timestamp datapoint from a given Timestamp model.
The function converts the provided timestamp into an integer representation,
constructs a human-readable string format, and creates a new Timestamp object
with a unique identifier.
Args:
ts (Timestamp): The input Timestamp model containing date and time components.
Returns:
Timestamp: A new Timestamp object with a generated ID, integer representation,
original components, and formatted string.
"""
time_at = date_to_int(ts)
timestamp_str = (
f"{ts.year:04d}-{ts.month:02d}-{ts.day:02d} {ts.hour:02d}:{ts.minute:02d}:{ts.second:02d}"
)
return Timestamp(
id=generate_node_id(str(time_at)),
time_at=time_at,
year=ts.year,
month=ts.month,
day=ts.day,
hour=ts.hour,
minute=ts.minute,
second=ts.second,
timestamp_str=timestamp_str,
)
def date_to_int(ts: Timestamp) -> int:
"""
Converts a Timestamp model into an integer representation in milliseconds since the Unix epoch (UTC).
Args:
ts (Timestamp): The input Timestamp model containing year, month, day, hour, minute, and second.
Returns:
int: The UTC timestamp in milliseconds since January 1, 1970.
"""
dt = datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second, tzinfo=timezone.utc)
time = int(dt.timestamp() * 1000)
return time

View file

@ -0,0 +1,149 @@
import os
from typing import Any, Optional, List, Type
from operator import itemgetter
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.utils.completion import generate_completion
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.llm import LLMGateway
from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever
from cognee.shared.logging_utils import get_logger
from cognee.tasks.temporal_graph.models import QueryInterval
logger = get_logger()
class TemporalRetriever(GraphCompletionRetriever):
"""
Handles graph completion by generating responses based on a series of interactions with
a language model. This class extends from GraphCompletionRetriever and is designed to
manage the retrieval and validation process for user queries, integrating follow-up
questions based on reasoning. The public methods are:
- get_completion
Instance variables include:
- validation_system_prompt_path
- validation_user_prompt_path
- followup_system_prompt_path
- followup_user_prompt_path
"""
def __init__(
self,
user_prompt_path: str = "graph_context_for_question.txt",
system_prompt_path: str = "answer_simple_question.txt",
time_extraction_prompt_path: str = "extract_query_time.txt",
top_k: Optional[int] = 5,
node_type: Optional[Type] = None,
node_name: Optional[List[str]] = None,
):
super().__init__(
user_prompt_path=user_prompt_path,
system_prompt_path=system_prompt_path,
top_k=top_k,
node_type=node_type,
node_name=node_name,
)
self.user_prompt_path = user_prompt_path
self.system_prompt_path = system_prompt_path
self.time_extraction_prompt_path = time_extraction_prompt_path
self.top_k = top_k if top_k is not None else 5
self.node_type = node_type
self.node_name = node_name
def descriptions_to_string(self, results):
descs = []
for entry in results:
d = entry.get("description")
if d:
descs.append(d.strip())
return "\n#####################\n".join(descs)
async def extract_time_from_query(self, query: str):
prompt_path = self.time_extraction_prompt_path
if os.path.isabs(prompt_path):
base_directory = os.path.dirname(prompt_path)
prompt_path = os.path.basename(prompt_path)
else:
base_directory = None
system_prompt = LLMGateway.render_prompt(prompt_path, {}, base_directory=base_directory)
interval = await LLMGateway.acreate_structured_output(query, system_prompt, QueryInterval)
time_from = interval.starts_at
time_to = interval.ends_at
return time_from, time_to
async def filter_top_k_events(self, relevant_events, scored_results):
# Build a score lookup from vector search results
score_lookup = {res.payload["id"]: res.score for res in scored_results}
events_with_scores = []
for event in relevant_events[0]["events"]:
score = score_lookup.get(event["id"], float("inf"))
events_with_scores.append({**event, "score": score})
events_with_scores.sort(key=itemgetter("score"))
return events_with_scores[: self.top_k]
async def get_context(self, query: str) -> Any:
"""Retrieves context based on the query."""
time_from, time_to = await self.extract_time_from_query(query)
graph_engine = await get_graph_engine()
if time_from and time_to:
ids = await graph_engine.collect_time_ids(time_from=time_from, time_to=time_to)
elif time_from:
ids = await graph_engine.collect_time_ids(time_from=time_from)
elif time_to:
ids = await graph_engine.collect_time_ids(time_to=time_to)
else:
logger.info(
"No timestamps identified based on the query, performing retrieval using triplet search on events and entities."
)
triplets = await self.get_triplets(query)
return await self.resolve_edges_to_text(triplets)
if ids:
relevant_events = await graph_engine.collect_events(ids=ids)
else:
logger.info(
"No events identified based on timestamp filtering, performing retrieval using triplet search on events and entities."
)
triplets = await self.get_triplets(query)
return await self.resolve_edges_to_text(triplets)
vector_engine = get_vector_engine()
query_vector = (await vector_engine.embedding_engine.embed_text([query]))[0]
vector_search_results = await vector_engine.search(
collection_name="Event_name", query_vector=query_vector, limit=0
)
top_k_events = await self.filter_top_k_events(relevant_events, vector_search_results)
return self.descriptions_to_string(top_k_events)
async def get_completion(self, query: str, context: Optional[Any] = None) -> Any:
"""Generates a response using the query and optional context."""
context = await self.get_context(query=query)
completion = await generate_completion(
query=query,
context=context,
user_prompt_path=self.user_prompt_path,
system_prompt_path=self.system_prompt_path,
)
return [completion]

View file

@ -13,6 +13,7 @@ from cognee.modules.retrieval.insights_retriever import InsightsRetriever
from cognee.modules.retrieval.summaries_retriever import SummariesRetriever
from cognee.modules.retrieval.completion_retriever import CompletionRetriever
from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever
from cognee.modules.retrieval.temporal_retriever import TemporalRetriever
from cognee.modules.retrieval.graph_summary_completion_retriever import (
GraphSummaryCompletionRetriever,
)
@ -167,6 +168,7 @@ async def specific_search(
SearchType.CYPHER: CypherSearchRetriever().get_completion,
SearchType.NATURAL_LANGUAGE: NaturalLanguageRetriever().get_completion,
SearchType.FEEDBACK: UserQAFeedback(last_k=last_k).add_feedback,
SearchType.TEMPORAL: TemporalRetriever(top_k=top_k).get_completion,
}
# If the query type is FEELING_LUCKY, select the search type intelligently

View file

@ -15,3 +15,4 @@ class SearchType(Enum):
GRAPH_COMPLETION_CONTEXT_EXTENSION = "GRAPH_COMPLETION_CONTEXT_EXTENSION"
FEELING_LUCKY = "FEELING_LUCKY"
FEEDBACK = "FEEDBACK"
TEMPORAL = "TEMPORAL"

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,85 @@
from cognee.modules.engine.models import Event
from cognee.tasks.temporal_graph.models import EventWithEntities
from cognee.modules.engine.models.Entity import Entity
from cognee.modules.engine.models.EntityType import EntityType
from cognee.infrastructure.engine.models.Edge import Edge
from cognee.modules.engine.utils import generate_node_id, generate_node_name
def add_entities_to_event(event: Event, event_with_entities: EventWithEntities) -> None:
"""
Adds extracted entities to an Event object by populating its attributes field.
For each attribute in the provided EventWithEntities, the function ensures that
the corresponding entity type exists, creates an Entity node with metadata, and
links it to the event via an Edge representing the relationship. Entities are
cached by type to avoid duplication.
Args:
event (Event): The target Event object to enrich with entities.
event_with_entities (EventWithEntities): An event model containing extracted
attributes with entity, type, and relationship metadata.
Returns:
None
"""
if not event_with_entities.attributes:
return
# Create entity types cache
entity_types = {}
# Process each attribute
for attribute in event_with_entities.attributes:
# Get or create entity type
entity_type = get_or_create_entity_type(entity_types, attribute.entity_type)
# Create entity
entity_id = generate_node_id(attribute.entity)
entity_name = generate_node_name(attribute.entity)
entity = Entity(
id=entity_id,
name=entity_name,
is_a=entity_type,
description=f"Entity {attribute.entity} of type {attribute.entity_type}",
ontology_valid=False,
belongs_to_set=None,
)
# Create edge
edge = Edge(relationship_type=attribute.relationship)
# Add to event attributes
if event.attributes is None:
event.attributes = []
event.attributes.append((edge, [entity]))
def get_or_create_entity_type(entity_types: dict, entity_type_name: str) -> EntityType:
"""
Retrieves an existing EntityType from the cache or creates a new one if it does not exist.
If the given entity type name is not already in the cache, a new EntityType is generated
with a unique ID, normalized name, and description, then added to the cache.
Args:
entity_types (dict): A cache mapping entity type names to EntityType objects.
entity_type_name (str): The name of the entity type to retrieve or create.
Returns:
EntityType: The existing or newly created EntityType object.
"""
if entity_type_name not in entity_types:
type_id = generate_node_id(entity_type_name)
type_name = generate_node_name(entity_type_name)
entity_type = EntityType(
id=type_id,
name=type_name,
type=type_name,
description=f"Type for {entity_type_name}",
ontology_valid=False,
)
entity_types[entity_type_name] = entity_type
return entity_types[entity_type_name]

View file

@ -0,0 +1,34 @@
from typing import List
from cognee.infrastructure.llm import LLMGateway
from cognee.modules.engine.models import Event
from cognee.tasks.temporal_graph.models import EventWithEntities, EventEntityList
async def enrich_events(events: List[Event]) -> List[EventWithEntities]:
"""
Enriches a list of events by extracting entities using an LLM.
The function serializes event data into JSON, sends it to the LLM for
entity extraction, and returns enriched events with associated entities.
Args:
events (List[Event]): A list of Event objects to be enriched.
Returns:
List[EventWithEntities]: A list of events augmented with extracted entities.
"""
import json
# Convert events to JSON format for LLM processing
events_json = [
{"event_name": event.name, "description": event.description or ""} for event in events
]
events_json_str = json.dumps(events_json)
# Extract entities from events
entity_result = await LLMGateway.extract_event_entities(events_json_str, EventEntityList)
return entity_result.events

View file

@ -0,0 +1,32 @@
import asyncio
from typing import Type, List
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.modules.chunking.models import DocumentChunk
from cognee.tasks.temporal_graph.models import EventList
from cognee.modules.engine.utils.generate_event_datapoint import generate_event_datapoint
async def extract_events_and_timestamps(data_chunks: List[DocumentChunk]) -> List[DocumentChunk]:
"""
Extracts events and their timestamps from document chunks using an LLM.
Each document chunk is processed with the event graph extractor to identify events.
The extracted events are converted into Event datapoints and appended to the
chunk's `contains` list.
Args:
data_chunks (List[DocumentChunk]): A list of document chunks containing text to process.
Returns:
List[DocumentChunk]: The same list of document chunks, enriched with extracted Event datapoints.
"""
events = await asyncio.gather(
*[LLMGateway.extract_event_graph(chunk.text, EventList) for chunk in data_chunks]
)
for data_chunk, event_list in zip(data_chunks, events):
for event in event_list.events:
event_datapoint = generate_event_datapoint(event)
data_chunk.contains.append(event_datapoint)
return data_chunks

View file

@ -0,0 +1,41 @@
from typing import List
from cognee.modules.chunking.models import DocumentChunk
from cognee.modules.engine.models import Event
from cognee.tasks.temporal_graph.enrich_events import enrich_events
from cognee.tasks.temporal_graph.add_entities_to_event import add_entities_to_event
async def extract_knowledge_graph_from_events(
data_chunks: List[DocumentChunk],
) -> List[DocumentChunk]:
"""
Extracts events from document chunks and enriches them with entities to form a knowledge graph.
The function collects all Event objects from the given document chunks,
uses an LLM to extract and attach related entities, and updates the events
with these enriched attributes.
Args:
data_chunks (List[DocumentChunk]): A list of document chunks containing extracted events.
Returns:
List[DocumentChunk]: The same list of document chunks, with their events enriched by entities.
"""
# Extract events from chunks
all_events = []
for chunk in data_chunks:
for item in chunk.contains:
if isinstance(item, Event):
all_events.append(item)
if not all_events:
return data_chunks
# Enrich events with entities
enriched_events = await enrich_events(all_events)
# Add entities to events
for event, enriched_event in zip(all_events, enriched_events):
add_entities_to_event(event, enriched_event)
return data_chunks

View file

@ -0,0 +1,49 @@
from typing import Optional, List
from pydantic import BaseModel, Field
class Timestamp(BaseModel):
year: int = Field(..., ge=1, le=9999)
month: int = Field(..., ge=1, le=12)
day: int = Field(..., ge=1, le=31)
hour: int = Field(..., ge=0, le=23)
minute: int = Field(..., ge=0, le=59)
second: int = Field(..., ge=0, le=59)
class Interval(BaseModel):
starts_at: Timestamp
ends_at: Timestamp
class QueryInterval(BaseModel):
starts_at: Optional[Timestamp] = None
ends_at: Optional[Timestamp] = None
class Event(BaseModel):
name: str
description: Optional[str] = None
time_from: Optional[Timestamp] = None
time_to: Optional[Timestamp] = None
location: Optional[str] = None
class EventList(BaseModel):
events: List[Event]
class EntityAttribute(BaseModel):
entity: str
entity_type: str
relationship: str
class EventWithEntities(BaseModel):
event_name: str
description: Optional[str] = None
attributes: List[EntityAttribute] = []
class EventEntityList(BaseModel):
events: List[EventWithEntities]

View file

@ -0,0 +1,167 @@
import asyncio
import cognee
from cognee.modules.retrieval.temporal_retriever import TemporalRetriever
from cognee.shared.logging_utils import setup_logging, INFO
from cognee.tasks.temporal_graph.models import Timestamp
from cognee.api.v1.search import SearchType
from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine
from collections import Counter
from cognee.modules.engine.utils.generate_timestamp_datapoint import date_to_int
logger = get_logger()
biography_1 = """
Attaphol Buspakom Attaphol Buspakom ( ; ) , nicknamed Tak ( ; ) ; 1 October 1962 16 April 2015 ) was a Thai national and football coach . He was given the role at Muangthong United and Buriram United after TTM Samut Sakhon folded after the 2009 season . He played for the Thailand national football team , appearing in several FIFA World Cup qualifying matches .
Club career .
Attaphol began his career as a player at Thai Port FC Authority of Thailand in 1985 . In his first year , he won his first championship with the club . He played for the club until 1989 and in 1987 also won the Queens Cup . He then moved to Malaysia for two seasons for Pahang FA , then return to Thailand to his former club . His time from 1991 to 1994 was marked by less success than in his first stay at Port Authority . From 1994 to 1996 he played for Pahang again and this time he was able to win with the club , the Malaysia Super League and also reached the final of the Malaysia Cup and the Malaysia FA Cup . Both cup finals but lost . Back in Thailand , he let end his playing career at FC Stock Exchange of Thailand , with which he once again runnerup in 1996-97 . In 1998 , he finished his career .
International career .
For the Thailand national football team Attaphol played between 1985 and 1998 a total of 85 games and scored 13 results . In 1992 , he participated with the team in the finals of the Asian Cup . He also stood in various cadres to qualifications to FIFA World Cup .
Coaching career .
Bec Tero Sasana .
In BEC Tero Sasana F.C . began his coaching career in 2001 for him , first as assistant coach . He took over the reigning champions of the Thai League T1 , after his predecessor Pichai Pituwong resigned from his post . It was his first coach station and he had the difficult task of leading the club through the new AFC Champions League . He could accomplish this task with flying colors and even led the club to the finals . The finale , then still played in home and away matches , was lost with 1:2 at the end against Al Ain FC . Attaphol is and was next to Charnwit Polcheewin the only coach who managed a club from Thailand to lead to the final of the AFC Champions League . 2002-03 and 2003-04 he won with the club also two runnerup . In his team , which reached the final of the Champions League , were a number of exceptional players like Therdsak Chaiman , Worrawoot Srimaka , Dusit Chalermsan and Anurak Srikerd .
Geylang United / Krung Thai Bank .
In 2006 , he went to Singapore in the SLeague to Geylang United He was released after a few months due to lack of success . In 2008 , he took over as coach at Krung Thai Bank F.C. , where he had almost a similar task , as a few years earlier by BECTero . As vicechampion of the club was also qualified for the AFC Champions League . However , he failed to lead the team through the group stage of the season 2008 and beyond . With the Kashima Antlers of Japan and Beijing Guoan F.C . athletic competition was too great . One of the highlights was put under his leadership , yet the club . In the group match against the Vietnam club Nam Dinh F.C . his team won with 9-1 , but also lost four weeks later with 1-8 against Kashima Antlers . At the end of the National Football League season , he reached the Krung Thai 6th Table space . The Erstligalizenz the club was sold at the end of the season at the Bangkok Glass F.C. . Attaphol finished his coaching career with the club and accepted an offer of TTM Samutsakorn . After only a short time in office
Muangthong United .
In 2009 , he received an offer from Muangthong United F.C. , which he accepted and changed . He can champion Muang Thong United for 2009 Thai Premier League and Attaphol won Coach of The year for Thai Premier League and he was able to lead Muang Thong United to play AFC Champions League qualifying playoff for the first in the clubs history .
Buriram United .
In 2010 Buspakom moved from Muangthong United to Buriram United F.C. . He received Coach of the Month in Thai Premier League 2 time in June and October . In 2011 , he led Buriram United win 2011 Thai Premier League second time for club and set a record with the most points in the Thai League T1 for 85 point and He led Buriram win 2011 Thai FA Cup by beat Muangthong United F.C . 10 and he led Buriram win 2011 Thai League Cup by beat Thai Port F.C . 20 . In 2012 , he led Buriram United to the 2012 AFC Champions League group stage . Buriram along with Guangzhou Evergrande F.C . from China , Kashiwa Reysol from Japan and Jeonbuk Hyundai Motors which are all champions from their country . In the first match of Buriram they beat Kashiwa 32 and Second Match they beat Guangzhou 12 at the Tianhe Stadium . Before losing to Jeonbuk 02 and 32 with lose Kashiwa and Guangzhou 10 and 12 respectively and Thai Premier League Attaphol lead Buriram end 4th for table with win 2012 Thai FA Cup and 2012 Thai League Cup .
Bangkok Glass .
In 2013 , he moved from Buriram United to Bangkok Glass F.C. .
Individual
- Thai Premier League Coach of the Year ( 3 ) : 2001-02 , 2009 , 2013
"""
biography_2 = """
Arnulf Øverland Ole Peter Arnulf Øverland ( 27 April 1889 25 March 1968 ) was a Norwegian poet and artist . He is principally known for his poetry which served to inspire the Norwegian resistance movement during the German occupation of Norway during World War II .
Biography .
Øverland was born in Kristiansund and raised in Bergen . His parents were Peter Anton Øverland ( 18521906 ) and Hanna Hage ( 18541939 ) . The early death of his father , left the family economically stressed . He was able to attend Bergen Cathedral School and in 1904 Kristiania Cathedral School . He graduated in 1907 and for a time studied philology at University of Kristiania . Øverland published his first collection of poems ( 1911 ) .
Øverland became a communist sympathizer from the early 1920s and became a member of Mot Dag . He also served as chairman of the Norwegian Students Society 192328 . He changed his stand in 1937 , partly as an expression of dissent against the ongoing Moscow Trials . He was an avid opponent of Nazism and in 1936 he wrote the poem Du ikke sove which was printed in the journal Samtiden . It ends with . ( I thought: : Something is imminent . Our era is over Europes on fire! ) . Probably the most famous line of the poem is ( You mustnt endure so well the injustice that doesnt affect you yourself! )
During the German occupation of Norway from 1940 in World War II , he wrote to inspire the Norwegian resistance movement . He wrote a series of poems which were clandestinely distributed , leading to the arrest of both him and his future wife Margrete Aamot Øverland in 1941 . Arnulf Øverland was held first in the prison camp of Grini before being transferred to Sachsenhausen concentration camp in Germany . He spent a fouryear imprisonment until the liberation of Norway in 1945 . His poems were later collected in Vi overlever alt and published in 1945 .
Øverland played an important role in the Norwegian language struggle in the postwar era . He became a noted supporter for the conservative written form of Norwegian called Riksmål , he was president of Riksmålsforbundet ( an organization in support of Riksmål ) from 1947 to 1956 . In addition , Øverland adhered to the traditionalist style of writing , criticising modernist poetry on several occasions . His speech Tungetale fra parnasset , published in Arbeiderbladet in 1954 , initiated the socalled Glossolalia debate .
Personal life .
In 1918 he had married the singer Hildur Arntzen ( 18881957 ) . Their marriage was dissolved in 1939 . In 1940 , he married Bartholine Eufemia Leganger ( 19031995 ) . They separated shortly after , and were officially divorced in 1945 . Øverland was married to journalist Margrete Aamot Øverland ( 19131978 ) during June 1945 . In 1946 , the Norwegian Parliament arranged for Arnulf and Margrete Aamot Øverland to reside at the Grotten . He lived there until his death in 1968 and she lived there for another ten years until her death in 1978 . Arnulf Øverland was buried at Vår Frelsers Gravlund in Oslo . Joseph Grimeland designed the bust of Arnulf Øverland ( bronze , 1970 ) at his grave site .
Selected Works .
- Den ensomme fest ( 1911 )
- Berget det blå ( 1927 )
- En Hustavle ( 1929 )
- Den røde front ( 1937 )
- Vi overlever alt ( 1945 )
- Sverdet bak døren ( 1956 )
- Livets minutter ( 1965 )
Awards .
- Gyldendals Endowment ( 1935 )
- Dobloug Prize ( 1951 )
- Mads Wiel Nygaards legat ( 1961 )
"""
async def main():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
await cognee.add([biography_1, biography_2])
await cognee.cognify(temporal_cognify=True)
graph_engine = await get_graph_engine()
graph = await graph_engine.get_graph_data()
type_counts = Counter(node_data[1].get("type", {}) for node_data in graph[0])
edge_type_counts = Counter(edge_type[2] for edge_type in graph[1])
# Graph structure test
assert type_counts.get("TextDocument", 0) == 2, (
f"Expected exactly one TextDocument, but found {type_counts.get('TextDocument', 0)}"
)
assert type_counts.get("DocumentChunk", 0) == 2, (
f"Expected exactly one DocumentChunk, but found {type_counts.get('DocumentChunk', 0)}"
)
assert type_counts.get("Entity", 0) >= 20, (
f"Expected multiple entities (assert is set to 20), but found {type_counts.get('Entity', 0)}"
)
assert type_counts.get("EntityType", 0) >= 2, (
f"Expected multiple entity types, but found {type_counts.get('EntityType', 0)}"
)
assert type_counts.get("Event", 0) >= 20, (
f"Expected multiple events (assert is set to 20), but found {type_counts.get('Event', 0)}"
)
assert type_counts.get("Timestamp", 0) >= 20, (
f"Expected multiple timestamps (assert is set to 20), but found {type_counts.get('Timestamp', 0)}"
)
assert type_counts.get("Interval", 0) >= 2, (
f"Expected multiple intervals, but found {type_counts.get('Interval', 0)}"
)
assert edge_type_counts.get("contains", 0) >= 20, (
f"Expected multiple 'contains' edge, but found {edge_type_counts.get('contains', 0)}"
)
assert edge_type_counts.get("is_a", 0) >= 20, (
f"Expected multiple 'is_a' edge, but found {edge_type_counts.get('is_a', 0)}"
)
assert edge_type_counts.get("during", 0) == type_counts.get("Interval", 0), (
"Expected the same amount of during and interval objects in the graph"
)
assert edge_type_counts.get("during", 0) == type_counts.get("Interval", 0), (
"Expected the same amount of during and interval objects in the graph"
)
assert edge_type_counts.get("time_from", 0) == type_counts.get("Interval", 0), (
"Expected the same amount of time_from and interval objects in the graph"
)
assert edge_type_counts.get("time_to", 0) == type_counts.get("Interval", 0), (
"Expected the same amount of time_to and interval objects in the graph"
)
retriever = TemporalRetriever()
result_before = await retriever.extract_time_from_query("What happened before 1890?")
assert result_before[0] is None
result_after = await retriever.extract_time_from_query("What happened after 1891?")
assert result_after[1] is None
result_between = await retriever.extract_time_from_query("What happened between 1890 and 1900?")
assert result_between[1]
assert result_between[0]
if __name__ == "__main__":
logger = setup_logging(log_level=INFO)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())

View file

@ -1,7 +1,7 @@
import os
import pytest
import pathlib
from typing import List
import cognee
from cognee.low_level import setup
from cognee.tasks.storage import add_data_points
@ -10,6 +10,20 @@ from cognee.modules.chunking.models import DocumentChunk
from cognee.modules.data.processing.document_types import TextDocument
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
from cognee.modules.retrieval.chunks_retriever import ChunksRetriever
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.processing.document_types import Document
from cognee.modules.engine.models import Entity
class DocumentChunkWithEntities(DataPoint):
text: str
chunk_size: int
chunk_index: int
cut_type: str
is_part_of: Document
contains: List[Entity] = None
metadata: dict = {"index_fields": ["text"]}
class TestChunksRetriever:
@ -179,7 +193,9 @@ class TestChunksRetriever:
await retriever.get_context("Christina Mayer")
vector_engine = get_vector_engine()
await vector_engine.create_collection("DocumentChunk_text", payload_schema=DocumentChunk)
await vector_engine.create_collection(
"DocumentChunk_text", payload_schema=DocumentChunkWithEntities
)
context = await retriever.get_context("Christina Mayer")
assert len(context) == 0, "Found chunks when none should exist"

View file

@ -1,7 +1,7 @@
import os
from typing import List
import pytest
import pathlib
import cognee
from cognee.low_level import setup
from cognee.tasks.storage import add_data_points
@ -10,6 +10,20 @@ from cognee.modules.chunking.models import DocumentChunk
from cognee.modules.data.processing.document_types import TextDocument
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
from cognee.modules.retrieval.completion_retriever import CompletionRetriever
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.processing.document_types import Document
from cognee.modules.engine.models import Entity
class DocumentChunkWithEntities(DataPoint):
text: str
chunk_size: int
chunk_index: int
cut_type: str
is_part_of: Document
contains: List[Entity] = None
metadata: dict = {"index_fields": ["text"]}
class TestRAGCompletionRetriever:
@ -182,7 +196,9 @@ class TestRAGCompletionRetriever:
await retriever.get_context("Christina Mayer")
vector_engine = get_vector_engine()
await vector_engine.create_collection("DocumentChunk_text", payload_schema=DocumentChunk)
await vector_engine.create_collection(
"DocumentChunk_text", payload_schema=DocumentChunkWithEntities
)
context = await retriever.get_context("Christina Mayer")
assert context == "", "Returned context should be empty on an empty graph"

View file

@ -0,0 +1,225 @@
import asyncio
from types import SimpleNamespace
import pytest
from cognee.modules.retrieval.temporal_retriever import TemporalRetriever
# Test TemporalRetriever initialization defaults and overrides
def test_init_defaults_and_overrides():
tr = TemporalRetriever()
assert tr.top_k == 5
assert tr.user_prompt_path == "graph_context_for_question.txt"
assert tr.system_prompt_path == "answer_simple_question.txt"
assert tr.time_extraction_prompt_path == "extract_query_time.txt"
tr2 = TemporalRetriever(
top_k=3,
user_prompt_path="u.txt",
system_prompt_path="s.txt",
time_extraction_prompt_path="t.txt",
)
assert tr2.top_k == 3
assert tr2.user_prompt_path == "u.txt"
assert tr2.system_prompt_path == "s.txt"
assert tr2.time_extraction_prompt_path == "t.txt"
# Test descriptions_to_string with basic and empty results
def test_descriptions_to_string_basic_and_empty():
tr = TemporalRetriever()
results = [
{"description": " First "},
{"nope": "no description"},
{"description": "Second"},
{"description": ""},
{"description": " Third line "},
]
s = tr.descriptions_to_string(results)
assert s == "First\n#####################\nSecond\n#####################\nThird line"
assert tr.descriptions_to_string([]) == ""
# Test filter_top_k_events sorts and limits correctly
@pytest.mark.asyncio
async def test_filter_top_k_events_sorts_and_limits():
tr = TemporalRetriever(top_k=2)
relevant_events = [
{
"events": [
{"id": "e1", "description": "E1"},
{"id": "e2", "description": "E2"},
{"id": "e3", "description": "E3 - not in vector results"},
]
}
]
scored_results = [
SimpleNamespace(payload={"id": "e2"}, score=0.10),
SimpleNamespace(payload={"id": "e1"}, score=0.20),
]
top = await tr.filter_top_k_events(relevant_events, scored_results)
assert [e["id"] for e in top] == ["e2", "e1"]
assert all("score" in e for e in top)
assert top[0]["score"] == 0.10
assert top[1]["score"] == 0.20
# Test filter_top_k_events handles unknown ids as infinite scores
@pytest.mark.asyncio
async def test_filter_top_k_events_includes_unknown_as_infinite_but_not_in_top_k():
tr = TemporalRetriever(top_k=2)
relevant_events = [
{
"events": [
{"id": "known1", "description": "Known 1"},
{"id": "unknown", "description": "Unknown"},
{"id": "known2", "description": "Known 2"},
]
}
]
scored_results = [
SimpleNamespace(payload={"id": "known2"}, score=0.05),
SimpleNamespace(payload={"id": "known1"}, score=0.50),
]
top = await tr.filter_top_k_events(relevant_events, scored_results)
assert [e["id"] for e in top] == ["known2", "known1"]
assert all(e["score"] != float("inf") for e in top)
# Test descriptions_to_string with unicode and newlines
def test_descriptions_to_string_unicode_and_newlines():
tr = TemporalRetriever()
results = [
{"description": "Line A\nwith newline"},
{"description": "This is a description"},
]
s = tr.descriptions_to_string(results)
assert "Line A\nwith newline" in s
assert "This is a description" in s
assert s.count("#####################") == 1
# Test filter_top_k_events when top_k is larger than available events
@pytest.mark.asyncio
async def test_filter_top_k_events_limits_when_top_k_exceeds_events():
tr = TemporalRetriever(top_k=10)
relevant_events = [{"events": [{"id": "a"}, {"id": "b"}]}]
scored_results = [
SimpleNamespace(payload={"id": "a"}, score=0.1),
SimpleNamespace(payload={"id": "b"}, score=0.2),
]
out = await tr.filter_top_k_events(relevant_events, scored_results)
assert [e["id"] for e in out] == ["a", "b"]
# Test filter_top_k_events when scored_results is empty
@pytest.mark.asyncio
async def test_filter_top_k_events_handles_empty_scored_results():
tr = TemporalRetriever(top_k=2)
relevant_events = [{"events": [{"id": "x"}, {"id": "y"}]}]
scored_results = []
out = await tr.filter_top_k_events(relevant_events, scored_results)
assert [e["id"] for e in out] == ["x", "y"]
assert all(e["score"] == float("inf") for e in out)
# Test filter_top_k_events error handling for missing structure
@pytest.mark.asyncio
async def test_filter_top_k_events_error_handling():
tr = TemporalRetriever(top_k=2)
with pytest.raises((KeyError, TypeError)):
await tr.filter_top_k_events([{}], [])
class _FakeRetriever(TemporalRetriever):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._calls = []
async def extract_time_from_query(self, query: str):
if "both" in query:
return "2024-01-01", "2024-12-31"
if "from_only" in query:
return "2024-01-01", None
if "to_only" in query:
return None, "2024-12-31"
return None, None
async def get_triplets(self, query: str):
self._calls.append(("get_triplets", query))
return [{"s": "a", "p": "b", "o": "c"}]
async def resolve_edges_to_text(self, triplets):
self._calls.append(("resolve_edges_to_text", len(triplets)))
return "edges->text"
async def _fake_graph_collect_ids(self, **kwargs):
return ["e1", "e2"]
async def _fake_graph_collect_events(self, ids):
return [
{
"events": [
{"id": "e1", "description": "E1"},
{"id": "e2", "description": "E2"},
{"id": "e3", "description": "E3"},
]
}
]
async def _fake_vector_embed(self, texts):
assert isinstance(texts, list) and texts
return [[0.0, 1.0, 2.0]]
async def _fake_vector_search(self, **kwargs):
return [
SimpleNamespace(payload={"id": "e2"}, score=0.05),
SimpleNamespace(payload={"id": "e1"}, score=0.10),
]
async def get_context(self, query: str):
time_from, time_to = await self.extract_time_from_query(query)
if not (time_from or time_to):
triplets = await self.get_triplets(query)
return await self.resolve_edges_to_text(triplets)
ids = await self._fake_graph_collect_ids(time_from=time_from, time_to=time_to)
relevant_events = await self._fake_graph_collect_events(ids)
_ = await self._fake_vector_embed([query])
vector_search_results = await self._fake_vector_search(
collection_name="Event_name", query_vector=[0.0], limit=0
)
top_k_events = await self.filter_top_k_events(relevant_events, vector_search_results)
return self.descriptions_to_string(top_k_events)
# Test get_context fallback to triplets when no time is extracted
@pytest.mark.asyncio
async def test_fake_get_context_falls_back_to_triplets_when_no_time():
tr = _FakeRetriever(top_k=2)
ctx = await tr.get_context("no_time")
assert ctx == "edges->text"
assert tr._calls[0][0] == "get_triplets"
assert tr._calls[1][0] == "resolve_edges_to_text"
# Test get_context when time is extracted and vector ranking is applied
@pytest.mark.asyncio
async def test_fake_get_context_with_time_filters_and_vector_ranking():
tr = _FakeRetriever(top_k=2)
ctx = await tr.get_context("both time")
assert ctx.startswith("E2")
assert "#####################" in ctx
assert "E1" in ctx and "E3" not in ctx

View file

@ -0,0 +1,100 @@
import asyncio
import cognee
from cognee.shared.logging_utils import setup_logging, INFO
from cognee.api.v1.search import SearchType
biography_1 = """
Attaphol Buspakom Attaphol Buspakom ( ; ) , nicknamed Tak ( ; ) ; 1 October 1962 16 April 2015 ) was a Thai national and football coach . He was given the role at Muangthong United and Buriram United after TTM Samut Sakhon folded after the 2009 season . He played for the Thailand national football team , appearing in several FIFA World Cup qualifying matches .
Club career .
Attaphol began his career as a player at Thai Port FC Authority of Thailand in 1985 . In his first year , he won his first championship with the club . He played for the club until 1989 and in 1987 also won the Queens Cup . He then moved to Malaysia for two seasons for Pahang FA , then return to Thailand to his former club . His time from 1991 to 1994 was marked by less success than in his first stay at Port Authority . From 1994 to 1996 he played for Pahang again and this time he was able to win with the club , the Malaysia Super League and also reached the final of the Malaysia Cup and the Malaysia FA Cup . Both cup finals but lost . Back in Thailand , he let end his playing career at FC Stock Exchange of Thailand , with which he once again runner-up in 1996-97 . In 1998 , he finished his career .
International career .
For the Thailand national football team Attaphol played between 1985 and 1998 a total of 85 games and scored 13 results . In 1992 , he participated with the team in the finals of the Asian Cup . He also stood in various cadres to qualifications to FIFA World Cup .
Coaching career .
Bec Tero Sasana .
In BEC Tero Sasana F.C . began his coaching career in 2001 for him , first as assistant coach . He took over the reigning champions of the Thai League T1 , after his predecessor Pichai Pituwong resigned from his post . It was his first coach station and he had the difficult task of leading the club through the new AFC Champions League . He could accomplish this task with flying colors and even led the club to the finals . The finale , then still played in home and away matches , was lost with 1:2 at the end against Al Ain FC . Attaphol is and was next to Charnwit Polcheewin the only coach who managed a club from Thailand to lead to the final of the AFC Champions League . 2002-03 and 2003-04 he won with the club also two runner-up . In his team , which reached the final of the Champions League , were a number of exceptional players like Therdsak Chaiman , Worrawoot Srimaka , Dusit Chalermsan and Anurak Srikerd .
Geylang United / Krung Thai Bank .
In 2006 , he went to Singapore in the S-League to Geylang United He was released after a few months due to lack of success . In 2008 , he took over as coach at Krung Thai Bank F.C. , where he had almost a similar task , as a few years earlier by BEC-Tero . As vice-champion of the club was also qualified for the AFC Champions League . However , he failed to lead the team through the group stage of the season 2008 and beyond . With the Kashima Antlers of Japan and Beijing Guoan F.C . athletic competition was too great . One of the highlights was put under his leadership , yet the club . In the group match against the Vietnam club Nam Dinh F.C . his team won with 9-1 , but also lost four weeks later with 1-8 against Kashima Antlers . At the end of the National Football League season , he reached the Krung Thai 6th Table space . The Erstligalizenz the club was sold at the end of the season at the Bangkok Glass F.C. . Attaphol finished his coaching career with the club and accepted an offer of TTM Samutsakorn . After only a short time in office
Muangthong United .
In 2009 , he received an offer from Muangthong United F.C. , which he accepted and changed . He can champion Muang Thong United for 2009 Thai Premier League and Attaphol won Coach of The year for Thai Premier League and he was able to lead Muang Thong United to play AFC Champions League qualifying play-off for the first in the clubs history .
Buriram United .
In 2010 Buspakom moved from Muangthong United to Buriram United F.C. . He received Coach of the Month in Thai Premier League 2 time in June and October . In 2011 , he led Buriram United win 2011 Thai Premier League second time for club and set a record with the most points in the Thai League T1 for 85 point and He led Buriram win 2011 Thai FA Cup by beat Muangthong United F.C . 1-0 and he led Buriram win 2011 Thai League Cup by beat Thai Port F.C . 2-0 . In 2012 , he led Buriram United to the 2012 AFC Champions League group stage . Buriram along with Guangzhou Evergrande F.C . from China , Kashiwa Reysol from Japan and Jeonbuk Hyundai Motors which are all champions from their country . In the first match of Buriram they beat Kashiwa 3-2 and Second Match they beat Guangzhou 1-2 at the Tianhe Stadium . Before losing to Jeonbuk 0-2 and 3-2 with lose Kashiwa and Guangzhou 1-0 and 1-2 respectively and Thai Premier League Attaphol lead Buriram end 4th for table with win 2012 Thai FA Cup and 2012 Thai League Cup .
Bangkok Glass .
In 2013 , he moved from Buriram United to Bangkok Glass F.C. .
Individual
- Thai Premier League Coach of the Year ( 3 ) : 2001-02 , 2009 , 2013
"""
biography_2 = """
Arnulf Øverland Ole Peter Arnulf Øverland ( 27 April 1889 25 March 1968 ) was a Norwegian poet and artist . He is principally known for his poetry which served to inspire the Norwegian resistance movement during the German occupation of Norway during World War II .
Biography .
Øverland was born in Kristiansund and raised in Bergen . His parents were Peter Anton Øverland ( 18521906 ) and Hanna Hage ( 18541939 ) . The early death of his father , left the family economically stressed . He was able to attend Bergen Cathedral School and in 1904 Kristiania Cathedral School . He graduated in 1907 and for a time studied philology at University of Kristiania . Øverland published his first collection of poems ( 1911 ) .
Øverland became a communist sympathizer from the early 1920s and became a member of Mot Dag . He also served as chairman of the Norwegian Students Society 192328 . He changed his stand in 1937 , partly as an expression of dissent against the ongoing Moscow Trials . He was an avid opponent of Nazism and in 1936 he wrote the poem Du ikke sove which was printed in the journal Samtiden . It ends with . ( I thought: : Something is imminent . Our era is over Europes on fire! ) . Probably the most famous line of the poem is ( You mustnt endure so well the injustice that doesnt affect you yourself! )
During the German occupation of Norway from 1940 in World War II , he wrote to inspire the Norwegian resistance movement . He wrote a series of poems which were clandestinely distributed , leading to the arrest of both him and his future wife Margrete Aamot Øverland in 1941 . Arnulf Øverland was held first in the prison camp of Grini before being transferred to Sachsenhausen concentration camp in Germany . He spent a four-year imprisonment until the liberation of Norway in 1945 . His poems were later collected in Vi overlever alt and published in 1945 .
Øverland played an important role in the Norwegian language struggle in the post-war era . He became a noted supporter for the conservative written form of Norwegian called Riksmål , he was president of Riksmålsforbundet ( an organization in support of Riksmål ) from 1947 to 1956 . In addition , Øverland adhered to the traditionalist style of writing , criticising modernist poetry on several occasions . His speech Tungetale fra parnasset , published in Arbeiderbladet in 1954 , initiated the so-called Glossolalia debate .
Personal life .
In 1918 he had married the singer Hildur Arntzen ( 18881957 ) . Their marriage was dissolved in 1939 . In 1940 , he married Bartholine Eufemia Leganger ( 19031995 ) . They separated shortly after , and were officially divorced in 1945 . Øverland was married to journalist Margrete Aamot Øverland ( 19131978 ) during June 1945 . In 1946 , the Norwegian Parliament arranged for Arnulf and Margrete Aamot Øverland to reside at the Grotten . He lived there until his death in 1968 and she lived there for another ten years until her death in 1978 . Arnulf Øverland was buried at Vår Frelsers Gravlund in Oslo . Joseph Grimeland designed the bust of Arnulf Øverland ( bronze , 1970 ) at his grave site .
Selected Works .
- Den ensomme fest ( 1911 )
- Berget det blå ( 1927 )
- En Hustavle ( 1929 )
- Den røde front ( 1937 )
- Vi overlever alt ( 1945 )
- Sverdet bak døren ( 1956 )
- Livets minutter ( 1965 )
Awards .
- Gyldendals Endowment ( 1935 )
- Dobloug Prize ( 1951 )
- Mads Wiel Nygaards legat ( 1961 )
"""
async def main():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
await cognee.add([biography_1, biography_2])
await cognee.cognify(temporal_cognify=True)
queries = [
"What happened before 1980?",
"What happened after 2010?",
"What happened between 2000 and 2006?",
"What happened between 1903 and 1995, I am interested in the Selected Works of Arnulf Øverland Ole Peter Arnulf Øverland?",
"Who is Attaphol Buspakom Attaphol Buspakom?",
]
for query_text in queries:
search_results = await cognee.search(
query_type=SearchType.TEMPORAL,
query_text=query_text,
top_k=15,
)
print(f"Query: {query_text}")
print(f"Results: {search_results}\n")
if __name__ == "__main__":
logger = setup_logging(log_level=INFO)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())