import io
import json
import os
import shutil
import tempfile
import uuid
import warnings
import zipfile
from contextlib import contextmanager
from typing import BinaryIO, Iterable, Optional
import scaleoututil.grpc.scaleout_pb2 as scaleout_msg
from scaleoututil.utils.checksum import compute_checksum_from_stream
from scaleoututil.utils.signing import compute_model_signature, verify_model_signature
from scaleoututil.helpers.helpers import get_helper
from scaleoututil.helpers.plugins.numpyhelper import Helper
# Default chunk size used when streaming the packed model as gRPC FileChunk messages.
CHUNK_SIZE = 1 * 1024 * 1024 # 1 MB chunk size for reading/writing files
# First two bytes of a ZIP archive. Not referenced elsewhere in the visible code —
# presumably intended for ZIP-vs-legacy format detection (TODO confirm).
_ZIP_MAGIC = b"PK"
# Container format version; written into metadata.json under the "version" key when a
# ZIP is built and stripped again when metadata is read back.
_FORMAT_VERSION = 2
# Names of the entries inside the model ZIP archive.
_TRAINING_MODEL_ENTRY = "training_model.bin"
_METADATA_ENTRY = "metadata.json"
_INFERENCE_MODEL_ENTRY = "inference_model.bin"
def _read_metadata_from_zip(zip_path: str) -> dict:
    """Load ``metadata.json`` from the ZIP archive at *zip_path*.

    Returns the decoded JSON object with its ``version`` key removed, or an
    empty dict when the archive carries no metadata entry.
    """
    with zipfile.ZipFile(zip_path, "r") as archive:
        try:
            raw = archive.read(_METADATA_ENTRY)
        except KeyError:
            # ZipFile.read raises KeyError for a name that is not in the archive.
            return {}
    parsed = json.loads(raw.decode("utf-8"))
    # The format version is a container detail, not user-facing metadata.
    parsed.pop("version", None)
    return parsed
