feat: Implements pipeline structure for retrievers

This commit is contained in:
hajdul88 2024-11-19 11:14:42 +01:00
parent 2331739e07
commit c4850f64dc
6 changed files with 84 additions and 9 deletions

View file

View file

View file

@ -0,0 +1,25 @@
from uuid import UUID
from enum import Enum
from typing import Callable, Dict
from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.permissions.methods import get_document_ids_for_user
async def two_step_retriever(query: Dict[str, str], user: User = None) -> list:
if user is None:
user = await get_default_user()
if user is None:
raise PermissionError("No user found in the system. Please create a user.")
own_document_ids = await get_document_ids_for_user(user.id)
retrieved_results = await diffusion_retriever(query, user)
filtered_search_results = []
return retrieved_results
async def diffusion_retriever(query: str, user, community_filter = []) -> list:
raise(NotImplementedError)

View file

@ -0,0 +1,25 @@
from uuid import UUID
from enum import Enum
from typing import Callable, Dict
from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.permissions.methods import get_document_ids_for_user
async def two_step_retriever(query: Dict[str, str], user: User = None) -> list:
if user is None:
user = await get_default_user()
if user is None:
raise PermissionError("No user found in the system. Please create a user.")
own_document_ids = await get_document_ids_for_user(user.id)
retrieved_results = await g_retriever(query, user)
filtered_search_results = []
return retrieved_results
async def g_retriever(query: str, user, community_filter = []) -> list:
raise(NotImplementedError)

View file

@ -0,0 +1,26 @@
from uuid import UUID
from enum import Enum
from typing import Callable, Dict
from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.permissions.methods import get_document_ids_for_user
async def two_step_retriever(query: Dict[str, str], user: User = None) -> list:
if user is None:
user = await get_default_user()
if user is None:
raise PermissionError("No user found in the system. Please create a user.")
own_document_ids = await get_document_ids_for_user(user.id)
retrieved_results = await run_two_step_retriever(query, user)
filtered_search_results = []
return retrieved_results
async def run_two_step_retriever(query: str, user, community_filter = []) -> list:
raise(NotImplementedError)

View file

@ -1,6 +1,6 @@
import cognee
import asyncio
from cognee.api.v1.search import SearchType
from cognee.pipelines.retriever.two_steps_retriever import two_step_retriever
job_position = """0:Senior Data Scientist (Machine Learning)
@ -206,9 +206,8 @@ async def main(enable_steps):
print("Knowledge graph created.")
# Step 4: Query insights
if enable_steps.get("search_insights"):
search_results = await cognee.search(
SearchType.INSIGHTS,
if enable_steps.get("retriever"):
search_results = await two_step_retriever(
{'query': 'Which applicant has the most relevant experience in data science?'}
)
print("Search results:")
@ -219,11 +218,11 @@ async def main(enable_steps):
if __name__ == '__main__':
# Flags to enable/disable steps
steps_to_enable = {
"prune_data": True,
"prune_system": True,
"add_text": True,
"cognify": True,
"search_insights": True
"prune_data": False,
"prune_system": False,
"add_text": False,
"cognify": False,
"retriever": True
}
asyncio.run(main(steps_to_enable))