feat: add vector index creation functionality for PostgreSQL

This commit is contained in:
Matt23-star 2025-08-07 23:07:18 +08:00
parent 7780776af6
commit 727ca43d3c
2 changed files with 139 additions and 40 deletions

View file

@ -234,6 +234,14 @@ POSTGRES_DATABASE=your_database
POSTGRES_MAX_CONNECTIONS=12
# POSTGRES_WORKSPACE=forced_workspace_name
### PostgreSQL Vector Storage Configuration
### Vector index type: HNSW, IVFFLAT, FLAT (value is matched case-sensitively, use upper case)
POSTGRES_VECTOR_INDEX=FLAT
POSTGRES_HNSW_M=16
POSTGRES_HNSW_EF=200
POSTGRES_IVFFLAT_LISTS=100
### PostgreSQL SSL Configuration (Optional)
# POSTGRES_SSL_MODE=require
# POSTGRES_SSL_CERT=/path/to/client-cert.pem

View file

@ -66,6 +66,9 @@ class PostgreSQLDB:
self.ssl_root_cert = config.get("ssl_root_cert")
self.ssl_crl = config.get("ssl_crl")
#Vector configuration
self.vector_index_type = config.get("vector_index_type")
if self.user is None or self.password is None or self.database is None:
raise ValueError("Missing database user, password, or database")
@ -899,6 +902,21 @@ class PostgreSQLDB:
f"PostgreSQL, Failed to create composite index on table {k}, Got: {e}"
)
# Create vector indexs
if self.vector_index_type:
try:
if self.vector_index_type == 'HNSW':
await self._create_hnsw_vector_indexs()
elif self.vector_index_type == 'IVFFLAT':
await self._create_ivfflat_vector_indexs()
elif self.vector_index_type == 'FLAT':
await self._create_flat_vector_indexes()
else:
logger.warning(f"Doesn't support this vector index type: {self.vector_index_type}. Create indexs failed!")
except Exception as e:
logger.error(
f"PostgreSQL, Failed to create vector index on table {k}, type: {self.vector_index_type}, Got: {e}"
)
# After all tables are created, attempt to migrate timestamp fields
try:
await self._migrate_timestamp_columns()
@ -1101,6 +1119,81 @@ class PostgreSQLDB:
except Exception as e:
logger.warning(f"Failed to create index {index['name']}: {e}")
async def _create_hnsw_vector_indexs(self):
    """Create an HNSW cosine index on every vector table that lacks one.

    For each of the three vector tables, ``pg_indexes`` is queried for an
    index named ``idx_<table>_hnsw_cosine``; when the lookup returns nothing,
    ``CREATE INDEX ... USING hnsw`` is issued on ``content_vector`` with
    ``vector_cosine_ops``.

    Build parameters are read from the environment:
    ``POSTGRES_HNSW_M`` (default 16) and ``POSTGRES_HNSW_EF`` (default 200,
    used as ``ef_construction``).

    Errors are logged and never raised: an invalid parameter aborts the whole
    run with a single error message, and a failure on one table does not
    prevent the remaining tables from being processed.
    """
    # The parameters are interpolated into DDL text, so force them to be
    # integers up front instead of splicing raw environment strings into SQL.
    try:
        hnsw_m = int(os.getenv("POSTGRES_HNSW_M", 16))
        hnsw_ef_construction = int(os.getenv("POSTGRES_HNSW_EF", 200))
    except ValueError as e:
        logger.error(
            f"Invalid POSTGRES_HNSW_M / POSTGRES_HNSW_EF value, HNSW indexes not created, Got: {e}"
        )
        return

    vdb_tables = ("LIGHTRAG_VDB_CHUNKS", "LIGHTRAG_VDB_ENTITY", "LIGHTRAG_VDB_RELATION")
    for k in vdb_tables:
        vector_index_name = f"idx_{k.lower()}_hnsw_cosine"
        # pg_indexes stores unquoted identifiers in lower case.
        check_vector_index_sql = f"""
            SELECT 1 FROM pg_indexes
            WHERE indexname = '{vector_index_name}'
            AND tablename = '{k.lower()}'
        """
        try:
            vector_index_exists = await self.query(check_vector_index_sql)
            if vector_index_exists:
                logger.info(f"HNSW vector index {vector_index_name} already exists on table {k}")
                continue

            create_vector_index_sql = f"""
                CREATE INDEX {vector_index_name}
                ON {k} USING hnsw (content_vector vector_cosine_ops)
                WITH (m = {hnsw_m}, ef_construction = {hnsw_ef_construction})
            """
            logger.info(f"Creating hnsw index {vector_index_name} on table {k}")
            await self.execute(create_vector_index_sql)
            logger.info(f"Successfully created vector index {vector_index_name} on table {k}")
        except Exception as e:
            # Best effort: report the failure and move on to the next table.
            logger.error(f"Failed to create vector index on table {k}, Got: {e}")
