icy-terabyte-20903
10/25/2024, 5:33 AM
A question about a nested foreach and the merging process.
Here's how the flow roughly looks:
from metaflow import FlowSpec, step, catch, retry
from metaflow.exception import MetaflowNotFound


class ArchiveFlow(FlowSpec):  # illustrative name

    @step
    def start(self):
        """This is the first step"""
        # set environment variable for reading config
        self.var_to_keep = "x"
        self.next(self.prepare_parquets)

    @step
    def prepare_parquets(self):
        self.parquet_files = ["parquet_1", "parquet_2"]
        self.next(self.process_parquets, foreach="parquet_files")

    @catch(var="process_jobs_failed")
    @step
    def process_parquets(self):
        # assume we want to extract every row and do something with it,
        # in groups of 10 or something
        self.batches = read(self.input)
        self.next(self.prepare_batches)

    @step
    def prepare_batches(self):
        self.next(self.zip_and_archive, foreach="batches")

    @catch(var="zip_and_archive_failed")
    @step
    def zip_and_archive(self):
        # assume we are zipping up the files together
        self.zip_key = archive(self.input)
        self.next(self.aggregate_zip_results)

    @retry(times=3)
    @step
    def aggregate_zip_results(self, inputs):
        self.merge_artifacts(inputs, include=["var_to_keep"])
        # try to merge failed parquet files from previous run, doesn't always exist
        try:
            self.merge_artifacts(inputs, include=["process_jobs_failed"])
        except MetaflowNotFound:
            pass
        # collect results
        self.batch_success = []
        self.batch_failed = []
        for i in inputs:
            if i.input is None:
                # skip if none
                continue
            elif hasattr(i, "zip_and_archive_failed") and i.zip_and_archive_failed:
                # log the failed process, to retry in the future
                self.batch_failed.append(i.input)
            else:
                self.batch_success.append(i.zip_key)
        self.next(self.aggregate_job_results)

    @step
    def aggregate_job_results(self, inputs):
        """Aggregate the results of the archive job process"""
        from archiver.archiver import ArchiveJob

        # combine
        self.failed_parquet_files = []
        self.failed_batches = []
        self.keys = []
        for i in inputs:
            if hasattr(i, "process_jobs_failed") and i.process_jobs_failed:
                # log the failed process, to retry in the future
                archive_job: ArchiveJob = i.archive_job
                if archive_job.report_file:
                    self.failed_parquet_files.append(archive_job.report_file)
                if archive_job.archiver_batches:
                    self.failed_batches.extend(archive_job.archiver_batches)
            else:
                # combine the list of successful zip and archive jobs
                self.keys.extend(i.keys)
                # combine the list of failed batches
                self.failed_batches.extend(i.failed_batches)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    ArchiveFlow()
What is essentially happening is a nested foreach:
• one to process multiple parquet files in parallel
• then another to process each parquet's rows in variable batch sizes
Here's the problem I'm trying to figure out. The input can vary quite a bit: some runs have 10 parquet files, some have thousands, and similarly for the contents of the parquet files. The parallel processing itself is fine, as the amount of memory/CPU needed for each of the parallel steps is fairly consistent. The trouble comes in the joining part: it keeps running out of memory.
Now… the question is: is there something in the code that should be avoided in order not to bloat the join step, e.g. the aggregate_zip_results step?
• I've read somewhere that referencing the input would load the whole artifact. If I were to run self.merge_artifacts(inputs, include=["var_to_keep"]), would that essentially load every single artifact that was produced in parallel?
• Similarly, there's a variable, process_jobs_failed, that only exists if the previous step throws an exception, and which I try to merge when it exists. Is this the right approach, or is there a better way?
• Finally, I also want to aggregate all the results together, which means I'll need to load the inputs anyway.
What are my options here? Should I just give it a bunch of memory, or is there a better way of writing/designing this?
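(For reference, by "a bunch of memory" I just mean something like bumping @resources on the join step in the flow above; the numbers are placeholders:)

    from metaflow import resources  # alongside the other metaflow imports

    @retry(times=3)
    @resources(memory=32000, cpu=4)  # MB / cores, guessed for the worst-case run
    @step
    def aggregate_zip_results(self, inputs):
        ...  # body unchanged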
Thanks!
dry-beach-38304
10/25/2024, 6:26 AM
• merge_artifacts will look at var_to_keep and check that it is the same on all the incoming branches and, if so, will propagate it along.
• The code looks a bit weird in the sense that you have process_jobs_failed in process_parquets but then use batches without issue: batches is not guaranteed to exist if there was an exception. Further, I would not split process_parquets and prepare_batches (see the sketch below).
• But yes, if in your aggregate_job_results you are loading everything, it will need to have enough memory.
I am not too sure of the final goal of the program, but depending on what your output needs to be, maybe it doesn't need to be stored as an artifact and can instead be written out to S3 (or somewhere else) with a reference returned. Basically, similar to processing and storing large tables (a fairly common pattern here), the data being “returned” is written to a table and not an artifact. Artifacts are great as intermediate state and all, but do show limitations when very large sizes are involved (work on that is in progress so stay tuned).
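A rough sketch of that pattern applied to zip_and_archive, assuming archive() can hand back the zipped bytes and that an S3 datastore/datatools root is configured (the key naming is made up):

from metaflow import S3, current  # added alongside the other metaflow imports

    @catch(var="zip_and_archive_failed")
    @step
    def zip_and_archive(self):
        blob = archive(self.input)  # assume this returns the zipped bytes
        # park the heavy bytes in S3 under this run and keep only the URL as
        # the artifact, so the joins pass around small strings instead of data
        with S3(run=self) as s3:
            self.zip_key = s3.put("zips/%s" % current.task_id, blob)
        self.next(self.aggregate_zip_results)

aggregate_zip_results then only gathers those URLs (as it already does with zip_key), and whoever consumes the flow fetches the data from S3 on demand.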