Source code for fedn.network.storage.s3.miniorepository

import io

from minio import Minio
from minio.error import InvalidResponseError
from urllib3.poolmanager import PoolManager

from fedn.common.log_config import logger
from fedn.network.storage.s3.base import RepositoryBase


[docs] class MINIORepository(RepositoryBase): """Class implementing Repository for MinIO.""" client = None def __init__(self, config): """Initialize object. :param config: Dictionary containing configuration for credentials and bucket names. :type config: dict """ super().__init__() self.name = "MINIORepository" if config["storage_secure_mode"]: manager = PoolManager(num_pools=100, cert_reqs="CERT_NONE", assert_hostname=False) self.client = Minio( "{0}:{1}".format(config["storage_hostname"], config["storage_port"]), access_key=config["storage_access_key"], secret_key=config["storage_secret_key"], secure=config["storage_secure_mode"], http_client=manager, ) else: self.client = Minio( "{0}:{1}".format(config["storage_hostname"], config["storage_port"]), access_key=config["storage_access_key"], secret_key=config["storage_secret_key"], secure=config["storage_secure_mode"], )
[docs] def set_artifact(self, instance_name, instance, bucket, is_file=False): if is_file: self.client.fput_object(bucket, instance_name, instance) else: try: self.client.put_object(bucket, instance_name, io.BytesIO(instance), len(instance)) except Exception as e: raise Exception("Could not load data into bytes {}".format(e)) return True
[docs] def get_artifact(self, instance_name, bucket): try: data = self.client.get_object(bucket, instance_name) return data.read() except Exception as e: raise Exception("Could not fetch data from bucket, {}".format(e)) finally: data.close() data.release_conn()
[docs] def get_artifact_stream(self, instance_name, bucket): try: data = self.client.get_object(bucket, instance_name) return data except Exception as e: raise Exception("Could not fetch data from bucket, {}".format(e))
[docs] def list_artifacts(self, bucket): """List all objects in bucket. :param bucket: Name of the bucket :type bucket: str :return: A list of object names """ objects = [] try: objs = self.client.list_objects(bucket) for obj in objs: objects.append(obj.object_name) except Exception: raise Exception("Could not list models in bucket {}".format(bucket)) return objects
[docs] def delete_artifact(self, instance_name, bucket): """Delete object with name instance_name from buckets. :param instance_name: The object name :param bucket: Buckets to delete from :type bucket: str """ try: self.client.remove_object(bucket, instance_name) except InvalidResponseError as err: logger.error("Could not delete artifact: {0} err: {1}".format(instance_name, err)) pass
[docs] def create_bucket(self, bucket_name): """Create a new bucket. If bucket exists, do nothing. :param bucket_name: The name of the bucket :type bucket_name: str """ found = self.client.bucket_exists(bucket_name) if not found: try: self.client.make_bucket(bucket_name) except InvalidResponseError: raise