# ask-metaflow
Hi, I am working with data that is provided to me as a large CSV file stored on S3. I load the data using the S3 client and keep only the required columns. The data load works fine. Now I want to pass each row through a machine learning model and save the results in parallel using foreach. However, I get the following error:
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the PutItem operation: Item size has exceeded the maximum allowed size
The sample data I am working with has 1.5 million rows, but for the real data the number of rows can be around 10 million. Below is my code. Any suggestions are appreciated. Thanks.
from metaflow import FlowSpec, S3, batch, project, step

@project(name="sample")
class SampleFlow(FlowSpec):
    @batch(image=DEFAULT_IMAGE, **{'cpu': 4, 'memory': 20480})
    @step
    def start(self):
        import pandas as pd
        with S3() as s3:
            s3obj = s3.get('s3://data.csv')
            df = pd.read_csv(s3obj.path)
        self.rows = df.to_dict('records')
        self.next(self.ml_model, foreach='rows')

    @batch(image=DEFAULT_IMAGE, **{'cpu': 6, 'memory': 15000})
    @step
    def ml_model(self):
        import MLModel


        row = self.input
        result = MLModel(row)
        result = {**row, **result}
        self.result = result
        print(result)
        self.next(self.join_results)

    @batch(image=DEFAULT_IMAGE, **{'cpu': 6, 'memory': 180000})
    @step
    def join_results(self,inputs):
        import pandas as pd
        dict_list = [inp.result for inp in inputs]
        self.processed_df = pd.DataFrame(dict_list)
        self.next(self.end)
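
One thing that may be worth trying (a guess, since the post does not say how the flow is deployed): PutItem is a DynamoDB operation, and DynamoDB caps a single item at 400 KB, so a foreach with one branch per row is a likely suspect at 1.5 million rows. A common workaround is to fan out over a modest number of row chunks instead of over individual rows. Below is a minimal sketch of the chunking arithmetic only. `chunk_bounds` and the chunk count of 100 are hypothetical, not part of Metaflow.

```python
import math
from typing import List, Tuple


def chunk_bounds(n_rows: int, n_chunks: int) -> List[Tuple[int, int]]:
    """Split range(n_rows) into at most n_chunks contiguous (start, stop) slices.

    Hypothetical helper: the foreach would iterate over these slices
    rather than over every row.
    """
    if n_rows < 0 or n_chunks < 1:
        raise ValueError("n_rows must be >= 0 and n_chunks must be >= 1")
    # Rows per chunk, rounded up so that no more than n_chunks slices are produced.
    size = max(1, math.ceil(n_rows / n_chunks))
    return [(start, min(start + size, n_rows)) for start in range(0, n_rows, size)]


# Example: 1.5 million rows fanned out over 100 branches (assumed value).
bounds = chunk_bounds(1_500_000, 100)
print(len(bounds), bounds[0], bounds[-1])
```

In the flow above, `start` could set `self.chunks = chunk_bounds(len(df), 100)` and call `self.next(self.ml_model, foreach='chunks')`. Each `ml_model` task would then unpack `start, stop = self.input`, loop over only that slice of rows, and store a list of results for `join_results` to concatenate. The right chunk count depends on how long the model takes per row, so 100 is only a starting point.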