Server Functions & Aggregators
The Federated Engine is designed to be flexible. While Scaleout Edge comes with industry-standard algorithms out of the box, advanced use cases often require custom orchestration logic—whether it’s complex client selection strategies, dynamic hyperparameter tuning, or domain-specific aggregation formulas.
This guide explains how to configure built-in aggregators and how to extend the Control Plane using Server Functions.
1. Built-in Aggregators
For most use cases, the built-in aggregation algorithms are sufficient. These are optimized for performance and stability within the Scaleout architecture.
Supported Algorithms
fedavg (Default): Federated Averaging. Client model weights are averaged, with each client weighted by the number of training examples it contributed.
fedopt Family: Adaptive optimization algorithms that treat the server-side model update as a gradient step. Includes:
FedAdam
FedYogi
FedAdaGrad
Configuration
You configure the aggregator when starting a session via the API or CLI.
# Example: Starting a session with FedAdam
session_config = {
    "name": "adaptive-optimization-run",
    "rounds": 20,
    "aggregator": "fedopt",
    "aggregator_kwargs": {
        "serveropt": "adam",
        "learning_rate": 0.01,
        "beta1": 0.9,
        "beta2": 0.99,
        "tau": 1e-3
    }
}
client.start_session(**session_config)
Note
The fedopt algorithms maintain server-side momentum. This state is reset between sessions but persists across rounds within a session.
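To make the role of the aggregator_kwargs and the server-side state more concrete, here is a minimal sketch of a FedOpt-style server step as it is usually described in the adaptive federated optimization literature. It is not Scaleout's internal implementation: the class name FedOptServerState, its step method, and the initialization of the second-moment estimate are assumptions made for illustration. The difference between the averaged client model and the current global model is treated as a pseudo-gradient, and the momentum terms m and v carry over from round to round.

```python
import numpy as np


class FedOptServerState:
    """Hypothetical holder for server-side optimizer state (not a Scaleout class)."""

    def __init__(self, model, serveropt="adam", learning_rate=0.01,
                 beta1=0.9, beta2=0.99, tau=1e-3):
        self.serveropt = serveropt
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.tau = tau
        # State that persists across rounds (assumes float-valued layers).
        self.m = [np.zeros_like(w, dtype=float) for w in model]
        self.v = [np.full_like(w, tau ** 2, dtype=float) for w in model]

    def step(self, model, averaged_client_model):
        """Apply one server update and return the new global model."""
        new_model = []
        for i, (w, w_avg) in enumerate(zip(model, averaged_client_model)):
            delta = w_avg - w  # pseudo-gradient for this layer
            squared = delta ** 2
            # First moment: exponential moving average of the pseudo-gradient.
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * delta
            # Second moment: the three variants differ only in this update.
            if self.serveropt == "adam":
                self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * squared
            elif self.serveropt == "yogi":
                self.v[i] = self.v[i] - (1 - self.beta2) * squared * np.sign(
                    self.v[i] - squared
                )
            elif self.serveropt == "adagrad":
                self.v[i] = self.v[i] + squared
            else:
                raise ValueError(f"unknown serveropt: {self.serveropt}")
            # tau keeps the denominator away from zero and limits adaptivity.
            new_model.append(
                w + self.learning_rate * self.m[i] / (np.sqrt(self.v[i]) + self.tau)
            )
        return new_model


# Two consecutive rounds reuse the same state object.
global_model = [np.zeros(2)]
state = FedOptServerState(global_model, serveropt="adam")
global_model = state.step(global_model, [np.ones(2)])
global_model = state.step(global_model, [np.ones(2)])
```

Creating a fresh FedOptServerState corresponds to starting a new session, which is why the momentum does not carry over between sessions.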
2. Custom Logic: Server Functions
When built-in algorithms aren’t enough, you can implement a ServerFunctions class. This allows you to inject custom Python logic directly into the Combiner (Tier 2) execution loop.
The ServerFunctions class allows you to override three critical phases of a round:
client_selection: Which clients should participate?
client_settings: What configuration (e.g., learning rate) should they receive?
aggregate: How should their updates be mathematically combined?
Requirements
Class Name: Must be exactly ServerFunctions.
Security: The class runs in a sandboxed environment on the server. Only a whitelist of safe libraries (numpy, random, math, etc.) is available.
Example Implementation
Below is an example of a custom strategy that:
Selects a random 50% subset of clients.
Decays the learning rate every 10 rounds.
Implements a custom weighted average aggregation.
from scaleoututil.serverfunctions.allowed_import import (
    Dict,
    List,
    RoundType,
    ServerFunctionsBase,
    Tuple,
    np,
    random,
)


class ServerFunctions(ServerFunctionsBase):
    def __init__(self) -> None:
        self.round_index = 0
        self.initial_lr = 0.1

    def client_selection(self, client_ids: List[str], round_type: RoundType) -> List[str]:
        """
        Select a random 50% of available clients for this round.
        """
        n_select = max(1, len(client_ids) // 2)
        return random.sample(client_ids, n_select)

    def client_settings(self, global_model: List[np.ndarray]) -> Dict:
        """
        Dynamic Hyperparameters: Decay learning rate every 10 rounds.
        """
        self.round_index += 1
        decay = 0.1 if (self.round_index % 10 == 0) else 1.0
        self.initial_lr *= decay
        # These settings are passed to the client's 'train' entry point
        return {
            "learning_rate": self.initial_lr,
            "batch_size": 32
        }

    def aggregate(
        self,
        previous_global_model: List[np.ndarray],
        client_updates: Dict[str, Tuple[List[np.ndarray], Dict]],
    ) -> List[np.ndarray]:
        """
        Custom Aggregation: Weighted average based on 'num_examples'.
        """
        # Initialize accumulator with zeros
        weighted_sum = [np.zeros_like(w) for w in previous_global_model]
        total_weight = 0.0
        for client_id, (update, metadata) in client_updates.items():
            # 'metadata' comes from the client's training result JSON
            weight = metadata.get("num_examples", 1.0)
            total_weight += weight
            for i, layer_weights in enumerate(update):
                weighted_sum[i] += layer_weights * weight
        # Normalize
        if total_weight == 0:
            return previous_global_model
        new_global_model = [w / total_weight for w in weighted_sum]
        return new_global_model
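
The arithmetic in the example above can be checked locally before anything is uploaded. The sketch below reproduces the learning-rate decay and the weighted average as plain functions, so it runs with only numpy installed and without the Scaleout runtime. The function names learning_rate_schedule and weighted_average and the client IDs are hypothetical, and the schedule assumes client_settings is called once per round.

```python
import numpy as np


def learning_rate_schedule(n_rounds, initial_lr=0.1):
    """Return the learning rate used in each round (rounds counted from 1)."""
    lr = initial_lr
    schedule = []
    for round_index in range(1, n_rounds + 1):
        if round_index % 10 == 0:
            lr *= 0.1  # multiply by 0.1 on every tenth round
        schedule.append(lr)
    return schedule


def weighted_average(client_updates):
    """Average each layer, weighting every client by its 'num_examples'."""
    first_update, _ = next(iter(client_updates.values()))
    weighted_sum = [np.zeros_like(layer, dtype=float) for layer in first_update]
    total_weight = 0.0
    for update, metadata in client_updates.values():
        weight = metadata.get("num_examples", 1.0)
        total_weight += weight
        for i, layer in enumerate(update):
            weighted_sum[i] += layer * weight
    if total_weight == 0:
        raise ValueError("total weight is zero; nothing to average")
    return [layer / total_weight for layer in weighted_sum]


# Rounds 1-9 use 0.1, rounds 10-19 use about 0.01, round 20 about 0.001.
schedule = learning_rate_schedule(20)

# Same structure as the 'client_updates' argument of aggregate().
client_updates = {
    "client-a": ([np.array([1.0, 1.0])], {"num_examples": 1}),
    "client-b": ([np.array([5.0, 5.0])], {"num_examples": 3}),
}
result = weighted_average(client_updates)
# Each entry is (1 * 1 + 3 * 5) / 4 = 4.0
```

Because client-b contributed three times as many examples, the result sits three quarters of the way from client-a's weights to client-b's.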
3. Deploying Server Functions
To use your custom logic, you do not need to rebuild the server. You upload the definition script to the Control Plane.
Via API
For instructions on how to connect to and use the API client, see Using the API Client.
After connecting to Scaleout Edge, start a session with your server functions implementation.
from server_functions import ServerFunctions
# Start a session with the class definition of your server functions.
client.start_session(server_functions=ServerFunctions)
4. Monitoring and Advanced Options
Logs from your ServerFunctions implementation can be viewed on the Scaleout dashboard under the “Events” section.
For a complete example of advanced options, such as using the api_client and client attributes for client selection or performing memory-safe aggregation, see the server functions example in the Scaleout client repository.
Differential Privacy via Server Functions
You can implement Differential Privacy (DP) mechanisms by customizing the aggregate method in your ServerFunctions class. For example, you might bound each client's contribution and add calibrated Gaussian noise to the aggregated model weights to obtain a formal privacy guarantee.
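
As a rough illustration, the following sketch shows the core of a Gaussian-mechanism aggregation written as a standalone function that could be adapted into an aggregate method. It makes several assumptions: client updates are full model weights in the same format as client_updates above, every client is weighted equally so that the clipping bound limits each client's influence, and np.random.default_rng is available in the sandbox. The function name dp_aggregate and the parameters clip_norm and noise_multiplier are hypothetical. The sketch does not compute the resulting privacy budget, which requires a separate accounting step.

```python
import numpy as np


def dp_aggregate(previous_global_model, client_updates,
                 clip_norm=1.0, noise_multiplier=1.0, rng=None):
    """Clip each client's model delta, average the deltas, add Gaussian noise."""
    rng = rng if rng is not None else np.random.default_rng()
    n_clients = len(client_updates)
    if n_clients == 0:
        return previous_global_model

    delta_sum = [np.zeros_like(w, dtype=float) for w in previous_global_model]
    for update, _metadata in client_updates.values():
        # Work on the change relative to the current global model.
        delta = [u - w for u, w in zip(update, previous_global_model)]
        # L2 norm over all layers of this client's delta.
        norm = np.sqrt(sum(float(np.sum(d ** 2)) for d in delta))
        # Scale down only if the delta exceeds the clipping bound.
        scale = min(1.0, clip_norm / (norm + 1e-12))
        for i, d in enumerate(delta):
            delta_sum[i] += d * scale

    # Noise is calibrated to the clipping bound (the per-client sensitivity).
    sigma = noise_multiplier * clip_norm
    new_model = []
    for w, summed in zip(previous_global_model, delta_sum):
        noisy_sum = summed + rng.normal(0.0, sigma, size=summed.shape)
        new_model.append(w + noisy_sum / n_clients)
    return new_model


# Example call with a fixed seed so repeated runs give the same noise.
previous = [np.zeros(2)]
updates = {"client-a": ([np.array([3.0, 4.0])], {"num_examples": 10})}
noisy_model = dp_aggregate(previous, updates, clip_norm=1.0,
                           noise_multiplier=0.5,
                           rng=np.random.default_rng(0))
```

Clipping before adding noise matters: without a bound on each client's contribution, no fixed noise scale corresponds to a meaningful guarantee.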