Source code for fedn.network.api.network

import os
from typing import List

from fedn.common.log_config import logger
from fedn.network.combiner.interfaces import CombinerInterface
from fedn.network.loadbalancer.leastpacked import LeastPacked
from fedn.network.storage.dbconnection import DatabaseConnection

__all__ = ("Network",)


[docs] class Network: """FEDn network interface. This class is used to interact with the network. Note: This class contain redundant code, which is not used in the current version of FEDn. Some methods has been moved to :class:`fedn.network.api.interface.API`. """
[docs] def __init__(self, control, network_id: str, dbconn: DatabaseConnection, load_balancer=None): """ """ self.control = control self.id = network_id self.db = dbconn if not load_balancer: self.load_balancer = LeastPacked(self) else: self.load_balancer = load_balancer
[docs] def get_combiner(self, name): """Get combiner by name. :param name: name of combiner :type name: str :return: The combiner instance object :rtype: :class:`fedn.network.combiner.interfaces.CombinerInterface` """ combiners = self.get_combiners() for combiner in combiners: if name == combiner.name: return combiner return None
[docs] def get_combiners(self) -> List[CombinerInterface]: """Get all combiners in the network. :return: list of combiners objects :rtype: list(:class:`fedn.network.combiner.interfaces.CombinerInterface`) """ result = self.db.combiner_store.list(limit=0, skip=0, sort_key=None) combiners = [] for combiner in result: name = combiner.name.upper() # General certificate handling, same for all combiners. if os.environ.get("FEDN_GRPC_CERT_PATH"): with open(os.environ.get("FEDN_GRPC_CERT_PATH"), "rb") as f: cert = f.read() # Specific certificate handling for each combiner. elif os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}"): cert_path = os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}") with open(cert_path, "rb") as f: cert = f.read() else: cert = None combiners.append( CombinerInterface(combiner.parent, combiner.name, combiner.address, combiner.fqdn, combiner.port, certificate=cert, ip=combiner.ip) ) return combiners
[docs] def find_available_combiner(self): """Find an available combiner in the network. :return: The combiner instance object :rtype: :class:`fedn.network.combiner.interfaces.CombinerInterface` """ combiner = self.load_balancer.find_combiner() return combiner
[docs] def handle_unavailable_combiner(self, combiner): """This callback is triggered if a combiner is found to be unresponsive. :param combiner: The combiner instance object :type combiner: :class:`fedn.network.combiner.interfaces.CombinerInterface` :return: None """ # TODO: Implement strategy to handle an unavailable combiner. logger.warning("REDUCER CONTROL: Combiner {} unavailable.".format(combiner.name))