high-scooter-88084
04/06/2023, 6:18 PM[KILLED BY ORCHESTRATOR]
. Is it possible to catch that signal in the branches that haven't failed yet ? Specifically,
from metaflow import FlowSpec, step, project
@project(name="someflow")
class SomeFlow(FlowSpec):
@step
def start(self):
self.nums = [1, 2]
self.next(self.process, foreach="nums")
@step
def process(self):
import time
print(f"Setup compute resources")
if self.input == 2:
raise ValueError(f"{self.input} is cursed")
print(f"{self.input} is good")
time.sleep(10)
print(f"{self.input} cleanup compute resources")
self.next(self.join)
@step
def join(self, inputs):
self.merge_artifacts(
inputs,
)
self.next(self.end)
@step
def end(self):
print(f"ended")
if __name__ == "__main__":
SomeFlow()
In the process
step with input==1
, I want cleanup print statement to be executed. I have tried, context managers, try catch blocks, SIGINT
and SIGTERM
handlers. I guess my question is, what signal does the process receive ? Is it SIGKILL
? If so, what is the best practice to cleanup resources that are spun up but are left hanging ?