DMS for Kafka (OPENSOURCE) Event Source

DMS for Kafka is a message queuing service that provides Kafka premium instances. When you create a Kafka trigger for a function, FunctionGraph retrieves the messages sent to a topic of the Kafka instance and triggers the function to process them. For details, see Using a Kafka Trigger.

Kafka example event

{
  "event_version": "v1.0",
  "event_time": 1576737962,
  "trigger_type": "KAFKA",
  "region": "eu-de",
  "instance_id": "08fd3e1b-cf56-401f-b4c6-81fd2a1d3ae6",
  "records": [
      {
          "messages": [
              "kafka message1",
              "kafka message2",
              "kafka message3",
              "kafka message4",
              "kafka message5"
          ],
          "topic_id": "topic-test"
      }
  ]
}
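The event structure above can be decoded with the standard library alone. The following is a minimal sketch, not the runtime library's implementation: the types kafkaEvent and kafkaRecord and the helpers decodeKafkaEvent and flattenMessages are hypothetical names introduced for illustration, and sampleEvent is a shortened copy of the example event. It assumes event_time is numeric, as it appears in the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kafkaRecord and kafkaEvent are hypothetical local types that mirror the
// example event. They are NOT the runtime library's own definitions.
type kafkaRecord struct {
	Messages []string `json:"messages"`
	TopicID  string   `json:"topic_id"`
}

type kafkaEvent struct {
	EventVersion string        `json:"event_version"`
	EventTime    int64         `json:"event_time"` // assumption: numeric, as in the example event
	TriggerType  string        `json:"trigger_type"`
	Region       string        `json:"region"`
	InstanceID   string        `json:"instance_id"`
	Records      []kafkaRecord `json:"records"`
}

// sampleEvent is a shortened copy of the example event (two messages only).
const sampleEvent = `{
  "event_version": "v1.0",
  "event_time": 1576737962,
  "trigger_type": "KAFKA",
  "region": "eu-de",
  "instance_id": "08fd3e1b-cf56-401f-b4c6-81fd2a1d3ae6",
  "records": [
    {
      "messages": ["kafka message1", "kafka message2"],
      "topic_id": "topic-test"
    }
  ]
}`

// decodeKafkaEvent unmarshals a raw payload into the local event type.
func decodeKafkaEvent(payload []byte) (kafkaEvent, error) {
	var ev kafkaEvent
	err := json.Unmarshal(payload, &ev)
	return ev, err
}

// flattenMessages returns every message in the event, prefixed with the
// topic_id of the record it belongs to.
func flattenMessages(ev kafkaEvent) []string {
	var out []string
	for _, rec := range ev.Records {
		for _, msg := range rec.Messages {
			out = append(out, rec.TopicID+": "+msg)
		}
	}
	return out
}

func main() {
	ev, err := decodeKafkaEvent([]byte(sampleEvent))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, line := range flattenMessages(ev) {
		fmt.Println(line)
	}
}
```

Because a single event can carry several records, each with several messages, the nested loop processes every message rather than only the first one.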

Parameter description

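Parameter      Type      Description
-------------  --------  ---------------------------------------------------------
event_version  String    Event version
event_time     String    Time when the event occurred (the example above shows a Unix timestamp in seconds)
trigger_type   String    Trigger type. The value is KAFKA.
region         String    Region where the Kafka instance resides
instance_id    String    Kafka instance ID
records        Object[]  Records in the event; each record contains messages and topic_id
messages       String[]  Message content
topic_id       String    ID of the topic the messages were read from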
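The event_time value in the example event (1576737962) looks like a Unix timestamp in seconds. Under that assumption, which the source does not state explicitly, it can be converted to a readable UTC time as sketched below. The helper name formatEventTime is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// formatEventTime interprets sec as seconds since the Unix epoch (an
// assumption about event_time) and returns it as an RFC 3339 UTC string.
func formatEventTime(sec int64) string {
	return time.Unix(sec, 0).UTC().Format(time.RFC3339)
}

func main() {
	// Value taken from the example event.
	fmt.Println(formatEventTime(1576737962)) // prints 2019-12-19T06:46:02Z
}
```

If the service delivers event_time in another unit or as a string, the conversion has to be adjusted accordingly.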

Example

package main

import (
	"encoding/json"
	"fmt"

	"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/events/kafka"
	"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/go-api/context"
	"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/pkg/runtime"
)

// KafkaTest is an example handler for the DMS for Kafka (open-source Kafka) trigger.
// It decodes the event payload and logs its content.
func KafkaTest(payload []byte, ctx context.RuntimeContext) (interface{}, error) {
	var kafkaEvent kafka.KAFKATriggerEvent
	err := json.Unmarshal(payload, &kafkaEvent)
	if err != nil {
		// Include the underlying error so a malformed payload can be diagnosed.
		fmt.Println("Unmarshal failed:", err)
		return "invalid data", err
	}
	ctx.GetLogger().Logf("payload:%s", kafkaEvent.String())
	return "ok", nil
}

func main() {
	runtime.Register(KafkaTest)
}