Merge branch 'main' into feature/cog-717-create-edge-embeddings-in-vector-databases

This commit is contained in:
hajdul88 2024-12-05 09:08:37 +01:00 committed by GitHub
commit 68c3f42ab8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 65 additions and 0 deletions

View file

@ -0,0 +1,4 @@
from .build_graph_with_temporal_awareness import \
build_graph_with_temporal_awareness
from .search_graph_with_temporal_awareness import \
search_graph_with_temporal_awareness

View file

@ -0,0 +1,26 @@
import os
from datetime import datetime
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
async def build_graph_with_temporal_awareness(text_list):
url = os.getenv("GRAPH_DATABASE_URL")
password = os.getenv("GRAPH_DATABASE_PASSWORD")
graphiti = Graphiti(url, "neo4j", password)
await graphiti.build_indices_and_constraints()
print("Graph database initialized.")
for i, text in enumerate(text_list):
await graphiti.add_episode(
name=f"episode_{i}",
episode_body=text,
source=EpisodeType.text,
source_description="input",
reference_time=datetime.now()
)
print(f"Added text: {text[:35]}...")
return graphiti

View file

@ -0,0 +1,6 @@
async def search_graph_with_temporal_awareness(graphiti, query):
search_result = await graphiti.search(query)
await graphiti.close()
return search_result

View file

@ -0,0 +1,29 @@
import asyncio
import cognee
from cognee.api.v1.search import SearchType
from cognee.modules.pipelines import Task, run_tasks
from cognee.tasks.temporal_awareness import (
build_graph_with_temporal_awareness, search_graph_with_temporal_awareness)
text_list = [
"Kamala Harris is the Attorney General of California. She was previously "
"the district attorney for San Francisco.",
"As AG, Harris was in office from January 3, 2011 January 3, 2017",
]
async def main():
tasks = [
Task(build_graph_with_temporal_awareness, text_list=text_list),
Task(search_graph_with_temporal_awareness, query='Who was the California Attorney General?')
]
pipeline = run_tasks(tasks)
async for result in pipeline:
print(result)
if __name__ == '__main__':
asyncio.run(main())