lively-lunch-9285
06/10/2025, 11:45 PM
python <flow>.py ...
The CLI has a lot going on. There are a lot of arguments and options you can pass in.
We could totally make a VS Code extension that turns some of it into a push-button affair:
• killing kubernetes pods
• resuming (and from which step? 🤔 )
• running
• choosing your environment
• launching a debug session that stops if you set a breakpoint
I'm just saying, there are a TON of subcommands. And I've been guilty of asking repeat questions about these, as well as fielding tons of questions about it all.
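For what it's worth, a minimal sketch of what a couple of those buttons could shell out to, using only command forms quoted elsewhere in this channel; the flow path, options, and step name are placeholders, not a proposed extension API:

import subprocess

# Hypothetical button handlers; 'flow.py' and the options echo commands
# already quoted in this channel.
def run_flow(flow='flow.py'):
    subprocess.run(['python', flow, '--environment=pypi', '--with', 'retry', 'run'], check=True)

def resume_flow(flow='flow.py', step=None):
    # `resume` optionally takes the step to resume from as a positional argument.
    cmd = ['python', flow, 'resume'] + ([step] if step else [])
    subprocess.run(cmd, check=True)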
chilly-france-99853
06/10/2025, 6:47 AM
shy-midnight-40599
06/06/2025, 2:44 PM
fast-vr-44972
06/06/2025, 12:42 PM
gentle-author-38571
06/06/2025, 6:40 AM
quick-carpet-67110
06/05/2025, 11:37 AM
python flow.py --environment=pypi --with retry airflow create flow_dag.py
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/pluggy/_callers.py", line 139, in _multicall
raise exception.with_traceback(exception.__traceback__)
File "/opt/python3.11/lib/python3.11/site-packages/pluggy/_callers.py", line 103, in _multicall
res = hook_impl.function(*args)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/etc/airflow/config/airflow_local_settings.py", line 62, in pod_mutation_hook
and any(env_var.name == "AIRFLOW_IS_K8S_EXECUTOR_POD" for env_var in container.env)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/etc/airflow/config/airflow_local_settings.py", line 62, in <genexpr>
and any(env_var.name == "AIRFLOW_IS_K8S_EXECUTOR_POD" for env_var in container.env)
^^^^^^^^^^^^
AttributeError: 'dict' object has no attribute 'name'
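The genexpr at line 62 of airflow_local_settings.py assumes every entry in container.env is a V1EnvVar with a .name attribute, but here the entries arrive as plain dicts. A hedged sketch of a more tolerant version of that site-local hook (the actual mutation body isn't shown in the traceback, so it is elided):

def _env_name(env_var):
    # Works for both a V1EnvVar object (.name) and a plain dict ({"name": ...}).
    return env_var.name if hasattr(env_var, 'name') else env_var.get('name')

def pod_mutation_hook(pod):
    for container in (pod.spec.containers or []):
        if container.env and any(
            _env_name(e) == 'AIRFLOW_IS_K8S_EXECUTOR_POD' for e in container.env
        ):
            ...  # the site-specific mutation from airflow_local_settings.py would go here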
bland-helicopter-46902
05/31/2025, 1:06 PM
colossal-scooter-60260
05/31/2025, 2:27 AM
bulky-portugal-95315
05/29/2025, 7:00 PM
lively-lunch-9285
05/29/2025, 3:02 PM
stale-grass-96920
05/29/2025, 12:27 AM
botocore.exceptions.NoCredentialsError: Unable to locate credentials
• I'm running this to execute: python 02-statistics/stats.py argo-workflows create --max-workers 2
• I have my ~/.metaflowconfig/config.json set with METAFLOW_DATASTORE_SYSROOT_S3, METAFLOW_DATATOOLS_S3ROOT, METAFLOW_DEFAULT_DATASTORE (is there a
• IMO Argo Workflow is configured correctly because it can read/write to the Minio Bucket.
I'm looking through the Metaflow service code and see the plugins/aws code uses boto, but I don't see any obvious way to pass Minio's aws_access_key_id and aws_secret_access_key.
Any guidance is appreciated... just point me in the right direction. I feel like I'm missing something obvious.
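A hedged sketch of one route, assuming the usual boto behavior: boto reads the standard AWS credential environment variables, and METAFLOW_S3_ENDPOINT_URL (if your Metaflow version supports it) points the S3 client at Minio. All values below are placeholders:

import os

os.environ['AWS_ACCESS_KEY_ID'] = 'minio-access-key'                 # placeholder
os.environ['AWS_SECRET_ACCESS_KEY'] = 'minio-secret-key'             # placeholder
os.environ['METAFLOW_S3_ENDPOINT_URL'] = 'http://minio.local:9000'   # placeholder endpoint

# These can equally be exported in the shell or put in ~/.metaflowconfig/config.json
# before running: python 02-statistics/stats.py argo-workflows create --max-workers 2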
hundreds-wire-22547
05/27/2025, 8:20 PM
ArgoEvent? Looking at the underlying code, I didn't see an env var or something similar to do this, but maybe I missed it.
millions-church-9220
05/27/2025, 9:57 AM
dry-angle-21635
05/27/2025, 8:20 AM
@step
def evaluate_regression(self) -> None:
    if self.infra:
        self.next(self.evaluate_on_kubernetes)
    else:
        self.next(self.evaluate_locally)

@kubernetes(
    gpu=1,
    cpu=4,
    memory=16_000,  # 16Gb
    namespace="X",
)
@step
def evaluate_on_kubernetes(self) -> None:
    ...
    self.next(self.join_target_results)

@step
def evaluate_locally(self) -> None:
    ...
    self.next(self.join_target_results)
millions-church-9220
05/27/2025, 7:08 AM
thousands-rocket-55304
05/26/2025, 11:27 PM
nutritious-coat-36638
05/26/2025, 3:59 PM
from metaflow import FlowSpec, step, resources, kubernetes
from carbonaware_metaflow import green_kubernetes
import urllib.request

class GreenCrossCloudFlow(FlowSpec):

    @step
    @kubernetes
    def start(self):
        req = urllib.request.Request('https://raw.githubusercontent.com/dominictarr/random-name/master/first-names.txt')
        with urllib.request.urlopen(req) as response:
            data = response.read()
        self.titles = data[:10]
        self.next(self.process, foreach='titles')

    @resources(cpu=1, memory=512)
    @green_kubernetes(node_selectors=[
        ("outerbounds.co/provider=azure", "outerbounds.co/region=us-central1"),
        ("outerbounds.co/provider=aws", "outerbounds.co/region=us-west-2"),
    ])
    @step
    def process(self):
        self.title = '%s processed' % self.input
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [input.title for input in inputs]
        self.next(self.end)

    @step
    def end(self):
        print('\n'.join(self.results))

if __name__ == '__main__':
    GreenCrossCloudFlow()
Thanks in advance for the help! Looking forward to working with this community :)
purple-air-87768
05/22/2025, 11:57 PM
@validate(input_schema={var: schema}, output_schema={var: schema})
@step
def step_func():
    ...
The decorator can hook the step function and perform validation at the start and end of the step (after input vars are unpickled and before output vars are pickled).
I wonder if this is something interesting to Metaflow, or if there is a more native way of doing validation as soon as serialization happens.
thanks!
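A rough sketch of what that decorator could look like, assuming an ordinary functools.wraps decorator placed under @step works in your Metaflow version; check_schema and the type-based "schema" are placeholders for whatever validation library would actually be used:

import functools

def check_schema(value, schema):
    # Placeholder validation: treat the schema as an expected type.
    if not isinstance(value, schema):
        raise TypeError(f"expected {schema.__name__}, got {type(value).__name__}")

def validate(input_schema=None, output_schema=None):
    def decorator(step_func):
        @functools.wraps(step_func)  # keep the signature Metaflow inspects
        def wrapper(self, *args, **kwargs):
            for var, schema in (input_schema or {}).items():
                check_schema(getattr(self, var), schema)   # inputs, after unpickling
            result = step_func(self, *args, **kwargs)
            for var, schema in (output_schema or {}).items():
                check_schema(getattr(self, var), schema)   # outputs, before pickling
            return result
        return wrapper
    return decorator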
enough-article-90757
05/22/2025, 4:18 PM
@kubernetes(
    image='a_image_name'
)
@step
def a(self):
    from some_package_for_a import a_logic
    self.output = a_logic()
    self.next(self.b)

@kubernetes
@step
def b(self, inputs):
    print([input for input in inputs])
I'd expect that to finish without errors, but instead we get:
2025-05-22 16:02:22.552 [11/b/60 (pid 179819)] ModuleNotFoundError: No module named 'some_package_for_a'
Why is this the case?
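One hedged reading (not confirmed in the thread): if a_logic() returns an object whose class is defined in some_package_for_a, that object gets pickled as self.output, and unpickling it in step b's default image requires the package to be importable there. A sketch of step a under that assumption, storing only built-in types:

@kubernetes(
    image='a_image_name'
)
@step
def a(self):
    from some_package_for_a import a_logic
    # Store a plain built-in value (here: a string) so unpickling self.output in
    # step b does not require some_package_for_a to be installed in b's image.
    self.output = str(a_logic())
    self.next(self.b)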
billowy-agency-44736
05/22/2025, 7:48 AM
important-bear-42262
05/22/2025, 7:23 AM
import io
import joblib
from metaflow import Flow, S3

def load_model(model_name):
    with S3(run=Flow('MyFlow').latest_successful_run) as s3:
        response = s3.get(model_name)
        model_bytes = io.BytesIO(response.blob)
        model = joblib.load(model_bytes)
    return model
It is trying to store a file named metaflow.s3.ee54mdiw when I call it. Is it possible to change the path where it stores it?
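If the question is about those metaflow.s3.* scratch files: as far as I know, the S3 client can take a tmproot argument that controls where its temporary download directory is created. A hedged variant of the function above under that assumption (/data/tmp is just an example path):

import io
import joblib
from metaflow import Flow, S3

def load_model(model_name, scratch_dir='/data/tmp'):
    # tmproot (if supported by your Metaflow version) relocates the temporary
    # metaflow.s3.* directory away from the current working directory.
    with S3(run=Flow('MyFlow').latest_successful_run, tmproot=scratch_dir) as s3:
        response = s3.get(model_name)
        model_bytes = io.BytesIO(response.blob)
        model = joblib.load(model_bytes)
    return model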
gifted-shampoo-74550
05/20/2025, 8:19 PM
python myflow.py --help (and all the subcommands recursively under that).
I was actually curious for the sake of using the Runner API, but the docs just say things like "Additional arguments that you would pass to python myflow.py after the run command" 🙂; further, in some cases these appear to be positional args for the CLI but must be kwargs for the Runner methods. E.g., specifically I was looking for runner.resume(step_to_rerun="step_name"), which, to its credit, does match the CLI --help docs, but feels a bit surprising.
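For reference, a minimal sketch of the Runner calls being discussed, assuming the metaflow.Runner API from recent releases; the flow file, option values, and step name are placeholders:

from metaflow import Runner

# Keyword arguments mirror what you would pass on the CLI after `run` / `resume`.
running = Runner('myflow.py', environment='pypi').run(max_workers=2)
print(running.status, running.run)

# The CLI's positional step argument becomes a kwarg here, matching the --help docs.
resumed = Runner('myflow.py').resume(step_to_rerun='step_name')
print(resumed.status)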
important-london-94970
05/20/2025, 6:41 PM
full-kilobyte-32033
05/20/2025, 5:49 PM
full-kilobyte-32033
05/20/2025, 5:48 PM
Metadata request (/flows/FlowyFlow) failed (code 500): "{\"err_msg\": {\"pgerror\": \"ERROR: relation \\\"flows_v3\\\" does not exist\\nLINE 4: FROM flows_v3\\n ^\\n\", \"pgcode\": \"42P01\", \"diag\": {\"message_primary\": \"relation \\\"flows_v3\\\" does not exist\", \"severity\": \"ERROR\"}}}"
adamant-psychiatrist-48924
05/20/2025, 8:15 AM
white-helicopter-28706
05/20/2025, 2:28 AM
@pypi in the table of contents of Metaflow docs
clever-arm-75024
05/19/2025, 10:35 PM
rich-agent-87730
05/19/2025, 5:01 PM
ValueError: Metaflow service [http://metaflow/metadata/service.com] unreachable.
Do you guys know why this is happening? I'm guessing it's because the service is closing the connection for some reason? Are there any fixes that we can do?
cuddly-rocket-69327
05/17/2025, 12:25 AM