diff --git a/common/constants.py b/common/constants.py index 574786d00..4cced1266 100644 --- a/common/constants.py +++ b/common/constants.py @@ -148,6 +148,7 @@ class Storage(Enum): AWS_S3 = 4 OSS = 5 OPENDAL = 6 + GCS = 7 # environment # ENV_STRONG_TEST_COUNT = "STRONG_TEST_COUNT" diff --git a/common/settings.py b/common/settings.py index 81a2c19a4..d81567fd6 100644 --- a/common/settings.py +++ b/common/settings.py @@ -35,6 +35,7 @@ from rag.utils.minio_conn import RAGFlowMinio from rag.utils.opendal_conn import OpenDALStorage from rag.utils.s3_conn import RAGFlowS3 from rag.utils.oss_conn import RAGFlowOSS +from rag.utils.gcs_conn import RAGFlowGCS from rag.nlp import search @@ -109,6 +110,7 @@ MINIO = {} OB = {} OSS = {} OS = {} +GCS = {} DOC_MAXIMUM_SIZE: int = 128 * 1024 * 1024 DOC_BULK_SIZE: int = 4 @@ -151,7 +153,8 @@ class StorageFactory: Storage.AZURE_SAS: RAGFlowAzureSasBlob, Storage.AWS_S3: RAGFlowS3, Storage.OSS: RAGFlowOSS, - Storage.OPENDAL: OpenDALStorage + Storage.OPENDAL: OpenDALStorage, + Storage.GCS: RAGFlowGCS } @classmethod @@ -250,7 +253,7 @@ def init_settings(): else: raise Exception(f"Not supported doc engine: {DOC_ENGINE}") - global AZURE, S3, MINIO, OSS + global AZURE, S3, MINIO, OSS, GCS if STORAGE_IMPL_TYPE in ['AZURE_SPN', 'AZURE_SAS']: AZURE = get_base_config("azure", {}) elif STORAGE_IMPL_TYPE == 'AWS_S3': @@ -259,6 +262,8 @@ def init_settings(): MINIO = decrypt_database_config(name="minio") elif STORAGE_IMPL_TYPE == 'OSS': OSS = get_base_config("oss", {}) + elif STORAGE_IMPL_TYPE == 'GCS': + GCS = get_base_config("gcs", {}) global STORAGE_IMPL STORAGE_IMPL = StorageFactory.create(Storage[STORAGE_IMPL_TYPE]) diff --git a/pyproject.toml b/pyproject.toml index 341354111..cdb3b7657 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ dependencies = [ "flask-session==0.8.0", "google-search-results==2.4.2", "google-auth-oauthlib>=1.2.0,<2.0.0", + "google-cloud-storage>=2.10.0,<3.0.0", "groq==0.9.0", "hanziconv==0.3.2", 
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Google Cloud Storage (GCS) backend for RAGFlow.

This module provides GCS support for RAGFlow's object storage layer.
It uses Application Default Credentials (ADC) for authentication and
implements a single-bucket architecture with logical prefix-based
separation.

Configuration:
    STORAGE_IMPL=GCS
    RAGFLOW__GCS__BUCKET=your-gcs-bucket-name
    GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json (optional if using Workload Identity)

IAM Requirements:
    - Storage Object User (roles/storage.objectUser)
    - Service Account Token Creator (roles/iam.serviceAccountTokenCreator) for presigned URLs
