Added chunking
This commit is contained in:
parent
0f02edf30d
commit
1c4caa9ee8
5 changed files with 66 additions and 7 deletions
|
|
@ -1,4 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import uuid
|
||||||
|
from os import path
|
||||||
from typing import List, Union
|
from typing import List, Union
|
||||||
import logging
|
import logging
|
||||||
import instructor
|
import instructor
|
||||||
|
|
@ -25,6 +27,8 @@ from cognee.modules.data.get_content_categories import get_content_categories
|
||||||
from cognee.modules.data.get_content_summary import get_content_summary
|
from cognee.modules.data.get_content_summary import get_content_summary
|
||||||
from cognee.modules.data.get_cognitive_layers import get_cognitive_layers
|
from cognee.modules.data.get_cognitive_layers import get_cognitive_layers
|
||||||
from cognee.modules.data.get_layer_graphs import get_layer_graphs
|
from cognee.modules.data.get_layer_graphs import get_layer_graphs
|
||||||
|
from cognee.modules.ingestion.chunkers import chunk_data
|
||||||
|
from cognee.shared.data_models import ChunkStrategy
|
||||||
|
|
||||||
config = Config()
|
config = Config()
|
||||||
config.load()
|
config.load()
|
||||||
|
|
@ -73,24 +77,31 @@ async def cognify(datasets: Union[str, List[str]] = None):
|
||||||
|
|
||||||
data_chunks = {}
|
data_chunks = {}
|
||||||
|
|
||||||
|
chunk_strategy = infrastructure_config.get_config()["chunk_strategy"]
|
||||||
|
|
||||||
for (dataset_name, files) in dataset_files:
|
for (dataset_name, files) in dataset_files:
|
||||||
for file_metadata in files[:3]:
|
for file_metadata in files[:3]:
|
||||||
with open(file_metadata["file_path"], "rb") as file:
|
with open(file_metadata["file_path"], "rb") as file:
|
||||||
try:
|
try:
|
||||||
file_type = guess_file_type(file)
|
file_type = guess_file_type(file)
|
||||||
text = extract_text_from_file(file, file_type)
|
text = extract_text_from_file(file, file_type)
|
||||||
|
subchunks = chunk_data(chunk_strategy, text, config.chunk_size, config.chunk_overlap)
|
||||||
|
|
||||||
if dataset_name not in data_chunks:
|
if dataset_name not in data_chunks:
|
||||||
data_chunks[dataset_name] = []
|
data_chunks[dataset_name] = []
|
||||||
|
|
||||||
data_chunks[dataset_name].append(dict(text = text, file_metadata = file_metadata))
|
for subchunk in subchunks:
|
||||||
|
|
||||||
|
data_chunks[dataset_name].append(dict(text = subchunk, chunk_id=str(uuid.uuid4()), file_metadata = file_metadata))
|
||||||
except FileTypeException:
|
except FileTypeException:
|
||||||
logger.warning("File (%s) has an unknown file type. We are skipping it.", file_metadata["id"])
|
logger.warning("File (%s) has an unknown file type. We are skipping it.", file_metadata["id"])
|
||||||
|
print("Added chunks are: ", data_chunks)
|
||||||
added_chunks: list[tuple[str, str, dict]] = await add_data_chunks(data_chunks)
|
added_chunks: list[tuple[str, str, dict]] = await add_data_chunks(data_chunks)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
await asyncio.gather(
|
await asyncio.gather(
|
||||||
*[process_text(chunk["collection"], chunk["id"], chunk["text"], chunk["file_metadata"]) for chunk in added_chunks]
|
*[process_text(chunk["collection"], chunk["chunk_id"], chunk["text"], chunk["file_metadata"]) for chunk in added_chunks]
|
||||||
)
|
)
|
||||||
|
|
||||||
return graph_client.graph
|
return graph_client.graph
|
||||||
|
|
@ -161,3 +172,34 @@ async def process_text(chunk_collection: str, chunk_id: str, input_text: str, fi
|
||||||
)
|
)
|
||||||
|
|
||||||
print(f"Document ({document_id}) cognified.")
|
print(f"Document ({document_id}) cognified.")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
text = """Natural language processing (NLP) is an interdisciplinary
|
||||||
|
subfield of computer science and information retrieval"""
|
||||||
|
|
||||||
|
from cognee.api.v1.add.add import add
|
||||||
|
|
||||||
|
data_path = path.abspath(".data")
|
||||||
|
async def add_(text):
|
||||||
|
await add("data://" + "/Users/vasa/Projects/cognee/cognee/.data", "explanations")
|
||||||
|
|
||||||
|
|
||||||
|
asyncio.run(add_(text))
|
||||||
|
asyncio.run(cognify("explanations"))
|
||||||
|
|
||||||
|
import cognee
|
||||||
|
|
||||||
|
# datasets = cognee.datasets.list_datasets()
|
||||||
|
# print(datasets)
|
||||||
|
# # print(vv)
|
||||||
|
# for dataset in datasets:
|
||||||
|
# print(dataset)
|
||||||
|
# data_from_dataset = cognee.datasets.query_data(dataset)
|
||||||
|
# for file_info in data_from_dataset:
|
||||||
|
# print(file_info)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -67,4 +67,8 @@ class config():
|
||||||
infrastructure_config.set_config({
|
infrastructure_config.set_config({
|
||||||
"connect_documents": connect_documents
|
"connect_documents": connect_documents
|
||||||
})
|
})
|
||||||
|
@staticmethod
|
||||||
|
def set_chunk_strategy(chunk_strategy: object):
|
||||||
|
infrastructure_config.set_config({
|
||||||
|
"chunk_strategy": chunk_strategy
|
||||||
|
})
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ from dataclasses import dataclass, field
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from cognee.root_dir import get_absolute_path
|
from cognee.root_dir import get_absolute_path
|
||||||
|
from cognee.shared.data_models import ChunkStrategy
|
||||||
|
|
||||||
base_dir = Path(__file__).resolve().parent.parent
|
base_dir = Path(__file__).resolve().parent.parent
|
||||||
# Load the .env file from the base directory
|
# Load the .env file from the base directory
|
||||||
|
|
@ -116,6 +116,11 @@ class Config:
|
||||||
# Client ID
|
# Client ID
|
||||||
anon_clientid: Optional[str] = field(default_factory=lambda: uuid.uuid4().hex)
|
anon_clientid: Optional[str] = field(default_factory=lambda: uuid.uuid4().hex)
|
||||||
|
|
||||||
|
#Chunking parameters
|
||||||
|
chunk_size: int = 1500
|
||||||
|
chunk_overlap: int = 0
|
||||||
|
chunk_strategy: str = ChunkStrategy.PARAGRAPH
|
||||||
|
|
||||||
def load(self):
|
def load(self):
|
||||||
"""Loads the configuration from a file or environment variables."""
|
"""Loads the configuration from a file or environment variables."""
|
||||||
config = configparser.ConfigParser()
|
config = configparser.ConfigParser()
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ class InfrastructureConfig():
|
||||||
connect_documents = config.connect_documents
|
connect_documents = config.connect_documents
|
||||||
database_directory_path: str = None
|
database_directory_path: str = None
|
||||||
database_file_path: str = None
|
database_file_path: str = None
|
||||||
|
chunk_strategy = config.chunk_strategy
|
||||||
|
|
||||||
def get_config(self, config_entity: str = None) -> dict:
|
def get_config(self, config_entity: str = None) -> dict:
|
||||||
if (config_entity is None or config_entity == "database_engine") and self.database_engine is None:
|
if (config_entity is None or config_entity == "database_engine") and self.database_engine is None:
|
||||||
|
|
@ -69,6 +70,9 @@ class InfrastructureConfig():
|
||||||
if self.connect_documents is None:
|
if self.connect_documents is None:
|
||||||
self.connect_documents = config.connect_documents
|
self.connect_documents = config.connect_documents
|
||||||
|
|
||||||
|
if self.chunk_strategy is None:
|
||||||
|
self.chunk_strategy = config.chunk_strategy
|
||||||
|
|
||||||
if (config_entity is None or config_entity == "llm_engine") and self.llm_engine is None:
|
if (config_entity is None or config_entity == "llm_engine") and self.llm_engine is None:
|
||||||
self.llm_engine = OpenAIAdapter(config.openai_key, config.openai_model)
|
self.llm_engine = OpenAIAdapter(config.openai_key, config.openai_model)
|
||||||
|
|
||||||
|
|
@ -120,7 +124,8 @@ class InfrastructureConfig():
|
||||||
"embedding_engine": self.embedding_engine,
|
"embedding_engine": self.embedding_engine,
|
||||||
"connect_documents": self.connect_documents,
|
"connect_documents": self.connect_documents,
|
||||||
"database_directory_path": self.database_directory_path,
|
"database_directory_path": self.database_directory_path,
|
||||||
"database_path": self.database_file_path
|
"database_path": self.database_file_path,
|
||||||
|
"chunk_strategy": self.chunk_strategy
|
||||||
}
|
}
|
||||||
|
|
||||||
def set_config(self, new_config: dict):
|
def set_config(self, new_config: dict):
|
||||||
|
|
@ -169,4 +174,7 @@ class InfrastructureConfig():
|
||||||
if "connect_documents" in new_config:
|
if "connect_documents" in new_config:
|
||||||
self.connect_documents = new_config["connect_documents"]
|
self.connect_documents = new_config["connect_documents"]
|
||||||
|
|
||||||
|
if "chunk_strategy" in new_config:
|
||||||
|
self.chunk_strategy = new_config["chunk_strategy"]
|
||||||
|
|
||||||
infrastructure_config = InfrastructureConfig()
|
infrastructure_config = InfrastructureConfig()
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ async def add_data_chunks(dataset_data_chunks: dict[str, list[TextChunk]]):
|
||||||
dataset_name,
|
dataset_name,
|
||||||
[
|
[
|
||||||
DataPoint(
|
DataPoint(
|
||||||
id = chunk["id"],
|
id = chunk["chunk_id"],
|
||||||
payload = dict(text = chunk["text"]),
|
payload = dict(text = chunk["text"]),
|
||||||
embed_field = "text"
|
embed_field = "text"
|
||||||
) for chunk in dataset_chunks
|
) for chunk in dataset_chunks
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue