diff --git a/env.example b/env.example
index 5720affc..e986ff94 100644
--- a/env.example
+++ b/env.example
@@ -234,6 +234,14 @@ POSTGRES_DATABASE=your_database
 POSTGRES_MAX_CONNECTIONS=12
 # POSTGRES_WORKSPACE=forced_workspace_name
 
+### PostgreSQL Vector Storage Configuration
+### Vector index type: HNSW, IVFFLAT, FLAT
+
+POSTGRES_VECTOR_INDEX=FLAT
+POSTGRES_HNSW_M=16
+POSTGRES_HNSW_EF=200
+POSTGRES_IVFFLAT_LISTS=100
+
 ### PostgreSQL SSL Configuration (Optional)
 # POSTGRES_SSL_MODE=require
 # POSTGRES_SSL_CERT=/path/to/client-cert.pem
diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 804454f7..48df5533 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -66,6 +66,9 @@ class PostgreSQLDB:
         self.ssl_root_cert = config.get("ssl_root_cert")
         self.ssl_crl = config.get("ssl_crl")
 
+        # Vector index configuration
+        self.vector_index_type = config.get("vector_index_type")
+
         if self.user is None or self.password is None or self.database is None:
             raise ValueError("Missing database user, password, or database")
 
@@ -899,6 +902,21 @@
                     f"PostgreSQL, Failed to create composite index on table {k}, Got: {e}"
                 )
 
+        # Create vector indexes
+        if self.vector_index_type:
+            try:
+                if self.vector_index_type == "HNSW":
+                    await self._create_hnsw_vector_indexes()
+                elif self.vector_index_type == "IVFFLAT":
+                    await self._create_ivfflat_vector_indexes()
+                elif self.vector_index_type == "FLAT":
+                    await self._create_flat_vector_indexes()
+                else:
+                    logger.warning(f"Unsupported vector index type: {self.vector_index_type}, skipping vector index creation")
+            except Exception as e:
+                logger.error(
+                    f"PostgreSQL, Failed to create {self.vector_index_type} vector indexes, Got: {e}"
+                )
         # After all tables are created, attempt to migrate timestamp fields
         try:
             await self._migrate_timestamp_columns()
@@ -1101,6 +1119,81 @@
             except Exception as e:
                 logger.warning(f"Failed to create index {index['name']}: {e}")
 
