feat: enhance BaseVectorStorage for model isolation

Why this change is needed:
To enforce a consistent naming and migration strategy across all vector storage implementations.

How it solves it:
- Added _generate_collection_suffix() helper
- Added _get_legacy_collection_name() and _get_new_collection_name() interfaces

Impact:
Prepares storage implementations for multi-model support.

Testing:
Added tests/test_base_storage_integrity.py; the new test passes.
This commit is contained in:
BukeLy 2025-11-19 02:15:22 +08:00
parent 5c10d3d58e
commit 13f2440bbf
2 changed files with 53 additions and 0 deletions

View file

@ -220,6 +220,25 @@ class BaseVectorStorage(StorageNameSpace, ABC):
cosine_better_than_threshold: float = field(default=0.2)
meta_fields: set[str] = field(default_factory=set)
def _generate_collection_suffix(self) -> str:
    """Return the collection/table name suffix for this storage.

    The suffix is not computed here: it is whatever
    ``self.embedding_func.get_model_identifier()`` returns, so the
    format is owned by the embedding function.

    Returns:
        str: Suffix string, e.g. "text_embedding_3_large_3072d".
    """
    return self.embedding_func.get_model_identifier()
def _get_legacy_collection_name(self) -> str:
    """Get the legacy collection/table name (without suffix).

    Used for data migration detection. This base implementation is only
    a placeholder and never returns.

    Returns:
        str: The legacy name, when overridden by a subclass.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError("Subclasses must implement this method")
def _get_new_collection_name(self) -> str:
    """Get the new collection/table name (with suffix).

    This base implementation is only a placeholder and never returns.

    Returns:
        str: The suffixed name, when overridden by a subclass.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    raise NotImplementedError("Subclasses must implement this method")
@abstractmethod
async def query(
self, query: str, top_k: int, query_embedding: list[float] = None

View file

@ -0,0 +1,34 @@
import pytest
from lightrag.base import BaseVectorStorage
from lightrag.utils import EmbeddingFunc
def test_base_vector_storage_integrity():
    """Check the collection-naming hooks that BaseVectorStorage exposes.

    Verifies that the three helper methods exist on the class, that the
    suffix helper returns the embedding function's model identifier, and
    that the two name hooks raise NotImplementedError when a subclass does
    not override them.
    """
    # Just checking if we can import and inspect the class
    assert hasattr(BaseVectorStorage, '_generate_collection_suffix')
    assert hasattr(BaseVectorStorage, '_get_legacy_collection_name')
    assert hasattr(BaseVectorStorage, '_get_new_collection_name')

    # Minimal subclass: stubs out the storage operations so the class can be
    # instantiated, while deliberately NOT overriding the two name hooks.
    class ConcreteStorage(BaseVectorStorage):
        async def query(self, *args, **kwargs):
            pass

        async def upsert(self, *args, **kwargs):
            pass

        async def delete_entity(self, *args, **kwargs):
            pass

        async def delete_entity_relation(self, *args, **kwargs):
            pass

        async def get_by_id(self, *args, **kwargs):
            pass

        async def get_by_ids(self, *args, **kwargs):
            pass

        async def delete(self, *args, **kwargs):
            pass

        async def get_vectors_by_ids(self, *args, **kwargs):
            pass

        async def index_done_callback(self):
            pass

        async def drop(self):
            pass

    func = EmbeddingFunc(embedding_dim=128, func=lambda x: x)
    storage = ConcreteStorage(
        namespace="test",
        workspace="test",
        global_config={},
        embedding_func=func,
    )

    # The suffix comes straight from the embedding function's identifier.
    assert storage._generate_collection_suffix() == "unknown_128d"

    # Verify methods raise NotImplementedError
    with pytest.raises(NotImplementedError):
        storage._get_legacy_collection_name()

    with pytest.raises(NotImplementedError):
        storage._get_new_collection_name()