Federated
Pipe
:class:`Pipe` manages communication between two nodes and carries both tensor data and info messages. It uses a store to transfer info messages and a process group with the `gloo` or `mpi` backend to transfer tensor data.
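The message half of a pipe relies on store semantics. One side sets a key, and the other side's get blocks until that key appears. The sketch below imitates that behavior with a thread-safe dictionary. `BlockingStore` is a hypothetical stand-in for illustration, not an OpenFed or PyTorch class, and the two threads play the roles of two nodes.

```python
import threading


class BlockingStore:
    """Hypothetical in-process stand-in for a key-value store used for messages."""

    def __init__(self):
        self._data = {}
        self._cond = threading.Condition()

    def set(self, key, value):
        # Publish a value and wake up any reader waiting for a key.
        with self._cond:
            self._data[key] = value
            self._cond.notify_all()

    def get(self, key, timeout=5.0):
        # Block until `key` has been set, or raise after `timeout` seconds.
        with self._cond:
            if not self._cond.wait_for(lambda: key in self._data, timeout):
                raise TimeoutError(f"key {key!r} was not set in time")
            return self._data[key]


store = BlockingStore()
received = []


def collaborator():
    # Wait for the aggregator's message, then answer under a different key.
    received.append(store.get("message_0"))
    store.set("message_1", "hello world from alpha to aggregator")


worker = threading.Thread(target=collaborator)
worker.start()
store.set("message_0", "hello world from aggregator to alpha")
reply = store.get("message_1")
worker.join()
print(received[0])
print(reply)
```

Using a distinct key for each direction (`message_0` outbound, `message_1` inbound) keeps a node from reading back its own message. The example scripts follow the same key convention.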
DistributedProperties
:class:`DistributedProperties` holds all the distributed attributes of `torch.distributed.distributed_c10d`. It is usually used as a context manager, so that the communication inside the block runs with these properties:

```python
with dist_props:
    ...
```
FederatedProperties
:class:`FederatedProperties` contains all the federated attributes, such as the address, role, and nick name. It is usually generated by :func:`openfed.topo.analysis`.
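The following dataclass gives a rough picture of what such a record carries. It groups the three attributes named above. The class name, field names, `Role` enum, and the placeholder address are assumptions for illustration and do not reflect the real `FederatedProperties` signature.

```python
from dataclasses import dataclass
from enum import Enum


class Role(Enum):
    # Hypothetical role labels; the example scripts use one aggregator and collaborators.
    AGGREGATOR = "aggregator"
    COLLABORATOR = "collaborator"


@dataclass(frozen=True)
class FederatedPropsSketch:
    """Hypothetical, immutable per-node federated attributes."""

    address: str
    role: Role
    nick_name: str

    @property
    def is_aggregator(self) -> bool:
        return self.role is Role.AGGREGATOR


# "tcp://127.0.0.1:12345" is a made-up placeholder address.
props = FederatedPropsSketch("tcp://127.0.0.1:12345", Role.COLLABORATOR, "alpha")
print(props.nick_name, props.is_aggregator)  # alpha False
```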
Examples
Here, we exchange some information among `aggregator`, `collaborator_alpha`, and `collaborator_beta`. Open three independent terminals and run one of the following three scripts in each.
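If you prefer a single terminal, a small launcher can start the scripts as separate processes and wait for all of them. This convenience sketch is not part of OpenFed. It assumes the scripts are saved in the current directory as `aggregator.py`, `collaborator_alpha.py`, and `collaborator_beta.py`, which are the names used in the output listings.

```python
import subprocess
import sys


def run_all(scripts, timeout=60):
    """Start every script in its own Python process and return their exit codes."""
    procs = [subprocess.Popen([sys.executable, script]) for script in scripts]
    codes = []
    for proc in procs:
        try:
            codes.append(proc.wait(timeout=timeout))
        except subprocess.TimeoutExpired:
            # A node that never finishes is usually waiting for a peer; stop it.
            proc.kill()
            codes.append(proc.wait())
    return codes


if __name__ == "__main__":
    scripts = ["aggregator.py", "collaborator_alpha.py", "collaborator_beta.py"]
    print(run_all(scripts))
```

All processes are started before any of them is waited on. This matters because each node blocks until its peers connect. The printed output of the three nodes will be interleaved on the shared terminal.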
Aggregator:
```python
import time

import torch

import openfed
import openfed.topo as topo

# build a topology first (identical on every node)
aggregator = topo.Node('aggregator', openfed.default_tcp_address)
alpha = topo.Node('alpha', openfed.empty_address)
beta = topo.Node('beta', openfed.empty_address)
topology = topo.Topology()
topology.add_node_list([aggregator, alpha, beta])
topology.add_edge(alpha, aggregator)
topology.add_edge(beta, aggregator)

# analyze the topology to get the federated group properties
federated_group_props = topo.analysis(topology, aggregator)
assert len(federated_group_props) == 1
federated_group_prop = federated_group_props[0]

# build pipes
pipes = openfed.federated.init_federated_group(federated_group_prop)
assert len(pipes) == 2
alpha_pipe, beta_pipe = pipes

# exchange info messages
alpha_pipe.direct_set('message_0', 'hello world from aggregator to alpha')
beta_pipe.direct_set('message_0', 'hello world from aggregator to beta')
print(alpha_pipe.direct_get('message_1'))
print(beta_pipe.direct_get('message_1'))

# exchange tensor data
data = torch.tensor(-1)
with alpha_pipe.dist_props:
    time.sleep(0.5)
    # send data to alpha
    alpha_pipe.upload(data)
    time.sleep(0.5)
    # download data from alpha
    assert alpha_pipe.download() == 1
with beta_pipe.dist_props:
    time.sleep(0.5)
    # send data to beta
    beta_pipe.upload(data)
    time.sleep(0.5)
    # download data from beta
    assert beta_pipe.download() == 2
time.sleep(1)
```
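The assertion `len(pipes) == 2` on the aggregator, and the single pipe on each collaborator, presumably follow from the star shape of the topology. Every edge connects a collaborator to the aggregator, so the aggregator has one peer per collaborator. The sketch below derives those peer lists from the edge list in plain Python. `peers_of` is a hypothetical helper that illustrates the counting. It does not describe how `openfed.topo.analysis` is implemented, and it does not claim anything about the order in which OpenFed returns pipes.

```python
from collections import defaultdict


def peers_of(edges):
    """Map every node to the sorted list of nodes it shares an edge with."""
    peers = defaultdict(set)
    for src, dst in edges:
        # Treat each edge as a two-way communication link.
        peers[src].add(dst)
        peers[dst].add(src)
    return {node: sorted(others) for node, others in peers.items()}


# Same edges as the example: alpha -> aggregator and beta -> aggregator.
edges = [("alpha", "aggregator"), ("beta", "aggregator")]
peers = peers_of(edges)
print(peers["aggregator"])  # ['alpha', 'beta']
print(peers["alpha"])       # ['aggregator']
```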
Collaborator alpha:
```python
import time

import torch

import openfed
import openfed.topo as topo

# build a topology first (identical on every node)
aggregator = topo.Node('aggregator', openfed.default_tcp_address)
alpha = topo.Node('alpha', openfed.empty_address)
beta = topo.Node('beta', openfed.empty_address)
topology = topo.Topology()
topology.add_node_list([aggregator, alpha, beta])
topology.add_edge(alpha, aggregator)
topology.add_edge(beta, aggregator)

# analyze the topology to get the federated group properties
federated_group_props = topo.analysis(topology, alpha)
assert len(federated_group_props) == 1
federated_group_prop = federated_group_props[0]

# build pipes
pipes = openfed.federated.init_federated_group(federated_group_prop)
alpha_pipe = pipes[0]

# exchange info messages
print(alpha_pipe.direct_get('message_0'))
alpha_pipe.direct_set('message_1', 'hello world from alpha to aggregator')

# exchange tensor data
data = torch.tensor(1)
with alpha_pipe.dist_props:
    # download data from aggregator
    assert alpha_pipe.download() == -1
    # upload data to aggregator
    alpha_pipe.upload(data)
time.sleep(1)
```
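The tensor exchange works because the two sides issue mirrored calls. The aggregator uploads and then downloads, while each collaborator downloads and then uploads. If both sides started with a blocking download, each would wait for the other indefinitely. The sketch below reproduces the exchange with two one-way queues per link and plain integers in place of tensors. `ToyLink` and `make_link` are hypothetical stand-ins that borrow the `upload`/`download` names for readability. They are not OpenFed's `Pipe`.

```python
import queue
import threading


class ToyLink:
    """Hypothetical end of a two-way link built from two one-way queues."""

    def __init__(self, outbox, inbox):
        self._outbox, self._inbox = outbox, inbox

    def upload(self, value):
        self._outbox.put(value)

    def download(self, timeout=5.0):
        # Block until the peer has uploaded something (raises queue.Empty on timeout).
        return self._inbox.get(timeout=timeout)


def make_link():
    a_to_b, b_to_a = queue.Queue(), queue.Queue()
    return ToyLink(a_to_b, b_to_a), ToyLink(b_to_a, a_to_b)


def collaborator(link, value):
    # Mirror of the aggregator: download first, then upload.
    assert link.download() == -1
    link.upload(value)


agg_alpha, alpha_end = make_link()
worker = threading.Thread(target=collaborator, args=(alpha_end, 1))
worker.start()
agg_alpha.upload(-1)           # aggregator: upload first ...
result = agg_alpha.download()  # ... then download
worker.join()
print(result)  # 1
```

Because the queues block until data arrives, the toy needs no `time.sleep` calls to line the two sides up.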
Collaborator beta:
```python
import time

import torch

import openfed
import openfed.topo as topo

# build a topology first (identical on every node)
aggregator = topo.Node('aggregator', openfed.default_tcp_address)
alpha = topo.Node('alpha', openfed.empty_address)
beta = topo.Node('beta', openfed.empty_address)
topology = topo.Topology()
topology.add_node_list([aggregator, alpha, beta])
topology.add_edge(alpha, aggregator)
topology.add_edge(beta, aggregator)

# analyze the topology to get the federated group properties
federated_group_props = topo.analysis(topology, beta)
assert len(federated_group_props) == 1
federated_group_prop = federated_group_props[0]

# build pipes
pipes = openfed.federated.init_federated_group(federated_group_prop)
beta_pipe = pipes[0]

# exchange info messages
print(beta_pipe.direct_get('message_0'))
beta_pipe.direct_set('message_1', 'hello world from beta to aggregator')

# exchange tensor data
data = torch.tensor(2)
with beta_pipe.dist_props:
    # download data from aggregator
    assert beta_pipe.download() == -1
    # upload data to aggregator
    beta_pipe.upload(data)
time.sleep(1)
```
Output of the aggregator:

```
(openfed) python aggregator.py
hello world from alpha to aggregator
hello world from beta to aggregator
```

Output of collaborator alpha:

```
(openfed) python collaborator_alpha.py
hello world from aggregator to alpha
```

Output of collaborator beta:

```
(openfed) python collaborator_beta.py
hello world from aggregator to beta
```