# dev-metaflow
c
Thoughts on a Flow step function decorator that injects a variable as a function parameter?
Copy code
class HelloFlow(FlowSpec):
    @datasets.dataset("my_dataset")
    @step
    def start(self, my_dataset: Dataset):
       pass
This would change how Metaflow parses a Flow join step: https://github.com/zillow/metaflow/blob/tz/datasets/metaflow/graph.py#L114-L116
Copy code
elif len(self.out_funcs) == 1:
    # TYPE: linear
    if self.num_args > 1:
        self.type = 'join'
    else:
        self.type = 'linear'
The conundrum for me is that the definition of a join step is a Flow function with > 1 argument...
✅ 1
s
right.. that's the main conundrum
you don't want to pass my_dataset as an artifact, self.my_dataset? It would work
c
we could, but because of scoping that may be confusing
s
you don't want the artifact to be visible outside this step?
c
one (an Applied Scientist user, or any engineer) would likely think that a function decorator introduces a variable at function scope
while a class decorator could introduce self.my_dataset
I could add a new function to StepDecorator, say, def parameters_introduced(self) -> int, and update the graph parser's self.num_args = len(func_ast.args.args) to take a step decorator's parameters_introduced() into account.
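A rough sketch of that hook (parameters_introduced is hypothetical, and the graph.py change below is only an approximation of where it would plug in):
Copy code
# Sketch only: parameters_introduced is not an existing Metaflow API.
from metaflow.decorators import StepDecorator


class DatasetDecorator(StepDecorator):
    name = "dataset"

    def parameters_introduced(self) -> int:
        # This decorator injects one extra function parameter (the Dataset),
        # so the graph parser should not count it when deciding whether the
        # step is a join.
        return 1


# Approximate idea for graph.py (pseudocode -- however the parser gets at the
# step's decorators):
#
#     injected = sum(deco.parameters_introduced() for deco in step_decorators)
#     self.num_args = len(func_ast.args.args) - injected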
Thoughts?
This isn't as versatile as a more general definition of a join step... It'd be nice to use a Python decorator that's not part of Metaflow, and let it introduce its own function parameter.
s
this small hack works
you just need to add * in front of the extra arg
not optimal but works today 🙂
c
extra is a tuple 🙂
s
that's true
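A minimal sketch of the workaround under discussion (datasets.dataset is the hypothetical decorator from earlier in the thread):
Copy code
from metaflow import FlowSpec, step

# `datasets.dataset` is the hypothetical decorator from the snippet above,
# not something importable from Metaflow itself.


class HelloFlow(FlowSpec):
    @datasets.dataset("my_dataset")
    @step
    def start(self, *extra):
        # A bare *extra keeps len(func_ast.args.args) == 1, so the graph
        # parser still sees a linear step rather than a join, and the
        # decorator can pass the Dataset in positionally -- but it shows
        # up wrapped in a tuple: extra == (Dataset(...),)
        my_dataset = extra[0]
        self.next(self.end)

    @step
    def end(self):
        pass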
c
I wish a join step had to be marked, or had to have a variable named inputs (making it a keyword)
Copy code
@step
def join_step(self, inputs):
   pass
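If inputs were a required keyword, the check in graph.py could key off the parameter name instead of the argument count; a rough sketch (not how Metaflow works today):
Copy code
# Sketch only -- not Metaflow's behavior today. If `inputs` were a required
# keyword for join steps, graph.py could look at parameter names:
arg_names = [a.arg for a in func_ast.args.args]   # e.g. ['self', 'inputs']
if len(self.out_funcs) == 1:
    self.type = 'join' if 'inputs' in arg_names[1:] else 'linear'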
Do all join steps use inputs as the variable name? I bet you can't guarantee that...
s
right. Given that we don't want to break existing flows, we'd just need to find a way to make the change in a backwards-compatible manner. I am optimistic that it is doable but requires some thinking
yeah, we can't assume inputs
another option is to override self - you could inject your object in its place. Not optimal either but it should work today
c
it's what we currently do 🙂
Copy code
class VanillaDatasetFlow(FlowSpec):
    @datasets.dataset(name="vanilla_dataset", partition_by="group", mode=Mode.Write)
    @step
    def start(self):
        df = pd.DataFrame({"group": ["A", "A", "A", "B", "B", "B"], "value": [1, 2, 3, 4, 5, 6]})
        print(f"saving: {df=}")

        # Example of writing to a vanilla dataset 
        self.vanilla_dataset.write(df)

        self.next(self.end)

    @step
    def end(self):
        print(f"I have dataset \n{self.vanilla_dataset=}")

        # vanilla_dataset read_pandas()
        print("self.vanilla_dataset.read_pandas:\n", self.vanilla_dataset.read_pandas())

        # vanilla_dataset read_spark()
        with SparkManager(auth=False):
            spark_df: DataFrame = self.vanilla_dataset.read_spark()
            spark_df.show()

        # vanilla_dataset read_dask()
        dask_df: DataFrame = self.vanilla_dataset.read_dask()
        dask_df = dask_df[dask_df.value < 4]
        print("self.vanilla_dataset.read_dask: [value < 4]\n", dask_df.compute())
Copy code
$ tree .metaflow/datastore                                              
.metaflow/datastore
└── VanillaDatasetFlow
    └── vanilla_dataset
        ├── group=A
        │   └── run_id=1632960232898019
        │       └── run_time=20210929-170354
        │           └── part.0.parquet
        └── group=B
            └── run_id=1632960232898019
                └── run_time=20210929-170354
                    └── part.0.parquet
s
cool, that should work
what I meant is an approach like this
you can inject whatever properties into this FakeSelf without them becoming artifacts, which addresses the scoping issue
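A minimal sketch of that FakeSelf idea (all names here are illustrative, not an existing Metaflow API): the decorator hands the step function a thin proxy in place of self, and anything attached only to the proxy never gets persisted as an artifact.
Copy code
class FakeSelf:
    """Thin proxy around the real flow `self` (illustrative sketch)."""

    def __init__(self, real_self, **extras):
        object.__setattr__(self, "_real_self", real_self)
        for name, value in extras.items():
            # Lives only on the proxy, so it never becomes an artifact.
            object.__setattr__(self, name, value)

    def __getattr__(self, name):
        # Anything not found on the proxy falls through to the real flow.
        return getattr(object.__getattribute__(self, "_real_self"), name)

    def __setattr__(self, name, value):
        # Normal writes still go to the flow and become artifacts as usual.
        setattr(object.__getattribute__(self, "_real_self"), name, value)


def inject_dataset(name, dataset):
    # Hypothetical step wrapper: call the user's step body with the proxy
    # instead of the real self.
    def wrapper(step_func):
        def wrapped(flow_self):
            return step_func(FakeSelf(flow_self, **{name: dataset}))
        return wrapped
    return wrapper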