Feat: cog 1491 pipeline steps in eval (#641)

<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->
- Created get_default_tasks_by_indices to filter default tasks by
specific indices
- Added get_no_summary_tasks function to skip summarization tasks
- Added get_just_chunks_tasks function for chunk extraction and data
points only
- Added NO_SUMMARIES and JUST_CHUNKS to the TaskGetters enum
## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- The evaluation configuration now includes expanded task retrieval
options. Users can choose customized modes that bypass summarization or
focus solely on extracting data chunks, offering a more tailored
evaluation experience.
- Enhanced asynchronous task processing brings increased flexibility and
smoother performance during task selection.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
lxobr 2025-03-14 14:20:39 +01:00 committed by GitHub
parent f206edb83c
commit cad9e0ce44
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 41 additions and 18 deletions

View file

@ -2,7 +2,6 @@ import cognee
import logging
from typing import Optional, Tuple, List, Dict, Union, Any, Callable, Awaitable
from cognee.eval_framework.corpus_builder.task_getters.TaskGetters import TaskGetters
from cognee.eval_framework.benchmark_adapters.benchmark_adapters import BenchmarkAdapter
from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.pipelines.tasks.Task import Task

View file

@ -2,8 +2,6 @@ import logging
import json
from typing import List
from unstructured.chunking.dispatch import chunk
from cognee.infrastructure.files.storage import LocalStorage
from cognee.eval_framework.corpus_builder.corpus_builder_executor import CorpusBuilderExecutor
from cognee.modules.data.models.questions_base import QuestionsBase

View file

@ -5,6 +5,10 @@ from cognee.modules.pipelines.tasks.Task import Task
from cognee.eval_framework.corpus_builder.task_getters.get_cascade_graph_tasks import (
get_cascade_graph_tasks,
)
from cognee.eval_framework.corpus_builder.task_getters.get_default_tasks_by_indices import (
get_no_summary_tasks,
get_just_chunks_tasks,
)
class TaskGetters(Enum):
@ -12,6 +16,8 @@ class TaskGetters(Enum):
DEFAULT = ("Default", get_default_tasks)
CASCADE_GRAPH = ("CascadeGraph", get_cascade_graph_tasks)
NO_SUMMARIES = ("NoSummaries", get_no_summary_tasks)
JUST_CHUNKS = ("JustChunks", get_just_chunks_tasks)
def __new__(cls, getter_name: str, getter_func: Callable[..., Awaitable[List[Task]]]):
obj = object.__new__(cls)

View file

@ -1,14 +0,0 @@
from cognee.api.v1.cognify.cognify_v2 import get_default_tasks
from typing import List
from cognee.eval_framework.corpus_builder.task_getters.base_task_getter import BaseTaskGetter
from cognee.modules.pipelines.tasks.Task import Task
from cognee.infrastructure.llm import get_max_chunk_tokens
from cognee.modules.chunking.TextChunker import TextChunker
class DefaultTaskGetter(BaseTaskGetter):
    """Task getter that delegates to the stock ``get_default_tasks`` function."""

    async def get_tasks(self, chunk_size=1024, chunker=TextChunker) -> List[Task]:
        """Await ``get_default_tasks`` with the given chunking options.

        Args:
            chunk_size: Forwarded as ``chunk_size`` to ``get_default_tasks``.
            chunker: Forwarded as ``chunker`` to ``get_default_tasks``.

        Returns:
            Whatever ``get_default_tasks`` resolves to.
        """
        default_tasks = await get_default_tasks(chunk_size=chunk_size, chunker=chunker)
        return default_tasks

View file

@ -0,0 +1,32 @@
from typing import List, Awaitable, Optional
from cognee.api.v1.cognify.cognify_v2 import get_default_tasks
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.chunking.TextChunker import TextChunker
async def get_default_tasks_by_indices(
    indices: List[int], chunk_size: Optional[int] = None, chunker=TextChunker
) -> List[Task]:
    """Return the default pipeline tasks selected by position.

    Args:
        indices: Zero-based positions into the list returned by
            ``get_default_tasks``. Tasks come back in the order the indices
            are given. Negative indices are rejected instead of wrapping
            around to the end of the list.
        chunk_size: Forwarded unchanged to ``get_default_tasks``.
        chunker: Forwarded unchanged to ``get_default_tasks``.

    Returns:
        One task per entry in ``indices``.

    Raises:
        IndexError: If any index is negative or not smaller than the number
            of default tasks.
    """
    all_tasks = await get_default_tasks(chunker=chunker, chunk_size=chunk_size)
    task_count = len(all_tasks)

    # Report only the offending indices so the caller can see what to fix.
    invalid = [i for i in indices if not 0 <= i < task_count]
    if invalid:
        # Avoid printing a nonsensical "0--1" range when there are no tasks.
        valid_range = f"0-{task_count - 1}" if task_count else "none (no default tasks)"
        raise IndexError(f"Task indices {invalid} out of range. Valid range: {valid_range}")

    return [all_tasks[i] for i in indices]
async def get_no_summary_tasks(
    chunk_size: Optional[int] = None, chunker=TextChunker
) -> List[Task]:
    """Return the default tasks with the summarization task left out.

    Args:
        chunk_size: Forwarded unchanged to ``get_default_tasks_by_indices``.
        chunker: Forwarded unchanged to ``get_default_tasks_by_indices``.

    Returns:
        The default tasks at positions 0, 1, 2, 3 and 5.

    Raises:
        IndexError: Propagated from ``get_default_tasks_by_indices`` when the
            default task list is shorter than expected.
    """
    # Default tasks indices: 0=classify, 1=check_permissions, 2=extract_chunks,
    # 3=extract_graph, 4=summarize, 5=add_data_points
    # NOTE(review): the selection is purely positional, so it must be kept in
    # sync with the order in which get_default_tasks builds its list.
    return await get_default_tasks_by_indices(
        [0, 1, 2, 3, 5], chunk_size=chunk_size, chunker=chunker
    )
async def get_just_chunks_tasks(
    chunk_size: Optional[int] = None, chunker=TextChunker
) -> List[Task]:
    """Return the default tasks up to chunk extraction, plus data point storage.

    Per the index mapping below, this keeps classify, check_permissions,
    extract_chunks and add_data_points, and skips extract_graph and summarize.

    Args:
        chunk_size: Forwarded unchanged to ``get_default_tasks_by_indices``.
        chunker: Forwarded unchanged to ``get_default_tasks_by_indices``.

    Returns:
        The default tasks at positions 0, 1, 2 and 5.

    Raises:
        IndexError: Propagated from ``get_default_tasks_by_indices`` when the
            default task list is shorter than expected.
    """
    # Default tasks indices: 0=classify, 1=check_permissions, 2=extract_chunks,
    # 3=extract_graph, 4=summarize, 5=add_data_points
    # NOTE(review): the selection is purely positional, so it must be kept in
    # sync with the order in which get_default_tasks builds its list.
    return await get_default_tasks_by_indices([0, 1, 2, 5], chunk_size=chunk_size, chunker=chunker)

View file

@ -8,7 +8,9 @@ class EvalConfig(BaseSettings):
building_corpus_from_scratch: bool = True
number_of_samples_in_corpus: int = 1
benchmark: str = "Dummy" # Options: 'HotPotQA', 'Dummy', 'TwoWikiMultiHop'
task_getter_type: str = "Default" # Options: 'Default', 'CascadeGraph'
task_getter_type: str = (
"Default" # Options: 'Default', 'CascadeGraph', 'NoSummaries', 'JustChunks'
)
# Question answering params
answering_questions: bool = True