From 2e367198cd32acaad1691cc108d6424db97ed673 Mon Sep 17 00:00:00 2001
From: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
Date: Wed, 7 Aug 2024 18:21:14 +0200
Subject: [PATCH] Task updates and updates to SQLAlchemy Adapter

---
 cognee/api/v1/add/add.py                      |   1 +
 cognee/api/v1/cognify/cognify.py              | 308 ------------------
 .../sqlalchemy/SqlAlchemyAdapter.py           |  65 ++--
 .../data_summary/summarize_text_chunks.py     |  37 ---
 .../data/processing/filter_affected_chunks.py |  25 --
 .../data/processing/process_documents.py      |  44 ---
 .../data/processing/remove_obsolete_chunks.py |  29 --
 .../search/llm/get_relevant_summary.py        |   1 -
 .../chunk_extract_summary.py                  |   2 +-
 .../models/TextSummary.py                     |   0
 .../save_chunks_to_store}/__init__.py         |   0
 cognee/tests/test_library.py                  |   3 +-
 12 files changed, 42 insertions(+), 473 deletions(-)
 delete mode 100644 cognee/api/v1/cognify/cognify.py
 delete mode 100644 cognee/modules/data/extraction/data_summary/summarize_text_chunks.py
 delete mode 100644 cognee/modules/data/processing/filter_affected_chunks.py
 delete mode 100644 cognee/modules/data/processing/process_documents.py
 delete mode 100644 cognee/modules/data/processing/remove_obsolete_chunks.py
 rename cognee/{modules/data/extraction/data_summary => tasks/chunk_extract_summary}/models/TextSummary.py (100%)
 rename cognee/{modules/data/extraction/data_summary => tasks/save_chunks_to_store}/__init__.py (100%)

diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py
index 90e259508..a1077ee21 100644
--- a/cognee/api/v1/add/add.py
+++ b/cognee/api/v1/add/add.py
@@ -128,6 +128,7 @@ async def add_files(file_paths: List[str], dataset_name: str, user):
             data.mime_type = file_metadata["mime_type"]
 
             await session.merge(data)
+            await session.commit()
         else:
             data = Data(
                 id = data_id,
diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py
deleted file mode 100644
index a11a2cb8b..000000000
--- a/cognee/api/v1/cognify/cognify.py
+++ /dev/null
@@ -1,308 +0,0 @@
-import asyncio
-from uuid import uuid4
-from typing import List, Union
-import logging
-import nltk
-from asyncio import Lock
-from nltk.corpus import stopwords
-
-from cognee.infrastructure.data.chunking.get_chunking_engine import get_chunk_engine
-from cognee.infrastructure.databases.graph.config import get_graph_config
-from cognee.infrastructure.databases.vector.embeddings.LiteLLMEmbeddingEngine import LiteLLMEmbeddingEngine
-from cognee.modules.cognify.graph.add_node_connections import group_nodes_by_layer, \
-    graph_ready_output, connect_nodes_in_graph
-from cognee.modules.cognify.graph.add_data_chunks import add_data_chunks
-from cognee.modules.cognify.graph.add_document_node import add_document_node
-from cognee.modules.cognify.graph.add_classification_nodes import add_classification_nodes
-from cognee.modules.cognify.graph.add_cognitive_layer_graphs import add_cognitive_layer_graphs
-from cognee.modules.cognify.graph.add_summary_nodes import add_summary_nodes
-from cognee.modules.cognify.llm.resolve_cross_graph_references import resolve_cross_graph_references
-from cognee.infrastructure.databases.graph import get_graph_engine
-from cognee.modules.cognify.graph.add_cognitive_layers import add_cognitive_layers
-from cognee.infrastructure.files.utils.guess_file_type import guess_file_type, FileTypeException
-from cognee.infrastructure.files.utils.extract_text_from_file import extract_text_from_file
-from cognee.modules.data.get_content_categories import get_content_categories
-from cognee.modules.data.get_content_summary import get_content_summary
-from cognee.modules.data.get_cognitive_layers import get_cognitive_layers
-from cognee.modules.data.get_layer_graphs import get_layer_graphs
-from cognee.shared.data_models import KnowledgeGraph, ChunkStrategy, ChunkEngine
-from cognee.shared.utils import send_telemetry
-from cognee.modules.tasks import create_task_status_table, update_task_status
-from cognee.shared.SourceCodeGraph import SourceCodeGraph
-from cognee.modules.tasks import get_task_status
-from cognee.modules.data.operations.get_dataset_data import get_dataset_data
-from cognee.infrastructure.data.chunking.config import get_chunk_config
-from cognee.modules.cognify.config import get_cognify_config
-from cognee.infrastructure.databases.relational import get_relational_engine
-
-USER_ID = "default_user"
-
-logger = logging.getLogger("cognify")
-
-update_status_lock = Lock()
-
-async def cognify(datasets: Union[str, List[str]] = None):
-    """This function is responsible for the cognitive processing of the content."""
-    # Has to be loaded in advance, multithreading doesn't work without it.
-    nltk.download("stopwords", quiet=True)
-    stopwords.ensure_loaded()
-    await create_task_status_table()
-
-    graph_client = await get_graph_engine()
-
-    db_engine = get_relational_engine()
-
-    if datasets is None or len(datasets) == 0:
-        datasets = await db_engine.get_datasets()
-
-    awaitables = []
-
-    async def handle_cognify_task(dataset_name: str):
-        async with update_status_lock:
-            task_status = get_task_status([dataset_name])
-
-            if dataset_name in task_status and task_status[dataset_name] == "DATASET_PROCESSING_STARTED":
-                logger.info(f"Dataset {dataset_name} is being processed.")
-                return
-
-            update_task_status(dataset_name, "DATASET_PROCESSING_STARTED")
-
-        try:
-            await cognify(dataset_name)
-            update_task_status(dataset_name, "DATASET_PROCESSING_FINISHED")
-        except Exception as error:
-            update_task_status(dataset_name, "DATASET_PROCESSING_ERROR")
-            raise error
-
-
-    # datasets is a list of dataset names
-    if isinstance(datasets, list):
-        for dataset_name in datasets:
-            awaitables.append(handle_cognify_task(dataset_name))
-
-        graphs = await asyncio.gather(*awaitables)
-        return graphs[0]
-
-    added_datasets = await db_engine.get_datasets()
-
-    # datasets is a dataset name string
-    dataset_files = []
-    dataset_name = datasets.replace(".", "_").replace(" ", "_")
-
-    for added_dataset in added_datasets:
-        if dataset_name in added_dataset:
-            dataset_files.append((added_dataset, await get_dataset_data(dataset_name = added_dataset)))
-
-    chunk_config = get_chunk_config()
-    chunk_engine = get_chunk_engine()
-    chunk_strategy = chunk_config.chunk_strategy
-
-    async def process_batch(files_batch):
-        data_chunks = {}
-
-        for dataset_name, file_metadata, document_id in files_batch:
-            with open(file_metadata["file_path"], "rb") as file:
-                try:
-                    file_type = guess_file_type(file)
-                    text = extract_text_from_file(file, file_type)
-                    if text is None:
-                        text = "empty file"
-                    if text == "":
-                        text = "empty file"
-                    subchunks,_ = chunk_engine.chunk_data(chunk_strategy, text, chunk_config.chunk_size, chunk_config.chunk_overlap)
-
-                    if dataset_name not in data_chunks:
-                        data_chunks[dataset_name] = []
-
-                    for subchunk in subchunks:
-                        data_chunks[dataset_name].append(dict(
-                            document_id = document_id,
-                            chunk_id = str(uuid4()),
-                            text = subchunk,
-                            file_metadata = file_metadata,
-                        ))
-
-                except FileTypeException:
-                    logger.warning("File (%s) has an unknown file type. We are skipping it.", file_metadata["id"])
-        added_chunks = await add_data_chunks(data_chunks)
-        # await add_data_chunks_basic_rag(data_chunks)
-
-        await asyncio.gather(
-            *[process_text(
-                chunk["collection"],
-                chunk["chunk_id"],
-                chunk["text"],
-                chunk["file_metadata"],
-                chunk["document_id"]
-            ) for chunk in added_chunks]
-        )
-
-    batch_size = 20
-    file_count = 0
-    files_batch = []
-
-    graph_config = get_graph_config()
-
-    if graph_config.infer_graph_topology and graph_config.graph_topology_task:
-        from cognee.modules.topology.topology import TopologyEngine
-        topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
-        await topology_engine.add_graph_topology(dataset_files=dataset_files)
-    elif not graph_config.infer_graph_topology:
-        from cognee.modules.topology.topology import TopologyEngine
-        topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
-        await topology_engine.add_graph_topology(graph_config.topology_file_path)
-    elif not graph_config.graph_topology_task:
-        parent_node_id = f"DefaultGraphModel__{USER_ID}"
-
-
-    for (dataset_name, files) in dataset_files:
-        for file_metadata in files:
-            if parent_node_id:
-                document_id = await add_document_node(
-                    graph_client,
-                    parent_node_id = parent_node_id,
-                    document_metadata = file_metadata,
-                )
-            else:
-                document_id = await add_document_node(
-                    graph_client,
-                    parent_node_id = file_metadata["id"],
-                    document_metadata = file_metadata,
-                )
-
-            files_batch.append((dataset_name, file_metadata, document_id))
-            file_count += 1
-
-            if file_count >= batch_size:
-                await process_batch(files_batch)
-                files_batch = []
-                file_count = 0
-
-    # Process any remaining files in the last batch
-    if len(files_batch) > 0:
-        await process_batch(files_batch)
-
-    return graph_client.graph
-
-
-async def process_text(chunk_collection: str, chunk_id: str, input_text: str, file_metadata: dict, document_id: str):
-    print(f"Processing chunk ({chunk_id}) from document ({file_metadata['id']}).")
-
-    graph_config = get_graph_config()
-    graph_client = await get_graph_engine()
-    graph_topology = graph_config.graph_model
-
-    if graph_topology == SourceCodeGraph:
-        classified_categories = [{"data_type": "text", "category_name": "Code and functions"}]
-    elif graph_topology == KnowledgeGraph:
-        classified_categories = await get_content_categories(input_text)
-    else:
-        classified_categories = [{"data_type": "text", "category_name": "Unclassified text"}]
-
-    await add_classification_nodes(
-        graph_client,
-        parent_node_id = document_id,
-        categories = classified_categories,
-    )
-    print(f"Chunk ({chunk_id}) classified.")
-
-    content_summary = await get_content_summary(input_text)
-    await add_summary_nodes(graph_client, document_id, content_summary)
-    print(f"Chunk ({chunk_id}) summarized.")
-
-    cognify_config = get_cognify_config()
-
-    cognitive_layers = await get_cognitive_layers(input_text, classified_categories)
-    cognitive_layers = cognitive_layers[:cognify_config.cognitive_layers_limit]
-
-    try:
-        cognitive_layers = (await add_cognitive_layers(graph_client, document_id, cognitive_layers))[:2]
-        print("cognitive_layers", cognitive_layers)
-        layer_graphs = await get_layer_graphs(input_text, cognitive_layers)
-        await add_cognitive_layer_graphs(graph_client, chunk_collection, chunk_id, layer_graphs)
-    except:
-        pass
-
-
-    if cognify_config.connect_documents is True:
-        db_engine = get_relational_engine()
-        relevant_documents_to_connect = db_engine.fetch_cognify_data(excluded_document_id = document_id)
-
-        list_of_nodes = []
-
-        relevant_documents_to_connect.append({
-            "layer_id": document_id,
-        })
-
-        for document in relevant_documents_to_connect:
-            node_descriptions_to_match = await graph_client.extract_node_description(document["layer_id"])
-            list_of_nodes.extend(node_descriptions_to_match)
-
-        nodes_by_layer = await group_nodes_by_layer(list_of_nodes)
-
-        results = await resolve_cross_graph_references(nodes_by_layer)
-
-        relationships = graph_ready_output(results)
-
-        await connect_nodes_in_graph(
-            graph_client,
-            relationships,
-            score_threshold = cognify_config.intra_layer_score_treshold
-        )
-
-    send_telemetry("cognee.cognify")
-
-    print(f"Chunk ({chunk_id}) cognified.")
-
-
-
-if __name__ == "__main__":
-
-    async def test():
-        # await prune.prune_system()
-        # #
-        # from cognee.api.v1.add import add
-        # data_directory_path = os.path.abspath("../../.data")
-        # # print(data_directory_path)
-        # # config.data_root_directory(data_directory_path)
-        # # cognee_directory_path = os.path.abspath(".cognee_system")
-        # # config.system_root_directory(cognee_directory_path)
-        #
-        # await add("data://" +data_directory_path, "example")
-
-        text = """Conservative PP in the lead in Spain, according to estimate
-    An estimate has been published for Spain:
-
-    Opposition leader Alberto Núñez Feijóo’s conservative People’s party (PP): 32.4%
-
-    Spanish prime minister Pedro Sánchez’s Socialist party (PSOE): 30.2%
-
-    The far-right Vox party: 10.4%
-
-    In Spain, the right has sought to turn the European election into a referendum on Sánchez.
-
-    Ahead of the vote, public attention has focused on a saga embroiling the prime minister’s wife, Begoña Gómez, who is being investigated over allegations of corruption and influence-peddling, which Sanchez has dismissed as politically-motivated and totally baseless."""
-
-        from cognee.api.v1.add import add
-
-        await add([text], "example_dataset")
-
-        from cognee.api.v1.config.config import config
-        config.set_chunk_engine(ChunkEngine.LANGCHAIN_ENGINE )
-        config.set_chunk_strategy(ChunkStrategy.LANGCHAIN_CHARACTER)
-        config.embedding_engine = LiteLLMEmbeddingEngine()
-
-        graph = await cognify()
-        # vector_client = infrastructure_config.get_config("vector_engine")
-        #
-        # out = await vector_client.search(collection_name ="basic_rag", query_text="show_all_processes", limit=10)
-        #
-        # print("results", out)
-        #
-        # from cognee.shared.utils import render_graph
-        #
-        # await render_graph(graph, include_color=True, include_nodes=False, include_size=False)
-
-    import asyncio
-    asyncio.run(test())
diff --git a/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py b/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py
index 799c79d82..0bec5cd90 100644
--- a/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py
+++ b/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py
@@ -13,7 +13,11 @@ def make_async_sessionmaker(sessionmaker):
     @asynccontextmanager
     async def async_session_maker():
         await asyncio.sleep(0.1)
-        yield FakeAsyncSession(sessionmaker())
+        session = FakeAsyncSession(sessionmaker())
+        try:
+            yield session
+        finally:
+            await session.close() # Ensure the session is closed
 
     return async_session_maker
 
@@ -26,28 +30,34 @@ class SQLAlchemyAdapter():
             LocalStorage.ensure_directory_exists(db_path)
             self.engine = create_engine(f"duckdb:///{self.db_location}")
-            self.sessionmaker = make_async_sessionmaker(sessionmaker(bind = self.engine))
+            self.sessionmaker = make_async_sessionmaker(sessionmaker(bind=self.engine))
         else:
             self.engine = create_async_engine(f"postgresql+asyncpg://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}")
-            self.sessionmaker = async_sessionmaker(bind = self.engine, expire_on_commit = False)
-
+            self.sessionmaker = async_sessionmaker(bind=self.engine, expire_on_commit=False)
 
     @asynccontextmanager
     async def get_async_session(self) -> AsyncGenerator[AsyncSession, None]:
         async_session_maker = self.sessionmaker
 
         async with async_session_maker() as session:
-            yield session
+            try:
+                yield session
+            finally:
+                await session.close() # Ensure the session is closed
 
     def get_session(self):
         session_maker = self.sessionmaker
 
         with session_maker() as session:
-            yield session
+            try:
+                yield session
+            finally:
+                session.close() # Ensure the session is closed
 
     async def get_datasets(self):
         from cognee.modules.data.models import Dataset
         async with self.get_async_session() as session:
-            datasets = (await session.execute(select(Dataset).options(joinedload(Dataset.data)))).unique().scalars().all()
+            result = await session.execute(select(Dataset).options(joinedload(Dataset.data)))
+            datasets = result.unique().scalars().all()
             return datasets
 
     async def create_table(self, schema_name: str, table_name: str, table_config: list[dict]):
@@ -55,20 +65,21 @@ class SQLAlchemyAdapter():
         async with self.engine.begin() as connection:
             await connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name};"))
             await connection.execute(text(f"CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ({', '.join(fields_query_parts)});"))
+            await connection.close()
 
     async def delete_table(self, table_name: str):
-        async with self.engine.connect() as connection:
+        async with self.engine.begin() as connection:
             await connection.execute(text(f"DROP TABLE IF EXISTS {table_name} CASCADE;"))
-
+            await connection.close()
     async def insert_data(self, schema_name: str, table_name: str, data: list[dict]):
         columns = ", ".join(data[0].keys())
         values = ", ".join([f"({', '.join([f':{key}' for key in row.keys()])})" for row in data])
         insert_query = text(f"INSERT INTO {schema_name}.{table_name} ({columns}) VALUES {values};")
-        async with self.engine.connect() as connection:
+        async with self.engine.begin() as connection:
             await connection.execute(insert_query, data)
-
+            await connection.close()
     async def get_data(self, table_name: str, filters: dict = None):
-        async with self.engine.connect() as connection:
+        async with self.engine.begin() as connection:
             query = f"SELECT * FROM {table_name}"
             if filters:
                 filter_conditions = " AND ".join([
@@ -85,26 +96,26 @@ class SQLAlchemyAdapter():
             return {result["data_id"]: result["status"] for result in results}
 
     async def execute_query(self, query):
-        async with self.engine.connect() as connection:
+        async with self.engine.begin() as connection:
             result = await connection.execute(text(query))
             return [dict(row) for row in result]
 
-    async def drop_tables(self, connection):
-        try:
-            await connection.execute(text("DROP TABLE IF EXISTS group_permission CASCADE"))
-            await connection.execute(text("DROP TABLE IF EXISTS permissions CASCADE"))
-            # Add more DROP TABLE statements for other tables as needed
-            print("Database tables dropped successfully.")
-        except Exception as e:
-            print(f"Error dropping database tables: {e}")
+    async def drop_tables(self):
+        async with self.engine.begin() as connection:
+            try:
+                await connection.execute(text("DROP TABLE IF EXISTS group_permission CASCADE"))
+                await connection.execute(text("DROP TABLE IF EXISTS permissions CASCADE"))
+                # Add more DROP TABLE statements for other tables as needed
+                print("Database tables dropped successfully.")
+            except Exception as e:
+                print(f"Error dropping database tables: {e}")
 
     async def delete_database(self):
-        async with self.engine.connect() as connection:
+        async with self.engine.begin() as connection:
             try:
-                async with connection.begin() as trans:
-                    for table in Base.metadata.sorted_tables:
-                        drop_table_query = text(f'DROP TABLE IF EXISTS {table.name} CASCADE')
-                        await connection.execute(drop_table_query)
+                for table in Base.metadata.sorted_tables:
+                    drop_table_query = text(f'DROP TABLE IF EXISTS {table.name} CASCADE')
+                    await connection.execute(drop_table_query)
                 print("Database deleted successfully.")
             except Exception as e:
-                print(f"Error deleting database: {e}")
\ No newline at end of file
+                print(f"Error deleting database: {e}")
diff --git a/cognee/modules/data/extraction/data_summary/summarize_text_chunks.py b/cognee/modules/data/extraction/data_summary/summarize_text_chunks.py
deleted file mode 100644
index 922ece6c9..000000000
--- a/cognee/modules/data/extraction/data_summary/summarize_text_chunks.py
+++ /dev/null
@@ -1,37 +0,0 @@
-
-import asyncio
-from typing import Type
-from pydantic import BaseModel
-from cognee.infrastructure.databases.vector import get_vector_engine, DataPoint
-from ...processing.chunk_types.DocumentChunk import DocumentChunk
-from ...extraction.extract_summary import extract_summary
-from .models.TextSummary import TextSummary
-
-async def summarize_text_chunks(data_chunks: list[DocumentChunk], summarization_model: Type[BaseModel], collection_name: str = "summaries"):
-    if len(data_chunks) == 0:
-        return data_chunks
-
-    chunk_summaries = await asyncio.gather(
-        *[extract_summary(chunk.text, summarization_model) for chunk in data_chunks]
-    )
-
-    vector_engine = get_vector_engine()
-
-    await vector_engine.create_collection(collection_name, payload_schema = TextSummary)
-
-    await vector_engine.create_data_points(
-        collection_name,
-        [
-            DataPoint[TextSummary](
-                id = str(chunk.chunk_id),
-                payload = dict(
-                    chunk_id = str(chunk.chunk_id),
-                    document_id = str(chunk.document_id),
-                    text = chunk_summaries[chunk_index].summary,
-                ),
-                embed_field = "text",
-            ) for (chunk_index, chunk) in enumerate(data_chunks)
-        ],
-    )
-
-    return data_chunks
diff --git a/cognee/modules/data/processing/filter_affected_chunks.py b/cognee/modules/data/processing/filter_affected_chunks.py
deleted file mode 100644
index 599d16ebf..000000000
--- a/cognee/modules/data/processing/filter_affected_chunks.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from cognee.infrastructure.databases.vector import get_vector_engine
-from .chunk_types import DocumentChunk
-
-async def filter_affected_chunks(data_chunks: list[DocumentChunk], collection_name: str) -> list[DocumentChunk]:
-    vector_engine = get_vector_engine()
-
-    if not await vector_engine.has_collection(collection_name):
-        # If collection doesn't exist, all data_chunks are new
-        return data_chunks
-
-    existing_chunks = await vector_engine.retrieve(
-        collection_name,
-        [str(chunk.chunk_id) for chunk in data_chunks],
-    )
-
-    existing_chunks_map = {chunk.id: chunk.payload for chunk in existing_chunks}
-
-    affected_data_chunks = []
-
-    for chunk in data_chunks:
-        if chunk.chunk_id not in existing_chunks_map or \
-            chunk.text != existing_chunks_map[chunk.chunk_id]["text"]:
-            affected_data_chunks.append(chunk)
-
-    return affected_data_chunks
diff --git a/cognee/modules/data/processing/process_documents.py b/cognee/modules/data/processing/process_documents.py
deleted file mode 100644
index 8df8067b6..000000000
--- a/cognee/modules/data/processing/process_documents.py
+++ /dev/null
@@ -1,44 +0,0 @@
-from cognee.infrastructure.databases.graph import get_graph_engine
-from .document_types import Document
-
-async def process_documents(documents: list[Document], parent_node_id: str = None, user:str=None, user_permissions:str=None):
-    graph_engine = await get_graph_engine()
-
-    nodes = []
-    edges = []
-
-    if parent_node_id and await graph_engine.extract_node(parent_node_id) is None:
-        nodes.append((parent_node_id, {}))
-
-    document_nodes = await graph_engine.extract_nodes([str(document.id) for document in documents])
-
-    for (document_index, document) in enumerate(documents):
-        document_node = document_nodes[document_index] if document_index in document_nodes else None
-
-        if document_node is None:
-            document_dict = document.to_dict()
-            document_dict["user"] = user
-            document_dict["user_permissions"] = user_permissions
-            nodes.append((str(document.id), document.to_dict()))
-
-            if parent_node_id:
-                edges.append((
-                    parent_node_id,
-                    str(document.id),
-                    "has_document",
-                    dict(
-                        relationship_name = "has_document",
-                        source_node_id = parent_node_id,
-                        target_node_id = str(document.id),
-                    ),
-                ))
-
-    if len(nodes) > 0:
-        await graph_engine.add_nodes(nodes)
-        await graph_engine.add_edges(edges)
-
-    for document in documents:
-        document_reader = document.get_reader()
-
-        for document_chunk in document_reader.read(max_chunk_size = 1024):
-            yield document_chunk
diff --git a/cognee/modules/data/processing/remove_obsolete_chunks.py b/cognee/modules/data/processing/remove_obsolete_chunks.py
deleted file mode 100644
index 1bc70a394..000000000
--- a/cognee/modules/data/processing/remove_obsolete_chunks.py
+++ /dev/null
@@ -1,29 +0,0 @@
-
-from cognee.infrastructure.databases.graph import get_graph_engine
-# from cognee.infrastructure.databases.vector import get_vector_engine
-from .chunk_types import DocumentChunk
-
-async def remove_obsolete_chunks(data_chunks: list[DocumentChunk]) -> list[DocumentChunk]:
-    graph_engine = await get_graph_engine()
-
-    document_ids = set((data_chunk.document_id for data_chunk in data_chunks))
-
-    obsolete_chunk_ids = []
-
-    for document_id in document_ids:
-        chunk_ids = await graph_engine.get_successor_ids(document_id, edge_label = "has_chunk")
-
-        for chunk_id in chunk_ids:
-            previous_chunks = await graph_engine.get_predecessor_ids(chunk_id, edge_label = "next_chunk")
-
-            if len(previous_chunks) == 0:
-                obsolete_chunk_ids.append(chunk_id)
-
-    if len(obsolete_chunk_ids) > 0:
-        await graph_engine.delete_nodes(obsolete_chunk_ids)
-
-    disconnected_nodes = await graph_engine.get_disconnected_nodes()
-    if len(disconnected_nodes) > 0:
-        await graph_engine.delete_nodes(disconnected_nodes)
-
-    return data_chunks
diff --git a/cognee/modules/search/llm/get_relevant_summary.py b/cognee/modules/search/llm/get_relevant_summary.py
index 4de0a971f..f5a3c8efb 100644
--- a/cognee/modules/search/llm/get_relevant_summary.py
+++ b/cognee/modules/search/llm/get_relevant_summary.py
@@ -4,7 +4,6 @@ from cognee.modules.cognify.config import get_cognify_config
 from .extraction.categorize_relevant_summary import categorize_relevant_summary
 
 logger = logging.getLogger(__name__)
-
 async def get_cognitive_layers(content: str, categories: List[Dict]):
     try:
         cognify_config = get_cognify_config()
diff --git a/cognee/tasks/chunk_extract_summary/chunk_extract_summary.py b/cognee/tasks/chunk_extract_summary/chunk_extract_summary.py
index ffb16f6d5..8ebd5d5c3 100644
--- a/cognee/tasks/chunk_extract_summary/chunk_extract_summary.py
+++ b/cognee/tasks/chunk_extract_summary/chunk_extract_summary.py
@@ -3,7 +3,7 @@ import asyncio
 from typing import Type
 from pydantic import BaseModel
 from cognee.infrastructure.databases.vector import get_vector_engine, DataPoint
-from cognee.modules.data.extraction.data_summary.models.TextSummary import TextSummary
+from cognee.tasks.chunk_extract_summary.models.TextSummary import TextSummary
 from cognee.modules.data.extraction.extract_summary import extract_summary
 from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
 
diff --git a/cognee/modules/data/extraction/data_summary/models/TextSummary.py b/cognee/tasks/chunk_extract_summary/models/TextSummary.py
similarity index 100%
rename from cognee/modules/data/extraction/data_summary/models/TextSummary.py
rename to cognee/tasks/chunk_extract_summary/models/TextSummary.py
diff --git a/cognee/modules/data/extraction/data_summary/__init__.py b/cognee/tasks/save_chunks_to_store/__init__.py
similarity index 100%
rename from cognee/modules/data/extraction/data_summary/__init__.py
rename to cognee/tasks/save_chunks_to_store/__init__.py
diff --git a/cognee/tests/test_library.py b/cognee/tests/test_library.py
index 1480020d5..b19d63d6b 100755
--- a/cognee/tests/test_library.py
+++ b/cognee/tests/test_library.py
@@ -6,6 +6,7 @@ import cognee
 logging.basicConfig(level = logging.DEBUG)
 
 async def main():
+
     data_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_library")).resolve())
     cognee.config.data_root_directory(data_directory_path)
     cognee_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_library")).resolve())
@@ -69,4 +70,4 @@ async def main():
 
 if __name__ == "__main__":
     import asyncio
-    asyncio.run(main())
+    asyncio.run(main(), debug=True)
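
A minimal usage sketch of the reworked adapter API follows (illustrative only; it assumes get_relational_engine() returns the configured SQLAlchemyAdapter instance and that the Dataset model is already registered, as elsewhere in the codebase):

import asyncio

from sqlalchemy import select

from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Dataset


async def demo():
    db_engine = get_relational_engine()

    # Sessions opened through the adapter are now closed by the context
    # manager itself, so callers only need the `async with` block.
    async with db_engine.get_async_session() as session:
        result = await session.execute(select(Dataset))
        print(f"found {len(result.scalars().all())} datasets")

    # drop_tables() no longer takes a connection argument; it opens its own
    # transactional connection via engine.begin().
    await db_engine.drop_tables()


asyncio.run(demo())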