nutritious-coat-36638
05/26/2025, 3:59 PMfrom metaflow import FlowSpec, step, resources
from carbonaware_metaflow import green_kubernetes
import urllib
class GreenCrossCloudFlow(FlowSpec):
@step
@kubernetes
def start(self):
req = urllib.request.Request('<https://raw.githubusercontent.com/dominictarr/random-name/master/first-names.txt>')
with urllib.request.urlopen(req) as response:
data = response.read()
i = 0
self.titles = data[:10]
self.next(self.process, foreach='titles')
@resources(cpu=1,memory=512)
@green_kubernetes(node_selectors=[
("<http://outerbounds.co/provider=azure|outerbounds.co/provider=azure>", "<http://outerbounds.co/region=us-central1|outerbounds.co/region=us-central1>"),
("<http://outerbounds.co/provider=aws|outerbounds.co/provider=aws>", "<http://outerbounds.co/region=us-west-2|outerbounds.co/region=us-west-2>"),
]
@step
def process(self):
self.title = '%s processed' % self.input
self.next(self.join)
@step
def join(self, inputs):
self.results = [input.title for input in inputs]
self.next(self.end)
@step
def end(self):
print('\n'.join(self.results))
if __name__ == '__main__':
GreenCrossCloudFlow()
Thanks in advance for the help! Looking forward to working with this community :)victorious-lawyer-58417
05/26/2025, 7:59 PMGreenSensor
you can then have K different variants (branches) of the actual workflow, MyFlow
, configured to target different environments, which you can define through configs - the flow code remains the same.
Then based on the information received by GreenSensor,
it decides which branch of MyFlow
to call and at what time using event triggering.
This approach has the benefit of making it possible to customize each variant a bit too, since running a flow in AWS might not be exactly the same as for Nebius, or Crusoe, or a random cluster next to solar panels. You may need to choose a different data path etc.victorious-lawyer-58417
05/26/2025, 8:03 PMGreenSensor
(that you can provide), is that folks can deploy it to drive their existing flows without having to change anything in their code (besides the deployment target). In particular security-sensitive folks like the idea of not having their business-critical flows calling some external 3rd party service - you can keep that separatenutritious-coat-36638
05/27/2025, 3:30 AMGreenSensor
flow, which calls my external green scheduler service to decide on an optimal time and compute pool to run the downstream workflow.
So if a users current flow looks like the first image...
They would update it to the following...nutritious-coat-36638
05/27/2025, 3:34 AMUserFlow
would then be updated to include the following:
@trigger(event="green_UserFlow")
class UserFlow(FlowSpec):
node_selector = Paramater("node_selector", default=None)
@kubernetes(node_selector=config.node_selector)
@step
def step(self):
...
victorious-lawyer-58417
05/27/2025, 3:58 AMParamater("node_selector", default=None)
you need to have Config("node_selector", default=None)
- the difference is that the former gets its value from the event at runtime whereas the latter is defined at deploy-time. You'd deploy a separate variant (or --branch
using @project
) for each compute environment with @trigger(event=config.event_name)
where event_name
could be greenSensor.aws
or greenSensor.gcp
or whatever environments you support
Then your greenSensor
creates a suitable event depending on the optimal environment at any given time
For inspiration, here's a SensorFlow that triggers downstream (e.g. this visualization flow) whenever new data appears in Snowflakenutritious-coat-36638
05/27/2025, 6:12 PM@green_project
decorator that creates a set of compute pool variants for a user?
Also, just to make sure I'm super clear, there's no way to pass parameters from an event that are used by flow decorators?
Really appreciate all of your help on this!victorious-lawyer-58417
05/30/2025, 4:34 AMvictorious-lawyer-58417
05/30/2025, 4:36 AM@green_project
flow-level decorator which modifies any flow to deploy to an environment of their/your choosingnutritious-magazine-38839
05/30/2025, 6:44 AMnutritious-coat-36638
05/30/2025, 10:13 PM@green(max_delay=<foo>, node_tolerations=<bar>)
decorator), implement that on their flow. Which sounds like it may be made possible with this new feature -- very exciting!