Server Functions & Aggregators

The Federated Engine is designed to be flexible. While Scaleout Edge comes with industry-standard algorithms out of the box, advanced use cases often require custom orchestration logic—whether it’s complex client selection strategies, dynamic hyperparameter tuning, or domain-specific aggregation formulas.

This guide explains how to configure built-in aggregators and how to extend the Control Plane using Server Functions.

1. Built-in Aggregators

For most use cases, the built-in aggregation algorithms are sufficient. These are optimized for performance and stability within the Scaleout architecture.

Supported Algorithms

  • fedavg (Default): Federated Averaging. Weights are averaged based on the number of examples contributed by each client.

  • fedopt Family: Adaptive optimization algorithms that treat the server-side model update as a gradient step. Includes:

    • FedAdam

    • FedYogi

    • FedAdaGrad
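The example-weighted averaging that fedavg performs can be sketched in plain numpy. This is a hypothetical illustration of the math only; the `fedavg` helper below is not part of the Scaleout API, and the layer shapes are made up:

```python
import numpy as np

# Hypothetical client results: each entry is (model_weights, num_examples).
client_updates = [
    ([np.array([1.0, 1.0]), np.array([[2.0]])], 100),
    ([np.array([3.0, 3.0]), np.array([[6.0]])], 300),
]

def fedavg(updates):
    """Average client models, weighting each by its number of examples."""
    total = sum(n for _, n in updates)
    n_layers = len(updates[0][0])
    return [
        sum(weights[i] * n for weights, n in updates) / total
        for i in range(n_layers)
    ]

global_model = fedavg(client_updates)
# First layer: (1*100 + 3*300) / 400 = 2.5 per element
```

The client with 300 examples pulls the average three times as hard as the one with 100, which is exactly the behavior described above.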

Configuration

You configure the aggregator when starting a session via the API or CLI.

# Example: Starting a session with FedAdam
session_config = {
    "name": "adaptive-optimization-run",
    "rounds": 20,
    "aggregator": "fedopt",
    "aggregator_kwargs": {
        "serveropt": "adam",
        "learning_rate": 0.01,
        "beta1": 0.9,
        "beta2": 0.99,
        "tau": 1e-3
    }
}

client.start_session(**session_config)

Note

The fedopt algorithms maintain server-side momentum. This state is reset between sessions but persists across rounds within a session.
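To make the "gradient step" framing concrete, here is a simplified single-tensor sketch of a FedAdam-style server update (following the standard adaptive federated optimization formulation). The function name `fedadam_step` is illustrative; the built-in implementation may differ in details:

```python
import numpy as np

def fedadam_step(global_w, pseudo_grad, m, v,
                 learning_rate=0.01, beta1=0.9, beta2=0.99, tau=1e-3):
    """One FedAdam server step.

    pseudo_grad is the averaged client update (delta), treated as a
    gradient. m and v are the server-side moment estimates: they persist
    across rounds within a session and are reset between sessions.
    """
    m = beta1 * m + (1 - beta1) * pseudo_grad
    v = beta2 * v + (1 - beta2) * pseudo_grad ** 2
    new_w = global_w + learning_rate * m / (np.sqrt(v) + tau)
    return new_w, m, v
```

The tau term stabilizes the division when v is small, which is why it appears alongside the learning rate and betas in the aggregator_kwargs shown above.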

2. Custom Logic: Server Functions

When built-in algorithms aren’t enough, you can implement a Server Functions class. This allows you to inject custom Python logic directly into the Combiner (Tier 2) execution loop.

The ServerFunctions class allows you to override three critical phases of a round:

  • client_selection: Which clients should participate?

  • client_settings: What configuration (e.g., learning rate) should they receive?

  • aggregate: How should their updates be mathematically combined?

Requirements

  • Class Name: Must be exactly ServerFunctions.

  • Security: The class runs in a sandboxed environment on the server. Only a whitelist of safe libraries (numpy, random, math, etc.) is available.

Example Implementation

Below is an example of a custom strategy that:

  1. Selects a random 50% subset of clients.

  2. Decays the learning rate every 10 rounds.

  3. Implements a custom weighted average aggregation.

from scaleoututil.serverfunctions.allowed_import import (
    Dict,
    List,
    RoundType,
    ServerFunctionsBase,
    Tuple,
    np,
    random,
)

class ServerFunctions(ServerFunctionsBase):
    def __init__(self) -> None:
        self.round_index = 0
        self.initial_lr = 0.1

    def client_selection(self, client_ids: List[str], round_type: RoundType) -> List[str]:
        """
        Select a random 50% of available clients for this round.
        """
        if not client_ids:
            return []
        n_select = max(1, len(client_ids) // 2)
        return random.sample(client_ids, n_select)

    def client_settings(self, global_model: List[np.ndarray]) -> Dict:
        """
        Dynamic Hyperparameters: Decay learning rate every 10 rounds.
        """
        self.round_index += 1
        decay = 0.1 if (self.round_index % 10 == 0) else 1.0
        self.initial_lr *= decay

        # These settings are passed to the client's 'train' entry point
        return {
            "learning_rate": self.initial_lr,
            "batch_size": 32
        }

    def aggregate(
        self,
        previous_global_model: List[np.ndarray],
        client_updates: Dict[str, Tuple[List[np.ndarray], Dict]],
    ) -> List[np.ndarray]:
        """
        Custom Aggregation: Weighted average based on 'num_examples'.
        """
        # Initialize accumulator with zeros
        weighted_sum = [np.zeros_like(w) for w in previous_global_model]
        total_weight = 0.0

        for client_id, (update, metadata) in client_updates.items():
            # 'metadata' comes from the client's training result JSON
            weight = metadata.get("num_examples", 1.0)
            total_weight += weight

            for i, layer_weights in enumerate(update):
                weighted_sum[i] += layer_weights * weight

        # Normalize
        if total_weight == 0:
            return previous_global_model

        new_global_model = [w / total_weight for w in weighted_sum]
        return new_global_model

3. Deploying Server Functions

To use your custom logic, you do not need to rebuild the server. You upload the definition script to the Control Plane.

Via API

For instructions on how to connect to and use the API client, see Using the API Client.

After connecting to Scaleout Edge, start a session with your server functions implementation.

from server_functions import ServerFunctions
# Start a session with the class definition of your server functions.
client.start_session(server_functions=ServerFunctions)

4. Monitoring and Advanced Options

Logs from your ServerFunctions implementation can be viewed on the Scaleout dashboard under the “Events” section.

For a complete example of advanced options, such as using the api_client and client attributes for client selection or performing memory-safe aggregation, see the server functions example in the Scaleout client repository.

Differential Privacy via Server Functions

You can implement Differential Privacy (DP) mechanisms by customizing the aggregate method in your ServerFunctions class. For example, you might add Gaussian noise to the aggregated model weights to ensure privacy guarantees.
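As a standalone sketch of the idea in plain numpy: each client delta is clipped to a bound and calibrated Gaussian noise is added to the average. The function `dp_aggregate` mirrors the aggregate signature from the example above, but the name, clip_norm, and noise_multiplier are illustrative assumptions, not part of the Scaleout API, and the values are not privacy-calibrated recommendations:

```python
import numpy as np

def dp_aggregate(previous_global_model, client_updates,
                 clip_norm=1.0, noise_multiplier=0.5):
    """Sketch of DP-style aggregation: clip each client's update relative
    to the previous global model, average, then add Gaussian noise scaled
    to the clipping bound."""
    n_clients = len(client_updates)
    summed = [np.zeros_like(w) for w in previous_global_model]

    for _, (update, _metadata) in client_updates.items():
        # Delta relative to the previous global model
        delta = [u - g for u, g in zip(update, previous_global_model)]
        # Clip the global L2 norm of the whole delta
        norm = np.sqrt(sum(np.sum(d ** 2) for d in delta))
        scale = min(1.0, clip_norm / (norm + 1e-12))
        for i, d in enumerate(delta):
            summed[i] += d * scale

    # Average the clipped deltas and add calibrated Gaussian noise
    sigma = noise_multiplier * clip_norm / n_clients
    rng = np.random.default_rng(0)  # fixed seed here for reproducibility
    return [
        g + s / n_clients + rng.normal(0.0, sigma, size=g.shape)
        for g, s in zip(previous_global_model, summed)
    ]
```

In a real deployment, clip_norm and noise_multiplier must be chosen to meet a target (epsilon, delta) privacy budget; consult the DP literature before relying on any such mechanism.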