diff --git a/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py b/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py index 68561979d..f09435b4f 100644 --- a/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py +++ b/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py @@ -77,14 +77,37 @@ class SQLAlchemyAdapter: text(f"DROP TABLE IF EXISTS {schema_name}.{table_name} CASCADE;") ) - async def insert_data(self, schema_name: str, table_name: str, data: list[dict]): - columns = ", ".join(data[0].keys()) - values = ", ".join([f"({', '.join([f':{key}' for key in row.keys()])})" for row in data]) - insert_query = text(f"INSERT INTO {schema_name}.{table_name} ({columns}) VALUES {values};") + async def insert_data( + self, + table_name: str, + data: list[dict], + schema_name: Optional[str] = "public", + ) -> int: + """ + Insert data into specified table using SQLAlchemy Core with batch optimization + Returns number of inserted rows + """ + if not data: + logger.info("No data provided for insertion") + return 0 - async with self.engine.begin() as connection: - await connection.execute(insert_query, data) - await connection.close() + try: + # Dialect-agnostic table reference + if self.engine.dialect.name == "sqlite": + table = await self.get_table(table_name) # SQLite ignores schemas + else: + table = await self.get_table(table_name, schema_name) + + # Use SQLAlchemy Core insert with execution options + async with self.engine.begin() as conn: + result = await conn.execute(table.insert().values(data)) + + # Return rowcount for validation + return result.rowcount + + except Exception as e: + logger.error(f"Insert failed: {str(e)}") + raise e # Re-raise for error handling upstream async def get_schema_list(self) -> List[str]: """