Merge branch 'dev' into ruff-version

This commit is contained in:
Igor Ilic 2025-01-17 10:40:50 +01:00 committed by GitHub
commit be2aa9901f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -36,6 +36,7 @@ async def cognify(
datasets: Union[str, list[str]] = None, datasets: Union[str, list[str]] = None,
user: User = None, user: User = None,
graph_model: BaseModel = KnowledgeGraph, graph_model: BaseModel = KnowledgeGraph,
tasks: list[Task] = None,
): ):
if user is None: if user is None:
user = await get_default_user() user = await get_default_user()
@ -55,18 +56,19 @@ async def cognify(
awaitables = [] awaitables = []
if tasks is None:
tasks = await get_default_tasks(user, graph_model)
for dataset in datasets: for dataset in datasets:
dataset_name = generate_dataset_name(dataset.name) dataset_name = generate_dataset_name(dataset.name)
if dataset_name in existing_datasets_map: if dataset_name in existing_datasets_map:
awaitables.append(run_cognify_pipeline(dataset, user, graph_model)) awaitables.append(run_cognify_pipeline(dataset, user, tasks))
return await asyncio.gather(*awaitables) return await asyncio.gather(*awaitables)
async def run_cognify_pipeline( async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
dataset: Dataset, user: User, graph_model: BaseModel = KnowledgeGraph
):
data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id) data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id)
document_ids_str = [str(document.id) for document in data_documents] document_ids_str = [str(document.id) for document in data_documents]
@ -96,22 +98,12 @@ async def run_cognify_pipeline(
) )
try: try:
cognee_config = get_cognify_config() if not isinstance(tasks, list):
raise ValueError("Tasks must be a list")
tasks = [ for task in tasks:
Task(classify_documents), if not isinstance(task, Task):
Task(check_permissions_on_documents, user=user, permissions=["write"]), raise ValueError(f"Task {task} is not an instance of Task")
Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
Task(
extract_graph_from_data, graph_model=graph_model, task_config={"batch_size": 10}
), # Generate knowledge graphs from the document chunks.
Task(
summarize_text,
summarization_model=cognee_config.summarization_model,
task_config={"batch_size": 10},
),
Task(add_data_points, only_root=True, task_config={"batch_size": 10}),
]
pipeline = run_tasks(tasks, data_documents, "cognify_pipeline") pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")
@ -146,3 +138,31 @@ async def run_cognify_pipeline(
def generate_dataset_name(dataset_name: str) -> str: def generate_dataset_name(dataset_name: str) -> str:
return dataset_name.replace(".", "_").replace(" ", "_") return dataset_name.replace(".", "_").replace(" ", "_")
async def get_default_tasks(
    user: User = None, graph_model: BaseModel = KnowledgeGraph
) -> list[Task]:
    """Build the default cognify pipeline task list.

    Parameters:
        user: Owner used for permission checks and telemetry; when None,
            the default user is fetched (presumably from project config —
            `get_default_user` is opaque from here).
        graph_model: Pydantic model the graph-extraction task targets.

    Returns:
        The ordered list of Task objects for the cognify pipeline.

    Raises:
        Re-raises whatever task construction raises, after emitting a
        telemetry event.
    """
    if user is None:
        user = await get_default_user()

    try:
        cognee_config = get_cognify_config()
        default_tasks = [
            Task(classify_documents),
            Task(check_permissions_on_documents, user=user, permissions=["write"]),
            Task(extract_chunks_from_documents),  # Extract text chunks based on the document type.
            Task(
                extract_graph_from_data, graph_model=graph_model, task_config={"batch_size": 10}
            ),  # Generate knowledge graphs from the document chunks.
            Task(
                summarize_text,
                summarization_model=cognee_config.summarization_model,
                task_config={"batch_size": 10},
            ),
            Task(add_data_points, only_root=True, task_config={"batch_size": 10}),
        ]
    except Exception:
        send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id)
        # Bare raise preserves the original traceback; `raise error` would
        # append this handler's frame to it (ruff TRY201).
        raise
    return default_tasks