add docstrings and typing to cognee tasks

This commit is contained in:
hande-k 2025-01-17 10:30:34 +01:00
parent 7c3e46f14e
commit 2c351c499d
13 changed files with 130 additions and 12 deletions

View file

@@ -10,7 +10,29 @@ from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
async def chunk_naive_llm_classifier(
    data_chunks: list[DocumentChunk], classification_model: Type[BaseModel]
) -> list[DocumentChunk]:
    """
    Classifies a list of document chunks using a specified classification model and updates vector and graph databases with the classification results.

    Vector Database Structure:
    - Collection Name: `classification`
    - Payload Schema:
        - uuid (str): Unique identifier for the classification.
        - text (str): Text label of the classification.
        - chunk_id (str): Identifier of the chunk associated with this classification.
        - document_id (str): Identifier of the document associated with this classification.

    Graph Database Structure:
    - Nodes:
        - Represent document chunks, classification types, and classification subtypes.
    - Edges:
        - `is_media_type`: Links document chunks to their classification type.
        - `is_subtype_of`: Links classification subtypes to their parent type.
        - `is_classified_as`: Links document chunks to their classification subtypes.

    Notes:
    - The function assumes that vector and graph database engines (`get_vector_engine` and `get_graph_engine`) are properly initialized and accessible.
    - Classification labels are processed to ensure uniqueness using UUIDs based on their values.
    """
    if len(data_chunks) == 0:
        return data_chunks
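
A minimal invocation sketch for this task; the import path and the classification model's fields are assumptions (only the signature appears in the diff):

# Hypothetical usage -- import path and model fields are assumptions.
from typing import Literal
from pydantic import BaseModel
from cognee.tasks.chunks import chunk_naive_llm_classifier  # assumed path

class MediaType(BaseModel):
    label: Literal["news", "blog", "paper"]

# Inside an async pipeline step, with data_chunks from an earlier chunking task:
# data_chunks = await chunk_naive_llm_classifier(data_chunks, MediaType)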

View file

@@ -17,6 +17,12 @@ def chunk_by_paragraph(
    """
    Chunks text by paragraph while preserving exact text reconstruction capability.
    When chunks are joined with empty string "", they reproduce the original text exactly.

    Notes:
    - Tokenization is handled using the `tiktoken` library, ensuring compatibility with the vector engine's embedding model.
    - If `batch_paragraphs` is False, each paragraph will be yielded as a separate chunk.
    - Handles cases where paragraphs exceed the specified token or word limits by splitting them as needed.
    - Remaining text at the end of the input will be yielded as a final chunk.
    """
    current_chunk = ""
    current_word_count = 0
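
The exact-reconstruction guarantee lends itself to a quick property check; a sketch, assuming the import path below and that each yielded chunk exposes its text under a `text` key (neither is shown in the diff):

# Hypothetical round-trip check -- import path and chunk payload shape are assumptions.
from cognee.tasks.chunks import chunk_by_paragraph  # assumed path

text = "First paragraph.\nSecond paragraph.\nA third one."
chunks = list(chunk_by_paragraph(text))
assert "".join(chunk["text"] for chunk in chunks) == text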

View file

@@ -1,9 +1,19 @@
from uuid import uuid4, UUID
from typing import Optional, Iterator, Tuple
from .chunk_by_word import chunk_by_word

def chunk_by_sentence(
    data: str, maximum_length: Optional[int] = None
) -> Iterator[Tuple[UUID, str, int, Optional[str]]]:
    """
    Splits the input text into sentences based on word-level processing, with optional sentence length constraints.

    Notes:
    - Relies on the `chunk_by_word` function for word-level tokenization and classification.
    - Ensures sentences within paragraphs are uniquely identifiable using UUIDs.
    - Handles cases where the text ends mid-sentence by appending a special "sentence_cut" type.
    """
    sentence = ""
    paragraph_id = uuid4()
    word_count = 0
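
A usage sketch for the new signature; the tuple field names are inferred from the return annotation and docstring, and the import path is an assumption:

# Hypothetical usage -- import path assumed; tuple order taken from the annotation.
from cognee.tasks.chunks import chunk_by_sentence  # assumed path

for paragraph_id, sentence, word_count, word_type in chunk_by_sentence("One. Two"):
    print(paragraph_id, repr(sentence), word_count, word_type)
# "Two" has no terminator, so per the docstring it would arrive tagged "sentence_cut".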

View file

@@ -1,4 +1,6 @@
import re
from typing import Iterator, Tuple

SENTENCE_ENDINGS = r"[.;!?…]"
PARAGRAPH_ENDINGS = r"[\n\r]"

@@ -34,7 +36,7 @@ def is_real_paragraph_end(last_char: str, current_pos: int, text: str) -> bool:
    return False

def chunk_by_word(data: str) -> Iterator[Tuple[str, str]]:
    """
    Chunks text into words and endings while preserving whitespace.
    Whitespace is included with the preceding word.
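
Because whitespace stays attached to the preceding word, joining the tokens should reproduce the input exactly; a hedged sketch with the import path assumed:

# Hypothetical round-trip check -- import path assumed.
from cognee.tasks.chunks import chunk_by_word  # assumed path

text = "Hello world. Goodbye"
pieces = list(chunk_by_word(text))
# Each item is a (token, token_type) pair, per the Iterator[Tuple[str, str]] annotation.
assert "".join(token for token, _ in pieces) == text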

View file

@@ -3,11 +3,19 @@ from cognee.infrastructure.databases.vector import get_vector_engine
async def query_chunks(query: str) -> list[dict]:
    """
    Queries the vector database to retrieve chunks related to the given query string.

    Parameters:
    - query (str): The query string to filter nodes by.

    Returns:
    - list(dict): A list of objects providing information about the chunks related to the query.

    Notes:
    - The function uses the `search` method of the vector engine to find matches.
    - Limits the results to the top 5 matching chunks to balance performance and relevance.
    - Ensure that the vector database is properly initialized and contains the "document_chunk_text" collection.
    """
    vector_engine = get_vector_engine()
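
A usage sketch, assuming the import path below and a vector database already populated with the "document_chunk_text" collection:

# Hypothetical usage -- import path assumed; requires a populated vector database.
import asyncio
from cognee.tasks.chunks import query_chunks  # assumed path

async def main():
    results = await query_chunks("retrieval augmented generation")
    for result in results:  # at most 5 results, per the docstring
        print(result)

asyncio.run(main())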

View file

@@ -3,6 +3,14 @@ from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
async def remove_disconnected_chunks(data_chunks: list[DocumentChunk]) -> list[DocumentChunk]:
    """
    Removes disconnected or obsolete chunks from the graph database.

    Notes:
    - Obsolete chunks are defined as chunks with no "next_chunk" predecessor.
    - Fully disconnected nodes are identified and deleted separately.
    - This function assumes that the graph database is properly initialized and accessible.
    """
    graph_engine = await get_graph_engine()
    document_ids = set((data_chunk.document_id for data_chunk in data_chunks))
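
This would typically run as a late cleanup step, after chunks have been re-linked; a minimal sketch with the import path assumed:

# Hypothetical cleanup step -- import path assumed.
from cognee.tasks.chunks import remove_disconnected_chunks  # assumed path

# Inside an async pipeline step:
# data_chunks = await remove_disconnected_chunks(data_chunks)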

View file

@@ -6,6 +6,10 @@ from cognee.modules.retrieval.brute_force_triplet_search import brute_force_trip
def retrieved_edges_to_string(retrieved_edges: list) -> str:
    """
    Converts a list of retrieved graph edges into a human-readable string format.
    """
    edge_strings = []
    for edge in retrieved_edges:
        node1_string = edge.node1.attributes.get("text") or edge.node1.attributes.get("name")

@@ -18,11 +22,19 @@ def retrieved_edges_to_string(retrieved_edges: list) -> str:
async def graph_query_completion(query: str) -> list:
    """
    Executes a query on the graph database and retrieves a relevant completion based on the found data.

    Parameters:
    - query (str): The query string to compute.

    Returns:
    - list: Answer to the query.

    Notes:
    - The `brute_force_triplet_search` is used to retrieve relevant graph data.
    - Prompts are dynamically rendered and provided to the LLM for contextual understanding.
    - Ensure that the LLM client and graph database are properly configured and accessible.
    """
    found_triplets = await brute_force_triplet_search(query, top_k=5)
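
A usage sketch; the import path is an assumption, and both a populated graph database and a configured LLM client are prerequisites per the docstring:

# Hypothetical usage -- import path assumed.
import asyncio
from cognee.tasks.completion import graph_query_completion  # assumed path

answer = asyncio.run(graph_query_completion("What topics do the documents cover?"))
print(answer)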

View file

@@ -6,11 +6,20 @@ from cognee.infrastructure.llm.prompts import read_query_prompt, render_prompt
async def query_completion(query: str) -> list:
    """
    Executes a query against a vector database and computes a relevant response using an LLM.

    Parameters:
    - query (str): The query string to compute.

    Returns:
    - list: Answer to the query.

    Notes:
    - Limits the search to the top 1 matching chunk for simplicity and relevance.
    - Ensure that the vector database and LLM client are properly configured and accessible.
    - The response model used for the LLM output is expected to be a string.
    """
    vector_engine = get_vector_engine()
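
The vector-only counterpart to graph_query_completion above; per the docstring it grounds the answer in the single best-matching chunk. A sketch with the import path assumed:

# Hypothetical usage -- import path assumed.
import asyncio
from cognee.tasks.completion import query_completion  # assumed path

answer = asyncio.run(query_completion("Summarize the onboarding guide."))
print(answer)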

View file

@@ -1,8 +1,19 @@
from cognee.modules.data.processing.document_types import Document
from cognee.modules.users.permissions.methods import check_permission_on_documents
from typing import List

async def check_permissions_on_documents(
    documents: list[Document], user, permissions
) -> List[Document]:
    """
    Validates a user's permissions on a list of documents.

    Notes:
    - This function assumes that `check_permission_on_documents` raises an exception if the permission check fails.
    - It is designed to validate multiple permissions in a sequential manner for the same set of documents.
    - Ensure that the `Document` and `user` objects conform to the expected structure and interfaces.
    """
    document_ids = [document.id for document in documents]
    for permission in permissions:
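
A sketch of using this as a permission gate before processing; the import path, permission names, and user object are assumptions:

# Hypothetical usage -- import path and permission names are assumptions.
from cognee.tasks.documents import check_permissions_on_documents  # assumed path

async def gated_read(documents, user):
    # Raises (via check_permission_on_documents) if any permission check fails.
    return await check_permissions_on_documents(documents, user, ["read"])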

View file

@@ -50,6 +50,13 @@ EXTENSION_TO_DOCUMENT_CLASS = {
async def classify_documents(data_documents: list[Data]) -> list[Document]:
    """
    Classifies a list of data items into specific document types based on file extensions.

    Notes:
    - The function relies on `get_metadata` to retrieve metadata information for each data item.
    - Ensure the `Data` objects and their attributes (e.g., `extension`, `id`) are valid before calling this function.
    """
    documents = []
    for data_item in data_documents:
        metadata = await get_metadata(data_item.id)
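
Since the task picks a Document subclass by file extension, usage is a single call; a hedged sketch:

# Hypothetical usage -- import path assumed; Data items come from cognee's ingestion layer.
from cognee.tasks.documents import classify_documents  # assumed path

# Inside an async pipeline step:
# documents = await classify_documents(data_documents)
# A Data item with a given extension maps to the matching Document subclass via
# EXTENSION_TO_DOCUMENT_CLASS (the mapping's contents are not shown in this diff).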

View file

@@ -1,4 +1,4 @@
from typing import Optional, AsyncGenerator
from cognee.modules.data.processing.document_types.Document import Document

@@ -8,7 +8,14 @@ async def extract_chunks_from_documents(
    chunk_size: int = 1024,
    chunker="text_chunker",
    max_tokens: Optional[int] = None,
) -> AsyncGenerator:
    """
    Extracts chunks of data from a list of documents based on the specified chunking parameters.

    Notes:
    - The `read` method of the `Document` class must be implemented to support the chunking operation.
    - The `chunker` parameter determines the chunking logic and should align with the document type.
    """
    for document in documents:
        for document_chunk in document.read(
            chunk_size=chunk_size, chunker=chunker, max_tokens=max_tokens
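
Because the return type is now an AsyncGenerator, callers consume it with `async for`; a sketch with the import path assumed:

# Hypothetical consumption of the async generator -- import path assumed.
from cognee.tasks.documents import extract_chunks_from_documents  # assumed path

async def collect_chunks(documents):
    chunks = []
    async for chunk in extract_chunks_from_documents(documents, chunk_size=1024):
        chunks.append(chunk)
    return chunks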

View file

@@ -1,12 +1,21 @@
import asyncio
from typing import Type, List
from pydantic import BaseModel
from cognee.modules.data.extraction.knowledge_graph import extract_content_graph
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.tasks.storage import add_data_points

async def extract_graph_from_code(
    data_chunks: list[DocumentChunk], graph_model: Type[BaseModel]
) -> List[DocumentChunk]:
    """
    Extracts a knowledge graph from the text content of document chunks using a specified graph model.

    Notes:
    - The `extract_content_graph` function processes each chunk's text to extract graph information.
    - Graph nodes are stored using the `add_data_points` function for later retrieval or analysis.
    """
    chunk_graphs = await asyncio.gather(
        *[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks]
    )
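
The `graph_model` is a Pydantic class describing the structure the LLM should extract; a hedged sketch of what one might look like (field names are illustrative, not cognee's schema):

# Hypothetical graph model -- field names are illustrative only.
from pydantic import BaseModel

class CodeEdge(BaseModel):
    source: str
    target: str
    relation: str

class CodeGraph(BaseModel):
    nodes: list[str]
    edges: list[CodeEdge]

# Inside an async pipeline step:
# data_chunks = await extract_graph_from_code(data_chunks, CodeGraph)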

View file

@@ -1,5 +1,5 @@
import asyncio
from typing import Type, List
from pydantic import BaseModel

@@ -13,7 +13,14 @@ from cognee.modules.graph.utils import (
from cognee.tasks.storage import add_data_points

async def extract_graph_from_data(
    data_chunks: list[DocumentChunk], graph_model: Type[BaseModel]
) -> List[DocumentChunk]:
    """
    Extracts and integrates a knowledge graph from the text content of document chunks using a specified graph model.
    """
    chunk_graphs = await asyncio.gather(
        *[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks]
    )
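
Taken together, the tasks touched by this commit compose into a pipeline along these lines; a sketch only, with all task imports elided and assumed, and error handling omitted:

# Hypothetical end-to-end sketch -- task imports elided and assumed; no error handling.
async def build_graph(data_documents, user, graph_model):
    documents = await classify_documents(data_documents)
    await check_permissions_on_documents(documents, user, ["read"])
    chunks = [chunk async for chunk in extract_chunks_from_documents(documents)]
    chunks = await extract_graph_from_data(chunks, graph_model)
    return await remove_disconnected_chunks(chunks)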