# ask-metaflow
i
I'm trying to launch a distributed training job using metaflow-checkpoint, torchrun, and the Hugging Face Trainer on Kubernetes on CPUs; however, my code is running N times, where N is the number of CPUs I request per pod. For example, the job below launches on 4 pods, each with 14 vCPU and 50Gi of memory, but I'm seeing the same debug lines 14 times per pod. Does anyone know why this is happening and how I can fix it?
```python
from metaflow import (
    FlowSpec,
    step,
    pypi_base,
    kubernetes,
    project,
    Parameter,
    current,
    torchrun,
    checkpoint,
    environment,
    model,
    retry,
)

NUM_WORKERS: int = 4
NUM_GPUS: int = 0
USE_GPU: bool = NUM_GPUS > 0
CPUS_PER_WORKER: int = 14
MEMORY_MB_PER_WORKER: int = 50000
SHARED_MEMORY_MB_BETWEEN_WORKERS: int = 12000
NODE_STARTUP_TIMEOUT_SECONDS = 10 * 60

k8s_config = {
    "cpu": CPUS_PER_WORKER,
    "memory": MEMORY_MB_PER_WORKER,
    "gpu": NUM_GPUS if USE_GPU else None,
    "shared_memory": SHARED_MEMORY_MB_BETWEEN_WORKERS,
    "use_tmpfs": SHARED_MEMORY_MB_BETWEEN_WORKERS > 0,
    "tolerations": [
        {"key": "preemptible", "operator": "Exists", "effect": "NoSchedule"}
    ],
}


@project(name="uids_label_classification")
@pypi_base(
    packages={
        ...
    },
)
class TorchRun(FlowSpec):
    ...

    @step
    def start(self):
        self.next(self.train_model, num_parallel=NUM_WORKERS)

    @kubernetes(**k8s_config)
    @model
    @torchrun
    @retry(times=3)
    @checkpoint
    @environment(vars={"OMP_NUM_THREADS": "1"})
    @step
    def train_model(self):
        print("Training model...")

        base_model_name = None
        resume_from_checkpoint: bool = False
        if current.checkpoint.is_loaded:
            print(f'current.checkpoint: {current.checkpoint}')
            print(f'current.checkpoint.info: {current.checkpoint.info}')
            print(f'Resuming training from: {current.checkpoint.directory}')
            base_model_name = current.checkpoint.directory
            resume_from_checkpoint = True
        else:
            print('Not resuming from checkpoint')

        current.torch.run(
            entrypoint="torchrun_trainer.py",
            nproc_per_node=1,
            entrypoint_args={
                "output_dir": current.checkpoint.directory,
                "batch_size": self.batch_size,
                "epochs": self.epochs,
                "learning_rate": self.learning_rate,
                "eval_strategy": self.eval_strategy,
                "use_fp16": self.use_fp16,
                "use_cpu": True,
                "use_ipex": False,
                "max_steps": self.max_steps,
                "eval_steps": self.eval_steps,
                "base_model_name": self.base_model_name or base_model_name,
                "report_to": "mlflow",
                "test_size": self.train_test_split_ratio,
                "eval_size": self.eval_test_split_ratio,
                "load_best_model_at_end": True,
                "metric_for_best_model": "eval_f1",
                "mlflow_experiment_name": self.mlflow_experiment_name,
                "early_stopping_patience": self.early_stopping_patience,
                "resume_from_checkpoint": resume_from_checkpoint,
            },
        )
        # Save the checkpoint from the controller node
        if current.parallel.node_index == 0:
            self.best_checkpoint = current.model.save(
                current.checkpoint.directory,
                label="best_checkpoint",
                metadata={
                    "epochs": self.epochs,
                    "batch_size": self.batch_size,
                    "learning_rate": self.learning_rate,
                    "base_model_name": self.base_model_name,
                },
            )
        self.next(self.join)

    @kubernetes
    @step
    def join(self, inputs):
        print('Finished training')
        for input_ in inputs:
            if hasattr(input_, "best_checkpoint"):
                self.best_checkpoint = input_.best_checkpoint
                break
        self.next(self.load_model_artifacts)

    @kubernetes
    @model(load="best_checkpoint")
    @step
    def load_model_artifacts(self):
        ...
        self.next(self.log_artifacts)

    @kubernetes
    @step
    def log_artifacts(self):
        ... 
        self.next(self.end)

    @kubernetes
    @step
    def end(self):
        print("Ending the flow")


if __name__ == "__main__":
    TorchRun()
```
And here is the `torchrun_trainer.py` script in case that helps:
```python
import torch.distributed as dist
import mlflow
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback,
)
from utils import filter_rare_labels
from utils.training_utils import compute_trainer_metrics
from utils.metaflow_callback import MetaflowCheckpointCallback
import pandas as pd
import numpy as np
from datasets import Dataset, ClassLabel, load_dataset
from metaflow import Flow
from functools import partial
import os


def load_dataset_from_flow(
    flow_name: str = "UIDsLabelClassificationGenerateData",
) -> tuple[Dataset, list]:
    """
    Given a Metaflow Flow name, this function loads the
    `uids_dataset` data from the most recent successful run,
    preprocesses it, and returns it.

    Args:
        flow_name (str):
            The Metaflow FlowSpec that has a `uids_dataset` attribute.

    Returns:
        tuple[Dataset, list]:
            Dataset and labels.
    """

    latest_run = Flow(flow_name).latest_successful_run
    dataset: pd.DataFrame = latest_run.data.uids_dataset
    dataset: pd.DataFrame = filter_rare_labels(dataset)

    dataset = dataset[["banner", "label"]]
    dataset.columns = ["text", "labels"]

    dataset['text'] = dataset['text'].apply(lambda x: None if str(x).lower() == "none" else x)
    dataset.fillna(value=np.nan, inplace=True)
    dataset.dropna(inplace=True)
    label_counts = dataset["labels"].value_counts()
    print(f"label_counts: {label_counts}")

    dataset = Dataset.from_pandas(dataset, split="train")
    labels = sorted(dataset.unique("labels"))

    new_features = dataset.features.copy()
    new_features["labels"] = ClassLabel(names=labels)
    dataset = dataset.cast(new_features)
    return dataset, labels


def train_test_eval_split(
    dataset: Dataset, test_size: float, eval_size: float
) -> tuple[Dataset, Dataset, Dataset]:
    """Modified to maintain streaming nature of dataset"""
    
    # Calculate split sizes
    test_dataset = dataset.take(int(len(dataset) * test_size))
    remaining = dataset.skip(int(len(dataset) * test_size))
    
    eval_dataset = remaining.take(int(len(dataset) * eval_size))
    train_dataset = remaining.skip(int(len(dataset) * eval_size))    
    return train_dataset, test_dataset, eval_dataset


def tokenize(batch: dict, tokenizer: AutoTokenizer) -> dict:
    """Modified for memory efficiency"""

    # Add max_length and return_tensors=None for memory efficiency
    tokens = tokenizer(
        batch["text"],
        padding="max_length",
        truncation=True,
        max_length=512,  # Adjust based on your needs
        return_tensors=None,  # Don't convert to tensors yet
    )
    
    return {**tokens, "labels": batch["labels"]}


def save_datasets_to_csv(train_dataset, eval_dataset, test_dataset, output_dir: str):
    """Save datasets to CSV files"""
    data_dir = os.path.join(output_dir, "data")
    os.makedirs(data_dir, exist_ok=True)
    
    paths = {}
    for dataset, name in [
        (train_dataset, "train"),
        (eval_dataset, "eval"),
        (test_dataset, "test")
    ]:
        file_path = os.path.join(data_dir, f"{name}.csv")
        # Convert the split to pandas and write it out as CSV
        df = dataset.to_pandas()
        df.to_csv(file_path, index=False)
        paths[name] = file_path
    
    return paths, data_dir


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Training configuration for the model")

    # Add all arguments based on training requirements
    ...

    args = parser.parse_args()

    try:
        tokenizer = AutoTokenizer.from_pretrained(args.base_model_name, use_fast=True)
    except Exception as e:
        print(f"Error loading fast tokenizer: {e}")
        tokenizer = AutoTokenizer.from_pretrained(args.base_model_name, use_fast=False)

    dataset, labels = load_dataset_from_flow(flow_name="UIDsLabelClassificationGenerateData")
    id2label: dict[int, str] = {idx: label for idx, label in enumerate(labels)}
    label2id: dict[str, int] = {value_: key_ for key_, value_ in id2label.items()}
    train_dataset, test_dataset, eval_dataset = train_test_eval_split(
        dataset,
        test_size=args.test_size,
        eval_size=args.eval_size,
    )
    
    # After loading the initial dataset and before splitting
    total_samples = len(dataset)  # Get total number of samples from the original dataset
    
    # Calculate samples per split
    train_samples = int(total_samples * (1 - args.test_size - (args.eval_size * (1 - args.test_size))))
    
    # Calculate max steps
    max_steps = int(
        (train_samples / args.batch_size) * args.epochs
    )
    print(f"Total samples: {total_samples}")
    print(f"Training samples: {train_samples}")
    print(f"Max steps: {max_steps}")

    training_arguments = TrainingArguments(
        output_dir=args.output_dir,
        per_device_train_batch_size=args.batch_size,
        per_device_eval_batch_size=args.batch_size,
        num_train_epochs=args.epochs,
        learning_rate=args.learning_rate,
        eval_strategy=args.eval_strategy,
        save_strategy=args.eval_strategy,
        bf16=args.use_fp16.lower() == "true",
        use_cpu=args.use_cpu.lower() == "true",
        use_ipex=args.use_ipex.lower() == "true",
        # run_name=run_name,
        max_steps=max_steps,  # Set the calculated max_steps
        report_to=args.report_to,
        eval_steps=args.eval_steps,
        save_steps=args.eval_steps,
        save_total_limit=2,
        load_best_model_at_end=args.load_best_model_at_end,
        metric_for_best_model=args.metric_for_best_model,
        greater_is_better=args.greater_is_better.lower() == "true",
        ddp_find_unused_parameters=False,
    )

    print(f'\n\n\ntraining_arguments.local_rank: {training_arguments.local_rank}\n\n')

    # Save datasets to CSV
    dataset_paths, data_dir = save_datasets_to_csv(
        train_dataset, 
        eval_dataset, 
        test_dataset, 
        args.output_dir
    )
    
    with training_arguments.main_process_first(desc="dataset map pre-processing"):
    
        # Load datasets in streaming mode
        datasets = load_dataset(
            "csv",
            data_files={
                "train": dataset_paths["train"],
                "eval": dataset_paths["eval"],
                "test": dataset_paths["test"]
            },
            streaming=True,
        )
        
        # Process datasets with streaming
        train_dataset = datasets["train"].map(
            tokenize,
            remove_columns=datasets["train"].column_names,
            fn_kwargs={"tokenizer": tokenizer},
        )
        
        eval_dataset = datasets["eval"].map(
            tokenize,
            remove_columns=datasets["eval"].column_names,
            fn_kwargs={"tokenizer": tokenizer},
        )
        
        test_dataset = datasets["test"].map(
            tokenize,
            remove_columns=datasets["test"].column_names,
            fn_kwargs={"tokenizer": tokenizer},
        )
    
    model = AutoModelForSequenceClassification.from_pretrained(
        args.base_model_name,
        num_labels=len(labels),
        id2label=id2label,
        label2id=label2id,
    )
    print('Loaded model')

    
    callbacks = [MetaflowCheckpointCallback()]
    if args.early_stopping_patience > 0:
        callbacks.append(EarlyStoppingCallback(early_stopping_patience=args.early_stopping_patience))
    trainer = Trainer(
        model=model,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        args=training_arguments,
        compute_metrics=compute_trainer_metrics,
        callbacks=callbacks,
    )

    trainer.train(
        resume_from_checkpoint=args.output_dir
        if args.resume_from_checkpoint.lower() == "true"
        else None
    )

    trainer.save_model(
        os.path.join(args.output_dir, "best_model")
    )
```
h
I suspect this is due to using torch DDP, since each worker will execute the same code (although I don't see exactly where you are using DDP in your sample code). You can use this `rank_zero_only()` decorator to limit functions to only execute on rank 0: https://pytorch-lightning.readthedocs.io/en/1.7.7/api/pytorch_lightning.utilities.rank_zero.html#pytorch_lightning.utilit[…]rank_zero.rank_zero_only
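For reference, a minimal sketch of how `rank_zero_only` can gate side effects like debug prints, assuming `pytorch_lightning` is installed in the training image (the `log_once` helper below is just illustrative, not from the flow above):

```python
from pytorch_lightning.utilities.rank_zero import rank_zero_only


@rank_zero_only
def log_once(msg: str) -> None:
    # Runs only on the global rank-0 process; all other ranks no-op.
    print(msg)


log_once("Training model...")  # printed once per job instead of once per worker
```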
i
I'm setting `ddp_backend="gloo"` in my `transformers.TrainingArguments` object, which I believe should be handling DDP: https://huggingface.co/docs/transformers/v4.49.0/en/main_classes/trainer#transformers.TrainingArguments.ddp_backend
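For context, a minimal sketch of how that option is passed, with placeholder values for everything else (this is not the full `TrainingArguments` from the script above):

```python
from transformers import TrainingArguments

training_arguments = TrainingArguments(
    output_dir="./outputs",  # placeholder path
    use_cpu=True,            # CPU-only training, as in the flow above
    ddp_backend="gloo",      # gloo process-group backend for CPU DDP
)
```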