dry-city-99474 (10/17/2024, 12:08 PM):
I set these environment variables:

METAFLOW_DEFAULT_DATASTORE
METAFLOW_DATASTORE_SYSROOT_S3
METAFLOW_DATATOOLS_S3ROOT

and it seems to work okay - I can get python flow.py card server to work and show me my results (I'm actually running this on an EC2 instance via SSH and then using SSH tunneling locally to see the page).

However, now I'm trying to load the flow run artifacts in a notebook and it says this flow doesn't exist (see screenshot). I did double-check that the name in the string is correct. I was wondering how I could go about debugging this. A couple of questions:
• Is there any way to list all the namespaces for the results?
• What's the easiest way to check where the Client API is trying to look for the artifacts?
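For reference, both questions can be checked directly from a notebook with Metaflow's Client API. A minimal sketch, assuming a standard Metaflow install:

from metaflow import Metaflow, get_metadata, get_namespace, namespace

# Where the Client API is looking: the active metadata provider,
# e.g. "local@/path/to/project" or "service@https://...".
print(get_metadata())

# The namespace the Client is currently scoped to
# (defaults to "user:<your username>").
print(get_namespace())

# Switch to the global namespace to see runs from every user/namespace.
namespace(None)

# Flows visible through the configured metadata provider.
print(Metaflow().flows)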
dry-city-99474 (11/20/2024, 3:50 PM):
I tried namespace(None), and it doesn't seem to resolve the problem (it's still only retrieving my own flow runs).
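If switching namespaces doesn't help, a likely suspect is the metadata provider rather than the namespace: runs recorded against one provider (for example a local .metaflow directory on the EC2 instance) are invisible to a notebook that resolves a different one. A hedged sketch of what one might check; the path and flow name below are placeholders:

from metaflow import Flow, get_metadata, metadata, namespace

# Confirm which provider the notebook resolved at import time.
print(get_metadata())

# Point the Client API at the provider the run actually used,
# e.g. the directory on the EC2 instance that contains .metaflow
# (or "service@https://..." for a remote metadata service).
metadata("local@/home/ubuntu/project")
namespace(None)

run = Flow("MyFlow").latest_run  # "MyFlow" is a placeholder
print(run)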
dry-city-99474 (11/21/2024, 11:23 AM):

from typing import Any

from metaflow.datastore.datastore_set import TaskDataStoreSet
from metaflow.datastore.flow_datastore import FlowDataStore
from metaflow.datastore.task_datastore import TaskDataStore
from metaflow.metaflow_environment import MetaflowEnvironment
from metaflow.plugins.datastores.s3_storage import S3Storage

def list_runs(flow_name: str) -> list[str]:
    """Gets all run IDs available on S3.

    Parameters
    ----------
    flow_name : str
        Metaflow FlowSpec class name

    Returns
    -------
    list[str]
        Valid run IDs available for the given FlowSpec
    """
    storage = S3Storage(S3Storage.get_datastore_root_from_config(None))
    flow_content_list = storage.list_content([flow_name])
    # Each entry under <root>/<flow_name>/ is a run ID, except the "data/"
    # subdirectory that holds content-addressed artifact blobs.
    return [
        entry.path.split("/")[1]
        for entry in flow_content_list
        if not entry.path.endswith("data/")
    ]

def load_all_steps(flow_name: str, run_id: str) -> dict[str, TaskDataStore]:
    """Loads all data for every step of the given flow run.

    Parameters
    ----------
    flow_name : str
        Metaflow FlowSpec class name
    run_id : str
        Valid flow run ID

    Returns
    -------
    dict[str, TaskDataStore]
        Dictionary from step name to step artifacts
    """
    environment = MetaflowEnvironment(None)
    flow_datastore = FlowDataStore(
        flow_name,
        environment,
        storage_impl=S3Storage,
        ds_root=S3Storage.get_datastore_root_from_config(None),
    )
    ds_set = TaskDataStoreSet(flow_datastore, run_id)
    # Note: for steps with multiple tasks (e.g. foreach), later tasks
    # overwrite earlier ones here, so only one task per step is kept.
    return {a.step_name: a for a in ds_set}

def load_step(flow_name: str, run_id: str, step_name: str) -> dict[str, Any]:
    """Loads data for a specific step of a specific run.

    Parameters
    ----------
    flow_name : str
        Metaflow FlowSpec class name
    run_id : str
        Flow run ID
    step_name : str
        Name of the step

    Returns
    -------
    dict[str, Any]
        Dictionary containing all artifacts available for the step
    """
    step_datastore = load_all_steps(flow_name, run_id)[step_name]
    return step_datastore.to_dict()

def load_step_end_data(flow_name: str, run_id: str) -> dict[str, Any]:
    """Loads the artifacts available at the end of the flow.

    This is the go-to function for inspecting flow run results.

    Parameters
    ----------
    flow_name : str
        Metaflow FlowSpec class name
    run_id : str
        Flow run ID

    Returns
    -------
    dict[str, Any]
        All artifacts available at the end step, keyed by variable name
    """
    return load_step(flow_name, run_id, "end")
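
A quick usage sketch for these helpers in a notebook; "TrainingFlow" is a placeholder flow name:

runs = list_runs("TrainingFlow")             # all run IDs found under the S3 root
artifacts = load_step_end_data("TrainingFlow", runs[-1])  # pick one run
print(list(artifacts))                       # artifact names from the end step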