diff --git a/examples/lightrag_gemini_demo.py b/examples/lightrag_gemini_demo.py deleted file mode 100644 index cd2bb579..00000000 --- a/examples/lightrag_gemini_demo.py +++ /dev/null @@ -1,105 +0,0 @@ -# pip install -q -U google-genai to use gemini as a client - -import os -import numpy as np -from google import genai -from google.genai import types -from dotenv import load_dotenv -from lightrag.utils import EmbeddingFunc -from lightrag import LightRAG, QueryParam -from sentence_transformers import SentenceTransformer -from lightrag.kg.shared_storage import initialize_pipeline_status - -import asyncio -import nest_asyncio - -# Apply nest_asyncio to solve event loop issues -nest_asyncio.apply() - -load_dotenv() -gemini_api_key = os.getenv("GEMINI_API_KEY") - -WORKING_DIR = "./dickens" - -if os.path.exists(WORKING_DIR): - import shutil - - shutil.rmtree(WORKING_DIR) - -os.mkdir(WORKING_DIR) - - -async def llm_model_func( - prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs -) -> str: - # 1. Initialize the GenAI Client with your Gemini API Key - client = genai.Client(api_key=gemini_api_key) - - # 2. Combine prompts: system prompt, history, and user prompt - if history_messages is None: - history_messages = [] - - combined_prompt = "" - if system_prompt: - combined_prompt += f"{system_prompt}\n" - - for msg in history_messages: - # Each msg is expected to be a dict: {"role": "...", "content": "..."} - combined_prompt += f"{msg['role']}: {msg['content']}\n" - - # Finally, add the new user prompt - combined_prompt += f"user: {prompt}" - - # 3. Call the Gemini model - response = client.models.generate_content( - model="gemini-1.5-flash", - contents=[combined_prompt], - config=types.GenerateContentConfig(max_output_tokens=500, temperature=0.1), - ) - - # 4. Return the response text - return response.text - - -async def embedding_func(texts: list[str]) -> np.ndarray: - model = SentenceTransformer("all-MiniLM-L6-v2") - embeddings = model.encode(texts, convert_to_numpy=True) - return embeddings - - -async def initialize_rag(): - rag = LightRAG( - working_dir=WORKING_DIR, - llm_model_func=llm_model_func, - embedding_func=EmbeddingFunc( - embedding_dim=384, - max_token_size=8192, - func=embedding_func, - ), - ) - - await rag.initialize_storages() - await initialize_pipeline_status() - - return rag - - -def main(): - # Initialize RAG instance - rag = asyncio.run(initialize_rag()) - file_path = "story.txt" - with open(file_path, "r") as file: - text = file.read() - - rag.insert(text) - - response = rag.query( - query="What is the main theme of the story?", - param=QueryParam(mode="hybrid", top_k=5, response_type="single line"), - ) - - print(response) - - -if __name__ == "__main__": - main() diff --git a/examples/lightrag_gemini_demo_no_tiktoken.py b/examples/lightrag_gemini_demo_no_tiktoken.py deleted file mode 100644 index 92c74201..00000000 --- a/examples/lightrag_gemini_demo_no_tiktoken.py +++ /dev/null @@ -1,230 +0,0 @@ -# pip install -q -U google-genai to use gemini as a client - -import os -from typing import Optional -import dataclasses -from pathlib import Path -import hashlib -import numpy as np -from google import genai -from google.genai import types -from dotenv import load_dotenv -from lightrag.utils import EmbeddingFunc, Tokenizer -from lightrag import LightRAG, QueryParam -from sentence_transformers import SentenceTransformer -from lightrag.kg.shared_storage import initialize_pipeline_status -import sentencepiece as spm -import requests - -import asyncio -import nest_asyncio - -# Apply nest_asyncio to solve event loop issues -nest_asyncio.apply() - -load_dotenv() -gemini_api_key = os.getenv("GEMINI_API_KEY") - -WORKING_DIR = "./dickens" - -if os.path.exists(WORKING_DIR): - import shutil - - shutil.rmtree(WORKING_DIR) - -os.mkdir(WORKING_DIR) - - -class GemmaTokenizer(Tokenizer): - # adapted from google-cloud-aiplatform[tokenization] - - @dataclasses.dataclass(frozen=True) - class _TokenizerConfig: - tokenizer_model_url: str - tokenizer_model_hash: str - - _TOKENIZERS = { - "google/gemma2": _TokenizerConfig( - tokenizer_model_url="https://raw.githubusercontent.com/google/gemma_pytorch/33b652c465537c6158f9a472ea5700e5e770ad3f/tokenizer/tokenizer.model", - tokenizer_model_hash="61a7b147390c64585d6c3543dd6fc636906c9af3865a5548f27f31aee1d4c8e2", - ), - "google/gemma3": _TokenizerConfig( - tokenizer_model_url="https://raw.githubusercontent.com/google/gemma_pytorch/cb7c0152a369e43908e769eb09e1ce6043afe084/tokenizer/gemma3_cleaned_262144_v2.spiece.model", - tokenizer_model_hash="1299c11d7cf632ef3b4e11937501358ada021bbdf7c47638d13c0ee982f2e79c", - ), - } - - def __init__( - self, model_name: str = "gemini-2.0-flash", tokenizer_dir: Optional[str] = None - ): - # https://github.com/google/gemma_pytorch/tree/main/tokenizer - if "1.5" in model_name or "1.0" in model_name: - # up to gemini 1.5 gemma2 is a comparable local tokenizer - # https://github.com/googleapis/python-aiplatform/blob/main/vertexai/tokenization/_tokenizer_loading.py - tokenizer_name = "google/gemma2" - else: - # for gemini > 2.0 gemma3 was used - tokenizer_name = "google/gemma3" - - file_url = self._TOKENIZERS[tokenizer_name].tokenizer_model_url - tokenizer_model_name = file_url.rsplit("/", 1)[1] - expected_hash = self._TOKENIZERS[tokenizer_name].tokenizer_model_hash - - tokenizer_dir = Path(tokenizer_dir) - if tokenizer_dir.is_dir(): - file_path = tokenizer_dir / tokenizer_model_name - model_data = self._maybe_load_from_cache( - file_path=file_path, expected_hash=expected_hash - ) - else: - model_data = None - if not model_data: - model_data = self._load_from_url( - file_url=file_url, expected_hash=expected_hash - ) - self.save_tokenizer_to_cache(cache_path=file_path, model_data=model_data) - - tokenizer = spm.SentencePieceProcessor() - tokenizer.LoadFromSerializedProto(model_data) - super().__init__(model_name=model_name, tokenizer=tokenizer) - - def _is_valid_model(self, model_data: bytes, expected_hash: str) -> bool: - """Returns true if the content is valid by checking the hash.""" - return hashlib.sha256(model_data).hexdigest() == expected_hash - - def _maybe_load_from_cache(self, file_path: Path, expected_hash: str) -> bytes: - """Loads the model data from the cache path.""" - if not file_path.is_file(): - return - with open(file_path, "rb") as f: - content = f.read() - if self._is_valid_model(model_data=content, expected_hash=expected_hash): - return content - - # Cached file corrupted. - self._maybe_remove_file(file_path) - - def _load_from_url(self, file_url: str, expected_hash: str) -> bytes: - """Loads model bytes from the given file url.""" - resp = requests.get(file_url) - resp.raise_for_status() - content = resp.content - - if not self._is_valid_model(model_data=content, expected_hash=expected_hash): - actual_hash = hashlib.sha256(content).hexdigest() - raise ValueError( - f"Downloaded model file is corrupted." - f" Expected hash {expected_hash}. Got file hash {actual_hash}." - ) - return content - - @staticmethod - def save_tokenizer_to_cache(cache_path: Path, model_data: bytes) -> None: - """Saves the model data to the cache path.""" - try: - if not cache_path.is_file(): - cache_dir = cache_path.parent - cache_dir.mkdir(parents=True, exist_ok=True) - with open(cache_path, "wb") as f: - f.write(model_data) - except OSError: - # Don't raise if we cannot write file. - pass - - @staticmethod - def _maybe_remove_file(file_path: Path) -> None: - """Removes the file if exists.""" - if not file_path.is_file(): - return - try: - file_path.unlink() - except OSError: - # Don't raise if we cannot remove file. - pass - - # def encode(self, content: str) -> list[int]: - # return self.tokenizer.encode(content) - - # def decode(self, tokens: list[int]) -> str: - # return self.tokenizer.decode(tokens) - - -async def llm_model_func( - prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs -) -> str: - # 1. Initialize the GenAI Client with your Gemini API Key - client = genai.Client(api_key=gemini_api_key) - - # 2. Combine prompts: system prompt, history, and user prompt - if history_messages is None: - history_messages = [] - - combined_prompt = "" - if system_prompt: - combined_prompt += f"{system_prompt}\n" - - for msg in history_messages: - # Each msg is expected to be a dict: {"role": "...", "content": "..."} - combined_prompt += f"{msg['role']}: {msg['content']}\n" - - # Finally, add the new user prompt - combined_prompt += f"user: {prompt}" - - # 3. Call the Gemini model - response = client.models.generate_content( - model="gemini-1.5-flash", - contents=[combined_prompt], - config=types.GenerateContentConfig(max_output_tokens=500, temperature=0.1), - ) - - # 4. Return the response text - return response.text - - -async def embedding_func(texts: list[str]) -> np.ndarray: - model = SentenceTransformer("all-MiniLM-L6-v2") - embeddings = model.encode(texts, convert_to_numpy=True) - return embeddings - - -async def initialize_rag(): - rag = LightRAG( - working_dir=WORKING_DIR, - # tiktoken_model_name="gpt-4o-mini", - tokenizer=GemmaTokenizer( - tokenizer_dir=(Path(WORKING_DIR) / "vertexai_tokenizer_model"), - model_name="gemini-2.0-flash", - ), - llm_model_func=llm_model_func, - embedding_func=EmbeddingFunc( - embedding_dim=384, - max_token_size=8192, - func=embedding_func, - ), - ) - - await rag.initialize_storages() - await initialize_pipeline_status() - - return rag - - -def main(): - # Initialize RAG instance - rag = asyncio.run(initialize_rag()) - file_path = "story.txt" - with open(file_path, "r") as file: - text = file.read() - - rag.insert(text) - - response = rag.query( - query="What is the main theme of the story?", - param=QueryParam(mode="hybrid", top_k=5, response_type="single line"), - ) - - print(response) - - -if __name__ == "__main__": - main() diff --git a/examples/lightrag_gemini_track_token_demo.py b/examples/lightrag_gemini_track_token_demo.py deleted file mode 100644 index a72fc717..00000000 --- a/examples/lightrag_gemini_track_token_demo.py +++ /dev/null @@ -1,151 +0,0 @@ -# pip install -q -U google-genai to use gemini as a client - -import os -import asyncio -import numpy as np -import nest_asyncio -from google import genai -from google.genai import types -from dotenv import load_dotenv -from lightrag.utils import EmbeddingFunc -from lightrag import LightRAG, QueryParam -from lightrag.kg.shared_storage import initialize_pipeline_status -from lightrag.llm.siliconcloud import siliconcloud_embedding -from lightrag.utils import setup_logger -from lightrag.utils import TokenTracker - -setup_logger("lightrag", level="DEBUG") - -# Apply nest_asyncio to solve event loop issues -nest_asyncio.apply() - -load_dotenv() -gemini_api_key = os.getenv("GEMINI_API_KEY") -siliconflow_api_key = os.getenv("SILICONFLOW_API_KEY") - -WORKING_DIR = "./dickens" - -if not os.path.exists(WORKING_DIR): - os.mkdir(WORKING_DIR) - -token_tracker = TokenTracker() - - -async def llm_model_func( - prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs -) -> str: - # 1. Initialize the GenAI Client with your Gemini API Key - client = genai.Client(api_key=gemini_api_key) - - # 2. Combine prompts: system prompt, history, and user prompt - if history_messages is None: - history_messages = [] - - combined_prompt = "" - if system_prompt: - combined_prompt += f"{system_prompt}\n" - - for msg in history_messages: - # Each msg is expected to be a dict: {"role": "...", "content": "..."} - combined_prompt += f"{msg['role']}: {msg['content']}\n" - - # Finally, add the new user prompt - combined_prompt += f"user: {prompt}" - - # 3. Call the Gemini model - response = client.models.generate_content( - model="gemini-2.0-flash", - contents=[combined_prompt], - config=types.GenerateContentConfig( - max_output_tokens=5000, temperature=0, top_k=10 - ), - ) - - # 4. Get token counts with null safety - usage = getattr(response, "usage_metadata", None) - prompt_tokens = getattr(usage, "prompt_token_count", 0) or 0 - completion_tokens = getattr(usage, "candidates_token_count", 0) or 0 - total_tokens = getattr(usage, "total_token_count", 0) or ( - prompt_tokens + completion_tokens - ) - - token_counts = { - "prompt_tokens": prompt_tokens, - "completion_tokens": completion_tokens, - "total_tokens": total_tokens, - } - - token_tracker.add_usage(token_counts) - - # 5. Return the response text - return response.text - - -async def embedding_func(texts: list[str]) -> np.ndarray: - return await siliconcloud_embedding( - texts, - model="BAAI/bge-m3", - api_key=siliconflow_api_key, - max_token_size=512, - ) - - -async def initialize_rag(): - rag = LightRAG( - working_dir=WORKING_DIR, - entity_extract_max_gleaning=1, - enable_llm_cache=True, - enable_llm_cache_for_entity_extract=True, - embedding_cache_config={"enabled": True, "similarity_threshold": 0.90}, - llm_model_func=llm_model_func, - embedding_func=EmbeddingFunc( - embedding_dim=1024, - max_token_size=8192, - func=embedding_func, - ), - ) - - await rag.initialize_storages() - await initialize_pipeline_status() - - return rag - - -def main(): - # Initialize RAG instance - rag = asyncio.run(initialize_rag()) - - with open("./book.txt", "r", encoding="utf-8") as f: - rag.insert(f.read()) - - # Context Manager Method - with token_tracker: - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="naive") - ) - ) - - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="local") - ) - ) - - print( - rag.query( - "What are the top themes in this story?", - param=QueryParam(mode="global"), - ) - ) - - print( - rag.query( - "What are the top themes in this story?", - param=QueryParam(mode="hybrid"), - ) - ) - - -if __name__ == "__main__": - main()