import base64
import copy
import json
import time
from io import BytesIO
import grpc
import fedn.network.grpc.fedn_pb2 as fedn
import fedn.network.grpc.fedn_pb2_grpc as rpc
from fedn.network.combiner.roundhandler import RoundConfig
[docs]
class CombinerUnavailableError(Exception):
pass
[docs]
class Channel:
"""Wrapper for a gRPC channel.
:param address: The address for the gRPC server.
:type address: str
:param port: The port for connecting to the gRPC server.
:type port: int
:param certificate: The certificate for connecting to the gRPC server (optional)
:type certificate: str
"""
def __init__(self, address, port, certificate=None):
"""Create a channel.
If a valid certificate is given, a secure channel is created, else insecure.
:parameter address: The address for the gRPC server.
:type address: str
:parameter port: The port for connecting to the gRPC server.
:type port: int
:parameter certificate: The certificate for connecting to the gRPC server (optional)
:type certificate: str
"""
self.address = address
self.port = port
self.certificate = certificate
if self.certificate:
credentials = grpc.ssl_channel_credentials(root_certificates=copy.deepcopy(certificate))
self.channel = grpc.secure_channel("{}:{}".format(self.address, str(self.port)), credentials)
else:
self.channel = grpc.insecure_channel("{}:{}".format(self.address, str(self.port)))
[docs]
def get_channel(self):
"""Get a channel.
:return: An instance of a gRPC channel
:rtype: :class:`grpc.Channel`
"""
return copy.copy(self.channel)
[docs]
class CombinerInterface:
"""Interface for the Combiner (aggregation server).
Abstraction on top of the gRPC server servicer.
:param parent: The parent combiner (controller)
:type parent: :class:`fedn.network.api.interfaces.API`
:param name: The name of the combiner.
:type name: str
:param address: The address of the combiner.
:type address: str
:param fqdn: The fully qualified domain name of the combiner.
:type fqdn: str
:param port: The port of the combiner.
:type port: int
:param certificate: The certificate of the combiner (optional).
:type certificate: str
:param key: The key of the combiner (optional).
:type key: str
:param ip: The ip of the combiner (optional).
:type ip: str
:param config: The configuration of the combiner (optional).
:type config: dict
"""
def __init__(self, parent, name, address, fqdn, port, certificate=None, key=None, ip=None, config=None):
"""Initialize the combiner interface."""
self.parent = parent
self.name = name
self.address = address
self.fqdn = fqdn
self.port = port
self.certificate = certificate
self.key = key
self.ip = ip
if not config:
self.config = {"max_clients": 8}
else:
self.config = config
[docs]
@classmethod
def from_json(combiner_config):
"""Initialize the combiner config from a json document.
:parameter combiner_config: The combiner configuration.
:type combiner_config: dict
:return: An instance of the combiner interface.
:rtype: :class:`fedn.network.combiner.interfaces.CombinerInterface`
"""
return CombinerInterface(**combiner_config)
[docs]
def to_dict(self):
"""Export combiner configuration to a dictionary.
:return: A dictionary with the combiner configuration.
:rtype: dict
"""
data = {
"parent": self.parent,
"name": self.name,
"address": self.address,
"fqdn": self.fqdn,
"port": self.port,
"ip": self.ip,
"certificate": None,
"key": None,
"config": self.config,
}
if self.certificate:
cert_b64 = base64.b64encode(self.certificate)
key_b64 = base64.b64encode(self.key)
data["certificate"] = str(cert_b64).split("'")[1]
data["key"] = str(key_b64).split("'")[1]
return data
[docs]
def to_json(self):
"""Export combiner configuration to json.
:return: A json document with the combiner configuration.
:rtype: str
"""
return json.dumps(self.to_dict())
[docs]
def get_certificate(self):
"""Get combiner certificate.
:return: The combiner certificate.
:rtype: str, None if no certificate is set.
"""
if self.certificate:
cert_b64 = base64.b64encode(self.certificate)
return str(cert_b64).split("'")[1]
else:
return None
[docs]
def get_key(self):
"""Get combiner key.
:return: The combiner key.
:rtype: str, None if no key is set.
"""
if self.key:
key_b64 = base64.b64encode(self.key)
return str(key_b64).split("'")[1]
else:
return None
[docs]
def flush_model_update_queue(self):
"""Reset the model update queue on the combiner."""
channel = Channel(self.address, self.port, self.certificate).get_channel()
control = rpc.ControlStub(channel)
request = fedn.ControlRequest()
try:
control.FlushAggregationQueue(request)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
raise CombinerUnavailableError
else:
raise
[docs]
def set_aggregator(self, aggregator):
"""Set the active aggregator module.
:param aggregator: The name of the aggregator module.
:type config: str
"""
channel = Channel(self.address, self.port, self.certificate).get_channel()
control = rpc.ControlStub(channel)
request = fedn.ControlRequest()
p = request.parameter.add()
p.key = "aggregator"
p.value = aggregator
try:
control.SetAggregator(request)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
raise CombinerUnavailableError
else:
raise
[docs]
def submit(self, config: RoundConfig):
"""Submit a compute plan to the combiner.
:param config: The job configuration.
:type config: dict
:return: Server ControlResponse object.
:rtype: :class:`fedn.network.grpc.fedn_pb2.ControlResponse`
"""
channel = Channel(self.address, self.port, self.certificate).get_channel()
control = rpc.ControlStub(channel)
request = fedn.ControlRequest()
request.command = fedn.Command.START
for k, v in config.items():
p = request.parameter.add()
p.key = str(k)
p.value = str(v)
try:
response = control.Start(request)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
raise CombinerUnavailableError
else:
raise
return response
[docs]
def get_model(self, id, timeout=10):
"""Download a model from the combiner server.
:param id: The model id.
:type id: str
:return: A file-like object containing the model.
:rtype: :class:`io.BytesIO`, None if the model is not available.
"""
channel = Channel(self.address, self.port, self.certificate).get_channel()
modelservice = rpc.ModelServiceStub(channel)
data = BytesIO()
data.seek(0, 0)
time_start = time.time()
request = fedn.ModelRequest(id=id)
request.sender.name = self.name
request.sender.role = fedn.WORKER
parts = modelservice.Download(request)
for part in parts:
if part.status == fedn.ModelStatus.IN_PROGRESS:
data.write(part.data)
if part.status == fedn.ModelStatus.OK:
return data
if part.status == fedn.ModelStatus.FAILED:
return None
if part.status == fedn.ModelStatus.UNKNOWN:
if time.time() - time_start > timeout:
return None
continue
[docs]
def allowing_clients(self):
"""Check if the combiner is allowing additional client connections.
:return: True if accepting, else False.
:rtype: bool
"""
channel = Channel(self.address, self.port, self.certificate).get_channel()
connector = rpc.ConnectorStub(channel)
request = fedn.ConnectionRequest()
try:
response = connector.AcceptingClients(request)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
raise CombinerUnavailableError
else:
raise
if response.status == fedn.ConnectionStatus.NOT_ACCEPTING:
return False
if response.status == fedn.ConnectionStatus.ACCEPTING:
return True
if response.status == fedn.ConnectionStatus.TRY_AGAIN_LATER:
return False
return False
[docs]
def list_active_clients(self, queue=1):
"""List active clients.
:param queue: The channel (queue) to use (optional). Default is 1 = MODEL_UPDATE_REQUESTS channel.
see :class:`fedn.network.grpc.fedn_pb2.Channel`
:type channel: int
:return: A list of active clients.
:rtype: json
"""
channel = Channel(self.address, self.port, self.certificate).get_channel()
control = rpc.ConnectorStub(channel)
request = fedn.ListClientsRequest()
request.channel = queue
try:
response = control.ListActiveClients(request)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
raise CombinerUnavailableError
else:
raise
return response.client