# ask-metaflow
icy-terabyte-20903
Hey everyone, I have some questions regarding `foreach` and the merging process. Here's how the flow roughly looks:
    @step
    def start(self):
        """This is the first step"""
        # set environment variable for reading config
        self.var_to_keep = "x"

        self.next(self.prepare_parquets)

    @step
    def prepare_parquets(self):
        self.parquet_files = ["parquet_1", "parquet_2"]

        self.next(self.process_parquets, foreach="parquet_files")

    @catch(var="process_jobs_failed")
    @step
    def process_parquets(self):
        self.batches = read(self.input) # assume we want to extract every row and do something with it, in groups of 10 or something
        self.next(self.prepare_batches)

    @step
    def prepare_batches(self):
        self.next(self.zip_and_archive, foreach="batches")

    @catch(var="zip_and_archive_failed")
    @step
    def zip_and_archive(self):
        self.zip_key = archive(self.input) # assume we are zipping up the files together
        self.next(self.aggregate_zip_results)

    @retry(times=3)
    @step
    def aggregate_zip_results(self, inputs):
        self.merge_artifacts(
            inputs, include=["var_to_keep"]
        )

        # try to merge failed parquet files from previous run, doesn't always exist
        try:
            self.merge_artifacts(inputs, include=["process_jobs_failed"])
        except MetaflowNotFound:
            pass
        
        # collect results.
        self.batch_success = []
        self.batch_failed = []

        for i in inputs:
            if i.input is None:
                # skip if none.
                continue
            elif hasattr(i, "zip_and_archive_failed") and i.zip_and_archive_failed:
                # log the failed process, to retry in the future
                self.batch_failed.append(i.input)
            else:
                self.batch_success.append(i.zip_key)

        self.next(self.aggregate_job_results)

    @step
    def aggregate_job_results(self, inputs):
        """Aggregate the results of the archive job process"""
        from archiver.archiver import ArchiveJob

        # combine 
        self.failed_parquet_files = []
        self.failed_batches = []
        self.keys = []

        for i in inputs:
            if hasattr(i, "process_jobs_failed") and i.process_jobs_failed:
                # log the failed process, to retry in the future
                archive_job: ArchiveJob = i.archive_job
                if archive_job.report_file:
                    self.failed_parquet_files.append(archive_job.report_file)
                if archive_job.archiver_batches:
                    self.failed_batches.extend(archive_job.archiver_batches)
            else:
                # combine the list of successful zip and archive jobs
                self.keys.extend(i.keys)
                # combine the list of failed batches
                self.failed_batches.extend(i.failed_batches)

        self.next(self.end)
What is essentially happening is a nested foreach:
• one to process multiple parquet files in parallel
• then another to process each row of the parquet in variable batch sizes

Here's the problem I am trying to figure out. The input can vary quite a bit: some runs have 10 parquet files, some have thousands, and similarly for the contents of the files. The parallel processing itself is fine, as the amount of memory/CPU needed for each parallel step is fairly consistent. The trouble comes in the joining part, which keeps running out of memory.

So the question is: is there something in the code that should be avoided so the join steps (e.g. `aggregate_zip_results`) don't bloat?
• I've read somewhere that referencing `input` loads the whole artifact. If I run `self.merge_artifacts(inputs, include=["var_to_keep"])`, would that essentially load every single artifact produced by the parallel tasks?
• Similarly, there's a variable (`process_jobs_failed`) that only exists if the previous step threw an exception, and I try to merge it when it exists. Is this the right approach, or is there a better way?
• Finally, I also want to aggregate all the results together, which means I'll need to load the inputs anyway. What are my options here? Should I just give the join a bunch of memory, or is there a better way of writing/designing this?

Thanks!
dry-beach-38304
Hey @icy-terabyte-20903, a few clarifications:
• No, `merge_artifacts` does not load any artifact; it simply compares hashes. The call you wrote doesn't do much though, since it will only look at one artifact, `var_to_keep`, check that it is the same across all the incoming branches, and if so propagate it along.
• The code looks a bit weird in the sense that you have `process_jobs_failed` in `process_parquets` but then use `batches` without issue, even though `batches` is not guaranteed to exist if there was an exception. Further, I would not split `process_parquets` and `prepare_batches`.
• But yes, if in your `aggregate_job_results` you are loading everything, it will need enough memory. I am not too sure of the final goal of the program, but depending on what your output needs to be, maybe it doesn't need to be stored as an artifact and can instead be written out to S3 (or somewhere else), with a reference returned. Basically the same pattern as processing and storing large tables (a fairly common one here): the data being "returned" is written to a table, not an artifact. Artifacts are great as intermediate state but do show limitations when very large sizes are involved (work on that is in progress, so stay tuned).
icy-terabyte-20903
Thanks for the insights @dry-beach-38304, and sorry, the code was kind of hacked together so I might have missed a few things 😅. Regardless, we got some good pointers from your responses and will figure out what we can do on our end. Using S3 did cross our mind, but we were concerned about race conditions, since there will be a lot of tasks running at the same time. Unless that's not a problem at all? 🤔
dry-beach-38304
You can use separate paths to avoid conflicts; that should be race free. A simple way would be to generate UUIDs and pass them down.
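Something along these lines, as a sketch (the bucket and prefix below are placeholders, not anything from your setup):

    import uuid
    from metaflow import S3, current

    def unique_key(prefix):
        # run id and task id are already unique per foreach task; the uuid is extra insurance
        return f"{prefix}/{current.run_id}/{current.task_id}/{uuid.uuid4()}"

    # e.g. inside zip_and_archive:
    #     with S3(s3root="s3://your-bucket/archive-flow") as s3:  # placeholder bucket
    #         self.zip_key = s3.put(unique_key("zips"), payload)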
icy-terabyte-20903
yep, makes sense. thanks!