cuddly-rocket-69327 (09/29/2021, 11:10 PM)

    class HelloFlow(FlowSpec):
        @datasets.dataset("my_dataset")
        @step
        def start(self, my_dataset: Dataset):
            pass
This would change how Metaflow parses a Flow join step: https://github.com/zillow/metaflow/blob/tz/datasets/metaflow/graph.py#L114-L116
    elif len(self.out_funcs) == 1:
        # TYPE: linear
        if self.num_args > 1:
            self.type = 'join'
        else:
            self.type = 'linear'
The conundrum for me is that the definition of a join step is a Flow function with > 1 argument...
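A minimal sketch of why that rule trips over an injected parameter, replaying the num_args check from graph.py on a standalone AST (an illustration, not the fork's code):

    import ast
    import textwrap

    src = textwrap.dedent("""\
        def start(self, my_dataset):
            pass
    """)
    func_ast = ast.parse(src).body[0]
    num_args = len(func_ast.args.args)  # counts 'self' and 'my_dataset' -> 2
    print(num_args > 1)  # True: this linear step would be typed 'join'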
straight-shampoo-11124 (09/29/2021, 11:42 PM)
what about exposing my_dataset as an artifact, self.my_dataset? It would work

cuddly-rocket-69327 (09/29/2021, 11:44 PM)
self.my_dataset
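A rough sketch of that artifact approach, with simplified stand-ins for the fork's datasets.dataset decorator and Dataset class (both are placeholders here): the decorator attaches the dataset to self instead of adding a parameter, so the step keeps a single argument and still parses as linear.

    from functools import wraps

    class Dataset:  # placeholder for the fork's Dataset
        def __init__(self, name):
            self.name = name

    def dataset(name):  # placeholder for datasets.dataset
        def decorator(step_func):
            @wraps(step_func)
            def wrapper(self):
                # Expose the dataset as self.<name> rather than a parameter.
                # Note: assigning onto self makes it an artifact, which is
                # the scoping issue discussed further down.
                setattr(self, name, Dataset(name))
                return step_func(self)
            return wrapper
        return decorator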
cuddly-rocket-69327 (09/29/2021, 11:47 PM)
We could add a hook to StepDecorator, say def parameters_introduced(self) -> int, and update the graph parser (self.num_args = len(func_ast.args.args)) to take a step decorator's parameters_introduced() into account.
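A sketch of that hook (parameters_introduced is the name proposed above; how the parser would consume it is an assumption):

    from metaflow.decorators import StepDecorator

    class DatasetStepDecorator(StepDecorator):  # hypothetical decorator
        name = "dataset"

        def parameters_introduced(self) -> int:
            return 1  # this decorator injects one extra function parameter

    # In metaflow/graph.py the parser would then, conceptually, compute:
    #   self.num_args = len(func_ast.args.args) - sum(
    #       deco.parameters_introduced() for deco in step_decorators
    #   )
    # so injected parameters no longer flip a linear step to 'join'.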
cuddly-rocket-69327 (09/29/2021, 11:48 PM)
...join step... It'd be nice to use a Python decorator that's not part of Metaflow, and let it introduce its own function parameter.
straight-shampoo-11124 (09/29/2021, 11:50 PM)
what about requiring a * in front of the extra arg?

cuddly-rocket-69327 (09/29/2021, 11:54 PM)
extra is a tuple
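The tuple objection, spelled out (a plain illustration, not Metaflow code): with a star-arg, whatever the decorator injects arrives wrapped in a tuple, so the step body can't use it as a Dataset directly.

    def start(self, *extra):
        return extra

    print(start(None, "injected_dataset"))  # ('injected_dataset',) -- a tuple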
cuddly-rocket-69327 (09/29/2021, 11:56 PM)
Or reserve inputs (making it a keyword):

    @step
    def join_step(self, inputs):
        pass

Do all join steps use inputs as the variable name? I bet you can't guarantee that...
straight-shampoo-11124 (09/29/2021, 11:57 PM)
inputs

straight-shampoo-11124 (09/30/2021, 12:00 AM)
self - you could inject your object in its place. Not optimal either, but it should work today.
cuddly-rocket-69327 (09/30/2021, 12:02 AM)

    class VanillaDatasetFlow(FlowSpec):
        @datasets.dataset(name="vanilla_dataset", partition_by="group", mode=Mode.Write)
        @step
        def start(self):
            df = pd.DataFrame({"group": ["A", "A", "A", "B", "B", "B"],
                               "value": [1, 2, 3, 4, 5, 6]})
            print(f"saving: {df=}")
            # Example of writing to a vanilla dataset
            self.vanilla_dataset.write(df)
            self.next(self.end)

        @step
        def end(self):
            print(f"I have dataset \n{self.vanilla_dataset=}")
            # vanilla_dataset read_pandas()
            print("self.vanilla_dataset.read_pandas:\n", self.vanilla_dataset.read_pandas())
            # vanilla_dataset read_spark()
            with SparkManager(auth=False):
                spark_df: DataFrame = self.vanilla_dataset.read_spark()
                spark_df.show()
            # vanilla_dataset read_dask()
            dask_df: DataFrame = self.vanilla_dataset.read_dask()
            dask_df = dask_df[dask_df.value < 4]
            print("self.vanilla_dataset.read_dask: [value < 4]\n", dask_df.compute())
cuddly-rocket-69327 (09/30/2021, 12:04 AM)

    $ tree .metaflow/datastore
    .metaflow/datastore
    └── VanillaDatasetFlow
        └── vanilla_dataset
            ├── group=A
            │   └── run_id=1632960232898019
            │       └── run_time=20210929-170354
            │           └── part.0.parquet
            └── group=B
                └── run_id=1632960232898019
                    └── run_time=20210929-170354
                        └── part.0.parquet
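The group=/run_id=/run_time= directories are Hive-style partitions, so the partition values are recoverable as columns by any partition-aware parquet reader; a generic illustration (not the fork's read_* implementation), assuming pyarrow is installed:

    import pandas as pd

    path = ".metaflow/datastore/VanillaDatasetFlow/vanilla_dataset"
    df = pd.read_parquet(path)  # partition directories become columns
    print(df[["group", "value"]].head())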
straight-shampoo-11124 (09/30/2021, 12:11 AM)
...you could pass values through a FakeSelf without them becoming artifacts, which addresses the scoping issue.
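A minimal sketch of that FakeSelf idea (every name and detail here is an assumption, not Metaflow or fork API): a thin proxy whose injected values live only on the proxy, so they are never persisted as artifacts, while everything else forwards to the real step object.

    class FakeSelf:
        def __init__(self, real_self, **injected):
            object.__setattr__(self, "_real_self", real_self)
            object.__setattr__(self, "_injected", injected)

        def __getattr__(self, name):
            # Injected values (e.g. my_dataset) resolve here and are
            # never written to the real self, so no artifact is created.
            if name in self._injected:
                return self._injected[name]
            return getattr(self._real_self, name)

        def __setattr__(self, name, value):
            # Ordinary assignments still land on the real self, so
            # normal artifact behavior is preserved.
            setattr(self._real_self, name, value)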