DMS for Kafka (OPENSOURCE) Event Source
DMS for Kafka is a message queuing service that provides Kafka premium instances. If you create a Kafka trigger for a function, FunctionGraph retrieves each message sent to a topic of the Kafka instance and triggers the function to process it. For details, see Using a Kafka Trigger.
Kafka example event
{
    "event_version": "v1.0",
    "event_time": 1576737962,
    "trigger_type": "KAFKA",
    "region": "{region}",
    "instance_id": "81335d56-b9fe-4679-ba95-7030949cc76b",
    "records": [
        {
            "messages": [
                "kafka message1",
                "kafka message2",
                "kafka message3",
                "kafka message4",
                "kafka message5"
            ],
            "topic_id": "topic-test"
        }
    ]
}
Parameter description
| Parameter | Type | Example Value | Description |
|---|---|---|---|
| event_version | String | v1.0 | Event version |
| event_time | String | 2018-01-09T07:50:50.028Z | Time when the event occurred |
| trigger_type | String | KAFKA | Event type |
| region | String | {region} | Region where the Kafka instance resides |
| instance_id | String | 81335d56-b9fe-4679-ba95-7030949cc76b | Kafka instance ID |
| messages | String[] | See the example. | Message content |
| topic_id | String | topic-test | Topic ID |
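As a rough illustration of the event layout described above, the following Go sketch decodes the example event and walks its records. The types `kafkaRecord` and `kafkaEvent` and the function `parseKafkaEvent` are hypothetical local names that only mirror the documented JSON fields; they are not the runtime library's own types. Because the example event shows `event_time` as a number while the table lists it as a String, the sketch keeps that field as raw JSON instead of assuming either form.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kafkaRecord is a hypothetical local type mirroring one entry of "records".
type kafkaRecord struct {
	Messages []string `json:"messages"`
	TopicID  string   `json:"topic_id"`
}

// kafkaEvent is a hypothetical local type mirroring the documented fields.
type kafkaEvent struct {
	EventVersion string `json:"event_version"`
	// Kept as raw JSON: a number in the example event, String in the table.
	EventTime   json.RawMessage `json:"event_time"`
	TriggerType string          `json:"trigger_type"`
	Region      string          `json:"region"`
	InstanceID  string          `json:"instance_id"`
	Records     []kafkaRecord   `json:"records"`
}

// sampleEvent is the example event from this page.
const sampleEvent = `{
    "event_version": "v1.0",
    "event_time": 1576737962,
    "trigger_type": "KAFKA",
    "region": "{region}",
    "instance_id": "81335d56-b9fe-4679-ba95-7030949cc76b",
    "records": [
        {
            "messages": [
                "kafka message1",
                "kafka message2",
                "kafka message3",
                "kafka message4",
                "kafka message5"
            ],
            "topic_id": "topic-test"
        }
    ]
}`

// parseKafkaEvent decodes a trigger payload into the local event type.
func parseKafkaEvent(payload []byte) (kafkaEvent, error) {
	var ev kafkaEvent
	err := json.Unmarshal(payload, &ev)
	return ev, err
}

func main() {
	ev, err := parseKafkaEvent([]byte(sampleEvent))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	// Each record groups the messages retrieved from one topic.
	for _, rec := range ev.Records {
		for i, msg := range rec.Messages {
			fmt.Printf("topic=%s message[%d]=%s\n", rec.TopicID, i, msg)
		}
	}
}
```

Inside a real function, the types from the runtime's `events/kafka` package would normally take the place of these local structs.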
Example
package main

import (
	"encoding/json"
	"fmt"

	"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/events/kafka"
	"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/go-api/context"
	"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/pkg/runtime"
)

// KafkaTest is an example handler for a DMS for Kafka (open-source Kafka) trigger.
func KafkaTest(payload []byte, ctx context.RuntimeContext) (interface{}, error) {
	// Decode the raw trigger payload into the Kafka event type.
	var kafkaEvent kafka.KAFKATriggerEvent
	err := json.Unmarshal(payload, &kafkaEvent)
	if err != nil {
		fmt.Println("Unmarshal failed:", err)
		return "invalid data", err
	}
	// Log the decoded event through the function's logger.
	ctx.GetLogger().Logf("payload:%s", kafkaEvent.String())
	return "ok", nil
}

func main() {
	// Register the handler with the FunctionGraph runtime.
	runtime.Register(KafkaTest)
}
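The handler in the example only logs the decoded event. A minimal sketch of per-message processing, kept independent of the runtime so it can be tested on its own, might look like the following. The `record` type, `processRecords`, and `rejectBlank` are hypothetical names introduced for illustration; `record` stands in for one entry of the event's `records` array and is not the library's type.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// record is a hypothetical stand-in for one entry of the "records" array.
type record struct {
	TopicID  string
	Messages []string
}

// processRecords calls handle once per message and reports how many calls
// succeeded and how many returned an error. A failing message does not stop
// the remaining messages from being processed.
func processRecords(records []record, handle func(topic, msg string) error) (processed, failed int) {
	for _, rec := range records {
		for _, msg := range rec.Messages {
			if err := handle(rec.TopicID, msg); err != nil {
				failed++
				continue
			}
			processed++
		}
	}
	return processed, failed
}

// rejectBlank is an example message handler that refuses blank messages.
func rejectBlank(topic, msg string) error {
	if strings.TrimSpace(msg) == "" {
		return errors.New("blank message on topic " + topic)
	}
	return nil
}

func main() {
	records := []record{
		{TopicID: "topic-test", Messages: []string{"kafka message1", "", "kafka message2"}},
	}
	processed, failed := processRecords(records, rejectBlank)
	fmt.Printf("processed=%d failed=%d\n", processed, failed)
}
```

Counting failures instead of returning on the first error is one possible design choice; whether a batch should fail as a whole depends on how the function is expected to treat partially processed messages.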