# ask-metaflow
b
Hey all! I'm working with a fairly gnarly codebase that I think might benefit from some Metaflow magic. I'm curious if anyone has any tips for serializing variables as flow artifacts inside deeply nested function calls, without massively rewriting code or passing `self` down through a ton of calls. For example:
from metaflow import FlowSpec, step


def f():
    # ideally I'd like to serialise `a` here as a Flow artifact
    a = 5


class Flow(FlowSpec):
    @step
    def start(self):
        f()

        self.next(self.end)

    @step
    def end(self):
        pass
I'm aware of `metaflow.S3`, but I don't think that helps much, since I'd still have to store the S3 key somewhere to be able to easily access the artifact using the Client API 🤔
h
How about this approach, which saves `self` using thread-local storage?
flow.py:
from metaflow import FlowSpec, step
from utils import self_context_manager, serialize_as_artifact


def f():
    # Inside a deeply nested function, serialize variable `a` as an artifact
    a = 42
    print(f"Serializing variable `a` with value: {a}")
    serialize_as_artifact("a", a)  # Dynamically store `a` as an artifact


class MyFlow(FlowSpec):
    @step
    def start(self):
        with self_context_manager(self):
            f()
        self.next(self.end)

    @step
    def end(self):
        # Access the serialized artifact
        print(f"Value of `a`: {self.a}")


if __name__ == "__main__":
    MyFlow()
utils.py:
import threading
from contextlib import contextmanager

# Thread-local storage for step context
_step_context = threading.local()


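@contextmanager
def self_context_manager(self):
    """Context manager to set the self context and restore the previous one on exit."""
    previous = getattr(_step_context, "self", None)
    _step_context.self = self
    try:
        yield
    finally:
        # Restore rather than clear, so nested uses keep the outer context
        _step_context.self = previous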


def get_self_context():
    """Retrieve the current self context."""
    return getattr(_step_context, "self", None)


def serialize_as_artifact(name, value):
    """Serialize a variable as an artifact."""
    self = get_self_context()
    if self is not None:
        setattr(self, name, value)
    else:
        raise RuntimeError("No current self. Is this running inside a step?")
Running the flow, `end` is able to access `a`:
2025-01-23 08:28:34.893 [59/start/261397941 (pid 84467)] Task is starting.
2025-01-23 08:28:43.613 [59/start/261397941 (pid 84467)] Serializing variable `a` with value: 42
2025-01-23 08:28:47.736 [59/start/261397941 (pid 84467)] Task finished successfully.
2025-01-23 08:28:50.161 [59/end/261397950 (pid 84488)] Task is starting.
2025-01-23 08:28:59.114 [59/end/261397950 (pid 84488)] Value of `a`: 42
2025-01-23 08:29:03.350 [59/end/261397950 (pid 84488)] Task finished successfully.
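Since `a` ends up as a regular artifact, it should also be readable afterwards through the Client API without tracking any S3 key. A minimal sketch, assuming the flow is named `MyFlow` as above and has at least one successful run; `load_artifact` is a hypothetical helper name, not part of Metaflow:

```python
def load_artifact(flow_name, artifact_name):
    """Return an artifact from the latest successful run of a flow."""
    # Imported lazily so this helper can be defined without Metaflow installed.
    from metaflow import Flow

    run = Flow(flow_name).latest_successful_run
    if run is None:
        raise LookupError(f"No successful run found for flow {flow_name!r}")
    # `run.data` exposes the artifacts as of the run's `end` task.
    return getattr(run.data, artifact_name)
```

Calling `load_artifact("MyFlow", "a")` from a notebook or script would then return the value that `f()` stored during `start`, as long as nothing later in the flow overwrote it.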