important-london-94970
03/25/2025, 1:13 PM

from metaflow import (
    FlowSpec,
    step,
    pypi_base,
    kubernetes,
    project,
    Parameter,
    current,
    torchrun,
    checkpoint,
    environment,
    model,
    retry,
)

NUM_WORKERS: int = 4
NUM_GPUS: int = 0
USE_GPU: bool = NUM_GPUS > 0
CPUS_PER_WORKER: int = 14
MEMORY_MB_PER_WORKER: int = 50000
SHARED_MEMORY_MB_BETWEEN_WORKERS: int = 12000
NODE_STARTUP_TIMEOUT_SECONDS = 10 * 60

k8s_config = {
    "cpu": CPUS_PER_WORKER,
    "memory": MEMORY_MB_PER_WORKER,
    "gpu": NUM_GPUS if USE_GPU else None,
    "shared_memory": SHARED_MEMORY_MB_BETWEEN_WORKERS,
    "use_tmpfs": SHARED_MEMORY_MB_BETWEEN_WORKERS > 0,
    "tolerations": [
        {"key": "preemptible", "operator": "Exists", "effect": "NoSchedule"}
    ],
}


@project(name="uids_label_classification")
@pypi_base(
    packages={
        ...
    },
)
class TorchRun(FlowSpec):
    ...

    @step
    def start(self):
        self.next(self.train_model, num_parallel=NUM_WORKERS)

    @kubernetes(**k8s_config)
    @model
    @torchrun
    @retry(times=3)
    @checkpoint
    @environment(vars={"OMP_NUM_THREADS": "1"})
    @step
    def train_model(self):
        print("Training model...")
        base_model_name = None
        resume_from_checkpoint: bool = False
        if current.checkpoint.is_loaded:
            print(f'current.checkpoint: {current.checkpoint}')
            print(f'current.checkpoint.info: {current.checkpoint.info}')
            print(f'Resuming training from: {current.checkpoint.directory}')
            base_model_name = current.checkpoint.directory
            resume_from_checkpoint = True
        else:
            print('Not resuming from checkpoint')
        current.torch.run(
            entrypoint="torchrun_trainer.py",
            nproc_per_node=1,
            entrypoint_args={
                "output_dir": current.checkpoint.directory,
                "batch_size": self.batch_size,
                "epochs": self.epochs,
                "learning_rate": self.learning_rate,
                "eval_strategy": self.eval_strategy,
                "use_fp16": self.use_fp16,
                "use_cpu": True,
                "use_ipex": False,
                "max_steps": self.max_steps,
                "eval_steps": self.eval_steps,
                "base_model_name": self.base_model_name or base_model_name,
                "report_to": "mlflow",
                "test_size": self.train_test_split_ratio,
                "eval_size": self.eval_test_split_ratio,
                "load_best_model_at_end": True,
                "metric_for_best_model": "eval_f1",
                "mlflow_experiment_name": self.mlflow_experiment_name,
                "early_stopping_patience": self.early_stopping_patience,
                "resume_from_checkpoint": resume_from_checkpoint,
            },
        )
        # Save the checkpoint from the controller node
        if current.parallel.node_index == 0:
            self.best_checkpoint = current.model.save(
                current.checkpoint.directory,
                label="best_checkpoint",
                metadata={
                    "epochs": self.epochs,
                    "batch_size": self.batch_size,
                    "learning_rate": self.learning_rate,
                    "base_model_name": self.base_model_name,
                },
            )
        self.next(self.join)

    @kubernetes
    @step
    def join(self, inputs):
        print('Finished training')
        for input_ in inputs:
            if hasattr(input_, "best_checkpoint"):
                self.best_checkpoint = input_.best_checkpoint
                break
        self.next(self.load_model_artifacts)

    @kubernetes
    @model(load="best_checkpoint")
    @step
    def load_model_artifacts(self):
        ...

    @kubernetes
    @step
    def log_artifacts(self):
        ...
        self.next(self.end)

    @kubernetes
    @step
    def end(self):
        print("Ending the flow")


if __name__ == "__main__":
    TorchRun()
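Note: the `...` in the class body above stands in for the flow's Parameter declarations, which were elided from the paste. As a purely hypothetical illustration (names inferred from the references in train_model, defaults invented), they would look something like:

    # Hypothetical sketch only; the real declarations are elided above.
    batch_size = Parameter("batch_size", default=32, help="Per-device batch size")
    learning_rate = Parameter("learning_rate", default=5e-5, help="Optimizer learning rate")
    epochs = Parameter("epochs", default=3, help="Number of training epochs")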
important-london-94970
03/25/2025, 1:15 PM
The torchrun_trainer.py script, in case that helps:
import torch.distributed as dist
import mlflow
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback,
)
from utils import filter_rare_labels
from utils.training_utils import compute_trainer_metrics
from utils.metaflow_callback import MetaflowCheckpointCallback
import pandas as pd
import numpy as np
from datasets import Dataset, ClassLabel, load_dataset
from metaflow import Flow
from functools import partial
import os


def load_dataset_from_flow(
    flow_name: str = "UIDsLabelClassificationGenerateData",
) -> tuple[Dataset, list]:
    """
    Given a Metaflow Flow name, this function loads the
    `uids_dataset` data from the most recent successful run,
    preprocesses it, and returns it.

    Args:
        flow_name (str):
            The Metaflow FlowSpec that has a `uids_dataset` attribute.

    Returns:
        tuple[Dataset, list]:
            Dataset and labels.
    """
    latest_run = Flow(flow_name).latest_successful_run
    dataset: pd.DataFrame = latest_run.data.uids_dataset
    dataset: pd.DataFrame = filter_rare_labels(dataset)
    dataset = dataset[["banner", "label"]]
    dataset.columns = ["text", "labels"]
    dataset['text'] = dataset['text'].apply(lambda x: None if str(x).lower() == "none" else x)
    dataset.fillna(value=np.nan, inplace=True)
    dataset.dropna(inplace=True)
    label_counts = dataset["labels"].value_counts()
    print(f"label_counts: {label_counts}")
    dataset = Dataset.from_pandas(dataset, split="train")
    labels = sorted(dataset.unique("labels"))
    new_features = dataset.features.copy()
    new_features["labels"] = ClassLabel(names=labels)
    dataset = dataset.cast(new_features)
    return dataset, labels


def train_test_eval_split(
    dataset: Dataset, test_size: float, eval_size: float
) -> tuple[Dataset, Dataset, Dataset]:
    """Modified to maintain the streaming nature of the dataset"""
    # Calculate split sizes
    test_dataset = dataset.take(int(len(dataset) * test_size))
    remaining = dataset.skip(int(len(dataset) * test_size))
    eval_dataset = remaining.take(int(len(dataset) * eval_size))
    train_dataset = remaining.skip(int(len(dataset) * eval_size))
    return train_dataset, test_dataset, eval_dataset


def tokenize(batch: dict, tokenizer: AutoTokenizer) -> dict:
    """Modified for memory efficiency"""
    # Add max_length and return_tensors=None for memory efficiency
    tokens = tokenizer(
        batch["text"],
        padding="max_length",
        truncation=True,
        max_length=512,  # Adjust based on your needs
        return_tensors=None,  # Don't convert to tensors yet
    )
    return {**tokens, "labels": batch["labels"]}


def save_datasets_to_csv(train_dataset, eval_dataset, test_dataset, output_dir: str):
    """Save datasets to CSV files"""
    data_dir = os.path.join(output_dir, "data")
    os.makedirs(data_dir, exist_ok=True)
    paths = {}
    for dataset, name in [
        (train_dataset, "train"),
        (eval_dataset, "eval"),
        (test_dataset, "test"),
    ]:
        file_path = os.path.join(data_dir, f"{name}.csv")
        # Convert to pandas and save in chunks
        df = dataset.to_pandas()
        df.to_csv(file_path, index=False)
        paths[name] = file_path
    return paths, data_dir


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Training configuration for the model")
    # Add all arguments based on training requirements
    ...
    args = parser.parse_args()

    try:
        tokenizer = AutoTokenizer.from_pretrained(args.base_model_name, use_fast=True)
    except Exception as e:
        print(f"Error loading fast tokenizer: {e}")
        tokenizer = AutoTokenizer.from_pretrained(args.base_model_name, use_fast=False)

    dataset, labels = load_dataset_from_flow(flow_name="UIDsLabelClassificationGenerateData")
    id2label: dict[int, str] = {idx: label for idx, label in enumerate(labels)}
    label2id: dict[str, int] = {value_: key_ for key_, value_ in id2label.items()}

    train_dataset, test_dataset, eval_dataset = train_test_eval_split(
        dataset,
        test_size=args.test_size,
        eval_size=args.eval_size,
    )

    # After loading the initial dataset and before splitting
    total_samples = len(dataset)  # Get total number of samples from the original dataset
    # Calculate samples per split
    train_samples = int(total_samples * (1 - args.test_size - (args.eval_size * (1 - args.test_size))))
    # Calculate max steps
    max_steps = int(
        (train_samples / args.batch_size) * args.epochs
    )
    print(f"Total samples: {total_samples}")
    print(f"Training samples: {train_samples}")
    print(f"Max steps: {max_steps}")

    training_arguments = TrainingArguments(
        output_dir=args.output_dir,
        per_device_train_batch_size=args.batch_size,
        per_device_eval_batch_size=args.batch_size,
        num_train_epochs=args.epochs,
        learning_rate=args.learning_rate,
        eval_strategy=args.eval_strategy,
        save_strategy=args.eval_strategy,
        bf16=True if args.use_fp16.lower() == "true" else False,
        use_cpu=True if args.use_cpu.lower() == "true" else False,
        use_ipex=True if args.use_ipex.lower() == "true" else False,
        # run_name=run_name,
        max_steps=max_steps,  # Set the calculated max_steps
        report_to=args.report_to,
        eval_steps=args.eval_steps,
        save_steps=args.eval_steps,
        save_total_limit=2,
        load_best_model_at_end=args.load_best_model_at_end,
        metric_for_best_model=args.metric_for_best_model,
        greater_is_better=True if args.greater_is_better.lower() == "true" else False,
        ddp_find_unused_parameters=False,
    )
    print(f'\n\n\ntraining_arguments.local_rank: {training_arguments.local_rank}\n\n')

    # Save datasets to CSV
    dataset_paths, data_dir = save_datasets_to_csv(
        train_dataset,
        eval_dataset,
        test_dataset,
        args.output_dir,
    )

    with training_arguments.main_process_first(desc="dataset map pre-processing"):
        # Load datasets in streaming mode
        datasets = load_dataset(
            "csv",
            data_files={
                "train": dataset_paths["train"],
                "eval": dataset_paths["eval"],
                "test": dataset_paths["test"],
            },
            streaming=True,
        )
        # Process datasets with streaming
        train_dataset = datasets["train"].map(
            tokenize,
            remove_columns=datasets["train"].column_names,
            fn_kwargs={"tokenizer": tokenizer},
        )
        eval_dataset = datasets["eval"].map(
            tokenize,
            remove_columns=datasets["eval"].column_names,
            fn_kwargs={"tokenizer": tokenizer},
        )
        test_dataset = datasets["test"].map(
            tokenize,
            remove_columns=datasets["test"].column_names,
            fn_kwargs={"tokenizer": tokenizer},
        )

    model = AutoModelForSequenceClassification.from_pretrained(
        args.base_model_name,
        num_labels=len(labels),
        id2label=id2label,
        label2id=label2id,
    )
    print('Loaded model')

    callbacks = [MetaflowCheckpointCallback()]
    if args.early_stopping_patience > 0:
        callbacks.append(EarlyStoppingCallback(early_stopping_patience=args.early_stopping_patience))

    trainer = Trainer(
        model=model,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        args=training_arguments,
        compute_metrics=compute_trainer_metrics,
        callbacks=callbacks,
    )
    trainer.train(resume_from_checkpoint=args.output_dir if args.resume_from_checkpoint.lower() == "true" else None)
    trainer.save_model(
        os.path.join(args.output_dir, "best_model")
    )
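For context on the DDP discussion that follows: torchrun exports the standard rendezvous variables (RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT) to every worker process it spawns, and the Hugging Face Trainer reads them to decide whether and how to initialize distributed training. A minimal sketch of inspecting them inside an entrypoint (not part of the script above, just an illustration):

import os

# torchrun sets these for each spawned worker; they determine the DDP topology.
rank = int(os.environ.get("RANK", 0))
local_rank = int(os.environ.get("LOCAL_RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))
print(f"rank={rank} local_rank={local_rank} world_size={world_size}")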
hundreds-rainbow-67050
03/25/2025, 1:20 PM
You can use the rank_zero_only() decorator to limit functions to only execute on rank 0: https://pytorch-lightning.readthedocs.io/en/1.7.7/api/pytorch_lightning.utilities.rank_zero.html#pytorch_lightning.utilit[…]rank_zero.rank_zero_only
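A minimal sketch of that suggestion (the function name below is hypothetical; the decorator turns the call into a no-op on every non-zero rank):

from pytorch_lightning.utilities.rank_zero import rank_zero_only


@rank_zero_only
def log_artifacts_to_mlflow(output_dir: str) -> None:
    # Only the rank-0 process runs this body; other ranks return None immediately.
    print(f"Logging artifacts from {output_dir} on rank 0 only")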
important-london-94970
03/25/2025, 1:31 PM
I'm setting ddp_backend="gloo" in my transformers.TrainingArguments object, which I believe should be handling DDP: https://huggingface.co/docs/transformers/v4.49.0/en/main_classes/trainer#transformers.TrainingArguments.ddp_backend
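For reference, a minimal sketch of what that setting looks like (values here are illustrative only; Gloo is the usual backend for CPU-only DDP since NCCL requires GPUs):

from transformers import TrainingArguments

training_arguments = TrainingArguments(
    output_dir="./output",            # placeholder path
    per_device_train_batch_size=16,   # illustrative value
    use_cpu=True,                     # CPU-only training, as in the flow above
    ddp_backend="gloo",               # NCCL needs GPUs; Gloo works for CPU DDP
)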