# dev-metaflow
f
Hello guys, a question regarding large datasets as artifacts. I'm building a demo with Metaflow, and a few steps convert data and later split it into train/test sets. Right now I'm thinking of using metaflow.S3 to store the output of a parallel conversion step and then read it in another step. That said, the docs note that the datastore should be enough for most cases, and I think mine falls under those "most cases", so I wonder how the Metaflow devs approach this. Increasing resources isn't feasible, since the dataset can be extremely large, so the join step will OOM. Is there a way to pass references within the join so it doesn't load the data? Can I then read the data iteratively?
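Roughly what I have in mind with metaflow.S3 (untested sketch; do_conversion and serialize are placeholders for the real conversion/packing code):

    @step
    def convert(self):
        from metaflow import S3
        # self.input is one chunk from the foreach
        converted = do_conversion(self.input)  # placeholder
        with S3(run=self) as s3:
            # put() uploads the blob and returns its S3 URL;
            # only this small URL string becomes an artifact
            self.converted_url = s3.put(f'converted/{self.index}', serialize(converted))
        self.next(self.join)

    @step
    def join(self, inputs):
        # only URL strings cross the join, not the image bytes
        self.converted_urls = [inp.converted_url for inp in inputs]
        self.next(self.split)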
a
How large is the large dataset?
f
large enough that I don't want to just allocate an instance with sufficient RAM. Let's say a few dozen GB
👍 1
a
If you are using merge_artifacts in the join step to pass the variables down, it should work without any issues.
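i.e. at a minimum something like this (sketch):

    @step
    def join(self, inputs):
        # carries forward artifacts that are consistent across the incoming branches
        self.merge_artifacts(inputs)
        self.next(self.split)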
f
it didn't 🙂 not that long ago we had a discussion here - let's take CIFAR10, upscale it to 299x299 and try to pass it between steps. Sequential steps do work; the join, though, blows up. I tried 24GB of RAM if I recall correctly. Anyway, I'm rather looking for a way to not load everything into memory at once - e.g. a generator with yield that pulls data only when it is actually yielded.
I hope it makes sense 😄
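something in this spirit (rough sketch; chunk_urls would be whatever references the join passes along, deserialize is a placeholder):

    def iter_chunks(chunk_urls):
        from metaflow import S3
        with S3() as s3:
            for url in chunk_urls:
                obj = s3.get(url)            # downloads only this one chunk
                yield deserialize(obj.blob)  # placeholder for unpacking the chunk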
my join looks like this now
    @retry(times=2)
    @resources(cpu=2, memory=16000)
    @step
    def join(self, inputs):
        import numpy as np
        self.Xnew = np.concatenate([result.converted for result in inputs])
        print(f'Assigned new X {len(self.Xnew)}')
        self.merge_artifacts(inputs, include=['y', 'labels'])
        self.next(self.split)
I limited the dataset to 22% of the original for a quick demo, to show that Metaflow is feasible to work with. Now I'm doing something more advanced, with more data.
a
is result.converted a big object in the preceding step?
How do you want to access the several large objects that are generated in the upstream foreach? Is it within this foreach join or somewhere below?
f
define big 🙂 it's a list of 1000 upscaled 299x299 images
    @resources(cpu=2)
    @step
    def convert(self):
        from PIL import Image
        from hxml.constants import DEFAULT_IMAGE_SIZE
        import io
        import numpy as np

        def resize_image(image):
            import tensorflow as tf
            return tf.image.resize(image, DEFAULT_IMAGE_SIZE)

        def convert_to_PIL(image):
            img = resize_image(image)
            # print(f'Generating {img.shape} based on {i}: {d.shape}')
            pimg = Image.fromarray(np.uint8(img))
            imgByteArr = io.BytesIO()
            pimg.save(imgByteArr, format='PNG')
            return imgByteArr.getvalue()

        # print(f'Converting {len(self.input)} input')
        self.converted = [convert_to_PIL(image) for image in self.input]
        self.next(self.join)
👍 1
they are coming from a foreach. I want to join them and then, in the next step, split. Though perhaps I don't need to bring the actual data into the split - it could be just indices that I use later down the stream.
a
If you don't want to use actual data anywhere downstream, you can even not refer to them in the join step
f
or I don't split them all at once - I can just take chunks, shuffle them and split the chunks. Still, there should be a point where I join them into something, so I can fit the model
and for the join I would think of a generator that pulls data from the artifact chunk by chunk
anyway, it seems like a very common requirement for any pipeline, and I wonder what the good practices are
how do you handle large datasets where you need to preprocess, split and train?
a
It seems that a single image is no more than half a megabyte - so roughly half GB from each foreach split. How many foreach splits do you have?
f
so, let’s count
    @resources(cpu=2)
    @step
    def schedule_conversion(self):
        import numpy as np
        self.chunks = np.split(self.X, len(self.X) // 5000, axis=0)
        self.next(self.convert, foreach='chunks')
60000 total / 5000 per chunk = 12 splits
a
each convert step was processing 1000 images?
independent of that, the overall data is indeed around ~30GB (60000 images at ~0.5MB each). The snippet
self.Xnew = np.concatenate([result.converted for result in inputs])
will consume that much memory
f
ah sorry, my bad. Yes, 5000. Got lost in my attempts
so, concatenate is the problem?
a
Yep
f
and, how can I overcome it? 🙂
a
depends on how you want to use Xnew further downstream in your flow
f
a dummy version was to use it for the split
        from sklearn.model_selection import train_test_split
        self.X_train, self.X_test, self.y_train, self.y_test = \
            train_test_split(self.Xnew, self.y, test_size=0.2,
                             random_state=42, stratify=self.y)
a
And this step will run on an instance that has 32GB memory?
f
it might, but that's my question - is there a design pattern for such a scenario? Allocating a 32GB instance for a split, or any other kind of processing, doesn't sound right. As I mentioned above, there are multiple ways to split data without pulling it all in at once. At the same time, let's imagine there is a next step, like training, where I would like to use batches. At the moment I doubt that I can use artifacts as-is
f
rather than joining the image bytes into a single array (and loading it all into memory at once to do so), could you pass pointers to the S3 URLs to defer loading? Depending on your use-case, you could also potentially do the train/test splits relative to the URLs without loading the image bytes
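e.g. a rough sketch (urls/labels being hypothetical lists built upstream, one S3 URL and one label per image):

    from sklearn.model_selection import train_test_split

    # split the references, not the image bytes
    train_urls, test_urls, y_train, y_test = train_test_split(
        urls, labels, test_size=0.2, random_state=42, stratify=labels)
    # the actual bytes get pulled lazily later, e.g. inside a dataloader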
that said, I'm still a bit unclear on the use-case and going to throw out some thoughts:
• it's often best to do image preprocessing/resizing as part of a dataloader for a model, as image transformations can vary quite a bit from model to model, so abstracting that away from the base image dataset can be beneficial. Performance can be kept in check with sufficient dataloader prefetching/multiple workers (rough sketch below).
• 32GB really isn't that much data and this feels like a bit of premature optimization 😉
• At the end of the day it kinda just depends on your use-case for how to structure the data, or change the design of your dataloader to accommodate existing structures. Maybe you want to do oversampling/undersampling of a given class (without having to load it all into memory at once), maybe you're doing self-supervised representation learning and don't care about labels at all - once you step away from precanned image datasets for benchmarks, there's no avoiding getting your hands a bit dirty 😛
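For the dataloader point above, something along these lines (TF-flavored sketch; png_blobs is a stand-in for however you end up feeding the encoded images, e.g. parsed out of TFRecords):

    import tensorflow as tf

    def decode_and_resize(png_bytes):
        # model-specific preprocessing lives in the dataloader, not in the stored dataset
        img = tf.io.decode_png(png_bytes, channels=3)
        return tf.image.resize(img, (299, 299))

    ds = (tf.data.Dataset.from_tensor_slices(png_blobs)  # png_blobs: list of encoded PNG byte strings
            .map(decode_and_resize, num_parallel_calls=tf.data.AUTOTUNE)
            .shuffle(10_000)
            .batch(32)
            .prefetch(tf.data.AUTOTUNE))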
f
S3 URLs are exactly what I was looking at today. We're building an ML platform where Metaflow serves as the ETL. In the end, the first pipeline won't have any training, just data processing - rescaling images, converting them from one format to another (think PSD->PNG) - and finishing with an actual dataset of TFRecords. Then this gets fed to SageMaker where we have the actual train/tune scripts. We want the preprocessed data stored for later retrospection, since our data will be updated regularly and we need to perform dedup and other techniques to keep the models in good shape. Of course, we want to be able to retrospect the train/test split as well. So it's more of a production-scale setup than just an experiments pipeline
you can think of it as premature optimization, I guess. At the same time, I'm just watching resources. I can throw 120GB at it if I really want to, but my inner engineer tells me that batch processing shouldn't be so demanding 😉
and btw, when we talk about image processing, the same PSD can easily be 2GB, and if I want to process that, I don't see another option but S3
f
S3 is certainly a great place for storing image data! It sounds like one of the biggest concerns with the batch-processing workflow you've described is the variation in file sizes. Say you want to batch-process 1000 images as you've described, and each of those images is a PSD file that varies in size from 1MB to 2GB. If the goal is to efficiently utilize compute resources within those constraints, you'd need to construct the batches of 1000 images so that they're similar in aggregate size across batches (rough sketch below).
From those 1000 example PSD files, you'd have a lower bound of 1GB (1k 1MB files) up to 2TB (1k 2GB files). The mean/median file size will be somewhere in the middle, and you could look at the distribution of file sizes and try to be clever with batch sampling and/or process files serially within batches to cap the upper bound - but the main thing that stands out is that you'll want to store the files in S3 (which you could do via Metaflow or manually) and then reference pointers to the S3 image URLs for tracking the lineage, splits, etc.
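A naive way to keep batches roughly even in aggregate size (untested sketch; the sizes could come from an S3 listing):

    def batch_by_size(files_with_sizes, max_batch_bytes=2 * 1024**3):
        # files_with_sizes: iterable of (s3_url, size_in_bytes)
        # greedily packs files (largest first) into batches capped at max_batch_bytes;
        # a single file over the cap still gets its own batch
        batches, current, current_size = [], [], 0
        for url, size in sorted(files_with_sizes, key=lambda x: -x[1]):
            if current and current_size + size > max_batch_bytes:
                batches.append(current)
                current, current_size = [], 0
            current.append(url)
            current_size += size
        if current:
            batches.append(current)
        return batches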
f
yes, that's the plan for now. And the files won't stay in the Metaflow S3, they'll be transferred at the end of the pipeline to an appropriate datastore. But yes, thank you for the confirmation
Still, this makes me think of an idea for Metaflow: a generator that pulls data piece by piece would be a nice feature. Might look into your source code one day 🙂
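something in this direction, with the Client API (sketch; the flow name and run id are made up):

    from metaflow import Step

    def iter_converted(pathspec='ConvertFlow/1234/convert'):
        # walk the foreach tasks and load one chunk's artifact at a time,
        # instead of concatenating everything in a join
        for task in Step(pathspec):
            yield task.data.converted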