faint-hair-28386
08/03/2022, 2:22 PM
join will be OOM. Is there a way to pass references within join, so it doesn’t load data? Can I then read data iteratively?

ancient-application-36103
08/03/2022, 3:33 PM

faint-hair-28386
08/03/2022, 3:35 PM

ancient-application-36103
08/03/2022, 3:36 PM
merge_artifacts in the join step to pass the variables down, it should work without any issues.

square-wire-39606
08/03/2022, 3:38 PM

faint-hair-28386
08/03/2022, 3:39 PM
yield, where it pulls data only at the actual yielding

faint-hair-28386
08/03/2022, 3:39 PM

faint-hair-28386
08/03/2022, 3:39 PM
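The merge_artifacts suggestion above is Metaflow's built-in way to carry branch artifacts into a join without reassigning them by hand. As a rough pure-Python model of the include= behavior (the class and function names here are hypothetical stand-ins, not Metaflow internals):

```python
# Hypothetical stand-ins: BranchState plays the role of one foreach
# branch's artifacts, Flow plays the role of the FlowSpec instance.
class BranchState:
    def __init__(self, **artifacts):
        self.__dict__.update(artifacts)

class Flow:
    pass

def merge_artifacts_model(target, inputs, include):
    """Copy each named artifact onto `target`, insisting that all
    branches agree on its value (Metaflow raises on conflicts too)."""
    for name in include:
        values = [getattr(inp, name) for inp in inputs]
        if any(v != values[0] for v in values[1:]):
            raise ValueError(f"artifact {name!r} differs across inputs")
        setattr(target, name, values[0])

flow = Flow()
inputs = [BranchState(y=[0, 1], labels=['a', 'b'], converted=[b'...']),
          BranchState(y=[0, 1], labels=['a', 'b'], converted=[b'...'])]
merge_artifacts_model(flow, inputs, include=['y', 'labels'])
# flow.y and flow.labels are now set; converted was not copied.
```

The real self.merge_artifacts(inputs, include=[...]) has more rules (e.g. for artifacts set before the split); see the Metaflow docs for the exact semantics.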
faint-hair-28386
08/03/2022, 3:41 PM
@retry(times=2)
@resources(cpu=2, memory=16000)
@step
def join(self, inputs):
    import numpy as np
    self.Xnew = np.concatenate([result.converted for result in inputs])
    print(f'Assigned new X {len(self.Xnew)}')
    self.merge_artifacts(inputs, include=['y', 'labels'])
    self.next(self.split)
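Since converted is a list of PNG byte strings per branch, the np.concatenate in the join above materializes every branch plus a combined copy at once. A hypothetical lower-peak-memory variant, with toy Branch objects standing in for the real inputs (whose artifacts Metaflow loads lazily on access):

```python
# Toy stand-in for the join's `inputs`: each branch carries a list of
# PNG-encoded byte strings in `.converted`.
class Branch:
    def __init__(self, converted):
        self.converted = converted

inputs = [Branch([b'png1', b'png2']), Branch([b'png3'])]

# Extend one flat list branch by branch instead of handing everything
# to np.concatenate, which would build an extra combined copy.
Xnew = []
for result in inputs:
    Xnew.extend(result.converted)

print(len(Xnew))  # 3
```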
faint-hair-28386
08/03/2022, 3:42 PM

ancient-application-36103
08/03/2022, 3:42 PM
Is result.converted a big object in the preceding step?

ancient-application-36103
08/03/2022, 3:44 PM

faint-hair-28386
08/03/2022, 3:44 PM
@resources(cpu=2)
@step
def convert(self):
    from PIL import Image
    from hxml.constants import DEFAULT_IMAGE_SIZE
    import io
    import numpy as np

    def resize_image(image):
        import tensorflow as tf
        return tf.image.resize(image, DEFAULT_IMAGE_SIZE)

    def convert_to_PIL(image):
        img = resize_image(image)
        # print(f'Generating {img.shape} based on {i}: {d.shape}')
        pimg = Image.fromarray(np.uint8(img))
        imgByteArr = io.BytesIO()
        pimg.save(imgByteArr, format='PNG')
        return imgByteArr.getvalue()

    # print(f'Converting {len(self.input)} input')
    self.converted = [convert_to_PIL(image) for image in self.input]
    self.next(self.join)
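Part of why the convert step stores PNG bytes rather than raw arrays is that the encoded form is compressed. A stdlib-only illustration of that space saving, with zlib standing in for the PNG encoder so the sketch runs without PIL or tensorflow:

```python
import io
import zlib

# A fake 64x64x3 image as raw bytes, with the kind of repetition
# real images tend to have.
raw = bytes(range(256)) * 48  # 12288 bytes

# Mirror the BytesIO-and-save pattern from the convert step above.
buf = io.BytesIO()
buf.write(zlib.compress(raw))
encoded = buf.getvalue()

print(len(raw), len(encoded))  # encoded is far smaller than raw
```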
faint-hair-28386
08/03/2022, 3:45 PM

ancient-application-36103
08/03/2022, 3:46 PM
join step

faint-hair-28386
08/03/2022, 3:46 PM

faint-hair-28386
08/03/2022, 3:47 PM

faint-hair-28386
08/03/2022, 3:47 PM

faint-hair-28386
08/03/2022, 3:48 PM

ancient-application-36103
08/03/2022, 3:48 PM

faint-hair-28386
08/03/2022, 3:49 PM
@resources(cpu=2)
@step
def schedule_conversion(self):
    import numpy as np
    self.chunks = np.split(self.X, len(self.X) // 5000, axis=0)
    self.next(self.convert, foreach='chunks')

60000 total by 5000 = 12
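The chunking arithmetic can be checked in isolation. One subtlety worth noting: np.split's second argument is the number of equal sections, not the chunk size, and it must divide the length exactly, which 60000 // 5000 = 12 does. A toy array stands in for self.X:

```python
import numpy as np

X = np.arange(60000)  # stand-in for self.X (rows would be images)

# len(X) // 5000 == 12 equal sections of 5000 rows each.
chunks = np.split(X, len(X) // 5000, axis=0)

print(len(chunks), len(chunks[0]))  # 12 5000
```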
ancient-application-36103
08/03/2022, 3:50 PM
convert step was processing 1000 images?

ancient-application-36103
08/03/2022, 3:50 PM
self.Xnew = np.concatenate([result.converted for result in inputs]) will consume that much memory

faint-hair-28386
08/03/2022, 3:51 PM

faint-hair-28386
08/03/2022, 3:51 PM

ancient-application-36103
08/03/2022, 3:52 PM

faint-hair-28386
08/03/2022, 3:52 PM

ancient-application-36103
08/03/2022, 3:53 PM
Xnew further downstream in your flow

faint-hair-28386
08/03/2022, 3:56 PM
self.X_train, self.X_test, self.y_train, self.y_test = \
    train_test_split(self.Xnew, self.y, test_size=0.2,
                     random_state=42, stratify=self.y)
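stratify=self.y makes train_test_split keep the class proportions of y in both halves. A NumPy-only sketch of what a stratified 80/20 split does (scikit-learn's real implementation differs in details such as rounding):

```python
import numpy as np

rng = np.random.default_rng(42)

# Toy labels standing in for self.y: 80 of class 0, 20 of class 1.
y = np.array([0] * 80 + [1] * 20)
X = np.arange(len(y))  # stand-in features

# Split each class 80/20 separately, then pool the pieces, so both
# halves keep the original 80:20 class ratio.
train_idx, test_idx = [], []
for cls in np.unique(y):
    idx = rng.permutation(np.flatnonzero(y == cls))
    cut = int(round(len(idx) * 0.8))
    train_idx.extend(idx[:cut])
    test_idx.extend(idx[cut:])

X_train, X_test = X[train_idx], X[test_idx]
y_train, y_test = y[train_idx], y[test_idx]

print(len(X_train), len(X_test))  # 80 20
```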
ancient-application-36103
08/03/2022, 3:58 PM

faint-hair-28386
08/04/2022, 6:57 AM

fresh-laptop-72652
08/04/2022, 4:24 PM

fresh-laptop-72652
08/04/2022, 4:30 PM

faint-hair-28386
08/04/2022, 7:02 PM

faint-hair-28386
08/04/2022, 7:05 PM

faint-hair-28386
08/04/2022, 7:06 PM

fresh-laptop-72652
08/04/2022, 7:23 PM

faint-hair-28386
08/04/2022, 7:31 PM

faint-hair-28386
08/04/2022, 7:34 PM