Model Training Pipeline

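The snippet below sketches one training iteration on a node: gradients are computed locally, submitted to the chain, and the model is updated only once the network reaches consensus on the aggregated result.
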
import torch

class DistributedTrainer:
    def __init__(self, model, node_config, criterion=None):
        self.model = model
        self.node_id = node_config.id
        self.batch_size = node_config.batch_size
        # compute_gradients() uses self.criterion, which was previously
        # never set; default to cross-entropy unless the caller supplies
        # a loss function.
        self.criterion = criterion or torch.nn.CrossEntropyLoss()

    async def train_iteration(self, data_batch):
        # Local computation
        gradients = self.compute_gradients(data_batch)

        # Submit to blockchain
        tx_hash = await self.submit_gradients(gradients)

        # Wait for consensus, then apply the network-aggregated gradients
        consensus = await self.wait_for_consensus(tx_hash)
        if consensus.achieved:
            self.update_model(consensus.aggregated_gradients)

    def compute_gradients(self, batch):
        # Gradient tracking must stay enabled here: running the forward
        # pass under torch.no_grad() would leave `loss` without a graph
        # and make torch.autograd.grad() raise.
        predictions = self.model(batch.features)
        loss = self.criterion(predictions, batch.labels)
        return torch.autograd.grad(loss, list(self.model.parameters()))
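
train_iteration relies on three helpers that this section does not define: submit_gradients, wait_for_consensus, and update_model. The sketch below shows one plausible shape for them, assuming an async chain client available as self.chain exposing send_transaction() and get_consensus_status(); everything beyond the three method names is a hypothetical stand-in, not a real client API.

import asyncio

import torch

class DistributedTrainer:
    # Blockchain-facing helpers of DistributedTrainer, shown separately
    # for brevity. All chain calls below are assumptions.

    async def submit_gradients(self, gradients):
        # Serialize each gradient tensor and post the batch as a
        # transaction; returns the hash used to track consensus.
        payload = [g.detach().cpu().numpy().tobytes() for g in gradients]
        return await self.chain.send_transaction(sender=self.node_id,
                                                 data=payload)

    async def wait_for_consensus(self, tx_hash, poll_interval=1.0,
                                 max_polls=60):
        # Poll until the network reports consensus, or give up after
        # max_polls attempts. The returned object is expected to expose
        # .achieved and .aggregated_gradients, matching train_iteration.
        for _ in range(max_polls):
            status = await self.chain.get_consensus_status(tx_hash)
            if status.achieved:
                break
            await asyncio.sleep(poll_interval)
        return status

    def update_model(self, aggregated_gradients, lr=0.01):
        # Apply the consensus gradients with a plain SGD step.
        with torch.no_grad():
            for param, grad in zip(self.model.parameters(),
                                   aggregated_gradients):
                param -= lr * grad

Fixed-interval polling keeps the sketch short; a production node would more likely subscribe to chain events, and would need a policy (retry, skip, or resubmit) for rounds where consensus is never reached.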
