sparse-belgium-16816
09/27/2024, 7:44 PM
brainy-truck-72938
09/27/2024, 8:55 PM
Run object..
But, with all that said, it shouldn't be too slow..
sparse-belgium-16816
09/27/2024, 8:59 PM
brainy-truck-72938
09/27/2024, 9:00 PM
sparse-belgium-16816
09/27/2024, 9:02 PM
from typing import List, Literal

from metaflow import FlowSpec, JSONType, Parameter, step
from pydantic import BaseModel


class ImagePath(BaseModel):
    img_path: str


class SampleFlowResult(ImagePath):
    """
    Sample Flow workflow response
    """

    result: Literal["success", "failed", "in_progress"]


class SampleFlowResponse(BaseModel):
    """
    Sample flow response
    """

    operation: Literal["sample_flow"]
    response: List[SampleFlowResult]


class SampleFlowWorkflowInput(BaseModel):
    request_id: str
    images: list[str]


class SimpleFlow(FlowSpec):
    input_payload = Parameter(
        "input_payload", help="The payload containing the images to be processed.", required=True, type=JSONType
    )
    some_key_1 = Parameter(
        "some_key_1", help="Some Extra Parameter provided to the workflow", type=str, default="key_1"
    )
    some_key_2 = Parameter(
        "some_key_2",
        help="Some Extra Parameter provided to the workflow",
        type=str,
        default="key_2",
    )

    @step
    def start(self) -> None:
        print("This is the start step.")
        self.next(self.process_data)

    @step
    def process_data(self) -> None:
        print("This is the data processing step.")
        self.next(self.end)

    @step
    def end(self) -> None:
        self.output_payload = SampleFlowResponse(
            operation="sample_flow",
            response=[
                SampleFlowResult(
                    img_path="path/to/image.jpg",
                    result="success",
                )
            ],
        ).model_dump_json()
        print("This is the end step.")


if __name__ == '__main__':
    SimpleFlow()
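For reference, a minimal sketch of how the flow above might be launched with a JSON payload. The file name simple_flow.py, the helper build_run_command, and the sample request_id are hypothetical. The payload fields mirror SampleFlowWorkflowInput, and the --input_payload option assumes Metaflow exposes each Parameter on the command line under its declared name.

```python
import json
import sys


def build_run_command(payload, flow_file="simple_flow.py"):
    """Build the argv for `python <flow_file> run --input_payload <json>`.

    flow_file is a hypothetical name for the file holding SimpleFlow.
    """
    return [
        sys.executable,
        flow_file,
        "run",
        "--input_payload",
        json.dumps(payload),  # JSONType parameters take a JSON string
    ]


if __name__ == "__main__":
    # Example payload shaped like SampleFlowWorkflowInput (values are made up).
    payload = {"request_id": "req-1", "images": ["path/to/image.jpg"]}
    print(build_run_command(payload))
```

The result is a plain argv list, so it can be handed to subprocess without going through a shell.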
sparse-belgium-16816
09/27/2024, 9:04 PM
sparse-belgium-16816
09/27/2024, 9:05 PM
brainy-truck-72938
09/27/2024, 9:06 PM
sparse-belgium-16816
09/27/2024, 9:21 PM
sparse-belgium-16816
09/27/2024, 9:48 PM
sparse-belgium-16816
09/30/2024, 4:30 PM
subprocess.Popen being used. However, if we change this to a blocking subprocess.run call, the response times are at least 5 times faster
brainy-truck-72938
09/30/2024, 4:30 PM
sparse-belgium-16816
09/30/2024, 5:43 PM
self.process = subprocess.Popen(
    shlex.join(self.command),
    cwd=self.cwd,
    env=self.env,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    bufsize=1,
    universal_newlines=True,
    shell=True
)
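A rough way to check the Popen versus subprocess.run difference on your own machine is to time both against the same command. This is only a sketch: the helper names run_with_popen, run_blocking and timed are made up for illustration, the first helper copies the arguments of the Popen call quoted above (minus cwd and env), and it assumes a POSIX shell since shlex.join produces POSIX quoting.

```python
import shlex
import subprocess
import sys
import time


def run_with_popen(command):
    """Run via Popen with the arguments from the quoted call (POSIX shell assumed)."""
    process = subprocess.Popen(
        shlex.join(command),  # one shell string, as in the quoted snippet
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=1,
        universal_newlines=True,
        shell=True,
    )
    stdout, _stderr = process.communicate()  # wait for exit, drain both pipes
    return process.returncode, stdout


def run_blocking(command):
    """Run via the blocking subprocess.run, passing the argv list with no shell."""
    result = subprocess.run(command, capture_output=True, text=True)
    return result.returncode, result.stdout


def timed(fn, command):
    """Return (fn(command), elapsed seconds)."""
    start = time.perf_counter()
    outcome = fn(command)
    return outcome, time.perf_counter() - start


if __name__ == "__main__":
    command = [sys.executable, "-c", "print('ok')"]
    for fn in (run_with_popen, run_blocking):
        outcome, elapsed = timed(fn, command)
        print(f"{fn.__name__}: {outcome} in {elapsed:.3f}s")
```

Both helpers return the same (returncode, stdout) pair, so any timing gap comes from how the process is launched and read rather than from the command itself. Numbers from a toy command like this will not necessarily match the 5x figure reported above.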
sparse-belgium-16816
09/30/2024, 5:44 PM
brainy-truck-72938
09/30/2024, 5:45 PM
shell=True doesn't affect anything else..
sparse-belgium-16816
09/30/2024, 5:47 PM
brainy-truck-72938
09/30/2024, 5:49 PM
sparse-belgium-16816
10/01/2024, 8:39 AM