graphiti/mcp_server/queue_service.py
Daniel Chalef 452a45cb4e wip
2025-08-30 08:50:48 -07:00

86 lines
3.3 KiB
Python

"""Queue service for managing episode processing."""
import asyncio
import logging
from collections.abc import Awaitable, Callable
logger = logging.getLogger(__name__)
class QueueService:
"""Service for managing sequential episode processing queues by group_id."""
def __init__(self):
"""Initialize the queue service."""
# Dictionary to store queues for each group_id
self._episode_queues: dict[str, asyncio.Queue] = {}
# Dictionary to track if a worker is running for each group_id
self._queue_workers: dict[str, bool] = {}
async def add_episode_task(
self, group_id: str, process_func: Callable[[], Awaitable[None]]
) -> int:
"""Add an episode processing task to the queue.
Args:
group_id: The group ID for the episode
process_func: The async function to process the episode
Returns:
The position in the queue
"""
# Initialize queue for this group_id if it doesn't exist
if group_id not in self._episode_queues:
self._episode_queues[group_id] = asyncio.Queue()
# Add the episode processing function to the queue
await self._episode_queues[group_id].put(process_func)
# Start a worker for this queue if one isn't already running
if not self._queue_workers.get(group_id, False):
asyncio.create_task(self._process_episode_queue(group_id))
return self._episode_queues[group_id].qsize()
async def _process_episode_queue(self, group_id: str) -> None:
"""Process episodes for a specific group_id sequentially.
This function runs as a long-lived task that processes episodes
from the queue one at a time.
"""
logger.info(f'Starting episode queue worker for group_id: {group_id}')
self._queue_workers[group_id] = True
try:
while True:
# Get the next episode processing function from the queue
# This will wait if the queue is empty
process_func = await self._episode_queues[group_id].get()
try:
# Process the episode
await process_func()
except Exception as e:
logger.error(
f'Error processing queued episode for group_id {group_id}: {str(e)}'
)
finally:
# Mark the task as done regardless of success/failure
self._episode_queues[group_id].task_done()
except asyncio.CancelledError:
logger.info(f'Episode queue worker for group_id {group_id} was cancelled')
except Exception as e:
logger.error(f'Unexpected error in queue worker for group_id {group_id}: {str(e)}')
finally:
self._queue_workers[group_id] = False
logger.info(f'Stopped episode queue worker for group_id: {group_id}')
def get_queue_size(self, group_id: str) -> int:
"""Get the current queue size for a group_id."""
if group_id not in self._episode_queues:
return 0
return self._episode_queues[group_id].qsize()
def is_worker_running(self, group_id: str) -> bool:
"""Check if a worker is running for a group_id."""
return self._queue_workers.get(group_id, False)