+    async def _create_hnsw_vector_indexes(self):
+        vdb_tables = ["LIGHTRAG_VDB_CHUNKS", "LIGHTRAG_VDB_ENTITY", "LIGHTRAG_VDB_RELATION"]
+
+        for k in vdb_tables:
+            vector_index_name = f"idx_{k.lower()}_hnsw_cosine"
+            check_vector_index_sql = f"""
+                SELECT 1 FROM pg_indexes
+                WHERE indexname = '{vector_index_name}'
+                AND tablename = '{k.lower()}'
+            """
+            try:
+                vector_index_exists = await self.query(check_vector_index_sql)
+                if not vector_index_exists:
+                    create_vector_index_sql = f"""
+                        CREATE INDEX {vector_index_name}
+                        ON {k} USING hnsw (content_vector vector_cosine_ops)
+                        WITH (m = {os.getenv("POSTGRES_HNSW_M", 16)}, ef_construction = {os.getenv("POSTGRES_HNSW_EF", 200)})
+                    """
+                    logger.info(f"Creating HNSW index {vector_index_name} on table {k}")
+                    await self.execute(create_vector_index_sql)
+                    logger.info(f"Successfully created HNSW index {vector_index_name} on table {k}")
+                else:
+                    logger.info(f"HNSW index {vector_index_name} already exists on table {k}")
+            except Exception as e:
+                logger.error(f"Failed to create HNSW index on table {k}, Got: {e}")
+
+    async def _create_ivfflat_vector_indexes(self):
+        vdb_tables = ["LIGHTRAG_VDB_CHUNKS", "LIGHTRAG_VDB_ENTITY", "LIGHTRAG_VDB_RELATION"]
+
+        for k in vdb_tables:
+            index_name = f"idx_{k.lower()}_ivfflat_cosine"
+            check_index_sql = f"""
+                SELECT 1 FROM pg_indexes
+                WHERE indexname = '{index_name}' AND tablename = '{k.lower()}'
+            """
+            try:
+                exists = await self.query(check_index_sql)
+                if not exists:
+                    create_sql = f"""
+                        CREATE INDEX {index_name}
+                        ON {k} USING ivfflat (content_vector vector_cosine_ops)
+                        WITH (lists = {os.getenv("POSTGRES_IVFFLAT_LISTS", 100)})
+                    """
+                    logger.info(f"Creating IVFFlat index {index_name} on table {k}")
+                    await self.execute(create_sql)
+                    logger.info(f"Successfully created IVFFlat index {index_name} on table {k}")
+                else:
+                    logger.info(f"IVFFlat index {index_name} already exists on table {k}")
+            except Exception as e:
+                logger.error(f"Failed to create IVFFlat index on {k}: {e}")
+
+    async def _create_flat_vector_indexes(self):
+        vdb_tables = ["LIGHTRAG_VDB_CHUNKS", "LIGHTRAG_VDB_ENTITY", "LIGHTRAG_VDB_RELATION"]
+
+        for k in vdb_tables:
+            index_name = f"idx_{k.lower()}_flat_cosine"
+            check_index_sql = f"""
+                SELECT 1 FROM pg_indexes
+                WHERE indexname = '{index_name}' AND tablename = '{k.lower()}'
+            """
+            try:
+                exists = await self.query(check_index_sql)
+                if not exists:
+                    create_sql = f"""
+                        CREATE INDEX {index_name}
+                        ON {k} USING flat (content_vector vector_cosine_ops)
+                    """
+                    logger.info(f"Creating FLAT index {index_name} on table {k}")
+                    await self.execute(create_sql)
+                    logger.info(f"Successfully created FLAT index {index_name} on table {k}")
+                else:
+                    logger.info(f"FLAT index {index_name} already exists on table {k}")
+            except Exception as e:
+                logger.error(f"Failed to create FLAT index on {k}: {e}")
+
     async def query(
         self,
         sql: str,
@@ -1241,6 +1334,10 @@ class ClientManager:
                 "POSTGRES_SSL_CRL",
                 config.get("postgres", "ssl_crl", fallback=None),
             ),
+            "vector_index_type": os.environ.get(
+                "POSTGRES_VECTOR_INDEX",
+                "FLAT",
+            ),
         }
 
     @classmethod
@@ -1878,7 +1975,7 @@ class PGVectorStorage(BaseVectorStorage):
         params = {
             "workspace": self.db.workspace,
             "doc_ids": ids,
-            "better_than_threshold": self.cosine_better_than_threshold,
+            "closer_than_threshold": 1 - self.cosine_better_than_threshold,
             "top_k": top_k,
         }
         results = await self.db.query(sql, params=params, multirows=True)
