"""FastAPI application exposing the memory, graph and maintenance endpoints.

Every POST endpoint accepts a JSON body of the form ``{"payload": {...}}``;
the keys each handler reads from that inner dict are listed in its docstring.
"""

import json
import logging
import os
from typing import Any, Dict

import uvicorn
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# Set up logging
logging.basicConfig(
    level=logging.INFO,  # One of DEBUG, INFO, WARNING, ERROR, CRITICAL
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)

# NOTE: configuration is loaded *before* the database / ``main`` imports below,
# exactly as in the original statement order, in case those modules read
# configuration at import time. Do not hoist them above ``config.load()``.
from cognitive_architecture.config import Config  # noqa: E402

config = Config()
config.load()

from cognitive_architecture.database.relationaldb.database import (  # noqa: E402
    AsyncSessionLocal,
)
from cognitive_architecture.database.relationaldb.database_crud import (  # noqa: E402
    session_scope,
)
from cognitive_architecture.vectorstore_manager import Memory  # noqa: E402
from main import add_documents_to_graph_db, user_context_enrichment  # noqa: E402

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

app = FastAPI(debug=True)

# NOTE: the Cognito JWT bearer wiring below is commented out; no endpoint in
# this file currently depends on it.
#
# from auth.cognito.JWTBearer import JWTBearer
# from auth.auth import jwks
#
# auth = JWTBearer(jwks)


@app.get("/")
async def root():
    """
    Root endpoint that returns a welcome message.
    """
    return {"message": "Hello, World, I am alive!"}


@app.get("/health")
def health_check():
    """
    Health check endpoint that returns the server status.
    """
    return {"status": "OK"}


class Payload(BaseModel):
    """Request envelope: every POST body is ``{"payload": {...}}``."""

    payload: Dict[str, Any]


@app.post("/add-memory", response_model=dict)
async def add_memory(
    payload: Payload,
    # files: List[UploadFile] = File(...),
):
    """
    Load documents into the vector store on behalf of a user.

    Payload keys:
        user_id: required.
        content: optional; ``None`` when missing.
        settings: optional loader settings; ``None`` when missing.

    Returns:
        ``{"response": <loader output>}`` with status 200, or
        ``{"response": {"error": <message>}}`` with status 503 on any failure.
    """
    try:
        logger.info(" Adding to Memory ")
        decoded_payload = payload.payload
        async with session_scope(session=AsyncSessionLocal()) as session:
            from main import load_documents_to_vectorstore

            # ``.get`` yields None both for a missing key and an explicit null.
            output = await load_documents_to_vectorstore(
                session,
                decoded_payload["user_id"],
                content=decoded_payload.get("content"),
                loader_settings=decoded_payload.get("settings"),
            )
            return JSONResponse(content={"response": output}, status_code=200)
    except Exception as e:
        return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)


@app.post("/add-architecture-public-memory", response_model=dict)
async def add_architecture_public_memory(
    payload: Payload,
    # files: List[UploadFile] = File(...),
):
    """
    Load documents into the vector store as the fixed ``system_user``.

    The loader settings are hard-coded to PDF files read from the ``.data``
    path on the device.

    Payload keys:
        content: optional; ``None`` when missing.

    Returns:
        ``{"response": <loader output>}`` with status 200, or
        ``{"response": {"error": <message>}}`` with status 503 on any failure.
    """
    # This handler used to be a second ``add_memory`` definition that silently
    # rebound the first one; it now has its own name. The route is unchanged.
    try:
        logger.info(" Adding to Memory ")
        decoded_payload = payload.payload
        async with session_scope(session=AsyncSessionLocal()) as session:
            from main import load_documents_to_vectorstore

            user_id = "system_user"
            loader_settings = {"format": "PDF", "source": "DEVICE", "path": [".data"]}
            output = await load_documents_to_vectorstore(
                session,
                user_id=user_id,
                content=decoded_payload.get("content"),
                loader_settings=loader_settings,
            )
            return JSONResponse(content={"response": output}, status_code=200)
    except Exception as e:
        return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)


@app.post("/user-query-to-graph")
async def user_query_to_graph(payload: Payload):
    """
    Forward a user query to ``main.user_query_to_graph_db``.

    Payload keys:
        user_id: required.
        query: required.

    Returns:
        Whatever ``user_query_to_graph_db`` returns.

    Raises:
        HTTPException: status 500 with the error text on any failure.
    """
    try:
        from main import user_query_to_graph_db

        decoded_payload = payload.payload
        async with session_scope(session=AsyncSessionLocal()) as session:
            result = await user_query_to_graph_db(
                session=session,
                user_id=decoded_payload["user_id"],
                query_input=decoded_payload["query"],
            )
            return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/document-to-graph-db")
async def document_to_graph_db(payload: Payload):
    """
    Add a user's documents to the graph database.

    Payload keys:
        user_id: required.
        memory_type: optional; passed on as ``document_memory_types``.

    Returns:
        Whatever ``add_documents_to_graph_db`` returns.

    Raises:
        HTTPException: status 500 with the error text on any failure.
    """
    logger.info("Adding documents to graph db")
    try:
        decoded_payload = payload.payload
        memory_type = decoded_payload.get("memory_type")
        async with session_scope(session=AsyncSessionLocal()) as session:
            result = await add_documents_to_graph_db(
                session=session,
                user_id=decoded_payload["user_id"],
                document_memory_types=memory_type,
            )
            return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/cognitive-context-enrichment")
async def cognitive_context_enrichment(payload: Payload):
    """
    Run ``user_context_enrichment`` for a query.

    Payload keys (all required):
        user_id, query, generative_response, memory_type.

    Returns:
        ``{"response": <result>}`` with status 200.

    Raises:
        HTTPException: status 500 with the error text on any failure.
    """
    try:
        decoded_payload = payload.payload
        async with session_scope(session=AsyncSessionLocal()) as session:
            result = await user_context_enrichment(
                session,
                user_id=decoded_payload["user_id"],
                query=decoded_payload["query"],
                generative_response=decoded_payload["generative_response"],
                memory_type=decoded_payload["memory_type"],
            )
            return JSONResponse(content={"response": result}, status_code=200)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/classify-user-query")
async def classify_user_query(payload: Payload):
    """
    Run ``main.relevance_feedback`` on a query.

    Payload keys (all required):
        query: passed as ``query``.
        knowledge_type: passed as ``input_type``.

    Returns:
        ``{"response": <result>}`` with status 200.

    Raises:
        HTTPException: status 500 with the error text on any failure.
    """
    try:
        decoded_payload = payload.payload
        # NOTE(review): the session is opened but never handed to
        # ``relevance_feedback`` -- confirm whether this scope is still needed.
        async with session_scope(session=AsyncSessionLocal()) as session:
            from main import relevance_feedback

            result = await relevance_feedback(
                query=decoded_payload["query"],
                input_type=decoded_payload["knowledge_type"],
            )
            return JSONResponse(content={"response": result}, status_code=200)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/user-query-classifier")
async def user_query_classfier(payload: Payload):
    """
    Classify a user query with the ``classify_user_input`` classifier.

    Payload keys (all required):
        user_id, query.

    Returns:
        ``{"response": <result>}`` with status 200.

    Raises:
        HTTPException: status 500 with the error text on any failure.
    """
    try:
        decoded_payload = payload.payload
        async with session_scope(session=AsyncSessionLocal()) as session:
            # Aliased so it is not confused with the ``classify_user_query``
            # route handler defined in this module.
            from cognitive_architecture.classifiers.classify_user_input import (
                classify_user_query as classify_query,
            )

            result = await classify_query(
                session, decoded_payload["user_id"], decoded_payload["query"]
            )
            return JSONResponse(content={"response": result}, status_code=200)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/drop-db")
