Fix: Silence PostgreSQL logs during idempotent graph initialization

This commit is contained in:
yangdx 2025-06-23 23:08:56 +08:00
parent c18065a912
commit cc12460b38

View file

@ -289,25 +289,31 @@ class PostgreSQLDB:
sql: str, sql: str,
data: dict[str, Any] | None = None, data: dict[str, Any] | None = None,
upsert: bool = False, upsert: bool = False,
ignore_if_exists: bool = False,
with_age: bool = False, with_age: bool = False,
graph_name: str | None = None, graph_name: str | None = None,
): ):
try: try:
async with self.pool.acquire() as connection: # type: ignore async with self.pool.acquire() as connection: # type: ignore
if with_age and graph_name: if with_age and graph_name:
await self.configure_age(connection, graph_name) # type: ignore await self.configure_age(connection, graph_name)
elif with_age and not graph_name: elif with_age and not graph_name:
raise ValueError("Graph name is required when with_age is True") raise ValueError("Graph name is required when with_age is True")
if data is None: if data is None:
await connection.execute(sql) # type: ignore await connection.execute(sql)
else: else:
await connection.execute(sql, *data.values()) # type: ignore await connection.execute(sql, *data.values())
except ( except (
asyncpg.exceptions.UniqueViolationError, asyncpg.exceptions.UniqueViolationError,
asyncpg.exceptions.DuplicateTableError, asyncpg.exceptions.DuplicateTableError,
asyncpg.exceptions.DuplicateObjectError, # Catch "already exists" error
asyncpg.exceptions.InvalidSchemaNameError, # Also catch for AGE extension "already exists"
) as e: ) as e:
if upsert: if ignore_if_exists:
# If the flag is set, just ignore these specific errors
pass
elif upsert:
print("Key value duplicate, but upsert succeeded.") print("Key value duplicate, but upsert succeeded.")
else: else:
logger.error(f"Upsert error: {e}") logger.error(f"Upsert error: {e}")
@ -1212,16 +1218,15 @@ class PGGraphStorage(BaseGraphStorage):
] ]
for query in queries: for query in queries:
try: # Use the new flag to silently ignore "already exists" errors
await self.db.execute( # at the source, preventing log spam.
query, await self.db.execute(
upsert=True, query,
with_age=True, upsert=True,
graph_name=self.graph_name, ignore_if_exists=True, # Pass the new flag
) with_age=True,
# logger.info(f"Successfully executed: {query}") graph_name=self.graph_name,
except Exception: )
continue
async def finalize(self): async def finalize(self):
if self.db is not None: if self.db is not None: