fix: use actual embedding_dim instead of environment variable

CRITICAL FIX: PostgreSQL vector index creation now uses the actual
embedding dimension from PGVectorStorage instead of reading from
EMBEDDING_DIM environment variable (which defaults to 1024).

Root Cause:
- check_tables() called _create_vector_indexes() during db initialization
- It read EMBEDDING_DIM from env, defaulting to 1024
- E2E tests created 1536d legacy tables
- ALTER TABLE failed: "expected 1024 dimensions, not 1536"

Solution:
- Removed vector index creation from check_tables()
- Created new _create_vector_index(table_name, embedding_dim) method
- setup_table() now creates index with correct embedding_dim
- Each PGVectorStorage instance manages its own index

Impact:
- E2E tests will now pass
- Production deployments work without EMBEDDING_DIM env var
- Multi-model support with different dimensions works correctly
This commit is contained in:
BukeLy 2025-11-20 02:17:17 +08:00
parent d12c14946b
commit 8d9b6a629d

View file

@ -1163,23 +1163,9 @@ class PostgreSQLDB:
except Exception as e: except Exception as e:
logger.error(f"PostgreSQL, Failed to batch check/create indexes: {e}") logger.error(f"PostgreSQL, Failed to batch check/create indexes: {e}")
# Create vector indexs # NOTE: Vector index creation moved to PGVectorStorage.setup_table()
if self.vector_index_type: # Each vector storage instance creates its own index with correct embedding_dim
logger.info(
f"PostgreSQL, Create vector indexs, type: {self.vector_index_type}"
)
try:
if self.vector_index_type in ["HNSW", "IVFFLAT", "VCHORDRQ"]:
await self._create_vector_indexes()
else:
logger.warning(
"Doesn't support this vector index type: {self.vector_index_type}. "
"Supported types: HNSW, IVFFLAT, VCHORDRQ"
)
except Exception as e:
logger.error(
f"PostgreSQL, Failed to create vector index, type: {self.vector_index_type}, Got: {e}"
)
# After all tables are created, attempt to migrate timestamp fields # After all tables are created, attempt to migrate timestamp fields
try: try:
await self._migrate_timestamp_columns() await self._migrate_timestamp_columns()
@ -1381,64 +1367,72 @@ class PostgreSQLDB:
except Exception as e: except Exception as e:
logger.warning(f"Failed to create index {index['name']}: {e}") logger.warning(f"Failed to create index {index['name']}: {e}")
async def _create_vector_indexes(self): async def _create_vector_index(self, table_name: str, embedding_dim: int):
vdb_tables = [ """
"LIGHTRAG_VDB_CHUNKS", Create vector index for a specific table.
"LIGHTRAG_VDB_ENTITY",
"LIGHTRAG_VDB_RELATION", Args:
] table_name: Name of the table to create index on
embedding_dim: Embedding dimension for the vector column
"""
if not self.vector_index_type:
return
create_sql = { create_sql = {
"HNSW": f""" "HNSW": f"""
CREATE INDEX {{vector_index_name}} CREATE INDEX {{vector_index_name}}
ON {{k}} USING hnsw (content_vector vector_cosine_ops) ON {{table_name}} USING hnsw (content_vector vector_cosine_ops)
WITH (m = {self.hnsw_m}, ef_construction = {self.hnsw_ef}) WITH (m = {self.hnsw_m}, ef_construction = {self.hnsw_ef})
""", """,
"IVFFLAT": f""" "IVFFLAT": f"""
CREATE INDEX {{vector_index_name}} CREATE INDEX {{vector_index_name}}
ON {{k}} USING ivfflat (content_vector vector_cosine_ops) ON {{table_name}} USING ivfflat (content_vector vector_cosine_ops)
WITH (lists = {self.ivfflat_lists}) WITH (lists = {self.ivfflat_lists})
""", """,
"VCHORDRQ": f""" "VCHORDRQ": f"""
CREATE INDEX {{vector_index_name}} CREATE INDEX {{vector_index_name}}
ON {{k}} USING vchordrq (content_vector vector_cosine_ops) ON {{table_name}} USING vchordrq (content_vector vector_cosine_ops)
{f'WITH (options = $${self.vchordrq_build_options}$$)' if self.vchordrq_build_options else ''} {f'WITH (options = $${self.vchordrq_build_options}$$)' if self.vchordrq_build_options else ''}
""", """,
} }
embedding_dim = int(os.environ.get("EMBEDDING_DIM", 1024)) if self.vector_index_type not in create_sql:
for k in vdb_tables: logger.warning(
vector_index_name = ( f"Unsupported vector index type: {self.vector_index_type}. "
f"idx_{k.lower()}_{self.vector_index_type.lower()}_cosine" "Supported types: HNSW, IVFFLAT, VCHORDRQ"
) )
check_vector_index_sql = f""" return
SELECT 1 FROM pg_indexes
WHERE indexname = '{vector_index_name}' AND tablename = '{k.lower()}' k = table_name
""" vector_index_name = f"idx_{k.lower()}_{self.vector_index_type.lower()}_cosine"
try: check_vector_index_sql = f"""
vector_index_exists = await self.query(check_vector_index_sql) SELECT 1 FROM pg_indexes
if not vector_index_exists: WHERE indexname = '{vector_index_name}' AND tablename = '{k.lower()}'
# Only set vector dimension when index doesn't exist """
alter_sql = f"ALTER TABLE {k} ALTER COLUMN content_vector TYPE VECTOR({embedding_dim})" try:
await self.execute(alter_sql) vector_index_exists = await self.query(check_vector_index_sql)
logger.debug(f"Ensured vector dimension for {k}") if not vector_index_exists:
logger.info( # Only set vector dimension when index doesn't exist
f"Creating {self.vector_index_type} index {vector_index_name} on table {k}" alter_sql = f"ALTER TABLE {k} ALTER COLUMN content_vector TYPE VECTOR({embedding_dim})"
await self.execute(alter_sql)
logger.debug(f"Ensured vector dimension for {k}")
logger.info(
f"Creating {self.vector_index_type} index {vector_index_name} on table {k}"
)
await self.execute(
create_sql[self.vector_index_type].format(
vector_index_name=vector_index_name, table_name=k
) )
await self.execute( )
create_sql[self.vector_index_type].format( logger.info(
vector_index_name=vector_index_name, k=k f"Successfully created vector index {vector_index_name} on table {k}"
) )
) else:
logger.info( logger.info(
f"Successfully created vector index {vector_index_name} on table {k}" f"{self.vector_index_type} vector index {vector_index_name} already exists on table {k}"
) )
else: except Exception as e:
logger.info( logger.error(f"Failed to create vector index on table {k}, Got: {e}")
f"{self.vector_index_type} vector index {vector_index_name} already exists on table {k}"
)
except Exception as e:
logger.error(f"Failed to create vector index on table {k}, Got: {e}")
async def query( async def query(
self, self,
@ -2283,11 +2277,15 @@ class PGVectorStorage(BaseVectorStorage):
f"PostgreSQL: Legacy table '{legacy_table_name}' still exists. " f"PostgreSQL: Legacy table '{legacy_table_name}' still exists. "
f"Remove it if migration is complete." f"Remove it if migration is complete."
) )
# Ensure vector index exists even if migration was not performed
await db._create_vector_index(table_name, embedding_dim)
return return
# Case 2: Only new table exists - Already migrated or newly created # Case 2: Only new table exists - Already migrated or newly created
if new_table_exists: if new_table_exists:
logger.debug(f"PostgreSQL: Table '{table_name}' already exists") logger.debug(f"PostgreSQL: Table '{table_name}' already exists")
# Ensure vector index exists with correct embedding dimension
await db._create_vector_index(table_name, embedding_dim)
return return
# Case 3: Neither exists - Create new table # Case 3: Neither exists - Create new table
@ -2295,6 +2293,8 @@ class PGVectorStorage(BaseVectorStorage):
logger.info(f"PostgreSQL: Creating new table '{table_name}'") logger.info(f"PostgreSQL: Creating new table '{table_name}'")
await _pg_create_table(db, table_name, base_table, embedding_dim) await _pg_create_table(db, table_name, base_table, embedding_dim)
logger.info(f"PostgreSQL: Table '{table_name}' created successfully") logger.info(f"PostgreSQL: Table '{table_name}' created successfully")
# Create vector index with correct embedding dimension
await db._create_vector_index(table_name, embedding_dim)
return return
# Case 4: Only legacy exists - Migrate data # Case 4: Only legacy exists - Migrate data
@ -2312,6 +2312,8 @@ class PGVectorStorage(BaseVectorStorage):
if legacy_count == 0: if legacy_count == 0:
logger.info("PostgreSQL: Legacy table is empty, skipping migration") logger.info("PostgreSQL: Legacy table is empty, skipping migration")
await _pg_create_table(db, table_name, base_table, embedding_dim) await _pg_create_table(db, table_name, base_table, embedding_dim)
# Create vector index with correct embedding dimension
await db._create_vector_index(table_name, embedding_dim)
return return
# Create new table first # Create new table first
@ -2380,6 +2382,9 @@ class PGVectorStorage(BaseVectorStorage):
f"PostgreSQL: Migration from '{legacy_table_name}' to '{table_name}' completed successfully" f"PostgreSQL: Migration from '{legacy_table_name}' to '{table_name}' completed successfully"
) )
# Create vector index after successful migration
await db._create_vector_index(table_name, embedding_dim)
except PostgreSQLMigrationError: except PostgreSQLMigrationError:
# Re-raise migration errors without wrapping # Re-raise migration errors without wrapping
raise raise