async def _create_ivfflat_vector_indexs(self):
    """Create an IVFFlat cosine index on every vector table that lacks one.

    For each of the three vector tables, ``pg_indexes`` is queried for an
    index named ``idx_<table>_ivfflat_cosine``; when the lookup returns
    nothing, ``CREATE INDEX ... USING ivfflat`` is issued on
    ``content_vector`` with ``vector_cosine_ops``.

    The number of lists is read from the ``POSTGRES_IVFFLAT_LISTS``
    environment variable (default 100).

    Errors are logged and never raised: an invalid ``lists`` value aborts the
    whole run with a single error message, and a failure on one table does
    not prevent the remaining tables from being processed.
    """
    # The value is interpolated into DDL text, so force it to be an integer
    # up front instead of splicing a raw environment string into SQL.
    try:
        ivfflat_lists = int(os.getenv("POSTGRES_IVFFLAT_LISTS", 100))
    except ValueError as e:
        logger.error(
            f"Invalid POSTGRES_IVFFLAT_LISTS value, ivfflat indexes not created, Got: {e}"
        )
        return

    vdb_tables = ("LIGHTRAG_VDB_CHUNKS", "LIGHTRAG_VDB_ENTITY", "LIGHTRAG_VDB_RELATION")
    for k in vdb_tables:
        index_name = f"idx_{k.lower()}_ivfflat_cosine"
        # pg_indexes stores unquoted identifiers in lower case.
        check_index_sql = f"""
            SELECT 1 FROM pg_indexes
            WHERE indexname = '{index_name}' AND tablename = '{k.lower()}'
        """
        try:
            exists = await self.query(check_index_sql)
            if exists:
                logger.info(f"Ivfflat vector index {index_name} already exists on table {k}")
                continue

            create_sql = f"""
                CREATE INDEX {index_name}
                ON {k} USING ivfflat (content_vector vector_cosine_ops)
                WITH (lists = {ivfflat_lists})
            """
            logger.info(f"Creating ivfflat index {index_name} on table {k}")
            await self.execute(create_sql)
            logger.info(f"Successfully created ivfflat index {index_name} on table {k}")
        except Exception as e:
            # Best effort: report the failure and move on to the next table.
            logger.error(f"Failed to create ivfflat index on {k}: {e}")
async def _create_flat_vector_indexes(self):
    """Handle the ``FLAT`` vector index type, which needs no index at all.

    pgvector ships only the ``hnsw`` and ``ivfflat`` index access methods;
    there is no ``flat`` access method, so ``CREATE INDEX ... USING flat``
    is rejected by PostgreSQL. "Flat" (exact, brute-force) search is what
    pgvector performs when a vector column has no approximate index, so the
    correct action for this mode is to create nothing.

    The method is kept so that callers selecting ``FLAT`` continue to work;
    it only records the decision in the log and never raises.
    """
    # Nothing to build: exact nearest-neighbour search is pgvector's
    # behaviour in the absence of an HNSW / IVFFlat index.
    logger.info(
        "Vector index type FLAT selected: using exact search, no vector index is created"
    )