class ScaleoutModel:
    """The ScaleoutModel class is the primary model representation in the Scaleout framework.

    A ScaleoutModel is a self-describing container holding:

    - The serialized model weights (raw bytes, via a helper)
    - An optional full model representation (e.g. ONNX, .pt, …)
    - A metadata dict (eagerly loaded): model_id, helper_type, inference_model_format,
      and any user-supplied key-value pairs

    Models are treated as immutable after construction. To create a modified copy, use
    :meth:`to_builder` to obtain a pre-seeded :class:`ScaleoutModelBuilder`::

        new_model = model.to_builder().set_model_id("m-002").build()
        new_model = model.to_builder().set_metadata("tag", "v2").build()

    API
    ---
    Reading metadata::

        model.metadata   # shallow copy of the metadata dict
        model.model_id   # convenience property

    Model parameters::

        with model.get_training_model_stream() as s:  # read directly from ZIP, no temp file
            data = s.read()
        params = model.get_training_model(helper)     # decodes from ZIP

    Full model representation::

        model.has_inference_model           # bool
        with model.get_inference_model_stream() as s:  # stream inference_model.bin directly from ZIP
            data = s.read()

    The format of the full model representation is stored in the metadata under
    ``inference_model_format``.

    On-disk format
    --------------
    Files are ZIP archives (stdlib zipfile, DEFLATED) containing:

    - ``metadata.json``        – eagerly loaded on open
    - ``training_model.bin``   – serialized weights
    - ``inference_model.bin``  – optional full model representation

    Legacy inputs (anything that is not such a container, e.g. a bare NPZ) are detected
    automatically and loaded with empty metadata for backward compatibility.

    Storage
    -------
    A single ZIP file (``_zip_path``) is the canonical representation from construction
    time onwards. Temporary ZIPs created while building are owned by the model and are
    deleted by :meth:`_cleanup` (context-manager exit or garbage collection). Files
    referenced via :meth:`from_file` or written by :meth:`save_to_file` are external and
    are never deleted.

    ``get_training_model_stream()`` and ``get_inference_model_stream()`` stream entries
    directly from the ZIP without extracting to a temp file.
    """

    def __init__(self):
        raise TypeError(
            "ScaleoutModel cannot be instantiated directly. Use a factory method: from_training_model(), "
            "from_file(), from_stream(), or from_filechunk_stream() or use ScaleoutModelBuilder for more control."
        )

    @classmethod
    def _create(cls) -> "ScaleoutModel":
        """Internal factory — bypasses __init__ to construct a blank instance."""
        obj = object.__new__(cls)
        # ZIP file — the canonical representation; deleted on cleanup only when owned
        obj._zip_path = None
        obj._zip_is_owned = False
        # Open file handle held for the model's lifetime so the OS knows the file is in use
        obj._file_handle = None
        # Metadata dict – always in memory, eagerly available
        obj._metadata = {}
        obj._helper = None
        obj._checksum = None
        obj._legacy_source = False  # True if loaded from legacy raw binary (no ZIP)
        return obj

    def __del__(self):
        self._cleanup()

    def __repr__(self) -> str:
        mid = self._metadata.get("model_id", "?")
        helper = self._metadata.get("helper_type", "?")
        return f"<ScaleoutModel model_id={mid} helper={helper}>"

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self._cleanup()

    def _close_file_handle(self):
        """Close the lifetime file handle, if any. Close errors are ignored (best effort)."""
        # getattr: __del__ may run on an instance that never got its attributes assigned.
        fh = getattr(self, "_file_handle", None)
        if fh is None:
            return
        try:
            fh.close()
        except OSError:
            pass
        self._file_handle = None

    def _cleanup(self):
        """Release the file handle and delete the ZIP if this model owns it.

        Safe to call repeatedly (``__exit__`` followed by ``__del__``): ownership is
        dropped before unlinking, so the file is never removed twice.
        """
        self._close_file_handle()
        zip_path = getattr(self, "_zip_path", None)
        if zip_path and getattr(self, "_zip_is_owned", False):
            self._zip_is_owned = False
            try:
                os.unlink(zip_path)
            except OSError:
                pass

    # ------------------------------------------------------------------
    # Convenience property
    # ------------------------------------------------------------------
    @property
    def model_id(self) -> Optional[str]:
        """The ``model_id`` metadata value, or ``None`` if absent."""
        return self._metadata.get("model_id")

    @property
    def legacy_source(self) -> bool:
        """True if this model was loaded from a legacy raw binary (no ZIP)."""
        return self._legacy_source

    # ------------------------------------------------------------------
    # Metadata management
    # ------------------------------------------------------------------
    @property
    def metadata(self) -> dict:
        """Returns a shallow copy of the metadata dict."""
        return dict(self._metadata)

    # ------------------------------------------------------------------
    # Model parameters
    # ------------------------------------------------------------------
    def _has_entry(self, entry_name: str) -> bool:
        """Return True if the model's ZIP exists, is readable and contains *entry_name*."""
        if self._zip_path is None:
            return False
        try:
            with zipfile.ZipFile(self._zip_path, "r") as zf:
                return entry_name in zf.namelist()
        except (OSError, zipfile.BadZipFile):
            # A missing, unreadable or corrupt archive simply has no such entry.
            return False

    @property
    def has_training_model(self) -> bool:
        """True if the model contains serialized parameters."""
        return self._has_entry(_TRAINING_MODEL_ENTRY)

    @contextmanager
    def get_training_model_stream(self):
        """Context manager yielding a read stream of the raw model binary.

        Reads directly from the ZIP entry — no temp-file extraction.

        Usage::

            with model.get_training_model_stream() as stream:
                data = stream.read()
        """
        with zipfile.ZipFile(self._zip_path, "r") as zf, zf.open(_TRAINING_MODEL_ENTRY) as f:
            yield f

    def get_training_model(self, helper=None):
        """Decodes and returns the model parameters via the helper.

        Args:
            helper: Helper whose ``load()`` decodes the stream. When given it is also
                remembered for later calls; otherwise the previously known helper is used.

        Raises:
            ValueError: If no helper is available.
        """
        self._helper = helper or self._helper
        if self._helper is None:
            raise ValueError("No helper provided to unpack model parameters.")
        with self.get_training_model_stream() as s:
            return self._helper.load(s)

    def get_model_params(self, helper=None):
        """Alias for get_training_model() for backward compatibility."""
        warnings.warn("`get_model_params` is deprecated. Use `get_training_model` instead.", DeprecationWarning, stacklevel=2)
        return self.get_training_model(helper)

    # ------------------------------------------------------------------
    # Full model representation
    # ------------------------------------------------------------------
    @property
    def has_inference_model(self) -> bool:
        """True if the model contains a full model representation."""
        return self._has_entry(_INFERENCE_MODEL_ENTRY)

    @contextmanager
    def get_inference_model_stream(self):
        """Context manager yielding a read stream of the full model representation binary.

        Reads directly from the ZIP entry — no temp-file extraction.

        Usage::

            if model.has_inference_model:
                with model.get_inference_model_stream() as stream:
                    data = stream.read()
        """
        with zipfile.ZipFile(self._zip_path, "r") as zf, zf.open(_INFERENCE_MODEL_ENTRY) as f:
            yield f

    # ------------------------------------------------------------------
    # Checksum
    # ------------------------------------------------------------------
    @property
    def checksum(self) -> str:
        """Checksum of the packed ZIP file; computed on first access and cached."""
        if self._checksum is None:
            with self.get_file_stream() as s:
                self._checksum = compute_checksum_from_stream(s)
        return self._checksum

    def verify_checksum(self, checksum: str) -> bool:
        """Return True if *checksum* matches this model's checksum, or if *checksum* is None."""
        return checksum is None or self.checksum == checksum

    # ------------------------------------------------------------------
    # Signature
    # ------------------------------------------------------------------
    def sign(self, private_key, signer_id: Optional[str] = None) -> dict:
        """Compute and return a signature dict for this model.

        Does not modify the ZIP. POST the returned dict to
        ``/api/v1/model-signatures/`` to persist it.

        Args:
            private_key: An ``Ed25519PrivateKey`` from the ``cryptography`` package.
            signer_id: Optional free-form string identifying the signer.

        Returns:
            A dict with keys ``model_id``, ``algorithm``, ``signature``
            (base64-encoded), and optionally ``signer_id``.
        """
        sig = compute_model_signature(self._zip_path, private_key, signer_id)
        sig["model_id"] = self.model_id
        return sig

    def verify_signature(self, public_key, signature: str) -> bool:
        """Return ``True`` if *signature* is valid for this model.

        Args:
            public_key: An ``Ed25519PublicKey`` from the ``cryptography`` package.
            signature: Base64-encoded signature string — as returned by
                ``GET /api/v1/model-signatures/<id>`` ``signature`` field.
        """
        return verify_model_signature(self._zip_path, public_key, signature)

    # ------------------------------------------------------------------
    # Builder
    # ------------------------------------------------------------------
    def to_builder(self) -> "ScaleoutModelBuilder":
        """Return a factory pre-loaded from this model's current state.

        Use this to create a modified copy without mutating the original::

            new_model = model.to_builder().set_model_id("m-002").build()
            new_model = model.to_builder().set_metadata("tag", "v2").build()
        """
        # Copy to a temp file so the builder owns its own working copy and the
        # original model's file is not affected by the builder's lifecycle.
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
        tmp.close()
        try:
            shutil.copy2(self._zip_path, tmp.name)
        except BaseException:
            # Do not leave the (delete=False) temp file behind when the copy fails.
            try:
                os.unlink(tmp.name)
            except OSError:
                pass
            raise
        f = ScaleoutModelBuilder()
        f._loaded_zip_path = tmp.name
        f._loaded_zip_owned = True
        f._loaded_metadata = dict(self._metadata)
        f._legacy_source = self._legacy_source
        return f

    # ------------------------------------------------------------------
    # ZIP format detection
    # ------------------------------------------------------------------
    @staticmethod
    def detect_format(stream: BinaryIO) -> str:
        """Classify a seekable binary stream as ``"zip"`` or ``"legacy"``.

        ``"zip"`` means a ZIP archive that contains ``metadata.json`` or
        ``training_model.bin``. Everything else is ``"legacy"`` — including other ZIP
        archives such as a bare NPZ, which is itself a ZIP file.

        The stream position is restored before returning.

        NOTE(review): the visible callers only compare the result against ``"zip"``;
        the ``"legacy"`` label and the entry-name check are this implementation's choice.
        """
        start = stream.tell()
        try:
            if stream.read(len(_ZIP_MAGIC)) != _ZIP_MAGIC:
                return "legacy"
            stream.seek(start)
            try:
                # ZipFile does not close a file object it was handed.
                with zipfile.ZipFile(stream, "r") as zf:
                    names = set(zf.namelist())
            except zipfile.BadZipFile:
                return "legacy"
            if names & {_METADATA_ENTRY, _TRAINING_MODEL_ENTRY}:
                return "zip"
            return "legacy"
        finally:
            stream.seek(start)

    @staticmethod
    def detect_format_file(file_path: str) -> str:
        """Classify the file at *file_path*; see :meth:`detect_format`."""
        with open(file_path, "rb") as fh:
            return ScaleoutModel.detect_format(fh)

    # ------------------------------------------------------------------
    # Factory methods (backward-compatible wrappers — delegate to ScaleoutModelBuilder)
    # ------------------------------------------------------------------
    @staticmethod
    def from_training_model(
        model_params,
        helper=None,
        metadata: Optional[dict] = None,
    ) -> "ScaleoutModel":
        """Creates a ScaleoutModel from model parameters.

        Delegates to :class:`ScaleoutModelBuilder`. A fresh ``model_id`` is always
        generated; any ``model_id`` key present in ``metadata`` is ignored.

        Args:
            model_params: Parameters accepted by the helper's ``save()`` method.
            helper: Serialization helper. Falls back to NumpyHelper if not provided.
            metadata: Optional initial metadata dict.
        """
        f = ScaleoutModelBuilder.from_training_model(model_params, helper)
        if metadata:
            f.set_metadata_dict(metadata)
        return f.build()

    @staticmethod
    def from_file(file_path: str) -> "ScaleoutModel":
        """Creates a ScaleoutModel from a file (ZIP or legacy raw binary).

        Delegates to :class:`ScaleoutModelBuilder`.
        """
        return ScaleoutModelBuilder.from_file(file_path).build()

    @staticmethod
    def from_stream(stream: BinaryIO) -> "ScaleoutModel":
        """Creates a ScaleoutModel from a stream (ZIP or legacy raw binary).

        Delegates to :class:`ScaleoutModelBuilder`.
        """
        return ScaleoutModelBuilder.from_stream(stream).build()

    @staticmethod
    def from_filechunk_stream(
        filechunk_stream: Iterable[scaleout_msg.FileChunk],
    ) -> "ScaleoutModel":
        """Creates a ScaleoutModel from a gRPC FileChunk iterator.

        Delegates to :class:`ScaleoutModelBuilder`.
        """
        return ScaleoutModelBuilder.from_filechunk_stream(filechunk_stream).build()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def save_to_file(self, file_path: str):
        """Saves the model to a file in ZIP format.

        Afterwards the model references *file_path* (not owned, so never deleted by
        the model); a previously owned temp ZIP is removed. A no-op when *file_path*
        already is the model's ZIP.
        """
        if os.path.abspath(self._zip_path) == os.path.abspath(file_path):
            return
        shutil.copy2(self._zip_path, file_path)
        # Close the handle to the old path before potentially unlinking it
        self._close_file_handle()
        # Drop old temp if we owned it, then reference the saved file
        if self._zip_is_owned:
            try:
                os.unlink(self._zip_path)
            except OSError:
                pass
        self._zip_path = file_path
        self._zip_is_owned = False
        self._file_handle = open(file_path, "rb")

    def get_file_stream(self):
        """Returns a new read handle to the packed ZIP file. The caller must close it."""
        return open(self._zip_path, "rb")

    def get_filechunk_stream(self, chunk_size=CHUNK_SIZE):
        """Yields gRPC FileChunk messages of the raw model bytes."""
        with open(self._zip_path, "rb") as stream:
            while chunk := stream.read(chunk_size):
                yield scaleout_msg.FileChunk(data=chunk)
