Merge branch 'postgres-vector-index'
This commit is contained in:
commit
c2eefec707
3 changed files with 179 additions and 41 deletions
|
|
@ -26,8 +26,12 @@ port = 5432
|
||||||
user = your_username
|
user = your_username
|
||||||
password = your_password
|
password = your_password
|
||||||
database = your_database
|
database = your_database
|
||||||
workspace = default # 可选,默认为default
|
# workspace = default
|
||||||
max_connections = 12
|
max_connections = 12
|
||||||
|
vector_index_type = HNSW # HNSW or IVFFLAT
|
||||||
|
hnsw_m = 16
|
||||||
|
hnsw_ef = 64
|
||||||
|
ivfflat_lists = 100
|
||||||
|
|
||||||
[memgraph]
|
[memgraph]
|
||||||
uri = bolt://localhost:7687
|
uri = bolt://localhost:7687
|
||||||
|
|
|
||||||
|
|
@ -234,6 +234,13 @@ POSTGRES_DATABASE=your_database
|
||||||
POSTGRES_MAX_CONNECTIONS=12
|
POSTGRES_MAX_CONNECTIONS=12
|
||||||
# POSTGRES_WORKSPACE=forced_workspace_name
|
# POSTGRES_WORKSPACE=forced_workspace_name
|
||||||
|
|
||||||
|
### PostgreSQL Vector Storage Configuration
|
||||||
|
### Vector storage type: HNSW, IVFFlat
|
||||||
|
POSTGRES_VECTOR_INDEX_TYPE=HNSW
|
||||||
|
POSTGRES_HNSW_M=16
|
||||||
|
POSTGRES_HNSW_EF=200
|
||||||
|
POSTGRES_IVFFLAT_LISTS=100
|
||||||
|
|
||||||
### PostgreSQL SSL Configuration (Optional)
|
### PostgreSQL SSL Configuration (Optional)
|
||||||
# POSTGRES_SSL_MODE=require
|
# POSTGRES_SSL_MODE=require
|
||||||
# POSTGRES_SSL_CERT=/path/to/client-cert.pem
|
# POSTGRES_SSL_CERT=/path/to/client-cert.pem
|
||||||
|
|
|
||||||
|
|
@ -66,6 +66,12 @@ class PostgreSQLDB:
|
||||||
self.ssl_root_cert = config.get("ssl_root_cert")
|
self.ssl_root_cert = config.get("ssl_root_cert")
|
||||||
self.ssl_crl = config.get("ssl_crl")
|
self.ssl_crl = config.get("ssl_crl")
|
||||||
|
|
||||||
|
# Vector configuration
|
||||||
|
self.vector_index_type = config.get("vector_index_type")
|
||||||
|
self.hnsw_m = config.get("hnsw_m")
|
||||||
|
self.hnsw_ef = config.get("hnsw_ef")
|
||||||
|
self.ivfflat_lists = config.get("ivfflat_lists")
|
||||||
|
|
||||||
if self.user is None or self.password is None or self.database is None:
|
if self.user is None or self.password is None or self.database is None:
|
||||||
raise ValueError("Missing database user, password, or database")
|
raise ValueError("Missing database user, password, or database")
|
||||||
|
|
||||||
|
|
@ -899,6 +905,30 @@ class PostgreSQLDB:
|
||||||
f"PostgreSQL, Failed to create composite index on table {k}, Got: {e}"
|
f"PostgreSQL, Failed to create composite index on table {k}, Got: {e}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Create vector indexs
|
||||||
|
if self.vector_index_type:
|
||||||
|
logger.info(
|
||||||
|
f"PostgreSQL, Create vector indexs, type: {self.vector_index_type}"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
if self.vector_index_type == "HNSW":
|
||||||
|
await self._create_hnsw_vector_indexes()
|
||||||
|
elif self.vector_index_type == "IVFFLAT":
|
||||||
|
await self._create_ivfflat_vector_indexes()
|
||||||
|
elif self.vector_index_type == "FLAT":
|
||||||
|
logger.warning(
|
||||||
|
"FLAT index type is not supported by pgvector. Skipping vector index creation. "
|
||||||
|
"Please use 'HNSW' or 'IVFFLAT' instead."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"Doesn't support this vector index type: {self.vector_index_type}. "
|
||||||
|
"Supported types: HNSW, IVFFLAT"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
f"PostgreSQL, Failed to create vector index, type: {self.vector_index_type}, Got: {e}"
|
||||||
|
)
|
||||||
# After all tables are created, attempt to migrate timestamp fields
|
# After all tables are created, attempt to migrate timestamp fields
|
||||||
try:
|
try:
|
||||||
await self._migrate_timestamp_columns()
|
await self._migrate_timestamp_columns()
|
||||||
|
|
@ -1101,6 +1131,87 @@ class PostgreSQLDB:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to create index {index['name']}: {e}")
|
logger.warning(f"Failed to create index {index['name']}: {e}")
|
||||||
|
|
||||||
|
async def _create_hnsw_vector_indexes(self):
|
||||||
|
vdb_tables = [
|
||||||
|
"LIGHTRAG_VDB_CHUNKS",
|
||||||
|
"LIGHTRAG_VDB_ENTITY",
|
||||||
|
"LIGHTRAG_VDB_RELATION",
|
||||||
|
]
|
||||||
|
|
||||||
|
embedding_dim = int(os.environ.get("EMBEDDING_DIM", 1024))
|
||||||
|
|
||||||
|
for k in vdb_tables:
|
||||||
|
vector_index_name = f"idx_{k.lower()}_hnsw_cosine"
|
||||||
|
check_vector_index_sql = f"""
|
||||||
|
SELECT 1 FROM pg_indexes
|
||||||
|
WHERE indexname = '{vector_index_name}'
|
||||||
|
AND tablename = '{k.lower()}'
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
vector_index_exists = await self.query(check_vector_index_sql)
|
||||||
|
if not vector_index_exists:
|
||||||
|
# Only set vector dimension when index doesn't exist
|
||||||
|
alter_sql = f"ALTER TABLE {k} ALTER COLUMN content_vector TYPE VECTOR({embedding_dim})"
|
||||||
|
await self.execute(alter_sql)
|
||||||
|
logger.debug(f"Ensured vector dimension for {k}")
|
||||||
|
|
||||||
|
create_vector_index_sql = f"""
|
||||||
|
CREATE INDEX {vector_index_name}
|
||||||
|
ON {k} USING hnsw (content_vector vector_cosine_ops)
|
||||||
|
WITH (m = {self.hnsw_m}, ef_construction = {self.hnsw_ef})
|
||||||
|
"""
|
||||||
|
logger.info(f"Creating hnsw index {vector_index_name} on table {k}")
|
||||||
|
await self.execute(create_vector_index_sql)
|
||||||
|
logger.info(
|
||||||
|
f"Successfully created vector index {vector_index_name} on table {k}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
f"HNSW vector index {vector_index_name} already exists on table {k}"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to create vector index on table {k}, Got: {e}")
|
||||||
|
|
||||||
|
async def _create_ivfflat_vector_indexes(self):
|
||||||
|
vdb_tables = [
|
||||||
|
"LIGHTRAG_VDB_CHUNKS",
|
||||||
|
"LIGHTRAG_VDB_ENTITY",
|
||||||
|
"LIGHTRAG_VDB_RELATION",
|
||||||
|
]
|
||||||
|
|
||||||
|
embedding_dim = int(os.environ.get("EMBEDDING_DIM", 1024))
|
||||||
|
|
||||||
|
for k in vdb_tables:
|
||||||
|
index_name = f"idx_{k.lower()}_ivfflat_cosine"
|
||||||
|
check_index_sql = f"""
|
||||||
|
SELECT 1 FROM pg_indexes
|
||||||
|
WHERE indexname = '{index_name}' AND tablename = '{k.lower()}'
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
exists = await self.query(check_index_sql)
|
||||||
|
if not exists:
|
||||||
|
# Only set vector dimension when index doesn't exist
|
||||||
|
alter_sql = f"ALTER TABLE {k} ALTER COLUMN content_vector TYPE VECTOR({embedding_dim})"
|
||||||
|
await self.execute(alter_sql)
|
||||||
|
logger.debug(f"Ensured vector dimension for {k}")
|
||||||
|
|
||||||
|
create_sql = f"""
|
||||||
|
CREATE INDEX {index_name}
|
||||||
|
ON {k} USING ivfflat (content_vector vector_cosine_ops)
|
||||||
|
WITH (lists = {self.ivfflat_lists})
|
||||||
|
"""
|
||||||
|
logger.info(f"Creating ivfflat index {index_name} on table {k}")
|
||||||
|
await self.execute(create_sql)
|
||||||
|
logger.info(
|
||||||
|
f"Successfully created ivfflat index {index_name} on table {k}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
f"Ivfflat vector index {index_name} already exists on table {k}"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to create ivfflat index on {k}: {e}")
|
||||||
|
|
||||||
async def query(
|
async def query(
|
||||||
self,
|
self,
|
||||||
sql: str,
|
sql: str,
|
||||||
|
|
@ -1241,6 +1352,28 @@ class ClientManager:
|
||||||
"POSTGRES_SSL_CRL",
|
"POSTGRES_SSL_CRL",
|
||||||
config.get("postgres", "ssl_crl", fallback=None),
|
config.get("postgres", "ssl_crl", fallback=None),
|
||||||
),
|
),
|
||||||
|
"vector_index_type": os.environ.get(
|
||||||
|
"POSTGRES_VECTOR_INDEX_TYPE",
|
||||||
|
config.get("postgres", "vector_index_type", fallback="HNSW"),
|
||||||
|
),
|
||||||
|
"hnsw_m": int(
|
||||||
|
os.environ.get(
|
||||||
|
"POSTGRES_HNSW_M",
|
||||||
|
config.get("postgres", "hnsw_m", fallback="16"),
|
||||||
|
)
|
||||||
|
),
|
||||||
|
"hnsw_ef": int(
|
||||||
|
os.environ.get(
|
||||||
|
"POSTGRES_HNSW_EF",
|
||||||
|
config.get("postgres", "hnsw_ef", fallback="64"),
|
||||||
|
)
|
||||||
|
),
|
||||||
|
"ivfflat_lists": int(
|
||||||
|
os.environ.get(
|
||||||
|
"POSTGRES_IVFFLAT_LISTS",
|
||||||
|
config.get("postgres", "ivfflat_lists", fallback="100"),
|
||||||
|
)
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|
@ -1878,7 +2011,7 @@ class PGVectorStorage(BaseVectorStorage):
|
||||||
params = {
|
params = {
|
||||||
"workspace": self.db.workspace,
|
"workspace": self.db.workspace,
|
||||||
"doc_ids": ids,
|
"doc_ids": ids,
|
||||||
"better_than_threshold": self.cosine_better_than_threshold,
|
"closer_than_threshold": 1 - self.cosine_better_than_threshold,
|
||||||
"top_k": top_k,
|
"top_k": top_k,
|
||||||
}
|
}
|
||||||
results = await self.db.query(sql, params=params, multirows=True)
|
results = await self.db.query(sql, params=params, multirows=True)
|
||||||
|
|
@ -4035,14 +4168,14 @@ TABLES = {
|
||||||
)"""
|
)"""
|
||||||
},
|
},
|
||||||
"LIGHTRAG_VDB_CHUNKS": {
|
"LIGHTRAG_VDB_CHUNKS": {
|
||||||
"ddl": """CREATE TABLE LIGHTRAG_VDB_CHUNKS (
|
"ddl": f"""CREATE TABLE LIGHTRAG_VDB_CHUNKS (
|
||||||
id VARCHAR(255),
|
id VARCHAR(255),
|
||||||
workspace VARCHAR(255),
|
workspace VARCHAR(255),
|
||||||
full_doc_id VARCHAR(256),
|
full_doc_id VARCHAR(256),
|
||||||
chunk_order_index INTEGER,
|
chunk_order_index INTEGER,
|
||||||
tokens INTEGER,
|
tokens INTEGER,
|
||||||
content TEXT,
|
content TEXT,
|
||||||
content_vector VECTOR,
|
content_vector VECTOR({os.environ.get("EMBEDDING_DIM", 1024)}),
|
||||||
file_path TEXT NULL,
|
file_path TEXT NULL,
|
||||||
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
||||||
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
|
@ -4050,12 +4183,12 @@ TABLES = {
|
||||||
)"""
|
)"""
|
||||||
},
|
},
|
||||||
"LIGHTRAG_VDB_ENTITY": {
|
"LIGHTRAG_VDB_ENTITY": {
|
||||||
"ddl": """CREATE TABLE LIGHTRAG_VDB_ENTITY (
|
"ddl": f"""CREATE TABLE LIGHTRAG_VDB_ENTITY (
|
||||||
id VARCHAR(255),
|
id VARCHAR(255),
|
||||||
workspace VARCHAR(255),
|
workspace VARCHAR(255),
|
||||||
entity_name VARCHAR(512),
|
entity_name VARCHAR(512),
|
||||||
content TEXT,
|
content TEXT,
|
||||||
content_vector VECTOR,
|
content_vector VECTOR({os.environ.get("EMBEDDING_DIM", 1024)}),
|
||||||
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
||||||
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
||||||
chunk_ids VARCHAR(255)[] NULL,
|
chunk_ids VARCHAR(255)[] NULL,
|
||||||
|
|
@ -4064,13 +4197,13 @@ TABLES = {
|
||||||
)"""
|
)"""
|
||||||
},
|
},
|
||||||
"LIGHTRAG_VDB_RELATION": {
|
"LIGHTRAG_VDB_RELATION": {
|
||||||
"ddl": """CREATE TABLE LIGHTRAG_VDB_RELATION (
|
"ddl": f"""CREATE TABLE LIGHTRAG_VDB_RELATION (
|
||||||
id VARCHAR(255),
|
id VARCHAR(255),
|
||||||
workspace VARCHAR(255),
|
workspace VARCHAR(255),
|
||||||
source_id VARCHAR(512),
|
source_id VARCHAR(512),
|
||||||
target_id VARCHAR(512),
|
target_id VARCHAR(512),
|
||||||
content TEXT,
|
content TEXT,
|
||||||
content_vector VECTOR,
|
content_vector VECTOR({os.environ.get("EMBEDDING_DIM", 1024)}),
|
||||||
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
||||||
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
|
||||||
chunk_ids VARCHAR(255)[] NULL,
|
chunk_ids VARCHAR(255)[] NULL,
|
||||||
|
|
@ -4270,21 +4403,19 @@ SQL_TEMPLATES = {
|
||||||
update_time = EXCLUDED.update_time
|
update_time = EXCLUDED.update_time
|
||||||
""",
|
""",
|
||||||
"relationships": """
|
"relationships": """
|
||||||
WITH relevant_chunks AS (
|
WITH relevant_chunks AS (
|
||||||
SELECT id as chunk_id
|
SELECT id as chunk_id
|
||||||
FROM LIGHTRAG_VDB_CHUNKS
|
FROM LIGHTRAG_VDB_CHUNKS
|
||||||
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
||||||
)
|
)
|
||||||
SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
|
SELECT r.source_id as src_id, r.target_id as tgt_id,
|
||||||
FROM (
|
EXTRACT(EPOCH FROM r.create_time)::BIGINT as created_at
|
||||||
SELECT r.id, r.source_id, r.target_id, r.create_time, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
|
|
||||||
FROM LIGHTRAG_VDB_RELATION r
|
FROM LIGHTRAG_VDB_RELATION r
|
||||||
JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids)
|
JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids)
|
||||||
WHERE r.workspace=$1
|
WHERE r.workspace = $1
|
||||||
) filtered
|
AND r.content_vector <=> '[{embedding_string}]'::vector < $3
|
||||||
WHERE distance>$3
|
ORDER BY r.content_vector <=> '[{embedding_string}]'::vector
|
||||||
ORDER BY distance DESC
|
LIMIT $4
|
||||||
LIMIT $4
|
|
||||||
""",
|
""",
|
||||||
"entities": """
|
"entities": """
|
||||||
WITH relevant_chunks AS (
|
WITH relevant_chunks AS (
|
||||||
|
|
@ -4292,16 +4423,14 @@ SQL_TEMPLATES = {
|
||||||
FROM LIGHTRAG_VDB_CHUNKS
|
FROM LIGHTRAG_VDB_CHUNKS
|
||||||
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
||||||
)
|
)
|
||||||
SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
|
SELECT e.entity_name,
|
||||||
(
|
EXTRACT(EPOCH FROM e.create_time)::BIGINT as created_at
|
||||||
SELECT e.id, e.entity_name, e.create_time, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
|
FROM LIGHTRAG_VDB_ENTITY e
|
||||||
FROM LIGHTRAG_VDB_ENTITY e
|
JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
|
||||||
JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
|
WHERE e.workspace = $1
|
||||||
WHERE e.workspace=$1
|
AND e.content_vector <=> '[{embedding_string}]'::vector < $3
|
||||||
) as chunk_distances
|
ORDER BY e.content_vector <=> '[{embedding_string}]'::vector
|
||||||
WHERE distance>$3
|
LIMIT $4
|
||||||
ORDER BY distance DESC
|
|
||||||
LIMIT $4
|
|
||||||
""",
|
""",
|
||||||
"chunks": """
|
"chunks": """
|
||||||
WITH relevant_chunks AS (
|
WITH relevant_chunks AS (
|
||||||
|
|
@ -4309,16 +4438,14 @@ SQL_TEMPLATES = {
|
||||||
FROM LIGHTRAG_VDB_CHUNKS
|
FROM LIGHTRAG_VDB_CHUNKS
|
||||||
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
||||||
)
|
)
|
||||||
SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
|
SELECT id, content, file_path,
|
||||||
(
|
EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
|
||||||
SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
|
FROM LIGHTRAG_VDB_CHUNKS
|
||||||
FROM LIGHTRAG_VDB_CHUNKS
|
WHERE workspace = $1
|
||||||
WHERE workspace=$1
|
AND id IN (SELECT chunk_id FROM relevant_chunks)
|
||||||
AND id IN (SELECT chunk_id FROM relevant_chunks)
|
AND content_vector <=> '[{embedding_string}]'::vector < $3
|
||||||
) as chunk_distances
|
ORDER BY content_vector <=> '[{embedding_string}]'::vector
|
||||||
WHERE distance>$3
|
LIMIT $4
|
||||||
ORDER BY distance DESC
|
|
||||||
LIMIT $4
|
|
||||||
""",
|
""",
|
||||||
# DROP tables
|
# DROP tables
|
||||||
"drop_specifiy_table_workspace": """
|
"drop_specifiy_table_workspace": """
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue