Merge pull request #2237 from yrangana/feat/optimize-postgres-initialization

Optimize PostgreSQL initialization performance
Daniel.y 2025-10-21 22:17:46 +08:00 committed by GitHub
commit 907204714b
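
The change replaces the per-column and per-index catalog probes that ran on every startup with three batched queries: one against information_schema.columns for the timestamp-type check (previously 8 separate queries), one for the field-length check (previously 5), and one against pg_indexes for the index checks (previously two per table). A minimal sketch of the batched column lookup, assuming an asyncpg pool; fetch_column_types and its arguments are illustrative, not code from this PR:

import asyncpg


async def fetch_column_types(
    pool: asyncpg.Pool, tables: dict[str, list[str]]
) -> dict[tuple[str, str], str]:
    """Return {(TABLE, column): data_type} using one information_schema query."""
    table_names = [t.lower() for t in tables]
    column_names = list({c for cols in tables.values() for c in cols})
    rows = await pool.fetch(
        """
        SELECT table_name, column_name, data_type
        FROM information_schema.columns
        WHERE table_name = ANY($1) AND column_name = ANY($2)
        """,
        table_names,
        column_names,
    )
    # One round trip replaces a separate existence/type probe per (table, column) pair.
    return {(r["table_name"].upper(), r["column_name"]): r["data_type"] for r in rows}

In the hunks below, the per-column loops consult this pre-fetched mapping instead of re-querying the catalog: a missing entry means the column does not exist, and a value of "timestamp without time zone" means the column is already migrated and is skipped.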


@@ -537,49 +537,74 @@ class PostgreSQLDB:
"LIGHTRAG_DOC_STATUS": ["created_at", "updated_at"],
}
-        for table_name, columns in tables_to_migrate.items():
-            for column_name in columns:
-                try:
-                    # Check if column exists
-                    check_column_sql = f"""
-                    SELECT column_name, data_type
-                    FROM information_schema.columns
-                    WHERE table_name = '{table_name.lower()}'
-                    AND column_name = '{column_name}'
-                    """
-                    column_info = await self.query(check_column_sql)
-                    if not column_info:
-                        logger.warning(
-                            f"Column {table_name}.{column_name} does not exist, skipping migration"
-                        )
-                        continue
-                    # Check column type
-                    data_type = column_info.get("data_type")
-                    if data_type == "timestamp without time zone":
-                        logger.debug(
-                            f"Column {table_name}.{column_name} is already witimezone-free, no migration needed"
-                        )
-                        continue
-                    # Execute migration, explicitly specifying UTC timezone for interpreting original data
-                    logger.info(
-                        f"Migrating {table_name}.{column_name} from {data_type} to TIMESTAMP(0) type"
-                    )
-                    migration_sql = f"""
-                    ALTER TABLE {table_name}
-                    ALTER COLUMN {column_name} TYPE TIMESTAMP(0),
-                    ALTER COLUMN {column_name} SET DEFAULT CURRENT_TIMESTAMP
-                    """
-                    await self.execute(migration_sql)
-                    logger.info(
-                        f"Successfully migrated {table_name}.{column_name} to timezone-free type"
-                    )
-                except Exception as e:
-                    # Log error but don't interrupt the process
-                    logger.warning(f"Failed to migrate {table_name}.{column_name}: {e}")
+        try:
+            # Optimization: Batch check all columns in one query instead of 8 separate queries
+            table_names_lower = [t.lower() for t in tables_to_migrate.keys()]
+            all_column_names = list(
+                set(col for cols in tables_to_migrate.values() for col in cols)
+            )
+            check_all_columns_sql = """
+            SELECT table_name, column_name, data_type
+            FROM information_schema.columns
+            WHERE table_name = ANY($1)
+            AND column_name = ANY($2)
+            """
+            all_columns_result = await self.query(
+                check_all_columns_sql,
+                [table_names_lower, all_column_names],
+                multirows=True,
+            )
+            # Build lookup dict: (table_name, column_name) -> data_type
+            column_types = {}
+            if all_columns_result:
+                column_types = {
+                    (row["table_name"].upper(), row["column_name"]): row["data_type"]
+                    for row in all_columns_result
+                }
+            # Now iterate and migrate only what's needed
+            for table_name, columns in tables_to_migrate.items():
+                for column_name in columns:
+                    try:
+                        data_type = column_types.get((table_name, column_name))
+                        if not data_type:
+                            logger.warning(
+                                f"Column {table_name}.{column_name} does not exist, skipping migration"
+                            )
+                            continue
+                        # Check column type
+                        if data_type == "timestamp without time zone":
+                            logger.debug(
+                                f"Column {table_name}.{column_name} is already witimezone-free, no migration needed"
+                            )
+                            continue
+                        # Execute migration, explicitly specifying UTC timezone for interpreting original data
+                        logger.info(
+                            f"Migrating {table_name}.{column_name} from {data_type} to TIMESTAMP(0) type"
+                        )
+                        migration_sql = f"""
+                        ALTER TABLE {table_name}
+                        ALTER COLUMN {column_name} TYPE TIMESTAMP(0),
+                        ALTER COLUMN {column_name} SET DEFAULT CURRENT_TIMESTAMP
+                        """
+                        await self.execute(migration_sql)
+                        logger.info(
+                            f"Successfully migrated {table_name}.{column_name} to timezone-free type"
+                        )
+                    except Exception as e:
+                        # Log error but don't interrupt the process
+                        logger.warning(
+                            f"Failed to migrate {table_name}.{column_name}: {e}"
+                        )
+        except Exception as e:
+            logger.error(f"Failed to batch check timestamp columns: {e}")
async def _migrate_doc_chunks_to_vdb_chunks(self):
"""
@@ -956,73 +981,89 @@ class PostgreSQLDB:
},
]
-        for migration in field_migrations:
-            try:
-                # Check current column definition
-                check_column_sql = """
-                SELECT column_name, data_type, character_maximum_length, is_nullable
-                FROM information_schema.columns
-                WHERE table_name = $1 AND column_name = $2
-                """
-                params = {
-                    "table_name": migration["table"].lower(),
-                    "column_name": migration["column"],
-                }
-                column_info = await self.query(
-                    check_column_sql,
-                    list(params.values()),
-                )
-                if not column_info:
-                    logger.warning(
-                        f"Column {migration['table']}.{migration['column']} does not exist, skipping migration"
-                    )
-                    continue
-                current_type = column_info.get("data_type", "").lower()
-                current_length = column_info.get("character_maximum_length")
-                # Check if migration is needed
-                needs_migration = False
-                if migration["column"] == "entity_name" and current_length == 255:
-                    needs_migration = True
-                elif (
-                    migration["column"] in ["source_id", "target_id"]
-                    and current_length == 256
-                ):
-                    needs_migration = True
-                elif (
-                    migration["column"] == "file_path"
-                    and current_type == "character varying"
-                ):
-                    needs_migration = True
-                if needs_migration:
-                    logger.info(
-                        f"Migrating {migration['table']}.{migration['column']}: {migration['description']}"
-                    )
-                    # Execute the migration
-                    alter_sql = f"""
-                    ALTER TABLE {migration["table"]}
-                    ALTER COLUMN {migration["column"]} TYPE {migration["new_type"]}
-                    """
-                    await self.execute(alter_sql)
-                    logger.info(
-                        f"Successfully migrated {migration['table']}.{migration['column']}"
-                    )
-                else:
-                    logger.debug(
-                        f"Column {migration['table']}.{migration['column']} already has correct type, no migration needed"
-                    )
-            except Exception as e:
-                # Log error but don't interrupt the process
-                logger.warning(
-                    f"Failed to migrate {migration['table']}.{migration['column']}: {e}"
-                )
+        try:
+            # Optimization: Batch check all columns in one query instead of 5 separate queries
+            unique_tables = list(set(m["table"].lower() for m in field_migrations))
+            unique_columns = list(set(m["column"] for m in field_migrations))
+            check_all_columns_sql = """
+            SELECT table_name, column_name, data_type, character_maximum_length, is_nullable
+            FROM information_schema.columns
+            WHERE table_name = ANY($1)
+            AND column_name = ANY($2)
+            """
+            all_columns_result = await self.query(
+                check_all_columns_sql, [unique_tables, unique_columns], multirows=True
+            )
+            # Build lookup dict: (table_name, column_name) -> column_info
+            column_info_map = {}
+            if all_columns_result:
+                column_info_map = {
+                    (row["table_name"].upper(), row["column_name"]): row
+                    for row in all_columns_result
+                }
+            # Now iterate and migrate only what's needed
+            for migration in field_migrations:
+                try:
+                    column_info = column_info_map.get(
+                        (migration["table"], migration["column"])
+                    )
+                    if not column_info:
+                        logger.warning(
+                            f"Column {migration['table']}.{migration['column']} does not exist, skipping migration"
+                        )
+                        continue
+                    current_type = column_info.get("data_type", "").lower()
+                    current_length = column_info.get("character_maximum_length")
+                    # Check if migration is needed
+                    needs_migration = False
+                    if migration["column"] == "entity_name" and current_length == 255:
+                        needs_migration = True
+                    elif (
+                        migration["column"] in ["source_id", "target_id"]
+                        and current_length == 256
+                    ):
+                        needs_migration = True
+                    elif (
+                        migration["column"] == "file_path"
+                        and current_type == "character varying"
+                    ):
+                        needs_migration = True
+                    if needs_migration:
+                        logger.info(
+                            f"Migrating {migration['table']}.{migration['column']}: {migration['description']}"
+                        )
+                        # Execute the migration
+                        alter_sql = f"""
+                        ALTER TABLE {migration["table"]}
+                        ALTER COLUMN {migration["column"]} TYPE {migration["new_type"]}
+                        """
+                        await self.execute(alter_sql)
+                        logger.info(
+                            f"Successfully migrated {migration['table']}.{migration['column']}"
+                        )
+                    else:
+                        logger.debug(
+                            f"Column {migration['table']}.{migration['column']} already has correct type, no migration needed"
+                        )
+                except Exception as e:
+                    # Log error but don't interrupt the process
+                    logger.warning(
+                        f"Failed to migrate {migration['table']}.{migration['column']}: {e}"
+                    )
+        except Exception as e:
+            logger.error(f"Failed to batch check field lengths: {e}")
async def check_tables(self):
# First create all tables
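
The field-length hunk above uses the same batched lookup but keeps the whole information_schema row per (table, column), so the loop can read data_type and character_maximum_length when deciding whether an ALTER TABLE is needed. A condensed sketch under the same asyncpg assumption; check_varchar_lengths is an illustrative name, not part of the PR:

import asyncpg


async def check_varchar_lengths(
    pool: asyncpg.Pool, targets: list[tuple[str, str]]
) -> dict:
    """Return {(TABLE, column): character_maximum_length} for the requested columns."""
    tables = list({t.lower() for t, _ in targets})
    columns = list({c for _, c in targets})
    rows = await pool.fetch(
        """
        SELECT table_name, column_name, character_maximum_length
        FROM information_schema.columns
        WHERE table_name = ANY($1) AND column_name = ANY($2)
        """,
        tables,
        columns,
    )
    # character_maximum_length is NULL (None) for unbounded types such as TEXT.
    return {
        (r["table_name"].upper(), r["column_name"]): r["character_maximum_length"]
        for r in rows
    }

For example, a VARCHAR(255) entity_name column comes back with character_maximum_length = 255 and is flagged for widening, while a column already converted to TEXT reports NULL there and is left alone.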
@@ -1042,47 +1083,59 @@ class PostgreSQLDB:
)
raise e
-        # Create index for id column in each table
-            try:
+        # Batch check all indexes at once (optimization: single query instead of N queries)
+        try:
+            table_names = list(TABLES.keys())
+            table_names_lower = [t.lower() for t in table_names]
+            # Get all existing indexes for our tables in one query
+            check_all_indexes_sql = """
+            SELECT indexname, tablename
+            FROM pg_indexes
+            WHERE tablename = ANY($1)
+            """
+            existing_indexes_result = await self.query(
+                check_all_indexes_sql, [table_names_lower], multirows=True
+            )
+            # Build a set of existing index names for fast lookup
+            existing_indexes = set()
+            if existing_indexes_result:
+                existing_indexes = {row["indexname"] for row in existing_indexes_result}
+            # Create missing indexes
+            for k in table_names:
+                # Create index for id column if missing
                 index_name = f"idx_{k.lower()}_id"
-                check_index_sql = f"""
-                SELECT 1 FROM pg_indexes
-                WHERE indexname = '{index_name}'
-                AND tablename = '{k.lower()}'
-                """
-                index_exists = await self.query(check_index_sql)
+                if index_name not in existing_indexes:
+                    try:
+                        create_index_sql = f"CREATE INDEX {index_name} ON {k}(id)"
+                        logger.info(
+                            f"PostgreSQL, Creating index {index_name} on table {k}"
+                        )
+                        await self.execute(create_index_sql)
+                    except Exception as e:
+                        logger.error(
+                            f"PostgreSQL, Failed to create index {index_name}, Got: {e}"
+                        )
-                if not index_exists:
-                    create_index_sql = f"CREATE INDEX {index_name} ON {k}(id)"
-                    logger.info(f"PostgreSQL, Creating index {index_name} on table {k}")
-                    await self.execute(create_index_sql)
-            except Exception as e:
-                logger.error(
-                    f"PostgreSQL, Failed to create index on table {k}, Got: {e}"
-                )
-            # Create composite index for (workspace, id) columns in each table
-            try:
+                # Create composite index for (workspace, id) if missing
                 composite_index_name = f"idx_{k.lower()}_workspace_id"
-                check_composite_index_sql = f"""
-                SELECT 1 FROM pg_indexes
-                WHERE indexname = '{composite_index_name}'
-                AND tablename = '{k.lower()}'
-                """
-                composite_index_exists = await self.query(check_composite_index_sql)
-                if not composite_index_exists:
-                    create_composite_index_sql = (
-                        f"CREATE INDEX {composite_index_name} ON {k}(workspace, id)"
-                    )
-                    logger.info(
-                        f"PostgreSQL, Creating composite index {composite_index_name} on table {k}"
-                    )
-                    await self.execute(create_composite_index_sql)
-            except Exception as e:
-                logger.error(
-                    f"PostgreSQL, Failed to create composite index on table {k}, Got: {e}"
-                )
+                if composite_index_name not in existing_indexes:
+                    try:
+                        create_composite_index_sql = (
+                            f"CREATE INDEX {composite_index_name} ON {k}(workspace, id)"
+                        )
+                        logger.info(
+                            f"PostgreSQL, Creating composite index {composite_index_name} on table {k}"
+                        )
+                        await self.execute(create_composite_index_sql)
+                    except Exception as e:
+                        logger.error(
+                            f"PostgreSQL, Failed to create composite index {composite_index_name}, Got: {e}"
+                        )
+        except Exception as e:
+            logger.error(f"PostgreSQL, Failed to batch check/create indexes: {e}")
# Create vector indexs
if self.vector_index_type:
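
The index hunk applies the same idea to pg_indexes: one query lists every existing index on the LightRAG tables, and the loop only issues CREATE INDEX for names missing from that set. A condensed sketch, again assuming asyncpg; ensure_id_indexes is an illustrative helper, not part of the PR:

import asyncpg


async def ensure_id_indexes(pool: asyncpg.Pool, tables: list[str]) -> None:
    """Create idx_<table>_id indexes, checking pg_indexes once instead of once per table."""
    rows = await pool.fetch(
        "SELECT indexname FROM pg_indexes WHERE tablename = ANY($1)",
        [t.lower() for t in tables],
    )
    existing = {r["indexname"] for r in rows}
    for table in tables:
        index_name = f"idx_{table.lower()}_id"
        if index_name not in existing:
            # CREATE INDEX IF NOT EXISTS (PostgreSQL 9.5+) would also work here; the
            # explicit pre-check keeps the logs precise about what was actually created.
            await pool.execute(f"CREATE INDEX {index_name} ON {table}(id)")

The composite (workspace, id) indexes in the hunk follow the same pattern, checking a second name against the same pre-fetched set.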