fedn.network.combiner package
The FEDn Combiner package is responsible for combining models from multiple clients. It acts as the gRPC server for the federated network.
Subpackages
- fedn.network.combiner.aggregators package
- fedn.network.combiner.hooks package
- Submodules
- fedn.network.combiner.hooks.allowed_import module
- fedn.network.combiner.hooks.hook_client module
- fedn.network.combiner.hooks.hooks module
FunctionServiceServicer
FunctionServiceServicer.HandleAggregation()
FunctionServiceServicer.HandleClientConfig()
FunctionServiceServicer.HandleClientSelection()
FunctionServiceServicer.HandleMetadata()
FunctionServiceServicer.HandleProvidedFunctions()
FunctionServiceServicer.HandleStoreModel()
FunctionServiceServicer.__init__()
FunctionServiceServicer.check_incremental_aggregate()
serve()
- fedn.network.combiner.hooks.serverfunctionsbase module
- fedn.network.combiner.hooks.serverfunctionstest module
Submodules
fedn.network.combiner.combiner module
- class fedn.network.combiner.combiner.Combiner(config, repository: Repository, db: DatabaseConnection)[source]
Bases:
CombinerServicer, ReducerServicer, ConnectorServicer, ControlServicer
Combiner gRPC server.
- Parameters:
config (dict) – configuration for the combiner
- AcceptingClients(request: ConnectionRequest, context)[source]
RPC endpoint that returns a ConnectionResponse indicating whether the server is accepting clients or not.
- Parameters:
request (fedn.network.grpc.fedn_pb2.ConnectionRequest) – the request (unused)
context (grpc._server._Context) – the context (unused)
- Returns:
the response
- Return type:
fedn.network.grpc.fedn_pb2.ConnectionResponse
- FlushAggregationQueue(control: ControlRequest, context)[source]
Flush the queue.
- Parameters:
control (fedn.network.grpc.fedn_pb2.ControlRequest) – the control request
context (grpc._server._Context) – the context (unused)
- Returns:
the control response
- Return type:
fedn.network.grpc.fedn_pb2.ControlResponse
- ListActiveClients(request: ListClientsRequest, context)[source]
RPC endpoint that returns a ClientList containing the names of all active clients.
A client counts as active if it has sent a status message or responded to a heartbeat request within the last 10 seconds.
- Parameters:
request (fedn.network.grpc.fedn_pb2.ListClientsRequest) – the request
context (grpc._server._Context) – the context (unused)
- Returns:
the client list
- Return type:
fedn.network.grpc.fedn_pb2.ClientList
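The 10-second activity rule described for ListActiveClients can be sketched as follows. This is a minimal illustration, not FEDn's implementation: `active_clients`, `last_seen`, and `ACTIVE_WINDOW_SECONDS` are hypothetical names, and the real combiner keeps its own client state.

```python
from datetime import datetime, timedelta

# Assumption: the 10-second window is taken from the description above;
# the constant name itself is hypothetical.
ACTIVE_WINDOW_SECONDS = 10


def active_clients(last_seen, now):
    """Return the sorted names of clients seen within the activity window.

    last_seen maps a client name to the time of its latest status message
    or heartbeat (a hypothetical data structure for this sketch).
    """
    window = timedelta(seconds=ACTIVE_WINDOW_SECONDS)
    return sorted(name for name, seen in last_seen.items() if now - seen <= window)


now = datetime(2024, 1, 1, 12, 0, 0)
last_seen = {
    "client-a": now - timedelta(seconds=3),   # recent heartbeat
    "client-b": now - timedelta(seconds=42),  # stale, no longer active
}
print(active_clients(last_seen, now))
```

Passing `now` explicitly keeps the check deterministic, which makes the rule easy to test.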
- SendAttributeMessage(request, context)[source]
Send a model attribute response.
- Parameters:
request (fedn.network.grpc.fedn_pb2.AttributeMessage) – the request
- SendBackwardCompletion(request, context)[source]
Send a backward completion response.
- Parameters:
request (fedn.network.grpc.fedn_pb2.BackwardCompletion) – the request
context (grpc._server._Context) – the context
- Returns:
the response
- Return type:
fedn.network.grpc.fedn_pb2.Response
- SendHeartbeat(heartbeat: Heartbeat, context)[source]
RPC that lets clients send a heartbeat, notifying the server that the client is available.
- Parameters:
heartbeat (fedn.network.grpc.fedn_pb2.Heartbeat) – the heartbeat
context (grpc._server._Context) – the context (unused)
- Returns:
the response
- Return type:
fedn.network.grpc.fedn_pb2.Response
- SendModelMetric(request, context)[source]
Send a model metric response.
- Parameters:
request (fedn.network.grpc.fedn_pb2.ModelMetric) – the request
context (grpc._server._Context) – the context
- Returns:
the response
- Return type:
fedn.network.grpc.fedn_pb2.Response
- SendModelPrediction(request, context)[source]
Send a model prediction response.
- Parameters:
request (fedn.network.grpc.fedn_pb2.ModelPrediction) – the request
context (grpc._server._Context) – the context
- Returns:
the response
- Return type:
fedn.network.grpc.fedn_pb2.Response
- SendModelUpdate(request, context)[source]
Send a model update response.
- Parameters:
request (fedn.network.grpc.fedn_pb2.ModelUpdate) – the request
context (grpc._server._Context) – the context
- Returns:
the response
- Return type:
fedn.network.grpc.fedn_pb2.Response
- SendModelValidation(request, context)[source]
Send a model validation response.
- Parameters:
request (fedn.network.grpc.fedn_pb2.ModelValidation) – the request
context (grpc._server._Context) – the context
- Returns:
the response
- Return type:
fedn.network.grpc.fedn_pb2.Response
- SendStatus(status: Status, context)[source]
A client RPC endpoint that accepts status messages.
- Parameters:
status (fedn.network.grpc.fedn_pb2.Status) – the status message
context (grpc._server._Context) – the context (unused)
- Returns:
the response
- Return type:
fedn.network.grpc.fedn_pb2.Response
- SendTelemetryMessage(request, context)[source]
Send a telemetry message.
- Parameters:
request (fedn.network.grpc.fedn_pb2.TelemetryMessage) – the request
- SetAggregator(control: ControlRequest, context)[source]
Set the active aggregator.
- Parameters:
control (fedn.network.grpc.fedn_pb2.ControlRequest) – the control request
context (grpc._server._Context) – the context (unused)
- Returns:
the control response
- Return type:
fedn.network.grpc.fedn_pb2.ControlResponse
- SetServerFunctions(control: ControlRequest, context)[source]
Set a function provider.
- Parameters:
control (fedn.network.grpc.fedn_pb2.ControlRequest) – the control request
context (grpc._server._Context) – the context (unused)
- Returns:
the control response
- Return type:
fedn.network.grpc.fedn_pb2.ControlResponse
- Start(control: ControlRequest, context)[source]
Start a round of federated learning.
- Parameters:
control (fedn.network.grpc.fedn_pb2.ControlRequest) – the control request
context (grpc._server._Context) – the context (unused)
- Returns:
the control response
- Return type:
fedn.network.grpc.fedn_pb2.ControlResponse
- Stop(control: ControlRequest, context)[source]
TODO: Not yet implemented.
- Parameters:
control (fedn.network.grpc.fedn_pb2.ControlRequest) – the control request
context (grpc._server._Context) – the context (unused)
- Returns:
the control response
- Return type:
fedn.network.grpc.fedn_pb2.ControlResponse
- TaskStream(response, context)[source]
A server stream RPC endpoint (Update model). Messages from client stream.
- Parameters:
response (fedn.network.grpc.fedn_pb2.ModelUpdateRequest) – the response
context (grpc._server._Context) – the context
- __init__(config, repository: Repository, db: DatabaseConnection)[source]
Initialize Combiner server.
- classmethod create_instance(config: CombinerConfig, repository: Repository, db: DatabaseConnection)[source]
Create a new singleton instance of the combiner.
- Parameters:
config (dict) – configuration for the combiner
- Returns:
the instance of the combiner
- Return type:
fedn.network.combiner.combiner.Combiner
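The singleton pattern that create_instance describes can be sketched with a class-level reference. This is an illustration under assumptions: `SketchServer` is a hypothetical stand-in, and whether the real method returns the existing instance or replaces it on repeated calls is not stated here. The sketch returns the existing one.

```python
class SketchServer:
    """Hypothetical stand-in for a server class with one shared instance."""

    _instance = None  # holds the single shared instance once created

    def __init__(self, config):
        self.config = config

    @classmethod
    def create_instance(cls, config):
        # Assumption: repeated calls hand back the first instance unchanged.
        if cls._instance is None:
            cls._instance = cls(config)
        return cls._instance


first = SketchServer.create_instance({"name": "combiner-1"})
second = SketchServer.create_instance({"name": "combiner-2"})
print(first is second)
```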
- get_active_trainers()[source]
Get a list of active trainers.
- Returns:
the list of active trainers
- Return type:
list
- get_active_validators()[source]
Get a list of active validators.
- Returns:
the list of active validators
- Return type:
list
- nr_active_trainers()[source]
Get the number of active trainers.
- Returns:
the number of active trainers
- Return type:
int
- register_model_validation(validation)[source]
Register a model validation.
- Parameters:
validation (fedn.network.grpc.fedn_pb2.ModelValidation) – the model validation
- request_backward_pass(session_id: str, gradient_id: str, config: dict, clients=[]) None [source]
Ask clients to perform backward pass.
- request_forward_pass(session_id: str, model_id: str, config: dict, clients=[]) None [source]
Ask clients to perform forward pass.
- request_model_prediction(prediction_id: str, model_id: str, clients: list = []) None [source]
Ask clients to perform prediction on the model.
- request_model_update(session_id, model_id, config, clients=[])[source]
Ask clients to update the current global model.
- class fedn.network.combiner.combiner.CombinerConfig[source]
Bases:
TypedDict
Configuration for the combiner.
fedn.network.combiner.connect module
- class fedn.network.combiner.connect.ConnectorCombiner(host, port, myhost, fqdn, myport, token, name, secure=False, verify=False)[source]
Bases:
object
Connector for announcing the combiner to the FEDn network.
- Parameters:
host (str) – host of discovery service
port (int) – port of discovery service
myhost (str) – host of combiner
fqdn (str) – fully qualified domain name of combiner
myport (int) – port of combiner
token (str) – token for authentication
name (str) – name of combiner
secure (bool) – True if https is used, False if http
verify (bool) – True if certificate is verified, False if not
- __init__(host, port, myhost, fqdn, myport, token, name, secure=False, verify=False)[source]
Initialize the ConnectorCombiner.
- Parameters:
host (str) – The host of the discovery service.
port (int) – The port of the discovery service.
myhost (str) – The host of the combiner.
fqdn (str) – The fully qualified domain name of the combiner.
myport (int) – The port of the combiner.
token (str) – The token for the discovery service.
name (str) – The name of the combiner.
secure (bool) – Use https for the connection to the discovery service.
verify (bool) – Verify the connection to the discovery service.
fedn.network.combiner.interfaces module
- class fedn.network.combiner.interfaces.Channel(address, port, certificate=None)[source]
Bases:
object
Wrapper for a gRPC channel.
- Parameters:
- class fedn.network.combiner.interfaces.CombinerInterface(parent, name, address, fqdn, port, certificate=None, key=None, ip=None, config=None)[source]
Bases:
object
Interface for the Combiner (aggregation server).
An abstraction on top of the gRPC server servicer.
- Parameters:
parent (fedn.network.api.interfaces.API) – The parent combiner (controller).
name (str) – The name of the combiner.
address (str) – The address of the combiner.
fqdn (str) – The fully qualified domain name of the combiner.
port (int) – The port of the combiner.
certificate (str) – The certificate of the combiner (optional).
key (str) – The key of the combiner (optional).
ip (str) – The ip of the combiner (optional).
config (dict) – The configuration of the combiner (optional).
- __init__(parent, name, address, fqdn, port, certificate=None, key=None, ip=None, config=None)[source]
Initialize the combiner interface.
- allowing_clients()[source]
Check if the combiner is allowing additional client connections.
- Returns:
True if accepting, else False.
- Return type:
bool
- classmethod from_json()[source]
Initialize the combiner config from a json document.
- Parameters:
combiner_config (dict) – The combiner configuration.
- Returns:
An instance of the combiner interface.
- Return type:
fedn.network.combiner.interfaces.CombinerInterface
- get_certificate()[source]
Get combiner certificate.
- Returns:
The combiner certificate.
- Return type:
str, None if no certificate is set.
- get_key()[source]
Get combiner key.
- Returns:
The combiner key.
- Return type:
str, None if no key is set.
- list_active_clients(queue=1)[source]
List active clients.
- Parameters:
queue – The channel (queue) to use (optional). Defaults to 1, the MODEL_UPDATE_REQUESTS channel; see fedn.network.grpc.fedn_pb2.Channel.
- Returns:
A list of active clients.
- Return type:
json
- set_aggregator(aggregator)[source]
Set the active aggregator module.
- Parameters:
aggregator – The name of the aggregator module.
- set_server_functions(server_functions)[source]
Set the function provider module.
- Parameters:
server_functions (str) – Stringified function provider code.
- submit(config: RoundConfig)[source]
Submit a compute plan to the combiner.
- Parameters:
config (dict) – The job configuration.
- Returns:
Server ControlResponse object.
- Return type:
fedn.network.grpc.fedn_pb2.ControlResponse
Bases:
Exception
fedn.network.combiner.modelservice module
- class fedn.network.combiner.modelservice.ModelService[source]
Bases:
ModelServiceServicer
Service for handling download and upload of models to the server.
- Download(request, context)[source]
RPC endpoint for downloading a model.
- Parameters:
request (fedn.network.grpc.fedn_pb2.ModelRequest) – The model request object.
context (grpc._server._Context) – The context object (unused)
- Returns:
A model response iterator.
- Return type:
fedn.network.grpc.fedn_pb2.ModelResponse
- Upload(request_iterator, context)[source]
RPC endpoint for uploading a model.
- Parameters:
request_iterator (fedn.network.grpc.fedn_pb2.ModelRequest) – The model request iterator.
context (grpc._server._Context) – The context object (unused)
- Returns:
A model response object.
- Return type:
fedn.network.grpc.fedn_pb2.ModelResponse
- exist(model_id)[source]
Check if a model exists on the server.
- Parameters:
model_id – The model id.
- Returns:
True if the model exists, else False.
- get_model(id)[source]
Download model with id ‘id’ from server.
- Parameters:
id (str) – The model id.
- Returns:
A BytesIO object containing the model.
- Return type:
io.BytesIO, None if the model does not exist.
- set_model(model, id)[source]
Upload model to server.
- Parameters:
model (io.BytesIO) – A model object (BytesIO)
id (str) – The model id.
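The exist / get_model / set_model contract described above (BytesIO in, BytesIO out, None for an unknown id) can be sketched with an in-memory store. `InMemoryModelStore` is a hypothetical class for illustration only; it says nothing about how ModelService actually stores models.

```python
import io


class InMemoryModelStore:
    """Hypothetical dict-backed store mirroring the documented method names."""

    def __init__(self):
        self._models = {}  # maps model id -> raw bytes

    def exist(self, model_id):
        """Return True if a model with this id has been stored."""
        return model_id in self._models

    def set_model(self, model, id):
        """Store the full contents of a BytesIO object under the given id."""
        model.seek(0)  # read from the start regardless of the current position
        self._models[id] = model.read()

    def get_model(self, id):
        """Return a fresh BytesIO with the model, or None if it does not exist."""
        data = self._models.get(id)
        if data is None:
            return None
        return io.BytesIO(data)


store = InMemoryModelStore()
store.set_model(io.BytesIO(b"model-bytes"), "model-1")
print(store.exist("model-1"), store.get_model("missing"))
```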
- fedn.network.combiner.modelservice.bytesIO_request_generator(mdl, request_function, args)[source]
Generator function for model upload requests.
- Parameters:
mdl (BytesIO) – The model update object.
request_function (Function) – Function for sending requests.
args (dict) – request arguments, excluding data argument.
- Returns:
Yields grpc request for streaming.
- Return type:
grpc request generator.
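The idea behind bytesIO_request_generator, yielding one request per chunk of a BytesIO object, can be sketched as below. The names `request_generator`, `make_request`, and `CHUNK_SIZE` are hypothetical, the chunk size is chosen only to keep the example readable, and a plain dict stands in for the gRPC request message.

```python
import io

# Assumption: a tiny chunk size so the resulting chunks are easy to inspect.
CHUNK_SIZE = 4


def make_request(data, **kwargs):
    """Hypothetical request factory; a real one would build a gRPC message."""
    return {"data": data, **kwargs}


def request_generator(mdl, request_function, args, chunk_size=CHUNK_SIZE):
    """Yield one request per chunk read from the BytesIO object."""
    mdl.seek(0)
    while True:
        chunk = mdl.read(chunk_size)
        if not chunk:
            break  # an empty read means the whole buffer has been consumed
        yield request_function(data=chunk, **args)


model = io.BytesIO(b"0123456789")
requests = list(request_generator(model, make_request, {"id": "model-1"}))
print([r["data"] for r in requests])
```

Because the function is a generator, the model is never copied in full; each chunk is produced only when the consumer asks for the next request.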
- fedn.network.combiner.modelservice.get_tmp_path()[source]
Return a temporary output path compatible with save_model, load_model.
- fedn.network.combiner.modelservice.load_model_from_bytes(model_bytes, helper)[source]
Load a model from a bytes object.
- Parameters:
model_bytes (bytes) – A bytes object containing the model.
helper (fedn.utils.helperbase.HelperBase) – The helper object for the model.
- Returns:
The model object.
- Return type:
return type of helper.load
- fedn.network.combiner.modelservice.serialize_model_to_BytesIO(model, helper)[source]
Serialize a model to a BytesIO object.
- Parameters:
model (return type of helper.load) – The model object.
helper (fedn.utils.helperbase.HelperBase) – The helper object for the model.
- Returns:
A BytesIO object containing the model.
- Return type:
io.BytesIO
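A round trip through serialization and loading can be sketched as follows. `JsonHelper` is a hypothetical helper written for this example, and its `save`/`load` signatures are assumptions; FEDn's real helper classes define their own. The two functions are simplified analogues, not the library functions themselves.

```python
import io
import json


class JsonHelper:
    """Hypothetical helper that stores a model as UTF-8 encoded JSON."""

    def save(self, model, fh):
        fh.write(json.dumps(model).encode("utf-8"))

    def load(self, fh):
        return json.loads(fh.read().decode("utf-8"))


def serialize_to_bytesio(model, helper):
    """Write the model into a BytesIO buffer using the helper."""
    buffer = io.BytesIO()
    helper.save(model, buffer)
    buffer.seek(0)  # rewind so the next reader starts at the beginning
    return buffer


def load_from_bytes(model_bytes, helper):
    """Wrap raw bytes in a BytesIO and let the helper decode them."""
    return helper.load(io.BytesIO(model_bytes))


helper = JsonHelper()
model = {"weights": [0.1, 0.2, 0.3]}
restored = load_from_bytes(serialize_to_bytesio(model, helper).getvalue(), helper)
print(restored == model)
```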
fedn.network.combiner.roundhandler module
- class fedn.network.combiner.roundhandler.RoundConfig[source]
Bases:
TypedDict
Round configuration.
- Parameters:
_job_id (str) – A universally unique identifier for the round. Set by Combiner.
committed_at (str) – The time the round was committed. Set by Controller.
task (str) – The task to perform in the round. Set by Controller. Supported tasks are “training”, “validation”, and “prediction”.
round_id (str) – The round identifier as str(int)
round_timeout (str) – The round timeout in seconds. Set by user interfaces or Controller.
rounds – The number of rounds. Set by user interfaces.
model_id (str) – The model identifier. Set by user interfaces or Controller.
model_version (str) – The model version. Currently not used.
model_type (str) – The model type. Currently not used.
model_size (int) – The size of the model. Currently not used.
model_parameters (dict) – The model parameters. Currently not used.
model_metadata (dict) – The model metadata. Currently not used.
session_id (str) – The session identifier. Set by (Controller?).
prediction_id (str) – The prediction identifier. Only used for prediction tasks.
helper_type (str) – The helper type.
aggregator (str) – The aggregator type.
client_settings (dict) – Settings that are distributed to clients.
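A round configuration using a subset of the fields listed above might look like the sketch below. `ExampleRoundConfig` and `check_task` are hypothetical names, all values are illustrative, and only the three task names come from the parameter description.

```python
from typing import TypedDict

# Task names taken from the "task" field description above.
SUPPORTED_TASKS = {"training", "validation", "prediction"}


class ExampleRoundConfig(TypedDict, total=False):
    """Hypothetical subset of the round configuration fields."""

    task: str
    round_id: str
    round_timeout: str
    model_id: str
    session_id: str
    helper_type: str
    aggregator: str
    client_settings: dict


def check_task(config):
    """Raise ValueError if the config names a task outside the supported set."""
    task = config.get("task")
    if task not in SUPPORTED_TASKS:
        raise ValueError(f"unsupported task: {task!r}")
    return task


# Illustrative values only; identifiers and settings are made up.
config: ExampleRoundConfig = {
    "task": "training",
    "round_id": str(1),  # the round identifier is documented as str(int)
    "round_timeout": "180",
    "model_id": "example-model-id",
    "session_id": "example-session-id",
    "client_settings": {"epochs": 1},
}
print(check_task(config))
```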
- class fedn.network.combiner.roundhandler.RoundHandler(server: Combiner, repository: Repository, modelservice: ModelService)[source]
Bases:
object
Round handler.
The round handler processes requests from the global controller to produce model updates and perform model validations.
- Parameters:
aggregator_name (str) – The name of the aggregator plugin module.
storage (fedn.common.storage.s3.s3repo.S3ModelRepository) – Model repository for fedn.network.combiner.Combiner.
server (fedn.network.combiner.Combiner) – A handle to the Combiner class.
modelservice (fedn.network.combiner.modelservice.ModelService) – A handle to the model service.
- __init__(server: Combiner, repository: Repository, modelservice: ModelService)[source]
Initialize the RoundHandler.
- execute_prediction_round(prediction_id: str, model_id: str) None [source]
Coordinate prediction rounds as specified in config.
- Parameters:
round_config (dict) – The round config object.
- execute_validation_round(session_id, model_id)[source]
Coordinate validation rounds as specified in config.
- Parameters:
round_config (dict) – The round config object.
- push_round_config(round_config: RoundConfig) str [source]
Add a round_config (job description) to the inbox.
- run(polling_interval=1.0)[source]
Main control loop. Execute rounds based on round config on the queue.
- Parameters:
polling_interval (float) – The polling interval in seconds for checking if a new job/config is available.
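The control loop described for run, polling a queue of round configs at a fixed interval, can be sketched as follows. This is an illustration, not the RoundHandler code: `run_loop`, `handle`, and `max_idle_polls` are hypothetical, and `max_idle_polls` exists only so that the example terminates.

```python
import queue
import time


def run_loop(inbox, handle, polling_interval=1.0, max_idle_polls=3):
    """Poll the inbox and handle each job in arrival order.

    Assumption: the loop stops after max_idle_polls consecutive empty polls,
    a limit added here so the sketch does not run forever.
    """
    idle = 0
    while idle < max_idle_polls:
        try:
            job = inbox.get(block=False)
        except queue.Empty:
            idle += 1
            time.sleep(polling_interval)  # wait before checking again
            continue
        idle = 0
        handle(job)


inbox = queue.Queue()
inbox.put({"task": "training", "round_id": "1"})
inbox.put({"task": "validation", "round_id": "1"})

handled = []
run_loop(inbox, handled.append, polling_interval=0.01)
print([job["task"] for job in handled])
```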
fedn.network.combiner.updatehandler module
- class fedn.network.combiner.updatehandler.UpdateHandler(modelservice: ModelService)[source]
Bases:
object
Update handler.
Responsible for receiving, loading and supplying client model updates.
- Parameters:
modelservice (fedn.network.combiner.modelservice.ModelService) – A handle to the model service.
- __init__(modelservice: ModelService) None [source]
- load_model(helper, model_id)[source]
Load model update with id model_id into its memory representation.
- Parameters:
helper (fedn.utils.helpers.helpers.HelperBase) – An instance of fedn.utils.helpers.helpers.HelperBase.
model_id (str) – The ID of the model update, UUID in str format
- load_model_update(model_update, helper)[source]
Load the memory representation of the model update.
Load the model update parameters and the associated metadata into memory.
- Parameters:
model_update (fedn.network.grpc.fedn.proto.ModelUpdate) – The model update.
helper (fedn.utils.helpers.helperbase.Helper) – A helper object.
- Returns:
A tuple of (parameters, metadata)
- Return type:
tuple
- load_model_update_byte(model_update)[source]
Load the memory representation of the model update.
Load the model update parameters and the associated metadata into memory.
- Parameters:
model_update (fedn.network.grpc.fedn.proto.ModelUpdate) – The model update.
- Returns:
A tuple of parameters(bytes), metadata
- Return type:
tuple
- load_model_update_bytesIO(model_id, retry=3)[source]
Load model update object and return it as BytesIO.
- next_model_update()[source]
Get the next model update from the queue.
- Parameters:
helper (object) – A helper object.
- Returns:
The model update.
- Return type:
fedn.network.grpc.fedn.proto.ModelUpdate
- on_model_update(model_update)[source]
Callback when a new client model update is received.
Performs (optional) validation and pre-processing, and then puts the update id on the aggregation queue. Override in subclass as needed.
- Parameters:
model_update – fedn.network.grpc.fedn.proto.ModelUpdate
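The callback flow described for on_model_update (validate, then queue the update id for aggregation) can be sketched like this. Both classes are hypothetical, updates are modelled as plain dicts rather than protobuf messages, and the validation rules are invented for the example. Here the subclass customizes only the validation step.

```python
import queue


class SketchUpdateHandler:
    """Hypothetical handler; not the FEDn UpdateHandler class."""

    def __init__(self):
        self.aggregation_queue = queue.Queue()

    def validate(self, model_update):
        # Assumption: an update is valid when it carries a non-empty id.
        return bool(model_update.get("model_update_id"))

    def on_model_update(self, model_update):
        """Validate the update, then queue its id for aggregation."""
        if not self.validate(model_update):
            return False
        self.aggregation_queue.put(model_update["model_update_id"])
        return True


class StrictUpdateHandler(SketchUpdateHandler):
    """Subclass that additionally requires a sender field."""

    def validate(self, model_update):
        return super().validate(model_update) and "sender" in model_update


handler = StrictUpdateHandler()
accepted = handler.on_model_update({"model_update_id": "u-1", "sender": "client-a"})
rejected = handler.on_model_update({"model_update_id": "u-2"})
print(accepted, rejected, handler.aggregation_queue.qsize())
```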
- waitforbackwardcompletion(config, required_backward_completions=-1, polling_interval=0.1)[source]
Wait for backward completion messages.
- Parameters:
config – The round config object
required_backward_completions – Number of required backward completions
- waitforit(config, buffer_size=100, polling_interval=0.1)[source]
Defines the policy for how long the server should wait before starting to aggregate models.
The policy is as follows:
- Wait at most time_window, after which the round times out.
- Stop waiting early once a preset number of model updates (buffer_size) are in the queue.
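The waiting policy above can be sketched as a polling loop with a deadline. This is a simplified illustration, not the UpdateHandler code: `wait_for_updates` is a hypothetical function, and it takes time_window in seconds as a direct argument, whereas the documented method receives a config object.

```python
import queue
import time


def wait_for_updates(update_queue, time_window, buffer_size=100, polling_interval=0.1):
    """Block until buffer_size updates are queued or time_window seconds pass.

    Returns the number of queued updates at the moment waiting stops.
    """
    deadline = time.monotonic() + time_window
    while time.monotonic() < deadline:
        if update_queue.qsize() >= buffer_size:
            break  # enough updates arrived, no need to wait for the timeout
        time.sleep(polling_interval)
    return update_queue.qsize()


# Case 1: the buffer is already full, so the call returns without waiting.
full_queue = queue.Queue()
for update_id in ("u-1", "u-2", "u-3"):
    full_queue.put(update_id)
ready = wait_for_updates(full_queue, time_window=5.0, buffer_size=3)

# Case 2: no updates arrive, so the call returns once the window has elapsed.
timed_out = wait_for_updates(queue.Queue(), time_window=0.05, buffer_size=3,
                             polling_interval=0.01)
print(ready, timed_out)
```

Using `time.monotonic()` for the deadline keeps the timeout unaffected by system clock adjustments.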