# ask-metaflow
Hello folks, I'm using Metaflow with Argo Workflows/Events on Kubernetes. I'm migrating my video processing pipeline, which is a basic message-based set of producer/consumer services. In recent tests I noticed some odd behavior: with an Argo Events AMQP EventSource and a @trigger decorator on my flow, messages were lost whenever the sensor or the workflow template was unavailable. In other words, during an outage critical messages would be dropped, which does not happen with vanilla RabbitMQ consumption when it is used correctly. In my pipeline it is critical that every message is processed; the end result for the client can degrade in unacceptable ways if even one message is lost. My configuration follows.
The EventSource I'm using to connect to the RabbitMQ server:
apiVersion: v1
kind: Secret
metadata:
  namespace: argo
  name: my-secret
type: kubernetes.io/basic-auth
stringData:
  password: -
  username: -
---
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  namespace: argo
  name: amqp
spec:
  amqp:
    dummyflow:
      url: <amqps://url>:port
      # jsonBody specifies that all event body payload coming from this
      # source will be JSON
      jsonBody: true
      exchangeName: test
      exchangeType: fanout
      routingKey: argo-testing-queue
      connectionBackoff:
        duration: 10s
        steps: 5
        factor: 2
        jitter: 0.2
      exchangeDeclare:
        durable: true
        autoDelete: false
        internal: false
        noWait: false
      queueDeclare:
        name: "argo-testing-queue"
        durable: false
        autoDelete: false
        exclusive: true
        noWait: false
      queueBind:
        noWait: false
      consume:
        consumerTag: "my-consumer-tag" # Consumer identifier, must be unique for the same channel
        autoAck: true # If set to true, the consumer will send ack immediately after receiving the message
        exclusive: false # If set to true, the client will request to be the only consumer on the queue
        noLocal: false
        noWait: false # If set to true, the client should not wait for server response for the consume method
      auth:
        username:
          name: my-secret
          key: username
        password:
          name: my-secret
          key: password
The Metaflow file that I'm using for testing:
from metaflow import FlowSpec, step, kubernetes, trigger

@trigger(event="dummyflow")
class DummyFlow(FlowSpec):
    @kubernetes(cpu=0.2, memory=300, disk=1024)
    @step
    def start(self):
        print('testing')
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    DummyFlow()
My .env file:
METAFLOW_SERVICE_URL="https://external_metaflow_url"
METAFLOW_SERVICE_INTERNAL_URL="http://metadata-service.metaflow.svc.cluster.local:8080"
METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT="metaflow-executor"
METAFLOW_KUBERNETES_SERVICE_ACCOUNT="metaflow-executor"
METAFLOW_KUBERNETES_SECRETS="metaflow-executor-token-secret"
METAFLOW_KUBERNETES_NAMESPACE="flows"
METAFLOW_DEFAULT_METADATA="service"
METAFLOW_DEFAULT_DATASTORE=gs
METAFLOW_DATASTORE_SYSROOT_GS=gs://my_storage_bucket
METAFLOW_ARGO_EVENTS_WEBHOOK_URL="http://webhookurl"
METAFLOW_ARGO_EVENTS_EVENT=dummyflow
METAFLOW_ARGO_EVENTS_EVENT_BUS=default
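For completeness, the flow above is deployed to Argo Workflows with the standard Metaflow command (assuming the file is saved as dummyflow.py and the variables above are loaded into the shell):

python dummyflow.py argo-workflows create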
if it is critical that every event gets processed reliably, a recommended pattern is to store the events in an event log (e.g. a database table) and have the flow process all unprocessed events from the log every time it executes. This way events are recorded durably and are guaranteed to get processed eventually - typically in near real-time. It also gives you an audit trail
even if message delivery were perfectly reliable, a flow run processing an event might fail (repeatedly), effectively losing the event. With an event log, you know that all unprocessed events will eventually get picked up by a later run
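To make that concrete, here is a rough sketch of the pattern in Metaflow, assuming the producer writes every message into a Postgres table events(id, payload, processed). The table, the DSN, and handle_event below are illustrative placeholders, not anything Metaflow provides:

import psycopg2  # assumed to be available in the flow's execution environment
from metaflow import FlowSpec, kubernetes, step, trigger

DSN = "postgresql://user:pass@host:5432/eventlog"  # placeholder connection string


def handle_event(payload):
    # Placeholder for the actual video-processing work.
    print("processing", payload)


@trigger(event="dummyflow")  # the AMQP event still starts the run; the log is the source of truth
class ProcessEventLogFlow(FlowSpec):

    @kubernetes(cpu=0.2, memory=300, disk=1024)
    @step
    def start(self):
        # Pick up everything not yet marked processed, including events whose
        # original delivery or run was lost.
        with psycopg2.connect(DSN) as conn, conn.cursor() as cur:
            cur.execute("SELECT id, payload FROM events WHERE NOT processed ORDER BY id")
            self.pending = cur.fetchall()
        self.next(self.process)

    @kubernetes(cpu=0.2, memory=300, disk=1024)
    @step
    def process(self):
        with psycopg2.connect(DSN) as conn:
            for event_id, payload in self.pending:
                handle_event(payload)
                # Mark processed only after the work succeeds, so a failed run
                # simply leaves the event behind for the next execution.
                with conn.cursor() as cur:
                    cur.execute("UPDATE events SET processed = TRUE WHERE id = %s", (event_id,))
                conn.commit()
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    ProcessEventLogFlow()

The run can still be started by the AMQP event via @trigger (or by a @schedule as a backstop); either way, any event whose delivery or processing failed earlier gets drained on the next execution.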