#
#  Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
import logging
import time
from io import BytesIO

from minio import Minio
from minio.error import S3Error

from common import settings
from common.decorator import singleton


@singleton
class RAGFlowMinio:
    """Thin wrapper around the MinIO SDK used as RAGFlow's blob store.

    Supports an optional "single physical bucket" deployment mode: when
    ``settings.MINIO['bucket']`` is configured, every logical bucket name
    passed by callers is redirected to that one physical bucket and the
    logical name becomes a key prefix instead.  An optional ``prefix_path``
    is additionally prepended to every object key.  Final key layout:
    ``[prefix_path/][logical_bucket/]object_name``.
    """

    def __init__(self):
        # Live MinIO client; (re)created by __open__, dropped by __close__.
        self.conn = None
        # Optional physical bucket hosting all logical buckets as prefixes.
        self.bucket = settings.MINIO.get('bucket', None)
        # Optional key prefix applied to every object path.
        self.prefix_path = settings.MINIO.get('prefix_path', None)
        self.__open__()

    @staticmethod
    def use_default_bucket(method):
        """Decorator: route calls to the configured default bucket, if any.

        When a default bucket is set, the caller's logical bucket name is
        replaced by the physical one, and the original identifier is
        forwarded via the ``_orig_bucket`` kwarg so that ``use_prefix_path``
        can fold it into the object key.
        """
        def wrapper(self, bucket, *args, **kwargs):
            if self.bucket:
                # Preserve the logical bucket so it can become a path prefix.
                kwargs['_orig_bucket'] = bucket
                return method(self, self.bucket, *args, **kwargs)
            return method(self, bucket, *args, **kwargs)
        return wrapper

    @staticmethod
    def use_prefix_path(method):
        """Decorator: rewrite the object key as prefix_path/logical-bucket/key.

        Must be stacked *inside* ``use_default_bucket`` (listed after it on
        the method) so ``_orig_bucket`` has already been injected when this
        wrapper runs.
        """
        def wrapper(self, bucket, fnm, *args, **kwargs):
            orig_bucket = kwargs.pop('_orig_bucket', None)
            if self.prefix_path:
                if orig_bucket:
                    fnm = f"{self.prefix_path}/{orig_bucket}/{fnm}"
                else:
                    fnm = f"{self.prefix_path}/{fnm}"
            elif orig_bucket and bucket == self.bucket:
                # No prefix_path: the logical bucket alone becomes the prefix.
                fnm = f"{orig_bucket}/{fnm}"
            return method(self, bucket, fnm, *args, **kwargs)
        return wrapper

    def __open__(self):
        """(Re)connect to MinIO.  Failures are logged, never raised."""
        try:
            if self.conn:
                self.__close__()
        except Exception:
            pass

        try:
            self.conn = Minio(
                settings.MINIO["host"],
                access_key=settings.MINIO["user"],
                secret_key=settings.MINIO["password"],
                secure=False,
            )
        except Exception:
            logging.exception("Fail to connect %s ", settings.MINIO["host"])

    def __close__(self):
        """Drop the current client handle (the SDK needs no explicit close)."""
        self.conn = None

    def health(self):
        """Write a tiny probe object and return the SDK result (or raise).

        The target bucket must already exist; this deployment may lack the
        permission to create buckets.
        """
        bucket = self.bucket if self.bucket else "ragflow-bucket"
        fnm = "_health_check"
        if self.prefix_path:
            fnm = f"{self.prefix_path}/{fnm}"
        binary = b"_t@@@1"
        return self.conn.put_object(bucket, fnm, BytesIO(binary), len(binary))

    @use_default_bucket
    @use_prefix_path
    def put(self, bucket, fnm, binary):
        """Upload *binary* under bucket/fnm, retrying up to 3 times.

        Reconnects between attempts.  Returns the SDK result on success or
        None once retries are exhausted (errors are logged, not raised).
        """
        for _ in range(3):
            try:
                # Bucket must already exist - no permission to create buckets.
                return self.conn.put_object(
                    bucket, fnm, BytesIO(binary), len(binary))
            except Exception:
                logging.exception(f"Fail to put {bucket}/{fnm}:")
                self.__open__()
                time.sleep(1)
        return None

    @use_default_bucket
    @use_prefix_path
    def rm(self, bucket, fnm):
        """Best-effort delete of bucket/fnm; errors are logged and swallowed."""
        try:
            self.conn.remove_object(bucket, fnm)
        except Exception:
            logging.exception(f"Fail to remove {bucket}/{fnm}:")

    @use_default_bucket
    @use_prefix_path
    def get(self, bucket, filename):
        """Download an object and return its bytes, or None on failure."""
        for _ in range(1):
            try:
                r = self.conn.get_object(bucket, filename)
                try:
                    return r.read()
                finally:
                    # Release the pooled HTTP connection (required by the
                    # MinIO SDK; omitting this leaks urllib3 connections).
                    r.close()
                    r.release_conn()
            except Exception:
                logging.exception(f"Fail to get {bucket}/{filename}")
                self.__open__()
                time.sleep(1)
        return None

    @use_default_bucket
    @use_prefix_path
    def obj_exist(self, bucket, filename):
        """Return True iff *bucket* exists and contains *filename*."""
        try:
            if not self.conn.bucket_exists(bucket):
                return False
            # stat_object raises S3Error(NoSuchKey) when the object is absent.
            return bool(self.conn.stat_object(bucket, filename))
        except S3Error as e:
            if e.code in ["NoSuchKey", "NoSuchBucket", "ResourceNotFound"]:
                return False
        except Exception:
            logging.exception(f"obj_exist {bucket}/{filename} got exception")
            return False
        return False

    @use_default_bucket
    def bucket_exists(self, bucket):
        """Return True iff the (physical) bucket exists; False on any error."""
        try:
            return bool(self.conn.bucket_exists(bucket))
        except S3Error as e:
            if e.code in ["NoSuchKey", "NoSuchBucket", "ResourceNotFound"]:
                return False
        except Exception:
            logging.exception(f"bucket_exist {bucket} got exception")
        return False

    @use_default_bucket
    @use_prefix_path
    def get_presigned_url(self, bucket, fnm, expires):
        """Return a presigned GET URL for bucket/fnm, or None after 10 tries.

        *expires* is forwarded unchanged to the SDK (a datetime.timedelta).
        """
        for _ in range(10):
            try:
                return self.conn.get_presigned_url("GET", bucket, fnm, expires)
            except Exception:
                logging.exception(f"Fail to get_presigned {bucket}/{fnm}:")
                self.__open__()
                time.sleep(1)
        return None

    @use_default_bucket
    def remove_bucket(self, bucket):
        """Delete every object in *bucket*, then the bucket itself.

        Best-effort: any failure is logged and swallowed.
        """
        try:
            if self.conn.bucket_exists(bucket):
                for obj in self.conn.list_objects(bucket, recursive=True):
                    self.conn.remove_object(bucket, obj.object_name)
                self.conn.remove_bucket(bucket)
        except Exception:
            logging.exception(f"Fail to remove bucket {bucket}")