From de2713ca937cabe3b69d4bb1a0acd6349b159a96 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 10 Oct 2025 01:58:51 +0800 Subject: [PATCH] Add PostgreSQL connection retry mechanism with comprehensive error handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Implement connection retry with backoff • Add transient error detection • Pool management with timeout guards (cherry picked from commit e758204ab2a15b6cc269baaebfc6055b141fc146) --- lightrag/kg/postgres_impl.py | 84 ++++++++---------------- tests/test_postgres_retry_integration.py | 16 +---- 2 files changed, 28 insertions(+), 72 deletions(-) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 93f0cad7..703730ab 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -85,11 +85,24 @@ class PostgreSQLDB: # Statement LRU cache size (keep as-is, allow None for optional configuration) self.statement_cache_size = config.get("statement_cache_size") - if self.user is None or self.password is None or self.database is None: - raise ValueError("Missing database user, password, or database") - - # Guard concurrent pool resets - self._pool_reconnect_lock = asyncio.Lock() + # Connection retry configuration + self.connection_retry_attempts = max( + 1, min(10, int(os.environ.get("POSTGRES_CONNECTION_RETRIES", 3))) + ) + self.connection_retry_backoff = max( + 0.1, + min(5.0, float(os.environ.get("POSTGRES_CONNECTION_RETRY_BACKOFF", 0.5))), + ) + self.connection_retry_backoff_max = max( + self.connection_retry_backoff, + min( + 60.0, + float(os.environ.get("POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", 5.0)), + ), + ) + self.pool_close_timeout = max( + 1.0, min(30.0, float(os.environ.get("POSTGRES_POOL_CLOSE_TIMEOUT", 5.0))) + ) self._transient_exceptions = ( asyncio.TimeoutError, @@ -104,14 +117,12 @@ class PostgreSQLDB: asyncpg.exceptions.ConnectionFailureError, ) - # Connection retry configuration - self.connection_retry_attempts = config["connection_retry_attempts"] - self.connection_retry_backoff = config["connection_retry_backoff"] - self.connection_retry_backoff_max = max( - self.connection_retry_backoff, - config["connection_retry_backoff_max"], - ) - self.pool_close_timeout = config["pool_close_timeout"] + # Guard concurrent pool resets + self._pool_reconnect_lock = asyncio.Lock() + + if self.user is None or self.password is None or self.database is None: + raise ValueError("Missing database user, password, or database") + logger.info( "PostgreSQL, Retry config: attempts=%s, backoff=%.1fs, backoff_max=%.1fs, pool_close_timeout=%.1fs", self.connection_retry_attempts, @@ -204,7 +215,9 @@ class PostgreSQLDB: # Only add statement_cache_size if it's configured if self.statement_cache_size is not None: - connection_params["statement_cache_size"] = int(self.statement_cache_size) + connection_params["statement_cache_size"] = int( + self.statement_cache_size + ) logger.info( f"PostgreSQL, statement LRU cache size set as: {self.statement_cache_size}" ) @@ -1553,49 +1566,6 @@ class ClientManager: "POSTGRES_STATEMENT_CACHE_SIZE", config.get("postgres", "statement_cache_size", fallback=None), ), - # Connection retry configuration - "connection_retry_attempts": min( - 10, - int( - os.environ.get( - "POSTGRES_CONNECTION_RETRIES", - config.get("postgres", "connection_retries", fallback=3), - ) - ), - ), - "connection_retry_backoff": min( - 5.0, - float( - os.environ.get( - "POSTGRES_CONNECTION_RETRY_BACKOFF", - config.get( - "postgres", "connection_retry_backoff", fallback=0.5 - ), - ) - ), - ), - "connection_retry_backoff_max": min( - 60.0, - float( - os.environ.get( - "POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", - config.get( - "postgres", - "connection_retry_backoff_max", - fallback=5.0, - ), - ) - ), - ), - "pool_close_timeout": min( - 30.0, - float( - os.environ.get( - "POSTGRES_POOL_CLOSE_TIMEOUT", - config.get("postgres", "pool_close_timeout", fallback=5.0), - ) - ), - ), } @classmethod diff --git a/tests/test_postgres_retry_integration.py b/tests/test_postgres_retry_integration.py index 515f3072..71e5c47d 100644 --- a/tests/test_postgres_retry_integration.py +++ b/tests/test_postgres_retry_integration.py @@ -16,10 +16,9 @@ import os import time from dotenv import load_dotenv from unittest.mock import patch +import asyncpg from lightrag.kg.postgres_impl import PostgreSQLDB -asyncpg = pytest.importorskip("asyncpg") - # Load environment variables load_dotenv(dotenv_path=".env", override=False) @@ -38,19 +37,6 @@ class TestPostgresRetryIntegration: "database": os.getenv("POSTGRES_DATABASE", "postgres"), "workspace": os.getenv("POSTGRES_WORKSPACE", "test_retry"), "max_connections": int(os.getenv("POSTGRES_MAX_CONNECTIONS", "10")), - # Connection retry configuration - "connection_retry_attempts": min( - 10, int(os.getenv("POSTGRES_CONNECTION_RETRIES", "3")) - ), - "connection_retry_backoff": min( - 5.0, float(os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF", "0.5")) - ), - "connection_retry_backoff_max": min( - 60.0, float(os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", "5.0")) - ), - "pool_close_timeout": min( - 30.0, float(os.getenv("POSTGRES_POOL_CLOSE_TIMEOUT", "5.0")) - ), } @pytest.fixture