Source code for fedn.network.combiner.aggregators.fedopt

import math

from fedn.common.exceptions import InvalidParameterError
from fedn.common.log_config import logger
from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase


class Aggregator(AggregatorBase):
    """Federated Optimization (FedOpt) aggregator.

    Implementation following: https://arxiv.org/pdf/2003.00295.pdf

    This aggregator computes pseudo gradients by subtracting the model update
    from the global model weights from the previous round. A server-side
    optimization scheme is then applied. Currently supported schemes are
    "adam", "yogi" and "adagrad".

    :param update_handler: A handle to the :class:`fedn.network.combiner.updatehandler.UpdateHandler`
    :type update_handler: :class:`fedn.network.combiner.updatehandler.UpdateHandler`
    """

    def __init__(self, update_handler):
        super().__init__(update_handler)

        self.name = "fedopt"
        # Server-side optimizer state (first and second moments).
        self.v = None
        self.m = None
    def combine_models(self, helper=None, delete_models=True, parameters=None):
        """Compute pseudo gradients using model updates in the queue.

        A plain-NumPy sketch of the pseudo-gradient averaging performed here
        is given after the class definition.

        :param helper: An instance of :class:`fedn.utils.helpers.helpers.HelperBase`, ML framework specific helper, defaults to None
        :type helper: :class:`fedn.utils.helpers.helpers.HelperBase`, optional
        :param delete_models: Delete models from storage after aggregation, defaults to True
        :type delete_models: bool, optional
        :param parameters: Aggregator hyperparameters.
        :type parameters: :class:`fedn.utils.parameters.Parameters`, optional
        :return: The global model and metadata
        :rtype: tuple
        """
        data = {}
        data["time_model_load"] = 0.0
        data["time_model_aggregation"] = 0.0

        # Define parameter schema
        parameter_schema = {
            "serveropt": str,
            "learning_rate": float,
            "beta1": float,
            "beta2": float,
            "tau": float,
        }

        # Default hyperparameters. Note that these may need fine tuning.
        default_parameters = {
            "serveropt": "adam",
            "learning_rate": 1e-3,
            "beta1": 0.9,
            "beta2": 0.99,
            "tau": 1e-4,
        }

        # Validate parameters
        if parameters:
            try:
                parameters.validate(parameter_schema)
            except InvalidParameterError as e:
                logger.error("Aggregator {} received invalid parameters. Reason: {}".format(self.name, e))
                return None, data
        else:
            logger.info("Aggregator {} using default parameters.".format(self.name))
            parameters = default_parameters

        # Override missing parameters with defaults
        for key, value in default_parameters.items():
            if key not in parameters:
                parameters[key] = value

        model = None
        nr_aggregated_models = 0
        total_examples = 0

        logger.info("AGGREGATOR({}): Aggregating model updates...".format(self.name))

        while not self.update_handler.model_updates.empty():
            try:
                logger.info("AGGREGATOR({}): Getting next model update from queue.".format(self.name))
                model_update = self.update_handler.next_model_update()

                # Load model parameters and metadata
                model_next, metadata = self.update_handler.load_model_update(model_update, helper)

                logger.info("AGGREGATOR({}): Processing model update {}".format(self.name, model_update.model_update_id))

                # Increment total number of examples
                total_examples += metadata["num_examples"]

                if nr_aggregated_models == 0:
                    model_old = self.update_handler.load_model(helper, model_update.model_id)
                    pseudo_gradient = helper.subtract(model_next, model_old)
                else:
                    pseudo_gradient_next = helper.subtract(model_next, model_old)
                    pseudo_gradient = helper.increment_average(pseudo_gradient, pseudo_gradient_next, metadata["num_examples"], total_examples)

                nr_aggregated_models += 1

                # Delete model from storage
                if delete_models:
                    self.update_handler.delete_model(model_update.model_update_id)
                    logger.info("AGGREGATOR({}): Deleted model update {} from storage.".format(self.name, model_update.model_update_id))
            except Exception as e:
                logger.error("AGGREGATOR({}): Error encountered while processing model update, skipping this update: {}".format(self.name, e))

        if parameters["serveropt"] == "adam":
            model = self.serveropt_adam(helper, pseudo_gradient, model_old, parameters)
        elif parameters["serveropt"] == "yogi":
            model = self.serveropt_yogi(helper, pseudo_gradient, model_old, parameters)
        elif parameters["serveropt"] == "adagrad":
            model = self.serveropt_adagrad(helper, pseudo_gradient, model_old, parameters)
        else:
            logger.error("Unsupported server optimizer passed to FedOpt.")
            return None, data

        data["nr_aggregated_models"] = nr_aggregated_models

        logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models))
        return model, data
    def serveropt_adam(self, helper, pseudo_gradient, model_old, parameters):
        """Server side optimization, FedAdam.

        A plain-NumPy sketch of this update rule follows the class definition.

        :param helper: instance of helper class.
        :type helper: Helper
        :param pseudo_gradient: The pseudo gradient.
        :type pseudo_gradient: As defined by helper.
        :param model_old: The current global model.
        :type model_old: As defined by helper.
        :param parameters: Hyperparameters for the aggregator.
        :type parameters: dict
        :return: new model weights.
        :rtype: as defined by helper.
        """
        beta1 = parameters["beta1"]
        beta2 = parameters["beta2"]
        learning_rate = parameters["learning_rate"]
        tau = parameters["tau"]

        # Initialize the second moment with tau^2.
        if not self.v:
            self.v = helper.ones(pseudo_gradient, math.pow(tau, 2))

        # Update the first moment (momentum): m = beta1 * m + (1 - beta1) * g
        if not self.m:
            self.m = helper.multiply(pseudo_gradient, [(1.0 - beta1)] * len(pseudo_gradient))
        else:
            self.m = helper.add(self.m, pseudo_gradient, beta1, (1.0 - beta1))

        # Adam second-moment update: v = beta2 * v + (1 - beta2) * g^2
        p = helper.power(pseudo_gradient, 2)
        self.v = helper.add(self.v, p, beta2, (1.0 - beta2))

        # Adaptive server step: model = model_old + lr * m / (sqrt(v) + tau)
        sv = helper.add(helper.sqrt(self.v), helper.ones(self.v, tau))
        t = helper.divide(self.m, sv)
        model = helper.add(model_old, t, 1.0, learning_rate)

        return model
    def serveropt_yogi(self, helper, pseudo_gradient, model_old, parameters):
        """Server side optimization, FedYogi.

        A plain-NumPy sketch of the Yogi second-moment update follows the class definition.

        :param helper: instance of helper class.
        :type helper: Helper
        :param pseudo_gradient: The pseudo gradient.
        :type pseudo_gradient: As defined by helper.
        :param model_old: The current global model.
        :type model_old: As defined by helper.
        :param parameters: Hyperparameters for the aggregator.
        :type parameters: dict
        :return: new model weights.
        :rtype: as defined by helper.
        """
        beta1 = parameters["beta1"]
        beta2 = parameters["beta2"]
        learning_rate = parameters["learning_rate"]
        tau = parameters["tau"]

        # Initialize the second moment with tau^2.
        if not self.v:
            self.v = helper.ones(pseudo_gradient, math.pow(tau, 2))

        # Update the first moment (momentum): m = beta1 * m + (1 - beta1) * g
        if not self.m:
            self.m = helper.multiply(pseudo_gradient, [(1.0 - beta1)] * len(pseudo_gradient))
        else:
            self.m = helper.add(self.m, pseudo_gradient, beta1, (1.0 - beta1))

        # Yogi second-moment update: v = v - (1 - beta2) * g^2 * sign(v - g^2)
        p = helper.power(pseudo_gradient, 2)
        s = helper.sign(helper.add(self.v, p, 1.0, -1.0))
        s = helper.multiply(s, p)
        self.v = helper.add(self.v, s, 1.0, -(1.0 - beta2))

        # Adaptive server step: model = model_old + lr * m / (sqrt(v) + tau)
        sv = helper.add(helper.sqrt(self.v), helper.ones(self.v, tau))
        t = helper.divide(self.m, sv)
        model = helper.add(model_old, t, 1.0, learning_rate)

        return model
    def serveropt_adagrad(self, helper, pseudo_gradient, model_old, parameters):
        """Server side optimization, FedAdaGrad.

        A plain-NumPy sketch of the AdaGrad second-moment update follows the class definition.

        :param helper: instance of helper class.
        :type helper: Helper
        :param pseudo_gradient: The pseudo gradient.
        :type pseudo_gradient: As defined by helper.
        :param model_old: The current global model.
        :type model_old: As defined by helper.
        :param parameters: Hyperparameters for the aggregator.
        :type parameters: dict
        :return: new model weights.
        :rtype: as defined by helper.
        """
        beta1 = parameters["beta1"]
        learning_rate = parameters["learning_rate"]
        tau = parameters["tau"]

        # Initialize the second moment with tau^2.
        if not self.v:
            self.v = helper.ones(pseudo_gradient, math.pow(tau, 2))

        # Update the first moment (momentum): m = beta1 * m + (1 - beta1) * g
        if not self.m:
            self.m = helper.multiply(pseudo_gradient, [(1.0 - beta1)] * len(pseudo_gradient))
        else:
            self.m = helper.add(self.m, pseudo_gradient, beta1, (1.0 - beta1))

        # AdaGrad second-moment update: v = v + g^2
        p = helper.power(pseudo_gradient, 2)
        self.v = helper.add(self.v, p, 1.0, 1.0)

        # Adaptive server step: model = model_old + lr * m / (sqrt(v) + tau)
        sv = helper.add(helper.sqrt(self.v), helper.ones(self.v, tau))
        t = helper.divide(self.m, sv)
        model = helper.add(model_old, t, 1.0, learning_rate)

        return model
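
# ----------------------------------------------------------------------------
# Illustrative sketch, not part of the FEDn module above: a minimal
# plain-Python/NumPy version of what combine_models() does through the helper
# API (helper.subtract / helper.increment_average), assuming model weights are
# represented as lists of numpy.ndarray and that increment_average implements
# an example-weighted running mean. The function name and argument layout are
# hypothetical and chosen for illustration only.
def sketch_pseudo_gradient_average(global_weights, client_updates, num_examples):
    """Average the pseudo gradients (update - global), weighted by examples."""
    total_examples = 0
    avg_gradient = None
    for weights, n in zip(client_updates, num_examples):
        total_examples += n
        # Pseudo gradient: client model update minus the previous global model.
        gradient = [w - g for w, g in zip(weights, global_weights)]
        if avg_gradient is None:
            avg_gradient = gradient
        else:
            # Incremental weighted mean: avg += n * (gradient - avg) / total.
            avg_gradient = [a + n * (g - a) / total_examples for a, g in zip(avg_gradient, gradient)]
    return avg_gradient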
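
# ----------------------------------------------------------------------------
# Illustrative sketch, not part of the FEDn module above: the FedAdam server
# update from serveropt_adam() written with plain NumPy, following
# https://arxiv.org/pdf/2003.00295.pdf. Weights are assumed to be lists of
# numpy.ndarray; the function name and signature are hypothetical. In the
# class above, m is effectively initialized to zeros and v to tau**2.
def sketch_fedadam_step(weights_old, pseudo_gradient, m, v, learning_rate=1e-3, beta1=0.9, beta2=0.99, tau=1e-4):
    """One FedAdam server step; returns (new_weights, m, v)."""
    import numpy as np  # local import keeps the sketch self-contained

    new_weights, new_m, new_v = [], [], []
    for x, g, m_i, v_i in zip(weights_old, pseudo_gradient, m, v):
        m_i = beta1 * m_i + (1.0 - beta1) * g               # first moment
        v_i = beta2 * v_i + (1.0 - beta2) * np.square(g)    # second moment
        x = x + learning_rate * m_i / (np.sqrt(v_i) + tau)  # adaptive step
        new_weights.append(x)
        new_m.append(m_i)
        new_v.append(v_i)
    return new_weights, new_m, new_v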
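
# ----------------------------------------------------------------------------
# Illustrative sketch, not part of the FEDn module above: FedYogi differs from
# FedAdam only in the second-moment update, as in serveropt_yogi(). Shown here
# for a single numpy.ndarray layer; the function name is hypothetical.
def sketch_yogi_second_moment(v, pseudo_gradient, beta2=0.99):
    """Yogi second-moment update: v - (1 - beta2) * g^2 * sign(v - g^2)."""
    import numpy as np  # local import keeps the sketch self-contained

    g2 = np.square(pseudo_gradient)
    # v changes additively rather than by an exponential moving average, so the
    # effective server learning rate adapts more gradually than in Adam.
    return v - (1.0 - beta2) * g2 * np.sign(v - g2)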
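
# ----------------------------------------------------------------------------
# Illustrative sketch, not part of the FEDn module above: FedAdaGrad differs
# from FedAdam only in the second-moment update, as in serveropt_adagrad().
# Shown here for a single numpy.ndarray layer; the function name is hypothetical.
def sketch_adagrad_second_moment(v, pseudo_gradient):
    """AdaGrad second-moment update: squared pseudo gradients accumulate in v."""
    import numpy as np  # local import keeps the sketch self-contained

    return v + np.square(pseudo_gradient)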