Core
Maintainer
:class:`Maintainer` bridges the upper layer (federated algorithms) and the lower layer (communication and topology). It has the following properties:
pipe
: The current communication target. A maintainer manages several pipes at the same time, and pipe indicates which of them is the current target.
pipes
: The list of pipes to communicate with.
current_step
: Indicates which step is currently running.
fed_props
: A maintainer corresponds to one specific federated group. The properties of that group are recorded in this attribute.
With :class:`Maintainer`, you can communicate with other nodes more flexibly and more easily than by using :class:`Pipe` directly.
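The relationship between these properties can be pictured with a small stand-alone model. This is only a sketch for illustration, not OpenFed's implementation: `ToyPipe`, `ToyMaintainer`, and `select` are hypothetical names, and the real class does considerably more.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ToyPipe:
    """Hypothetical stand-in for one connection to a remote node."""
    name: str


@dataclass
class ToyMaintainer:
    """Illustrative only: mirrors the four properties described above."""
    fed_props: Dict[str, Any]                           # properties of the federated group
    pipes: List[ToyPipe] = field(default_factory=list)  # every pipe being managed
    pipe: Optional[ToyPipe] = None                      # the current communication target
    current_step: Optional[str] = None                  # name of the step being run

    def select(self, index: int) -> ToyPipe:
        """Make pipes[index] the current target (hypothetical helper)."""
        self.pipe = self.pipes[index]
        return self.pipe


m = ToyMaintainer(fed_props={"role": "aggregator"},
                  pipes=[ToyPipe("alpha"), ToyPipe("beta")])
m.select(1)                  # pipe now points at the second entry of pipes
m.current_step = "download"  # record which step is in progress
print(m.pipe.name, m.current_step)
```

The point of the model is that `pipe` is always one element of `pipes`, while `fed_props` stays fixed for the lifetime of the maintainer.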
Examples
Aggregator:
# build a topology first
import openfed
import openfed.topo as topo
aggregator = topo.Node('aggregator', openfed.default_tcp_address)
alpha = topo.Node('alpha', openfed.empty_address)
beta = topo.Node('beta', openfed.empty_address)
topology = topo.Topology()
topology.add_node_list([aggregator, alpha, beta])
topology.add_edge(alpha, aggregator)
topology.add_edge(beta, aggregator)
# analyze the topology to get the federated group properties for this node
federated_group_props = topo.analysis(topology, aggregator)
assert len(federated_group_props) == 1
federated_group_prop = federated_group_props[0]
# build network
import torch.nn as nn
network = nn.Linear(10, 1)
# build maintainer
from openfed.core import Maintainer
maintainer = Maintainer(federated_group_prop,
                        network.state_dict(keep_vars=True))
# configure the maintainer inside its context
with maintainer:
    openfed.functional.device_alignment()
    openfed.functional.count_step(2)
# run the aggregator step
maintainer.step()
Collaborator alpha:
# build a topology first
import openfed
import openfed.topo as topo
aggregator = topo.Node('aggregator', openfed.default_tcp_address)
alpha = topo.Node('alpha', openfed.empty_address)
beta = topo.Node('beta', openfed.empty_address)
topology = topo.Topology()
topology.add_node_list([aggregator, alpha, beta])
topology.add_edge(alpha, aggregator)
topology.add_edge(beta, aggregator)
# analyze the topology to get the federated group properties for this node
federated_group_props = topo.analysis(topology, alpha)
assert len(federated_group_props) == 1
federated_group_prop = federated_group_props[0]
# build network
import torch.nn as nn
network = nn.Linear(10, 1)
# build maintainer
from openfed.core import Maintainer
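maintainer = Maintainer(federated_group_prop,
                        network.state_dict(keep_vars=True))
# configure the maintainer inside its context
with maintainer:
    openfed.functional.device_alignment()
# step without uploading
maintainer.step(upload=False)
# package the local state
maintainer.package()
# step without downloading
maintainer.step(download=False)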
Collaborator beta:
# build a topology first
import openfed
import openfed.topo as topo
aggregator = topo.Node('aggregator', openfed.default_tcp_address)
alpha = topo.Node('alpha', openfed.empty_address)
beta = topo.Node('beta', openfed.empty_address)
topology = topo.Topology()
topology.add_node_list([aggregator, alpha, beta])
topology.add_edge(alpha, aggregator)
topology.add_edge(beta, aggregator)
# analyze the topology to get the federated group properties for this node
federated_group_props = topo.analysis(topology, beta)
assert len(federated_group_props) == 1
federated_group_prop = federated_group_props[0]
# build network
import torch.nn as nn
network = nn.Linear(10, 1)
# build maintainer
from openfed.core import Maintainer
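maintainer = Maintainer(federated_group_prop,
                        network.state_dict(keep_vars=True))
# configure the maintainer inside its context
with maintainer:
    openfed.functional.device_alignment()
# step without uploading
maintainer.step(upload=False)
# package the local state
maintainer.package()
# step without downloading
maintainer.step(download=False)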
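All three scripts assert that the analysis returns exactly one federated group for their node. The plain-Python sketch below shows one way to see why that holds for this star topology. It is not OpenFed's `topo.analysis` algorithm: `toy_groups` is a hypothetical helper, and it assumes that each edge is read as (collaborator, aggregator), matching the argument order of the `add_edge` calls above.

```python
from typing import Dict, List, Tuple

# Assumption: an edge is (collaborator, aggregator), as in add_edge(alpha, aggregator).
Edge = Tuple[str, str]


def toy_groups(edges: List[Edge], node: str) -> List[Dict[str, object]]:
    """Hypothetical helper: list the federated groups that `node` takes part in."""
    groups: List[Dict[str, object]] = []

    # A node that receives edges leads one group made of all its senders.
    senders = sorted(src for src, dst in edges if dst == node)
    if senders:
        groups.append({"role": "aggregator", "peers": senders})

    # A node that sends an edge joins the group led by the receiver.
    for src, dst in edges:
        if src == node:
            groups.append({"role": "collaborator", "peers": [dst]})
    return groups


edges = [("alpha", "aggregator"), ("beta", "aggregator")]
for name in ("aggregator", "alpha", "beta"):
    print(name, toy_groups(edges, name))
```

Under these assumptions every node ends up in exactly one group, which is what `assert len(federated_group_props) == 1` checks in each script. A node with edges to two different aggregators would appear in two groups in this model.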