# ask-metaflow
n
Hi guys! I'm new to Metaflow, so apologies if this is a silly question, but I'm curious about dynamic compute pool selection in flows. I work on a project called CarbonAware, which enables carbon-aware scheduling in workflow orchestrators (the goal being to run workloads at times and in places where energy is the greenest). I'm currently investigating building an integration for Metaflow (to go along with existing time-shift integrations for Prefect and Airflow). My question is -- is it possible to dynamically select a compute pool at flow runtime in Metaflow? Borrowing the example code from the cross-cloud example, would it be reasonable to implement something like the following?
from metaflow import FlowSpec, step, resources, kubernetes
from carbonaware_metaflow import green_kubernetes
import urllib.request

class GreenCrossCloudFlow(FlowSpec):

    @kubernetes
    @step
    def start(self):
        req = urllib.request.Request('https://raw.githubusercontent.com/dominictarr/random-name/master/first-names.txt')
        with urllib.request.urlopen(req) as response:
            data = response.read()
        self.titles = data.decode('utf-8').splitlines()[:10]
        self.next(self.process, foreach='titles')

    @resources(cpu=1, memory=512)
    @green_kubernetes(node_selectors=[
        ("outerbounds.co/provider=azure", "outerbounds.co/region=us-central1"),
        ("outerbounds.co/provider=aws", "outerbounds.co/region=us-west-2"),
    ])
    @step
    def process(self):
        self.title = '%s processed' % self.input
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [input.title for input in inputs]
        self.next(self.end)

    @step
    def end(self):
        print('\n'.join(self.results))

if __name__ == '__main__':
    GreenCrossCloudFlow()
Thanks in advance for the help! Looking forward to working with this community :)
v
hi Ryan! That's a fun idea 🌤️ You can't choose a compute pool (or other decorator options) on the fly, but it might not be a big issue. If you want to use your data to choose the time and place where to run the flow, presumably one would call your service periodically (or you'd send a trigger from your side) to choose the best time to run a workload. This can be implemented as a scheduled flow, say, GreenSensor.
You can then have K different variants (branches) of the actual workflow, MyFlow, configured to target different environments, which you can define through configs - the flow code remains the same. Then, based on the information received by GreenSensor, it decides which branch of MyFlow to call and at what time, using event triggering. This approach has the benefit of making it possible to customize each variant a bit too, since running a flow in AWS might not be exactly the same as for Nebius, or Crusoe, or a random cluster next to solar panels - you may need to choose a different data path etc.
Another benefit of this GreenSensor (that you can provide) is that folks can deploy it to drive their existing flows without having to change anything in their code (besides the deployment target). In particular, security-sensitive folks like the idea of not having their business-critical flows calling some external 3rd party service - you can keep that separate.
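To make that concrete, a GreenSensor could look roughly like the sketch below (just a minimal illustration, assuming a deployment backed by Argo Workflows so ArgoEvent is available; the pick_greenest_pool helper and the greensensor.* event names are made-up placeholders for the CarbonAware side):

from metaflow import FlowSpec, step, schedule
from metaflow.integrations import ArgoEvent


def pick_greenest_pool(pools):
    # Placeholder: in practice this would call the CarbonAware API and
    # return the configured pool with the lowest current carbon intensity.
    return pools[0]


@schedule(hourly=True)
class GreenSensor(FlowSpec):

    @step
    def start(self):
        # Decide which of the configured compute pools is greenest right now.
        self.pool = pick_greenest_pool(["aws", "azure"])
        self.next(self.end)

    @step
    def end(self):
        # Publish an event named after the chosen pool; each deployed variant
        # of the downstream flow listens for its own event via @trigger.
        ArgoEvent(name="greensensor.%s" % self.pool).publish(
            payload={"pool": self.pool}
        )


if __name__ == "__main__":
    GreenSensor()

Each downstream variant then only fires when its pool is the green one at that moment.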
n
Thanks Ville, that's super helpful! Just to check my understanding, your suggestion is to implement a GreenSensor flow, which calls my external green scheduler service to decide on an optimal time and compute pool to run the downstream workflow. So if a user's current flow looks like the first image, they would update it to the following - UserFlow would then include:
@trigger(event="green_UserFlow")
class UserFlow(FlowSpec):
    node_selector = Paramater("node_selector", default=None)

    @kubernetes(node_selector=config.node_selector)
    @step
    def step(self):
        ...
v
almost! Instead of Parameter("node_selector", default=None) you need to have Config("node_selector", default=None) - the difference is that the former gets its value from the event at runtime whereas the latter is defined at deploy-time. You'd deploy a separate variant (or --branch using @project) for each compute environment, with @trigger(event=config.event_name) where event_name could be greenSensor.aws or greenSensor.gcp or whatever environments you support. Then your GreenSensor creates a suitable event depending on the optimal environment at any given time. For inspiration, here's a SensorFlow that triggers downstream flows (e.g. this visualization flow) whenever new data appears in Snowflake
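For concreteness, one deployable variant could look roughly like this (a sketch only, assuming a recent Metaflow with the Config feature; the config file name, event name, and node selector value are made up, and config_expr is how I'd expect a flow-level decorator like @trigger to reference a deploy-time config):

# config_aws.json (made-up contents):
# {"event_name": "greensensor.aws", "node_selector": "outerbounds.co/provider=aws"}

from metaflow import FlowSpec, step, kubernetes, trigger, project, Config, config_expr


@project(name="userflow")
@trigger(event=config_expr("config.event_name"))
class UserFlow(FlowSpec):

    # Resolved at deploy time, so it can be referenced inside decorators.
    config = Config("config", default="config_aws.json")

    @kubernetes(node_selector=config.node_selector)
    @step
    def start(self):
        print("running with selector", self.config.node_selector)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    UserFlow()

You'd deploy it once per environment (e.g. with --branch green-aws and --config config config_aws.json), and GreenSensor decides which branch's event to emit.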
n
I see, so the end user would have to define all possible compute pool variants of their workflow at development time. What would happen if the user already had some variants of their workflow (say they're doing A/B testing on model architecture)? Would they then have to define a product of workflow variants (each combination of region and model variant)? An important principle for my development of CarbonAware is that it's low friction for a user to integrate into their existing workflows, and this feels like it'd be quite onerous for the end user. I'm curious if you have thoughts on how this could be abstracted away from the end user - perhaps an @green_project decorator that creates a set of compute pool variants for a user? Also, just to make sure I'm super clear, there's no way to pass parameters from an event that are used by flow decorators? Really appreciate all of your help on this!
v
they just need to do a deployment for each compute pool variant, which can be done through configs. If you wanted to, you could make a script that generates the necessary configs and even deploys them automatically, along the lines of this example, but it might be a bit overkill. I don't know how things work on your end, but I'd imagine the user needs to have an account in every cloud which they want to target, which is the main big lift. Once you have the clouds set up, it's very easy to set up configs and deploy (maybe in a script)
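Something along these lines, for example - a small script that writes one config per compute pool and deploys a branch for each (the file names, branch names, selector values, and userflow.py are made up; it assumes the @project/Config setup above and Argo Workflows as the deployment target):

import json
import subprocess

# One entry per compute pool variant (illustrative values only).
VARIANTS = {
    "green-aws": {
        "event_name": "greensensor.aws",
        "node_selector": "outerbounds.co/provider=aws",
    },
    "green-azure": {
        "event_name": "greensensor.azure",
        "node_selector": "outerbounds.co/provider=azure",
    },
}

for branch, cfg in VARIANTS.items():
    path = "config_%s.json" % branch
    with open(path, "w") as f:
        json.dump(cfg, f)
    # Deploy one @project branch per variant; each picks up its own config.
    subprocess.run(
        ["python", "userflow.py",
         "--branch", branch,
         "--config", "config", path,
         "argo-workflows", "create"],
        check=True,
    )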
actually we have new features coming really soon which can make this even easier - you could indeed introduce a custom @green_project flow-level decorator which modifies any flow to deploy to an environment of their/your choosing
n
this is super exciting -- thank you both for the above thread 🙇
n
Same new feature(s) as referenced in this thread? That's exciting to hear though! Anywhere that I can start learning more about that / develop against the new custom decorators feature? I understand your take that the main big lift is provisioning in each cloud / region that a customer wants to target. However, that lift is largely taken on up-front, and likely by a dedicated ops team. The user experience I'd like to target is a developer / DS being able to make a decision like: "This workflow can run in any US region that we have allocated" or "This workflow can be delayed by up to <X> hours" and then be able to, with as little effort as possible (e.g. adding an @green(max_delay=<foo>, node_tolerations=<bar>) decorator), implement that on their flow. Which sounds like it may be made possible with this new feature - very exciting!