Updating cognify pipeline documentation (#181)

* Updating cognify pipeline documentation

* typo fix

* Update docs/pipelines.md

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* removing a minor confusing part

---------

Co-authored-by: Boris <boris@topoteretes.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

parent 058b1da3ed
commit 4dbf559a59

2 changed files with 81 additions and 243 deletions

docs/pipelines.md (new file, 81 lines added)

@@ -0,0 +1,81 @@

# PIPELINES

Cognee uses [tasks](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/tasks/Task.py) grouped into pipelines that populate graph and vector stores. [These tasks](https://github.com/topoteretes/cognee/tree/main/cognee/tasks) analyze and enrich data, enhancing the quality of answers produced by Large Language Models (LLMs).

The tasks are managed and executed asynchronously using the `run_tasks` and `run_tasks_parallel` functions.

```python
pipeline = run_tasks(tasks, documents)

async for result in pipeline:
    print(result)
```

## Main pipeline: [cognee.cognify](https://github.com/topoteretes/cognee/blob/168cb5d1bf1964b5b0c645b2f3d8638d84554fda/cognee/api/v1/cognify/cognify_v2.py#L38)

This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores.

This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents.

### Parameters

- `datasets: Union[str, list[str]] = None`: A string or list of dataset names to be processed.
- `user: User = None`: The user requesting the processing. If not provided, the default user is retrieved.
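
For orientation, a minimal end-to-end call might look like the sketch below. The dataset name is illustrative, and data is added first with `cognee.add` so there is something to process; this is a sketch, not an excerpt from the cognee source.

```python
import asyncio

import cognee

async def main():
    # Add some raw text to an illustrative dataset first.
    await cognee.add("Natural language processing is a subfield of computer science.", "example_dataset")

    # Run the cognify pipeline on that dataset; with no user given, the default user is used.
    await cognee.cognify(datasets=["example_dataset"])

asyncio.run(main())
```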

### Steps in the Function

#### User Authentication

```python
if user is None:
    user = await get_default_user()
```

If no user is provided, the function retrieves the default user.

#### Handling Empty or String Dataset Input

```python
existing_datasets = await get_datasets(user.id)

if datasets is None or len(datasets) == 0:
    datasets = existing_datasets

if type(datasets[0]) == str:
    datasets = await get_datasets_by_name(datasets, user.id)
```

If no datasets are provided, the function retrieves all datasets owned by the user. If a list of dataset names (strings) is provided, they are converted into dataset objects.

#### Selecting datasets from the input list that are owned by the user

```python
existing_datasets_map = {
    generate_dataset_name(dataset.name): True for dataset in existing_datasets
}
```

#### Run Cognify Pipeline for Each Dataset

```python
awaitables = []

for dataset in datasets:
    dataset_name = generate_dataset_name(dataset.name)

    if dataset_name in existing_datasets_map:
        awaitables.append(run_cognify_pipeline(dataset, user))

return await asyncio.gather(*awaitables)
```

The `run_cognify_pipeline` function is defined within `cognify` and is responsible for processing a single dataset. This is where most of the heavy lifting occurs. The function processes multiple datasets concurrently using `asyncio.gather`.

#### Pipeline Tasks

The pipeline consists of several tasks, each responsible for a different part of the processing (a sketch of how these tasks are assembled into a pipeline follows the list):

- `classify_documents`: Converts each document into one of the specific `Document` types: `PdfDocument`, `AudioDocument`, `ImageDocument`, or `TextDocument`.
- `check_permissions_on_documents`: Checks if the user has the necessary permissions to access the documents. In this case, it checks for "write" permission.
- `extract_chunks_from_documents`: Extracts text chunks based on the document type.
- `add_data_points`: Creates nodes and edges from the chunks and their properties and adds them to the graph engine.
- `extract_graph_from_data`: Generates knowledge graphs from the document chunks.
- `summarize_text`: Extracts a summary for each chunk using an LLM.
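
These tasks are assembled into `Task` objects and executed with `run_tasks`, just like the example at the top of this page. The sketch below is indicative only: the exact arguments passed to each task live in `cognify_v2.py`, and the keyword arguments shown here (such as `permissions`) are illustrative.

```python
# Indicative sketch; see cognify_v2.py for the exact task list and arguments.
# Task lives in cognee/modules/pipelines/tasks/Task.py (linked above).
from cognee.modules.pipelines.tasks.Task import Task

# `user` comes from the authentication step shown earlier.
tasks = [
    Task(classify_documents),
    Task(check_permissions_on_documents, user=user, permissions=["write"]),
    Task(extract_chunks_from_documents),
    Task(add_data_points),
    Task(extract_graph_from_data),
    Task(summarize_text),
]

# run_tasks streams results as the documents flow through the task list.
pipeline = run_tasks(tasks, documents)

async for result in pipeline:
    print(result)
```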

Deleted file (243 lines removed):

@@ -1,243 +0,0 @@

# TASKS

!!! tip "cognee uses tasks grouped into pipelines to populate graph and vector stores"

Cognee organizes tasks into pipelines that populate graph and vector stores. These tasks analyze and enrich data, enhancing the quality of answers produced by Large Language Models (LLMs).

This section provides a template to help you structure your data and build pipelines. \
These tasks serve as a starting point for using Cognee to create reliable LLM pipelines.

## Task 1: Category Extraction

Data enrichment is the process of enhancing raw data with additional information to make it more valuable. This template is a sample task that extracts categories from a document and populates a graph with the extracted categories.

Let's go over the steps to use this template ([full code is provided here](https://github.com/topoteretes/cognee/blob/main/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py)):

This function is designed to classify chunks of text using a specified language model. The goal is to categorize the text, map relationships, and store the results in a vector engine and a graph engine. The function is asynchronous, allowing for concurrent execution of tasks like classification and data point creation.

### Parameters

- `data_chunks: list[DocumentChunk]`: A list of text chunks to be classified. Each chunk represents a piece of text and includes metadata like `chunk_id` and `document_id`.
- `classification_model: Type[BaseModel]`: The model used to classify each chunk of text. This model is expected to output labels that categorize the text.
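
To make those parameters concrete, here is a hedged usage sketch. The `ContentCategory` model and its fields are hypothetical, shaped only to match how the steps below read the result (`label.type` and the subclass `value`s); they are not the models shipped with cognee.

```python
from enum import Enum

from pydantic import BaseModel

# Hypothetical taxonomy for illustration, shaped to match how the task reads the
# classification result: chunk_classification.label.type and each subclass .value.
class ContentSubclass(str, Enum):
    ARTICLE = "article"
    REPORT = "report"

class ContentLabel(BaseModel):
    type: str
    subclass: list[ContentSubclass]

class ContentCategory(BaseModel):
    label: ContentLabel

# data_chunks is the list[DocumentChunk] produced by an earlier chunking task.
classified_chunks = await chunk_naive_llm_classifier(
    data_chunks=data_chunks,
    classification_model=ContentCategory,
)
```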

### Steps in the Function

#### Check for Empty Input

```python
if len(data_chunks) == 0:
    return data_chunks
```

If there are no data chunks provided, the function returns immediately with the input list (which is empty).

#### Classify Each Chunk

```python
chunk_classifications = await asyncio.gather(
    *[extract_categories(chunk.text, classification_model) for chunk in data_chunks],
)
```

The function uses `asyncio.gather` to concurrently classify each chunk of text. `extract_categories` is called for each chunk, and the results are collected in `chunk_classifications`.

#### Initialize Data Structures

```python
classification_data_points = []
```

A list is initialized to store the classification data points that will be used later for mapping relationships and storing in the vector engine.

#### Generate UUIDs for Classifications

The function loops through each chunk and generates unique identifiers (UUIDs) for both the main classification type and its subclasses:

```python
classification_data_points.append(uuid5(NAMESPACE_OID, chunk_classification.label.type))
classification_data_points.append(uuid5(NAMESPACE_OID, classification_subclass.value))
```

These UUIDs are used to uniquely identify classifications and ensure consistency.
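
`uuid5` is deterministic: hashing the same namespace and name always yields the same UUID, which is what allows repeated classifications to be de-duplicated. A quick illustration (the label strings here are arbitrary):

```python
from uuid import NAMESPACE_OID, uuid5

# The same label always maps to the same UUID, so re-running the task
# produces identical identifiers instead of duplicates.
assert uuid5(NAMESPACE_OID, "text_document") == uuid5(NAMESPACE_OID, "text_document")
assert uuid5(NAMESPACE_OID, "text_document") != uuid5(NAMESPACE_OID, "audio_document")
```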

#### Retrieve or Create Vector Collection

```python
vector_engine = get_vector_engine()
collection_name = "classification"
```

The function interacts with a vector engine. It checks if the collection named "classification" exists. If it does, it retrieves existing data points to avoid duplicates. Otherwise, it creates the collection.

#### Prepare Data Points, Nodes, and Edges

The function then builds a list of `data_points` (representing the classification results) and constructs nodes and edges to represent relationships between chunks and their classifications:

```python
data_points.append(DataPoint[Keyword](...))
nodes.append((...))
edges.append((...))
```

- **Nodes**: Represent classifications (e.g., media type, subtype).
- **Edges**: Represent relationships between chunks and classifications (e.g., "is_media_type", "is_subtype_of").
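
As a rough illustration of the shapes involved (the tuple layouts and field names below are illustrative rather than copied from the task; see the linked source for the exact structures):

```python
# Illustrative shapes only; consult the linked task source for the exact fields.
classification_type_id = uuid5(NAMESPACE_OID, chunk_classification.label.type)

# A node: an identifier plus a dictionary of properties describing the classification.
nodes.append((
    str(classification_type_id),
    dict(id=str(classification_type_id), name=chunk_classification.label.type),
))

# An edge: source chunk, target classification, a relationship name, and edge properties.
edges.append((
    str(chunk.chunk_id),
    str(classification_type_id),
    "is_media_type",
    dict(relationship_name="is_media_type"),
))
```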

#### Create Data Points and Relationships

If there are new nodes or edges to add, the function stores the data points in the vector engine and updates the graph engine with the new nodes and edges:

```python
await vector_engine.create_data_points(collection_name, data_points)
await graph_engine.add_nodes(nodes)
await graph_engine.add_edges(edges)
```

#### Return the Processed Chunks

Finally, the function returns the processed `data_chunks`, which can now be used further as needed:

```python
return data_chunks
```

## Pipeline 1: cognee pipeline

This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores with the results.

This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents.

### Parameters

- `datasets: Union[str, list[str]] = None`: A string or list of dataset names to be processed.
- `user: User = None`: The user requesting the processing. If not provided, the default user is retrieved.

### Steps in the Function

#### Database Engine Initialization

```python
db_engine = get_relational_engine()
```

The function starts by getting an instance of the relational database engine, which is used to retrieve datasets and other necessary data.

#### Handle Empty or String Dataset Input

```python
if datasets is None or len(datasets) == 0:
    return await cognify(await db_engine.get_datasets())

if type(datasets[0]) == str:
    datasets = await retrieve_datasets(datasets)
```

If no datasets are provided, the function retrieves all available datasets from the database. If a list of dataset names (strings) is provided, they are converted into dataset objects.

#### User Authentication

```python
if user is None:
    user = await get_default_user()
```

If no user is provided, the function retrieves the default user.

#### Run Cognify Pipeline for Each Dataset

```python
async def run_cognify_pipeline(dataset: Dataset):
    # Pipeline logic goes here...
```

The `run_cognify_pipeline` function is defined within `cognify` and is responsible for processing a single dataset. This is where most of the heavy lifting occurs.

#### Retrieve Dataset Data

The function fetches all the data associated with the dataset.

```python
data: list[Data] = await get_dataset_data(dataset_id=dataset.id)
```

#### Create Document Objects

Based on the file type (e.g., PDF, Audio, Image, Text), corresponding document objects are created.

```python
documents = [...]
```
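
For illustration only, the dispatch on file type might look roughly like the sketch below; the attribute names (`extension`, `raw_data_location`) and constructor arguments are assumptions, not the exact ones used in cognee.

```python
# Hypothetical sketch: attribute names and constructor arguments are assumptions.
documents = []

for data_item in data:
    document_args = dict(id=data_item.id, title=data_item.name, raw_data_location=data_item.raw_data_location)

    if data_item.extension == "pdf":
        documents.append(PdfDocument(**document_args))
    elif data_item.extension in ("mp3", "wav"):
        documents.append(AudioDocument(**document_args))
    elif data_item.extension in ("png", "jpg"):
        documents.append(ImageDocument(**document_args))
    else:
        documents.append(TextDocument(**document_args))
```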

#### Check Permissions

The user's permissions are checked to ensure they can access the documents.

```python
await check_permissions_on_documents(user, "read", document_ids)
```

#### Pipeline Status Logging

The function logs the start and end of the pipeline processing.

```python
async with update_status_lock:
    task_status = await get_pipeline_status([dataset_id])

    if dataset_id in task_status and task_status[dataset_id] == "DATASET_PROCESSING_STARTED":
        logger.info("Dataset %s is already being processed.", dataset_name)
        return

    await log_pipeline_status(dataset_id, "DATASET_PROCESSING_STARTED", {...})
```

#### Pipeline Tasks

The pipeline consists of several tasks, each responsible for a different part of the processing:

- `document_to_ontology`: Maps documents to an ontology structure.
- `source_documents_to_chunks`: Splits documents into chunks.
- `chunk_to_graph_decomposition`: Defines the graph structure for chunks.
- `chunks_into_graph`: Integrates chunks into the knowledge graph.
- `chunk_update_check`: Checks for updated or new chunks.
- `save_chunks_to_store`: Saves chunks to a vector store and graph database.
- Parallel tasks: `chunk_extract_summary` and `chunk_naive_llm_classifier` run in parallel to summarize and classify chunks.
- `chunk_remove_disconnected`: Cleans up obsolete chunks.

The tasks are managed and executed asynchronously using the `run_tasks` and `run_tasks_parallel` functions.

```python
pipeline = run_tasks(tasks, documents)

async for result in pipeline:
    print(result)
```

#### Handle Errors

If any errors occur during processing, they are logged, and the exception is raised.

```python
except Exception as error:
    await log_pipeline_status(dataset_id, "DATASET_PROCESSING_ERROR", {...})
    raise error
```

#### Processing Multiple Datasets

The function prepares to process multiple datasets concurrently using `asyncio.gather`.

```python
awaitables = []

for dataset in datasets:
    dataset_name = generate_dataset_name(dataset.name)

    if dataset_name in existing_datasets:
        awaitables.append(run_cognify_pipeline(dataset))

return await asyncio.gather(*awaitables)
```