LightRAG/service/app/services/lightrag_wrapper.py

80 lines
2.8 KiB
Python

import os
import json
from typing import Dict, Any
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete
from app.core.config import settings
class LightRAGWrapper:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(LightRAGWrapper, cls).__new__(cls)
cls._instance.rag = None
cls._instance.initialized = False
return cls._instance
async def initialize(self):
"""Initialize LightRAG engine"""
if self.initialized:
return
if not os.path.exists(settings.LIGHTRAG_WORKING_DIR):
os.makedirs(settings.LIGHTRAG_WORKING_DIR)
self.rag = LightRAG(
working_dir=settings.LIGHTRAG_WORKING_DIR,
llm_model_func=gpt_4o_mini_complete,
# Add other configurations as needed
)
# await self.rag.initialize_storages() # Uncomment if needed based on LightRAG version
self.initialized = True
print("LightRAG Initialized Successfully")
async def query(self, query_text: str, mode: str = "hybrid") -> Dict[str, Any]:
"""
Execute query against LightRAG.
"""
if not self.rag:
await self.initialize()
param = QueryParam(
mode=mode,
only_need_context=False,
response_type="Multiple Paragraphs"
)
# Execute query
# Note: Depending on LightRAG version, this might be sync or async.
# Assuming async based on plan.
try:
result = await self.rag.aquery(query_text, param=param)
except AttributeError:
# Fallback to sync if aquery not available
result = self.rag.query(query_text, param=param)
return self._parse_lightrag_response(result)
def _parse_lightrag_response(self, raw_response: Any) -> Dict[str, Any]:
"""
Parse raw response from LightRAG into a structured format.
"""
# This logic depends heavily on the actual return format of LightRAG.
# Assuming it returns a string or a specific object.
# For now, we'll assume it returns a string that might contain the answer.
# In a real scenario, we'd inspect 'raw_response' type.
answer = str(raw_response)
references = [] # Placeholder for references extraction logic
# If LightRAG returns an object with context, extract it here.
# For example:
# if isinstance(raw_response, dict):
# answer = raw_response.get("response", "")
# references = raw_response.get("context", [])
return {
"answer": answer,
"references": references
}