"""Portions of this code are derived from the Apache 2.0 licensed project mlflow (https://mlflow.org/),
with modifications made by Scaleout Systems AB.
Copyright (c) 2018 Databricks, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import shutil
import sys
import tempfile
import uuid
from contextlib import contextmanager
from pathlib import Path
import yaml
from fedn.common.log_config import logger
from fedn.utils import PYTHON_VERSION
from fedn.utils.environment import _PythonEnv
from fedn.utils.process import _exec_cmd, _join_commands
# True on every platform whose os.name is not "nt"; _create_virtualenv uses it to
# pick between the "bin/activate" and "Scripts/activate.bat" script layouts.
_IS_UNIX = os.name != "nt"
@contextmanager
def remove_on_error(path: os.PathLike, onerror=None):
    """Context manager that deletes ``path`` when the wrapped block raises.

    :param path: File or directory to delete if the body fails.
    :type path: os.PathLike
    :param onerror: Optional callable invoked with the caught exception before
        anything is deleted.
    :raises Exception: The exception raised by the body is always re-raised
        after the cleanup attempt.
    """
    try:
        yield
    except Exception as exc:
        if onerror:
            onerror(exc)
        # isfile/isdir are both False for a path that does not exist, so a
        # missing path simply falls through without touching the filesystem.
        if os.path.isfile(path):
            os.remove(path)
        elif os.path.isdir(path):
            shutil.rmtree(path)
        raise
def _install_python(version, pyenv_root=None, capture_output=False):
"""Installs a specified version of python with pyenv and returns a path to the installed python
binary.
"""
raise NotImplementedError("This function is not implemented yet.")
def _is_virtualenv_available():
"""Returns True if virtualenv is available, otherwise False.
"""
return shutil.which("virtualenv") is not None
def _validate_virtualenv_is_available():
    """Ensure virtualenv can be found, failing with an install hint otherwise.

    :raises Exception: When ``_is_virtualenv_available()`` returns False.
    """
    if _is_virtualenv_available():
        return
    raise Exception("Could not find the virtualenv binary. Run `pip install virtualenv` to install " "virtualenv.")
def _get_virtualenv_extra_env_vars(env_root_dir=None):
extra_env = {
# PIP_NO_INPUT=1 makes pip run in non-interactive mode,
# otherwise pip might prompt "yes or no" and ask stdin input
"PIP_NO_INPUT": "1",
}
return extra_env
def _get_python_env(python_env_file):
"""Parses a python environment file and returns a dictionary with the parsed content.
"""
if os.path.exists(python_env_file):
return _PythonEnv.from_yaml(python_env_file)
def _create_virtualenv(python_bin_path, env_dir, python_env, extra_env=None, capture_output=False):
    """Create a virtualenv in ``env_dir`` and install the environment's dependencies.

    If ``env_dir`` already exists it is reused as-is: nothing is created or
    installed and only the activation command is returned.

    :param python_bin_path: Interpreter passed to ``virtualenv --python``.
    :param env_dir: Target directory; ``joinpath`` and ``exists`` are called on
        it, so a ``pathlib.Path`` is expected.
    :param python_env: Object exposing ``build_dependencies`` and
        ``dependencies``; each non-empty group is installed with pip.
    :param extra_env: Extra environment variables forwarded to the pip commands.
    :param capture_output: Forwarded to ``_exec_cmd``.
    :return: The command that activates the environment.
    :rtype: str
    """
    # Work out the activation command first so it can be returned on every path.
    if _IS_UNIX:
        activate_cmd = f"source {env_dir.joinpath('bin', 'activate')}"
    else:
        activate_cmd = str(env_dir.joinpath("Scripts", "activate.bat"))

    if env_dir.exists():
        logger.info("Environment %s already exists", env_dir)
        return activate_cmd

    def _warn_before_removal(error):
        logger.warning(
            "Encountered an unexpected error: %s while creating a virtualenv environment in %s, " "removing the environment directory...",
            repr(error),
            env_dir,
        )

    # Any failure below removes the partially created environment directory.
    with remove_on_error(env_dir, onerror=_warn_before_removal):
        logger.info("Creating a new environment in %s with %s", env_dir, python_bin_path)
        create_cmd = [sys.executable, "-m", "virtualenv", "--python", python_bin_path, env_dir]
        _exec_cmd(create_cmd, capture_output=capture_output)

        logger.info("Installing dependencies")
        dependency_groups = (python_env.build_dependencies, python_env.dependencies)
        for deps in dependency_groups:
            if not deps:
                continue
            with tempfile.TemporaryDirectory() as tmpdir:
                # The requirements file is referenced by bare name because pip
                # runs with the temporary directory as its working directory.
                req_name = f"requirements.{uuid.uuid4().hex}.txt"
                (Path(tmpdir) / req_name).write_text("\n".join(deps))
                install_cmd = _join_commands(activate_cmd, f"python -m pip install -r {req_name}")
                _exec_cmd(install_cmd, capture_output=capture_output, cwd=tmpdir, extra_env=extra_env)

    return activate_cmd
def _read_yaml_file(file_path):
    """Read a YAML file and return its parsed content.

    :param file_path: Path of the YAML file to read.
    :return: Whatever ``yaml.safe_load`` produces for the file's bytes.
    :raises Exception: Any error from opening, reading or parsing the file is
        logged and then re-raised.
    """
    try:
        with open(file_path, "rb") as stream:
            raw = stream.read()
        parsed = yaml.safe_load(raw)
    except Exception as err:
        logger.error(f"Error trying to read yaml file: {file_path}")
        raise err
    return parsed
class Dispatcher:
    """Dispatcher class for compute packages.

    :param config: The configuration; the ``python_env`` and ``entry_points``
        keys are the ones read by this class.
    :type config: dict
    :param project_dir: The directory to dispatch to; commands run with it as
        their working directory.
    :type project_dir: str
    """

    def __init__(self, config, project_dir):
        """Store the configuration and project directory."""
        self.config = config
        self.project_dir = project_dir
        # Both stay empty until _get_or_create_python_env sets up an environment.
        self.activate_cmd = ""
        self.python_env_path = ""

    def _get_or_create_python_env(self, capture_output=False, pip_requirements_override=None):
        """Create (or reuse) the virtualenv described by the ``python_env`` config entry.

        :param capture_output: Forwarded to the commands that build the environment.
        :param pip_requirements_override: Optional iterable of extra pip
            requirement strings installed after the environment is created.
        :return: The activation command, or the falsy ``python_env`` config
            value when no environment file is configured.
        :raises Exception: When the configured file does not exist, or when
            creating the environment fails (after a best-effort removal of
            the environment directory).
        """
        env_file_name = self.config.get("python_env", "")
        if not env_file_name:
            logger.info("No python_env specified in the configuration, using the system Python.")
            return env_file_name

        python_env_path = os.path.join(self.project_dir, env_file_name)
        if not os.path.exists(python_env_path):
            raise Exception("Compute package specified python_env file %s, but no such " "file was found." % python_env_path)

        python_env = _get_python_env(python_env_path)
        extra_env = _get_virtualenv_extra_env_vars()
        env_dir = Path(self.project_dir) / Path(python_env.name)
        self.python_env_path = env_dir

        try:
            python_bin_path = _install_python(python_env.python, capture_output=True)
        except NotImplementedError:
            # Fall back to the interpreter that is running this process.
            logger.warning("Failed to install Python: %s", python_env.python)
            logger.warning("Python version installation is not implemented yet.")
            logger.info(f"Using the system Python version: {PYTHON_VERSION}")
            python_bin_path = Path(sys.executable)

        try:
            activate_cmd = _create_virtualenv(
                python_bin_path,
                env_dir,
                python_env,
                extra_env=extra_env,
                capture_output=capture_output,
            )
            # Install additional dependencies specified by `pip_requirements_override`
            if pip_requirements_override:
                logger.info("Installing additional dependencies specified by " f"pip_requirements_override: {pip_requirements_override}")
                extra_requirements = " ".join(pip_requirements_override)
                install_cmd = _join_commands(
                    activate_cmd,
                    f"python -m pip install --quiet -U {extra_requirements}",
                )
                _exec_cmd(install_cmd, capture_output=capture_output, extra_env=extra_env)
        except Exception:
            logger.critical("Encountered unexpected error while creating %s", env_dir)
            if env_dir.exists():
                logger.warning("Attempting to remove %s", env_dir)
                shutil.rmtree(env_dir, ignore_errors=True)
                # rmtree ignores errors, so check again to report the real outcome.
                if env_dir.exists():
                    outcome = "Failed to remove %s"
                else:
                    outcome = "Successfully removed %s"
                logger.warning(outcome, env_dir)
            raise

        self.activate_cmd = activate_cmd
        return activate_cmd

    def run_cmd(self, cmd_type, capture_output=False, extra_env=None, synchronous=True, stream_output=False):
        """Run a command.

        The first space-separated token of ``cmd_type`` selects an entry point
        in the configuration; the remaining tokens are appended to that entry
        point's command as arguments.

        :param cmd_type: The command type, optionally followed by arguments.
        :type cmd_type: str
        :param capture_output: Forwarded to ``_exec_cmd``.
        :param extra_env: Forwarded to ``_exec_cmd``.
        :param synchronous: Forwarded to ``_exec_cmd``.
        :param stream_output: Forwarded to ``_exec_cmd``.
        :return: None. An ``IndexError`` is logged and swallowed; other errors,
            including the lookup error for an unknown entry point, propagate.
        """
        try:
            tokens = cmd_type.split(" ")
            # The first token names the entry point; it is not a file or argument.
            name, args = tokens[0], tokens[1:]
            command = self.config["entry_points"][name]["command"]

            # Join entry point and arguments into a single command as a string
            entry_point = f"{command} {' '.join(args)}"

            # Prefix with the activation command only when an environment is active.
            parts = [self.activate_cmd, entry_point] if self.activate_cmd else [entry_point]
            cmd = _join_commands(*parts)

            logger.info("Running command: {}".format(cmd))
            _exec_cmd(
                cmd,
                cwd=self.project_dir,
                throw_on_error=True,
                extra_env=extra_env,
                capture_output=capture_output,
                synchronous=synchronous,
                stream_output=stream_output,
            )
            logger.info("Done executing {}".format(cmd_type))
        except IndexError:
            logger.error("No such argument or configuration to run.")