fedn.network.combiner package

The FEDn Combiner package is responsible for combining models from multiple clients. It’s the acting gRPC server for the federated network.

Subpackages

Submodules

fedn.network.combiner.combiner module

class fedn.network.combiner.combiner.Combiner(config)[source]

Bases: CombinerServicer, ReducerServicer, ConnectorServicer, ControlServicer

Combiner gRPC server.

Parameters:

config (dict) – configuration for the combiner

AcceptingClients(request: ConnectionRequest, context)[source]

RPC endpoint that returns a ConnectionResponse indicating whether the server is accepting clients or not.

Parameters:
  • request (fedn.network.grpc.fedn_pb2.ConnectionRequest) – the request (unused)

  • context (grpc._server._Context) – the context (unused)

Returns:

the response

Return type:

fedn.network.grpc.fedn_pb2.ConnectionResponse

FlushAggregationQueue(control: ControlRequest, context)[source]

Flush the queue.

Parameters:
  • control (fedn.network.grpc.fedn_pb2.ControlRequest) – the control request

  • context (grpc._server._Context) – the context (unused)

Returns:

the control response

Return type:

fedn.network.grpc.fedn_pb2.ControlResponse

ListActiveClients(request: ListClientsRequest, context)[source]
RPC endpoint that returns a ClientList containing the names of all active clients.

An active client has sent a status message / responded to a heartbeat request in the last 10 seconds.

Parameters:
  • request (fedn.network.grpc.fedn_pb2.ListClientsRequest) – the request

  • context (grpc._server._Context) – the context (unused)

Returns:

the client list

Return type:

fedn.network.grpc.fedn_pb2.ClientList

SendHeartbeat(heartbeat: Heartbeat, context)[source]
RPC that lets clients send a hearbeat, notifying the server that

the client is available.

Parameters:
  • heartbeat (fedn.network.grpc.fedn_pb2.Heartbeat) – the heartbeat

  • context (grpc._server._Context) – the context (unused)

Returns:

the response

Return type:

fedn.network.grpc.fedn_pb2.Response

SendModelUpdate(request, context)[source]

Send a model update response.

Parameters:
  • request (fedn.network.grpc.fedn_pb2.ModelUpdate) – the request

  • context (grpc._server._Context) – the context

Returns:

the response

Return type:

fedn.network.grpc.fedn_pb2.Response

SendModelValidation(request, context)[source]

Send a model validation response.

Parameters:
  • request (fedn.network.grpc.fedn_pb2.ModelValidation) – the request

  • context (grpc._server._Context) – the context

Returns:

the response

Return type:

fedn.network.grpc.fedn_pb2.Response

SendStatus(status: Status, context)[source]

A client RPC endpoint that accepts status messages.

Parameters:
  • status (fedn.network.grpc.fedn_pb2.Status) – the status message

  • context (grpc._server._Context) – the context (unused)

Returns:

the response

Return type:

fedn.network.grpc.fedn_pb2.Response

SetAggregator(control: ControlRequest, context)[source]

Set the active aggregator.

Parameters:
  • control (fedn.network.grpc.fedn_pb2.ControlRequest) – the control request

  • context (grpc._server._Context) – the context (unused)

Returns:

the control response

Return type:

fedn.network.grpc.fedn_pb2.ControlResponse

Start(control: ControlRequest, context)[source]

Start a round of federated learning”

Parameters:
  • control (fedn.network.grpc.fedn_pb2.ControlRequest) – the control request

  • context (grpc._server._Context) – the context (unused)

Returns:

the control response

Return type:

fedn.network.grpc.fedn_pb2.ControlResponse

Stop(control: ControlRequest, context)[source]

TODO: Not yet implemented.

Parameters:
  • control (fedn.network.grpc.fedn_pb2.ControlRequest) – the control request

  • context (grpc._server._Context) – the context (unused)

Returns:

the control response

Return type:

fedn.network.grpc.fedn_pb2.ControlResponse

TaskStream(response, context)[source]

A server stream RPC endpoint (Update model). Messages from client stream.

Parameters:
  • response (fedn.network.grpc.fedn_pb2.ModelUpdateRequest) – the response

  • context (grpc._server._Context) – the context

get_active_trainers()[source]

Get a list of active trainers.

Returns:

the list of active trainers

Return type:

list

get_active_validators()[source]

Get a list of active validators.

Returns:

the list of active validators

Return type:

list

nr_active_trainers()[source]

Get the number of active trainers.

Returns:

the number of active trainers

Return type:

int

register_model_validation(validation)[source]

Register a model validation.

Parameters:

validation (fedn.network.grpc.fedn_pb2.ModelValidation) – the model validation

request_model_update(config, clients=[])[source]

Ask clients to update the current global model.

Parameters:
  • config (dict) – the model configuration to send to clients

  • clients (list) – the clients to send the request to

request_model_validation(model_id, config, clients=[])[source]

Ask clients to validate the current global model.

Parameters:
  • model_id (str) – the model id to validate

  • config (dict) – the model configuration to send to clients

  • clients (list) – the clients to send the request to

run()[source]

Start the server.

