# ask-metaflow
r
Hi team, we’re trying to add a custom step decorator to emit some flow metrics. For example, we want to capture how long the flow takes to run. The runtime hooks seem to be a good fit:
flow_time =  
 runtime_finished and step_name = "end"
 minus
 runtime_init and step_name = "start"
But we observed the following:
1. In local mode, the `start` step's `runtime_finished` is called after the `end` step's `runtime_finished`, meaning the former would be more accurate (I assume that the longer measurement is the more accurate one when measuring flow run time for my use case). Curious whether this is a known implementation detail or just our specific environment (multithreaded execution order?).
2. I think `step_init` is called before `runtime_init`, but there’s no `step_finished`.
What’s the best approach if we want to measure and emit metrics about a flow run’s `startedAt` and `endedAt`? Which hooks would you recommend? https://github.com/Netflix/metaflow/blob/master/metaflow/decorators.py#L360
v
by far the easiest approach is to create a pure Python decorator like here:
from metaflow import FlowSpec, step
from functools import wraps
import time

def timing(f):
    @wraps(f)
    def wrapper(self):
        start = time.time()
        f(self)
        self.duration = time.time() - start
        print(f"Task took {self.duration} seconds")
    return wrapper

class TimingFlow(FlowSpec):

    @timing
    @step
    def start(self):
        for i in range(3):
            print(i)
            time.sleep(1)
        self.next(self.end)

    @step
    def end(self):
        print("done!")

if __name__ == '__main__':
    TimingFlow()
👀 1
no need to create an extension for it
if you want to measure the execution time of the whole flow, you could do something like this:
def timing(f):
    @wraps(f)
    def wrapper(self):
        if f.__name__ == 'start':
            # saved as an artifact so the end step can read it
            self.flow_start = time.time()
        f(self)
        if f.__name__ == 'end':
            self.duration = time.time() - self.flow_start
            print(f"Flow took {self.duration} seconds")
    return wrapper
🙌 1
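A slightly more defensive variant of the same pure-Python wrapper, as a minimal sketch: it forwards extra arguments (join steps take an `inputs` argument) and records the duration in a `finally` block so a failing step is still timed. `FakeFlow` is a hypothetical stand-in used only to exercise the wrapper, not a Metaflow class, and the sketch does not show whether an attribute set during a failed task is persisted.

```python
import time
from functools import wraps


def timing(f):
    """Record how long a step body takes, even if it raises."""
    @wraps(f)
    def wrapper(self, *args, **kwargs):
        # *args lets the same wrapper sit on steps that take extra arguments.
        started = time.time()
        try:
            return f(self, *args, **kwargs)
        finally:
            # Runs on success and on failure.
            self.duration = time.time() - started
            print(f"{f.__name__} took {self.duration:.3f} seconds")
    return wrapper


class FakeFlow:
    """Hypothetical stand-in for a flow object; not a Metaflow class."""

    @timing
    def start(self):
        time.sleep(0.01)

    @timing
    def join(self, inputs):
        self.count = len(inputs)

    @timing
    def broken(self):
        raise ValueError("boom")
```

The `try`/`finally` is the main design choice here: without it, an exception in user code would skip the timing line entirely.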
r
cc @mammoth-monitor-19889
m
The wrapper would then have to be applied explicitly in every flow. What we are trying to achieve is a wrapper that applies to every flow that is run, hence the decorator approach.
r
we’re already using the `@poetry` flow-level decorator, right? Can we use the flow-level decorator to add this custom decorator (or wrapper) to every step?
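To illustrate the general idea of a class-level decorator attaching a wrapper to every step, here is a plain-Python sketch. It deliberately does not import Metaflow: the `step` marker and the `is_step` attribute are stand-ins assumed for illustration, and `time_all_steps` is a hypothetical name. Whether Metaflow's own flow-level decorators or class machinery accept this kind of rewriting is not confirmed by this thread.

```python
import time
from functools import wraps


def step(f):
    """Hypothetical marker standing in for a real @step decorator."""
    f.is_step = True
    return f


def timing(f):
    @wraps(f)
    def wrapper(self, *args, **kwargs):
        started = time.time()
        try:
            return f(self, *args, **kwargs)
        finally:
            # Collect per-step durations on the instance.
            previous = getattr(self, "timings", {})
            self.timings = {**previous, f.__name__: time.time() - started}
    return wrapper


def time_all_steps(cls):
    """Class decorator: wrap every method marked as a step with `timing`."""
    for name, attr in list(vars(cls).items()):
        if callable(attr) and getattr(attr, "is_step", False):
            setattr(cls, name, timing(attr))
    return cls


@time_all_steps
class DemoFlow:
    @step
    def start(self):
        self.x = 1

    @step
    def end(self):
        self.x += 1

    def helper(self):
        return "not a step"
```

With this shape, flow authors only add one class-level decorator, and unmarked helpers such as `helper` are left untouched.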
s
btw the blogpost yesterday was a great read. would you be open to publicly sharing more about how your @poetry and @raystep decorators work?
r
Thanks Savin! Defer to @brash-wolf-45301 on this.
b
Hi @square-wire-39606 ideally it would be great if we can integrate and commit these decorators to open source metaflow. What do you think? I can work with @ripe-alarm-8919 and @mammoth-monitor-19889 (who are currently working on it) to write maybe a one-pager on how we have it implemented, to get some of y'alls eyes on the approach.
👍 1
I am a proponent of open source software and if it would help the community, we can contribute back.
s
Yes, we would love to learn more and figure out a way forward
v
@mammoth-monitor-19889 makes sense - in that case you can create an extension
• take a look at this `@pyspark` extension as an example, which uses the `task_decorate` hook to attach behavior pre/post-step
• you can then execute code before/after the user code in your wrapper - just call `step_func()` to invoke user code in the middle. You can create artifacts too to store timing info etc.
• there are a few different mechanisms for including the decorator by default in all runs, but you can do it e.g. via Metaflow config or an env var or a custom base class
we also have new functionality that will make this much easier soon - stay tuned! (but the above extension approach works today, albeit it's a bit more complicated)
🌟 1
👀 1
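The `task_decorate` pattern described above can be sketched roughly as follows. This is not a working extension: it avoids importing Metaflow so it runs standalone, `TimingDecoratorSketch` and `FakeFlow` are hypothetical names, and the hook's parameter list is an assumption based on a reading of `metaflow/decorators.py` that should be checked against your installed version. In a real extension the class would presumably subclass Metaflow's step decorator base class and be registered through the extension mechanism.

```python
import time


class TimingDecoratorSketch:
    """Stand-in for a step decorator class; no metaflow import here."""

    name = "timing"  # hypothetical decorator name

    def task_decorate(self, step_func, flow, graph, retry_count,
                      max_user_code_retries, ubf_context):
        # Assumed hook shape: receives the step callable, returns a replacement.
        def wrapped(*args, **kwargs):
            started = time.time()  # code before the user's step
            try:
                return step_func(*args, **kwargs)  # user code runs here
            finally:
                # Code after the user's step: store the timing on the flow
                # object, where it could be kept as an artifact.
                flow.step_duration = time.time() - started
        return wrapped


class FakeFlow:
    """Hypothetical flow object used only to exercise the sketch."""

    def start(self):
        self.greeting = "hello"


flow = FakeFlow()
decorated = TimingDecoratorSketch().task_decorate(
    flow.start, flow, graph=None, retry_count=0,
    max_user_code_retries=0, ubf_context=None,
)
decorated()
```

The wrapper returned from the hook is where a metrics client call would go, next to the line that sets `flow.step_duration`.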