@@ -4035,14 +4132,14 @@ TABLES = {
         )"""
     },
     "LIGHTRAG_VDB_CHUNKS": {
-        "ddl": """CREATE TABLE LIGHTRAG_VDB_CHUNKS (
+        "ddl": f"""CREATE TABLE LIGHTRAG_VDB_CHUNKS (
                     id VARCHAR(255),
                     workspace VARCHAR(255),
                     full_doc_id VARCHAR(256),
                     chunk_order_index INTEGER,
                     tokens INTEGER,
                     content TEXT,
-                    content_vector VECTOR,
+                    content_vector VECTOR({os.environ.get("EMBEDDING_DIM", 1024)}),
                     file_path TEXT NULL,
                     create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
                     update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
@@ -4050,12 +4147,12 @@ TABLES = {
         )"""
     },
     "LIGHTRAG_VDB_ENTITY": {
-        "ddl": """CREATE TABLE LIGHTRAG_VDB_ENTITY (
+        "ddl": f"""CREATE TABLE LIGHTRAG_VDB_ENTITY (
                     id VARCHAR(255),
                     workspace VARCHAR(255),
                     entity_name VARCHAR(512),
                     content TEXT,
-                    content_vector VECTOR,
+                    content_vector VECTOR({os.environ.get("EMBEDDING_DIM", 1024)}),
                     create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
                     update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
                     chunk_ids VARCHAR(255)[] NULL,
@@ -4064,13 +4161,13 @@ TABLES = {
         )"""
     },
     "LIGHTRAG_VDB_RELATION": {
-        "ddl": """CREATE TABLE LIGHTRAG_VDB_RELATION (
+        "ddl": f"""CREATE TABLE LIGHTRAG_VDB_RELATION (
                    id VARCHAR(255),
                    workspace VARCHAR(255),
                    source_id VARCHAR(512),
                    target_id VARCHAR(512),
                    content TEXT,
-                   content_vector VECTOR,
+                   content_vector VECTOR({os.environ.get("EMBEDDING_DIM", 1024)}),
                    create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
                    update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
                    chunk_ids VARCHAR(255)[] NULL,
@@ -4270,21 +4367,19 @@ SQL_TEMPLATES = {
         update_time = EXCLUDED.update_time
     """,
     "relationships": """
-    WITH relevant_chunks AS (
-        SELECT id as chunk_id
-        FROM LIGHTRAG_VDB_CHUNKS
-        WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
-    )
-    SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
-    FROM (
-        SELECT r.id, r.source_id, r.target_id, r.create_time, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
+        WITH relevant_chunks AS (
+            SELECT id as chunk_id
+            FROM LIGHTRAG_VDB_CHUNKS
+            WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
+        )
+        SELECT r.source_id as src_id, r.target_id as tgt_id,
+               EXTRACT(EPOCH FROM r.create_time)::BIGINT as created_at
         FROM LIGHTRAG_VDB_RELATION r
         JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids)
-        WHERE r.workspace=$1
-    ) filtered
-    WHERE distance>$3
-    ORDER BY distance DESC
-    LIMIT $4
+        WHERE r.workspace = $1
+          AND r.content_vector <=> '[{embedding_string}]'::vector < $3
+        ORDER BY r.content_vector <=> '[{embedding_string}]'::vector
+        LIMIT $4
     """,
     "entities": """
         WITH relevant_chunks AS (
@@ -4292,16 +4387,14 @@ SQL_TEMPLATES = {
             SELECT id as chunk_id
             FROM LIGHTRAG_VDB_CHUNKS
             WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
         )
-        SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
-            (
-                SELECT e.id, e.entity_name, e.create_time, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
-                FROM LIGHTRAG_VDB_ENTITY e
-                JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
-                WHERE e.workspace=$1
-            ) as chunk_distances
-        WHERE distance>$3
-        ORDER BY distance DESC
-        LIMIT $4
+        SELECT e.entity_name,
+               EXTRACT(EPOCH FROM e.create_time)::BIGINT as created_at
+        FROM LIGHTRAG_VDB_ENTITY e
+        JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
+        WHERE e.workspace = $1
+          AND e.content_vector <=> '[{embedding_string}]'::vector < $3
+        ORDER BY e.content_vector <=> '[{embedding_string}]'::vector
+        LIMIT $4
     """,
     "chunks": """
         WITH relevant_chunks AS (
@@ -4309,16 +4402,14 @@ SQL_TEMPLATES = {
            SELECT id as chunk_id
            FROM LIGHTRAG_VDB_CHUNKS
            WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
        )
-        SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
-            (
-                SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
-                FROM LIGHTRAG_VDB_CHUNKS
-                WHERE workspace=$1
-                AND id IN (SELECT chunk_id FROM relevant_chunks)
-            ) as chunk_distances
-        WHERE distance>$3
-        ORDER BY distance DESC
-        LIMIT $4
+        SELECT id, content, file_path,
+               EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
+        FROM LIGHTRAG_VDB_CHUNKS
+        WHERE workspace = $1
+          AND id IN (SELECT chunk_id FROM relevant_chunks)
+          AND content_vector <=> '[{embedding_string}]'::vector < $3
+        ORDER BY content_vector <=> '[{embedding_string}]'::vector
+        LIMIT $4
     """,
     # DROP tables
     "drop_specifiy_table_workspace": """