feat(gcs): Add support for Google Cloud Storage (GCS) integration
This commit is contained in:
parent
3c224c817b
commit
26431cb851
4 changed files with 217 additions and 2 deletions
|
|
@ -148,6 +148,7 @@ class Storage(Enum):
|
|||
AWS_S3 = 4
|
||||
OSS = 5
|
||||
OPENDAL = 6
|
||||
GCS = 7
|
||||
|
||||
# environment
|
||||
# ENV_STRONG_TEST_COUNT = "STRONG_TEST_COUNT"
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import rag.utils.ob_conn
|
|||
import rag.utils.opensearch_conn
|
||||
from rag.utils.azure_sas_conn import RAGFlowAzureSasBlob
|
||||
from rag.utils.azure_spn_conn import RAGFlowAzureSpnBlob
|
||||
from rag.utils.gcs_conn import RAGFlowGCS
|
||||
from rag.utils.minio_conn import RAGFlowMinio
|
||||
from rag.utils.opendal_conn import OpenDALStorage
|
||||
from rag.utils.s3_conn import RAGFlowS3
|
||||
|
|
@ -109,6 +110,7 @@ MINIO = {}
|
|||
OB = {}
|
||||
OSS = {}
|
||||
OS = {}
|
||||
GCS = {}
|
||||
|
||||
DOC_MAXIMUM_SIZE: int = 128 * 1024 * 1024
|
||||
DOC_BULK_SIZE: int = 4
|
||||
|
|
@ -151,7 +153,8 @@ class StorageFactory:
|
|||
Storage.AZURE_SAS: RAGFlowAzureSasBlob,
|
||||
Storage.AWS_S3: RAGFlowS3,
|
||||
Storage.OSS: RAGFlowOSS,
|
||||
Storage.OPENDAL: OpenDALStorage
|
||||
Storage.OPENDAL: OpenDALStorage,
|
||||
Storage.GCS: RAGFlowGCS,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
|
|
@ -250,7 +253,7 @@ def init_settings():
|
|||
else:
|
||||
raise Exception(f"Not supported doc engine: {DOC_ENGINE}")
|
||||
|
||||
global AZURE, S3, MINIO, OSS
|
||||
global AZURE, S3, MINIO, OSS, GCS
|
||||
if STORAGE_IMPL_TYPE in ['AZURE_SPN', 'AZURE_SAS']:
|
||||
AZURE = get_base_config("azure", {})
|
||||
elif STORAGE_IMPL_TYPE == 'AWS_S3':
|
||||
|
|
@ -259,6 +262,8 @@ def init_settings():
|
|||
MINIO = decrypt_database_config(name="minio")
|
||||
elif STORAGE_IMPL_TYPE == 'OSS':
|
||||
OSS = get_base_config("oss", {})
|
||||
elif STORAGE_IMPL_TYPE == 'GCS':
|
||||
GCS = get_base_config("gcs", {})
|
||||
|
||||
global STORAGE_IMPL
|
||||
STORAGE_IMPL = StorageFactory.create(Storage[STORAGE_IMPL_TYPE])
|
||||
|
|
|
|||
|
|
@ -60,6 +60,8 @@ user_default_llm:
|
|||
# access_key: 'access_key'
|
||||
# secret_key: 'secret_key'
|
||||
# region: 'region'
|
||||
#gcs:
|
||||
# bucket: 'bridgtl-edm-d-bucket-ragflow'
|
||||
# oss:
|
||||
# access_key: 'access_key'
|
||||
# secret_key: 'secret_key'
|
||||
|
|
|
|||
207
rag/utils/gcs_conn.py
Normal file
207
rag/utils/gcs_conn.py
Normal file
|
|
@ -0,0 +1,207 @@
|
|||
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
import logging
|
||||
import time
|
||||
import datetime
|
||||
from io import BytesIO
|
||||
from google.cloud import storage
|
||||
from google.api_core.exceptions import NotFound
|
||||
from common.decorator import singleton
|
||||
from common import settings
|
||||
|
||||
|
||||
@singleton
|
||||
class RAGFlowGCS:
|
||||
def __init__(self):
|
||||
self.client = None
|
||||
self.bucket_name = None
|
||||
self.__open__()
|
||||
|
||||
def __open__(self):
|
||||
try:
|
||||
if self.client:
|
||||
self.client = None
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
self.client = storage.Client()
|
||||
self.bucket_name = settings.GCS["bucket"]
|
||||
except Exception:
|
||||
logging.exception("Fail to connect to GCS")
|
||||
|
||||
def _get_blob_path(self, folder, filename):
|
||||
"""Helper to construct the path: folder/filename"""
|
||||
if not folder:
|
||||
return filename
|
||||
return f"{folder}/{filename}"
|
||||
|
||||
def health(self):
|
||||
folder, fnm, binary = "ragflow-health", "health_check", b"_t@@@1"
|
||||
try:
|
||||
bucket_obj = self.client.bucket(self.bucket_name)
|
||||
if not bucket_obj.exists():
|
||||
logging.error(f"Health check failed: Main bucket '{self.bucket_name}' does not exist.")
|
||||
return False
|
||||
|
||||
blob_path = self._get_blob_path(folder, fnm)
|
||||
blob = bucket_obj.blob(blob_path)
|
||||
blob.upload_from_file(BytesIO(binary), content_type='application/octet-stream')
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.exception(f"Health check failed: {e}")
|
||||
return False
|
||||
|
||||
def put(self, bucket, fnm, binary, tenant_id=None):
|
||||
# RENAMED PARAMETER: bucket_name -> bucket (to match interface)
|
||||
for _ in range(3):
|
||||
try:
|
||||
bucket_obj = self.client.bucket(self.bucket_name)
|
||||
blob_path = self._get_blob_path(bucket, fnm)
|
||||
blob = bucket_obj.blob(blob_path)
|
||||
|
||||
blob.upload_from_file(BytesIO(binary), content_type='application/octet-stream')
|
||||
return True
|
||||
except NotFound:
|
||||
logging.error(f"Fail to put: Main bucket {self.bucket_name} does not exist.")
|
||||
return False
|
||||
except Exception:
|
||||
logging.exception(f"Fail to put {bucket}/{fnm}:")
|
||||
self.__open__()
|
||||
time.sleep(1)
|
||||
return False
|
||||
|
||||
def rm(self, bucket, fnm, tenant_id=None):
|
||||
# RENAMED PARAMETER: bucket_name -> bucket
|
||||
try:
|
||||
bucket_obj = self.client.bucket(self.bucket_name)
|
||||
blob_path = self._get_blob_path(bucket, fnm)
|
||||
blob = bucket_obj.blob(blob_path)
|
||||
blob.delete()
|
||||
except NotFound:
|
||||
pass
|
||||
except Exception:
|
||||
logging.exception(f"Fail to remove {bucket}/{fnm}:")
|
||||
|
||||
def get(self, bucket, filename, tenant_id=None):
|
||||
# RENAMED PARAMETER: bucket_name -> bucket
|
||||
for _ in range(1):
|
||||
try:
|
||||
bucket_obj = self.client.bucket(self.bucket_name)
|
||||
blob_path = self._get_blob_path(bucket, filename)
|
||||
blob = bucket_obj.blob(blob_path)
|
||||
return blob.download_as_bytes()
|
||||
except NotFound:
|
||||
logging.warning(f"File not found {bucket}/{filename} in {self.bucket_name}")
|
||||
return None
|
||||
except Exception:
|
||||
logging.exception(f"Fail to get {bucket}/{filename}")
|
||||
self.__open__()
|
||||
time.sleep(1)
|
||||
return None
|
||||
|
||||
def obj_exist(self, bucket, filename, tenant_id=None):
|
||||
# RENAMED PARAMETER: bucket_name -> bucket
|
||||
try:
|
||||
bucket_obj = self.client.bucket(self.bucket_name)
|
||||
blob_path = self._get_blob_path(bucket, filename)
|
||||
blob = bucket_obj.blob(blob_path)
|
||||
return blob.exists()
|
||||
except Exception:
|
||||
logging.exception(f"obj_exist {bucket}/{filename} got exception")
|
||||
return False
|
||||
|
||||
def bucket_exists(self, bucket):
|
||||
# RENAMED PARAMETER: bucket_name -> bucket
|
||||
try:
|
||||
bucket_obj = self.client.bucket(self.bucket_name)
|
||||
return bucket_obj.exists()
|
||||
except Exception:
|
||||
logging.exception(f"bucket_exist check for {self.bucket_name} got exception")
|
||||
return False
|
||||
|
||||
def get_presigned_url(self, bucket, fnm, expires, tenant_id=None):
|
||||
# RENAMED PARAMETER: bucket_name -> bucket
|
||||
for _ in range(10):
|
||||
try:
|
||||
bucket_obj = self.client.bucket(self.bucket_name)
|
||||
blob_path = self._get_blob_path(bucket, fnm)
|
||||
blob = bucket_obj.blob(blob_path)
|
||||
|
||||
expiration = expires
|
||||
if isinstance(expires, int):
|
||||
expiration = datetime.timedelta(seconds=expires)
|
||||
|
||||
url = blob.generate_signed_url(
|
||||
version="v4",
|
||||
expiration=expiration,
|
||||
method="GET"
|
||||
)
|
||||
return url
|
||||
except Exception:
|
||||
logging.exception(f"Fail to get_presigned {bucket}/{fnm}:")
|
||||
self.__open__()
|
||||
time.sleep(1)
|
||||
return None
|
||||
|
||||
def remove_bucket(self, bucket):
|
||||
# RENAMED PARAMETER: bucket_name -> bucket
|
||||
try:
|
||||
bucket_obj = self.client.bucket(self.bucket_name)
|
||||
prefix = f"{bucket}/"
|
||||
|
||||
blobs = list(self.client.list_blobs(self.bucket_name, prefix=prefix))
|
||||
|
||||
if blobs:
|
||||
bucket_obj.delete_blobs(blobs)
|
||||
except Exception:
|
||||
logging.exception(f"Fail to remove virtual bucket (folder) {bucket}")
|
||||
|
||||
def copy(self, src_bucket, src_path, dest_bucket, dest_path):
|
||||
# RENAMED PARAMETERS to match original interface
|
||||
try:
|
||||
bucket_obj = self.client.bucket(self.bucket_name)
|
||||
|
||||
src_blob_path = self._get_blob_path(src_bucket, src_path)
|
||||
dest_blob_path = self._get_blob_path(dest_bucket, dest_path)
|
||||
|
||||
src_blob = bucket_obj.blob(src_blob_path)
|
||||
|
||||
if not src_blob.exists():
|
||||
logging.error(f"Source object not found: {src_blob_path}")
|
||||
return False
|
||||
|
||||
bucket_obj.copy_blob(src_blob, bucket_obj, dest_blob_path)
|
||||
return True
|
||||
|
||||
except NotFound:
|
||||
logging.error(f"Copy failed: Main bucket {self.bucket_name} does not exist.")
|
||||
return False
|
||||
except Exception:
|
||||
logging.exception(f"Fail to copy {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}")
|
||||
return False
|
||||
|
||||
def move(self, src_bucket, src_path, dest_bucket, dest_path):
|
||||
try:
|
||||
if self.copy(src_bucket, src_path, dest_bucket, dest_path):
|
||||
self.rm(src_bucket, src_path)
|
||||
return True
|
||||
else:
|
||||
logging.error(f"Copy failed, move aborted: {src_bucket}/{src_path}")
|
||||
return False
|
||||
except Exception:
|
||||
logging.exception(f"Fail to move {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}")
|
||||
return False
|
||||
Loading…
Add table
Reference in a new issue