Add metadata_id attribute to Document and DocumentChunk, make ingest_with_metadata default

This commit is contained in:
Leon Luithlen 2024-11-26 16:30:25 +01:00
parent fd987ed61e
commit 7324564655
4 changed files with 8 additions and 3 deletions

View file

@@ -2,7 +2,7 @@ from typing import Union, BinaryIO
 from cognee.modules.users.models import User
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.pipelines import run_tasks, Task
-from cognee.tasks.ingestion import save_data_to_storage, ingest_data
+from cognee.tasks.ingestion import ingest_data_with_metadata
 from cognee.infrastructure.databases.relational import create_db_and_tables as create_relational_db_and_tables
 from cognee.infrastructure.databases.vector.pgvector import create_db_and_tables as create_pgvector_db_and_tables
@@ -14,8 +14,7 @@ async def add(data: Union[BinaryIO, list[BinaryIO], str, list[str]], dataset_nam
         user = await get_default_user()
     tasks = [
-        Task(save_data_to_storage, dataset_name),
-        Task(ingest_data, dataset_name, user)
+        Task(ingest_data_with_metadata, dataset_name, user)
     ]
     pipeline = run_tasks(tasks, data, "add_pipeline")

View file

@@ -35,6 +35,7 @@ class TextChunker():
                     is_part_of = self.document,
                     chunk_index = self.chunk_index,
                     cut_type = chunk_data["cut_type"],
+                    metadata_id = self.document.metadata_id
                 )
                 paragraph_chunks = []
                 self.chunk_size = 0
@@ -48,6 +49,7 @@ class TextChunker():
                         is_part_of = self.document,
                         chunk_index = self.chunk_index,
                         cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
+                        metadata_id = self.document.metadata_id
                     )
                 except Exception as e:
                     print(e)
@@ -65,6 +67,7 @@ class TextChunker():
                         is_part_of = self.document,
                         chunk_index = self.chunk_index,
                         cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
+                        metadata_id = self.document.metadata_id
                     )
                 except Exception as e:
                     print(e)

View file

@@ -1,9 +1,11 @@
 from cognee.infrastructure.engine import DataPoint
+from uuid import UUID

 class Document(DataPoint):
     type: str
     name: str
     raw_data_location: str
+    metadata_id: UUID

     def read(self, chunk_size: int) -> str:
         pass

View file

@@ -45,6 +45,7 @@ def classify_documents(data_documents: list[Data]) -> list[Document]:
             title=f"{data_item.name}.{data_item.extension}",
             raw_data_location=data_item.raw_data_location,
             name=data_item.name,
+            metadata_id=data_item.metadata_id
         )
         for data_item in data_documents
     ]