LightRAG/examples/lightrag_azure_openai_demo.py
2025-09-30 22:24:01 +03:00

266 lines
8.2 KiB
Python

==========controlled ingestion in batches=============
import os
import asyncio
from lightrag import LightRAG, QueryParam
from lightrag.utils import EmbeddingFunc
import numpy as np
from dotenv import load_dotenv
import logging
from openai import AzureOpenAI
import time
# Bootstrap: timing, logging, and .env-based configuration for ingestion.
start_time = time.time()
logging.basicConfig(level=logging.INFO)
load_dotenv()
# Azure OpenAI chat-completion settings (read from .env).
AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION")
AZURE_OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT")
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
# Azure OpenAI embedding settings (read from .env).
AZURE_EMBEDDING_DEPLOYMENT = os.getenv("AZURE_EMBEDDING_DEPLOYMENT")
AZURE_EMBEDDING_API_VERSION = os.getenv("AZURE_EMBEDDING_API_VERSION")
# Directory where LightRAG persists its stores (must exist beforehand).
WORKING_DIR = "C:\\Users\\user\\testfolder"
# The code below removes the working_dir and creates a new one!
# if os.path.exists(WORKING_DIR):
# import shutil
# shutil.rmtree(WORKING_DIR)
# os.mkdir(WORKING_DIR)
async def llm_model_func(
    prompt, system_prompt=None, history_messages=None, keyword_extraction=False, **kwargs
) -> str:
    """Send one chat-completion request to Azure OpenAI and return the reply text.

    Args:
        prompt: The user message.
        system_prompt: Optional system message prepended to the conversation.
        history_messages: Optional prior messages (list of role/content dicts).
        keyword_extraction: Accepted for LightRAG compatibility; unused here.
        **kwargs: May override ``temperature`` (default 0), ``top_p`` (1), ``n`` (1).
    """
    # history_messages=[] in the original was a mutable default argument
    # shared across calls; use a None sentinel instead (same behavior).
    if history_messages is None:
        history_messages = []
    client = AzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version=AZURE_OPENAI_API_VERSION,
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    if history_messages:
        messages.extend(history_messages)
    messages.append({"role": "user", "content": prompt})
    # NOTE(review): this client call is synchronous and blocks the event loop
    # despite the async def — consider AsyncAzureOpenAI. Behavior kept as-is.
    chat_completion = client.chat.completions.create(
        model=AZURE_OPENAI_DEPLOYMENT,  # model = "deployment_name" on Azure
        messages=messages,
        temperature=kwargs.get("temperature", 0),
        top_p=kwargs.get("top_p", 1),
        n=kwargs.get("n", 1),
    )
    return chat_completion.choices[0].message.content
async def embedding_func(texts: list[str]) -> np.ndarray:
    """Embed a batch of texts with Azure OpenAI.

    Returns a 2-D array of shape ``(len(texts), embedding_dim)``.
    """
    client = AzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version=AZURE_EMBEDDING_API_VERSION,
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    # One API call embeds the whole batch; response.data preserves input order.
    response = client.embeddings.create(model=AZURE_EMBEDDING_DEPLOYMENT, input=texts)
    return np.array([item.embedding for item in response.data])
async def test_funcs():
    """Smoke-test the LLM and embedding helpers against the live Azure endpoints."""
    result = await llm_model_func("How are you?")
    print("Response from llm_model_func: ", result)
    result = await embedding_func(["How are you?"])
    print("Result from embedding_func: ", result.shape)
    print("Embedding dimension: ", result.shape[1])

# Run the smoke test once at import time before building the RAG instance.
asyncio.run(test_funcs())
# Dimension of text-embedding-ada-002-style embeddings.
embedding_dimension = 1536
# Build the LightRAG instance; insert_batch_size throttles how many documents
# are ingested per internal batch.
rag = LightRAG(
    working_dir=WORKING_DIR,
    addon_params={"insert_batch_size": 3},
    llm_model_func=llm_model_func,
    embedding_func=EmbeddingFunc(
        embedding_dim=embedding_dimension,
        max_token_size=8192,
        func=embedding_func,
    ),
)
# Folder holding the documents to ingest.
folder_path = 'C:/Users/example/test/LightRAG/my_docs'  # With os, this specification of the documents folder is not a problem.
def normalize_path(path):
    """Normalize *path* and convert every backslash to a forward slash.

    Collapses redundant separators/up-level references via os.path.normpath,
    then forces forward slashes so paths compare consistently across OSes.
    """
    normalized_path = os.path.normpath(path)
    return normalized_path.replace('\\', '/')
