hundreds-wire-22547
04/04/2025, 4:59 PM
Config
(https://docs.metaflow.org/metaflow/configuring-flows/basic-configuration). I was able to get this to work on the flow-level (see 🧵), but at step-level, the object type is still DelayEvaluator
when I try a similar decorator approach to that in my flow-level example.
hundreds-wire-22547
04/04/2025, 4:59 PM
def custom_schedule(
is_cron: str, cron_schedule: str, cron_timezone: str
) -> Callable[[FlowSpecMeta], FlowSpecMeta]:
"""Decorator to add cron schedule to a flow."""
def _custom_schedule(
flow_spec_meta: FlowSpecMeta,
) -> FlowSpecMeta:
"""Decorator to add cron schedule to a flow."""
if is_cron == "false":
return flow_spec_meta
return schedule(cron=cron_schedule, timezone=cron_timezone)(flow_spec_meta)
return _custom_schedule
@custom_schedule(
is_cron=config_expr("config.is_cron"),
cron_schedule=config_expr("config.schedule"),
cron_timezone=config_expr("config.timezone"),
)
hundreds-wire-22547
04/04/2025, 5:03 PM
def custom_engine_config(
engine: str, local_mode: bool
) -> Callable[[FlowSpecMeta], FlowSpecMeta]:
"""Decorator to add engine config to a flow."""
def _custom_engine_config(
flow_spec_meta: FlowSpecMeta,
) -> FlowSpecMeta:
"""Decorator to add engine config to a flow."""
if engine == "ray":
if local_mode:
return kubernetes(
image=flow_constants.IMAGE,
cpu=4,
memory=24000,
qos="Guaranteed",
)(flow_spec_meta)
return raystep(
address=flow_constants.RAY_CLUSTER_NAME,
timeout_seconds=flow_constants.RAY_TIMEOUT_SECONDS,
env_vars=flow_constants.RAY_ENV_VARS,
)(flow_spec_meta)
raise ValueError(f"Invalid engine {engine}. Supported engines are: ray.")
hundreds-wire-22547
04/04/2025, 5:07 PM
ValueError: Invalid engine <metaflow.user_configs.config_parameters.DelayEvaluator object at 0x1047b92e0>. Supported engines are: ray.
dry-beach-38304
04/05/2025, 12:55 AM
dry-beach-38304
04/05/2025, 12:55 AM
dry-beach-38304
04/05/2025, 12:56 AM
dry-beach-38304
04/05/2025, 12:56 AM
dry-beach-38304
04/05/2025, 12:57 AM
hundreds-wire-22547
04/07/2025, 8:03 PM