Merge pull request #467 from topoteretes/fix-insert-data

fix: Add support for SQLite and PostgreSQL for inserting data in SQLA…
This commit is contained in:
Igor Ilic 2025-01-24 19:05:39 +01:00 committed by GitHub
commit 7d23b32b5f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -77,14 +77,51 @@ class SQLAlchemyAdapter:
text(f"DROP TABLE IF EXISTS {schema_name}.{table_name} CASCADE;")
)
async def insert_data(self, schema_name: str, table_name: str, data: list[dict]):
columns = ", ".join(data[0].keys())
values = ", ".join([f"({', '.join([f':{key}' for key in row.keys()])})" for row in data])
insert_query = text(f"INSERT INTO {schema_name}.{table_name} ({columns}) VALUES {values};")
async def insert_data(
self,
table_name: str,
data: list[dict],
schema_name: Optional[str] = "public",
) -> int:
"""
Insert data into specified table using SQLAlchemy Core with batch optimization
Returns number of inserted rows
async with self.engine.begin() as connection:
await connection.execute(insert_query, data)
await connection.close()
Usage Example:
from cognee.infrastructure.databases.relational.get_relational_engine import get_relational_engine
from uuid import UUID
db = get_relational_engine()
table_name = "groups"
data = {
"id": UUID("c70a3cec-3309-44df-8ee6-eced820cf438"),
"name": "test"
}
await db.insert_data(table_name, data)
"""
if not data:
logger.info("No data provided for insertion")
return 0
try:
# Use SQLAlchemy Core insert with execution options
async with self.engine.begin() as conn:
# Dialect-agnostic table reference
if self.engine.dialect.name == "sqlite":
# Foreign key constraints are disabled by default in SQLite (for backwards compatibility),
# so must be enabled for each database connection/session separately.
await conn.execute(text("PRAGMA foreign_keys=ON"))
table = await self.get_table(table_name) # SQLite ignores schemas
else:
table = await self.get_table(table_name, schema_name)
result = await conn.execute(table.insert().values(data))
# Return rowcount for validation
return result.rowcount
except Exception as e:
logger.error(f"Insert failed: {str(e)}")
raise e # Re-raise for error handling upstream
async def get_schema_list(self) -> List[str]:
"""