diff --git a/README.md b/README.md
index f0aa2a280..1dfdbd39b 100644
--- a/README.md
+++ b/README.md
@@ -193,93 +193,14 @@ if __name__ == '__main__':
When you run this script, you will see step-by-step messages in the console that help you trace the execution flow and understand what the script is doing at each stage.
A version of this example is here: `examples/python/simple_example.py`
-### Create your own memory store
+### Understand our architecture
The cognee framework consists of tasks that can be grouped into pipelines.
Each task encapsulates an independent piece of business logic and can be tied to other tasks to form a pipeline.
These tasks persist data in your memory store, enabling you to search for relevant context from past conversations, documents, or any other data you have stored.
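+
+To make this concrete, here is a minimal sketch of a task and a pipeline. `Task` and `run_tasks` come from `cognee.modules.pipelines`; the `normalize_text` task is a hypothetical example, not part of the library:
+
+```
+import asyncio
+
+from cognee.modules.pipelines import run_tasks
+from cognee.modules.pipelines.tasks.Task import Task
+
+async def normalize_text(documents: list[str]) -> list[str]:
+    # A hypothetical task: one function encapsulating one piece of business logic.
+    return [document.strip().lower() for document in documents]
+
+async def main():
+    # run_tasks chains tasks into a pipeline; each task receives the
+    # previous task's output, and results are yielded as they complete.
+    async for result in run_tasks([Task(normalize_text)], ["Natural Language Processing  "]):
+        print(result)
+
+asyncio.run(main())
+```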
-
-
-### Example: Classify your documents
-
-Here is an example of how it looks for a default cognify pipeline:
-
-1. To prepare the data for the pipeline run, first we need to add it to our metastore and normalize it:
-
-Start with:
-```
-text = """Natural language processing (NLP) is an interdisciplinary
- subfield of computer science and information retrieval"""
-
-await cognee.add(text) # Add a new piece of information
-```
-
-2. In the next step we make a task. The task can be any business logic we need, but the important part is that it should be encapsulated in one function.
-
-Here we show an example of creating a naive LLM classifier that takes a Pydantic model and then stores the data in both the graph and vector stores after analyzing each chunk.
-We provided just a snippet for reference, but feel free to check out the implementation in our repo.
-
-```
-async def chunk_naive_llm_classifier(
- data_chunks: list[DocumentChunk],
- classification_model: Type[BaseModel]
-):
- # Extract classifications asynchronously
- chunk_classifications = await asyncio.gather(
- *(extract_categories(chunk.text, classification_model) for chunk in data_chunks)
- )
-
- # Collect classification data points using a set to avoid duplicates
- classification_data_points = {
- uuid5(NAMESPACE_OID, cls.label.type)
- for cls in chunk_classifications
- } | {
- uuid5(NAMESPACE_OID, subclass.value)
- for cls in chunk_classifications
- for subclass in cls.label.subclass
- }
-
- vector_engine = get_vector_engine()
- collection_name = "classification"
-
- # Define the payload schema
- class Keyword(BaseModel):
- uuid: str
- text: str
- chunk_id: str
- document_id: str
-
- # Ensure the collection exists and retrieve existing data points
- if not await vector_engine.has_collection(collection_name):
- await vector_engine.create_collection(collection_name, payload_schema=Keyword)
- existing_points_map = {}
- else:
- existing_points_map = {}
- return data_chunks
-
-...
-
-```
-
-We have many tasks that can be used in your pipelines, and you can also create your tasks to fit your business logic.
-
-
-3. Once we have our tasks, it is time to group them into a pipeline.
-This simplified snippet demonstrates how tasks can be added to a pipeline, and how they can pass the information forward from one to another.
-
-```
-
-
-Task(
- chunk_naive_llm_classifier,
- classification_model = cognee_config.classification_model,
-)
-
-pipeline = run_tasks(tasks, documents)
-
-```
-
-To see the working code, check cognee.api.v1.cognify default pipeline in our repo.

## Vector retrieval, Graphs and LLMs
diff --git a/assets/architecture.png b/assets/architecture.png
deleted file mode 100644
index 3911fdc21..000000000
Binary files a/assets/architecture.png and /dev/null differ
diff --git a/assets/cognee_diagram.png b/assets/cognee_diagram.png
new file mode 100644
index 000000000..4eb45e45e
Binary files /dev/null and b/assets/cognee_diagram.png differ
diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py
index c35f9719f..8e92d08e0 100644
--- a/cognee/api/v1/cognify/code_graph_pipeline.py
+++ b/cognee/api/v1/cognify/code_graph_pipeline.py
@@ -2,29 +2,37 @@ import asyncio
import logging
from pathlib import Path
+from cognee.base_config import get_base_config
+from cognee.modules.cognify.config import get_cognify_config
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.Task import Task
+from cognee.modules.users.methods import get_default_user
+from cognee.shared.data_models import KnowledgeGraph, MonitoringTool
+from cognee.tasks.documents import (classify_documents,
+ extract_chunks_from_documents)
+from cognee.tasks.graph import extract_graph_from_data
+from cognee.tasks.ingestion import ingest_data_with_metadata
from cognee.tasks.repo_processor import (enrich_dependency_graph,
expand_dependency_graph,
+ get_data_list_for_user,
+    get_non_py_files,
get_repo_file_dependencies)
from cognee.tasks.storage import add_data_points
-from cognee.base_config import get_base_config
-from cognee.shared.data_models import MonitoringTool
-
monitoring = get_base_config().monitoring_tool
if monitoring == MonitoringTool.LANGFUSE:
from langfuse.decorators import observe
-from cognee.tasks.summarization import summarize_code
+from cognee.tasks.summarization import summarize_code, summarize_text
logger = logging.getLogger("code_graph_pipeline")
update_status_lock = asyncio.Lock()
@observe
-async def run_code_graph_pipeline(repo_path):
+async def run_code_graph_pipeline(repo_path, include_docs=True):
import os
import pathlib
+
import cognee
from cognee.infrastructure.databases.relational import create_db_and_tables
@@ -38,6 +46,9 @@ async def run_code_graph_pipeline(repo_path):
await cognee.prune.prune_system(metadata=True)
await create_db_and_tables()
+ cognee_config = get_cognify_config()
+ user = await get_default_user()
+
tasks = [
Task(get_repo_file_dependencies),
Task(enrich_dependency_graph, task_config={"batch_size": 50}),
@@ -46,4 +57,24 @@ async def run_code_graph_pipeline(repo_path):
Task(add_data_points, task_config={"batch_size": 50}),
]
- return run_tasks(tasks, repo_path, "cognify_code_pipeline")
+    if include_docs:
+        non_code_tasks = [
+            Task(get_non_py_files, task_config={"batch_size": 50}),
+            Task(ingest_data_with_metadata, dataset_name="repo_docs", user=user),
+            Task(get_data_list_for_user, dataset_name="repo_docs", user=user),
+            Task(classify_documents),
+            Task(extract_chunks_from_documents),
+            Task(extract_graph_from_data, graph_model=KnowledgeGraph, task_config={"batch_size": 50}),
+            Task(
+                summarize_text,
+                summarization_model=cognee_config.summarization_model,
+                task_config={"batch_size": 50}
+            ),
+        ]
+
+        # Run the document tasks first so non-code files are ingested and
+        # summarized before the code graph results are yielded.
+        async for result in run_tasks(non_code_tasks, repo_path):
+            yield result
+
+    async for result in run_tasks(tasks, repo_path, "cognify_code_pipeline"):
+        yield result
\ No newline at end of file
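
Note: `run_code_graph_pipeline` is now an async generator rather than a coroutine, so callers iterate over it with `async for` instead of awaiting it. A minimal usage sketch, mirroring `examples/python/code_graph_example.py` below (the repository path is a placeholder):

```
import asyncio

from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline

async def main():
    # Results are yielded incrementally as each pipeline task completes.
    async for result in run_code_graph_pipeline("path/to/repo", include_docs=True):
        print(result)

asyncio.run(main())
```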
diff --git a/cognee/infrastructure/llm/openai/adapter.py b/cognee/infrastructure/llm/openai/adapter.py
index 3eb3528e2..bb5af15f2 100644
--- a/cognee/infrastructure/llm/openai/adapter.py
+++ b/cognee/infrastructure/llm/openai/adapter.py
@@ -12,7 +12,8 @@ from cognee.infrastructure.llm.llm_interface import LLMInterface
from cognee.infrastructure.llm.prompts import read_query_prompt
from cognee.base_config import get_base_config
-if MonitoringTool.LANGFUSE:
+monitoring = get_base_config().monitoring_tool
+if monitoring == MonitoringTool.LANGFUSE:
from langfuse.decorators import observe
class OpenAIAdapter(LLMInterface):
@@ -43,7 +44,7 @@ class OpenAIAdapter(LLMInterface):
base_config = get_base_config()
- @observe()
+ @observe(as_type='generation')
async def acreate_structured_output(self, text_input: str, system_prompt: str,
response_model: Type[BaseModel]) -> BaseModel:
diff --git a/cognee/modules/data/extraction/extract_summary.py b/cognee/modules/data/extraction/extract_summary.py
index 102d5bff4..0d3feef95 100644
--- a/cognee/modules/data/extraction/extract_summary.py
+++ b/cognee/modules/data/extraction/extract_summary.py
@@ -1,12 +1,17 @@
-from typing import Type
+import logging
import os
+from typing import Type
+
+from instructor.exceptions import InstructorRetryException
from pydantic import BaseModel
+from tenacity import RetryError
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.prompts import read_query_prompt
-from cognee.shared.data_models import SummarizedCode, SummarizedClass, SummarizedFunction
+from cognee.shared.data_models import SummarizedCode
from cognee.tasks.summarization.mock_summary import get_mock_summarized_code
+logger = logging.getLogger("extract_summary")
async def extract_summary(content: str, response_model: Type[BaseModel]):
llm_client = get_llm_client()
@@ -14,7 +19,7 @@ async def extract_summary(content: str, response_model: Type[BaseModel]):
system_prompt = read_query_prompt("summarize_content.txt")
llm_output = await llm_client.acreate_structured_output(content, system_prompt, response_model)
-
+
return llm_output
async def extract_code_summary(content: str):
@@ -27,5 +32,10 @@ async def extract_code_summary(content: str):
result = get_mock_summarized_code()
return result
else:
- result = await extract_summary(content, response_model=SummarizedCode)
+ try:
+ result = await extract_summary(content, response_model=SummarizedCode)
+ except (RetryError, InstructorRetryException) as e:
+ logger.error("Failed to extract code summary, falling back to mock summary", exc_info=e)
+ result = get_mock_summarized_code()
+
return result
diff --git a/cognee/tasks/repo_processor/__init__.py b/cognee/tasks/repo_processor/__init__.py
index 05e111b29..fa754028e 100644
--- a/cognee/tasks/repo_processor/__init__.py
+++ b/cognee/tasks/repo_processor/__init__.py
@@ -4,4 +4,5 @@ logger = logging.getLogger("task:repo_processor")
from .enrich_dependency_graph import enrich_dependency_graph
from .expand_dependency_graph import expand_dependency_graph
+from .get_non_code_files import get_data_list_for_user, get_non_py_files
from .get_repo_file_dependencies import get_repo_file_dependencies
diff --git a/cognee/tasks/repo_processor/get_non_code_files.py b/cognee/tasks/repo_processor/get_non_code_files.py
new file mode 100644
index 000000000..671b998d9
--- /dev/null
+++ b/cognee/tasks/repo_processor/get_non_code_files.py
@@ -0,0 +1,48 @@
+import os
+
+from cognee.modules.data.methods.get_dataset_data import get_dataset_data
+from cognee.modules.data.methods.get_datasets_by_name import get_datasets_by_name
+from cognee.modules.data.models import Data
+
+
+async def get_non_py_files(repo_path):
+    """Get paths to all non-.py files in the repository."""
+    if not os.path.exists(repo_path):
+        return []
+
+    IGNORED_DIRS = {".git", "__pycache__", "node_modules"}
+    IGNORED_EXTENSIONS = {".pyc", ".pyo", ".pyd"}
+
+    def should_process(path):
+        # Skip files inside ignored directories (including *.egg-info)
+        # and files with compiled-bytecode extensions.
+        parts = path.split(os.sep)
+        if any(part in IGNORED_DIRS or part.endswith(".egg-info") for part in parts):
+            return False
+        return os.path.splitext(path)[1] not in IGNORED_EXTENSIONS
+
+    return [
+        os.path.join(root, file)
+        for root, _, files in os.walk(repo_path) for file in files
+        if not file.endswith(".py") and should_process(os.path.join(root, file))
+    ]
+
+
+async def get_data_list_for_user(_, dataset_name, user):
+    # Note: This function is meant to be used as a Task in a pipeline.
+    # Pipelines pass the output of the previous Task as the first positional
+    # argument, which is not needed here, hence the "_" parameter.
+ datasets = await get_datasets_by_name(dataset_name, user.id)
+ data_documents: list[Data] = []
+ for dataset in datasets:
+ data_docs: list[Data] = await get_dataset_data(dataset_id=dataset.id)
+ data_documents.extend(data_docs)
+ return data_documents
\ No newline at end of file
diff --git a/examples/python/code_graph_example.py b/examples/python/code_graph_example.py
index 9189de46c..c0b91972b 100644
--- a/examples/python/code_graph_example.py
+++ b/examples/python/code_graph_example.py
@@ -1,15 +1,16 @@
import argparse
import asyncio
+
from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline
-async def main(repo_path):
- async for result in await run_code_graph_pipeline(repo_path):
+async def main(repo_path, include_docs):
+ async for result in run_code_graph_pipeline(repo_path, include_docs):
print(result)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
- parser.add_argument("--repo-path", type=str, required=True, help="Path to the repository")
+ parser.add_argument("--repo_path", type=str, required=True, help="Path to the repository")
+ parser.add_argument("--include_docs", type=bool, default=True, help="Whether or not to process non-code files")
args = parser.parse_args()
- asyncio.run(main(args.repo_path))
-
+ asyncio.run(main(args.repo_path, args.include_docs))
\ No newline at end of file