Source code for fedn.network.storage.s3.repository

import datetime
import uuid

from fedn.common.log_config import logger
from fedn.network.storage.s3.miniorepository import MINIORepository


[docs] class Repository: """Interface for storing model objects and compute packages in S3 compatible storage.""" def __init__(self, config, init_buckets=True): self.model_bucket = config["storage_bucket"] self.context_bucket = config["context_bucket"] try: self.prediction_bucket = config["prediction_bucket"] except KeyError: self.prediction_bucket = "fedn-prediction" # TODO: Make a plug-in solution self.client = MINIORepository(config) if init_buckets: self.client.create_bucket(self.context_bucket) self.client.create_bucket(self.model_bucket) self.client.create_bucket(self.prediction_bucket)
[docs] def get_model(self, model_id): """Retrieve a model with id model_id. :param model_id: Unique identifier for model to retrive. :return: The model object """ logger.info("Client {} trying to get model with id: {}".format(self.client.name, model_id)) return self.client.get_artifact(model_id, self.model_bucket)
[docs] def get_model_stream(self, model_id): """Retrieve a stream handle to model with id model_id. :param model_id: :return: Handle to model object """ logger.info("Client {} trying to get model with id: {}".format(self.client.name, model_id)) return self.client.get_artifact_stream(model_id, self.model_bucket)
[docs] def set_model(self, model, is_file=True): """Upload model object. :param model: The model object :type model: BytesIO or str file name. :param is_file: True if model is a file name, else False :return: id for the uploaded object (str) """ model_id = uuid.uuid4() try: self.client.set_artifact(str(model_id), model, bucket=self.model_bucket, is_file=is_file) except Exception: logger.error("Failed to upload model with ID {} to repository.".format(model_id)) raise return str(model_id)
[docs] def delete_model(self, model_id): """Delete model. :param model_id: The id of the model to delete :type model_id: str """ try: self.client.delete_artifact(model_id, bucket=self.model_bucket) except Exception: logger.error("Failed to delete model {} repository.".format(model_id)) raise
[docs] def set_compute_package(self, name, compute_package, is_file=True): """Upload compute package. :param name: The name of the compute package. :type name: str :param compute_package: The compute package :type compute_pacakge: BytesIO or str file name. :param is_file: True if model is a file name, else False """ try: self.client.set_artifact(str(name), compute_package, bucket=self.context_bucket, is_file=is_file) except Exception: logger.error("Failed to write compute_package to repository.") raise
[docs] def get_compute_package(self, compute_package): """Retrieve compute package from object store. :param compute_package: The name of the compute package. :type compute_pacakge: str :return: Compute package. """ try: data = self.client.get_artifact(compute_package, bucket=self.context_bucket) except Exception: logger.error("Failed to get compute_package from repository.") raise return data
[docs] def delete_compute_package(self, compute_package): """Delete a compute package from storage. :param compute_package: The name of the compute_package :type compute_package: str """ try: self.client.delete_artifact(compute_package, bucket=[self.context_bucket]) except Exception: logger.error("Failed to delete compute_package from repository.") raise
[docs] def presigned_put_url(self, bucket: str, object_name: str, expires: datetime.timedelta = datetime.timedelta(hours=1)): """Generate a presigned URL for an upload object request. :param bucket: The bucket name :type bucket: str :param object_name: The object name :type object_name: str :param expires: The time the URL is valid :type expires: datetime.timedelta :return: The URL :rtype: str """ return self.client.client.presigned_put_object(bucket, object_name, expires)
[docs] def presigned_get_url(self, bucket: str, object_name: str, expires: datetime.timedelta = datetime.timedelta(hours=1)) -> str: """Generate a presigned URL for a download object request. :param bucket: The bucket name :type bucket: str :param object_name: The object name :type object_name: str :param expires: The time the URL is valid :type expires: datetime.timedelta :return: The URL :rtype: str """ return self.client.client.presigned_get_object(bucket, object_name, expires)