working cognify on large dataset

This commit is contained in:
Vasilije 2024-05-12 21:58:09 +02:00
parent 7249c240d0
commit 3c261ce6a1
6 changed files with 101 additions and 48 deletions

View file

@ -11,7 +11,7 @@ from nltk.corpus import stopwords
from cognee.api.v1.prune import prune
from cognee.config import Config
from cognee.infrastructure.data.chunking.LangchainChunkingEngine import LangchainChunkEngine
from cognee.infrastructure.databases.vector.embeddings.DefaultEmbeddingEngine import OpenAIEmbeddingEngine
from cognee.infrastructure.databases.vector.embeddings.DefaultEmbeddingEngine import LiteLLMEmbeddingEngine
from cognee.modules.cognify.graph.add_data_chunks import add_data_chunks
from cognee.modules.cognify.graph.add_document_node import add_document_node
from cognee.modules.cognify.graph.add_classification_nodes import add_classification_nodes
@ -32,6 +32,7 @@ from cognee.modules.data.get_content_summary import get_content_summary
from cognee.modules.data.get_cognitive_layers import get_cognitive_layers
from cognee.modules.data.get_layer_graphs import get_layer_graphs
from cognee.modules.topology.topology import TopologyEngine
from cognee.shared.GithubClassification import CodeContentPrediction
from cognee.shared.data_models import ChunkStrategy
from cognee.utils import send_telemetry
@ -94,7 +95,7 @@ async def cognify(datasets: Union[str, List[str]] = None):
file_type = guess_file_type(file)
text = extract_text_from_file(file, file_type)
if text is None:
text = ""
text = "empty file"
subchunks = chunk_engine.chunk_data(chunk_strategy, text, config.chunk_size, config.chunk_overlap)
if dataset_name not in data_chunks:
@ -105,6 +106,9 @@ async def cognify(datasets: Union[str, List[str]] = None):
except FileTypeException:
logger.warning("File (%s) has an unknown file type. We are skipping it.", file_metadata["id"])
added_chunks: list[tuple[str, str, dict]] = await add_data_chunks(data_chunks)
await asyncio.gather(
@ -129,12 +133,12 @@ async def process_text(chunk_collection: str, chunk_id: str, input_text: str, fi
#
await add_label_nodes(graph_client, document_id, chunk_id, file_metadata["keywords"].split("|"))
# classified_categories = await get_content_categories(input_text)
# await add_classification_nodes(
# graph_client,
# parent_node_id = document_id,
# categories = classified_categories,
# )
classified_categories = await get_content_categories(input_text)
await add_classification_nodes(
graph_client,
parent_node_id = document_id,
categories = classified_categories,
)
# print(f"Chunk ({chunk_id}) classified.")
@ -145,11 +149,11 @@ async def process_text(chunk_collection: str, chunk_id: str, input_text: str, fi
print(f"Chunk ({chunk_id}) summarized.")
# cognitive_layers = await get_cognitive_layers(input_text, classified_categories)
# cognitive_layers = (await add_cognitive_layers(graph_client, document_id, cognitive_layers))[:2]
cognitive_layers = await get_cognitive_layers(input_text, classified_categories)
cognitive_layers = (await add_cognitive_layers(graph_client, document_id, cognitive_layers))[:2]
#
# layer_graphs = await get_layer_graphs(input_text, cognitive_layers)
# await add_cognitive_layer_graphs(graph_client, chunk_collection, chunk_id, layer_graphs)
layer_graphs = await get_layer_graphs(input_text, cognitive_layers)
await add_cognitive_layer_graphs(graph_client, chunk_collection, chunk_id, layer_graphs)
#
# if infrastructure_config.get_config()["connect_documents"] is True:
# db_engine = infrastructure_config.get_config()["database_engine"]
@ -196,7 +200,13 @@ if __name__ == "__main__":
#
# await add("data://" +data_directory_path, "example")
infrastructure_config.set_config( {"chunk_engine": LangchainChunkEngine() , "chunk_strategy": ChunkStrategy.CODE,'embedding_engine': OpenAIEmbeddingEngine()})
infrastructure_config.set_config( {"chunk_engine": LangchainChunkEngine() , "chunk_strategy": ChunkStrategy.CODE,'embedding_engine': LiteLLMEmbeddingEngine()})
from cognee.shared.SourceCodeGraph import SourceCodeGraph
from cognee.api.v1.config import config
config.set_graph_model(SourceCodeGraph)
config.set_classification_model(CodeContentPrediction)
graph = await cognify()
#

View file

@ -63,6 +63,8 @@ class Config:
openai_temperature: float = float(os.getenv("OPENAI_TEMPERATURE", 0.0))
openai_embedding_model = "text-embedding-3-large"
openai_embedding_dimensions = 3072
litellm_embedding_model = "text-embedding-3-large"
litellm_embedding_dimensions = 3072
graphistry_username = os.getenv("GRAPHISTRY_USERNAME")
graphistry_password = os.getenv("GRAPHISTRY_PASSWORD")

View file

@ -46,5 +46,7 @@ class LangchainChunkEngine():
)
code_chunks = python_splitter.create_documents([data_chunks])
return code_chunks
only_content = [chunk.page_content for chunk in code_chunks]
return only_content

View file

@ -11,6 +11,9 @@ from cognee.config import Config
from cognee.root_dir import get_absolute_path
from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine
from litellm import aembedding
import litellm
litellm.set_verbose = True
config = Config()
config.load()
@ -25,25 +28,35 @@ class DefaultEmbeddingEngine(EmbeddingEngine):
return config.embedding_dimensions
class OpenAIEmbeddingEngine(EmbeddingEngine):
class LiteLLMEmbeddingEngine(EmbeddingEngine):
async def embed_text(self, text: List[str]) -> List[float]:
response = await aembedding(config.openai_embedding_model, input=text)
print("text", text)
try:
text = str(text[0])
except:
text = str(text)
response = await aembedding(config.litellm_embedding_model, input=text)
# embedding = response.data[0].embedding
# embeddings_list = list(map(lambda embedding: embedding.tolist(), embedding_model.embed(text)))
print("response", type(response.data[0]['embedding']))
return response.data[0]['embedding']
def get_vector_size(self) -> int:
return config.openai_embedding_dimensions
return config.litellm_embedding_dimensions
if __name__ == "__main__":
async def gg():
openai_embedding_engine = OpenAIEmbeddingEngine()
openai_embedding_engine = LiteLLMEmbeddingEngine()
# print(openai_embedding_engine.embed_text(["Hello, how are you?"]))
# print(openai_embedding_engine.get_vector_size())
# default_embedding_engine = DefaultEmbeddingEngine()

View file

@ -1,3 +1,4 @@
import json
import logging
from typing import TypedDict
from cognee.infrastructure import infrastructure_config
@ -14,12 +15,12 @@ async def add_data_chunks(dataset_data_chunks: dict[str, list[TextChunk]]):
identified_chunks = []
for (dataset_name, chunks) in dataset_data_chunks.items():
try:
# if not await vector_client.collection_exists(dataset_name):
# logging.error(f"Creating collection {str(dataset_name)}")
await vector_client.create_collection(dataset_name)
except Exception:
pass
# try:
# # if not await vector_client.collection_exists(dataset_name):
# # logging.error(f"Creating collection {str(dataset_name)}")
# await vector_client.create_collection(dataset_name)
# except Exception:
# pass
dataset_chunks = [
dict(
@ -32,29 +33,29 @@ async def add_data_chunks(dataset_data_chunks: dict[str, list[TextChunk]]):
identified_chunks.extend(dataset_chunks)
# if not await vector_client.collection_exists(dataset_name):
try:
logging.error("Collection still not found. Creating collection again.")
await vector_client.create_collection(dataset_name)
except:
pass
async def create_collection_retry(dataset_name, dataset_chunks):
await vector_client.create_data_points(
dataset_name,
[
DataPoint(
id = chunk["chunk_id"],
payload = dict(text = chunk["text"]),
embed_field = "text"
) for chunk in dataset_chunks
],
)
try:
await create_collection_retry(dataset_name, dataset_chunks)
except Exception:
logging.error("Collection not found in create data points.")
await create_collection_retry(dataset_name, dataset_chunks)
# # if not await vector_client.collection_exists(dataset_name):
# try:
# logging.error("Collection still not found. Creating collection again.")
# await vector_client.create_collection(dataset_name)
# except:
# pass
#
# async def create_collection_retry(dataset_name, dataset_chunks):
# await vector_client.create_data_points(
# dataset_name,
# [
# DataPoint(
# id = chunk["chunk_id"],
# payload = dict(text = chunk["text"]),
# embed_field = "text"
# ) for chunk in dataset_chunks
# ],
# )
#
# try:
# await create_collection_retry(dataset_name, dataset_chunks)
# except Exception:
# logging.error("Collection not found in create data points.")
# await create_collection_retry(dataset_name, dataset_chunks)
return identified_chunks

View file

@ -0,0 +1,25 @@
from enum import Enum
from typing import List
from pydantic import BaseModel
class TextSubclass(str, Enum):
SOURCE_CODE = "Source code in various programming languages"
SHELL_SCRIPTS = "Shell commands and scripts"
MARKUP_LANGUAGES = "Markup languages (HTML, XML)"
STYLESHEETS = "Stylesheets (CSS) and configuration files (YAML, JSON, INI)"
OTHER = "Other that does not fit into any of the above categories"
class ContentType(BaseModel):
"""Base class for content type, storing type of content as string."""
type: str = "TEXT"
class TextContent(ContentType):
"""Textual content class for more specific text categories."""
subclass: List[TextSubclass]
class CodeContentPrediction(BaseModel):
"""Model to predict the type of content."""
label: TextContent