best-accountant-46528
07/17/2024, 1:37 PM
When using the @trigger decorator in my flow, my messages were being lost whenever the sensor or the workflow template was unavailable. This means that if an unavailability event happens, critical messages are lost, which does not happen with vanilla RabbitMQ consumption when it is used correctly.
In my pipeline, it is critical that all messages are processed. The end result for the client can vary in unacceptable ways if just one message is lost.
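The difference between the two acknowledgement modes can be sketched in plain Python with a toy broker model (the names below are illustrative only, not any real AMQP client API): with auto-ack, a message is acknowledged on delivery and is gone forever if the consumer dies before processing it; with manual ack, the broker redelivers anything unacknowledged.

```python
from collections import deque

class Broker:
    """Toy AMQP-like broker: unacked deliveries are redelivered."""
    def __init__(self, messages):
        self.queue = deque(messages)
        self.unacked = {}
        self.next_tag = 0

    def deliver(self):
        if not self.queue:
            return None
        msg = self.queue.popleft()
        self.next_tag += 1
        self.unacked[self.next_tag] = msg
        return self.next_tag, msg

    def ack(self, tag):
        self.unacked.pop(tag)

    def requeue_unacked(self):
        # What the broker does when the consumer's channel dies.
        for msg in self.unacked.values():
            self.queue.appendleft(msg)
        self.unacked.clear()

def consume(broker, auto_ack, crash_on):
    """Drain the queue; the consumer 'crashes' once while handling crash_on."""
    processed = []
    while (delivery := broker.deliver()) is not None:
        tag, msg = delivery
        if auto_ack:
            broker.ack(tag)           # acked before processing
        if msg == crash_on:
            broker.requeue_unacked()  # consumer dies mid-processing
            crash_on = None           # then recovers and keeps consuming
            continue
        processed.append(msg)
        if not auto_ack:
            broker.ack(tag)           # acked only after processing succeeds

    return processed

print(consume(Broker(["a", "b", "c"]), auto_ack=True, crash_on="b"))   # "b" is lost
print(consume(Broker(["a", "b", "c"]), auto_ack=False, crash_on="b"))  # "b" is redelivered
```

With auto_ack=True the crash during "b" loses it permanently; with auto_ack=False the broker redelivers "b" and all three messages get processed.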
My configuration follows:
The EventSource I'm using to connect to the RabbitMQ server:
apiVersion: v1
kind: Secret
metadata:
  namespace: argo
  name: my-secret
type: kubernetes.io/basic-auth
stringData:
  password: -
  username: -
---
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  namespace: argo
  name: amqp
spec:
  amqp:
    dummyflow:
      url: <amqps://url>:port
      # jsonBody specifies that all event body payload coming from this
      # source will be JSON
      jsonBody: true
      exchangeName: test
      exchangeType: fanout
      routingKey: argo-testing-queue
      connectionBackoff:
        duration: 10s
        steps: 5
        factor: 2
        jitter: 0.2
      exchangeDeclare:
        durable: true
        autoDelete: false
        internal: false
        noWait: false
      queueDeclare:
        name: "argo-testing-queue"
        durable: false
        autoDelete: false
        exclusive: true
        noWait: false
      queueBind:
        noWait: false
      consume:
        consumerTag: "my-consumer-tag" # Consumer identifier, must be unique for the same channel
        autoAck: true # If set to true, the consumer will send ack immediately after receiving the message
        exclusive: false # If set to true, the client will request to be the only consumer on the queue
        noLocal: false
        noWait: false # If set to true, the client should not wait for server response for the consume method
      auth:
        username:
          name: my-secret
          key: username
        password:
          name: my-secret
          key: password
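Three settings in this EventSource make message loss likely on any disconnect, going by standard RabbitMQ semantics: autoAck: true drops anything delivered but not yet processed, exclusive: true in queueDeclare deletes the queue (and everything buffered in it) when the event source's connection closes, and durable: false means the queue does not survive a broker restart. A more conservative variant worth testing is sketched below; this assumes the event source acks only after it has forwarded the event when autoAck is false, and note that RabbitMQ rejects redeclaring an existing queue with different properties, so the old queue may need to be deleted first:

```yaml
queueDeclare:
  name: "argo-testing-queue"
  durable: true      # queue survives broker restarts
  autoDelete: false
  exclusive: false   # queue outlives the event source's connection
  noWait: false
queueBind:
  noWait: false
consume:
  consumerTag: "my-consumer-tag"
  autoAck: false     # ack only after the event has been handed off
  exclusive: false
  noLocal: false
  noWait: false
```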
The Metaflow file that I'm using for testing:
from metaflow import FlowSpec, step, kubernetes, trigger


@trigger(event="dummyflow")
class DummyFlow(FlowSpec):

    @kubernetes(cpu=0.2, memory=300, disk=1024)
    @step
    def start(self):
        print('testing')
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == '__main__':
    DummyFlow()
My .env file:
METAFLOW_SERVICE_URL="<https://external_metaflow_url>"
METAFLOW_SERVICE_INTERNAL_URL="<http://metadata-service.metaflow.svc.cluster.local:8080>"
METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT="metaflow-executor"
METAFLOW_KUBERNETES_SERVICE_ACCOUNT="metaflow-executor"
METAFLOW_KUBERNETES_SECRETS="metaflow-executor-token-secret"
METAFLOW_KUBERNETES_NAMESPACE="flows"
METAFLOW_DEFAULT_METADATA="service"
METAFLOW_DEFAULT_DATASTORE=gs
METAFLOW_DATASTORE_SYSROOT_GS=<gs://my_storage_bucket>
METAFLOW_ARGO_EVENTS_WEBHOOK_URL="<http://webhookurl>"
METAFLOW_ARGO_EVENTS_EVENT=dummyflow
METAFLOW_ARGO_EVENTS_EVENT_BUS=default
victorious-lawyer-58417
07/20/2024, 6:42 PM
victorious-lawyer-58417
07/20/2024, 6:47 PM