fedn.network.storage.statestore package

Package for storing state in the backend database (currently MongoDB).

Submodules

fedn.network.storage.statestore.mongostatestore module

class fedn.network.storage.statestore.mongostatestore.MongoStateStore(network_id, config)[source]

Bases: object

Statestore implementation using MongoDB.

Parameters:
  • network_id (str) – The network id.

  • config (dict) – The statestore configuration.

  • defaults (dict) – The default configuration. Given by config/settings-reducer.yaml.template

connect()[source]

Establish client connection to MongoDB.

Parameters:
  • config (dict) – Dictionary containing connection strings and security credentials.

  • network_id (str) – Unique identifier for the FEDn network, used as the database name.

Returns:

MongoDB client pointing to the database corresponding to network_id.

create_round(round_data)[source]

Create a new round.

Parameters:

round_data (dict) – Dictionary with round data.

create_session(id=None)[source]

Create a new session object.

Parameters:

id (uuid, str) – The ID of the created session.

delete_combiner(combiner)[source]

Delete a combiner from statestore.

Parameters:

combiner (str) – name of combiner to delete.

drop_status()[source]

Drop the status collection.

get_client(name)[source]

Get client by name.

Parameters:

name (str) – name of client to get.

Returns:

The client. None if not found.

Return type:

ObjectId

get_combiner(name)[source]

Get combiner by name.

Parameters:

name (str) – name of combiner to get.

Returns:

The combiner.

Return type:

ObjectId

get_combiners(limit=None, skip=None, sort_key='updated_at', sort_order=-1, projection={})[source]

Get all combiners.

Parameters:
  • limit (int) – The maximum number of combiners to return.

  • skip (int) – The number of combiners to skip.

  • sort_key (str) – The key to sort by.

  • sort_order (pymongo.ASCENDING or pymongo.DESCENDING) – The sort order.

  • projection (dict) – The projection.

Returns:

Dictionary of combiners in result and count.

Return type:

dict
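The limit, skip, sort_key and sort_order parameters recur across the listing methods in this class. The following is a minimal in-memory sketch of how such pagination could behave; it is not FEDn's implementation, which queries MongoDB. The helper name paginate and the sample documents are hypothetical, and it is an assumption that count reports the total number of documents rather than the size of the returned page.

```python
# Hypothetical in-memory stand-in; the real statestore queries MongoDB.
# These integers match pymongo.ASCENDING (1) and pymongo.DESCENDING (-1).
ASCENDING, DESCENDING = 1, -1


def paginate(documents, limit=None, skip=None, sort_key="updated_at", sort_order=DESCENDING):
    """Sort, skip and limit a list of dicts; return the page and a total count."""
    ordered = sorted(
        documents,
        key=lambda doc: doc[sort_key],
        reverse=(sort_order == DESCENDING),
    )
    start = skip or 0
    end = start + limit if limit is not None else None
    # Assumption: "count" is the total number of documents, not the page size.
    return {"result": ordered[start:end], "count": len(documents)}


# Sample documents with made-up field values.
combiners = [
    {"name": "combiner-a", "updated_at": 1},
    {"name": "combiner-b", "updated_at": 3},
    {"name": "combiner-c", "updated_at": 2},
]

page = paginate(combiners, limit=2, skip=1)
print([c["name"] for c in page["result"]], page["count"])
```

With the default descending order, skip=1 drops the most recently updated combiner and limit=2 returns the next two.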

get_compute_package()[source]

Get the active compute package.

Returns:

The active compute package.

Return type:

ObjectId

get_config()[source]

Retrieve the statestore config.

Returns:

The statestore config.

Return type:

dict

get_events(**kwargs)[source]

Get events from the database.

Parameters:

kwargs (dict) – query to filter events

Returns:

events matching query

Return type:

ObjectId

get_helper()[source]

Get the active helper package.

Returns:

The active helper set for the package.

Return type:

str

get_initial_model()[source]

Return the model_id of the initial model in the model trail.

Returns:

The initial model id. None if no model is found.

Return type:

str

get_latest_model()[source]

Return the model_id of the latest model in the model trail.

Returns:

The latest model id. None if no model is found.

Return type:

str

get_latest_round()[source]

Get the id of the most recent round.

Returns:

The id of the most recent round.

Return type:

ObjectId

get_model(model_id)[source]

Get model with id.

Parameters:

model_id (str) – id of model to get

Returns:

model with id

Return type:

ObjectId

get_model_ancestors(model_id: str, limit: int)[source]

Get the model ancestors.

Parameters:
  • model_id (str) – The model id.

  • limit (int) – The maximum number of ancestors to return.

Returns:

List of model ancestors.

Return type:

list

get_model_descendants(model_id: str, limit: int)[source]

Get the model descendants.

Parameters:
  • model_id (str) – The model id.

  • limit (int) – The maximum number of descendants to return.

Returns:

List of model descendants.

Return type:

list
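To illustrate what ancestor and descendant lookups with a limit could mean, here is a minimal in-memory sketch. It assumes each model record stores the id of its parent under a hypothetical parent_model key and that the models form a single linear chain. Neither the field name nor the ordering of the returned list is confirmed by this reference.

```python
def ancestors(models, model_id, limit):
    """Follow parent links upward from model_id, nearest ancestor first."""
    found = []
    current = models.get(model_id)
    while current is not None and len(found) < limit:
        parent_id = current.get("parent_model")  # hypothetical field name
        parent = models.get(parent_id)
        if parent is None:
            break
        found.append(parent_id)
        current = parent
    return found


def descendants(models, model_id, limit):
    """Follow child links downward from model_id, nearest descendant first."""
    # Assumption: every model has at most one child (a linear chain).
    child_of = {
        record["parent_model"]: mid
        for mid, record in models.items()
        if record["parent_model"] is not None
    }
    found = []
    current = model_id
    while current in child_of and len(found) < limit:
        current = child_of[current]
        found.append(current)
    return found


# Made-up chain of four models: m1 <- m2 <- m3 <- m4.
models = {
    "m1": {"parent_model": None},
    "m2": {"parent_model": "m1"},
    "m3": {"parent_model": "m2"},
    "m4": {"parent_model": "m3"},
}

print(ancestors(models, "m4", limit=2))
print(descendants(models, "m1", limit=2))
```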

get_model_trail()[source]

Get the model trail.

Returns:

dictionary of model_id: committed_at

Return type:

dict
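Given a trail of the documented shape (model_id mapped to committed_at), the initial and latest models can be read off by commit time. The sketch below works on a plain dict. The helper names, the sample ids, and the use of datetime values for committed_at are assumptions for illustration, not FEDn's implementation.

```python
from datetime import datetime


def initial_model(trail):
    """Return the model id with the earliest commit time, or None if empty."""
    return min(trail, key=trail.get) if trail else None


def latest_model(trail):
    """Return the model id with the most recent commit time, or None if empty."""
    return max(trail, key=trail.get) if trail else None


# Made-up trail; committed_at is assumed to be a datetime here.
trail = {
    "model-b": datetime(2024, 1, 2),
    "model-a": datetime(2024, 1, 1),
    "model-c": datetime(2024, 1, 3),
}

print(initial_model(trail), latest_model(trail))
```

Returning None for an empty trail mirrors the documented behaviour of get_initial_model() and get_latest_model() when no model is found.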

get_reducer()[source]

Get the reducer config.

Returns:

The reducer config.

Return type:

ObjectId

get_round(id)[source]

Get round with id.

Parameters:

id (int) – id of round to get

Returns:

round with id, reducer and combiners

Return type:

ObjectId

get_rounds()[source]

Get all rounds.

Returns:

All rounds.

Return type:

ObjectId

get_session(session_id)[source]

Get session with id.

Parameters:

session_id (str) – The session id.

Returns:

The session.

Return type:

ObjectId

get_sessions(limit=None, skip=None, sort_key='_id', sort_order=-1)[source]

Get all sessions.

Parameters:
  • limit (int) – The maximum number of sessions to return.

  • skip (int) – The number of sessions to skip.

  • sort_key (str) – The key to sort by.

  • sort_order (pymongo.ASCENDING or pymongo.DESCENDING) – The sort order.

Returns:

Dictionary of sessions in result (array of session objects) and count.

get_storage_backend()[source]

Get the storage backend.

Returns:

The storage backend.

Return type:

ObjectId

get_validations(**kwargs)[source]

Get validations from the database.

Parameters:

kwargs (dict) – query to filter validations

Returns:

validations matching query

Return type:

ObjectId
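Both get_events and get_validations accept keyword arguments that act as a query filter. As a rough sketch of that idea, the function below matches in-memory dicts against keyword arguments by equality. The function name and the document fields (model_id, sender) are hypothetical, and the real methods pass the query to MongoDB rather than filtering in Python.

```python
def find_matching(documents, **kwargs):
    """Return the documents whose fields equal every keyword argument."""
    return [
        doc
        for doc in documents
        if all(doc.get(key) == value for key, value in kwargs.items())
    ]


# Made-up validation records with hypothetical field names.
validations = [
    {"model_id": "m1", "sender": "client-1"},
    {"model_id": "m2", "sender": "client-1"},
    {"model_id": "m2", "sender": "client-2"},
]

print(find_matching(validations, model_id="m2"))
print(find_matching(validations, model_id="m2", sender="client-2"))
```

Calling the function without keyword arguments returns every document, since an empty filter matches everything.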

init_index()[source]

is_inited()[source]

Check if the statestore is initialized.

Returns:

True if initialized, else False.

Return type:

bool

list_clients(limit=None, skip=None, status=None, sort_key='last_seen', sort_order=-1)[source]

List all clients registered on the network.

Parameters:
  • limit (int) – The maximum number of clients to return.

  • skip (int) – The number of clients to skip.

  • status (str) – online | offline

  • sort_key (str) – The key to sort by.

  • sort_order (pymongo.ASCENDING or pymongo.DESCENDING) – The sort order.

list_combiners_data(combiners, sort_key='count', sort_order=-1)[source]

List all combiner data.

Parameters:
  • combiners (list) – list of combiners to get data for.

  • sort_key (str) – The key to sort by.

  • sort_order (pymongo.ASCENDING or pymongo.DESCENDING) – The sort order.

Returns:

list of combiner data.

Return type:

list(ObjectId)

list_compute_packages(limit: int | None = None, skip: int | None = None, sort_key='committed_at', sort_order=-1)[source]

List compute packages in the statestore (paginated).

Parameters:
  • limit (int) – The maximum number of compute packages to return.

  • skip (int) – The number of compute packages to skip.

  • sort_key (str) – The key to sort by.

  • sort_order (pymongo.ASCENDING or pymongo.DESCENDING) – The sort order.

Returns:

Dictionary of compute packages in result and count.

Return type:

dict

list_models(session_id=None, limit=None, skip=None, sort_key='committed_at', sort_order=-1)[source]

List all models in the statestore.

Parameters:
  • session_id (str) – The session id.

  • limit (int) – The maximum number of models to return.

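  • skip (int) – The number of models to skip.

  • sort_key (str) – The key to sort by.

  • sort_order (pymongo.ASCENDING or pymongo.DESCENDING) – The sort order.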

Returns:

List of models.

Return type:

list

report_status(msg)[source]

Write status message to the database.

Parameters:

msg (str) – The status message.

report_validation(validation)[source]

Write model validation to database.

Parameters:

validation (dict) – The model validation.

set_active_compute_package(id: str)[source]

Set the active compute package in statestore.

Parameters:

id (str) – The id of the compute package (not document _id).

Returns:

True if successful.

Return type:

bool

set_client(client_data)[source]

Set client in statestore.

Parameters:

client_data (dict) – dictionary of client config.

set_combiner(combiner_data)[source]

Set combiner in statestore.

Parameters:

combiner_data (dict) – dictionary of combiner config

set_compute_package(file_name: str, storage_file_name: str, helper_type: str, name: str | None = None, description: str | None = None)[source]

Set the active compute package in statestore.

Parameters:

file_name (str) – The file_name of the compute package.

Returns:

True if successful.

Return type:

bool

set_current_model(model_id: str)[source]

Set the current model in statestore.

Parameters:

model_id (str) – The model id.

set_helper(helper)[source]

Set the active helper package in statestore.

Parameters:

helper (str) – The name of the helper package. See helper.py for available helpers.

set_latest_model(model_id, session_id=None)[source]

Set the latest model id.

Parameters:

model_id (str) – The model id.

set_reducer(reducer_data)[source]

Set the reducer in the statestore.

Parameters:

reducer_data (dict) – dictionary of reducer config.

set_round_combiner_data(data)[source]

Set combiner round controller data.

Parameters:

data (dict) – The combiner data

set_round_config(round_id, round_config: RoundConfig)[source]

Set round configuration.

Parameters:
  • round_id (str) – The round unique identifier

  • round_config (dict) – The round configuration

set_round_data(round_id, round_data)[source]

Update round metadata.

Parameters:
  • round_id (str) – The round unique identifier

  • round_data (dict) – The round metadata

set_round_status(round_id, round_status)[source]

Set round status.

Parameters:
  • round_id (str) – The round unique identifier

  • round_status – The status of the round.

set_session_config(id: str, config: RoundConfig) → None[source]

Set the session configuration.

Parameters:
  • id (str) – The session id

  • config (dict) – Session configuration

set_session_status(id, status)[source]

Set session status.

Parameters:
  • id (str) – The session id

  • status – The status of the session.

set_storage_backend(config)[source]

Set the storage backend.

Parameters:

config (dict) – The storage backend configuration.

state()[source]

Get the current state.

Returns:

The current state.

Return type:

str

transition(state)[source]

Transition to a new state.

Parameters:

state (str) – The new state.

update_client_status(clients, status)[source]

Update client status in statestore.

Parameters:
  • client_name (str) – The client name

  • status (str) – The client status

Returns:

None

fedn.network.storage.statestore.statestorebase module

class fedn.network.storage.statestore.statestorebase.StateStoreBase[source]

Bases: ABC

abstract get_latest_model()[source]

Get the latest model id from the statestore.

Returns:

The model object.

Return type:

ObjectId

abstract is_inited()[source]

Check if the statestore is initialized.

Returns:

True if initialized, else False.

Return type:

bool

abstract set_latest_model(model_id)[source]

Set the latest model id in the statestore.

Parameters:

model_id (str) – The model id.

abstract state()[source]

Return the current state of the statestore.

abstract transition(state)[source]

Transition the statestore to a new state.

Parameters:

state (str) – The new state.
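The five abstract methods above define the contract a concrete statestore must fulfil. The following sketch shows one way to satisfy such a contract with an in-memory class. To stay self-contained it declares a local stand-in interface (StateStoreInterface) instead of importing StateStoreBase from FEDn. The class names, the "idle" state value, and the rule used for is_inited() are all assumptions made for illustration.

```python
from abc import ABC, abstractmethod


class StateStoreInterface(ABC):
    """Local stand-in mirroring the abstract methods documented above."""

    @abstractmethod
    def state(self): ...

    @abstractmethod
    def transition(self, state): ...

    @abstractmethod
    def set_latest_model(self, model_id): ...

    @abstractmethod
    def get_latest_model(self): ...

    @abstractmethod
    def is_inited(self): ...


class InMemoryStateStore(StateStoreInterface):
    """Hypothetical implementation that keeps everything in attributes."""

    def __init__(self):
        self._state = None
        self._latest_model = None

    def state(self):
        return self._state

    def transition(self, state):
        self._state = state

    def set_latest_model(self, model_id):
        self._latest_model = model_id

    def get_latest_model(self):
        return self._latest_model

    def is_inited(self):
        # Toy rule (an assumption): initialized once any state has been set.
        return self._state is not None


store = InMemoryStateStore()
store.transition("idle")  # "idle" is a made-up state name
store.set_latest_model("model-1")
print(store.state(), store.get_latest_model(), store.is_inited())
```

Because every abstract method is overridden, the subclass can be instantiated; omitting any one of them would make instantiation raise TypeError.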