feat: Add Dlt support for Sqlite
Added support for using SQLite with dlt (Feature COG-678).
This commit is contained in:
parent
9bd3011264
commit
56367cb0c3
4 changed files with 50 additions and 21 deletions

@@ -170,7 +170,7 @@ class SQLAlchemyAdapter():
         results = await connection.execute(query)
         return {result["data_id"]: result["status"] for result in results}

-    async def get_all_data_from_table(self, table_name: str, schema: str = None):
+    async def get_all_data_from_table(self, table_name: str, schema: str = "public"):
         async with self.get_async_session() as session:
             # Validate inputs to prevent SQL injection
             if not table_name.isidentifier():

@@ -178,7 +178,10 @@ class SQLAlchemyAdapter():
             if schema and not schema.isidentifier():
                 raise ValueError("Invalid schema name")

-            table = await self.get_table(table_name, schema)
+            if self.engine.dialect.name == "sqlite":
+                table = await self.get_table(table_name)
+            else:
+                table = await self.get_table(table_name, schema)

             # Query all data from the table
             query = select(table)
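
The dialect check above is the pattern this commit applies throughout: SQLite has no notion of schemas, so schema-qualified table lookups must be skipped for it. A minimal standalone sketch of the same idea, with an illustrative in-memory table that is not part of this commit:

    from sqlalchemy import Column, Integer, MetaData, Table, create_engine

    def reflect_table(engine, table_name: str, schema: str = "public") -> Table:
        # SQLite exposes a single flat namespace, so passing a schema would fail;
        # only schema-qualify the reflection when the dialect supports it.
        if engine.dialect.name == "sqlite":
            return Table(table_name, MetaData(), autoload_with=engine)
        return Table(table_name, MetaData(), autoload_with=engine, schema=schema)

    engine = create_engine("sqlite://")  # in-memory database, illustrative only
    Table("file_metadata", MetaData(), Column("id", Integer, primary_key=True)).create(engine)
    print(reflect_table(engine, "file_metadata").columns.keys())  # ['id']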

@@ -222,7 +225,6 @@ class SQLAlchemyAdapter():
             from cognee.infrastructure.files.storage import LocalStorage
-
             LocalStorage.remove(self.db_path)
             self.db_path = None
         else:
             async with self.engine.begin() as connection:
                 schema_list = await self.get_schema_list()
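
For SQLite, pruning the relational store reduces to deleting the single database file; only schema-capable backends need to enumerate and drop schemas, as the else branch above does. A sketch of the SQLite branch in isolation, with plain os.remove standing in for cognee's LocalStorage helper and a hypothetical path:

    import os

    def prune_sqlite(db_path: str | None) -> None:
        # One file holds the entire database, so removing it drops every table.
        if db_path and os.path.exists(db_path):
            os.remove(db_path)

    prune_sqlite("databases/cognee.db")  # hypothetical location; no-op if absent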

@@ -1,12 +1,20 @@
 from ...relational.ModelBase import Base
 from ..get_vector_engine import get_vector_engine, get_vectordb_config
 from sqlalchemy import text

+# Note: Variable is used like a caching mechanism to not recreate tables in case they were already created
+created = False
+
 async def create_db_and_tables():
     vector_config = get_vectordb_config()
     vector_engine = get_vector_engine()

     if vector_config.vector_db_provider == "pgvector":
-        await vector_engine.create_database()
+        global created
+        if not created:
+            await vector_engine.create_database()
+            created = True
+
         async with vector_engine.engine.begin() as connection:
             await connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
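
The new module-level `created` flag is plain per-process memoization: the first call pays for create_database(), later calls skip it. The same pattern in miniature, with generic names that are not cognee's:

    import asyncio

    _initialized = False

    async def expensive_create():
        await asyncio.sleep(0)  # stand-in for real database creation
        print("database created")

    async def ensure_database():
        global _initialized
        if not _initialized:  # only the first call does the work
            await expensive_create()
            _initialized = True

    asyncio.run(ensure_database())  # prints "database created"
    asyncio.run(ensure_database())  # silent; the flag short-circuits

Worth noting the flag is not guarded against concurrent first calls; for idempotent setup like CREATE EXTENSION IF NOT EXISTS that trade-off is harmless.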

@@ -1,9 +1,12 @@
 import os
+from functools import lru_cache

 import dlt
+from typing import Union

 from cognee.infrastructure.databases.relational import get_relational_config

-def get_dlt_destination():
+@lru_cache
+def get_dlt_destination() -> Union[type[dlt.destinations.sqlalchemy], None]:
     """
     Handles propagation of the cognee database configuration to the dlt library
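
The widened return annotation says the factory hands back dlt's sqlalchemy destination class, or None when the configured provider has no dlt counterpart. The function body is not shown in this hunk; below is a hedged sketch of plausible wiring, where the env-var name follows dlt's double-underscore config convention and the sqlite URL is purely illustrative — the real commit derives credentials from get_relational_config():

    import os
    from functools import lru_cache
    from typing import Union

    import dlt

    @lru_cache
    def get_dlt_destination() -> Union[type[dlt.destinations.sqlalchemy], None]:
        # Assumption: credentials reach dlt through its environment-based
        # configuration rather than being passed explicitly here.
        os.environ["DESTINATION__SQLALCHEMY__CREDENTIALS"] = "sqlite:///cognee.db"
        return dlt.destinations.sqlalchemy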

@@ -1,6 +1,7 @@
 import dlt
 import cognee.modules.ingestion as ingestion

+from uuid import UUID
 from cognee.shared.utils import send_telemetry
 from cognee.modules.users.models import User
 from cognee.infrastructure.databases.relational import get_relational_engine

@@ -43,7 +44,7 @@ async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
         dataset = await create_dataset(dataset_name, user.id, session)

         data = (await session.execute(
-            select(Data).filter(Data.id == file_metadata["id"])
+            select(Data).filter(Data.id == UUID(file_metadata["id"]))
         )).scalar_one_or_none()

         if data is not None:
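
The UUID wrapping matters because file_metadata comes out of the ingestion layer with the id serialized as a string, while Data.id is a UUID-typed column; filtering on the raw string can miss rows. The Python-level mismatch is easy to demonstrate:

    from uuid import UUID, uuid4

    row_id = uuid4()
    serialized = str(row_id)           # how the id travels through file_metadata
    assert serialized != row_id        # a bare string never equals a UUID object
    assert UUID(serialized) == row_id  # re-wrapping restores the typed value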

@@ -56,7 +57,7 @@ async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
             await session.commit()
         else:
             data = Data(
-                id = file_metadata["id"],
+                id = UUID(file_metadata["id"]),
                 name = file_metadata["name"],
                 raw_data_location = file_metadata["file_path"],
                 extension = file_metadata["extension"],

@@ -66,17 +67,32 @@ async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
             dataset.data.append(data)
             await session.commit()

-            await give_permission_on_document(user, file_metadata["id"], "read")
-            await give_permission_on_document(user, file_metadata["id"], "write")
+            await give_permission_on_document(user, UUID(file_metadata["id"]), "read")
+            await give_permission_on_document(user, UUID(file_metadata["id"]), "write")

     send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id)
-    run_info = pipeline.run(
-        data_resources(file_paths),
-        table_name = "file_metadata",
-        dataset_name = dataset_name,
-        write_disposition = "merge",
-    )
+
+    db_engine = get_relational_engine()
+
+    # Note: the DLT pipeline has its own event loop, therefore objects created
+    # in another event loop can't be used inside the pipeline.
+    if db_engine.engine.dialect.name == "sqlite":
+        # To use sqlite with dlt, dataset_name must be set to "main";
+        # sqlite doesn't support schemas.
+        run_info = pipeline.run(
+            data_resources(file_paths),
+            table_name = "file_metadata",
+            dataset_name = "main",
+            write_disposition = "merge",
+        )
+    else:
+        run_info = pipeline.run(
+            data_resources(file_paths),
+            table_name = "file_metadata",
+            dataset_name = dataset_name,
+            write_disposition = "merge",
+        )

     await data_storing("file_metadata", dataset_name, user)
     send_telemetry("cognee.add EXECUTION COMPLETED", user_id = user.id)
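
Hard-coding dataset_name to "main" for SQLite is not arbitrary: the primary database in a SQLite connection is always the schema named main, so that is the only place a schema-qualified table can land. The standard library shows this directly:

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE file_metadata (id TEXT PRIMARY KEY)")
    # "main" is SQLite's built-in schema name, so the qualified and
    # unqualified forms address the same table.
    con.execute("INSERT INTO main.file_metadata VALUES ('abc')")
    print(con.execute("SELECT id FROM file_metadata").fetchall())   # [('abc',)]
    print([row[1] for row in con.execute("PRAGMA database_list")])  # ['main']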