349 lines
13 KiB
Python
349 lines
13 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
from enum import Enum
|
|
from typing import Dict, Any
|
|
|
|
import uvicorn
|
|
from fastapi import FastAPI, BackgroundTasks, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
from pydantic import BaseModel
|
|
|
|
from cognitive_architecture.database.postgres.database import AsyncSessionLocal
|
|
from cognitive_architecture.database.postgres.database_crud import session_scope
|
|
from cognitive_architecture.vectorstore_manager import Memory
|
|
from dotenv import load_dotenv
|
|
from main import add_documents_to_graph_db, user_context_enrichment
|
|
from cognitive_architecture.config import Config
|
|
from fastapi import Depends
|
|
|
|
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
|
|
|
|
# Set up logging
|
|
logging.basicConfig(
|
|
level=logging.INFO, # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)
|
|
format="%(asctime)s [%(levelname)s] %(message)s", # Set the log message format
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
load_dotenv()
|
|
|
|
config = Config()
|
|
config.load()
|
|
|
|
app = FastAPI(debug=True)
|
|
#
|
|
# from auth.cognito.JWTBearer import JWTBearer
|
|
# from auth.auth import jwks
|
|
#
|
|
# auth = JWTBearer(jwks)
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
"""
|
|
Root endpoint that returns a welcome message.
|
|
"""
|
|
return { "message": "Hello, World, I am alive!" }
|
|
|
|
|
|
@app.get("/health")
|
|
def health_check():
|
|
"""
|
|
Health check endpoint that returns the server status.
|
|
"""
|
|
return {"status": "OK"}
|
|
|
|
|
|
class Payload(BaseModel):
|
|
payload: Dict[str, Any]
|
|
|
|
|
|
@app.post("/add-memory", response_model=dict)
|
|
async def add_memory(
|
|
payload: Payload,
|
|
# files: List[UploadFile] = File(...),
|
|
):
|
|
try:
|
|
logging.info(" Adding to Memory ")
|
|
decoded_payload = payload.payload
|
|
async with session_scope(session=AsyncSessionLocal()) as session:
|
|
from main import load_documents_to_vectorstore
|
|
|
|
if 'settings' in decoded_payload and decoded_payload['settings'] is not None:
|
|
settings_for_loader = decoded_payload['settings']
|
|
else:
|
|
settings_for_loader = None
|
|
|
|
if 'content' in decoded_payload and decoded_payload['content'] is not None:
|
|
content = decoded_payload['content']
|
|
else:
|
|
content = None
|
|
|
|
output = await load_documents_to_vectorstore(session, decoded_payload['user_id'], content=content,
|
|
loader_settings=settings_for_loader)
|
|
return JSONResponse(content={"response": output}, status_code=200)
|
|
|
|
except Exception as e:
|
|
return JSONResponse(
|
|
content={"response": {"error": str(e)}}, status_code=503
|
|
)
|
|
|
|
|
|
@app.post("/add-architecture-public-memory", response_model=dict)
|
|
async def add_memory(
|
|
payload: Payload,
|
|
# files: List[UploadFile] = File(...),
|
|
):
|
|
try:
|
|
logging.info(" Adding to Memory ")
|
|
decoded_payload = payload.payload
|
|
async with session_scope(session=AsyncSessionLocal()) as session:
|
|
from main import load_documents_to_vectorstore
|
|
if 'content' in decoded_payload and decoded_payload['content'] is not None:
|
|
content = decoded_payload['content']
|
|
else:
|
|
content = None
|
|
|
|
user_id = 'system_user'
|
|
loader_settings = {
|
|
"format": "PDF",
|
|
"source": "DEVICE",
|
|
"path": [".data"]
|
|
}
|
|
|
|
output = await load_documents_to_vectorstore(session, user_id=user_id, content=content,
|
|
loader_settings=loader_settings)
|
|
return JSONResponse(content={"response": output}, status_code=200)
|
|
|
|
except Exception as e:
|
|
return JSONResponse(
|
|
content={"response": {"error": str(e)}}, status_code=503
|
|
)
|
|
|
|
|
|
@app.post("/user-query-to-graph")
|
|
async def user_query_to_graph(payload: Payload):
|
|
try:
|
|
from main import user_query_to_graph_db
|
|
decoded_payload = payload.payload
|
|
# Execute the query - replace this with the actual execution method
|
|
async with session_scope(session=AsyncSessionLocal()) as session:
|
|
# Assuming you have a method in Neo4jGraphDB to execute the query
|
|
result = await user_query_to_graph_db(session=session, user_id=decoded_payload['user_id'],
|
|
query_input=decoded_payload['query'])
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/document-to-graph-db")
|
|
async def document_to_graph_db(payload: Payload):
|
|
logging.info("Adding documents to graph db")
|
|
try:
|
|
decoded_payload = payload.payload
|
|
if 'settings' in decoded_payload and decoded_payload['settings'] is not None:
|
|
settings_for_loader = decoded_payload['settings']
|
|
else:
|
|
settings_for_loader = None
|
|
if 'memory_type' in decoded_payload and decoded_payload['memory_type'] is not None:
|
|
memory_type = decoded_payload['memory_type']
|
|
else:
|
|
memory_type = None
|
|
async with session_scope(session=AsyncSessionLocal()) as session:
|
|
result = await add_documents_to_graph_db(session=session, user_id=decoded_payload['user_id'],
|
|
document_memory_types=memory_type)
|
|
return result
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/cognitive-context-enrichment")
|
|
async def cognitive_context_enrichment(payload: Payload):
|
|
try:
|
|
decoded_payload = payload.payload
|
|
async with session_scope(session=AsyncSessionLocal()) as session:
|
|
result = await user_context_enrichment(session, user_id=decoded_payload['user_id'],
|
|
query=decoded_payload['query'],
|
|
generative_response=decoded_payload['generative_response'],
|
|
memory_type=decoded_payload['memory_type'])
|
|
return JSONResponse(content={"response": result}, status_code=200)
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/classify-user-query")
|
|
async def classify_user_query(payload: Payload):
|
|
try:
|
|
decoded_payload = payload.payload
|
|
async with session_scope(session=AsyncSessionLocal()) as session:
|
|
from main import relevance_feedback
|
|
result = await relevance_feedback(query=decoded_payload['query'],
|
|
input_type=decoded_payload['knowledge_type'])
|
|
return JSONResponse(content={"response": result}, status_code=200)
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/user-query-classifier")
|
|
async def user_query_classfier(payload: Payload):
|
|
try:
|
|
decoded_payload = payload.payload
|
|
|
|
# Execute the query - replace this with the actual execution method
|
|
async with session_scope(session=AsyncSessionLocal()) as session:
|
|
from cognitive_architecture.classifiers.classifier import classify_user_query
|
|
# Assuming you have a method in Neo4jGraphDB to execute the query
|
|
result = await classify_user_query(session, decoded_payload['user_id'], decoded_payload['query'])
|
|
return JSONResponse(content={"response": result}, status_code=200)
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/drop-db")
|
|
async def drop_db(payload: Payload):
|
|
try:
|
|
decoded_payload = payload.payload
|
|
|
|
if decoded_payload['operation'] == 'drop':
|
|
|
|
if os.environ.get('AWS_ENV') == 'dev':
|
|
host = os.environ.get('POSTGRES_HOST')
|
|
username = os.environ.get('POSTGRES_USER')
|
|
password = os.environ.get('POSTGRES_PASSWORD')
|
|
database_name = os.environ.get('POSTGRES_DB')
|
|
else:
|
|
pass
|
|
|
|
from cognitive_architecture.database.create_database import drop_database, create_admin_engine
|
|
|
|
engine = create_admin_engine(username, password, host, database_name)
|
|
connection = engine.raw_connection()
|
|
drop_database(connection, database_name)
|
|
return JSONResponse(content={"response": "DB dropped"}, status_code=200)
|
|
else:
|
|
|
|
if os.environ.get('AWS_ENV') == 'dev':
|
|
host = os.environ.get('POSTGRES_HOST')
|
|
username = os.environ.get('POSTGRES_USER')
|
|
password = os.environ.get('POSTGRES_PASSWORD')
|
|
database_name = os.environ.get('POSTGRES_DB')
|
|
else:
|
|
pass
|
|
|
|
from cognitive_architecture.database.create_database import create_database, create_admin_engine
|
|
|
|
engine = create_admin_engine(username, password, host, database_name)
|
|
connection = engine.raw_connection()
|
|
create_database(connection, database_name)
|
|
return JSONResponse(content={"response": " DB drop"}, status_code=200)
|
|
|
|
|
|
|
|
except Exception as e:
|
|
return HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/create-public-memory")
|
|
async def create_public_memory(payload: Payload):
|
|
try:
|
|
decoded_payload = payload.payload
|
|
|
|
if 'user_id' in decoded_payload and decoded_payload['user_id'] is not None:
|
|
user_id = decoded_payload['user_id']
|
|
else:
|
|
user_id = None
|
|
|
|
if 'labels' in decoded_payload and decoded_payload['labels'] is not None:
|
|
labels = decoded_payload['labels']
|
|
else:
|
|
labels = None
|
|
|
|
if 'topic' in decoded_payload and decoded_payload['topic'] is not None:
|
|
topic = decoded_payload['topic']
|
|
else:
|
|
topic = None
|
|
|
|
# Execute the query - replace this with the actual execution method
|
|
# async with session_scope(session=AsyncSessionLocal()) as session:
|
|
# from main import create_public_memory
|
|
# Assuming you have a method in Neo4jGraphDB to execute the query
|
|
result = await create_public_memory(user_id=user_id, labels=labels, topic=topic)
|
|
return JSONResponse(content={"response": result}, status_code=200)
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/attach-user-to-public-memory")
|
|
async def attach_user_to_public_memory(payload: Payload):
|
|
try:
|
|
decoded_payload = payload.payload
|
|
|
|
if 'topic' in decoded_payload and decoded_payload['topic'] is not None:
|
|
topic = decoded_payload['topic']
|
|
else:
|
|
topic = None
|
|
if 'labels' in decoded_payload and decoded_payload['labels'] is not None:
|
|
labels = decoded_payload['labels']
|
|
else:
|
|
labels = ['sr']
|
|
|
|
# Execute the query - replace this with the actual execution method
|
|
async with session_scope(session=AsyncSessionLocal()) as session:
|
|
from main import attach_user_to_memory, create_public_memory
|
|
# Assuming you have a method in Neo4jGraphDB to execute the query
|
|
await create_public_memory(user_id=decoded_payload['user_id'], topic=topic, labels=labels)
|
|
result = await attach_user_to_memory(user_id=decoded_payload['user_id'], topic=topic, labels=labels)
|
|
return JSONResponse(content={"response": result}, status_code=200)
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/unlink-user-from-public-memory")
|
|
async def unlink_user_from_public_memory(payload: Payload):
|
|
try:
|
|
decoded_payload = payload.payload
|
|
|
|
if 'topic' in decoded_payload and decoded_payload['topic'] is not None:
|
|
topic = decoded_payload['topic']
|
|
else:
|
|
topic = None
|
|
|
|
# Execute the query - replace this with the actual execution method
|
|
async with session_scope(session=AsyncSessionLocal()) as session:
|
|
from main import unlink_user_from_memory
|
|
# Assuming you have a method in Neo4jGraphDB to execute the query
|
|
result = await unlink_user_from_memory(user_id=decoded_payload['user_id'], topic=topic,
|
|
labels=decoded_payload['labels'])
|
|
return JSONResponse(content={"response": result}, status_code=200)
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
def start_api_server(host: str = "0.0.0.0", port: int = 8000):
|
|
"""
|
|
Start the API server using uvicorn.
|
|
|
|
Parameters:
|
|
host (str): The host for the server.
|
|
port (int): The port for the server.
|
|
"""
|
|
try:
|
|
logger.info(f"Starting server at {host}:{port}")
|
|
uvicorn.run(app, host=host, port=port)
|
|
except Exception as e:
|
|
logger.exception(f"Failed to start server: {e}")
|
|
# Here you could add any cleanup code or error recovery code.
|
|
|
|
|
|
if __name__ == "__main__":
|
|
start_api_server()
|