fix: Add support for SQLite and PostgreSQL for inserting data in SQLAlchemyAdapter
This commit is contained in:
parent
90657a262c
commit
d4453e4a1d
1 changed files with 30 additions and 7 deletions
|
|
@ -77,14 +77,37 @@ class SQLAlchemyAdapter:
|
|||
text(f"DROP TABLE IF EXISTS {schema_name}.{table_name} CASCADE;")
|
||||
)
|
||||
|
||||
async def insert_data(self, schema_name: str, table_name: str, data: list[dict]):
|
||||
columns = ", ".join(data[0].keys())
|
||||
values = ", ".join([f"({', '.join([f':{key}' for key in row.keys()])})" for row in data])
|
||||
insert_query = text(f"INSERT INTO {schema_name}.{table_name} ({columns}) VALUES {values};")
|
||||
async def insert_data(
|
||||
self,
|
||||
table_name: str,
|
||||
data: list[dict],
|
||||
schema_name: Optional[str] = "public",
|
||||
) -> int:
|
||||
"""
|
||||
Insert data into specified table using SQLAlchemy Core with batch optimization
|
||||
Returns number of inserted rows
|
||||
"""
|
||||
if not data:
|
||||
logger.info("No data provided for insertion")
|
||||
return 0
|
||||
|
||||
async with self.engine.begin() as connection:
|
||||
await connection.execute(insert_query, data)
|
||||
await connection.close()
|
||||
try:
|
||||
# Dialect-agnostic table reference
|
||||
if self.engine.dialect.name == "sqlite":
|
||||
table = await self.get_table(table_name) # SQLite ignores schemas
|
||||
else:
|
||||
table = await self.get_table(table_name, schema_name)
|
||||
|
||||
# Use SQLAlchemy Core insert with execution options
|
||||
async with self.engine.begin() as conn:
|
||||
result = await conn.execute(table.insert().values(data))
|
||||
|
||||
# Return rowcount for validation
|
||||
return result.rowcount
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Insert failed: {str(e)}")
|
||||
raise e # Re-raise for error handling upstream
|
||||
|
||||
async def get_schema_list(self) -> List[str]:
|
||||
"""
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue