# dev-metaflow
s
Another question -- it looks like the @resources decorator is being ignored when creating Argo workflow templates, but changing it to @kubernetes fixes the issue. From my understanding @resources should work the same locally and on the cloud, so I wonder if this is a bug? Thanks!
a
Interesting - let me verify and get back to you on this. If this is happening, it's clearly a bug that needs to be fixed.
s
Sounds great. Thank you!
a
I am unable to reproduce this issue. Do you have a reproducible example?
This works as expected:
from metaflow import FlowSpec, step, resources

class BranchFlow(FlowSpec):

    @resources(cpu=2)
    @step
    def start(self):
        print("hi")
        self.next(self.a, self.b)

    @step
    def a(self):
        self.x = 1
        self.next(self.join)

    @step
    def b(self):
        self.x = 2
        self.next(self.join)

    @step
    def join(self, inputs):
        print('a is %s' % inputs.a.x)
        print('b is %s' % inputs.b.x)
        print('total is %d' % sum(input.x for input in inputs))
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    BranchFlow()
as does
python branch_flow.py --with resources:cpu=2 argo-workflows create
s
Your example works for me as well. However this one does not:
from metaflow import FlowSpec, step, resources


class ForeachFlowK8s(FlowSpec):
    @resources(memory=128, cpu=0.1)
    @step
    def start(self):
        self.titles = ["Stranger Things", "House of Cards", "Narcos"]
        self.next(self.a, foreach="titles")

    @resources(memory=128, cpu=0.1)
    @step
    def a(self):
        self.title = "%s processed" % self.input
        self.next(self.join)

    @resources(memory=128, cpu=0.1)
    @step
    def join(self, inputs):
        self.results = [input.title for input in inputs]
        self.next(self.end)

    @resources(memory=128, cpu=0.1)
    @step
    def end(self):
        print("\n".join(self.results))


if __name__ == "__main__":
    ForeachFlowK8s()
$  python example.py argo-workflows create
Metaflow 2.6.1 executing ForeachFlowK8s for user:yan
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
Deploying foreachflowk8s to Argo Workflows...
It seems this is the first time you are deploying foreachflowk8s to Argo Workflows.

A new production token generated.

The namespace of this production flow is
    production:foreachflowk8s-0-rxph
To analyze results of this production flow add this line in your notebooks:
    namespace("production:foreachflowk8s-0-rxph")
If you want to authorize other people to deploy new versions of this flow to Argo Workflows, they need to call
    argo-workflows create --authorize foreachflowk8s-0-rxph
when deploying this flow to Argo Workflows for the first time.
See "Organizing Results" at <https://docs.metaflow.org/> for more information about production tokens.

Workflow foreachflowk8s for flow ForeachFlowK8s pushed to Argo Workflows successfully.

Note that the flow was deployed with a modified name due to Kubernetes naming conventions
on Argo Workflows. The original flow name is stored in the workflow annotation.

What will trigger execution of the workflow:
    No triggers defined. You need to launch this workflow manually.
$  argo template get foreachflowk8s -o yaml | grep cpu
        >/dev/null 2>/dev/null; then python example.py --with kubernetes:cpu=1.0,memory=4096.0,disk=10240,image=python:3.10,service_account=argo-workflow,namespace=yan,gpu_vendor=nvidia
        $METAFLOW_TASK_ID-params; fi && python example.py --with kubernetes:cpu=1.0,memory=4096.0,disk=10240,image=python:3.10,service_account=argo-workflow,namespace=yan,gpu_vendor=nvidia
          cpu: "1"
        tar xf job.tar && mflog 'Task is starting.' && (python example.py --with kubernetes:cpu=1.0,memory=4096.0,disk=10240,image=python:3.10,service_account=argo-workflow,namespace=yan,gpu_vendor=nvidia
          cpu: "1"
        tar xf job.tar && mflog 'Task is starting.' && (python example.py --with kubernetes:cpu=1.0,memory=4096.0,disk=10240,image=python:3.10,service_account=argo-workflow,namespace=yan,gpu_vendor=nvidia
          cpu: "1"
        tar xf job.tar && mflog 'Task is starting.' && (python example.py --with kubernetes:cpu=1.0,memory=4096.0,disk=10240,image=python:3.10,service_account=argo-workflow,namespace=yan,gpu_vendor=nvidia
          cpu: "1"
a
Ah yes - there is a known issue where you can't specify values less than 1 for CPUs. We plan to roll out a fix very soon.
I will follow up when the fix ships.
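In the meantime, rounding the CPU requests up to whole cores should carry the values through to the generated template. Here is a sketch of the same flow with whole-core requests (the class name ForeachFlowK8sWholeCPU is just illustrative):

from metaflow import FlowSpec, step, resources

# Interim version of the flow above: CPU requests rounded up to whole cores
# so they are not replaced by the default when the Argo template is generated.
class ForeachFlowK8sWholeCPU(FlowSpec):

    @resources(memory=128, cpu=1)  # was cpu=0.1
    @step
    def start(self):
        self.titles = ["Stranger Things", "House of Cards", "Narcos"]
        self.next(self.a, foreach="titles")

    @resources(memory=128, cpu=1)
    @step
    def a(self):
        self.title = "%s processed" % self.input
        self.next(self.join)

    @resources(memory=128, cpu=1)
    @step
    def join(self, inputs):
        self.results = [input.title for input in inputs]
        self.next(self.end)

    @resources(memory=128, cpu=1)
    @step
    def end(self):
        print("\n".join(self.results))


if __name__ == "__main__":
    ForeachFlowK8sWholeCPU()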
s
Oh I see, makes sense! Thank you! I'll request more CPU for now.