Argo Events — Event Bus and Webhook
Argo Events is a Kubernetes-based event automation engine and part of the Argo project. It can be used together with, or independently of, the other Argo projects.
I will be writing a series of articles on Argo Events; in this series I will look at how we can use Argo Events to automate processes both inside and outside a Kubernetes cluster.
In this first article, we will examine Argo Events' core concepts and installation, and provision the different event buses that Argo Events uses to forward events to their sinks. Finally, we will set up a webhook event flow to verify our setup.
Core Concepts
- Events are activities that we are interested in; for example, it is 12 AM on the first of each month, a file has just been uploaded to an S3 bucket, a message has just arrived on a queue, etc. We are interested in these events because we want to perform some task when they occur.
- Event sources are the means by which external, real-world events are captured and routed into the Argo Events engine; they are defined by the EventSource CRD. Argo Events supports many common event sources such as HTTP requests, S3, Slack, etc.; you can find the list of supported event sources here.
- Triggers are processes that are executed in response to an event. This is the action you want to take when an event occurs. Examples of triggers include sending an email message, rolling out a new application release, forwarding the event to a Kafka queue, invoking a serverless function, etc. You can find a list of supported triggers here.
- Sensors, defined by the Sensor CRD, determine which triggers to execute based on the events received from event sources; for example, a sensor can trigger a serverless function when a file is uploaded to an S3 bucket, or deploy an application to the testing environment in response to a pull request event. Event sources are the inputs, also known as dependencies, to sensors; triggers are the outputs of sensors.
Argo Events uses a message queue, called an event bus, to reliably transfer events from event sources to sensors.
The following diagram shows the relationship between the described components.
Argo Events conceptual architecture
Installation Workflow
The following are the required steps to set up and deploy Argo Events:
- Install Argo Events. Create a namespace called argo-events and install the event controller and the validating webhook. You can find the instructions here.
- Deploy one or more event buses. Since EventBus, EventSource and Sensor are namespace-scoped resources, an event bus must be deployed into the same namespace as the EventSource and Sensor resources that use it; event sources and sensors can only use event buses in their own namespace for communication. You may also need to manually create topics/queues if your event bus does not support automatic topic creation; more on this later in this article.
- Deploy one or more event sources into the same namespace as the event bus.
- Deploy one or more sensors into the same namespace as the event bus, specifying the deployed event sources as dependencies (inputs) and the corresponding triggers to invoke when the dependencies are resolved. A sensor must use the same event bus as an event source if it is to receive events from that event source.
- Install Argo Workflows if you are planning to trigger workflows. You can also use the Argo Workflows web interface to view and manage event flows graphically; Argo Events does not come with a graphical interface of its own.
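For reference, a quick-start installation along the lines of the official instructions looks like the following; check the linked instructions for the current manifests.

kubectl create namespace argo-events
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install.yaml
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install-validating-webhook.yaml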
Event Bus
An event bus is a message broker: data enqueued by producers is transmitted to its destination, where it is dequeued and consumed, as shown in the following diagram.
Image from https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview
In the context of Argo Events, event sources are the producers. They capture real-world events, like a push to GitHub, a file uploaded to an S3 bucket, data on an MQTT topic, a Stripe transaction, etc., and enqueue these events as cloudevents objects onto an event bus. These events are then consumed by sensors, which can transform and/or forward them to triggers.
Argo Events uses existing messaging systems rather than implementing its own message broker. The broker can be deployed in-cluster, or you can leverage an existing external message broker. Argo Events supports the following event bus implementations: NATS (deprecated), Jetstream and Kafka.
In the next section, we will create two event buses: the first runs in-cluster and the second uses an existing Kafka cluster.
Jetstream Event Bus
We will create an in-cluster EventBus based on Jetstream.
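The jetstream-eventbus.yaml manifest is reproduced below as a sketch; the Jetstream version value is an assumption, and the line numbers cited in the text refer to the original file.

apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: jetstream-eb
  namespace: playground
spec:
  jetstream:
    version: latest      # assumed; pin to a Jetstream version supported by your controller
    replicas: 3          # 3-node Jetstream cluster
    streamConfig: |      # additional broker options
      replicas: 2        # duplicate messages to 2 nodes for resiliency
      maxAge: 5m         # retain messages for 5 minutes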
The EventBus is called jetstream-eb and is created in the playground namespace. The Jetstream message broker consists of 3 nodes (jetstream-eventbus.yaml line 10). You can pass additional options to the broker with the streamConfig parameter; in the above example, messages are replicated to 2 nodes for resiliency and retained for 5 minutes (jetstream-eventbus.yaml lines 12, 13). You can see the list of other Jetstream options here.
Verify that the event bus has been created with the following command:
kubectl get all,sts,eventbus -nplayground
eb is the short name for the eventbus resource.
Kafka Event Bus
In the second event bus example, we will use an existing Kafka cluster as an event bus. I have a Kafka cluster deployed on Upstash; its connection properties are provided in a kafka.properties file, which we will examine below.
The following EventBus CRD deploys an event bus that uses the above Kafka cluster for event delivery.
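The kafka-eventbus.yaml manifest is embedded in the original article; the following sketch is consistent with the configuration described below (the Secret name and placeholder values are assumptions, and the cited line numbers refer to the original file).

apiVersion: v1
kind: Secret
metadata:
  name: kafka-secret
  namespace: playground
type: Opaque
stringData:
  username: <upstash-username>   # placeholder
  password: <upstash-password>   # placeholder
---
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: kafka-eb
  namespace: playground
spec:
  kafka:
    url: <bootstrap-server>:9092   # Upstash bootstrap server
    topic: argo-kafka-eventbus     # manually created topic
    tls:
      insecureSkipVerify: true     # skip CA certificate verification
    sasl:
      mechanism: SCRAM-SHA-256
      userSecret:                  # username sourced from the Secret above
        name: kafka-secret
        key: username
      passwordSecret:              # password sourced from the Secret above
        name: kafka-secret
        key: password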
We start by configuring the cluster's bootstrap server and the topic that the event bus will use to queue events (kafka-eventbus.yaml lines 21, 22).
The Kafka topic can be created automatically by Argo Events if the broker property [auto.create.topics.enable](https://kafka.apache.org/documentation/#brokerconfigs_auto.create.topics.enable) is enabled.
Since my Kafka cluster has auto topic creation disabled, I manually created the topic, argo-kafka-eventbus, and included the topic name in the topic attribute (kafka-eventbus.yaml line 22) of the EventBus resource. Note that for every sensor you will need an additional 2 topics; more details on this later.
The next major configuration block is the credentials Argo Events uses to log in to the cluster. Examining the connection file provided by Upstash, kafka.properties, we have the following (a sketch of the file follows this list):
- Client authentication is performed over SSL: SASL_SSL (kafka.properties line 3)
- Authentication mechanism: SCRAM-SHA-256 (kafka.properties line 2)
- Username and password (kafka.properties lines 5, 6)
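The sketch below shows what such a kafka.properties file looks like; the exact layout and the placeholder values are assumptions, though the line positions match the references above.

bootstrap.servers=<bootstrap-server>:9092
sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="<upstash-username>" \
  password="<upstash-password>";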
We will need to create a Secret to hold the username and password (kafka-eventbus.yaml lines 2–10). The EventBus sources the username and password from this Secret (kafka-eventbus.yaml lines 27–32).
Finally, we configure the event bus to use SASL_SSL and SCRAM-SHA-256 for authentication (kafka-eventbus.yaml lines 25, 26). I could not find Upstash's CA certificate, so I am skipping certificate verification with tls.insecureSkipVerify set to true (kafka-eventbus.yaml lines 23, 24). If you have the cluster's CA certificate, you can configure it with the tls.caCertSecret attribute (see this).
Producing Events with EventSource
An EventSource resource queues real-world events onto an event bus. You start your event workflow by sourcing from one or more of the supported event sources.
For this article, we will look at one of the most common types of event: the webhook event.
Webhook Event Source
A webhook is a communication endpoint, typically over HTTP, that allows one application to send notifications to another. A 'traditional' API endpoint is request-response: the application that makes the request expects a response, e.g. what is the weather in Singapore?
A webhook, on the other hand, is a one-way exchange; when you invoke a webhook, you typically do not expect a response other than an acknowledgement, e.g. a 202 status code in the case of HTTP. A common example of a webhook is receiving the transaction details in your email when you use your phone for payment.
The following EventSource resource creates a webhook for receiving notifications.
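The webhook-es.yaml manifest is reproduced below as a sketch; the line numbers cited in the text refer to the original file.

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: webhook-es
  namespace: playground
spec:
  eventBusName: kafka-eb   # queue events onto the kafka-eb event bus
  webhook:
    simple:                # the event name
      port: "12000"        # HTTP server port
      endpoint: /notify    # resource that accepts events
      method: POST         # accepted HTTP method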
- The webhook attribute (webhook-es.yaml line 10) creates an HTTP endpoint on port 12000; events are sent with the POST method to the /notify resource (webhook-es.yaml lines 12–14).
- The event is named simple (webhook-es.yaml line 11). You can have multiple events under webhook; however, the event names must be unique. You can also configure multiple types of events in a single EventSource resource; see the EventSource documentation for the complete list. In future articles, we will look at other event sources.
- Events received by the webhook event source are queued onto the kafka-eb event bus (webhook-es.yaml line 9).
When the above webhook EventSource is deployed, Argo Events provisions a Deployment with 1 pod; the pod listens on port 12000. To expose the webhook to the outside world, you have to deploy an additional Service and an Ingress resource. The webhook event source pod carries the labels eventsource-name and owner-name, whose values default to the name of the event source: webhook-es (webhook-es.yaml line 5). Create a Service that forwards traffic to these pods and expose the Service with an Ingress, as shown in the following YAML.
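A sketch of the Service and Ingress follows; the resource names are assumptions, and the nip.io host matches the endpoint used later in this article.

apiVersion: v1
kind: Service
metadata:
  name: webhook-es-svc
  namespace: playground
spec:
  selector:
    eventsource-name: webhook-es   # match the event source pods by label
  ports:
  - port: 80
    targetPort: 12000              # port the webhook pod listens on
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: webhook-es-ing
  namespace: playground
spec:
  rules:
  - host: webhook-192.168.39.85.nip.io
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: webhook-es-svc
            port:
              number: 80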
The following shows the output listing of the EventSource and the resulting Deployment and Pod, along with the Service and Ingress resources that expose the event source.
According to the Argo Events documentation, several event sources start an HTTP service to receive incoming events. For these event sources, deploy an Ingress if the event source needs to be accessible from outside the cluster, or just a ClusterIP Service if it is only consumed within the cluster.
Consuming Events with Sensor
A sensor consumes events, called dependencies, from the same event bus that its event sources publish to. When an event fires, a Sensor dequeues the event from the bus, extracts the event object and uses it to invoke one or more defined triggers.
HTTP Trigger
One of the most common triggers is the HTTP trigger. Think of an HTTP trigger like curl: you can use it to invoke any REST endpoint, including serverless functions, web applications, other callbacks, etc.
In the following example, we create a Sensor to ingest events from the webhook-es EventSource (webhook-es.yaml). The Sensor selects some (arbitrary) fields from the event payload and forwards them to a deployed application, a simple web application that logs the contents of a POST request (see this gist).
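The webhook-sn.yaml manifest is reproduced below as a sketch; the shttpbin-svc URL and the exact payload fields are assumptions, and the line numbers cited in the text refer to the original file.

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: webhook-sn
  namespace: playground
spec:
  eventBusName: kafka-eb            # subscribe to the kafka-eb event bus
  dependencies:
  - name: message                   # local name for the dependency
    eventSourceName: webhook-es     # name of the event source
    eventName: simple               # name of the event
  triggers:
  - template:
      name: http-trigger            # unique trigger name
      http:                         # trigger type
        url: http://shttpbin-svc.playground.svc.cluster.local/
        method: POST
        headers:
          Content-Type: application/json
        payload:
        - src:
            dependencyName: message
            dataKey: body.message.name   # JSON path into the event data
          dest: name                     # name to bind the extracted value to
        - src:
            dependencyName: message
            dataKey: body.message.email
          dest: email
        - src:
            dependencyName: message
            contextKey: id               # from the cloudevent metadata
          dest: eventId
    policy:
      status:
        allow:
        - 200                       # invocation succeeds on HTTP 200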
- eventBusName (webhook-sn.yaml line 8) specifies which bus to subscribe to for event delivery.
- The dependencies define the events the Sensor is interested in (webhook-sn.yaml lines 9–12).
- A dependency consists of 3 attributes: the event source's name (webhook-es, webhook-sn.yaml line 11), the event name from that event source (simple, webhook-sn.yaml line 12) and a name (message, webhook-sn.yaml line 10) that associates the event source and event name pair locally. This name, message, is used as a reference when we extract values from the event.
- triggers define the action to take when there is an event match from the event source. Each trigger is defined within a trigger template (webhook-sn.yaml lines 14–37). A template consists of a unique trigger name (webhook-sn.yaml line 15) and the type of trigger (webhook-sn.yaml line 16).
- In the above example, we use the HTTP trigger (webhook-sn.yaml lines 16–37) to invoke the shttpbin-svc service (webhook-sn.yaml line 17) with a POST method. The payload of the POST request is extracted from the event object via the payload attribute (webhook-sn.yaml lines 21–37). The extracted values are passed as application/json to the shttpbin-svc service.
- Each payload entry has a source, which uses a JSON path to extract a value from the event object (webhook-sn.yaml lines 22–25, 26–29, 30–33), and a destination (webhook-sn.yaml lines 25, 29, 33), the name to bind the extracted value to.
- The JSON path addresses an attribute in the payload of the webhook. You can inspect the payload by examining the cloudevent that the event source has queued. The following diagram shows the cloudevent object in the argo-kafka-eventbus topic.
cloudevent object queued by webhook-es event source on kafka-eb event bus
- The webhook event is encoded as base64 in the data_base64 attribute. If you decode data_base64, you will get the following JSON structure.
Decoded base64 string from data_base64
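Based on the test payload used later in this article, the decoded structure looks roughly like this (request headers abbreviated):

{
  "header": {
    "Content-Type": ["application/json"]
  },
  "body": {
    "message": { "id": "abc123", "name": "fred", "email": "fred@gmail.com" }
  }
}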
- To extract the email value, the JSON path would be body.message.email. Argo Events decodes data_base64 before extracting the values. The JSON path syntax is based on the tidwall/sjson implementation; refer to its documentation for more path expressions.
- cloudevent metadata can be extracted with the contextKey attribute (webhook-sn.yaml line 37) instead of the dataKey attribute.
- You can also include headers with the headers attribute (webhook-sn.yaml lines 19, 20) and optionally define the success criteria of the HTTP trigger invocation with policy.status.allow (webhook-sn.yaml lines 38–41). If shttpbin-svc returns a 200 status, the invocation is considered successful.
The following figure summarizes the dependencies between EventSource and Sensor.
If you have Argo Workflows installed, you can view the event flow in the Argo Workflows web console. Port forward to the argo-server service with the following command:
kubectl port-forward svc/argo-server 2746:2746 -nargo
The events can be found under the Event Flow tab.
Event flow
Kafka Topics for Sensors
According to the Argo Events documentation, for every Sensor we need to create 2 additional topics: one for triggers and the other for actions. The topics are named according to a strict naming convention.
For our example, the EventSource webhook-es and Sensor webhook-sn publish to and subscribe from the topic argo-kafka-eventbus on the kafka-eb event bus.
For the webhook-sn sensor to receive events from webhook-es, we will need to create 2 additional topics in our Kafka cluster using the naming conventions <event bus topic name>-<sensor name>-trigger and <event bus topic name>-<sensor name>-action; see the following figure for the 2 additional topics created to support the webhook-sn Sensor. You do not have to create these topics manually if your Kafka cluster supports automatic topic creation; otherwise, see the sketch after the figure.
2 additional topics for every sensor
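If your cluster requires manual topic creation, a sketch using the stock kafka-topics.sh CLI might look like this, reusing the kafka.properties connection file from earlier (the bootstrap server is a placeholder):

kafka-topics.sh --bootstrap-server <bootstrap-server>:9092 \
  --command-config kafka.properties \
  --create --topic argo-kafka-eventbus-webhook-sn-trigger
kafka-topics.sh --bootstrap-server <bootstrap-server>:9092 \
  --command-config kafka.properties \
  --create --topic argo-kafka-eventbus-webhook-sn-action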
Testing the Event Flow
Test the webhook by sending a POST request to /notify at the webhook-es Ingress endpoint; the following example uses curl to POST a JSON payload to the webhook endpoint at webhook-192.168.39.85.nip.io.
curl -v -X POST webhook-192.168.39.85.nip.io/notify \
-H 'Content-Type: application/json' \
-d '{ "message": { "id": "abc123", "name": "fred", "email": "fred@gmail.com" } }'
If you are not able to see the event arriving at shttpbin-deploy, you can trace the event from its source to its sink by examining the following:
- The logs of the EventSource deployment
- The contents of the argo-kafka-eventbus topic, from your Kafka console or with the kafka-console-consumer.sh command (see the example after this list)
- The logs of the Sensor deployment
- The contents of the argo-kafka-eventbus-webhook-sn-trigger topic
- Your application's logs, which in this case is shttpbin-deploy
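For example, to peek at the events queued on the bus topic, something along these lines should work (same placeholders as before):

kafka-console-consumer.sh --bootstrap-server <bootstrap-server>:9092 \
  --consumer.config kafka.properties \
  --topic argo-kafka-eventbus --from-beginning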
We will explore other event sources and triggers in future articles.
Till next time…