# Output file where we store filenames of already-ingested documents,
# one filename per line, so re-runs can skip them.
output_file = 'processed.txt'
# The maximum number of files to process in a single run.
batch_files = 5
# Function to include document in vector-store
def process_doc(file_path):
    """Return the full UTF-8 text content of *file_path*.

    The caller is responsible for passing the content to ``rag.insert``.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        return f.read()
# Function to read existing filenames from the output file
def read_existing_files(output_file):
    """Return the set of filenames recorded in *output_file*, one per line.

    Returns an empty set when the file does not exist yet (first run).
    """
    if not os.path.exists(output_file):
        return set()
    with open(output_file, 'r') as f:
        # Iterate the file directly instead of readlines(); strip newlines.
        return {line.strip() for line in f}
input_docs = []
# Check if the folder exists
if os.path.exists(folder_path) and os.path.isdir(folder_path):
# Read existing filenames from the output file
existing_files = read_existing_files(output_file)
# Open the output
==========Querier==============================================
import os
import asyncio
from lightrag import LightRAG, QueryParam
from lightrag.utils import EmbeddingFunc
import numpy as np
from dotenv import load_dotenv
import logging
from openai import AzureOpenAI
import time
from multiprocessing import freeze_support
# Configure logging (INFO level) for the interactive querier script.
logging.basicConfig(level=logging.INFO)
def main():
    """Interactive loop: answer user questions from an existing LightRAG store.

    Loads Azure OpenAI credentials from .env, wires LLM and embedding helpers
    into LightRAG (hybrid query mode), then reads questions from stdin until
    the user types 'exit' or 'quit'.
    """
    load_dotenv()
    # Version of the script
    VERSION = "0.2"
    # Azure OpenAI chat + embedding settings (read from .env).
    AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION")
    AZURE_OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT")
    AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
    AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
    AZURE_EMBEDDING_DEPLOYMENT = os.getenv("AZURE_EMBEDDING_DEPLOYMENT")
    AZURE_EMBEDDING_API_VERSION = os.getenv("AZURE_EMBEDDING_API_VERSION")
    # Must point at the working dir already populated by the ingestion script.
    WORKING_DIR = "C:/Users/user/testfolder"

    async def llm_model_func(
        prompt, system_prompt=None, history_messages=None, keyword_extraction=False, **kwargs
    ) -> str:
        """Send one chat-completion request to Azure OpenAI; return reply text."""
        # history_messages=[] in the original was a shared mutable default.
        if history_messages is None:
            history_messages = []
        client = AzureOpenAI(
            api_key=AZURE_OPENAI_API_KEY,
            api_version=AZURE_OPENAI_API_VERSION,
            azure_endpoint=AZURE_OPENAI_ENDPOINT,
        )
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        if history_messages:
            messages.extend(history_messages)
        messages.append({"role": "user", "content": prompt})
        # NOTE(review): synchronous client call inside an async def blocks the
        # event loop; consider AsyncAzureOpenAI. Behavior kept as-is.
        chat_completion = client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT,
            messages=messages,
            temperature=kwargs.get("temperature", 0.5),
            max_tokens=kwargs.get("max_tokens", 1500),
        )
        return chat_completion.choices[0].message.content

    async def embedding_func(texts: list[str]) -> np.ndarray:
        """Embed *texts*; returns an array of shape (len(texts), dim)."""
        client = AzureOpenAI(
            api_key=AZURE_OPENAI_API_KEY,
            api_version=AZURE_EMBEDDING_API_VERSION,
            azure_endpoint=AZURE_OPENAI_ENDPOINT,
        )
        embedding = client.embeddings.create(
            model=AZURE_EMBEDDING_DEPLOYMENT, input=texts
        )
        return np.array([item.embedding for item in embedding.data])

    # Dimension of text-embedding-ada-002-style embeddings.
    embedding_dimension = 1536
    rag = LightRAG(
        working_dir=WORKING_DIR,
        llm_model_func=llm_model_func,
        embedding_func=EmbeddingFunc(
            embedding_dim=embedding_dimension,
            max_token_size=8192,
            func=embedding_func,
        ),
    )

    def query_rag(question):
        """
        Execute a query in hybrid mode.
        """
        try:
            start_time = time.time()
            response = rag.query(
                question,
                param=QueryParam(mode="hybrid")  # Set hybrid mode
            )
            if response is None or not response.strip():
                print("No relevant answer found. Check if the database is correctly populated.")
                return
            print("\n--- Answer ---")
            print(response)
            print("\n")
            duration = time.time() - start_time
            print(f"The answer took {duration} seconds.")
        except Exception as e:
            print(f"An error occurred while executing the query: {e}")

    print("LightRAG - Interactive Questioning")
    print(f"Version: {VERSION}")
    print("Type 'exit' to terminate the program.\n")
    while True:
        try:
            question = input("Ask your question: ")
            if question.lower() in ["exit", "quit"]:
                print("Program terminated.")
                break
            query_rag(question)
        except (KeyboardInterrupt, EOFError):
            # The original bare `except:` swallowed Ctrl+C / Ctrl+D and
            # looped forever; exit cleanly on those instead.
            print("Program terminated.")
            break
        except Exception:
            print("Error answering the question")
if __name__ == '__main__':