import base64
from fedn.common.log_config import logger
from fedn.network.combiner.interfaces import CombinerInterface
from fedn.network.loadbalancer.leastpacked import LeastPacked
__all__ = ("Network",)
[docs]
class Network:
"""FEDn network interface. This class is used to interact with the network.
Note: This class contain redundant code, which is not used in the current version of FEDn.
Some methods has been moved to :class:`fedn.network.api.interface.API`.
"""
def __init__(self, control, statestore, load_balancer=None):
""" """
self.statestore = statestore
self.control = control
self.id = statestore.network_id
if not load_balancer:
self.load_balancer = LeastPacked(self)
else:
self.load_balancer = load_balancer
[docs]
def get_combiner(self, name):
"""Get combiner by name.
:param name: name of combiner
:type name: str
:return: The combiner instance object
:rtype: :class:`fedn.network.combiner.interfaces.CombinerInterface`
"""
combiners = self.get_combiners()
for combiner in combiners:
if name == combiner.name:
return combiner
return None
[docs]
def get_combiners(self):
"""Get all combiners in the network.
:return: list of combiners objects
:rtype: list(:class:`fedn.network.combiner.interfaces.CombinerInterface`)
"""
data = self.statestore.get_combiners()
combiners = []
for c in data["result"]:
if c["certificate"]:
cert = base64.b64decode(c["certificate"])
key = base64.b64decode(c["key"])
else:
cert = None
key = None
combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, key=key, ip=c["ip"]))
return combiners
[docs]
def add_combiner(self, combiner):
"""Add a new combiner to the network.
:param combiner: The combiner instance object
:type combiner: :class:`fedn.network.combiner.interfaces.CombinerInterface`
:return: None
"""
if not self.control.idle():
logger.warning("Reducer is not idle, cannot add additional combiner.")
return
if self.get_combiner(combiner.name):
return
logger.info("adding combiner {}".format(combiner.name))
self.statestore.set_combiner(combiner.to_dict())
[docs]
def remove_combiner(self, combiner):
"""Remove a combiner from the network.
:param combiner: The combiner instance object
:type combiner: :class:`fedn.network.combiner.interfaces.CombinerInterface`
:return: None
"""
if not self.control.idle():
logger.warning("Reducer is not idle, cannot remove combiner.")
return
self.statestore.delete_combiner(combiner.name)
[docs]
def find_available_combiner(self):
"""Find an available combiner in the network.
:return: The combiner instance object
:rtype: :class:`fedn.network.combiner.interfaces.CombinerInterface`
"""
combiner = self.load_balancer.find_combiner()
return combiner
[docs]
def handle_unavailable_combiner(self, combiner):
"""This callback is triggered if a combiner is found to be unresponsive.
:param combiner: The combiner instance object
:type combiner: :class:`fedn.network.combiner.interfaces.CombinerInterface`
:return: None
"""
# TODO: Implement strategy to handle an unavailable combiner.
logger.warning("REDUCER CONTROL: Combiner {} unavailable.".format(combiner.name))
[docs]
def add_client(self, client):
"""Add a new client to the network.
:param client: The client instance object
:type client: dict
:return: None
"""
if self.get_client(client["client_id"]):
return
logger.info("adding client {}".format(client["client_id"]))
self.statestore.set_client(client)
[docs]
def get_client(self, name):
"""Get client by name.
:param name: name of client
:type name: str
:return: The client instance object
:rtype: ObjectId
"""
ret = self.statestore.get_client(name)
return ret
[docs]
def update_client_data(self, client_data, status, role):
"""Update client status in statestore.
:param client_data: The client instance object
:type client_data: dict
:param status: The client status
:type status: str
:param role: The client role
:type role: str
:return: None
"""
self.statestore.update_client_status(client_data, status, role)
[docs]
def get_client_info(self):
"""List available client in statestore.
:return: list of client objects
:rtype: list(ObjectId)
"""
return self.statestore.list_clients()