Source code for fedn.network.clients.connect

# This file contains the Connector class for assigning client to the FEDn network via the discovery service (REST-API).
# The Connector class is used by the Client class in fedn/network/clients/client.py.
# Once assigned, the client will retrieve combiner assignment from the discovery service.
# The discovery service will also add the client to the statestore.
#
#
import enum

import requests

from fedn.common.config import FEDN_AUTH_REFRESH_TOKEN, FEDN_AUTH_REFRESH_TOKEN_URI, FEDN_AUTH_SCHEME, FEDN_CUSTOM_URL_PREFIX
from fedn.common.log_config import logger


[docs] class Status(enum.Enum): """Enum for representing the status of a client assignment.""" Unassigned = 0 Assigned = 1 TryAgain = 2 UnAuthorized = 3 UnMatchedConfig = 4
[docs] class ConnectorClient: """Connector for assigning client to a combiner in the FEDn network. :param host: host of discovery service :type host: str :param port: port of discovery service :type port: int :param token: token for authentication :type token: str :param name: name of client :type name: str :param remote_package: True if remote package is used, False if local :type remote_package: bool :param force_ssl: True if https is used, False if http :type force_ssl: bool :param verify: True if certificate is verified, False if not :type verify: bool :param combiner: name of preferred combiner :type combiner: str :param id: id of client """ def __init__(self, host, port, token, name, remote_package, force_ssl=False, verify=False, combiner=None, id=None): self.host = host self.port = port self.token = token self.name = name self.verify = verify self.preferred_combiner = combiner self.id = id self.package = "remote" if remote_package else "local" # for https we assume a an ingress handles permanent redirect (308) if force_ssl: self.prefix = "https://" else: self.prefix = "http://" if self.port: self.connect_string = "{}{}:{}".format(self.prefix, self.host, self.port) else: self.connect_string = "{}{}".format(self.prefix, self.host) logger.info("Setting connection string to {}.".format(self.connect_string))
[docs] def assign(self): """Connect client to FEDn network discovery service, ask for combiner assignment. :return: Tuple with assingment status, combiner connection information if sucessful, else None. :rtype: tuple(:class:`fedn.network.clients.connect.Status`, str) """ try: retval = None payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner, "package": self.package} retval = requests.post( self.connect_string + FEDN_CUSTOM_URL_PREFIX + "/add_client", json=payload, verify=self.verify, allow_redirects=True, headers={"Authorization": f"{FEDN_AUTH_SCHEME} {self.token}"}, ) except Exception as e: logger.debug("***** {}".format(e)) return Status.Unassigned, {} if retval.status_code == 400: # Get error messange from response reason = retval.json()["message"] return Status.UnMatchedConfig, reason if retval.status_code == 401: if "message" in retval.json(): reason = retval.json()["message"] logger.warning(reason) if reason == "Token expired": status_code = self.refresh_token() if status_code >= 200 and status_code < 204: logger.info("Token refreshed.") return Status.TryAgain, reason else: return Status.UnAuthorized, "Could not refresh token" reason = "Unauthorized connection to reducer, make sure the correct token is set" return Status.UnAuthorized, reason if retval.status_code >= 200 and retval.status_code < 204: if retval.json()["status"] == "retry": if "message" in retval.json(): reason = retval.json()["message"] else: reason = "Controller was not ready. Try again later." return Status.TryAgain, reason return Status.Assigned, retval.json() return Status.Unassigned, None
[docs] def refresh_token(self): """Refresh client token. :return: Tuple with assingment status, combiner connection information if sucessful, else None. :rtype: tuple(:class:`fedn.network.clients.connect.Status`, str) """ if not FEDN_AUTH_REFRESH_TOKEN_URI or not FEDN_AUTH_REFRESH_TOKEN: logger.error("No refresh token URI/Token set, cannot refresh token.") return 401 payload = requests.post(FEDN_AUTH_REFRESH_TOKEN_URI, verify=self.verify, allow_redirects=True, json={"refresh": FEDN_AUTH_REFRESH_TOKEN}) self.token = payload.json()["access"] return payload.status_code