class ScaleoutModelBuilder:
    """Builder for :class:`ScaleoutModel` instances.

    Initialise with a classmethod, configure with ``set_*`` calls, then call
    :meth:`build` to produce the :class:`ScaleoutModel`.

    Examples::

        # New model from parameters
        model = (
            ScaleoutModelBuilder.from_training_model(params, helper)
            .set_metadata("session_id", "s1")
            .set_inference_model(onnx_bytes, "onnx")
            .build()
        )

        # Load existing model, optionally enrich metadata
        # (changing metadata produces a new ZIP with a fresh model_id)
        model = (
            ScaleoutModelBuilder.from_file("/path/model.zip")
            .set_metadata("tag", "v2")
            .build()
        )

        # Create a modified copy of an existing model
        new_model = model.to_builder().set_model_id("m-002").build()

    ``build()`` model-id behaviour
    --------------------------------
    *New-model path* (``from_training_model``): a fresh ``model_id`` UUID is always
    generated. Any ``model_id`` passed via ``set_metadata`` / ``set_metadata_dict`` /
    ``set_model_id`` is ignored.

    *Load path* (``from_file``, ``from_stream``, ``from_filechunk_stream``,
    ``ScaleoutModel.to_builder()``): the existing ``model_id`` is preserved only when
    the loaded ZIP is reused unchanged. Any change (metadata, training model,
    inference model) or a legacy raw-binary input causes a new ZIP to be built with a
    fresh ``model_id`` UUID, unless ``set_model_id`` was called, in which case that id
    is used.

    Reuse
    -----
    A builder that owns its loaded ZIP (``from_stream``, ``from_filechunk_stream``,
    ``to_builder``) hands that file over to the model, or deletes it after rebuilding,
    during :meth:`build`; it should be built once.
    """

    # Payloads up to this size are buffered in memory; larger ones spill to a temp file.
    _SPOOL_MAX_SIZE = 10 * 1024 * 1024

    def __init__(self):
        self._helper = None
        self._helper_type: Optional[str] = None
        self._create_new: bool = False
        # Load-path state
        self._loaded_zip_path: Optional[str] = None
        self._loaded_zip_owned: bool = False  # True if we created/own the loaded ZIP temp file
        self._loaded_training_model_stream = None  # binary stream: set for legacy loads; mutually exclusive with _training_model
        self._loaded_metadata: dict = {}
        # Configuration
        self._training_model = None  # raw Python params; mutually exclusive with _loaded_training_model_stream
        self._extra_metadata: dict = {}
        self._inference_model_stream = None  # SpooledTemporaryFile or None
        self._inference_model_stream_fmt: Optional[str] = None
        self._model_id_explicitly_set: bool = False
        # Legacy source tracking (for backward compatibility with non-ZIP raw binary inputs)
        self._legacy_source = False  # True if loaded from legacy raw binary (no ZIP)

    @staticmethod
    def _discard_file(path: str) -> None:
        """Best-effort removal of a temp file; a missing or locked file is ignored."""
        try:
            os.unlink(path)
        except OSError:
            pass

    # ------------------------------------------------------------------
    # Initialisation classmethods
    # ------------------------------------------------------------------
    @classmethod
    def from_training_model(cls, model_params, helper=None) -> "ScaleoutModelBuilder":
        """Initialise the factory with raw model parameters (new-model path)."""
        f = cls()
        f._create_new = True
        if helper is not None:
            f._helper = helper
        f._training_model = model_params
        return f

    @classmethod
    def from_file(cls, file_path: str) -> "ScaleoutModelBuilder":
        """Initialise the factory by loading a ZIP or legacy file.

        For a ZIP, the original file is referenced directly — no copy is made. Use
        :meth:`ScaleoutModel.to_builder` when you need a mutable working copy
        that is independent of the source file's lifetime.
        """
        if ScaleoutModel.detect_format_file(file_path) != "zip":
            # Legacy: copy into a spooled temp file so the file handle can be closed immediately
            buf = tempfile.SpooledTemporaryFile(max_size=cls._SPOOL_MAX_SIZE)
            with open(file_path, "rb") as fh:
                shutil.copyfileobj(fh, buf)
            buf.seek(0)
            return cls.from_stream(buf)
        f = cls()
        f._loaded_zip_path = os.path.abspath(file_path)
        f._loaded_zip_owned = False  # external file — we must not delete it
        f._loaded_metadata = _read_metadata_from_zip(file_path)
        return f

    @classmethod
    def from_stream(cls, stream: BinaryIO) -> "ScaleoutModelBuilder":
        """Initialise the factory from a binary stream (ZIP or legacy raw binary).

        A ZIP stream is copied to an owned temp file. A legacy stream is kept by
        reference and only read at :meth:`build`, so it must stay open until then.
        """
        f = cls()
        if ScaleoutModel.detect_format(stream) == "zip":
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
            try:
                with tmp:
                    shutil.copyfileobj(stream, tmp)
                metadata = _read_metadata_from_zip(tmp.name)
            except BaseException:
                # Copy or metadata parsing failed: do not leak the delete=False temp file.
                cls._discard_file(tmp.name)
                raise
            f._loaded_zip_path = tmp.name
            f._loaded_zip_owned = True
            f._loaded_metadata = metadata
        else:
            # Legacy raw binary: defer wrapping until build()
            f._loaded_training_model_stream = stream
            f._loaded_metadata = {}
            f._legacy_source = True
        return f

    @classmethod
    def from_filechunk_stream(
        cls,
        filechunk_stream: Iterable[scaleout_msg.FileChunk],
    ) -> "ScaleoutModelBuilder":
        """Initialise the factory from a gRPC FileChunk iterator."""
        buf = tempfile.SpooledTemporaryFile(max_size=cls._SPOOL_MAX_SIZE)
        for chunk in filechunk_stream:
            if chunk.data:
                buf.write(chunk.data)
        buf.seek(0)
        return cls.from_stream(buf)

    # ------------------------------------------------------------------
    # Configuration methods
    # ------------------------------------------------------------------
    def set_metadata(self, key: str, value) -> "ScaleoutModelBuilder":
        """Set a single metadata key. Returns ``self`` for chaining."""
        self._extra_metadata[key] = value
        return self

    def set_metadata_dict(self, metadata: dict) -> "ScaleoutModelBuilder":
        """Merge a dict of metadata. Returns ``self`` for chaining."""
        self._extra_metadata.update(metadata)
        return self

    def set_model_id(self, model_id: str) -> "ScaleoutModelBuilder":
        """Override the model_id. Returns ``self`` for chaining.

        Suppresses auto-generation of a new ``model_id`` that would otherwise occur
        when metadata, model parameters, or the full model representation change.
        Has no effect on the new-model path (``from_training_model``).
        """
        self._extra_metadata["model_id"] = model_id
        self._model_id_explicitly_set = True
        return self

    def set_helper_type(self, helper_type: str) -> "ScaleoutModelBuilder":
        """Set the helper by type name. Returns ``self`` for chaining."""
        self._helper_type = helper_type
        self._helper = None
        return self

    def set_helper(self, helper) -> "ScaleoutModelBuilder":
        """Set the helper instance. Returns ``self`` for chaining."""
        self._helper = helper
        self._helper_type = None
        return self

    def set_training_model(self, model_params, helper=None) -> "ScaleoutModelBuilder":
        """Override the model parameters. Returns ``self`` for chaining.

        Stores the raw parameters for serialization at :meth:`build` time.
        Clears any previously set ``_loaded_training_model_stream``.

        Args:
            model_params: Parameters accepted by the helper's ``save()`` method.
            helper: Optional helper instance. Overrides any previously configured helper.
        """
        if helper is not None:
            self._helper = helper
            self._helper_type = None
        self._training_model = model_params
        self._loaded_training_model_stream = None
        return self

    def _attach_inference_model(self, buf, fmt: str) -> "ScaleoutModelBuilder":
        """Store *buf* (rewound) as the inference model, closing any buffer it replaces."""
        previous = self._inference_model_stream
        if previous is not None and previous is not buf:
            previous.close()
        buf.seek(0)
        self._inference_model_stream = buf
        self._inference_model_stream_fmt = fmt
        return self

    def set_inference_model(self, data: bytes, fmt: str) -> "ScaleoutModelBuilder":
        """Attach a full model representation from bytes. Returns ``self`` for chaining.

        Args:
            data: Raw bytes of the full model representation.
            fmt: Format identifier string (e.g. ``'onnx'``, ``'pt'``).

        Raises:
            TypeError: If *data* is not ``bytes``.
        """
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes.")
        buf = tempfile.SpooledTemporaryFile(max_size=self._SPOOL_MAX_SIZE)
        buf.write(data)
        return self._attach_inference_model(buf, fmt)

    def set_inference_model_stream(self, stream: BinaryIO, fmt: str) -> "ScaleoutModelBuilder":
        """Attach a full model representation from a stream. Returns ``self`` for chaining.

        The stream is copied immediately, so it may be closed after this call.

        Args:
            stream: Readable binary stream of the full model representation.
            fmt: Format identifier string (e.g. ``'onnx'``, ``'pt'``).
        """
        buf = tempfile.SpooledTemporaryFile(max_size=self._SPOOL_MAX_SIZE)
        shutil.copyfileobj(stream, buf)
        return self._attach_inference_model(buf, fmt)

    # ------------------------------------------------------------------
    # Build
    # ------------------------------------------------------------------
    def build(self) -> ScaleoutModel:
        """Construct and return the :class:`ScaleoutModel`.

        Raises:
            ValueError: If the builder has no model content to build from.
        """
        zip_path, zip_is_owned, metadata = self._prepare_zip()
        return self._load_model(zip_path, zip_is_owned, metadata)

    # ------------------------------------------------------------------
    # Build step 1: prepare the ZIP file
    # ------------------------------------------------------------------
    def _prepare_zip(self) -> tuple:
        """Step 1: create or select the ZIP file.

        Returns ``(zip_path, zip_is_owned, metadata)``.

        Rebuilds a new ZIP whenever model parameters, full model, or metadata have
        changed — and assigns a fresh model_id in that case unless ``set_model_id``
        was used on the load path. Only when the loaded ZIP is reused unchanged is
        the existing model_id preserved.

        Raises:
            ValueError: If nothing changed and there is no loaded ZIP to reuse.
        """
        # Resolve helper: explicit instance, then explicit type name, then the type
        # recorded in the loaded metadata.
        if self._helper is None and self._helper_type:
            self._helper = get_helper(self._helper_type)
        if self._helper is None:
            loaded_type = self._loaded_metadata.get("helper_type")
            if loaded_type:
                self._helper = get_helper(loaded_type)

        # Serialize raw model params if provided (from_training_model / set_training_model path)
        training_model_stream = self._loaded_training_model_stream
        serialized = None
        if self._training_model is not None:
            if self._helper is None:
                self._helper = Helper()
            serialized = tempfile.SpooledTemporaryFile(max_size=self._SPOOL_MAX_SIZE)
            try:
                self._helper.save(self._training_model, serialized)
            except BaseException:
                serialized.close()
                raise
            serialized.seek(0)
            training_model_stream = serialized

        # Build metadata: start from loaded state, apply user overrides (model_id handled below)
        metadata: dict = dict(self._loaded_metadata)
        metadata.update((k, v) for k, v in self._extra_metadata.items() if k != "model_id")
        if self._helper:
            # NOTE(review): setdefault keeps a loaded/user-supplied helper_type even when
            # the parameters were just re-serialized with a different helper — confirm intent.
            metadata.setdefault("helper_type", self._helper.name)
        if self._inference_model_stream is not None:
            metadata["inference_model_format"] = self._inference_model_stream_fmt

        needs_rebuild = (
            training_model_stream is not None
            or self._inference_model_stream is not None
            or bool(self._extra_metadata)
        )

        if not needs_rebuild:
            if self._loaded_zip_path is None:
                raise ValueError(
                    "Nothing to build: initialise the builder with from_training_model(), from_file(), "
                    "from_stream() or from_filechunk_stream(). A builder that owned its loaded ZIP "
                    "can only be built once."
                )
            zip_path, zip_is_owned = self._loaded_zip_path, self._loaded_zip_owned
            if zip_is_owned:
                # Ownership moves to the model; forget the file so it cannot be handed out twice.
                self._loaded_zip_path = None
                self._loaded_zip_owned = False
            return zip_path, zip_is_owned, metadata

        # Assign model_id: auto-generate when rebuilding; set_model_id() overrides
        # auto-generation on the load path and is ignored for from_training_model.
        if self._model_id_explicitly_set and not self._create_new:
            metadata["model_id"] = self._extra_metadata["model_id"]
        else:
            metadata["model_id"] = str(uuid.uuid4())

        if self._inference_model_stream is not None:
            # Rewind so a repeated build() does not write an empty entry.
            self._inference_model_stream.seek(0)
        try:
            zip_path = ScaleoutModelBuilder._build_zip(
                metadata,
                training_model_stream=training_model_stream,
                source_zip_path=self._loaded_zip_path,
                inference_model_stream=self._inference_model_stream,
            )
        finally:
            # The serialization buffer is private to this call.
            if serialized is not None:
                serialized.close()

        if self._loaded_zip_owned and self._loaded_zip_path:
            # The working copy has been superseded by the rebuilt ZIP.
            self._discard_file(self._loaded_zip_path)
            self._loaded_zip_path = None
            self._loaded_zip_owned = False
        return zip_path, True, metadata

    @staticmethod
    def _write_entry(
        zf: zipfile.ZipFile,
        entry_name: str,
        stream: Optional[BinaryIO],
        source_zip_path: Optional[str],
    ) -> None:
        """Write *entry_name* into *zf* from *stream*, else copy it from the source ZIP.

        Does nothing when neither a stream nor a source ZIP containing the entry exists.
        """
        if stream is not None:
            with zf.open(entry_name, "w", force_zip64=True) as dst_f:
                shutil.copyfileobj(stream, dst_f)
            return
        if not (source_zip_path and os.path.exists(source_zip_path)):
            return
        with zipfile.ZipFile(source_zip_path, "r") as src_zf:
            if entry_name not in src_zf.namelist():
                return
            with src_zf.open(entry_name) as src_f, zf.open(entry_name, "w", force_zip64=True) as dst_f:
                shutil.copyfileobj(src_f, dst_f)

    @staticmethod
    def _build_zip(
        metadata: dict,
        training_model_stream: Optional[BinaryIO] = None,
        source_zip_path: Optional[str] = None,
        inference_model_stream: Optional[BinaryIO] = None,
    ) -> str:
        """Build a new owned temp ZIP from components. Returns the temp file path.

        Each model entry is taken from its stream when given, otherwise copied from
        *source_zip_path* if that archive has it. ``metadata.json`` is written with
        the format version added. The temp file is removed if building fails.
        """
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
        try:
            # Exit order: the ZipFile finalises the archive first, then tmp is closed.
            with tmp, zipfile.ZipFile(tmp, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
                meta = dict(metadata)
                meta["version"] = _FORMAT_VERSION
                zf.writestr(_METADATA_ENTRY, json.dumps(meta))
                ScaleoutModelBuilder._write_entry(zf, _TRAINING_MODEL_ENTRY, training_model_stream, source_zip_path)
                ScaleoutModelBuilder._write_entry(zf, _INFERENCE_MODEL_ENTRY, inference_model_stream, source_zip_path)
        except BaseException:
            # e.g. non-JSON-serializable metadata or a read error on a source stream.
            ScaleoutModelBuilder._discard_file(tmp.name)
            raise
        return tmp.name

    # ------------------------------------------------------------------
    # Build step 2: load the ZIP into a ScaleoutModel
    # ------------------------------------------------------------------
    def _load_model(self, zip_path: str, zip_is_owned: bool, metadata: dict) -> ScaleoutModel:
        """Step 2: open the ZIP and populate a :class:`ScaleoutModel` instance."""
        model = ScaleoutModel._create()
        # Path and ownership are set before opening, so if open() fails the model's
        # own cleanup (run from __del__) still removes an owned temp ZIP.
        model._zip_path = zip_path
        model._zip_is_owned = zip_is_owned
        model._file_handle = open(zip_path, "rb")
        model._legacy_source = self._legacy_source
        model._metadata = metadata
        model._helper = self._helper  # already resolved in _prepare_zip
        return model