is it possible to conditionally apply a decorator ...
# ask-metaflow
h
is it possible to conditionally apply a decorator on a step based on a value in a Config (https://docs.metaflow.org/metaflow/configuring-flows/basic-configuration)? I was able to get this to work on the flow-level (see 🧵), but at step-level, the object type is still DelayEvaluator when I try a similar decorator approach to that in my flow-level example.
flow-level example that works:
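from typing import Callable

from metaflow import config_expr, schedule
from metaflow.flowspec import FlowSpecMeta

def custom_schedule(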
    is_cron: str, cron_schedule: str, cron_timezone: str
) -> Callable[[FlowSpecMeta], FlowSpecMeta]:
    """Decorator to add cron schedule to a flow."""

    def _custom_schedule(
        flow_spec_meta: FlowSpecMeta,
    ) -> FlowSpecMeta:
        """Decorator to add cron schedule to a flow."""
        if is_cron == "false":
            return flow_spec_meta

        return schedule(cron=cron_schedule, timezone=cron_timezone)(flow_spec_meta)

    return _custom_schedule

@custom_schedule(
    is_cron=config_expr("config.is_cron"),
    cron_schedule=config_expr("config.schedule"),
    cron_timezone=config_expr("config.timezone"),
)
step-level example that fails: I want to control whether a certain step is shipped off for execution on a Ray cluster or run on a local Ray cluster based on configuration.
def custom_engine_config(
    engine: str, local_mode: bool
) -> Callable[[FlowSpecMeta], FlowSpecMeta]:
    """Decorator to add engine config to a step."""

    def _custom_engine_config(
        flow_spec_meta: FlowSpecMeta,
    ) -> FlowSpecMeta:
        """Pick the execution backend for the decorated step."""
        # At this point `engine` is still an unresolved DelayEvaluator,
        # so this comparison is False and the ValueError below is raised.
        if engine == "ray":
            if local_mode:
                return kubernetes(
                    image=flow_constants.IMAGE,
                    cpu=4,
                    memory=24000,
                    qos="Guaranteed",
                )(flow_spec_meta)

            return raystep(
                address=flow_constants.RAY_CLUSTER_NAME,
                timeout_seconds=flow_constants.RAY_TIMEOUT_SECONDS,
                env_vars=flow_constants.RAY_ENV_VARS,
            )(flow_spec_meta)

        raise ValueError(f"Invalid engine {engine}. Supported engines are: ray.")

    return _custom_engine_config
ValueError: Invalid engine <metaflow.user_configs.config_parameters.DelayEvaluator object at 0x1047b92e0>. Supported engines are: ray.
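The error above shows that the decorator body runs while `engine` is still a deferred placeholder, so `engine == "ray"` compares an object to a string and falls through to the `raise`. The sketch below reproduces that behaviour without Metaflow. `DeferredValue`, `pick_backend` and `resolve_then_pick` are hypothetical names made up for illustration, and this is not how Metaflow's `DelayEvaluator` is implemented. It only shows why the branch has to run after the config value is resolved.

```python
from typing import Any, Dict


class DeferredValue:
    """Hypothetical stand-in for a lazily evaluated config expression."""

    def __init__(self, key: str) -> None:
        self.key = key

    def resolve(self, config: Dict[str, Any]) -> Any:
        # Look the value up only once the config is actually available.
        return config[self.key]


def pick_backend(engine: Any, local_mode: Any) -> str:
    """Same branching as custom_engine_config, but returns a label."""
    if engine == "ray":
        return "kubernetes" if local_mode else "raystep"
    raise ValueError(f"Invalid engine {engine}. Supported engines are: ray.")


def resolve_then_pick(engine: Any, local_mode: Any, config: Dict[str, Any]) -> str:
    """Resolve deferred arguments first, then branch on plain values."""

    def _resolve(value: Any) -> Any:
        return value.resolve(config) if isinstance(value, DeferredValue) else value

    return pick_backend(_resolve(engine), _resolve(local_mode))


config = {"engine": "ray", "local_mode": False}
engine = DeferredValue("engine")
local_mode = DeferredValue("local_mode")

# Branching on the placeholder fails, as in the traceback above.
try:
    pick_backend(engine, local_mode)
    eager_error = None
except ValueError as exc:
    eager_error = str(exc)

# Branching after resolution behaves as intended.
backend = resolve_then_pick(engine, local_mode, config)
```

One plausible reading of why the flow-level version did not hit this: it never raises on an unresolved value, it just hands the arguments on to `schedule`. That is a guess from the snippets in this thread, not something confirmed here.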
d
yes — very much possible. This is not public or documented yet but take a look here: https://github.com/Netflix/metaflow/blob/master/test/test_config/mutable_flow.py
it will be made public soon hopefully
but it’s already there and you can use it in the current versions of metaflow.
let me know if you have questions.
interestingly, I think we forgot to support the “add decorator to flow” using a nice API. I’ll add that.
h
nice! worked perfectly at flow-level, will try step-level