async def query(
self,
sql: str,
@ -1241,6 +1334,10 @@ class ClientManager:
"POSTGRES_SSL_CRL",
config.get("postgres", "ssl_crl", fallback=None),
),
"vector_index_type" : os.environ.get(
"POSTGRES_VECTOR_INDEX",
"FLAT"
)
}
@classmethod
@ -1878,7 +1975,7 @@ class PGVectorStorage(BaseVectorStorage):
params = {
"workspace": self.db.workspace,
"doc_ids": ids,
"better_than_threshold": self.cosine_better_than_threshold,
"closer_than_threshold": 1 - self.cosine_better_than_threshold,
"top_k": top_k,
}
results = await self.db.query(sql, params=params, multirows=True)
@ -4035,14 +4132,14 @@ TABLES = {
)"""
},
"LIGHTRAG_VDB_CHUNKS": {
"ddl": """CREATE TABLE LIGHTRAG_VDB_CHUNKS (
"ddl": f"""CREATE TABLE LIGHTRAG_VDB_CHUNKS (
id VARCHAR(255),
workspace VARCHAR(255),
full_doc_id VARCHAR(256),
chunk_order_index INTEGER,
tokens INTEGER,
content TEXT,
content_vector VECTOR,
content_vector VECTOR({os.environ.get("EMBEDDING_DIM", 1024)}),
file_path TEXT NULL,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
@ -4050,12 +4147,12 @@ TABLES = {
)"""
},
"LIGHTRAG_VDB_ENTITY": {
"ddl": """CREATE TABLE LIGHTRAG_VDB_ENTITY (
"ddl": f"""CREATE TABLE LIGHTRAG_VDB_ENTITY (
id VARCHAR(255),
workspace VARCHAR(255),
entity_name VARCHAR(512),
content TEXT,
content_vector VECTOR,
content_vector VECTOR({os.environ.get("EMBEDDING_DIM", 1024)}),
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
chunk_ids VARCHAR(255)[] NULL,
@ -4064,13 +4161,13 @@ TABLES = {
)"""
},
"LIGHTRAG_VDB_RELATION": {
"ddl": """CREATE TABLE LIGHTRAG_VDB_RELATION (
"ddl": f"""CREATE TABLE LIGHTRAG_VDB_RELATION (
id VARCHAR(255),
workspace VARCHAR(255),
source_id VARCHAR(512),
target_id VARCHAR(512),
content TEXT,
content_vector VECTOR,
content_vector VECTOR({os.environ.get("EMBEDDING_DIM", 1024)}),
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
chunk_ids VARCHAR(255)[] NULL,
@ -4270,21 +4367,19 @@ SQL_TEMPLATES = {
update_time = EXCLUDED.update_time
""",
"relationships": """
WITH relevant_chunks AS (
SELECT id as chunk_id
FROM LIGHTRAG_VDB_CHUNKS
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
)
SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
FROM (
SELECT r.id, r.source_id, r.target_id, r.create_time, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
WITH relevant_chunks AS (
SELECT id as chunk_id
FROM LIGHTRAG_VDB_CHUNKS
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
)
SELECT r.source_id as src_id, r.target_id as tgt_id,
EXTRACT(EPOCH FROM r.create_time)::BIGINT as created_at
FROM LIGHTRAG_VDB_RELATION r
JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids)
WHERE r.workspace=$1
) filtered
WHERE distance>$3
ORDER BY distance DESC
LIMIT $4
WHERE r.workspace = $1
AND r.content_vector <=> '[{embedding_string}]'::vector < $3
ORDER BY r.content_vector <=> '[{embedding_string}]'::vector
LIMIT $4
""",
"entities": """
WITH relevant_chunks AS (
@ -4292,16 +4387,14 @@ SQL_TEMPLATES = {
FROM LIGHTRAG_VDB_CHUNKS
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
)
SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
(
SELECT e.id, e.entity_name, e.create_time, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
FROM LIGHTRAG_VDB_ENTITY e
JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
WHERE e.workspace=$1
) as chunk_distances
WHERE distance>$3
ORDER BY distance DESC
LIMIT $4
SELECT e.entity_name,
EXTRACT(EPOCH FROM e.create_time)::BIGINT as created_at
FROM LIGHTRAG_VDB_ENTITY e
JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
WHERE e.workspace = $1
AND e.content_vector <=> '[{embedding_string}]'::vector < $3
ORDER BY e.content_vector <=> '[{embedding_string}]'::vector
LIMIT $4
""",
"chunks": """
WITH relevant_chunks AS (
@ -4309,16 +4402,14 @@ SQL_TEMPLATES = {
FROM LIGHTRAG_VDB_CHUNKS
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
)
SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
(
SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
FROM LIGHTRAG_VDB_CHUNKS
WHERE workspace=$1
AND id IN (SELECT chunk_id FROM relevant_chunks)
) as chunk_distances
WHERE distance>$3
ORDER BY distance DESC
LIMIT $4
SELECT id, content, file_path,
EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
FROM LIGHTRAG_VDB_CHUNKS
WHERE workspace = $1
AND id IN (SELECT chunk_id FROM relevant_chunks)
AND content_vector <=> '[{embedding_string}]'::vector < $3
ORDER BY content_vector <=> '[{embedding_string}]'::vector
LIMIT $4
""",
# DROP tables
"drop_specifiy_table_workspace": """