Merge remote-tracking branch 'origin/dev' into feat/cognee-mcp

commit 29de5a3e37

9 changed files with 113 additions and 100 deletions

README.md (87 changes)

@@ -193,93 +193,14 @@ if __name__ == '__main__':

When you run this script, you will see step-by-step messages in the console that help you trace the execution flow and understand what the script is doing at each stage.

A version of this example is here: `examples/python/simple_example.py`

### Create your own memory store

### Understand our architecture

The cognee framework consists of tasks that can be grouped into pipelines.
Each task is an independent piece of business logic that can be tied to other tasks to form a pipeline.
These tasks persist data in your memory store, enabling you to search for relevant context from past conversations, documents, or any other data you have stored.
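
For intuition, here is a minimal sketch (the task bodies are illustrative; `Task` and `run_tasks` are the same primitives used in the snippets below):

```
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.Task import Task

async def normalize(documents: list[str]) -> list[str]:
    # One self-contained piece of business logic.
    return [doc.strip().lower() for doc in documents]

async def deduplicate(documents: list[str]) -> list[str]:
    # Receives the previous task's output as its input.
    return list(dict.fromkeys(documents))

# Tie the tasks together; the pipeline executes as you iterate over it.
pipeline = run_tasks([Task(normalize), Task(deduplicate)], ["  Hello ", "hello"])
```
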
### Example: Classify your documents

Here is an example of how it looks for a default cognify pipeline:

1. To prepare the data for the pipeline run, we first need to add it to our metastore and normalize it. Start with:

```
import cognee

text = """Natural language processing (NLP) is an interdisciplinary
subfield of computer science and information retrieval"""

await cognee.add(text)  # Add a new piece of information
```

2. In the next step we create a task. A task can contain any business logic we need; the important part is that it is encapsulated in one function.

Here is an example of a naive LLM classifier that takes a Pydantic model and, after analyzing each chunk, stores the data in both the graph and vector stores.
We provide just a snippet for reference; feel free to check out the full implementation in our repo.

```
import asyncio
from typing import Type
from uuid import NAMESPACE_OID, uuid5

from pydantic import BaseModel

# DocumentChunk, extract_categories, and get_vector_engine come from
# cognee's internals; see the repo for the exact import paths.

async def chunk_naive_llm_classifier(
    data_chunks: list[DocumentChunk],
    classification_model: Type[BaseModel]
):
    # Extract classifications asynchronously
    chunk_classifications = await asyncio.gather(
        *(extract_categories(chunk.text, classification_model) for chunk in data_chunks)
    )

    # Collect classification data points using a set to avoid duplicates
    classification_data_points = {
        uuid5(NAMESPACE_OID, cls.label.type)
        for cls in chunk_classifications
    } | {
        uuid5(NAMESPACE_OID, subclass.value)
        for cls in chunk_classifications
        for subclass in cls.label.subclass
    }

    vector_engine = get_vector_engine()
    collection_name = "classification"

    # Define the payload schema
    class Keyword(BaseModel):
        uuid: str
        text: str
        chunk_id: str
        document_id: str

    # Ensure the collection exists and retrieve existing data points
    if not await vector_engine.has_collection(collection_name):
        await vector_engine.create_collection(collection_name, payload_schema=Keyword)
        existing_points_map = {}
    else:
        existing_points_map = {}
    return data_chunks

...
```
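
The snippet reads `cls.label.type` and `subclass.value`, so the `classification_model` passed in is a Pydantic schema roughly of this shape (a hypothetical minimal example, not the model shipped with cognee):

```
from enum import Enum
from pydantic import BaseModel

class ArticleSubclass(Enum):
    # Hypothetical labels, purely for illustration.
    POLITICS = "politics"
    SPORTS = "sports"

class Label(BaseModel):
    type: str  # e.g. "news"
    subclass: list[ArticleSubclass]

class ArticleClassification(BaseModel):
    label: Label
```
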

We have many tasks that can be used in your pipelines, and you can also create your own tasks to fit your business logic.

3. Once we have our tasks, it is time to group them into a pipeline.
This simplified snippet demonstrates how tasks can be added to a pipeline, and how they can pass information forward from one to another.

```
Task(
    chunk_naive_llm_classifier,
    classification_model=cognee_config.classification_model,
)

pipeline = run_tasks(tasks, documents)
```
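
`run_tasks` returns an async generator, so nothing executes until you iterate over the pipeline; the code-graph pipeline later in this diff consumes it the same way:

```
async for result in pipeline:
    print(result)
```
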

To see the working code, check the default pipeline in `cognee.api.v1.cognify` in our repo.

<div align="center">
  <img src="assets/cognee_diagram.png" alt="cognee concept diagram" width="50%" />
</div>

## Vector retrieval, Graphs and LLMs

assets/cognee_diagram.png (binary file, not shown; before: 77 KiB, after: 428 KiB)

cognee/api/v1/cognify/code_graph_pipeline.py

@@ -2,29 +2,37 @@ import asyncio
 import logging
 from pathlib import Path

 from cognee.base_config import get_base_config
 from cognee.modules.cognify.config import get_cognify_config
 from cognee.modules.pipelines import run_tasks
 from cognee.modules.pipelines.tasks.Task import Task
 from cognee.modules.users.methods import get_default_user
 from cognee.shared.data_models import KnowledgeGraph, MonitoringTool
 from cognee.tasks.documents import (classify_documents,
                                     extract_chunks_from_documents)
 from cognee.tasks.graph import extract_graph_from_data
 from cognee.tasks.ingestion import ingest_data_with_metadata
 from cognee.tasks.repo_processor import (enrich_dependency_graph,
                                          expand_dependency_graph,
                                          get_data_list_for_user,
                                          get_non_code_files,
                                          get_repo_file_dependencies)
 from cognee.tasks.storage import add_data_points

 from cognee.base_config import get_base_config
 from cognee.shared.data_models import MonitoringTool

 monitoring = get_base_config().monitoring_tool
 if monitoring == MonitoringTool.LANGFUSE:
     from langfuse.decorators import observe

-from cognee.tasks.summarization import summarize_code
+from cognee.tasks.summarization import summarize_code, summarize_text

 logger = logging.getLogger("code_graph_pipeline")
 update_status_lock = asyncio.Lock()

 @observe
-async def run_code_graph_pipeline(repo_path):
+async def run_code_graph_pipeline(repo_path, include_docs=True):
     import os
     import pathlib

     import cognee
     from cognee.infrastructure.databases.relational import create_db_and_tables

@@ -38,6 +46,9 @@ async def run_code_graph_pipeline(repo_path):
     await cognee.prune.prune_system(metadata=True)
     await create_db_and_tables()

+    cognee_config = get_cognify_config()
+    user = await get_default_user()
+
     tasks = [
         Task(get_repo_file_dependencies),
         Task(enrich_dependency_graph, task_config={"batch_size": 50}),

@@ -46,4 +57,24 @@ async def run_code_graph_pipeline(repo_path):
         Task(add_data_points, task_config={"batch_size": 50}),
     ]

-    return run_tasks(tasks, repo_path, "cognify_code_pipeline")
+    if include_docs:
+        non_code_tasks = [
+            Task(get_non_code_files, task_config={"batch_size": 50}),
+            Task(ingest_data_with_metadata, dataset_name="repo_docs", user=user),
+            Task(get_data_list_for_user, dataset_name="repo_docs", user=user),
+            Task(classify_documents),
+            Task(extract_chunks_from_documents),
+            Task(extract_graph_from_data, graph_model=KnowledgeGraph, task_config={"batch_size": 50}),
+            Task(
+                summarize_text,
+                summarization_model=cognee_config.summarization_model,
+                task_config={"batch_size": 50}
+            ),
+        ]
+
+    if include_docs:
+        async for result in run_tasks(non_code_tasks, repo_path):
+            yield result
+
+    async for result in run_tasks(tasks, repo_path, "cognify_code_pipeline"):
+        yield result
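
With this change, `run_code_graph_pipeline` becomes an async generator that yields results as they are produced, rather than a coroutine returning a pipeline object; that is why the updated example script at the bottom of this diff iterates over it without `await`.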

@@ -12,7 +12,8 @@ from cognee.infrastructure.llm.llm_interface import LLMInterface
 from cognee.infrastructure.llm.prompts import read_query_prompt
 from cognee.base_config import get_base_config

-if MonitoringTool.LANGFUSE:
+monitoring = get_base_config().monitoring_tool
+if monitoring == MonitoringTool.LANGFUSE:
     from langfuse.decorators import observe

 class OpenAIAdapter(LLMInterface):

@@ -43,7 +44,7 @@ class OpenAIAdapter(LLMInterface):
     base_config = get_base_config()


-    @observe()
+    @observe(as_type='generation')
     async def acreate_structured_output(self, text_input: str, system_prompt: str,
                                         response_model: Type[BaseModel]) -> BaseModel:
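
(In Langfuse, `as_type='generation'` records the traced call as an LLM generation rather than a generic span, which lets model and token-usage metadata be attached to the trace.)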

@@ -1,12 +1,17 @@
-from typing import Type
+import logging
+import os
+from typing import Type

+from instructor.exceptions import InstructorRetryException
 from pydantic import BaseModel
+from tenacity import RetryError

 from cognee.infrastructure.llm.get_llm_client import get_llm_client
 from cognee.infrastructure.llm.prompts import read_query_prompt
-from cognee.shared.data_models import SummarizedCode, SummarizedClass, SummarizedFunction
+from cognee.shared.data_models import SummarizedCode
 from cognee.tasks.summarization.mock_summary import get_mock_summarized_code

+logger = logging.getLogger("extract_summary")

 async def extract_summary(content: str, response_model: Type[BaseModel]):
     llm_client = get_llm_client()

@@ -14,7 +19,7 @@ async def extract_summary(content: str, response_model: Type[BaseModel]):
     system_prompt = read_query_prompt("summarize_content.txt")

     llm_output = await llm_client.acreate_structured_output(content, system_prompt, response_model)

     return llm_output

 async def extract_code_summary(content: str):

@@ -27,5 +32,10 @@ async def extract_code_summary(content: str):
         result = get_mock_summarized_code()
         return result
     else:
-        result = await extract_summary(content, response_model=SummarizedCode)
+        try:
+            result = await extract_summary(content, response_model=SummarizedCode)
+        except (RetryError, InstructorRetryException) as e:
+            logger.error("Failed to extract code summary, falling back to mock summary", exc_info=e)
+            result = get_mock_summarized_code()
+
         return result

cognee/tasks/repo_processor/__init__.py

@@ -4,4 +4,5 @@ logger = logging.getLogger("task:repo_processor")

 from .enrich_dependency_graph import enrich_dependency_graph
 from .expand_dependency_graph import expand_dependency_graph
+from .get_non_code_files import get_data_list_for_user, get_non_py_files
 from .get_repo_file_dependencies import get_repo_file_dependencies

cognee/tasks/repo_processor/get_non_code_files.py (new file, 48 lines)

@@ -0,0 +1,48 @@
+import os
+
+import aiofiles
+
+import cognee.modules.ingestion as ingestion
+from cognee.infrastructure.engine import DataPoint
+from cognee.modules.data.methods import get_datasets
+from cognee.modules.data.methods.get_dataset_data import get_dataset_data
+from cognee.modules.data.methods.get_datasets_by_name import \
+    get_datasets_by_name
+from cognee.modules.data.models import Data
+from cognee.modules.data.operations.write_metadata import write_metadata
+from cognee.modules.ingestion.data_types import BinaryData
+from cognee.modules.users.methods import get_default_user
+from cognee.shared.CodeGraphEntities import Repository
+
+
+async def get_non_py_files(repo_path):
+    """Get files that are not .py files and their contents"""
+    if not os.path.exists(repo_path):
+        return {}
+
+    IGNORED_PATTERNS = {
+        '.git', '__pycache__', '*.pyc', '*.pyo', '*.pyd',
+        'node_modules', '*.egg-info'
+    }
+
+    def should_process(path):
+        return not any(pattern in path for pattern in IGNORED_PATTERNS)
+
+    non_py_files_paths = [
+        os.path.join(root, file)
+        for root, _, files in os.walk(repo_path) for file in files
+        if not file.endswith(".py") and should_process(os.path.join(root, file))
+    ]
+    return non_py_files_paths
+
+
+async def get_data_list_for_user(_, dataset_name, user):
+    # Note: This method is meant to be used as a Task in a pipeline.
+    # By the nature of pipelines, the output of the previous Task will be passed as the first argument here,
+    # but it is not needed here, hence the "_" input.
+    datasets = await get_datasets_by_name(dataset_name, user.id)
+    data_documents: list[Data] = []
+    for dataset in datasets:
+        data_docs: list[Data] = await get_dataset_data(dataset_id=dataset.id)
+        data_documents.extend(data_docs)
+    return data_documents

@@ -1,15 +1,16 @@
 import argparse
 import asyncio

 from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline


-async def main(repo_path):
-    async for result in await run_code_graph_pipeline(repo_path):
+async def main(repo_path, include_docs):
+    async for result in run_code_graph_pipeline(repo_path, include_docs):
         print(result)

 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("--repo-path", type=str, required=True, help="Path to the repository")
+    parser.add_argument("--repo_path", type=str, required=True, help="Path to the repository")
+    parser.add_argument("--include_docs", type=bool, default=True, help="Whether or not to process non-code files")
     args = parser.parse_args()
-    asyncio.run(main(args.repo_path))
+    asyncio.run(main(args.repo_path, args.include_docs))
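
With the renamed flag, the script is invoked as `python <script>.py --repo_path /path/to/repo` (the script's filename is not shown in this diff). Note that argparse's `type=bool` does not parse strings: `bool("False")` is `True`, so any non-empty value passed to `--include_docs` evaluates to `True`, and only an explicitly empty value (`--include_docs=`) yields `False`.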