"""Source code for fedn.network.combiner.modelservice."""

import os
import tempfile
from io import BytesIO

import numpy as np

import fedn.network.grpc.fedn_pb2 as fedn
import fedn.network.grpc.fedn_pb2_grpc as rpc
from fedn.common.log_config import logger
from fedn.network.storage.models.tempmodelstorage import TempModelStorage

CHUNK_SIZE = 1024 * 1024


def upload_request_generator(mdl, id):
    """Yield chunked upload requests for streaming a model to the server.

    :param mdl: The model update object.
    :type mdl: BytesIO
    :param id: The model id attached to every request.
    :return: A model update request per chunk, then a terminating OK request.
    :rtype: fedn.ModelRequest
    """
    while True:
        chunk = mdl.read(CHUNK_SIZE)
        if not chunk:
            # Empty read marks end-of-stream: send the terminal OK message.
            yield fedn.ModelRequest(id=id, data=None, status=fedn.ModelStatus.OK)
            break
        yield fedn.ModelRequest(data=chunk, id=id, status=fedn.ModelStatus.IN_PROGRESS)
def bytesIO_request_generator(mdl, request_function, args):
    """Yield streaming gRPC requests built from a BytesIO model.

    :param mdl: The model update object.
    :type mdl: BytesIO
    :param request_function: Function for sending requests.
    :type request_function: Function
    :param args: request arguments, excluding data argument.
    :type args: dict
    :return: Yields grpc request for streaming.
    :rtype: grpc request generator.
    """
    while True:
        chunk = mdl.read(CHUNK_SIZE)
        if not chunk:
            # Final message carries no data and terminates the stream.
            yield request_function(data=None, **args)
            break
        yield request_function(data=chunk, **args)
def model_as_bytesIO(model, helper=None):
    """Coerce a model into a seekable BytesIO buffer.

    Accepts three input forms:

    - a list of parameter arrays, packed into a compressed numpy ``.npz``
      archive keyed by parameter index ("0", "1", ...);
    - a stream-like object exposing ``stream(chunk_size)``, whose chunks
      are copied into a fresh buffer;
    - an existing BytesIO, which is rewound and returned as-is.

    :param model: The model to convert (list, stream-like, or BytesIO).
    :param helper: Unused; kept for backward compatibility of the signature.
    :return: A BytesIO positioned at the start of the serialized model.
    :rtype: :class:`io.BytesIO`
    """
    if isinstance(model, list):
        bt = BytesIO()
        model_dict = {str(i): w for i, w in enumerate(model)}
        np.savez_compressed(bt, **model_dict)
        bt.seek(0)
        return bt
    if not isinstance(model, BytesIO):
        bt = BytesIO()
        # Copy the stream chunk-wise to avoid holding two full copies at once.
        # (The previous byte-count accumulator was dead code and is removed.)
        for chunk in model.stream(32 * 1024):
            bt.write(chunk)
    else:
        bt = model
    bt.seek(0, 0)
    return bt
def unpack_model(request_iterator, helper):
    """Unpack an incoming model sent in chunks from a request iterator.

    :param request_iterator: A streaming iterator from an gRPC service.
    :param helper: Helper used to deserialize the reassembled bytes.
    :return: Tuple of (reconstructed model parameters, last request seen).
    """
    buffer = BytesIO()
    try:
        for request in request_iterator:
            if request.data:
                buffer.write(request.data)
    except MemoryError as e:
        logger.error(f"Memory error occured when loading model, reach out to the FEDn team if you need a solution to this. {e}")
        raise
    except Exception as e:
        logger.error(f"Exception occured during model loading: {e}")
        raise

    buffer.seek(0)
    return load_model_from_bytes(buffer.getvalue(), helper), request
def get_tmp_path():
    """Return a temporary output path compatible with save_model, load_model."""
    descriptor, tmp_path = tempfile.mkstemp()
    # Close the low-level descriptor immediately; callers reopen by path.
    os.close(descriptor)
    return tmp_path
def load_model_from_bytes(model_bytes, helper):
    """Load a model from a bytes object.

    The bytes are staged in a temporary file so that helpers with a
    file-based ``load`` API can read them; the staging file is always
    removed, even when deserialization fails.

    :param model_bytes: A bytes object containing the model.
    :type model_bytes: :class:`bytes`
    :param helper: The helper object for the model.
    :type helper: :class:`fedn.utils.helperbase.HelperBase`
    :return: The model object.
    :rtype: return type of helper.load
    """
    path = get_tmp_path()
    try:
        with open(path, "wb") as fh:
            fh.write(model_bytes)
            fh.flush()
        model = helper.load(path)
    finally:
        # Previously the temp file leaked if helper.load raised; always clean up.
        os.unlink(path)
    return model