async def drop_db(payload: Payload):
    """
    Drop or create the Postgres database named by ``POSTGRES_DB``.

    Connection details are read from the ``POSTGRES_HOST``, ``POSTGRES_USER``,
    ``POSTGRES_PASSWORD`` and ``POSTGRES_DB`` environment variables, and only
    when ``AWS_ENV`` equals ``"dev"``.

    Payload keys:
        operation: required. ``"drop"`` drops the database; any other value
            creates it.

    Returns:
        ``{"response": "DB dropped"}`` or ``{"response": "DB created"}`` with
        status 200.

    Raises:
        HTTPException: status 500 when ``AWS_ENV`` is not ``"dev"`` or on any
            other failure.
    """
    try:
        decoded_payload = payload.payload
        should_drop = decoded_payload["operation"] == "drop"

        # Credentials are only defined for the dev environment. Previously any
        # other environment fell through to an UnboundLocalError; refuse
        # explicitly instead.
        if os.environ.get("AWS_ENV") != "dev":
            raise HTTPException(
                status_code=500,
                detail="Database credentials are only configured for AWS_ENV=dev",
            )

        host = os.environ.get("POSTGRES_HOST")
        username = os.environ.get("POSTGRES_USER")
        password = os.environ.get("POSTGRES_PASSWORD")
        database_name = os.environ.get("POSTGRES_DB")

        from cognitive_architecture.database.create_database import (
            create_admin_engine,
            create_database,
            drop_database,
        )

        engine = create_admin_engine(username, password, host, database_name)
        connection = engine.raw_connection()

        if should_drop:
            drop_database(connection, database_name)
            return JSONResponse(content={"response": "DB dropped"}, status_code=200)

        create_database(connection, database_name)
        # This branch creates the database; the old message said " DB drop".
        return JSONResponse(content={"response": "DB created"}, status_code=200)
    except HTTPException:
        # Let the explicit refusal above through without re-wrapping it.
        raise
    except Exception as e:
        # Must be raised, not returned, for FastAPI to send an error response.
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/create-public-memory")
async def create_public_memory(payload: Payload):
    """
    Create a public memory via ``main.create_public_memory``.

    Payload keys (all optional, ``None`` when missing):
        user_id, labels, topic.

    Returns:
        ``{"response": <result>}`` with status 200.

    Raises:
        HTTPException: status 500 with the error text on any failure.
    """
    try:
        # This handler has the same name as the helper in ``main``. Without an
        # aliased import the call below resolved to the handler itself and
        # always failed with a TypeError.
        from main import create_public_memory as _create_public_memory

        decoded_payload = payload.payload
        result = await _create_public_memory(
            user_id=decoded_payload.get("user_id"),
            labels=decoded_payload.get("labels"),
            topic=decoded_payload.get("topic"),
        )
        return JSONResponse(content={"response": result}, status_code=200)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/attach-user-to-public-memory")
async def attach_user_to_public_memory(payload: Payload):
    """
    Create a public memory and attach the user to it.

    Payload keys:
        user_id: required.
        topic: optional; ``None`` when missing.
        labels: optional; defaults to ``["sr"]`` when missing or null.

    Returns:
        ``{"response": <attach result>}`` with status 200.

    Raises:
        HTTPException: status 500 with the error text on any failure.
    """
    try:
        decoded_payload = payload.payload
        topic = decoded_payload.get("topic")
        labels = decoded_payload.get("labels")
        if labels is None:  # an explicit empty list is kept as-is
            labels = ["sr"]

        async with session_scope(session=AsyncSessionLocal()) as session:
            from main import attach_user_to_memory, create_public_memory

            user_id = decoded_payload["user_id"]
            await create_public_memory(user_id=user_id, topic=topic, labels=labels)
            result = await attach_user_to_memory(
                user_id=user_id, topic=topic, labels=labels
            )
            return JSONResponse(content={"response": result}, status_code=200)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/unlink-user-from-public-memory")
async def unlink_user_from_public_memory(payload: Payload):
    """
    Unlink a user from a public memory via ``main.unlink_user_from_memory``.

    Payload keys:
        user_id: required.
        labels: required.
        topic: optional; ``None`` when missing.

    Returns:
        ``{"response": <result>}`` with status 200.

    Raises:
        HTTPException: status 500 with the error text on any failure.
    """
    try:
        decoded_payload = payload.payload
        topic = decoded_payload.get("topic")

        async with session_scope(session=AsyncSessionLocal()) as session:
            from main import unlink_user_from_memory

            result = await unlink_user_from_memory(
                user_id=decoded_payload["user_id"],
                topic=topic,
                labels=decoded_payload["labels"],
            )
            return JSONResponse(content={"response": result}, status_code=200)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


def start_api_server(host: str = "0.0.0.0", port: int = 8000):
    """
    Start the API server using uvicorn.

    Parameters:
    host (str): The host for the server.
    port (int): The port for the server.

    Any exception raised while starting or running the server is logged with
    its traceback and not re-raised.
    """
    try:
        logger.info("Starting server at %s:%s", host, port)
        uvicorn.run(app, host=host, port=port)
    except Exception as e:
        logger.exception("Failed to start server: %s", e)
        # Here you could add any cleanup code or error recovery code.


if __name__ == "__main__":
    start_api_server()