Source code for fedn.network.clients.client

import io
import json
import os
import queue
import re
import sys
import threading
import time
import uuid
from datetime import datetime
from io import BytesIO

import grpc
import requests
from google.protobuf.json_format import MessageToJson
from tenacity import retry, stop_after_attempt

import fedn.network.grpc.fedn_pb2 as fedn
import fedn.network.grpc.fedn_pb2_grpc as rpc
from fedn.common.config import FEDN_AUTH_SCHEME, FEDN_PACKAGE_EXTRACT_DIR
from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream
from fedn.network.clients.connect import ConnectorClient, Status
from fedn.network.clients.package import PackageRuntime
from fedn.network.clients.state import ClientState, ClientStateToString
from fedn.network.combiner.modelservice import get_tmp_path, upload_request_generator
from fedn.utils.helpers.helpers import get_helper, load_metadata, save_metadata

CHUNK_SIZE = 1024 * 1024
VALID_NAME_REGEX = "^[a-zA-Z0-9_-]*$"


[docs] class GrpcAuth(grpc.AuthMetadataPlugin): def __init__(self, key): self._key = key def __call__(self, context, callback): callback((("authorization", f"{FEDN_AUTH_SCHEME} {self._key}"),), None)
[docs] class Client: """FEDn Client. Service running on client/datanodes in a federation, recieving and handling model update and model validation requests. :param config: A configuration dictionary containing connection information for the discovery service (controller) and settings governing e.g. client-combiner assignment behavior. :type config: dict """ def __init__(self, config): """Initialize the client.""" self.state = None self.error_state = False self._connected = False self._missed_heartbeat = 0 self.config = config self.trace_attribs = False set_log_level_from_string(config.get("verbosity", "INFO")) set_log_stream(config.get("logfile", None)) self.id = config["client_id"] or str(uuid.uuid4()) # Validate client name match = re.search(VALID_NAME_REGEX, config["name"]) if not match: raise ValueError("Unallowed character in client name. Allowed characters: a-z, A-Z, 0-9, _, -.") # Folder where the client will store downloaded compute package and logs self.name = config["name"] if FEDN_PACKAGE_EXTRACT_DIR: self.run_path = os.path.join(os.getcwd(), FEDN_PACKAGE_EXTRACT_DIR) else: dirname = self.name + "-" + time.strftime("%Y%m%d-%H%M%S") self.run_path = os.path.join(os.getcwd(), dirname) if not os.path.exists(self.run_path): os.mkdir(self.run_path) self.started_at = datetime.now() self.logs = [] self.inbox = queue.Queue() # Attach to the FEDn network (get combiner or attach directly) if config["combiner"]: combiner_config = {"status": "assigned", "host": config["combiner"], "port": config["combiner_port"], "helper_type": ""} if config["proxy_server"]: combiner_config["fqdn"] = config["proxy_server"] else: self.connector = ConnectorClient( host=config["discover_host"], port=config["discover_port"], token=config["token"], name=config["name"], remote_package=config["remote_compute_context"], force_ssl=config["force_ssl"], verify=config["verify"], combiner=config["preferred_combiner"], id=self.id, ) combiner_config = self.assign() self.connect(combiner_config) self._initialize_dispatcher(self.config) self._initialize_helper(combiner_config) if not self.helper: logger.warning("Failed to retrieve helper class settings: {}".format(combiner_config)) self._subscribe_to_combiner(self.config) self.state = ClientState.idle
[docs] def assign(self): """Contacts the controller and asks for combiner assignment. :return: A configuration dictionary containing connection information for combiner. :rtype: dict """ logger.info("Initiating assignment request.") while True: status, response = self.connector.assign() if status == Status.TryAgain: logger.warning(response) logger.info("Assignment request failed. Retrying in 5 seconds.") time.sleep(5) continue if status == Status.Assigned: combiner_config = response break if status == Status.UnAuthorized: logger.critical(response) sys.exit("Exiting: Unauthorized") if status == Status.UnMatchedConfig: logger.critical(response) sys.exit("Exiting: UnMatchedConfig") time.sleep(5) logger.info("Assignment successfully received.") logger.info("Received combiner configuration: {}".format(combiner_config)) return combiner_config
def _add_grpc_metadata(self, key, value): """Add metadata for gRPC calls. :param key: The key of the metadata. :type key: str :param value: The value of the metadata. :type value: str """ # Check if metadata exists and add if not if not hasattr(self, "metadata"): self.metadata = () # Check if metadata key already exists and replace value if so for i, (k, v) in enumerate(self.metadata): if k == key: # Replace value self.metadata = self.metadata[:i] + ((key, value),) + self.metadata[i + 1 :] return # Set metadata using tuple concatenation self.metadata += ((key, value),)
[docs] def connect(self, combiner_config): """Connect to combiner. :param combiner_config: connection information for the combiner. :type combiner_config: dict """ if self._connected: logger.info("Client is already attached. ") return # TODO use the combiner_config['certificate'] for setting up secure comms' host = combiner_config["host"] # Add host to gRPC metadata self._add_grpc_metadata("grpc-server", host) logger.debug("Client using metadata: {}.".format(self.metadata)) port = combiner_config["port"] secure = False if "fqdn" in combiner_config.keys() and combiner_config["fqdn"] is not None: host = combiner_config["fqdn"] # assuming https if fqdn is used port = 443 logger.info(f"Initiating connection to combiner host at: {host}:{port}") if os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): secure = True logger.info("Using root certificate from environment variable for GRPC channel.") with open(os.environ["FEDN_GRPC_ROOT_CERT_PATH"], "rb") as f: credentials = grpc.ssl_channel_credentials(f.read()) channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) elif self.config["secure"]: secure = True logger.info("Using default location for root certificates.") credentials = grpc.ssl_channel_credentials() if self.config["token"]: token = self.config["token"] auth_creds = grpc.metadata_call_credentials(GrpcAuth(token)) channel = grpc.secure_channel("{}:{}".format(host, str(port)), grpc.composite_channel_credentials(credentials, auth_creds)) else: channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) else: logger.info("Using insecure GRPC channel.") if port == 443: port = 80 channel = grpc.insecure_channel("{}:{}".format(host, str(port))) self.channel = channel self.connectorStub = rpc.ConnectorStub(channel) self.combinerStub = rpc.CombinerStub(channel) self.modelStub = rpc.ModelServiceStub(channel) logger.info("Successfully established {} connection to {}:{}".format("secure" if secure else "insecure", host, port)) self._connected = True
[docs] def disconnect(self): """Disconnect from the combiner.""" if not self._connected: logger.info("Client is not connected.") self.channel.close() self._connected = False logger.info("Client {} disconnected.".format(self.name))
def _initialize_helper(self, combiner_config): """Initialize the helper class for the client. :param combiner_config: A configuration dictionary containing connection information for | the discovery service (controller) and settings governing e.g. | client-combiner assignment behavior. :type combiner_config: dict :return: """ if "helper_type" in combiner_config.keys(): if not combiner_config["helper_type"]: # Default to numpyhelper self.helper = get_helper("numpyhelper") else: self.helper = get_helper(combiner_config["helper_type"]) def _subscribe_to_combiner(self, config): """Listen to combiner message stream and start all processing threads. :param config: A configuration dictionary containing connection information for | the discovery service (controller) and settings governing e.g. | client-combiner assignment behavior. """ # Start sending heartbeats to the combiner. threading.Thread(target=self._send_heartbeat, kwargs={"update_frequency": config["heartbeat_interval"]}, daemon=True).start() # Start listening for combiner training and validation messages threading.Thread(target=self._listen_to_task_stream, daemon=True).start() self._connected = True # Start processing the client message inbox threading.Thread(target=self.process_request, daemon=True).start()
[docs] @retry(stop=stop_after_attempt(3)) def untar_package(self, package_runtime): _, package_runpath = package_runtime.unpack() return package_runpath
def _initialize_dispatcher(self, config): """Initialize the dispatcher for the client. :param config: A configuration dictionary containing connection information for | the discovery service (controller) and settings governing e.g. | client-combiner assignment behavior. :type config: dict :return: """ pr = PackageRuntime(self.run_path) if config["remote_compute_context"]: retval = None tries = 10 while tries > 0: retval = pr.download( host=config["discover_host"], port=config["discover_port"], token=config["token"], force_ssl=config["force_ssl"], secure=config["secure"] ) if retval: break time.sleep(60) logger.warning("Compute package not available. Retrying in 60 seconds. {} attempts remaining.".format(tries)) tries -= 1 if retval: if "checksum" not in config: logger.warning("Bypassing validation of package checksum. Ensure the package source is trusted.") else: checks_out = pr.validate(config["checksum"]) if not checks_out: logger.critical("Validation of local package failed. Client terminating.") self.error_state = True return package_runpath = "" if retval: package_runpath = self.untar_package(pr) self.dispatcher = pr.dispatcher(package_runpath) try: logger.info("Initiating Dispatcher with entrypoint set to: startup") activate_cmd = self.dispatcher._get_or_create_python_env() self.dispatcher.run_cmd("startup") except KeyError: logger.info("No startup command found in package. Continuing.") pass except Exception as e: logger.error(f"Caught exception: {type(e).__name__}") else: from_path = os.path.join(os.getcwd(), "client") self.dispatcher = pr.dispatcher(from_path) # Get or create python environment activate_cmd = self.dispatcher._get_or_create_python_env() if activate_cmd: logger.info("To activate the virtual environment, run: {}".format(activate_cmd))
[docs] def get_model_from_combiner(self, id, timeout=20): """Fetch a model from the assigned combiner. Downloads the model update object via a gRPC streaming channel. :param id: The id of the model update object. :type id: str :return: The model update object. :rtype: BytesIO """ data = BytesIO() time_start = time.time() request = fedn.ModelRequest(id=id) request.sender.name = self.name request.sender.role = fedn.CLIENT try: for part in self.modelStub.Download(request, metadata=self.metadata): if part.status == fedn.ModelStatus.IN_PROGRESS: data.write(part.data) if part.status == fedn.ModelStatus.OK: return data if part.status == fedn.ModelStatus.FAILED: return None if part.status == fedn.ModelStatus.UNKNOWN: if time.time() - time_start >= timeout: return None continue except grpc.RpcError as e: logger.critical(f"GRPC: An error occurred during model download: {e}") return data
[docs] def send_model_to_combiner(self, model, id): """Send a model update to the assigned combiner. Uploads the model updated object via a gRPC streaming channel, Upload. :param model: The model update object. :type model: BytesIO :param id: The id of the model update object. :type id: str :return: The model update object. :rtype: BytesIO """ if not isinstance(model, BytesIO): bt = BytesIO() for d in model.stream(32 * 1024): bt.write(d) else: bt = model bt.seek(0, 0) try: result = self.modelStub.Upload(upload_request_generator(bt, id), metadata=self.metadata) except grpc.RpcError as e: logger.critical(f"GRPC: An error occurred during model upload: {e}") return result
def _listen_to_task_stream(self): """Subscribe to the model update request stream. :return: None :rtype: None """ r = fedn.ClientAvailableMessage() r.sender.name = self.name r.sender.role = fedn.CLIENT r.sender.client_id = self.id # Add client to metadata self._add_grpc_metadata("client", self.name) status_code = None while self._connected: try: if status_code == grpc.StatusCode.UNAVAILABLE: logger.info("GRPC TaskStream: server available again.") status_code = None for request in self.combinerStub.TaskStream(r, metadata=self.metadata): if request: logger.debug("Received model update request from combiner: {}.".format(request)) if request.sender.role == fedn.COMBINER: # Process training request self.send_status( "Received model update request.", log_level=fedn.LogLevel.AUDIT, type=fedn.StatusType.MODEL_UPDATE_REQUEST, request=request, sesssion_id=request.session_id, ) logger.info("Received task request of type {} for model_id {}".format(request.type, request.model_id)) if request.type == fedn.StatusType.MODEL_UPDATE and self.config["trainer"]: self.inbox.put(("train", request)) elif request.type == fedn.StatusType.MODEL_VALIDATION and self.config["validator"]: self.inbox.put(("validate", request)) elif request.type == fedn.StatusType.MODEL_PREDICTION and self.config["validator"]: logger.info("Received prediction request for model_id {}".format(request.model_id)) presigned_url = json.loads(request.data) presigned_url = presigned_url["presigned_url"] logger.info("Prediction presigned URL: {}".format(presigned_url)) self.inbox.put(("predict", request)) else: logger.error("Unknown request type: {}".format(request.type)) except grpc.RpcError as e: # Handle gRPC errors status_code = e.code() if status_code == grpc.StatusCode.UNAVAILABLE: logger.warning("GRPC TaskStream: server unavailable during model update request stream. Retrying.") # Retry after a delay time.sleep(5) continue if status_code == grpc.StatusCode.UNAUTHENTICATED: details = e.details() if details == "Token expired": logger.warning("GRPC TaskStream: Token expired. Reconnecting.") self.disconnect() if status_code == grpc.StatusCode.CANCELLED: # Expected if the client is disconnected logger.critical("GRPC TaskStream: Client disconnected from combiner. Trying to reconnect.") else: # Log the error and continue logger.error(f"GRPC TaskStream: An error occurred during model update request stream: {e}") except Exception as ex: # Handle other exceptions logger.error(f"GRPC TaskStream: An error occurred during model update request stream: {ex}") # Detach if not attached if not self._connected: return def _process_training_request(self, model_id: str, session_id: str = None, client_settings: dict = None): """Process a training (model update) request. :param model_id: The model id of the model to be updated. :type model_id: str :param session_id: The id of the current session :type session_id: str :return: The model id of the updated model, or None if the update failed. And a dict with metadata. :rtype: tuple """ self.send_status("\t Starting processing of training request for model_id {}".format(model_id), sesssion_id=session_id) self.state = ClientState.training try: meta = {} tic = time.time() mdl = self.get_model_from_combiner(str(model_id)) if mdl is None: logger.error("Could not retrieve model from combiner. Aborting training request.") return None, None meta["fetch_model"] = time.time() - tic inpath = self.helper.get_tmp_path() with open(inpath, "wb") as fh: fh.write(mdl.getbuffer()) save_metadata(metadata=client_settings, filename=inpath) outpath = self.helper.get_tmp_path() tic = time.time() # TODO: Check return status, fail gracefully self.dispatcher.run_cmd("train {} {}".format(inpath, outpath)) meta["exec_training"] = time.time() - tic tic = time.time() out_model = None with open(outpath, "rb") as fr: out_model = io.BytesIO(fr.read()) # Stream model update to combiner server updated_model_id = uuid.uuid4() self.send_model_to_combiner(out_model, str(updated_model_id)) meta["upload_model"] = time.time() - tic # Read the metadata file training_metadata = load_metadata(outpath) meta["training_metadata"] = training_metadata os.unlink(inpath) os.unlink(outpath) os.unlink(outpath + "-metadata") except Exception as e: logger.error("Could not process training request due to error: {}".format(e)) updated_model_id = None meta = {"status": "failed", "error": str(e)} self.state = ClientState.idle return updated_model_id, meta def _process_validation_request(self, model_id: str, session_id: str = None): """Process a validation request. :param model_id: The model id of the model to be validated. :type model_id: str :param session_id: The id of the current session. :type session_id: str :return: The validation metrics, or None if validation failed. :rtype: dict """ self.send_status(f"Processing validation request for model_id {model_id}", sesssion_id=session_id) self.state = ClientState.validating try: model = self.get_model_from_combiner(str(model_id)) if model is None: logger.error("Could not retrieve model from combiner. Aborting validation request.") return None inpath = self.helper.get_tmp_path() with open(inpath, "wb") as fh: fh.write(model.getbuffer()) outpath = get_tmp_path() self.dispatcher.run_cmd(f"validate {inpath} {outpath}") with open(outpath, "r") as fh: validation = json.loads(fh.read()) os.unlink(inpath) os.unlink(outpath) except Exception as e: logger.warning("Validation failed with exception {}".format(e)) self.state = ClientState.idle return None self.state = ClientState.idle return validation def _process_prediction_request(self, model_id: str, session_id: str, presigned_url: str): """Process a prediction request. :param model_id: The model id of the model to be used for prediction. :type model_id: str :param session_id: The id of the current session. :type session_id: str :param presigned_url: The presigned URL for the data to be used for prediction. :type presigned_url: str :return: None """ self.send_status(f"Processing prediction request for model_id {model_id}", sesssion_id=session_id) try: model = self.get_model_from_combiner(str(model_id)) if model is None: logger.error("Could not retrieve model from combiner. Aborting prediction request.") return inpath = self.helper.get_tmp_path() with open(inpath, "wb") as fh: fh.write(model.getbuffer()) outpath = get_tmp_path() self.dispatcher.run_cmd(f"predict {inpath} {outpath}") # Upload the prediction result to the presigned URL with open(outpath, "rb") as fh: response = requests.put(presigned_url, data=fh.read()) os.unlink(inpath) os.unlink(outpath) if response.status_code != 200: logger.warning("Prediction upload failed with status code {}".format(response.status_code)) self.state = ClientState.idle return except Exception as e: logger.warning("Prediction failed with exception {}".format(e)) self.state = ClientState.idle return self.state = ClientState.idle return
[docs] def process_request(self): """Process training and validation tasks.""" while True: if not self._connected: return try: (task_type, request) = self.inbox.get(timeout=1.0) if task_type == "train": tic = time.time() self.state = ClientState.training client_settings = json.loads(request.data).get("client_settings", {}) model_id, meta = self._process_training_request(request.model_id, session_id=request.session_id, client_settings=client_settings) if meta is not None: processing_time = time.time() - tic meta["processing_time"] = processing_time meta["config"] = request.data if model_id is not None: # Send model update to combiner update = fedn.ModelUpdate() update.sender.name = self.name update.sender.client_id = self.id update.sender.role = fedn.CLIENT update.receiver.name = request.sender.name update.receiver.role = request.sender.role update.model_id = request.model_id update.model_update_id = str(model_id) update.timestamp = str(datetime.now()) update.correlation_id = request.correlation_id update.meta = json.dumps(meta) try: _ = self.combinerStub.SendModelUpdate(update, metadata=self.metadata) self.send_status( "Model update completed.", log_level=fedn.LogLevel.AUDIT, type=fedn.StatusType.MODEL_UPDATE, request=update, sesssion_id=request.session_id, ) except grpc.RpcError as e: status_code = e.code() logger.error("GRPC error, {}.".format(status_code.name)) logger.debug(e) except ValueError as e: logger.error("GRPC error, RPC channel closed. {}".format(e)) logger.debug(e) else: self.send_status( "Client {} failed to complete model update.", log_level=fedn.LogLevel.WARNING, request=request, sesssion_id=request.session_id ) self.state = ClientState.idle self.inbox.task_done() elif task_type == "validate": self.state = ClientState.validating metrics = self._process_validation_request(request.model_id, request.session_id) if metrics is not None: # Send validation validation = fedn.ModelValidation() validation.sender.name = self.name validation.sender.role = fedn.CLIENT validation.receiver.name = request.sender.name validation.receiver.role = request.sender.role validation.model_id = str(request.model_id) validation.data = json.dumps(metrics) validation.timestamp.GetCurrentTime() validation.correlation_id = request.correlation_id validation.session_id = request.session_id try: _ = self.combinerStub.SendModelValidation(validation, metadata=self.metadata) status_type = fedn.StatusType.MODEL_VALIDATION self.send_status( "Model validation completed.", log_level=fedn.LogLevel.AUDIT, type=status_type, request=validation, sesssion_id=request.session_id, ) except grpc.RpcError as e: status_code = e.code() logger.error("GRPC error, {}.".format(status_code.name)) logger.debug(e) except ValueError as e: logger.error("GRPC error, RPC channel closed. {}".format(e)) logger.debug(e) else: self.send_status( "Client {} failed to complete model validation.".format(self.name), log_level=fedn.LogLevel.WARNING, request=request, sesssion_id=request.session_id, ) self.state = ClientState.idle self.inbox.task_done() elif task_type == "predict": self.state = ClientState.predicting try: presigned_url = json.loads(request.data) except json.JSONDecodeError as e: logger.error(f"Failed to decode prediction request data: {e}") self.state = ClientState.idle continue if "presigned_url" not in presigned_url: logger.error("Prediction request missing presigned_url.") self.state = ClientState.idle continue presigned_url = presigned_url["presigned_url"] # Obs that session_id in request is the prediction_id _ = self._process_prediction_request(request.model_id, request.session_id, presigned_url) prediction = fedn.ModelPrediction() prediction.sender.name = self.name prediction.sender.role = fedn.CLIENT prediction.receiver.name = request.sender.name prediction.receiver.name = request.sender.name prediction.receiver.role = request.sender.role prediction.model_id = str(request.model_id) # TODO: Add prediction data prediction.data = "" prediction.timestamp.GetCurrentTime() prediction.correlation_id = request.correlation_id # Obs that session_id in request is the prediction_id prediction.prediction_id = request.session_id try: _ = self.combinerStub.SendModelPrediction(prediction, metadata=self.metadata) status_type = fedn.StatusType.MODEL_PREDICTION self.send_status( "Model prediction completed.", log_level=fedn.LogLevel.AUDIT, type=status_type, request=prediction, sesssion_id=request.session_id ) except grpc.RpcError as e: status_code = e.code() logger.error("GRPC error, {}.".format(status_code.name)) logger.debug(e) self.state = ClientState.idle except queue.Empty: pass except grpc.RpcError as e: logger.critical(f"GRPC process_request: An error occurred during process request: {e}")
def _send_heartbeat(self, update_frequency=2.0): """Send a heartbeat to the combiner. :param update_frequency: The frequency of the heartbeat in seconds. :type update_frequency: float :return: None if the client is disconnected. :rtype: None """ while True: heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.CLIENT, client_id=self.id)) try: self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata) if self._missed_heartbeat > 0: logger.info("GRPC heartbeat: combiner available again after {} missed heartbeats.".format(self._missed_heartbeat)) self._missed_heartbeat = 0 except grpc.RpcError as e: status_code = e.code() if status_code == grpc.StatusCode.UNAVAILABLE: self._missed_heartbeat += 1 logger.error( "GRPC hearbeat: combiner unavailable, retrying (attempt {}/{}).".format( self._missed_heartbeat, self.config["reconnect_after_missed_heartbeat"] ) ) if self._missed_heartbeat > self.config["reconnect_after_missed_heartbeat"]: self.disconnect() self._missed_heartbeat = 0 if status_code == grpc.StatusCode.UNAUTHENTICATED: details = e.details() if details == "Token expired": logger.error("GRPC hearbeat: Token expired. Disconnecting.") self.disconnect() sys.exit("Unauthorized. Token expired. Please obtain a new token.") logger.debug(e) time.sleep(update_frequency) if not self._connected: logger.info("SendStatus: Client disconnected.") return
[docs] def send_status(self, msg, log_level=fedn.LogLevel.INFO, type=None, request=None, sesssion_id: str = None): """Send status message. :param msg: The message to send. :type msg: str :param log_level: The log level of the message. :type log_level: fedn.LogLevel.INFO, fedn.LogLevel.WARNING, fedn.LogLevel.ERROR :param type: The type of the message. :type type: str :param request: The request message. :type request: fedn.Request """ if not self._connected: logger.info("SendStatus: Client disconnected.") return status = fedn.Status() status.timestamp.GetCurrentTime() status.sender.name = self.name status.sender.role = fedn.CLIENT status.log_level = log_level status.status = str(msg) status.session_id = sesssion_id if type is not None: status.type = type if request is not None: status.data = MessageToJson(request) self.logs.append("{} {} LOG LEVEL {} MESSAGE {}".format(str(datetime.now()), status.sender.name, status.log_level, status.status)) try: _ = self.connectorStub.SendStatus(status, metadata=self.metadata) except grpc.RpcError as e: status_code = e.code() if status_code == grpc.StatusCode.UNAVAILABLE: logger.warning("GRPC SendStatus: server unavailable during send status.") if status_code == grpc.StatusCode.UNAUTHENTICATED: details = e.details() if details == "Token expired": logger.warning("GRPC SendStatus: Token expired.")
[docs] def run(self): """Run the client.""" try: cnt = 0 old_state = self.state while True: time.sleep(1) if cnt == 0: logger.info("Client is active, waiting for model update requests.") cnt = 1 if self.state != old_state: logger.info("Client in {} state.".format(ClientStateToString(self.state))) if not self._connected: logger.warning("Client lost connection to combiner. Attempting to reconnect to FEDn network.") combiner_config = self.assign() self.connect(combiner_config) self._subscribe_to_combiner(self.config) cnt = 0 if self.error_state: logger.error("Client in error state. Terminiating.") sys.exit("Client in error state. Terminiating.") except KeyboardInterrupt: logger.info("Shutting down.")