Add PostgreSQL connection retry mechanism with comprehensive error handling

• Implement connection retry with backoff
• Add transient error detection
• Pool management with timeout guards

(cherry picked from commit e758204ab2)
This commit is contained in:
yangdx 2025-10-10 01:58:51 +08:00 committed by Raphaël MANSUY
parent 89f32c4c49
commit de2713ca93
2 changed files with 28 additions and 72 deletions

View file

@ -85,11 +85,24 @@ class PostgreSQLDB:
# Statement LRU cache size (keep as-is, allow None for optional configuration) # Statement LRU cache size (keep as-is, allow None for optional configuration)
self.statement_cache_size = config.get("statement_cache_size") self.statement_cache_size = config.get("statement_cache_size")
if self.user is None or self.password is None or self.database is None: # Connection retry configuration
raise ValueError("Missing database user, password, or database") self.connection_retry_attempts = max(
1, min(10, int(os.environ.get("POSTGRES_CONNECTION_RETRIES", 3)))
# Guard concurrent pool resets )
self._pool_reconnect_lock = asyncio.Lock() self.connection_retry_backoff = max(
0.1,
min(5.0, float(os.environ.get("POSTGRES_CONNECTION_RETRY_BACKOFF", 0.5))),
)
self.connection_retry_backoff_max = max(
self.connection_retry_backoff,
min(
60.0,
float(os.environ.get("POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", 5.0)),
),
)
self.pool_close_timeout = max(
1.0, min(30.0, float(os.environ.get("POSTGRES_POOL_CLOSE_TIMEOUT", 5.0)))
)
self._transient_exceptions = ( self._transient_exceptions = (
asyncio.TimeoutError, asyncio.TimeoutError,
@ -104,14 +117,12 @@ class PostgreSQLDB:
asyncpg.exceptions.ConnectionFailureError, asyncpg.exceptions.ConnectionFailureError,
) )
# Connection retry configuration # Guard concurrent pool resets
self.connection_retry_attempts = config["connection_retry_attempts"] self._pool_reconnect_lock = asyncio.Lock()
self.connection_retry_backoff = config["connection_retry_backoff"]
self.connection_retry_backoff_max = max( if self.user is None or self.password is None or self.database is None:
self.connection_retry_backoff, raise ValueError("Missing database user, password, or database")
config["connection_retry_backoff_max"],
)
self.pool_close_timeout = config["pool_close_timeout"]
logger.info( logger.info(
"PostgreSQL, Retry config: attempts=%s, backoff=%.1fs, backoff_max=%.1fs, pool_close_timeout=%.1fs", "PostgreSQL, Retry config: attempts=%s, backoff=%.1fs, backoff_max=%.1fs, pool_close_timeout=%.1fs",
self.connection_retry_attempts, self.connection_retry_attempts,
@ -204,7 +215,9 @@ class PostgreSQLDB:
# Only add statement_cache_size if it's configured # Only add statement_cache_size if it's configured
if self.statement_cache_size is not None: if self.statement_cache_size is not None:
connection_params["statement_cache_size"] = int(self.statement_cache_size) connection_params["statement_cache_size"] = int(
self.statement_cache_size
)
logger.info( logger.info(
f"PostgreSQL, statement LRU cache size set as: {self.statement_cache_size}" f"PostgreSQL, statement LRU cache size set as: {self.statement_cache_size}"
) )
@ -1553,49 +1566,6 @@ class ClientManager:
"POSTGRES_STATEMENT_CACHE_SIZE", "POSTGRES_STATEMENT_CACHE_SIZE",
config.get("postgres", "statement_cache_size", fallback=None), config.get("postgres", "statement_cache_size", fallback=None),
), ),
# Connection retry configuration
"connection_retry_attempts": min(
10,
int(
os.environ.get(
"POSTGRES_CONNECTION_RETRIES",
config.get("postgres", "connection_retries", fallback=3),
)
),
),
"connection_retry_backoff": min(
5.0,
float(
os.environ.get(
"POSTGRES_CONNECTION_RETRY_BACKOFF",
config.get(
"postgres", "connection_retry_backoff", fallback=0.5
),
)
),
),
"connection_retry_backoff_max": min(
60.0,
float(
os.environ.get(
"POSTGRES_CONNECTION_RETRY_BACKOFF_MAX",
config.get(
"postgres",
"connection_retry_backoff_max",
fallback=5.0,
),
)
),
),
"pool_close_timeout": min(
30.0,
float(
os.environ.get(
"POSTGRES_POOL_CLOSE_TIMEOUT",
config.get("postgres", "pool_close_timeout", fallback=5.0),
)
),
),
} }
@classmethod @classmethod

View file

@ -16,10 +16,9 @@ import os
import time import time
from dotenv import load_dotenv from dotenv import load_dotenv
from unittest.mock import patch from unittest.mock import patch
import asyncpg
from lightrag.kg.postgres_impl import PostgreSQLDB from lightrag.kg.postgres_impl import PostgreSQLDB
asyncpg = pytest.importorskip("asyncpg")
# Load environment variables # Load environment variables
load_dotenv(dotenv_path=".env", override=False) load_dotenv(dotenv_path=".env", override=False)
@ -38,19 +37,6 @@ class TestPostgresRetryIntegration:
"database": os.getenv("POSTGRES_DATABASE", "postgres"), "database": os.getenv("POSTGRES_DATABASE", "postgres"),
"workspace": os.getenv("POSTGRES_WORKSPACE", "test_retry"), "workspace": os.getenv("POSTGRES_WORKSPACE", "test_retry"),
"max_connections": int(os.getenv("POSTGRES_MAX_CONNECTIONS", "10")), "max_connections": int(os.getenv("POSTGRES_MAX_CONNECTIONS", "10")),
# Connection retry configuration
"connection_retry_attempts": min(
10, int(os.getenv("POSTGRES_CONNECTION_RETRIES", "3"))
),
"connection_retry_backoff": min(
5.0, float(os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF", "0.5"))
),
"connection_retry_backoff_max": min(
60.0, float(os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", "5.0"))
),
"pool_close_timeout": min(
30.0, float(os.getenv("POSTGRES_POOL_CLOSE_TIMEOUT", "5.0"))
),
} }
@pytest.fixture @pytest.fixture