From 57e3e2ef9029398d243cc8f706d8b36df101f191 Mon Sep 17 00:00:00 2001 From: Vasilije <8619304+Vasilije1990@users.noreply.github.com> Date: Sat, 18 May 2024 11:51:10 +0200 Subject: [PATCH] fix graph logic --- cognee/api/v1/cognify/cognify.py | 165 +++++------------- cognee/config.py | 1 + .../cognify/graph/add_document_node.py | 1 + 3 files changed, 48 insertions(+), 119 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 3cf5804ce..d6ae1f0b0 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -16,6 +16,7 @@ from cognee.modules.cognify.graph.add_data_chunks import add_data_chunks from cognee.modules.cognify.graph.add_document_node import add_document_node from cognee.modules.cognify.graph.add_classification_nodes import add_classification_nodes from cognee.modules.cognify.graph.add_cognitive_layer_graphs import add_cognitive_layer_graphs +from cognee.modules.cognify.graph.add_label_nodes import add_label_nodes from cognee.modules.cognify.graph.add_summary_nodes import add_summary_nodes from cognee.modules.cognify.graph.add_node_connections import group_nodes_by_layer, \ graph_ready_output, connect_nodes_in_graph @@ -32,9 +33,9 @@ from cognee.modules.data.get_cognitive_layers import get_cognitive_layers from cognee.modules.data.get_layer_graphs import get_layer_graphs from cognee.modules.topology.topology import TopologyEngine from cognee.shared.GithubClassification import CodeContentPrediction -from cognee.shared.data_models import ChunkStrategy, DefaultGraphModel +from cognee.shared.data_models import ChunkStrategy, DefaultGraphModel, KnowledgeGraph from cognee.utils import send_telemetry - +from cognee.shared.SourceCodeGraph import SourceCodeGraph config = Config() config.load() @@ -111,7 +112,7 @@ async def cognify(datasets: Union[str, List[str]] = None): await asyncio.gather( - *[process_text(chunk["collection"], chunk["chunk_id"], chunk["text"], chunk["file_metadata"]) for chunk in + *[process_text(chunk["collection"], chunk["chunk_id"], chunk["text"], chunk["file_metadata"],chunk['document_id']) for chunk in added_chunks] ) @@ -122,9 +123,17 @@ async def cognify(datasets: Union[str, List[str]] = None): for (dataset_name, files) in dataset_files: for file_metadata in files: + graph_topology = infrastructure_config.get_config()["graph_model"] + if graph_topology == SourceCodeGraph: + parent_node_id = f"{file_metadata['name']}.{file_metadata['extension']}" + + elif graph_topology == KnowledgeGraph: + parent_node_id = f"DefaultGraphModel__{USER_ID}" + else: + parent_node_id = f"DefaultGraphModel__{USER_ID}" document_id = await add_document_node( graph_client, - parent_node_id=f"DefaultGraphModel__{USER_ID}", + parent_node_id=parent_node_id, document_metadata=file_metadata, ) files_batch.append((dataset_name, file_metadata, document_id)) @@ -141,132 +150,48 @@ async def cognify(datasets: Union[str, List[str]] = None): return graph_client.graph - # - # for (dataset_name, files) in dataset_files: - # for file_metadata in files: - # with open(file_metadata["file_path"], "rb") as file: - # try: - # file_type = guess_file_type(file) - # text = extract_text_from_file(file, file_type) - # if text is None: - # text = "empty file" - # subchunks = chunk_engine.chunk_data(chunk_strategy, text, config.chunk_size, config.chunk_overlap) - # - # if dataset_name not in data_chunks: - # data_chunks[dataset_name] = [] - # - # for subchunk in subchunks: - # data_chunks[dataset_name].append(dict(text = subchunk, chunk_id = str(uuid4()), file_metadata = file_metadata)) - # except FileTypeException: - # logger.warning("File (%s) has an unknown file type. We are skipping it.", file_metadata["id"]) - # - # - # - # - # added_chunks: list[tuple[str, str, dict]] = await add_data_chunks(data_chunks) - # - # await asyncio.gather( - # *[process_text(chunk["collection"], chunk["chunk_id"], chunk["text"], chunk["file_metadata"]) for chunk in added_chunks] - # ) - # - # return graph_client.graph -async def process_text(chunk_collection: str, chunk_id: str, input_text: str, file_metadata: dict): +async def process_text(chunk_collection: str, chunk_id: str, input_text: str, file_metadata: dict, document_id: str): print(f"Processing chunk ({chunk_id}) from document ({file_metadata['id']}).") graph_client = await get_graph_client(infrastructure_config.get_config()["graph_engine"]) - graph_topology = infrastructure_config.get_config()["graph_topology"] - if graph_topology == "default": - parent_node_id = f"{file_metadata['name']}.{file_metadata['extension']}" - elif graph_topology == DefaultGraphModel: - parent_node_id = f"DefaultGraphModel__{USER_ID}" + graph_topology = infrastructure_config.get_config()["graph_model"] + if graph_topology == SourceCodeGraph: + classified_categories = [{'data_type': 'text', 'category_name': 'Code and functions'}] + elif graph_topology == KnowledgeGraph: + classified_categories = await get_content_categories(input_text) + else: + classified_categories = [{'data_type': 'text', 'category_name': 'Unclassified text'}] - document_id = await add_document_node( - graph_client, - parent_node_id = f"{file_metadata['name']}.{file_metadata['extension']}", #make a param of defaultgraph model to make sure when user passes his stuff, it doesn't break pipeline - document_metadata = file_metadata, - ) - # print("got here2") # await add_label_nodes(graph_client, document_id, chunk_id, file_metadata["keywords"].split("|")) - # classified_categories = await get_content_categories(input_text) - # - # print("classified_categories", classified_categories) - # await add_classification_nodes( - # graph_client, - # parent_node_id = document_id, - # categories = classified_categories, - # ) + await add_classification_nodes( + graph_client, + parent_node_id = document_id, + categories = classified_categories, + ) + + print(f"Chunk ({chunk_id}) classified.") + content_summary = await get_content_summary(input_text) + await add_summary_nodes(graph_client, document_id, content_summary) + print(f"Chunk ({chunk_id}) summarized.") + cognitive_layers = await get_cognitive_layers(input_text, classified_categories) + cognitive_layers = cognitive_layers[:config.cognitive_layers_limit] + + try: + cognitive_layers = (await add_cognitive_layers(graph_client, document_id, cognitive_layers))[:2] + print("cognitive_layers", cognitive_layers) + layer_graphs = await get_layer_graphs(input_text, cognitive_layers) + await add_cognitive_layer_graphs(graph_client, chunk_collection, chunk_id, layer_graphs) + except: + pass + + - classified_categories= [{'data_type': 'text', 'category_name': 'Source code in various programming languages'}] -# -# async def process_text(document_id: str, chunk_id: str, chunk_collection: str, input_text: str): -# raw_document_id = document_id.split("__")[-1] -# -# print(f"Processing chunk ({chunk_id}) from document ({raw_document_id}).") -# -# graph_client = await get_graph_client(infrastructure_config.get_config()["graph_engine"]) -# -# classified_categories = await get_content_categories(input_text) -# await add_classification_nodes( -# graph_client, -# parent_node_id = document_id, -# categories = classified_categories, -# ) -# >>>>>>> origin/main -# -# print(f"Chunk ({chunk_id}) classified.") -# -# # print("document_id", document_id) -# # -# # content_summary = await get_content_summary(input_text) -# # await add_summary_nodes(graph_client, document_id, content_summary) -# -# print(f"Chunk ({chunk_id}) summarized.") -# # -# cognitive_layers = await get_cognitive_layers(input_text, classified_categories) -# cognitive_layers = (await add_cognitive_layers(graph_client, document_id, cognitive_layers))[:2] -# # -# layer_graphs = await get_layer_graphs(input_text, cognitive_layers) -# await add_cognitive_layer_graphs(graph_client, chunk_collection, chunk_id, layer_graphs) -# -# <<<<<<< HEAD -# print("got here 4444") - # - # if infrastructure_config.get_config()["connect_documents"] is True: - # db_engine = infrastructure_config.get_config()["database_engine"] - # relevant_documents_to_connect = db_engine.fetch_cognify_data(excluded_document_id = file_metadata["id"]) - # - # list_of_nodes = [] - # - # relevant_documents_to_connect.append({ - # "layer_id": document_id, - # }) - # - # for document in relevant_documents_to_connect: - # node_descriptions_to_match = await graph_client.extract_node_description(document["layer_id"]) - # list_of_nodes.extend(node_descriptions_to_match) - # - # nodes_by_layer = await group_nodes_by_layer(list_of_nodes) - # - # results = await resolve_cross_graph_references(nodes_by_layer) - # - # relationships = graph_ready_output(results) - # - # await connect_nodes_in_graph( - # graph_client, - # relationships, - # score_threshold = infrastructure_config.get_config()["intra_layer_score_treshold"] - # ) - # - # send_telemetry("cognee.cognify") - # - # print(f"Chunk ({chunk_id}) cognified.") -# ======= # if infrastructure_config.get_config()["connect_documents"] is True: # db_engine = infrastructure_config.get_config()["database_engine"] # relevant_documents_to_connect = db_engine.fetch_cognify_data(excluded_document_id = raw_document_id) @@ -296,7 +221,7 @@ async def process_text(chunk_collection: str, chunk_id: str, input_text: str, fi # send_telemetry("cognee.cognify") # # print(f"Chunk ({chunk_id}) cognified.") -# >>>>>>> origin/main + if __name__ == "__main__": @@ -319,6 +244,8 @@ if __name__ == "__main__": config.set_graph_model(SourceCodeGraph) + + config.set_classification_model(CodeContentPrediction) graph = await cognify() diff --git a/cognee/config.py b/cognee/config.py index 149a72eab..c9548198b 100644 --- a/cognee/config.py +++ b/cognee/config.py @@ -77,6 +77,7 @@ class Config: # Database parameters graph_database_provider: str = os.getenv("GRAPH_DB_PROVIDER", "NETWORKX") graph_topology:str = DefaultGraphModel + cognitive_layers_limit: int = 2 if ( os.getenv("ENV") == "prod" diff --git a/cognee/modules/cognify/graph/add_document_node.py b/cognee/modules/cognify/graph/add_document_node.py index b70f700e5..af4477f8a 100644 --- a/cognee/modules/cognify/graph/add_document_node.py +++ b/cognee/modules/cognify/graph/add_document_node.py @@ -17,6 +17,7 @@ async def add_document_node(graph_client: GraphDBInterface, parent_node_id, docu document["type"] = "Document" await graph_client.add_node(document_id, document) + print(f"Added document node: {document_id}") await graph_client.add_edge( parent_node_id,