Fix: Resolve timezone handling problem in PostgreSQL storage

- Changed timestamp columns to naive UTC
- Added datetime formatting utilities
- Updated SQL templates for timestamp extraction
- Simplified timestamp migration logic
This commit is contained in:
yangdx 2025-07-14 04:12:52 +08:00
parent 375bfd57a4
commit 7e988158a9

View file

@ -209,20 +209,20 @@ class PostgreSQLDB:
# Check column type # Check column type
data_type = column_info.get("data_type") data_type = column_info.get("data_type")
if data_type == "timestamp with time zone": if data_type == "timestamp without time zone":
logger.info( logger.debug(
f"Column {table_name}.{column_name} is already timezone-aware, no migration needed" f"Column {table_name}.{column_name} is already a naive (timezone-free) timestamp, no migration needed"
) )
continue continue
# Execute migration, explicitly specifying UTC timezone for interpreting original data # Execute migration; NOTE(review): the new SQL has no USING {column} AT TIME ZONE 'UTC' clause, so the implicit timestamptz->timestamp cast uses the session TimeZone — existing data is interpreted as UTC only if the server/session timezone is UTC; confirm this is intended
logger.info( logger.info(
f"Migrating {table_name}.{column_name} to timezone-aware type" f"Migrating {table_name}.{column_name} from {data_type} to TIMESTAMP(0) type"
) )
migration_sql = f""" migration_sql = f"""
ALTER TABLE {table_name} ALTER TABLE {table_name}
ALTER COLUMN {column_name} TYPE TIMESTAMP(0) WITH TIME ZONE ALTER COLUMN {column_name} TYPE TIMESTAMP(0),
USING {column_name} AT TIME ZONE 'UTC' ALTER COLUMN {column_name} SET DEFAULT CURRENT_TIMESTAMP
""" """
await self.execute(migration_sql) await self.execute(migration_sql)
@ -569,7 +569,7 @@ class PostgreSQLDB:
f"Successfully migrated {migration['table']}.{migration['column']}" f"Successfully migrated {migration['table']}.{migration['column']}"
) )
else: else:
logger.info( logger.debug(
f"Column {migration['table']}.{migration['column']} already has correct type, no migration needed" f"Column {migration['table']}.{migration['column']} already has correct type, no migration needed"
) )
@ -1054,7 +1054,8 @@ class PGKVStorage(BaseKVStorage):
return return
if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS): if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
current_time = datetime.datetime.now(timezone.utc) # Get current UTC time and convert to naive datetime for database storage
current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
for k, v in data.items(): for k, v in data.items():
upsert_sql = SQL_TEMPLATES["upsert_text_chunk"] upsert_sql = SQL_TEMPLATES["upsert_text_chunk"]
_data = { _data = {
@ -1292,8 +1293,8 @@ class PGVectorStorage(BaseVectorStorage):
if not data: if not data:
return return
# Get current time with UTC timezone # Get current UTC time and convert to naive datetime for database storage
current_time = datetime.datetime.now(timezone.utc) current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
list_data = [ list_data = [
{ {
"__id__": k, "__id__": k,
@ -1489,6 +1490,15 @@ class PGVectorStorage(BaseVectorStorage):
class PGDocStatusStorage(DocStatusStorage): class PGDocStatusStorage(DocStatusStorage):
db: PostgreSQLDB = field(default=None) db: PostgreSQLDB = field(default=None)
def _format_datetime_with_timezone(self, dt):
"""Convert datetime to ISO format string with timezone info"""
if dt is None:
return None
# If no timezone info, assume it's UTC time
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat()
async def initialize(self): async def initialize(self):
if self.db is None: if self.db is None:
self.db = await ClientManager.get_client() self.db = await ClientManager.get_client()
@ -1548,14 +1558,18 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError: except json.JSONDecodeError:
chunks_list = [] chunks_list = []
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(result[0]["created_at"])
updated_at = self._format_datetime_with_timezone(result[0]["updated_at"])
return dict( return dict(
content=result[0]["content"], content=result[0]["content"],
content_length=result[0]["content_length"], content_length=result[0]["content_length"],
content_summary=result[0]["content_summary"], content_summary=result[0]["content_summary"],
status=result[0]["status"], status=result[0]["status"],
chunks_count=result[0]["chunks_count"], chunks_count=result[0]["chunks_count"],
created_at=result[0]["created_at"], created_at=created_at,
updated_at=result[0]["updated_at"], updated_at=updated_at,
file_path=result[0]["file_path"], file_path=result[0]["file_path"],
chunks_list=chunks_list, chunks_list=chunks_list,
) )
@ -1583,6 +1597,10 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError: except json.JSONDecodeError:
chunks_list = [] chunks_list = []
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(row["created_at"])
updated_at = self._format_datetime_with_timezone(row["updated_at"])
processed_results.append( processed_results.append(
{ {
"content": row["content"], "content": row["content"],
@ -1590,8 +1608,8 @@ class PGDocStatusStorage(DocStatusStorage):
"content_summary": row["content_summary"], "content_summary": row["content_summary"],
"status": row["status"], "status": row["status"],
"chunks_count": row["chunks_count"], "chunks_count": row["chunks_count"],
"created_at": row["created_at"], "created_at": created_at,
"updated_at": row["updated_at"], "updated_at": updated_at,
"file_path": row["file_path"], "file_path": row["file_path"],
"chunks_list": chunks_list, "chunks_list": chunks_list,
} }
@ -1629,13 +1647,17 @@ class PGDocStatusStorage(DocStatusStorage):
except json.JSONDecodeError: except json.JSONDecodeError:
chunks_list = [] chunks_list = []
# Convert datetime objects to ISO format strings with timezone info
created_at = self._format_datetime_with_timezone(element["created_at"])
updated_at = self._format_datetime_with_timezone(element["updated_at"])
docs_by_status[element["id"]] = DocProcessingStatus( docs_by_status[element["id"]] = DocProcessingStatus(
content=element["content"], content=element["content"],
content_summary=element["content_summary"], content_summary=element["content_summary"],
content_length=element["content_length"], content_length=element["content_length"],
status=element["status"], status=element["status"],
created_at=element["created_at"], created_at=created_at,
updated_at=element["updated_at"], updated_at=updated_at,
chunks_count=element["chunks_count"], chunks_count=element["chunks_count"],
file_path=element["file_path"], file_path=element["file_path"],
chunks_list=chunks_list, chunks_list=chunks_list,
@ -1687,19 +1709,26 @@ class PGDocStatusStorage(DocStatusStorage):
return return
def parse_datetime(dt_str): def parse_datetime(dt_str):
"""Parse datetime and ensure it's stored as UTC time in database"""
if dt_str is None: if dt_str is None:
return None return None
if isinstance(dt_str, (datetime.date, datetime.datetime)): if isinstance(dt_str, (datetime.date, datetime.datetime)):
# If it's a datetime object without timezone info, remove timezone info # If it's a datetime object
if isinstance(dt_str, datetime.datetime): if isinstance(dt_str, datetime.datetime):
# Remove timezone info, return naive datetime object # If no timezone info, assume it's UTC
return dt_str.replace(tzinfo=None) if dt_str.tzinfo is None:
dt_str = dt_str.replace(tzinfo=timezone.utc)
# Convert to UTC and remove timezone info for storage
return dt_str.astimezone(timezone.utc).replace(tzinfo=None)
return dt_str return dt_str
try: try:
# Process ISO format string with timezone # Process ISO format string with timezone
dt = datetime.datetime.fromisoformat(dt_str) dt = datetime.datetime.fromisoformat(dt_str)
# Remove timezone info, return naive datetime object # If no timezone info, assume it's UTC
return dt.replace(tzinfo=None) if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
# Convert to UTC and remove timezone info for storage
return dt.astimezone(timezone.utc).replace(tzinfo=None)
except (ValueError, TypeError): except (ValueError, TypeError):
logger.warning(f"Unable to parse datetime string: {dt_str}") logger.warning(f"Unable to parse datetime string: {dt_str}")
return None return None
@ -3095,8 +3124,8 @@ TABLES = {
doc_name VARCHAR(1024), doc_name VARCHAR(1024),
content TEXT, content TEXT,
meta JSONB, meta JSONB,
create_time TIMESTAMP(0), create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0), update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_DOC_FULL_PK PRIMARY KEY (workspace, id) CONSTRAINT LIGHTRAG_DOC_FULL_PK PRIMARY KEY (workspace, id)
)""" )"""
}, },
@ -3110,8 +3139,8 @@ TABLES = {
content TEXT, content TEXT,
file_path TEXT NULL, file_path TEXT NULL,
llm_cache_list JSONB NULL DEFAULT '[]'::jsonb, llm_cache_list JSONB NULL DEFAULT '[]'::jsonb,
create_time TIMESTAMP(0) WITH TIME ZONE, create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) WITH TIME ZONE, update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id) CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
)""" )"""
}, },
@ -3125,8 +3154,8 @@ TABLES = {
content TEXT, content TEXT,
content_vector VECTOR, content_vector VECTOR,
file_path TEXT NULL, file_path TEXT NULL,
create_time TIMESTAMP(0) WITH TIME ZONE, create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) WITH TIME ZONE, update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_VDB_CHUNKS_PK PRIMARY KEY (workspace, id) CONSTRAINT LIGHTRAG_VDB_CHUNKS_PK PRIMARY KEY (workspace, id)
)""" )"""
}, },
@ -3137,8 +3166,8 @@ TABLES = {
entity_name VARCHAR(512), entity_name VARCHAR(512),
content TEXT, content TEXT,
content_vector VECTOR, content_vector VECTOR,
create_time TIMESTAMP(0) WITH TIME ZONE, create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) WITH TIME ZONE, update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
chunk_ids VARCHAR(255)[] NULL, chunk_ids VARCHAR(255)[] NULL,
file_path TEXT NULL, file_path TEXT NULL,
CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id) CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
@ -3152,8 +3181,8 @@ TABLES = {
target_id VARCHAR(512), target_id VARCHAR(512),
content TEXT, content TEXT,
content_vector VECTOR, content_vector VECTOR,
create_time TIMESTAMP(0) WITH TIME ZONE, create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) WITH TIME ZONE, update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
chunk_ids VARCHAR(255)[] NULL, chunk_ids VARCHAR(255)[] NULL,
file_path TEXT NULL, file_path TEXT NULL,
CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id) CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
@ -3168,7 +3197,7 @@ TABLES = {
return_value TEXT, return_value TEXT,
chunk_id VARCHAR(255) NULL, chunk_id VARCHAR(255) NULL,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP, update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_LLM_CACHE_PK PRIMARY KEY (workspace, mode, id) CONSTRAINT LIGHTRAG_LLM_CACHE_PK PRIMARY KEY (workspace, mode, id)
)""" )"""
}, },
@ -3183,8 +3212,8 @@ TABLES = {
status varchar(64) NULL, status varchar(64) NULL,
file_path TEXT NULL, file_path TEXT NULL,
chunks_list JSONB NULL DEFAULT '[]'::jsonb, chunks_list JSONB NULL DEFAULT '[]'::jsonb,
created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id) CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
)""" )"""
}, },
@ -3199,11 +3228,13 @@ SQL_TEMPLATES = {
"get_by_id_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content, "get_by_id_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
chunk_order_index, full_doc_id, file_path, chunk_order_index, full_doc_id, file_path,
COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list, COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list,
create_time, update_time EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2 FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2
""", """,
"get_by_id_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id, cache_type, "get_by_id_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id, cache_type,
create_time, update_time EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id=$2 FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id=$2
""", """,
"get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id
@ -3215,11 +3246,13 @@ SQL_TEMPLATES = {
"get_by_ids_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content, "get_by_ids_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
chunk_order_index, full_doc_id, file_path, chunk_order_index, full_doc_id, file_path,
COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list, COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list,
create_time, update_time EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids}) FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids})
""", """,
"get_by_ids_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id, cache_type, "get_by_ids_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id, cache_type,
create_time, update_time EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id IN ({ids}) FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id IN ({ids})
""", """,
"filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})", "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",