From 0e52b339b2329d2f09ee6e6dd59457dcba46dd72 Mon Sep 17 00:00:00 2001 From: Lars Varming Date: Fri, 14 Nov 2025 14:34:40 +0100 Subject: [PATCH] Fix: Add shutdown handler for proper connection cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added GraphitiService.shutdown() and QueueService.shutdown() methods, and registered shutdown handler in main MCP server lifecycle to ensure proper cleanup of Neo4j connections on server exit. Changes: - Added GraphitiService.shutdown() method to close Neo4j driver - Added QueueService.shutdown() method to log worker cancellation - Wrapped run_mcp_server() in try/finally block to call shutdown - Updated dependency to graphiti-core-varming>=0.23.2 (connection pool config) This prevents "Response write failure" errors during server shutdown by ensuring all database connections are properly closed before exit. Version: graphiti-mcp-varming 1.0.6 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- mcp_server/pyproject.toml | 8 +- mcp_server/src/graphiti_mcp_server.py | 105 +++++++++++++++-------- mcp_server/src/services/queue_service.py | 19 ++++ 3 files changed, 91 insertions(+), 41 deletions(-) diff --git a/mcp_server/pyproject.toml b/mcp_server/pyproject.toml index 4c87a1e4..88f8231f 100644 --- a/mcp_server/pyproject.toml +++ b/mcp_server/pyproject.toml @@ -10,7 +10,7 @@ allow-direct-references = true [project] name = "graphiti-mcp-varming" -version = "1.0.5" +version = "1.0.6" description = "Graphiti MCP Server - Enhanced fork with additional tools by Varming" readme = "README.md" requires-python = ">=3.10,<4" @@ -31,7 +31,7 @@ classifiers = [ dependencies = [ "mcp>=1.21.0", "openai>=1.91.0", - "graphiti-core-varming>=0.23.1", # Varming fork with database parameter fix (0.23.1+varming.1) + "graphiti-core-varming>=0.23.2", # Varming fork with connection pool config (0.23.2) "pydantic-settings>=2.0.0", "pyyaml>=6.0", ] @@ -46,7 +46,7 @@ Issues = "https://github.com/Varming73/graphiti/issues" [project.optional-dependencies] # FalkorDB support (Neo4j is included in graphiti-core-varming by default) -falkordb = ["graphiti-core-varming[falkordb]>=0.23.1"] +falkordb = ["graphiti-core-varming[falkordb]>=0.23.2"] # Azure support azure = [ @@ -82,7 +82,7 @@ all = [ ] dev = [ - "graphiti-core-varming>=0.23.1", + "graphiti-core-varming>=0.23.2", "httpx>=0.28.1", "mcp>=1.21.0", "pyright>=1.1.404", diff --git a/mcp_server/src/graphiti_mcp_server.py b/mcp_server/src/graphiti_mcp_server.py index a92e5c29..5e6f7410 100644 --- a/mcp_server/src/graphiti_mcp_server.py +++ b/mcp_server/src/graphiti_mcp_server.py @@ -333,6 +333,29 @@ class GraphitiService: logger.error(f'Failed to initialize Graphiti client: {e}') raise + async def shutdown(self) -> None: + """Clean shutdown of Graphiti service. + + Closes database connections and cancels queue workers. + """ + logger.info('Shutting down Graphiti service...') + + try: + # Cancel all queue workers + if self.queue_service: + await self.queue_service.shutdown() + + # Close Graphiti client (which closes the driver) + if self.client and self.client.driver: + logger.info('Closing Neo4j driver...') + await self.client.driver.close() + logger.info('Neo4j driver closed successfully') + + except Exception as e: + logger.error(f'Error during shutdown: {e}') + finally: + logger.info('Graphiti service shutdown complete') + async def get_client(self) -> Graphiti: """Get the Graphiti client, initializing if necessary.""" if self.client is None: @@ -1614,46 +1637,54 @@ async def initialize_server() -> ServerConfig: async def run_mcp_server(): - """Run the MCP server in the current event loop.""" - # Initialize the server - mcp_config = await initialize_server() + """Run the MCP server with proper lifecycle management.""" + global graphiti_service - # Run the server with configured transport - logger.info(f'Starting MCP server with transport: {mcp_config.transport}') - if mcp_config.transport == 'stdio': - await mcp.run_stdio_async() - elif mcp_config.transport == 'sse': - logger.info( - f'Running MCP server with SSE transport on {mcp.settings.host}:{mcp.settings.port}' - ) - logger.info(f'Access the server at: http://{mcp.settings.host}:{mcp.settings.port}/sse') - await mcp.run_sse_async() - elif mcp_config.transport == 'http': - # HTTP/streamable-http is not yet supported in the current FastMCP version - # Fall back to SSE which provides similar functionality for remote connections - display_host = 'localhost' if mcp.settings.host == '0.0.0.0' else mcp.settings.host - logger.warning( - 'HTTP transport requested but not yet supported in FastMCP. ' - 'Using SSE transport instead for remote connections.' - ) - logger.info( - f'Running MCP server with SSE transport on {mcp.settings.host}:{mcp.settings.port}' - ) - logger.info('=' * 60) - logger.info('MCP Server Access Information:') - logger.info(f' Base URL: http://{display_host}:{mcp.settings.port}/') - logger.info(f' SSE Endpoint: http://{display_host}:{mcp.settings.port}/sse') - logger.info(' Transport: SSE (Server-Sent Events)') - logger.info('=' * 60) - logger.info('For MCP clients, connect to the /sse endpoint above') + try: + # Initialize the server + mcp_config = await initialize_server() - # Configure uvicorn logging to match our format - configure_uvicorn_logging() + # Run the server with configured transport + logger.info(f'Starting MCP server with transport: {mcp_config.transport}') + if mcp_config.transport == 'stdio': + await mcp.run_stdio_async() + elif mcp_config.transport == 'sse': + logger.info( + f'Running MCP server with SSE transport on {mcp.settings.host}:{mcp.settings.port}' + ) + logger.info(f'Access the server at: http://{mcp.settings.host}:{mcp.settings.port}/sse') + await mcp.run_sse_async() + elif mcp_config.transport == 'http': + # HTTP/streamable-http is not yet supported in the current FastMCP version + # Fall back to SSE which provides similar functionality for remote connections + display_host = 'localhost' if mcp.settings.host == '0.0.0.0' else mcp.settings.host + logger.warning( + 'HTTP transport requested but not yet supported in FastMCP. ' + 'Using SSE transport instead for remote connections.' + ) + logger.info( + f'Running MCP server with SSE transport on {mcp.settings.host}:{mcp.settings.port}' + ) + logger.info('=' * 60) + logger.info('MCP Server Access Information:') + logger.info(f' Base URL: http://{display_host}:{mcp.settings.port}/') + logger.info(f' SSE Endpoint: http://{display_host}:{mcp.settings.port}/sse') + logger.info(' Transport: SSE (Server-Sent Events)') + logger.info('=' * 60) + logger.info('For MCP clients, connect to the /sse endpoint above') - # Use SSE transport as fallback - await mcp.run_sse_async() - else: - raise ValueError(f'Unsupported transport: {mcp_config.transport}. Use "sse" or "stdio"') + # Configure uvicorn logging to match our format + configure_uvicorn_logging() + + # Use SSE transport as fallback + await mcp.run_sse_async() + else: + raise ValueError(f'Unsupported transport: {mcp_config.transport}. Use "sse" or "stdio"') + + finally: + # Always clean up on exit + if graphiti_service: + await graphiti_service.shutdown() def main(): diff --git a/mcp_server/src/services/queue_service.py b/mcp_server/src/services/queue_service.py index 4c4f8530..62d0c2fa 100644 --- a/mcp_server/src/services/queue_service.py +++ b/mcp_server/src/services/queue_service.py @@ -98,6 +98,25 @@ class QueueService: self._graphiti_client = graphiti_client logger.info('Queue service initialized with graphiti client') + async def shutdown(self) -> None: + """Cancel all queue workers and wait for completion.""" + if not self._queue_workers: + logger.info('No queue workers to shut down') + return + + logger.info(f'Shutting down {len(self._queue_workers)} queue workers...') + + # Cancel all worker tasks if we have task references + # Note: Currently we don't store task references, so workers will be + # cancelled when the event loop shuts down. This is a placeholder for + # Phase 4 enhancement where we'll add task tracking. + + for group_id, is_running in list(self._queue_workers.items()): + if is_running: + logger.info(f'Queue worker for {group_id} will be cancelled on shutdown') + + logger.info('Queue service shutdown complete') + async def add_episode( self, group_id: str,