careful-dress-39510
05/26/2022, 10:43 PM
resume_config = Parameter("resume_run_id", type=JSONType, default=None)  # {'run_id': 'sfn-76a57bef-5f83-70b0-4284-e9807e49998f', 'last_successful_step': 'create_jockey_features'}
I also set a variable has_resumed in case the flow has already resumed.
Then I have a check at the start of each step:
if self.resume_config and not self.has_resumed:
    if current.step_name == self.resume_config["last_successful_step"]:
        successful_steps = get_flow_run_successful_steps(run_id=self.resume_config["run_id"], flow_name=current.flow_name)
        step = [s for s in successful_steps if s.id == current.step_name][0]
        data = step.task.data  # how to set properly
        self.has_resumed = True
        self.next(self.process_data)
I was just wondering: is there a way to set the entire step.task.data object for that step, instead of manually fetching each data object and assigning it with self.df = some_code_to_get_previous_flow_run_data.step.data (if you follow my drift)?
careful-dress-39510
05/26/2022, 10:44 PM
ancient-application-36103
05/26/2022, 11:35 PM
MetaflowData objects exposed by task.data and make the assignments?
careful-dress-39510
05/26/2022, 11:37 PM
careful-dress-39510
05/26/2022, 11:38 PM
self.df = step.task.data.df
self.df2 = step.task.data.df2
careful-dress-39510
05/26/2022, 11:39 PM
self.data = step.task.data
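(Editor's sketch, not from the thread.) The per-artifact assignments above could be generalized into a loop that copies every artifact by name. The helper `copy_artifacts`, its `skip` argument, and the stand-in data are all hypothetical. The sketch assumes that iterating over a Metaflow client `Task` yields artifact objects with `.id` and `.data`, and that Parameter names have to be skipped because they are read-only on the flow; both assumptions are worth checking against your Metaflow version.

```python
from types import SimpleNamespace


def copy_artifacts(pairs, target, skip=()):
    """Assign each (name, value) pair onto target; return the names copied."""
    copied = []
    for name, value in pairs:
        # Skip private names and anything the caller excludes
        # (for example Parameter names, assumed read-only on a flow).
        if name.startswith("_") or name in skip:
            continue
        setattr(target, name, value)
        copied.append(name)
    return copied


# Stand-in data. Inside a step, the pairs might instead come from:
#   ((a.id, a.data) for a in step.task)  # assumption: a Task iterates over its artifacts
previous = [("df", [1, 2, 3]), ("df2", [4, 5]), ("_internal", None), ("resume_config", {})]
flow_self = SimpleNamespace()  # stands in for `self` inside a step
print(copy_artifacts(previous, flow_self, skip={"resume_config"}))
```

Copying by name keeps the artifacts as ordinary attributes on `self`, so downstream steps can keep reading `self.df` rather than `self.data.df`.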
careful-dress-39510
05/26/2022, 11:39 PM
fresh-laptop-72652
05/27/2022, 12:02 AM
current object for the origin_run_id, e.g.
# Check if there's an origin run id for the case of resumes instead of retries.
run_id = current.origin_run_id
if not run_id:
    run_id = current.run_id
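(Editor's sketch, not from the thread.) The fallback above can be combined with the client pathspec format `FlowName/run_id/step_name` to locate the step to read from. The helper `source_step_pathspec` and the stand-in objects are hypothetical; in a real flow the argument would be Metaflow's `current`, and the resulting string could be passed to `Step(...)`.

```python
from types import SimpleNamespace


def source_step_pathspec(current_like, step_name):
    """Build 'Flow/run/step', preferring the origin run id when one is set."""
    # origin_run_id is only set on a resume; otherwise fall back to the current run id.
    run_id = getattr(current_like, "origin_run_id", None) or current_like.run_id
    return f"{current_like.flow_name}/{run_id}/{step_name}"


# Stand-ins for `current` (all values are made up for illustration).
resumed = SimpleNamespace(flow_name="MyFlow", run_id="123", origin_run_id="sfn-abc")
fresh = SimpleNamespace(flow_name="MyFlow", run_id="123", origin_run_id=None)

print(source_step_pathspec(resumed, "process_data"))
print(source_step_pathspec(fresh, "process_data"))
```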
careful-dress-39510
05/27/2022, 12:57 AM
careful-dress-39510
05/27/2022, 12:57 AM
careful-dress-39510
05/27/2022, 12:58 AM
careful-dress-39510
05/27/2022, 12:59 AM
Step("something").tasks.data
Does that make sense?
fresh-laptop-72652
05/27/2022, 1:13 AM
python flow.py resume --origin-run-id sfn-248a324-23ec23h49-42fea39
and also include a --production tag if you have particular behavior for that.
The caveat is that the machine you run the resume from will act as the job scheduler, rather than Step Functions, so it’s like running a flow with @batch decorators.
If I’m understanding correctly, you need the resumed jobs to also use Step Functions as the job scheduler instead of another machine?
careful-dress-39510
05/27/2022, 1:57 AM
careful-dress-39510
05/27/2022, 1:57 AM
fresh-laptop-72652
05/27/2022, 2:32 AM
careful-dress-39510
05/27/2022, 3:47 AM
careful-dress-39510
05/27/2022, 7:18 PM
ancient-application-36103
05/27/2022, 7:49 PM
python flow.py resume --origin-run-id <sfn-id> --tag <tag>
where the tag is the namespace you are interested in.
careful-dress-39510
05/29/2022, 12:33 AM
careful-dress-39510
05/29/2022, 12:34 AM
careful-dress-39510
05/29/2022, 12:34 AM
ancient-application-36103
05/29/2022, 2:57 AM
ancient-application-36103
05/29/2022, 2:57 AM
ancient-application-36103
05/29/2022, 2:57 AM
careful-dress-39510
05/29/2022, 2:58 AM
careful-dress-39510
05/29/2022, 3:01 AM
ancient-application-36103
05/29/2022, 3:59 AM
quiet-motherboard-43023
05/30/2022, 6:31 PM
ambitious-bird-15073
04/17/2023, 8:54 AM
ambitious-bird-15073
04/17/2023, 9:19 AM