"""

import logging
import time
from io import BytesIO
from datetime import timedelta

from google.cloud import storage
from google.cloud.exceptions import NotFound, GoogleCloudError

from common.decorator import singleton
from common import settings


@singleton
class RAGFlowGCS:
    """
    Google Cloud Storage backend implementation for RAGFlow.

    Implements the RAGFlow storage interface on top of a single physical
    GCS bucket, using prefix-based logical separation for the "buckets"
    RAGFlow passes in (tenants, workspaces, etc.).

    Architecture:
        - One physical GCS bucket (configured in settings.GCS["bucket"])
        - Logical "buckets" mapped to prefixes: {logical_bucket}/{filename}
        - Example: put("rag-data", "file.txt") -> gs://bucket/rag-data/file.txt

    Authentication:
        Uses Application Default Credentials (ADC):
        - GOOGLE_APPLICATION_CREDENTIALS environment variable
        - Workload Identity (GKE/Cloud Run)
        - gcloud CLI credentials
    """

    def __init__(self):
        """Initialize the GCS client and bucket reference."""
        self.client = None
        self.bucket_name = None
        self.bucket = None
        self.__open__()

    def __open__(self):
        """
        Initialize or reinitialize the GCS client connection.

        Closes any existing connection, reads the bucket name from
        ``settings.GCS``, creates a new client via ADC, and binds a
        reference to the configured bucket.

        Raises:
            ValueError: If no bucket name is configured.
            Exception: If the GCS client cannot be created.
        """
        try:
            if self.client:
                self.__close__()
        except Exception:
            # Best-effort close; a stale handle must not block reconnection.
            pass

        try:
            gcs_config = settings.GCS
            self.bucket_name = gcs_config.get('bucket')

            if not self.bucket_name:
                raise ValueError("GCS bucket name not configured in settings")

            # storage.Client() picks up Application Default Credentials:
            # GOOGLE_APPLICATION_CREDENTIALS, Workload Identity, or gcloud CLI.
            self.client = storage.Client()
            self.bucket = self.client.bucket(self.bucket_name)

            logging.info(f"GCS client initialized successfully for bucket: {self.bucket_name}")
        except Exception as e:
            logging.exception(f"Failed to connect to GCS bucket {self.bucket_name}: {e}")
            raise

    def __close__(self):
        """Close the GCS client connection and drop cached references."""
        if self.client:
            self.client.close()
            self.client = None
            self.bucket = None

    def _get_blob_name(self, logical_bucket, filename):
        """
        Convert logical bucket + filename to a GCS blob name.

        Args:
            logical_bucket (str): Logical bucket name (used as prefix).
            filename (str): File name within the logical bucket.

        Returns:
            str: Full blob path in the form ``{logical_bucket}/{filename}``.
        """
        return f"{logical_bucket}/{filename}"

    def health(self):
        """
        Health check: verify the GCS connection by writing a test object.

        Returns:
            bool: True if the health check passes.

        Raises:
            Exception: If the test upload fails.
        """
        logical_bucket = "txtxtxtxt1"
        fnm = "txtxtxtxt1"
        binary = b"_t@@@1"

        try:
            blob_name = self._get_blob_name(logical_bucket, fnm)
            blob = self.bucket.blob(blob_name)
            blob.upload_from_string(binary)
            logging.info("GCS health check passed")
            return True
        except Exception as e:
            logging.exception(f"GCS health check failed: {e}")
            raise

    def put(self, logical_bucket, fnm, binary, tenant_id=None):
        """
        Upload an object to GCS, retrying up to 3 times on failure.

        Args:
            logical_bucket (str): Logical bucket name (used as prefix).
            fnm (str): Filename.
            binary (bytes | file-like): File content.
            tenant_id (str, optional): Tenant ID for logging.

        Returns:
            bool: True if the upload succeeded.

        Raises:
            Exception: If the upload fails after all retries.
        """
        blob_name = self._get_blob_name(logical_bucket, fnm)

        for attempt in range(3):
            try:
                blob = self.bucket.blob(blob_name)

                if isinstance(binary, bytes):
                    blob.upload_from_string(binary)
                elif hasattr(binary, "read"):
                    # File-like object (BytesIO, open file handle, ...):
                    # rewind if possible so retries re-send the full content.
                    if hasattr(binary, "seek"):
                        binary.seek(0)
                    blob.upload_from_file(binary)
                else:
                    # Last resort: treat as a bytes-like buffer.
                    blob.upload_from_file(BytesIO(binary))

                logging.debug(f"Successfully uploaded {blob_name}")
                return True
            except Exception as e:
                logging.warning(f"Failed to put {blob_name} (attempt {attempt + 1}/3): {e}")
                if attempt < 2:  # Don't reconnect on the last attempt
                    self.__open__()
                    time.sleep(1)
                else:
                    logging.exception(f"Failed to put {logical_bucket}/{fnm} after all retries")
                    raise

        return False

    def rm(self, logical_bucket, fnm, tenant_id=None):
        """
        Delete an object from GCS; missing objects are logged, not raised.

        Args:
            logical_bucket (str): Logical bucket name (used as prefix).
            fnm (str): Filename.
            tenant_id (str, optional): Tenant ID for logging.
        """
        blob_name = self._get_blob_name(logical_bucket, fnm)

        try:
            blob = self.bucket.blob(blob_name)
            blob.delete()
            logging.debug(f"Successfully deleted {blob_name}")
        except NotFound:
            logging.warning(f"Object not found for deletion: {blob_name}")
        except Exception as e:
            logging.exception(f"Failed to remove {logical_bucket}/{fnm}: {e}")

    def get(self, logical_bucket, filename, tenant_id=None):
        """
        Download an object from GCS, retrying up to 3 times on failure.

        Args:
            logical_bucket (str): Logical bucket name (used as prefix).
            filename (str): Filename.
            tenant_id (str, optional): Tenant ID for logging.

        Returns:
            bytes | None: File content, or None if the object does not
            exist or every attempt failed.
        """
        blob_name = self._get_blob_name(logical_bucket, filename)

        for attempt in range(3):
            try:
                blob = self.bucket.blob(blob_name)
                content = blob.download_as_bytes()
                logging.debug(f"Successfully downloaded {blob_name}")
                return content
            except NotFound:
                # A missing object is an expected outcome, not a transient
                # error — don't retry.
                logging.warning(f"Object not found: {blob_name}")
                return None
            except Exception as e:
                logging.warning(f"Failed to get {blob_name} (attempt {attempt + 1}/3): {e}")
                if attempt < 2:  # Don't reconnect on the last attempt
                    self.__open__()
                    time.sleep(1)
                else:
                    logging.exception(f"Failed to get {logical_bucket}/{filename} after all retries")

        return None

    def obj_exist(self, logical_bucket, filename, tenant_id=None):
        """
        Check whether an object exists in GCS.

        Args:
            logical_bucket (str): Logical bucket name (used as prefix).
            filename (str): Filename.
            tenant_id (str, optional): Tenant ID for logging.

        Returns:
            bool: True if the object exists, False otherwise (including
            on lookup errors).
        """
        blob_name = self._get_blob_name(logical_bucket, filename)

        try:
            blob = self.bucket.blob(blob_name)
            exists = blob.exists()
            logging.debug(f"Object exists check for {blob_name}: {exists}")
            return exists
        except Exception as e:
            logging.exception(f"Failed to check if object exists {logical_bucket}/{filename}: {e}")
            return False

    def bucket_exists(self, logical_bucket):
        """
        Check whether the physical GCS bucket exists.

        Note:
            "Logical buckets" are just prefixes in this implementation,
            so this always checks the single physical bucket.

        Args:
            logical_bucket (str): Logical bucket name (ignored).

        Returns:
            bool: True if the physical bucket exists.
        """
        try:
            exists = self.bucket.exists()
            logging.debug(f"Bucket exists check for {self.bucket_name}: {exists}")
            return exists
        except Exception as e:
            logging.exception(f"Failed to check if bucket exists {self.bucket_name}: {e}")
            return False

    def get_presigned_url(self, logical_bucket, fnm, expires, tenant_id=None):
        """
        Generate a presigned URL for temporary read access to an object.

        Args:
            logical_bucket (str): Logical bucket name (used as prefix).
            fnm (str): Filename.
            expires (int): Expiration time in seconds.
            tenant_id (str, optional): Tenant ID for logging.

        Returns:
            str | None: Presigned URL, or None if generation fails.

        Note:
            Requires the Service Account Token Creator role when running
            on GCP infrastructure (signing needs token-creation rights).
        """
        blob_name = self._get_blob_name(logical_bucket, fnm)

        for attempt in range(10):
            try:
                blob = self.bucket.blob(blob_name)

                # V4 signed URL scoped to GET with the requested lifetime.
                url = blob.generate_signed_url(
                    version="v4",
                    expiration=timedelta(seconds=expires),
                    method="GET"
                )

                logging.debug(f"Successfully generated presigned URL for {blob_name}")
                return url
            except Exception as e:
                logging.warning(f"Failed to generate presigned URL for {blob_name} (attempt {attempt + 1}/10): {e}")
                if attempt < 9:  # Don't reconnect on the last attempt
                    self.__open__()
                    time.sleep(1)
                else:
                    logging.exception(f"Failed to generate presigned URL for {logical_bucket}/{fnm} after all retries")

        return None

    def remove_bucket(self, logical_bucket):
        """
        Remove all objects under a logical bucket's prefix.

        Args:
            logical_bucket (str): Logical bucket name (prefix to delete).

        Warning:
            This deletes ALL objects under the prefix. Use with caution!
        """
        try:
            prefix = f"{logical_bucket}/"
            blobs = list(self.bucket.list_blobs(prefix=prefix))

            logging.info(f"Removing {len(blobs)} objects with prefix {prefix}")

            # Delete blobs one by one so a single failure doesn't abort
            # the whole cleanup.
            for blob in blobs:
                try:
                    blob.delete()
                except Exception as e:
                    logging.error(f"Failed to delete {blob.name}: {e}")

            logging.info(f"Successfully removed logical bucket {logical_bucket}")
        except Exception as e:
            logging.exception(f"Failed to remove logical bucket {logical_bucket}: {e}")

    def copy(self, src_bucket, src_path, dest_bucket, dest_path):
        """
        Copy an object from one logical location to another.

        Args:
            src_bucket (str): Source logical bucket.
            src_path (str): Source filename.
            dest_bucket (str): Destination logical bucket.
            dest_path (str): Destination filename.

        Returns:
            bool: True if the copy succeeded, False otherwise.
        """
        try:
            src_blob_name = self._get_blob_name(src_bucket, src_path)
            dest_blob_name = self._get_blob_name(dest_bucket, dest_path)

            src_blob = self.bucket.blob(src_blob_name)
            if not src_blob.exists():
                logging.error(f"Source object not found: {src_blob_name}")
                return False

            # Server-side copy within the same physical bucket.
            self.bucket.copy_blob(src_blob, self.bucket, dest_blob_name)

            logging.info(f"Successfully copied {src_blob_name} to {dest_blob_name}")
            return True
        except Exception as e:
            logging.exception(f"Failed to copy {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}: {e}")
            return False

    def move(self, src_bucket, src_path, dest_bucket, dest_path):
        """
        Move an object from one logical location to another (copy + delete).

        Args:
            src_bucket (str): Source logical bucket.
            src_path (str): Source filename.
            dest_bucket (str): Destination logical bucket.
            dest_path (str): Destination filename.

        Returns:
            bool: True if the move succeeded, False otherwise.
        """
        try:
            if self.copy(src_bucket, src_path, dest_bucket, dest_path):
                # Only remove the source once the copy is confirmed.
                self.rm(src_bucket, src_path)
                logging.info(f"Successfully moved {src_bucket}/{src_path} to {dest_bucket}/{dest_path}")
                return True
            logging.error(f"Copy failed, move aborted: {src_bucket}/{src_path}")
            return False
        except Exception as e:
            logging.exception(f"Failed to move {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}: {e}")
            return False