def serialize_model_to_BytesIO(model, helper):
    """Serialize a model to a BytesIO object.

    :param model: The model object.
    :type model: return type of helper.load
    :param helper: The helper object for the model.
    :type helper: :class:`fedn.utils.helperbase.HelperBase`
    :return: A BytesIO object containing the model.
    :rtype: :class:`io.BytesIO`
    """
    outfile_name = helper.save(model)
    # Load the serialized file into an in-memory buffer positioned at 0.
    with open(outfile_name, "rb") as source:
        buffer = BytesIO(source.read())
    os.unlink(outfile_name)
    return buffer
class ModelService(rpc.ModelServiceServicer):
    """Service for handling download and upload of models to the server."""

    def __init__(self):
        # Staging storage for models while they are being transferred
        # between clients and the combiner.
        self.temp_model_storage = TempModelStorage()
[docs] def exist(self, model_id): """Check if a model exists on the server. :param model_id: The model id. :return: True if the model exists, else False. """ return self.temp_model_storage.exist(model_id)
[docs] def get_model(self, id): """Download model with id 'id' from server. :param id: The model id. :type id: str :return: A BytesIO object containing the model. :rtype: :class:`io.BytesIO`, None if model does not exist. """ data = BytesIO() data.seek(0, 0) parts = self.Download(fedn.ModelRequest(id=id), self) for part in parts: if part.status == fedn.ModelStatus.IN_PROGRESS: data.write(part.data) if part.status == fedn.ModelStatus.OK: return data if part.status == fedn.ModelStatus.FAILED: return None
[docs] def set_model(self, model, id): """Upload model to server. :param model: A model object (BytesIO) :type model: :class:`io.BytesIO` :param id: The model id. :type id: str """ bt = model_as_bytesIO(model) # TODO: Check result _ = self.Upload(upload_request_generator(bt, id), self)
# Model Service
    def Upload(self, request_iterator, context):
        """RPC endpoints for uploading a model.

        :param request_iterator: The model request iterator.
        :type request_iterator: :class:`fedn.network.grpc.fedn_pb2.ModelRequest`
        :param context: The context object (unused)
        :type context: :class:`grpc._server._Context`
        :return: A model response object.
        :rtype: :class:`fedn.network.grpc.fedn_pb2.ModelResponse`
        """
        logger.debug("grpc.ModelService.Upload: Called")
        result = None
        for request in request_iterator:
            if request.status == fedn.ModelStatus.IN_PROGRESS:
                # Append this chunk to the staged model file and keep the
                # transfer marked as in flight.
                self.temp_model_storage.get_ptr(request.id).write(request.data)
                self.temp_model_storage.set_model_metadata(request.id, fedn.ModelStatus.IN_PROGRESS)

            if request.status == fedn.ModelStatus.OK and not request.data:
                # A data-less OK message is the end-of-stream marker:
                # finalize metadata, flush and close the staged file.
                result = fedn.ModelResponse(id=request.id, status=fedn.ModelStatus.OK, message="Got model successfully.")
                # self.temp_model_storage_metadata.update({request.id: fedn.ModelStatus.OK})
                self.temp_model_storage.set_model_metadata(request.id, fedn.ModelStatus.OK)
                self.temp_model_storage.get_ptr(request.id).flush()
                self.temp_model_storage.get_ptr(request.id).close()

        # NOTE(review): returns None if the iterator yields no terminal OK
        # message — presumably clients always send one; confirm upstream.
        return result
[docs] def Download(self, request, context): """RPC endpoints for downloading a model. :param request: The model request object. :type request: :class:`fedn.network.grpc.fedn_pb2.ModelRequest` :param context: The context object (unused) :type context: :class:`grpc._server._Context` :return: A model response iterator. :rtype: :class:`fedn.network.grpc.fedn_pb2.ModelResponse` """ logger.info(f"grpc.ModelService.Download: {request.sender.role}:{request.sender.client_id} requested model {request.id}") try: status = self.temp_model_storage.get_model_metadata(request.id) if status != fedn.ModelStatus.OK: logger.error(f"model file is not ready: {request.id}, status: {status}") yield fedn.ModelResponse(id=request.id, data=None, status=status) except Exception: logger.error("Error file does not exist: {}".format(request.id)) yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED) try: obj = self.temp_model_storage.get(request.id) if obj is None: raise Exception(f"File not found: {request.id}") with obj as f: while True: piece = f.read(CHUNK_SIZE) if len(piece) == 0: yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.OK) return yield fedn.ModelResponse(id=request.id, data=piece, status=fedn.ModelStatus.IN_PROGRESS) except Exception as e: logger.error("Downloading went wrong: {} {}".format(request.id, e)) yield fedn.ModelResponse(id=request.id, data=None, status=fedn.ModelStatus.FAILED)