class fedn.network.combiner.combiner.Role(value)[source]

Bases: Enum

Enum for combiner roles.

COMBINER = 2
OTHER = 4
REDUCER = 3
WORKER = 1
fedn.network.combiner.combiner.role_to_proto_role(role)[source]

Convert a Role to a proto Role.

Parameters:

role (fedn.network.combiner.server.Role) – the role to convert

Returns:

proto role

Return type:

fedn.network.grpc.fedn_pb2.Role

fedn.network.combiner.combiner_tests module

fedn.network.combiner.connect module

class fedn.network.combiner.connect.ConnectorCombiner(host, port, myhost, fqdn, myport, token, name, secure=False, verify=False)[source]

Bases: object

Connector for annnouncing combiner to the FEDn network.

Parameters:
  • host (str) – host of discovery service

  • port (int) – port of discovery service

  • myhost (str) – host of combiner

  • fqdn (str) – fully qualified domain name of combiner

  • myport (int) – port of combiner

  • token (str) – token for authentication

  • name (str) – name of combiner

  • secure (bool) – True if https is used, False if http

  • verify (bool) – True if certificate is verified, False if not

announce()[source]

Announce combiner to FEDn network via discovery service (REST-API).

Returns:

Tuple with announcement Status, FEDn network configuration if sucessful, else None.

Return type:

fedn.network.combiner.connect.Status, str

class fedn.network.combiner.connect.Status(value)[source]

Bases: Enum

Enum for representing the status of a combiner announcement.

Assigned = 1
TryAgain = 2
UnAuthorized = 3
UnMatchedConfig = 4
Unassigned = 0

fedn.network.combiner.interfaces module

class fedn.network.combiner.interfaces.Channel(address, port, certificate=None)[source]

Bases: object

Wrapper for a gRPC channel.

Parameters:
  • address (str) – The address for the gRPC server.

  • port (int) – The port for connecting to the gRPC server.

  • certificate (str) – The certificate for connecting to the gRPC server (optional)

get_channel()[source]

Get a channel.

Returns:

An instance of a gRPC channel

Return type:

grpc.Channel

class fedn.network.combiner.interfaces.CombinerInterface(parent, name, address, fqdn, port, certificate=None, key=None, ip=None, config=None)[source]

Bases: object

Interface for the Combiner (aggregation server).

Abstraction on top of the gRPC server servicer.

Parameters:
  • parent (fedn.network.api.interfaces.API) – The parent combiner (controller)

  • name (str) – The name of the combiner.

  • address (str) – The address of the combiner.

  • fqdn (str) – The fully qualified domain name of the combiner.

  • port (int) – The port of the combiner.

  • certificate (str) – The certificate of the combiner (optional).

  • key (str) – The key of the combiner (optional).

  • ip (str) – The ip of the combiner (optional).

  • config (dict) – The configuration of the combiner (optional).

allowing_clients()[source]

Check if the combiner is allowing additional client connections.

Returns:

True if accepting, else False.

Return type:

bool

flush_model_update_queue()[source]

Reset the model update queue on the combiner.

classmethod from_json()[source]

Initialize the combiner config from a json document.

Parameters:

combiner_config (dict) – The combiner configuration.

Returns:

An instance of the combiner interface.

Return type:

fedn.network.combiner.interfaces.CombinerInterface

get_certificate()[source]

Get combiner certificate.

Returns:

The combiner certificate.

Return type:

str, None if no certificate is set.

get_key()[source]

Get combiner key.

Returns:

The combiner key.

Return type:

str, None if no key is set.

get_model(id, timeout=10)[source]

Download a model from the combiner server.

Parameters:

id (str) – The model id.

Returns:

A file-like object containing the model.

Return type:

io.BytesIO, None if the model is not available.

list_active_clients(queue=1)[source]

List active clients.

Parameters:

queue – The channel (queue) to use (optional). Default is 1 = MODEL_UPDATE_REQUESTS channel. see fedn.network.grpc.fedn_pb2.Channel

Returns:

A list of active clients.

Return type:

json

set_aggregator(aggregator)[source]

Set the active aggregator module.

Parameters:

aggregator – The name of the aggregator module.

submit(config)[source]

Submit a compute plan to the combiner.

Parameters:

config (dict) – The job configuration.

Returns:

Server ControlResponse object.

Return type:

fedn.network.grpc.fedn_pb2.ControlResponse

to_dict()[source]

Export combiner configuration to a dictionary.

Returns:

A dictionary with the combiner configuration.

Return type:

dict

to_json()[source]

Export combiner configuration to json.

Returns:

A json document with the combiner configuration.

Return type:

str

exception fedn.network.combiner.interfaces.CombinerUnavailableError[source]

Bases: Exception

fedn.network.combiner.modelservice module

class fedn.network.combiner.modelservice.ModelService[source]

Bases: ModelServiceServicer

Service for handling download and upload of models to the server.

Download(request, context)[source]

RPC endpoints for downloading a model.

Parameters:
  • request (fedn.network.grpc.fedn_pb2.ModelRequest) – The model request object.

  • context (grpc._server._Context) – The context object (unused)

Returns:

A model response iterator.

Return type:

fedn.network.grpc.fedn_pb2.ModelResponse

Upload(request_iterator, context)[source]

RPC endpoints for uploading a model.

Parameters:
  • request_iterator (fedn.network.grpc.fedn_pb2.ModelRequest) – The model request iterator.

  • context (grpc._server._Context) – The context object (unused)

Returns:

A model response object.

Return type:

fedn.network.grpc.fedn_pb2.ModelResponse

exist(model_id)[source]

Check if a model exists on the server.

Parameters:

model_id – The model id.

Returns:

True if the model exists, else False.

get_model(id)[source]

Download model with id ‘id’ from server.

Parameters:

id (str) – The model id.

Returns:

A BytesIO object containing the model.

Return type:

io.BytesIO, None if model does not exist.

set_model(model, id)[source]

Upload model to server.

Parameters:
  • model (io.BytesIO) – A model object (BytesIO)

  • id (str) – The model id.

fedn.network.combiner.modelservice.get_tmp_path()[source]

Return a temporary output path compatible with save_model, load_model.

fedn.network.combiner.modelservice.load_model_from_BytesIO(model_bytesio, helper)[source]

Load a model from a BytesIO object. :param model_bytesio: A BytesIO object containing the model. :type model_bytesio: io.BytesIO :param helper: The helper object for the model. :type helper: fedn.utils.helperbase.HelperBase :return: The model object. :rtype: return type of helper.load

fedn.network.combiner.modelservice.model_as_bytesIO(model)[source]
fedn.network.combiner.modelservice.serialize_model_to_BytesIO(model, helper)[source]

Serialize a model to a BytesIO object.

Parameters:
  • model (return type of helper.load) – The model object.

  • helper (fedn.utils.helperbase.HelperBase) – The helper object for the model.

Returns:

A BytesIO object containing the model.

Return type:

io.BytesIO

fedn.network.combiner.modelservice.upload_request_generator(mdl, id)[source]

Generator function for model upload requests.

Parameters:

mdl (BytesIO) – The model update object.

Returns:

A model update request.

Return type:

fedn.ModelRequest

fedn.network.combiner.roundhandler module

exception fedn.network.combiner.roundhandler.ModelUpdateError[source]

Bases: Exception

class fedn.network.combiner.roundhandler.RoundHandler(storage, server, modelservice)[source]

Bases: object

Round handler.

The round handler processes requests from the global controller to produce model updates and perform model validations.

Parameters:
  • aggregator_name (str) – The name of the aggregator plugin module.

  • storage (class: fedn.common.storage.s3.s3repo.S3ModelRepository) – Model repository for :class: fedn.network.combiner.Combiner

  • server (class: fedn.network.combiner.Combiner) – A handle to the Combiner class :class: fedn.network.combiner.Combiner

  • modelservice (class: fedn.network.combiner.modelservice.ModelService) – A handle to the model service :class: fedn.network.combiner.modelservice.ModelService

execute_training_round(config)[source]

Coordinates clients to execute training tasks.

Parameters:

config (dict) – The round config object.

Returns:

metadata about the training round.

Return type:

dict

execute_validation_round(round_config)[source]

Coordinate validation rounds as specified in config.

Parameters:

round_config (dict) – The round config object.

load_model_update(helper, model_id)[source]

Load model update with id model_id into its memory representation.

Parameters:
  • helper (class: fedn.utils.helpers.helpers.HelperBase) – An instance of :class: fedn.utils.helpers.helpers.HelperBase

  • model_id (str) – The ID of the model update, UUID in str format

load_model_update_str(model_id, retry=3)[source]

Load model update object and return it as BytesIO.

Parameters:
  • model_id (str) – The ID of the model

  • retry (int, optional) – number of times retrying load model update, defaults to 3

Returns:

Updated model

Return type:

class: io.BytesIO

push_round_config(round_config)[source]

Add a round_config (job description) to the inbox.

Parameters:

round_config (dict) – A dict containing the round configuration (from global controller).

Returns:

A job id (universally unique identifier) for the round.

Return type:

str

run(polling_interval=1.0)[source]

Main control loop. Execute rounds based on round config on the queue.

Parameters:

polling_interval (float) – The polling interval in seconds for checking if a new job/config is available.

set_aggregator(aggregator)[source]
stage_model(model_id, timeout_retry=3, retry=2)[source]

Download a model from persistent storage and set in modelservice.

Parameters:
  • model_id (str) – ID of the model update object to stage.

  • timeout_retry (int, optional) – Sleep before retrying download again(sec), defaults to 3

  • retry (int, optional) – Number of retries, defaults to 2

waitforit(config, buffer_size=100, polling_interval=0.1)[source]

Defines the policy for how long the server should wait before starting to aggregate models.

The policy is as follows:
  1. Wait a maximum of time_window time until the round times out.

  2. Terminate if a preset number of model updates (buffer_size) are in the queue.

Parameters:
  • config (dict) – The round config object

  • buffer_size (int, optional) – The number of model updates to wait for before starting aggregation, defaults to 100

  • polling_interval (float, optional) – The polling interval, defaults to 0.1