DMS for Kafka (OPENSOURCE) Event Source¶
DMS for Kafka is a message queuing service that provides Kafka premium instances. If you create a Kafka trigger for a function, when a message is sent to a Kafka instance topic, FunctionGraph will retrieve the message and trigger the function to perform other operations. For details, see Using a Kafka Trigger.
Kafka example event¶
{
"event_version": "v1.0",
"event_time": 1576737962,
"trigger_type": "KAFKA",
"region": "eu-de",
"instance_id": "08fd3e1b-cf56-401f-b4c6-81fd2a1d3ae6",
"records": [
{
"messages": [
"kafka message1",
"kafka message2",
"kafka message3",
"kafka message4",
"kafka message5"
],
"topic_id": "topic-test"
}
]
}
Parameter description¶
Parameter |
Type |
Description |
|---|---|---|
event_version |
String |
Event version |
event_time |
String |
Time when an event occurs |
trigger_type |
String |
Event type: KAFKA |
region |
String |
Region where a Kafka instance resides |
instance_id |
String |
Kafka instance ID |
messages |
String[] |
Message content |
topic_id |
String |
Message ID |
Example¶
package main
import (
"encoding/json"
"fmt"
"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/events/kafka"
"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/go-api/context"
"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/pkg/runtime"
)
// Example for Kafka (Opensourcekafka) handler
func KafkaTest(payload []byte, ctx context.RuntimeContext) (interface{}, error) {
var kafkaEvent kafka.KAFKATriggerEvent
err := json.Unmarshal(payload, &kafkaEvent)
if err != nil {
fmt.Println("Unmarshal failed")
return "invalid data", err
}
ctx.GetLogger().Logf("payload:%s", kafkaEvent.String())
return "ok", nil
}
func main() {
runtime.Register(KafkaTest)
}