# dev-metaflow
s
Hi guys. Thank you for making metaflow a great tool! I’m interested in support for multi-node parallel jobs with AWS Batch: https://docs.aws.amazon.com/batch/latest/userguide/multi-node-parallel-jobs.html I need to go beyond the p3.16xlarge GPU instance, which has 8 Nvidia Tesla V100 GPUs. Cheers
s
@silly-motorcycle-86975 Are you interested in distributed model training (with MNP for Batch) or training multiple models in parallel (which can be achieved either through Metaflow’s `foreach` or AWS Batch array jobs)?
s
Hi Savin! I’m interested in training one model in a distributed fashion (with MNP for Batch)
s
Supporting distributed training is on our roadmap. How are you training your models currently?
I have opened a GH issue here - https://github.com/Netflix/metaflow/issues/648
s
I’m training my models in a single-machine, multi-GPU fashion (distributed training across cards on one machine) with p3.16xlarge instances and AWS Batch
s
We had worked with the team at determined.ai a little while back to document a way to launch distributed training tasks on a GPU cluster managed by Determined. We can brainstorm a much more native integration between Metaflow and MNP for Batch 🙂
Which training library are you using at the moment?
s
pytorch lightning dp (data parallel) + ray.tune (with population based training for hyperopt), or pytorch lightning ddp (distributed data parallel). I tried to make pytorch lightning ddp work with ray.tune but it doesn’t seem to work
I know Zillow has a fork of metaflow and they developed a `@pytorch_distributed` decorator: https://github.com/zillow/metaflow/pull/57 I haven’t looked at it in detail, so I don’t know if it works multi-node or not
The PyTorch Lightning docs say that their DDP (DistributedDataParallel) supports multiple GPUs across many machines: https://pytorch-lightning.readthedocs.io/en/stable/advanced/multi_gpu.html#distributed-modes Lightning allows multiple ways of training:
• Data Parallel (`accelerator='dp'`) (multiple GPUs, 1 machine)
• DistributedDataParallel (`accelerator='ddp'`) (multiple GPUs across many machines, python script based)
• DistributedDataParallel (`accelerator='ddp_spawn'`) (multiple GPUs across many machines, spawn based)
• DistributedDataParallel 2 (`accelerator='ddp2'`) (DP in a machine, DDP across machines)
• Horovod (`accelerator='horovod'`) (multi-machine, multi-GPU, configured at runtime)
• TPUs (`tpu_cores=8|x`) (TPU or TPU pod)
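For anyone skimming later, here is a minimal sketch (not from this thread) of what the multi-node `ddp` mode looks like with the Lightning 1.x `Trainer` API referenced above; the module and dataset are throwaway placeholders.
```python
# Minimal multi-node DDP sketch, assuming the Lightning 1.x Trainer API
# from the docs link above; the module and data are dummy placeholders.
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
import pytorch_lightning as pl

class TinyModule(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = nn.Linear(32, 1)

    def training_step(self, batch, batch_idx):
        x, y = batch
        return nn.functional.mse_loss(self.layer(x), y)

    def configure_optimizers(self):
        return torch.optim.SGD(self.parameters(), lr=1e-2)

train_loader = DataLoader(
    TensorDataset(torch.randn(1024, 32), torch.randn(1024, 1)), batch_size=64
)

trainer = pl.Trainer(
    gpus=8,              # GPUs per node
    num_nodes=2,         # machines participating in training
    accelerator="ddp",   # one process per GPU, script-based launch
    max_epochs=1,
)
trainer.fit(TinyModule(), train_loader)
```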
It also says that it supports `nccl` and `gloo` as distributed backends
However, I have never tried multi-machine multi-GPU distributed training before in any way, so I’m not 100% sure how these will work with multi-node AWS Batch jobs
f
Out of curiosity, is your model bound by VRAM requirements? So far you’ve mentioned data parallel, but from a distributed DL perspective it’s usually a question of having either data parallelism (replicate the same model across GPUs for accelerated training, splitting the data to feed them) or model parallelism (split parts of the same model across multiple GPUs)
If it’s mainly focused on data parallelism and training-time performance, you could probably benefit greatly by just swapping to the newer p4d.24xl instance and using the A100 GPUs
they also have 320GB of combined VRAM across the 8 GPUs instead of 256GB if you want to bump the batch sizes
h
@silly-motorcycle-86975: If you want to run DDP over many nodes then you may need TCP initialization; for such cases Metaflow requires some hard plumbing. As Batch throws the jobs into a job queue, you need to ensure a few things: 1) All containers have to be thrown into the same VPC; 2) The master container where you are running initialization needs to have an IP which is known; this is to ensure that all other worker containers connect to the same master container. This is non-trivial with Batch. A few workarounds: 1. Run a bigger 🦣 GPU node as @fresh-laptop-72652 suggests; 2. If you need DDP over many nodes, then write all the code under a foreach that throws all jobs into the same VPC and keep workers equal to the `world_size`. The IP resolution seems non-trivial, but I think there may be a hack with CIDR specification for the Batch VPC
❤️ 2
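As a point of reference (not something from this thread), this is roughly what the TCP initialization being described looks like with raw `torch.distributed`; the address, port, rank, and world size are placeholder values that every container would somehow need to be handed, which is exactly the hard part with Batch.
```python
# Sketch of a torch.distributed TCP rendezvous (not Metaflow-specific).
# MASTER_ADDR, RANK, and WORLD_SIZE are hypothetical: in the Batch setup
# discussed above, getting the master IP to every worker is the hard part.
import os
import torch
import torch.distributed as dist

MASTER_ADDR = os.environ.get("MASTER_ADDR", "10.0.0.42")  # master container's IP
MASTER_PORT = os.environ.get("MASTER_PORT", "29500")
RANK = int(os.environ.get("RANK", "0"))                   # 0 == master
WORLD_SIZE = int(os.environ.get("WORLD_SIZE", "5"))

dist.init_process_group(
    backend="nccl" if torch.cuda.is_available() else "gloo",
    init_method=f"tcp://{MASTER_ADDR}:{MASTER_PORT}",
    rank=RANK,
    world_size=WORLD_SIZE,
)
# ... DDP training would go here ...
dist.destroy_process_group()
```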
s
@fresh-laptop-72652 I’m not bound by VRAM, vCPU, or disk. I simply want to use more cards to increase my batch size and speed up training. Yeah, I also just found the p4d.24xl instance that has the A100 GPUs. I’m going to add it to my compute env :deployparrot:
🚀 1
f
Why incur the cost of distributed compute overhead when you can swap to newer tech where 8 A100s are roughly equal to 24 V100s 😄 https://lambdalabs.com/blog/nvidia-a100-vs-v100-benchmarks/
❤️ 1
🙌 2
(image attachment: image.png)
s
@little-apartment-49355
1) All containers have to be thrown into the same VPC;
I think this can be easily done with my metaflow infra as all of its compute envs/job queues are configured under the same VPC
2) The master container where you are running initialization needs to have an IP which is known;
Yeah, this sounds non-trivial to me
If you need DDP over many nodes, then write all the code under a foreach that throws all jobs into the same VPC and keep workers equal to the `world_size`. The IP resolution seems non-trivial, but I think there may be a hack with CIDR specification for the Batch VPC
I don’t understand what this means
write all the code under a foreach that throws all jobs into the same VPC
Do you mean write the model training code under a `foreach` metaflow step? What should I foreach on?
l
@silly-motorcycle-86975 What I am describing is an extremely hacky way; I wouldn’t recommend it. This needs a lot of plumbing to make it work with DDP, but the boilerplate looks something like this:
```python
from metaflow import FlowSpec, step


class DDPExample(FlowSpec):

    @step
    def start(self):
        # One foreach branch per DDP worker; five branches == world_size of 5.
        self.new_workers = [1, 2, 3, 4, 5]
        self.next(self.ddp_step, foreach='new_workers')

    @step
    def ddp_step(self):
        # There will be five instances of this step.
        # One of the container instances needs to be the master container,
        # and all the other containers need the IP address of that master
        # container. Doing this seems non-trivial, as you need to set or
        # discover the IP address in some way or another.

        # Your DDP synchronization code comes here
        ...
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        print("done")


if __name__ == '__main__':
    DDPExample()
```
s
Yeah indeed. Thank you very much for the boilerplate!
s
Just to add to the example above - you would need some form of gang scheduling in AWS Batch to guarantee that all 5 `ddp_step` tasks execute concurrently.
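Circling back to the original MNP question: AWS Batch multi-node parallel jobs schedule all nodes together and expose the main node’s details through environment variables (per the AWS docs linked at the top of the thread), which is why native MNP support would sidestep both the gang-scheduling and master-IP problems. A hedged sketch, with the exact variable names worth double-checking against those docs:
```python
# Sketch of how rank/master resolution could look under AWS Batch multi-node
# parallel (MNP) jobs; variable names taken from the AWS Batch MNP docs
# linked at the top of the thread -- verify them there before relying on this.
import os

num_nodes = int(os.environ["AWS_BATCH_JOB_NUM_NODES"])
node_index = int(os.environ["AWS_BATCH_JOB_NODE_INDEX"])
main_index = int(os.environ["AWS_BATCH_JOB_MAIN_NODE_INDEX"])
is_main = node_index == main_index

# Child nodes receive the main node's private IP from Batch itself, so the
# "known master IP" problem above largely goes away.
master_addr = os.environ.get(
    "AWS_BATCH_JOB_MAIN_NODE_PRIVATE_IPV4_ADDRESS", "127.0.0.1"
)
```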
s
that’s a good point