some-ghost-45692
05/24/2022, 9:44 PM
The @resources decorator is being ignored when creating Argo workflow templates, but changing it to @kubernetes fixes the issue. From my understanding, @resources should work the same locally and in the cloud, so I wonder if this is a bug? Thanks!
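For reference, the workaround described above swaps @resources for @kubernetes on a step. A minimal sketch (the flow name, step body, and resource values here are illustrative, not taken from this thread):

from metaflow import FlowSpec, kubernetes, step

class ResourceFlow(FlowSpec):

    # @kubernetes requests resources for the pod explicitly, rather than
    # relying on @resources being translated when the Argo workflow
    # template is generated.
    @kubernetes(cpu=2, memory=4096)
    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    ResourceFlow()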
ancient-application-36103
05/24/2022, 9:45 PM

some-ghost-45692
05/24/2022, 9:46 PM

ancient-application-36103
05/24/2022, 9:50 PM

ancient-application-36103
05/24/2022, 9:51 PM
from metaflow import FlowSpec, step, resources

class BranchFlow(FlowSpec):

    @resources(cpu=2)
    @step
    def start(self):
        print("hi")
        self.next(self.a, self.b)

    @step
    def a(self):
        self.x = 1
        self.next(self.join)

    @step
    def b(self):
        self.x = 2
        self.next(self.join)

    @step
    def join(self, inputs):
        print('a is %s' % inputs.a.x)
        print('b is %s' % inputs.b.x)
        print('total is %d' % sum(input.x for input in inputs))
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    BranchFlow()
ancient-application-36103
05/24/2022, 9:56 PM
python branch_flow.py --with resources:cpu=2 argo-workflows create
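The --with flag attaches the given decorator to every step at deploy time, so the command above is equivalent to putting @resources(cpu=2) on each step of BranchFlow. Assuming the workaround from the top of the thread, the same flag should also accept the Kubernetes decorator directly (a sketch, not a command run in this thread):

python branch_flow.py --with kubernetes:cpu=2 argo-workflows create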
some-ghost-45692
05/24/2022, 9:58 PM
from metaflow import FlowSpec, step, resources
class ForeachFlowK8s(FlowSpec):

    @resources(memory=128, cpu=0.1)
    @step
    def start(self):
        self.titles = ["Stranger Things", "House of Cards", "Narcos"]
        self.next(self.a, foreach="titles")

    @resources(memory=128, cpu=0.1)
    @step
    def a(self):
        self.title = "%s processed" % self.input
        self.next(self.join)

    @resources(memory=128, cpu=0.1)
    @step
    def join(self, inputs):
        self.results = [input.title for input in inputs]
        self.next(self.end)

    @resources(memory=128, cpu=0.1)
    @step
    def end(self):
        print("\n".join(self.results))

if __name__ == "__main__":
    ForeachFlowK8s()

$ python example.py argo-workflows create
Metaflow 2.6.1 executing ForeachFlowK8s for user:yan
Validating your flow...
The graph looks good!
Running pylint...
Pylint is happy!
Deploying foreachflowk8s to Argo Workflows...
It seems this is the first time you are deploying foreachflowk8s to Argo Workflows.
A new production token generated.
The namespace of this production flow is
production:foreachflowk8s-0-rxph
To analyze results of this production flow add this line in your notebooks:
namespace("production:foreachflowk8s-0-rxph")
If you want to authorize other people to deploy new versions of this flow to Argo Workflows, they need to call
argo-workflows create --authorize foreachflowk8s-0-rxph
when deploying this flow to Argo Workflows for the first time.
See "Organizing Results" at <https://docs.metaflow.org/> for more information about production tokens.
Workflow foreachflowk8s for flow ForeachFlowK8s pushed to Argo Workflows successfully.
Note that the flow was deployed with a modified name due to Kubernetes naming conventions
on Argo Workflows. The original flow name is stored in the workflow annotation.
What will trigger execution of the workflow:
No triggers defined. You need to launch this workflow manually.
$ argo template get foreachflowk8s -o yaml | grep cpu
>/dev/null 2>/dev/null; then python example.py --with kubernetes:cpu=1.0,memory=4096.0,disk=10240,image=python:3.10,service_account=argo-workflow,namespace=yan,gpu_vendor=nvidia
$METAFLOW_TASK_ID-params; fi && python example.py --with kubernetes:cpu=1.0,memory=4096.0,disk=10240,image=python:3.10,service_account=argo-workflow,namespace=yan,gpu_vendor=nvidia
cpu: "1"
tar xf job.tar && mflog 'Task is starting.' && (python example.py --with kubernetes:cpu=1.0,memory=4096.0,disk=10240,image=python:3.10,service_account=argo-workflow,namespace=yan,gpu_vendor=nvidia
cpu: "1"
tar xf job.tar && mflog 'Task is starting.' && (python example.py --with kubernetes:cpu=1.0,memory=4096.0,disk=10240,image=python:3.10,service_account=argo-workflow,namespace=yan,gpu_vendor=nvidia
cpu: "1"
tar xf job.tar && mflog 'Task is starting.' && (python example.py --with kubernetes:cpu=1.0,memory=4096.0,disk=10240,image=python:3.10,service_account=argo-workflow,namespace=yan,gpu_vendor=nvidia
cpu: "1"
ancient-application-36103
05/24/2022, 10:01 PM

ancient-application-36103
05/24/2022, 10:01 PM

some-ghost-45692
05/24/2022, 10:02 PM