diff --git a/.github/workflows/auto-comment.yml b/.github/workflows/auto-comment.yml index aaba8893e..f38948f94 100644 --- a/.github/workflows/auto-comment.yml +++ b/.github/workflows/auto-comment.yml @@ -64,7 +64,7 @@ jobs: # Separate action for merged PRs - name: Handle Merged Pull Requests if: github.event.pull_request.merged == true - uses: actions-cool/pr-welcome@v1.2.1 + uses: actions-cool/pr-welcome@v1.4.0 with: token: ${{ secrets.GH_TOKEN }} comment: | diff --git a/.github/workflows/test_neo4j.yml b/.github/workflows/test_neo4j.yml index 47d928fd9..55b0f4ee4 100644 --- a/.github/workflows/test_neo4j.yml +++ b/.github/workflows/test_neo4j.yml @@ -1,10 +1,11 @@ name: test | neo4j on: + workflow_dispatch: pull_request: branches: - main - workflow_dispatch: + types: [labeled] concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} @@ -21,7 +22,7 @@ jobs: run_neo4j_integration_test: name: test needs: get_docs_changes - if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' + if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' || ${{ github.event.label.name == 'run-checks' }} runs-on: ubuntu-latest defaults: diff --git a/.github/workflows/test_notebook.yml b/.github/workflows/test_notebook.yml index 5e57ada02..20f51a6e2 100644 --- a/.github/workflows/test_notebook.yml +++ b/.github/workflows/test_notebook.yml @@ -1,10 +1,12 @@ name: test | notebook on: + workflow_dispatch: pull_request: branches: - main - workflow_dispatch: + types: [labeled] + concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} @@ -21,7 +23,7 @@ jobs: run_notebook_test: name: test needs: get_docs_changes - if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' + if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' || ${{ github.event.label.name == 'run-checks' }} runs-on: ubuntu-latest defaults: run: diff --git a/.github/workflows/test_pgvector.yml b/.github/workflows/test_pgvector.yml index 913d249e2..c9dfc2c35 100644 --- a/.github/workflows/test_pgvector.yml +++ b/.github/workflows/test_pgvector.yml @@ -1,10 +1,12 @@ name: test | pgvector on: + workflow_dispatch: pull_request: branches: - main - workflow_dispatch: + types: [labeled] + concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} @@ -21,7 +23,7 @@ jobs: run_pgvector_integration_test: name: test needs: get_docs_changes - if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' + if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' || ${{ github.event.label.name == 'run-checks' }} runs-on: ubuntu-latest defaults: run: diff --git a/.github/workflows/test_qdrant.yml b/.github/workflows/test_qdrant.yml index af9f66257..595325672 100644 --- a/.github/workflows/test_qdrant.yml +++ b/.github/workflows/test_qdrant.yml @@ -1,10 +1,12 @@ name: test | qdrant on: + workflow_dispatch: pull_request: branches: - main - workflow_dispatch: + types: [labeled] + concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} @@ -21,7 +23,7 @@ jobs: run_qdrant_integration_test: name: test needs: get_docs_changes - if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' + if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' || ${{ github.event.label.name == 'run-checks' }} runs-on: ubuntu-latest defaults: diff --git a/.github/workflows/test_weaviate.yml b/.github/workflows/test_weaviate.yml index cbdd28cef..9353d1747 100644 ---
a/.github/workflows/test_weaviate.yml +++ b/.github/workflows/test_weaviate.yml @@ -1,10 +1,12 @@ name: test | weaviate on: + workflow_dispatch: pull_request: branches: - main - workflow_dispatch: + types: [labeled] + concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} @@ -21,7 +23,7 @@ jobs: run_weaviate_integration_test: name: test needs: get_docs_changes - if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' + if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' || ${{ github.event.label.name == 'run-checks' }} runs-on: ubuntu-latest defaults: diff --git a/README.md b/README.md index b34c818ef..9ce92e80e 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,11 @@ If you have questions, join our Discord pip install cognee ``` +### With pip and PostgreSQL support + +```bash +pip install 'cognee[postgres]' +``` ### With poetry diff --git a/cognee/base_config.py b/cognee/base_config.py index d2245ef6e..0e70b7652 100644 --- a/cognee/base_config.py +++ b/cognee/base_config.py @@ -1,3 +1,4 @@ +import os from typing import Optional from functools import lru_cache from pydantic_settings import BaseSettings, SettingsConfigDict @@ -7,8 +8,8 @@ from cognee.shared.data_models import MonitoringTool class BaseConfig(BaseSettings): data_root_directory: str = get_absolute_path(".data_storage") monitoring_tool: object = MonitoringTool.LANGFUSE - graphistry_username: Optional[str] = None - graphistry_password: Optional[str] = None + graphistry_username: Optional[str] = os.getenv("GRAPHISTRY_USERNAME") + graphistry_password: Optional[str] = os.getenv("GRAPHISTRY_PASSWORD") model_config = SettingsConfigDict(env_file = ".env", extra = "allow") diff --git a/cognee/tasks/dataset_generate_golden_set/__init__.py b/cognee/tasks/dataset_generate_golden_set/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cognee/tasks/dataset_generate_golden_set/generate_golden_set.py b/cognee/tasks/dataset_generate_golden_set/generate_golden_set.py new file mode 100644 index 000000000..e69de29bb diff --git a/cognee/tasks/search_evaluate/__init__.py b/cognee/tasks/search_evaluate/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/docs/pipelines.md b/docs/pipelines.md new file mode 100644 index 000000000..2392eab80 --- /dev/null +++ b/docs/pipelines.md @@ -0,0 +1,81 @@ +# PIPELINES + +Cognee uses [tasks](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/tasks/Task.py) grouped into pipelines that populate graph and vector stores. [These tasks](https://github.com/topoteretes/cognee/tree/main/cognee/tasks) analyze and enrich data, enhancing the quality of answers produced by Large Language Models (LLMs). + +The tasks are managed and executed asynchronously using the `run_tasks` and `run_tasks_parallel` functions. + +```python +pipeline = run_tasks(tasks, documents) +async for result in pipeline: + print(result) +``` + +## Main pipeline: [cognee.cognify](https://github.com/topoteretes/cognee/blob/168cb5d1bf1964b5b0c645b2f3d8638d84554fda/cognee/api/v1/cognify/cognify_v2.py#L38) + +This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores. + + +This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents.
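+
+A minimal usage sketch of triggering this pipeline from user code (assuming cognee's public `add`/`cognify` API; illustrative only, not the internal implementation):
+
+```python
+import asyncio
+
+import cognee
+
+
+async def main():
+    # Ingest a short text into the default dataset, then run the cognify
+    # pipeline, which chunks the data and populates the graph and vector stores.
+    # Assumes an LLM API key (e.g. LLM_API_KEY) is configured in the environment.
+    await cognee.add("Natural language processing is a subfield of computer science.")
+    await cognee.cognify()
+
+
+asyncio.run(main())
+```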
+ +### Parameters + +- `datasets: Union[str, list[str]] = None`: A string or list of dataset names to be processed. +- `user: User = None`: The user requesting the processing. If not provided, the default user is retrieved. + +### Steps in the Function + +#### User Authentication + +```python +if user is None: + user = await get_default_user() +``` + +If no user is provided, the function retrieves the default user. + +#### Handling Empty or String Dataset Input + +```python +existing_datasets = await get_datasets(user.id) +if datasets is None or len(datasets) == 0: + datasets = existing_datasets +if type(datasets[0]) == str: + datasets = await get_datasets_by_name(datasets, user.id) +``` + +If no datasets are provided, the function retrieves all datasets owned by the user. If a list of dataset names (strings) is provided, they are converted into dataset objects. + +#### Selecting Datasets Owned by the User + +```python +existing_datasets_map = { + generate_dataset_name(dataset.name): True for dataset in existing_datasets + } +``` + +#### Run Cognify Pipeline for Each Dataset + +```python +awaitables = [] + +for dataset in datasets: + dataset_name = generate_dataset_name(dataset.name) + + if dataset_name in existing_datasets_map: + awaitables.append(run_cognify_pipeline(dataset, user)) + +return await asyncio.gather(*awaitables) +``` + +The `run_cognify_pipeline` function is defined within `cognify` and is responsible for processing a single dataset. This is where most of the heavy lifting occurs. The outer `cognify` function then processes multiple datasets concurrently using `asyncio.gather`. + + +#### Pipeline Tasks + +The pipeline consists of several tasks, each responsible for different parts of the processing: + +- `classify_documents`: Converts each of the documents into one of the specific Document types: PdfDocument, AudioDocument, ImageDocument, or TextDocument. +- `check_permissions_on_documents`: Checks if the user has the necessary permissions to access the documents. In this case, it checks for "write" permission. +- `extract_chunks_from_documents`: Extracts text chunks based on the document type. +- `add_data_points`: Creates nodes and edges from the chunks and their properties. Adds them to the graph engine. +- `extract_graph_from_data`: Generates knowledge graphs from the document chunks. +- `summarize_text`: Extracts a summary for each chunk using an LLM. diff --git a/docs/templates.md b/docs/templates.md deleted file mode 100644 index 0e1ce4288..000000000 --- a/docs/templates.md +++ /dev/null @@ -1,243 +0,0 @@ -# TASKS - -!!! tip "cognee uses tasks grouped into pipelines to populate graph and vector stores" - - -Cognee organizes tasks into pipelines that populate graph and vector stores. These tasks analyze and enrich data, enhancing the quality of answers produced by Large Language Models (LLMs). - -This section provides a template to help you structure your data and build pipelines. \ -These tasks serve as a starting point for using Cognee to create reliable LLM pipelines. - - - - - - - -## Task 1: Category Extraction - -Data enrichment is the process of enhancing raw data with additional information to make it more valuable. This template is a sample task that extracts categories from a document and populates a graph with the extracted categories.
- -Let's go over the steps to use this template [full code provided here](https://github.com/topoteretes/cognee/blob/main/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py): - - -This function is designed to classify chunks of text using a specified language model. The goal is to categorize the text, map relationships, and store the results in a vector engine and a graph engine. The function is asynchronous, allowing for concurrent execution of tasks like classification and data point creation. - -### Parameters - -- `data_chunks: list[DocumentChunk]`: A list of text chunks to be classified. Each chunk represents a piece of text and includes metadata like `chunk_id` and `document_id`. -- `classification_model: Type[BaseModel]`: The model used to classify each chunk of text. This model is expected to output labels that categorize the text. - -### Steps in the Function - -#### Check for Empty Input - -```python -if len(data_chunks) == 0: - return data_chunks -``` - -If there are no data chunks provided, the function returns immediately with the input list (which is empty). - -#### Classify Each Chunk - -```python -chunk_classifications = await asyncio.gather( - *[extract_categories(chunk.text, classification_model) for chunk in data_chunks], -) -``` - -The function uses `asyncio.gather` to concurrently classify each chunk of text. `extract_categories` is called for each chunk, and the results are collected in `chunk_classifications`. - -#### Initialize Data Structures - -```python -classification_data_points = [] -``` - -A list is initialized to store the classification data points that will be used later for mapping relationships and storing in the vector engine. - -#### Generate UUIDs for Classifications - -The function loops through each chunk and generates unique identifiers (UUIDs) for both the main classification type and its subclasses: - -```python -classification_data_points.append(uuid5(NAMESPACE_OID, chunk_classification.label.type)) -classification_data_points.append(uuid5(NAMESPACE_OID, classification_subclass.value)) -``` - -These UUIDs are used to uniquely identify classifications and ensure consistency. - -#### Retrieve or Create Vector Collection - -```python -vector_engine = get_vector_engine() -collection_name = "classification" -``` - -The function interacts with a vector engine. It checks if the collection named "classification" exists. If it does, it retrieves existing data points to avoid duplicates. Otherwise, it creates the collection. - -#### Prepare Data Points, Nodes, and Edges - -The function then builds a list of `data_points` (representing the classification results) and constructs nodes and edges to represent relationships between chunks and their classifications: - -```python -data_points.append(DataPoint[Keyword](...)) -nodes.append((...)) -edges.append((...)) -``` - -- **Nodes**: Represent classifications (e.g., media type, subtype). -- **Edges**: Represent relationships between chunks and classifications (e.g., "is_media_type", "is_subtype_of"). 
- -#### Create Data Points and Relationships - -If there are new nodes or edges to add, the function stores the data points in the vector engine and updates the graph engine with the new nodes and edges: - -```python -await vector_engine.create_data_points(collection_name, data_points) -await graph_engine.add_nodes(nodes) -await graph_engine.add_edges(edges) -``` - -#### Return the Processed Chunks - -Finally, the function returns the processed `data_chunks`, which can now be used further as needed: - -```python -return data_chunks -``` - -## Pipeline 1: cognee pipeline - -This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores with the results - - -This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents. - -### Parameters - -- `datasets: Union[str, list[str]] = None`: A string or list of dataset names to be processed. -- `user: User = None`: The user requesting the processing. If not provided, the default user is retrieved. - -### Steps in the Function - -#### Database Engine Initialization - -```python -db_engine = get_relational_engine() -``` - -The function starts by getting an instance of the relational database engine, which is used to retrieve datasets and other necessary data. - -#### Handle Empty or String Dataset Input - -```python -if datasets is None or len(datasets) == 0: - return await cognify(await db_engine.get_datasets()) -if type(datasets[0]) == str: - datasets = await retrieve_datasets(datasets) -``` - -If no datasets are provided, the function retrieves all available datasets from the database. If a list of dataset names (strings) is provided, they are converted into dataset objects. - -#### User Authentication - -```python -if user is None: - user = await get_default_user() -``` - -If no user is provided, the function retrieves the default user. - -#### Run Cognify Pipeline for Each Dataset - -```python -async def run_cognify_pipeline(dataset: Dataset): - # Pipeline logic goes here... -``` - -The `run_cognify_pipeline` function is defined within `cognify` and is responsible for processing a single dataset. This is where most of the heavy lifting occurs. - -#### Retrieve Dataset Data - -The function fetches all the data associated with the dataset. - -```python -data: list[Data] = await get_dataset_data(dataset_id=dataset.id) -``` - -#### Create Document Objects - -Based on the file type (e.g., PDF, Audio, Image, Text), corresponding document objects are created. - -```python -documents = [...] -``` - -#### Check Permissions - -The user's permissions are checked to ensure they can access the documents. - -```python -await check_permissions_on_documents(user, "read", document_ids) -``` - -#### Pipeline Status Logging - -The function logs the start and end of the pipeline processing. - -```python -async with update_status_lock: - task_status = await get_pipeline_status([dataset_id]) - if dataset_id in task_status and task_status[dataset_id] == "DATASET_PROCESSING_STARTED": - logger.info("Dataset %s is already being processed.", dataset_name) - return - await log_pipeline_status(dataset_id, "DATASET_PROCESSING_STARTED", {...}) -``` - -#### Pipeline Tasks - -The pipeline consists of several tasks, each responsible for different parts of the processing: - -- `document_to_ontology`: Maps documents to an ontology structure. 
-- `source_documents_to_chunks`: Splits documents into chunks. -- `chunk_to_graph_decomposition`: Defines the graph structure for chunks. -- `chunks_into_graph`: Integrates chunks into the knowledge graph. -- `chunk_update_check`: Checks for updated or new chunks. -- `save_chunks_to_store`: Saves chunks to a vector store and graph database. - -Parallel Tasks: `chunk_extract_summary` and `chunk_naive_llm_classifier` run in parallel to summarize and classify chunks. - -- `chunk_remove_disconnected`: Cleans up obsolete chunks. - -The tasks are managed and executed asynchronously using the `run_tasks` and `run_tasks_parallel` functions. - -```python -pipeline = run_tasks(tasks, documents) -async for result in pipeline: - print(result) -``` - -#### Handle Errors - -If any errors occur during processing, they are logged, and the exception is raised. - -```python -except Exception as error: - await log_pipeline_status(dataset_id, "DATASET_PROCESSING_ERROR", {...}) - raise error -``` - -#### Processing Multiple Datasets - -The function prepares to process multiple datasets concurrently using `asyncio.gather`. - -```python -awaitables = [] -for dataset in datasets: - dataset_name = generate_dataset_name(dataset.name) - if dataset_name in existing_datasets: - awaitables.append(run_cognify_pipeline(dataset)) -return await asyncio.gather(*awaitables) -``` diff --git a/pyproject.toml b/pyproject.toml index d74bd5de0..93ec8e0ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "cognee" -version = "0.1.17" +version = "0.1.18" description = "Cognee - is a library for enriching LLM context with a semantic layer for better understanding and reasoning." authors = ["Vasilije Markovic", "Boris Arzentar"] readme = "README.md" diff --git a/tools/daily_twitter_stats.py b/tools/daily_twitter_stats.py deleted file mode 100644 index d66f052d9..000000000 --- a/tools/daily_twitter_stats.py +++ /dev/null @@ -1,66 +0,0 @@ -import tweepy -import requests -import json -from datetime import datetime, timezone - -# Twitter API credentials from GitHub Secrets -API_KEY = '${{ secrets.TWITTER_API_KEY }}' -API_SECRET = '${{ secrets.TWITTER_API_SECRET }}' -ACCESS_TOKEN = '${{ secrets.TWITTER_ACCESS_TOKEN }}' -ACCESS_SECRET = '${{ secrets.TWITTER_ACCESS_SECRET }}' -USERNAME = '${{ secrets.TWITTER_USERNAME }}' -SEGMENT_WRITE_KEY = '${{ secrets.SEGMENT_WRITE_KEY }}' - -# Initialize Tweepy API -auth = tweepy.OAuthHandler(API_KEY, API_SECRET) -auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET) -twitter_api = tweepy.API(auth) - -# Segment endpoint -SEGMENT_ENDPOINT = 'https://api.segment.io/v1/track' - - -def get_follower_count(username): - try: - user = twitter_api.get_user(screen_name=username) - return user.followers_count - except tweepy.TweepError as e: - print(f'Error fetching follower count: {e}') - return None - - -def send_data_to_segment(username, follower_count): - current_time = datetime.now(timezone.utc).isoformat() - - data = { - 'userId': username, - 'event': 'Follower Count Update', - 'properties': { - 'username': username, - 'follower_count': follower_count, - 'timestamp': current_time - }, - 'timestamp': current_time - } - - headers = { - 'Content-Type': 'application/json', - 'Authorization': f'Basic {SEGMENT_WRITE_KEY.encode("utf-8").decode("utf-8")}' - } - - try: - response = requests.post(SEGMENT_ENDPOINT, headers=headers, data=json.dumps(data)) - - if response.status_code == 200: - print(f'Successfully sent data to Segment for {username}') - else: - print(f'Failed to send data 
to Segment. Status code: {response.status_code}, Response: {response.text}') - except requests.exceptions.RequestException as e: - print(f'Error sending data to Segment: {e}') - - -follower_count = get_follower_count(USERNAME) -if follower_count is not None: - send_data_to_segment(USERNAME, follower_count) -else: - print('Failed to retrieve follower count.')