Model Training Pipeline
DistributedTrainer runs one training iteration as a three-step loop: compute gradients locally, submit them to the chain, and apply the aggregated result once consensus is reached. Note that the forward pass must run with autograd enabled; wrapping it in torch.no_grad() would strip the computation graph and make torch.autograd.grad fail.

import torch

class DistributedTrainer:
    def __init__(self, model, node_config):
        self.model = model
        self.node_id = node_config.id
        self.batch_size = node_config.batch_size
        # Loss function; swap in whatever criterion fits the task
        self.criterion = torch.nn.CrossEntropyLoss()

    async def train_iteration(self, data_batch):
        # Local computation
        gradients = self.compute_gradients(data_batch)
        # Submit to blockchain
        tx_hash = await self.submit_gradients(gradients)
        # Wait for consensus
        consensus = await self.wait_for_consensus(tx_hash)
        if consensus.achieved:
            self.update_model(consensus.aggregated_gradients)

    def compute_gradients(self, batch):
        # Forward pass with autograd tracking (must NOT be under torch.no_grad())
        predictions = self.model(batch.features)
        loss = self.criterion(predictions, batch.labels)
        # Return per-parameter gradients without touching the .grad buffers
        return torch.autograd.grad(loss, self.model.parameters())
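For a quick local smoke test, the chain-facing methods can be stubbed out. The sketch below assumes submit_gradients, wait_for_consensus, and update_model are implemented elsewhere in the pipeline; the LocalTrainer subclass, the placeholder transaction hash, and the SimpleNamespace config and batch are hypothetical stand-ins, not part of the real protocol.

import asyncio
from types import SimpleNamespace

import torch

class LocalTrainer(DistributedTrainer):
    # Hypothetical stand-ins for the chain-facing calls, for local testing only
    async def submit_gradients(self, gradients):
        return "0xfake"  # placeholder transaction hash

    async def wait_for_consensus(self, tx_hash):
        # Pretend the network reached consensus immediately
        return SimpleNamespace(achieved=True, aggregated_gradients=None)

    def update_model(self, aggregated_gradients):
        pass  # a real node would apply the aggregated gradients here

model = torch.nn.Linear(4, 2)
config = SimpleNamespace(id="node-0", batch_size=8)
trainer = LocalTrainer(model, config)

batch = SimpleNamespace(
    features=torch.randn(8, 4),
    labels=torch.randint(0, 2, (8,)),
)
asyncio.run(trainer.train_iteration(batch))

Returning the gradients from torch.autograd.grad rather than calling loss.backward() keeps the model's .grad buffers untouched, which fits a design where the only updates applied locally are the consensus aggregates.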