Using Kafka trigger to invoke a Function

This tutorial demonstrates how to use a Kafka trigger to invoke a function. We’ll assume you have Fission and Kubernetes installed, with the Kafka MQ integration enabled. If not, please head over to the install guide.

You will also need a Kafka setup that is reachable from the Fission Kubernetes cluster. If you want to set up Kafka on the Kubernetes cluster, you can use the information here.


Before we dive into details, let’s walk through the overall flow of events and the functions involved.

  1. A Go producer function (producerfunc) acts as a producer and drops a message into a Kafka topic named input.
  2. The Fission Kafka trigger activates and invokes another function (consumerfunc) with the body of the Kafka message.
  3. The consumer function (consumerfunc) receives the body of the message and returns a response.
  4. The Fission Kafka trigger takes the response of the consumer function (consumerfunc) and drops it into a response topic named output. If there is an error, the message is dropped into an error topic named error.
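
To make the flow concrete, here is a minimal Go sketch of what the trigger does in steps 2 to 4. It is not Fission's implementation: the helper names routeResponse and invoke are hypothetical, a local test server stands in for consumerfunc, and the rule that only a 200 status counts as success is an assumption based on the examples later in this tutorial.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// routeResponse picks the topic for a function reply. Topic names come from
// the tutorial; treating every non-200 status as a failure is an assumption.
func routeResponse(status int, body string) (topic, payload string) {
	if status == http.StatusOK {
		return "output", body
	}
	return "error", fmt.Sprintf("Request returned failure: %d", status)
}

// invoke posts the Kafka message body to the function URL and routes the reply.
func invoke(fnURL, message string) (string, string, error) {
	resp, err := http.Post(fnURL, "application/json", strings.NewReader(message))
	if err != nil {
		return "", "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", "", err
	}
	topic, payload := routeResponse(resp.StatusCode, string(body))
	return topic, payload, nil
}

func main() {
	// Stand-in for consumerfunc: replies with "Hello " plus the request body.
	fn := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		b, _ := io.ReadAll(r.Body)
		fmt.Fprintf(w, "Hello %s", b)
	}))
	defer fn.Close()

	topic, payload, err := invoke(fn.URL, `{"name": "value"}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s <- %s\n", topic, payload)
}
```

Keeping the routing decision in a pure function makes it easy to check both branches without a running broker.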

Building the app

Producer Function

The producer function is a Go program which creates a message with a timestamp and drops it into the topic input. For brevity, all values are hard-coded in the code itself.

package main

import (
	"fmt"
	"net/http"
	"time"

	sarama ""
)

// Handler posts a message to Kafka Topic
func Handler(w http.ResponseWriter, r *http.Request) {
	brokers := []string{"broker.kafka.svc.cluster.local:9092"}
	producerConfig := sarama.NewConfig()
	producerConfig.Producer.RequiredAcks = sarama.WaitForAll
	producerConfig.Producer.Retry.Max = 10
	producerConfig.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer(brokers, producerConfig)
	if err != nil {
		// Without a producer nothing can be sent; report the failure and stop.
		http.Error(w, fmt.Sprintf("Failed to create producer: %v", err), http.StatusInternalServerError)
		return
	}
	defer producer.Close()

	// Build a small JSON payload that carries the current timestamp.
	t := time.Now()
	ts := t.Format(time.RFC3339)
	message := fmt.Sprintf("{\"name\": \"value %s \"}", ts)
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "input",
		Value: sarama.StringEncoder(message),
	})

	if err != nil {
		w.Write([]byte(fmt.Sprintf("Failed to publish message to topic %s: %v", "input", err)))
		return
	}
	w.Write([]byte("Successfully sent to input"))
}
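
To see what the producer actually puts on the topic, the following sketch rebuilds the payload with the same format string, using only the standard library. The helpers buildMessage and sampleMessage are hypothetical names for this illustration, and the fixed timestamp is taken from the log output shown later in this tutorial so that the result is reproducible.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// buildMessage formats the payload the same way the producer function does.
func buildMessage(t time.Time) string {
	return fmt.Sprintf("{\"name\": \"value %s \"}", t.Format(time.RFC3339))
}

// sampleMessage uses a fixed UTC timestamp instead of time.Now().
func sampleMessage() string {
	return buildMessage(time.Date(2018, time.October, 29, 10, 46, 12, 0, time.UTC))
}

func main() {
	msg := sampleMessage()
	fmt.Println(msg)                     // {"name": "value 2018-10-29T10:46:12Z "}
	fmt.Println(json.Valid([]byte(msg))) // true
}
```

Note the trailing space inside the value: it comes from the format string and is preserved all the way to the output topic.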

Since the Go program uses the sarama library, we need to provide that package for building and running the program. We use the glide tool with the simple glide.yaml declaration below to download the package.

- package:

With these two files in a directory, run the command glide install -v. The resulting directory structure will look like this:

├── glide.lock
├── glide.yaml
├── kafka-pub.go
└── vendor

1 directory, 3 files

We are now ready to package this code and create a function so that we can execute it later. The following commands create an environment, a package, and a function. Verify that the package build succeeded before proceeding.

$ fission env create --name goenv --image fission/go-env --builder fission/go-builder
$ zip -qr * 
$ fission package create --env goenv --src
Package 'kafka-zip-tzsu' created
$ fission fn create --name producerfunc --env goenv --pkg kafka-zip-tzsu --entrypoint Handler
$ fission package info --name kafka-zip-tzsu
Name:        kafka-zip-tzsu
Environment: go-kafka
Status:      succeeded
Build Logs:
Building in directory /usr/src/kafka-zip-tzsu-1bicov

Consumer function

The consumer function is a Node.js function which takes the body of the request, prepends “Hello” to it, and returns the resulting string.

module.exports = async function (context) {
    let obj = context.request.body;
    return {
        status: 200,
        body: "Hello "+ JSON.stringify(obj)
    };
}
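
One detail worth noticing: JSON.stringify re-serializes the parsed body without insignificant whitespace, so the space after the colon in the producer's message disappears in the response. The Go sketch below imitates that effect with json.Compact; the function name greet is hypothetical, and this is an illustration of the transformation rather than a port of the Node.js environment.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// greet mimics consumerfunc: it strips insignificant whitespace from the JSON
// body (the visible effect of JSON.stringify here) and prefixes "Hello ".
func greet(body string) (string, error) {
	var buf bytes.Buffer
	if err := json.Compact(&buf, []byte(body)); err != nil {
		return "", err
	}
	return "Hello " + buf.String(), nil
}

func main() {
	out, err := greet(`{"name": "value 2018-10-29T10:46:12Z "}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // Hello {"name":"value 2018-10-29T10:46:12Z "}
}
```

Whitespace inside string values is left untouched, which is why the trailing space before the closing quote survives.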

Let’s create the environment and function:

$ fission env create --name nodeenv --image fission/node-env
$ fission fn create --name consumerfunc --env nodeenv --code hellokafka.js

Connecting via trigger

We have both functions ready, but the connection between them is the missing glue. Let’s create a message queue trigger which invokes consumerfunc every time there is a message in the input topic. The response is sent to the output topic, and if the consumerfunc invocation fails, the error is written to the error topic.

$ fission mqt create --name kafkatest --function consumerfunc --mqtype kafka --topic input --resptopic output --errortopic error

If your Kafka broker is running somewhere else (not at broker.kafka:9092), you will have to provide a custom Kafka broker host when installing Fission. To do that, create a config file, set the value of kafka.brokers to your broker URL, and pass this config file to helm with the -f flag when installing Fission. You can refer to this link to find out more about this config parameter.

Testing it out

Let’s invoke the producer function so that the topic input gets some messages and we can see the consumer function in action.

$ fission fn test --name producerfunc
Successfully sent to input

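There are a couple of ways you can verify that consumerfunc was called: check the logs of the Kafka trigger for the invocation, or read the response from the output topic and decode its Base64-encoded value: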

time="2018-10-29T10:46:12Z" level=info msg="Calling message handler with value {"name": "value 2018-10-29T10:46:12Z "}" 
time="2018-10-29T10:46:12Z" level=info msg="Making HTTP request to http://router.fission/fission-function/consumer-func" 
time="2018-10-29T10:46:12Z" level=info msg="Got response Hello {"name":"value 2018-10-29T10:46:12Z "}" 
$ curl -X GET ''
    "key": null,
    "value": "SGVsbG8geyJuYW1lIjoidmFsdWUgMjAxOC0xMC0yOVQxMDo0NjoxMlogIn0=",
    "partition": 0,
    "offset": 2
$ echo "SGVsbG8geyJuYW1lIjoidmFsdWUgMjAxOC0xMC0yOVQxMDo0NjoxMlogIn0=" | base64 -D
Hello {"name":"value 2018-10-29T10:46:12Z "}
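
If you prefer not to depend on the base64 command (the -D flag is the macOS spelling; GNU coreutils uses -d), the same decoding can be done with a few lines of Go. This is a small sketch using only the standard library; decodeValue is a hypothetical helper name.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// decodeValue turns the Base64-encoded "value" field of a record into text.
func decodeValue(encoded string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

func main() {
	text, err := decodeValue("SGVsbG8geyJuYW1lIjoidmFsdWUgMjAxOC0xMC0yOVQxMDo0NjoxMlogIn0=")
	if err != nil {
		panic(err)
	}
	fmt.Println(text) // Hello {"name":"value 2018-10-29T10:46:12Z "}
}
```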

Introducing an error

Let’s introduce an error scenario: instead of returning a 200, the consumer function returns a 400, which the trigger treats as a failed invocation:

module.exports = async function (context) {
    let obj = context.request.body;
    return {
        status: 400,
        body: "Hello "+ JSON.stringify(obj)
    };
}

Update the function with the new code and invoke the producer function:

$ fission fn update --name consumerfunc --code hellokafka.js 

$ fission fn test --name producerfunc
Successfully sent to input

We can verify the message in the error topic as we did earlier:

$ curl -X GET ''
    "key": null,
    "value": "UmVxdWVzdCByZXR1cm5lZCBmYWlsdXJlOiA0MDA=",
    "partition": 0,
    "offset": 4

$ echo "UmVxdWVzdCByZXR1cm5lZCBmYWlsdXJlOiA0MDA="| base64 -D
Request returned failure: 400
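
When checking the error topic programmatically, it can help to parse the record and pull out the failed status code. The sketch below assumes each record is a single JSON object with the four fields shown above; the record type and the helpers parseRecord and failedStatus are hypothetical names for this example. It relies on encoding/json decoding Base64 strings into []byte fields automatically.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

// record mirrors the fields shown in the consumed message. The envelope
// (one JSON object per record) is an assumption for this sketch.
type record struct {
	Key       []byte `json:"key"`
	Value     []byte `json:"value"` // Base64 string in JSON, raw bytes in Go
	Partition int    `json:"partition"`
	Offset    int64  `json:"offset"`
}

// parseRecord decodes one record, including its Base64-encoded value.
func parseRecord(raw string) (record, error) {
	var rec record
	err := json.Unmarshal([]byte(raw), &rec)
	return rec, err
}

// failedStatus extracts the HTTP status from an error payload such as
// "Request returned failure: 400".
func failedStatus(payload string) (int, error) {
	const prefix = "Request returned failure: "
	if !strings.HasPrefix(payload, prefix) {
		return 0, fmt.Errorf("unexpected error payload: %q", payload)
	}
	return strconv.Atoi(strings.TrimPrefix(payload, prefix))
}

func main() {
	raw := `{"key": null, "value": "UmVxdWVzdCByZXR1cm5lZCBmYWlsdXJlOiA0MDA=", "partition": 0, "offset": 4}`
	rec, err := parseRecord(raw)
	if err != nil {
		panic(err)
	}
	status, err := failedStatus(string(rec.Value))
	if err != nil {
		panic(err)
	}
	fmt.Printf("offset %d: %s (status %d)\n", rec.Offset, rec.Value, status)
}
```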

More examples