# This file contains the PackageRuntime class, which is used to download, validate and unpack compute packages.
#
#
import cgi
import os
import tarfile
from distutils.dir_util import copy_tree
import requests
import yaml
from fedn.utils.checksum import sha
from fedn.utils.dispatcher import Dispatcher
[docs]
class PackageRuntime:
""" PackageRuntime is used to download, validate and unpack compute packages.
:param package_path: path to compute package
:type package_path: str
:param package_dir: directory to unpack compute package
:type package_dir: str
"""
def __init__(self, package_path, package_dir):
self.dispatch_config = {'entry_points':
{'predict': {'command': 'python3 predict.py'},
'train': {'command': 'python3 train.py'},
'validate': {'command': 'python3 validate.py'}}}
self.pkg_path = package_path
self.pkg_name = None
self.dir = package_dir
self.checksum = None
self.expected_checksum = None
[docs]
def download(self, host, port, token, force_ssl=False, secure=False, name=None):
""" Download compute package from controller
:param host: host of controller
:param port: port of controller
:param token: token for authentication
:param name: name of package
:return: True if download was successful, None otherwise
:rtype: bool
"""
# for https we assume a an ingress handles permanent redirect (308)
if force_ssl:
scheme = "https"
else:
scheme = "http"
if port:
path = f"{scheme}://{host}:{port}/download_package"
else:
path = f"{scheme}://{host}/download_package"
if name:
path = path + "?name={}".format(name)
with requests.get(path, stream=True, verify=False, headers={'Authorization': 'Token {}'.format(token)}) as r:
if 200 <= r.status_code < 204:
params = cgi.parse_header(
r.headers.get('Content-Disposition', ''))[-1]
try:
self.pkg_name = params['filename']
except KeyError:
print("No package returned!", flush=True)
return None
r.raise_for_status()
with open(os.path.join(self.pkg_path, self.pkg_name), 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
if port:
path = f"{scheme}://{host}:{port}/get_package_checksum"
else:
path = f"{scheme}://{host}/get_package_checksum"
if name:
path = path + "?name={}".format(name)
with requests.get(path, verify=False, headers={'Authorization': 'Token {}'.format(token)}) as r:
if 200 <= r.status_code < 204:
data = r.json()
try:
self.checksum = data['checksum']
except Exception:
print("Could not extract checksum!")
return True
[docs]
def validate(self, expected_checksum):
""" Validate the package against the checksum provided by the controller
:param expected_checksum: checksum provided by the controller
:return: True if checksums match, False otherwise
:rtype: bool
"""
self.expected_checksum = expected_checksum
# crosscheck checksum and unpack if security checks are ok.
file_checksum = str(sha(os.path.join(self.pkg_path, self.pkg_name)))
if self.checksum == self.expected_checksum == file_checksum:
print("Package validated {}".format(self.checksum))
return True
else:
return False
[docs]
def unpack(self):
""" Unpack the compute package
:return: True if unpacking was successful, False otherwise
:rtype: bool
"""
if self.pkg_name:
f = None
if self.pkg_name.endswith('tar.gz'):
f = tarfile.open(os.path.join(
self.pkg_path, self.pkg_name), 'r:gz')
if self.pkg_name.endswith('.tgz'):
f = tarfile.open(os.path.join(
self.pkg_path, self.pkg_name), 'r:gz')
if self.pkg_name.endswith('tar.bz2'):
f = tarfile.open(os.path.join(
self.pkg_path, self.pkg_name), 'r:bz2')
else:
print(
"Failed to unpack compute package, no pkg_name set."
"Has the reducer been configured with a compute package?"
)
return False
os.getcwd()
try:
os.chdir(self.dir)
if f:
f.extractall()
print("Successfully extracted compute package content in {}".format(
self.dir), flush=True)
return True
except Exception:
print("Error extracting files!")
return False
[docs]
def dispatcher(self, run_path):
""" Dispatch the compute package
:param run_path: path to dispatch the compute package
:type run_path: str
:return: Dispatcher object
:rtype: :class:`fedn.utils.dispatcher.Dispatcher`
"""
from_path = os.path.join(os.getcwd(), 'client')
# preserve_times=False ensures compatibility with Gramine LibOS
copy_tree(from_path, run_path, preserve_times=False)
try:
cfg = None
with open(os.path.join(run_path, 'fedn.yaml'), 'rb') as config_file:
cfg = yaml.safe_load(config_file.read())
self.dispatch_config = cfg
except Exception:
print(
"Error trying to load and unpack dispatcher config - trying default", flush=True)
dispatcher = Dispatcher(self.dispatch_config, run_path)
return dispatcher