Refactor of the tasks
This commit is contained in:
parent
156c7bec68
commit
1c9bbd7a43
2 changed files with 2 additions and 12 deletions
|
|
@ -89,18 +89,6 @@ async def cognify(datasets: Union[str, list[str]] = None, user: User = None):
|
||||||
cognee_config = get_cognify_config()
|
cognee_config = get_cognify_config()
|
||||||
graph_config = get_graph_config()
|
graph_config = get_graph_config()
|
||||||
root_node_id = None
|
root_node_id = None
|
||||||
#
|
|
||||||
# if graph_config.infer_graph_topology and graph_config.graph_topology_task:
|
|
||||||
# from cognee.modules.topology.topology import TopologyEngine
|
|
||||||
# topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
|
|
||||||
# root_node_id = await topology_engine.add_graph_topology(files = data)
|
|
||||||
# elif graph_config.infer_graph_topology and not graph_config.infer_graph_topology:
|
|
||||||
# from cognee.modules.topology.topology import TopologyEngine
|
|
||||||
# topology_engine = TopologyEngine(infer=graph_config.infer_graph_topology)
|
|
||||||
# await topology_engine.add_graph_topology(graph_config.topology_file_path)
|
|
||||||
# elif not graph_config.graph_topology_task:
|
|
||||||
# root_node_id = "ROOT"
|
|
||||||
|
|
||||||
tasks = [
|
tasks = [
|
||||||
Task(document_to_ontology, root_node_id = root_node_id),
|
Task(document_to_ontology, root_node_id = root_node_id),
|
||||||
Task(source_documents_to_chunks, parent_node_id = root_node_id), # Classify documents and save them as nodes in the graph db, extract text chunks based on the document type
|
Task(source_documents_to_chunks, parent_node_id = root_node_id), # Classify documents and save them as nodes in the graph db, extract text chunks based on the document type
|
||||||
|
|
|
||||||
|
|
@ -70,11 +70,13 @@ class SQLAlchemyAdapter():
|
||||||
async def delete_table(self, table_name: str):
    """Drop ``table_name`` (and dependent objects, via CASCADE) if it exists.

    Args:
        table_name: Name of the table to drop. NOTE(review): interpolated
            directly into the SQL string — must come from trusted code, never
            from user input (SQL injection risk).
    """
    # engine.begin() opens a connection, commits on clean exit, and releases
    # the connection — no explicit close() is needed. The original code called
    # `await connection.close()` inside the block, which can tear down the
    # connection before the context manager's commit runs.
    async with self.engine.begin() as connection:
        await connection.execute(text(f"DROP TABLE IF EXISTS {table_name} CASCADE;"))
async def insert_data(self, schema_name: str, table_name: str, data: list[dict]):
    """Bulk-insert rows into ``schema_name.table_name``.

    All dicts in ``data`` are assumed to share the keys of ``data[0]``
    (column names). An empty ``data`` list is a no-op.

    Args:
        schema_name: Target schema. NOTE(review): interpolated directly into
            the SQL string — must be trusted, never user-supplied.
        table_name: Target table. Same injection caveat as ``schema_name``.
        data: One dict per row, mapping column name -> value.
    """
    # Guard: the original indexed data[0] unconditionally and raised
    # IndexError on an empty list.
    if not data:
        return

    columns = ", ".join(data[0].keys())
    # One parameter tuple only. The original built a multi-row VALUES clause
    # repeating the SAME :key placeholders for every row, then passed the
    # list of dicts to execute() — which runs executemany, re-executing the
    # whole multi-row statement once per row with that row's values
    # (duplicate/wrong rows). executemany + a single-row VALUES is the
    # correct pairing.
    placeholders = ", ".join(f":{key}" for key in data[0].keys())
    insert_query = text(
        f"INSERT INTO {schema_name}.{table_name} ({columns}) VALUES ({placeholders});"
    )

    # engine.begin() commits on exit and releases the connection; the
    # original's explicit `await connection.close()` inside the block was
    # redundant at best and could pre-empt the commit.
    async with self.engine.begin() as connection:
        # Passing a list of dicts makes SQLAlchemy execute the statement
        # once per row (executemany).
        await connection.execute(insert_query, data)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue