# dev-metaflow
s
Hopefully this helps anyone who wants to perform Bayesian optimisation (e.g. Hyperopt, Optuna) on Metaflow in a distributed manner using "foreach". It works on AWS Batch as well as locally. We managed to get it working with Optuna, with the minor inconvenience of setting up a PostgreSQL/MySQL database to coordinate between workers. With this approach you can parallelise hyperparameter tuning of hundreds of model training runs in the cloud (any architecture: XGBoost/TensorFlow/PyTorch/etc). Keen to hear feedback if there is a better way. https://github.com/msubzero2000/bayesian-optimisation-metaflow-optuna
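For anyone skimming, here is a minimal sketch of the pattern (the flow name, connection string and toy objective below are illustrative, not the repo's actual code): every foreach task loads the same study from a shared database, so Optuna's sampler sees everyone's completed trials when proposing new ones.
```python
# Minimal sketch of the pattern: each foreach task attaches to the same Optuna
# study in a shared relational database, which coordinates the workers.
import optuna
from metaflow import FlowSpec, step

STORAGE = 'postgresql://user:password@db-host:5432/optuna'  # placeholder connection string


def toy_objective(trial):
    x = trial.suggest_float('x', -10, 10)  # toy 1-d objective, stands in for model training
    return (x - 2) ** 2


class TuneSketchFlow(FlowSpec):

    @step
    def start(self):
        optuna.create_study(study_name='demo', storage=STORAGE, direction='minimize')
        self.workers = list(range(4))  # fan out to 4 parallel foreach tasks
        self.next(self.tune, foreach='workers')

    @step
    def tune(self):
        # every task loads the same study; the shared db coordinates the sampling
        study = optuna.load_study(study_name='demo', storage=STORAGE)
        study.optimize(toy_objective, n_trials=25)  # 4 tasks x 25 trials = 100 trials
        self.next(self.join)

    @step
    def join(self, inputs):
        # the best trial comes straight from the shared storage, not from the tasks
        study = optuna.load_study(study_name='demo', storage=STORAGE)
        print('best params:', study.best_trial.params)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == '__main__':
    TuneSketchFlow()
```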
πŸ™Œ 2
πŸ₯³ 5
πŸ‘€ 3
βœ… 1
a
This is great! To avoid the dependency on mysql, we can also use s3. We recently introduced the notion of UnboundedForEach to support a future integration with Optuna.
s
awesome news @square-wire-39606. really looking forward to seeing it when it's out. hey, did u say that optuna can also use s3 for worker communication? I did not see it in their doco. would be awesome if it can
a
I think theoretically, we should be able to read a sqlite file from S3 at the beginning of the trial and write it back at the end of the trial. I haven't checked, but if I pass a local url to a sqlite file as storage, does Optuna barf at that?
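Roughly something like this (a completely untested sketch; the bucket, key and local path are made up, and it only holds up if trials don't overlap, since the last upload wins):
```python
# Rough sketch of the sqlite-on-S3 idea: pull the study db at the start of the
# task, run trials against the local file, push it back when the task finishes.
# Bucket/key/paths are placeholders; concurrent tasks would overwrite each other.
import boto3
import optuna

BUCKET, KEY, LOCAL_DB = 'my-tuning-bucket', 'studies/mnist.db', '/tmp/mnist.db'

s3 = boto3.client('s3')
s3.download_file(BUCKET, KEY, LOCAL_DB)  # fetch the current study state from S3

study = optuna.create_study(
    study_name='mnist',
    storage=f'sqlite:///{LOCAL_DB}',  # plain local sqlite URL passed as storage
    load_if_exists=True,
)
study.optimize(lambda trial: trial.suggest_float('x', -1, 1) ** 2, n_trials=5)

s3.upload_file(LOCAL_DB, BUCKET, KEY)  # write the updated db back to S3
```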
@stocky-twilight-23298 Curious, how does Optuna guarantee this - https://github.com/msubzero2000/bayesian-optimisation-metaflow-optuna/blob/main/main.py#L83
s
It's an interesting idea. I will give the sqlite-in-S3 approach a go. Re: the above code, at completion the best trials for each worker always refer to the same global trial number with the best result.
πŸ‘ 1
s
But the workers may not execute concurrently. What happens when we do `python flow.py run --max-workers 1`?
u
well, if they're coordinating across workers with a shared db that would explain how each worker would have consensus on the best trial result regardless of execution order/concurrency, right?
u
if that's the case, a sqlite db won't do (no distributed transactions among the workers in that case).
a
Not necessarily - if worker 1 ends before any of the other workers were spawned, worker 1 would have no knowledge of the global best trial.
πŸ‘ 1
u
Ah yes, you're right. I assumed this global "best trial" assumption was in the join step but I see the problem now.
πŸ‘ 1
s
You're right. I think I got lucky that the global best trial was found before the earliest worker completed. It won't be efficient if each worker has to wait for everyone else to complete just to sync up at the end. I'll review the code. Tnx peeps! πŸ‘Œ
f
definitely an interesting idea! I'd been thinking of refactoring a custom bayesopt wrapper I'd built around Metaflow (at the flow level) and baking it into something like a `@tune` decorator to slap on a train step. It needed a notion like unbounded foreach in place, and also some minimal compute to evaluate future trial candidates, which could even be done in an `@awslambda` quite trivially after each train-step evaluation. The thought would be to keep the Ray Tune hyperparameter spec and just limit it to the subset of scheduler algos that don't use early stopping (otherwise you need a more robust orchestrator to externally check metrics mid-training loop).
s
yup confirmed that at completion, each worker's best trials can be different. I've added the fix. tnx @square-wire-39606 for spotting the bug
s
Great! @stocky-twilight-23298 - since the workers are now executing independently, do they need the mysql instance to coordinate?
s
they do. each worker shares its trials progressively, and those shared trials are used to guide which hyperparameters to try in its next trial so the study converges globally
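a tiny self-contained illustration of that coordination (sqlite and a plain loop stand in for the shared postgres/mysql instance and the foreach tasks):
```python
# Two "workers" attach to one shared storage in turn; the second worker's sampler
# starts from the trials the first one already logged, instead of from scratch.
import optuna

storage = 'sqlite:///shared_demo.db'  # stand-in for the shared postgres/mysql instance
optuna.create_study(study_name='demo', storage=storage, direction='minimize')


def objective(trial):
    x = trial.suggest_float('x', -10, 10)
    return (x - 2) ** 2


for worker in range(2):  # in the flow these would be separate foreach tasks
    study = optuna.load_study(study_name='demo', storage=storage)
    study.optimize(objective, n_trials=20)
    print(f'worker {worker} finished; study now holds {len(study.trials)} trials')
```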
πŸ™Œ 1
s
@fresh-laptop-72652 That is indeed the philosophy behind unbounded foreach where you can have a control task that's responsible for generating new trials - the trials as well as control task execute as normal Metaflow tasks.
πŸš€ 1
m
@stocky-twilight-23298 Sorry to resurrect an old thread, but why do you need to get the best trial from each of the different (metaflow) tasks? My understanding was that when you are using a db for study storage, you can just get the best trial directly from the storage after the study is over.
e.g., `study = optuna.load_study(study_name=self.input.study_name, storage=self.input.storage)` and then `study.best_trial` should give the best trial, and this is retrieved using the db directly. I don't think there is any need to handle the outputs of the individual metaflow tasks to get the best trial.
Am I missing something?
@square-wire-39606 This integration with Optuna sounds interesting. Do you have any design doc for it?
Also, are there any docs for `UnboundedForEach`? I had a look online, but couldn't really find anything.
```python
import logging
import os
import uuid
from functools import partial
from typing import Optional, List

import optuna
import pandas as pd
import torch
from metaflow import (
    FlowSpec,
    Parameter,
    conda_base,
    step,
)
from model import (
    LitMNIST,
)
from pytorch_lightning import (
    Trainer,
)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | [%(levelname)s] | %(name)s | %(message)s',
)
logger = logging.getLogger(__name__)


def construct_study_name(study_name_prefix: str):
    """Construct the name of the Optuna study."""
    study_uuid = str(uuid.uuid4())
    return '{}{}'.format(study_name_prefix, study_uuid)


def calculate_n_trials_by_task(n_trials: int, n_tasks: int) -> List[int]:
    """
    Calculate the number of trials to perform in each of the Metaflow tasks.

    Calculate the number of Optuna trials to perform in each Metaflow task in order to perform a total of n_trials
    trials across the n_tasks tasks.
    :param: n_trials: The number of Optuna trials.
    :param: n_tasks: The number of Metaflow tasks.
    """
    n_trials_by_task_lower = n_trials // n_tasks
    n_trials_by_task_upper = n_trials // n_tasks + 1
    n_trials_by_task = n_tasks * [n_trials_by_task_lower]
    n_trials_by_task[:n_trials % n_tasks] = n_trials % n_tasks * [n_trials_by_task_upper]
    return n_trials_by_task
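# Illustrative example (not part of the original snippet): 10 trials over 3 tasks
# gives calculate_n_trials_by_task(10, 3) == [4, 3, 3], which sums back to 10.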


def objective(trial: optuna.Trial, data_path: str, available_gpus: int, max_epochs: int = 3,
              progress_bar_refresh_rate: Optional[int] = None):
    """
    Calculate the value of the Optuna objective function for the given Optuna trial.

    This function calculates the loss on the test set for the given Optuna trial.
    param: trial: The instance of the Optuna trial for which the model will be trained and evaluated.
    param: data_path: The path to the data set for the model.
    param: available_gpus: Number of GPUs to train on (int) or which GPUs to train on (list or str) applied per node.
    param: max_epochs: Stop training once this number of epochs is reached. Disabled by default (None). If both
    max_epochs and max_steps are not specified, defaults to ``max_epochs = 1000``. To enable infinite training, set
    ``max_epochs = -1``.
    param: progress_bar_refresh_rate: How often to refresh progress bar (in steps). Value ``0`` disables progress bar.
    Ignored when a custom progress bar is passed to :paramref:`~Trainer.callbacks`. Default: None, means a suitable
    value will be chosen based on the environment (terminal, Google COLAB, etc.).
    """
    learning_rate = trial.suggest_float('learning_rate', 0.0001, 0.1)

    model = LitMNIST(
        data_path,
        learning_rate=learning_rate,
    )
    trainer = Trainer(
        gpus=available_gpus,
        max_epochs=max_epochs,
        progress_bar_refresh_rate=progress_bar_refresh_rate,
    )
    trainer.fit(model)
    test_results = trainer.test(model)
    return test_results[0]['val_loss']


@conda_base(libraries={
    'optuna': '2.9.1',
    'psycopg2-binary': '2.9.2',
    'pytorch': '1.10.0',
    'pytorch-lightning': '1.5.1',
    'sqlalchemy': '1.4.27',
    'torchvision': '0.11.1',
})
class HyperparameterOptimisationFlow(FlowSpec):
    """Flow class for performing hyperparameter optimisation of the model defined in model.py on MNist data set."""

    study_name_prefix: str = Parameter(
        'study_name_prefix',
        help='The prefix of the name of the study',
        default='mnist_hyperparameter_study',
        type=str,
    )
    n_trials: int = Parameter(
        'n_trials',
        help='An upper limit on the number of Optuna trials to perform.',
        type=int,
        required=True,
    )
    n_tasks: int = Parameter(
        'n_tasks',
        help='The number of Metaflow tasks over which to split the hyperparameter optimisation.',
        type=int,
        required=True,
    )

    def __init__(self, use_cli=True):
        """Initialise the flow class to perform model hyperparameter optimisation."""
        super(HyperparameterOptimisationFlow, self).__init__(use_cli=use_cli)

        self.PATH_DATASETS = None
        self.AVAIL_GPUS = None
        self.BATCH_SIZE = None

        self.study_name: Optional[str] = None
        self.n_trials_by_task: Optional[List[int]] = None
        self.trials_dataframe: Optional[pd.DataFrame] = None

    @step
    def start(self):
        """Initialise constants to be used during model training."""
        self.PATH_DATASETS = os.environ.get('PATH_DATASETS', '.')
        self.AVAIL_GPUS = min(1, torch.cuda.device_count())
        self.BATCH_SIZE = 256 if self.AVAIL_GPUS else 64
        self.next(self.download_dataset)

    @step
    def download_dataset(self):
        """Download MNist data set."""
        LitMNIST(self.PATH_DATASETS).prepare_data()
        self.next(self.initialise_study)

    @step
    def initialise_study(self):
        """Initialise the Optuna study with the specified storage."""
        self.study_name = construct_study_name(self.study_name_prefix)
        logger.info('Creating study: %s', self.study_name)
        optuna.create_study(
            # The objective returns a loss, so lower is better.
            direction='minimize',
            study_name=self.study_name,
            storage='postgresql://postgres:docker@localhost:5432',
        )
        self.n_trials_by_task = calculate_n_trials_by_task(self.n_trials, self.n_tasks)
        logger.info('Specifying number of trials per task: %s', self.n_trials_by_task)
        self.next(self.optimise_study, foreach='n_trials_by_task')

    @step
    def optimise_study(self):
        """Perform the optimisation of the Optuna study."""
        partial_objective = partial(
            objective,
            data_path=self.PATH_DATASETS,
            available_gpus=self.AVAIL_GPUS,
            progress_bar_refresh_rate=20,
        )
        study = optuna.load_study(
            study_name=self.study_name,
            storage='postgresql://postgres:docker@localhost:5432',
        )
        study.optimize(
            partial_objective,
            n_trials=self.input,
        )
        self.next(self.summarise_study_results)

    @step
    def summarise_study_results(self, inputs):
        """
        Summarise the results of the study, including saving the trial dataframe to the data artifacts.

        This function summarises the results of the Optuna study.
        param: inputs: The inputs from the preceding foreach tasks.
        """
        self.merge_artifacts(inputs)
        study = optuna.load_study(
            study_name=self.study_name,
            storage='postgresql://postgres:docker@localhost:5432',
        )
        logger.info('Number of finished trials: %s', len(study.trials))
        logger.info('Best trial: %s', study.best_trial)
        self.trials_dataframe = study.trials_dataframe()
        self.next(self.end)

    @step
    def end(self):
        """Perform some final logging."""
        logger.info('Dataset Paths: %s', self.PATH_DATASETS)
        logger.info('Available GPUs: %s', self.AVAIL_GPUS)
        logger.info('Batch Size: %s', self.BATCH_SIZE)


if __name__ == '__main__':
    HyperparameterOptimisationFlow()
```
πŸ™Œ 1
ok, so I think this is what it would look like if you were to use the relational db backend for the study.
I did a different model in my example. It is just tested locally and I have hardcoded the postgres connection string, but it is working.
The main functions are `initialise_study`, `optimise_study` and `summarise_study_results`.