Hi Metaflow peeps, Is there a way I can run a ste...
# dev-metaflow
c
Hi Metaflow peeps, Is there a way I can run a step on a condition eg.
def determine_flow_run_type(self):
    """
    This step determines if this is the first run of the flow, or if this is an update to the flow.
    """
    if self.is_first_run:
        self.next(self.get_data)
    else:
        self.next(self.load_todays_first_run_data)
getting the error:
Validating your flow...
    Validity checker found an issue on line 85:
    Step determine_flow_run_type is missing a self.next() transition to the next step. Add a self.next() as the last line in the function.
s
No, conditional execution like that is not supported. There are a few alternatives:
• Have an `if` condition in a step to choose between different modes
• Have an `if` condition to make a step a no-op, e.g. based on `self.is_first_run`
• Have a foreach where you alter the number of tasks executed based on some conditional
c
ok thanks @straight-shampoo-11124, interesting because Step Functions supports choosing between steps, so I just assumed Metaflow would do the same.
v
right. An interesting question is how you’d express the condition. It would be most natural to do it in Python, maybe using an `if` condition like in your example. There would need to be some mechanism to communicate the choice back to the orchestrator
c
random question: can you attach multiple `@schedule` decorators to a flow?
s
not currently. What would you like to achieve?
c
for example, running a flow at 7.15 am and then running a flow every 30 mins from 10am to 3pm
m
Coming over from Kubeflow, most of Metaflow's API seems like an obvious improvement, but conditionals is the one area that seems to be missing. Are there any plans to support conditional branching beyond the workarounds suggested in the thread?
s
we are still collecting use cases for conditionals. What's your use case?
m
Thanks for the quick reply @straight-shampoo-11124 -- we have a couple, but our main one is around transfer learning use-cases: would like to be able to optionally run a pretraining step if a particular embedding is no longer up-to-date, and then train downstream models either from the output of the pretraining step or using existing weights if they exist. Other use-cases for this include being able to possibly "skip" stages in a large fan-out pipeline (for example, when running a job over quarterly data we'd like a skip/no-op if a particular period is empty/has fewer than a particular number of rows) before proceeding with the rest of the workflow for that period. That said, both of these could be solved by changing internal logic in the various steps, so curious if you think these are appropriate use-cases for conditionals or if another solution is recommended.
s
thanks! Makes sense. The two main mechanisms thus far have been conditionals inside steps and splitting a large workflow into multiple separate workflows which can be triggered conditionally
m
Do you have any examples of what the 2nd approach would look like? I noticed another thread which suggested extending workflow classes for composability, but how would the conditional triggering work?
s
if you deploy e.g. two workflows, A and B, on Argo, you can use Argo Events to trigger B upon the completion of A. Or if you deploy on AWS Step Functions, you can use EventBridge. Triggering like this is a key feature of Metaflow inside Netflix and we are looking to support it officially in open-source too. Take a look at this memo and comment https://docs.google.com/document/d/1liTvpACWKioCSQTUv5iO3g2AKuLu4x3EYFwEl43WAZU/edit?usp=sharing
let us know if you want to test the pattern. We are happy to help
m
Definitely, am planning on circling back and doing a full bake-off between pure Argo, Kubeflow, & Metaflow for some of our large pipelines in a few weeks once the Argo support lands. Would be happy to test the pattern as part of it, just point me in the right direction to install an RC/build it from a branch
s
cool! @square-wire-39606 has been testing it on Argo so he will be able to help
u
@straight-shampoo-11124 is this thread still active? I am wondering whether you could give an example of the 2nd alternative: adding a condition at the top of a @step. Cc @cuddly-rocket-69327 https://zillowgroup.slack.com/archives/C020U025QJK/p1641948076018500?thread_ts=1641946316.015000&cid=C020U025QJK
u
This also refers to the post from another thread “you can add a conditional at the top of your @step, like if x, which simply makes the step a no-op if the conditional is false”
s
here's an example of a `@skip` decorator that makes a step a no-op based on a condition
u
Thank you Ville for the reply. Can you paste the example content in this thread directly? I couldn’t open the archived example slack link. Can’t find @skip decorator in the official metaflow api page either.
s
no problem. Can you see this?
u
I can see it now. This is great. Thanks so much.
u
@straight-shampoo-11124 Can I use a variable whose value can be changed in the flow, like self.conditions, instead of the parameter? I tried @skip(check="self.conditions", next='end') and set self.conditions = 1 in the step, but it throws an error: AttributeError: Flow level attributes and Parameters are not modifiable
v
yep, it should work with any artifact. Try e.g. setting this in the `start` step:
self.x = 1
and then use
@skip(check='x', next='end')
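The `@skip` example linked above is not reproduced in this archive and is not part of Metaflow's API. As a rough idea of what such a decorator could look like, here is a minimal sketch; it is an assumption, not the original code. It assumes `check` names an artifact that, when truthy, skips the step body, and `next` names the step to transition to. `FakeFlow` is a hypothetical stand-in so the sketch runs without Metaflow installed; it has not been tested against Metaflow's validity checker.

```python
from functools import wraps


def skip(check, next):
    """Hypothetical decorator: bypass the step body when artifact `check` is truthy."""
    def decorator(step_func):
        @wraps(step_func)
        def wrapper(self):
            if getattr(self, check, False):
                # Skip the body and transition straight to the step named `next`.
                self.next(getattr(self, next))
            else:
                # Condition is false: run the step as written.
                step_func(self)
        return wrapper
    return decorator


class FakeFlow:
    """Stand-in for a flow (NOT Metaflow's FlowSpec); records what happened."""

    def __init__(self, x):
        self.x = x              # plays the role of the artifact set in `start`
        self.ran_body = False
        self.transition = None

    def next(self, target):
        # Record the name of the step we would transition to.
        self.transition = target.__name__

    @skip(check='x', next='end')
    def middle(self):
        self.ran_body = True
        self.next(self.end)     # same target as the skip path

    def end(self):
        pass


flow = FakeFlow(x=1)
flow.middle()
print(flow.ran_body, flow.transition)
```

Note that `check` takes the bare artifact name (`'x'`), not `'self.x'`, because the wrapper looks it up with `getattr`. The sketch keeps the skip target identical to the step's own `self.next()` target, so the step becomes a no-op rather than a jump to a different step.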
u
Fantastic! Thanks.
b
(continuing an older thread to prevent too much duplication of the same topic) @victorious-lawyer-58417: I gave the skip decorator example a test where I wanted to skip a middle step but go to another logic step instead of `end` and ran into an error. Is there a better way of skipping middle steps? If I have 20 steps in my flow, but based on a parameter I want to skip steps 3-8, how would I be able to do that?
Internal error:
    Based on static analysis of the code, step middle was expected to transition to step(s) middle2. However, when the code was executed, self.next() was called with end. Make sure there is only one unconditional self.next() call in the end of your step.
e
Looking into the same question as above ⬆️