DIS Event Source
Data Ingestion Service (DIS) ingests large volumes of data in real time. You can create a function that processes new data records from a DIS stream, such as website clickstreams, financial transactions, social media streams, IT logs, and location-tracking events. FunctionGraph periodically polls the stream for new data records and passes them to the function. For details, see Using a DIS Trigger.
DIS example event
{
  "ShardID": "shardId-0000000000",
  "Message": {
    "next_partition_cursor": "eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLXN3dGVzdCIsInBhcnRpdGlvbi1pZCI6InNoYXJkSWQtMDAwMDAwMDAwMCIsImN1cnNvci10eXBlIjoiVFJJTV9IT1JJWk9OIiwic3RhcnRpbmctc2VxdWVuY2UtbnVtYmVyIjoiNCJ9LCJnZW5lcmF0ZVRpbWVzdGFtcCI6MTUwOTYwNjM5MjE5MX0",
    "records": [
      {
        "partition_key": "shardId_0000000000",
        "data": "d2VsY29tZQ==",
        "sequence_number": "0"
      },
      {
        "partition_key": "shardId_0000000000",
        "data": "dXNpbmc=",
        "sequence_number": "1"
      },
      {
        "partition_key": "shardId_0000000000",
        "data": "RnVuY3Rpb25TdGFnZQ==",
        "sequence_number": "2"
      },
      {
        "partition_key": "shardId_0000000000",
        "data": "c2VydmljZQ==",
        "sequence_number": "3"
      }
    ],
    "millis_behind_latest": ""
  },
  "Tag": "latest",
  "StreamName": "dis-swtest"
}
Parameter description
| Parameter | Type | Example Value | Description |
|---|---|---|---|
| ShardID | String | shardId-0000000000 | Partition ID |
| next_partition_cursor | String | See the example. | Next partition cursor |
| records | Map | See the example. | Data records stored in a DIS stream |
| partition_key | String | See the example. | Partition key |
| data | String | See the example. | Data blocks, which are added by the data producer to the stream |
| sequence_number | Int | See the example. | Record ID, which is automatically allocated by DIS |
| Tag | String | latest | Stream tag |
| StreamName | String | dis-swtest | Stream name |
Example
package main

import (
	"encoding/json"
	"fmt"

	"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/events/dis"
	"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/go-api/context"
	"github.com/opentelekomcloud-community/otc-functiongraph-go-runtime/go-runtime/pkg/runtime"
)

// DisTest is an example handler for Data Ingestion Service (DIS) trigger events.
func DisTest(payload []byte, ctx context.RuntimeContext) (interface{}, error) {
	// Parse the raw trigger payload into the DIS event type.
	var disEvent dis.DISTriggerEvent
	err := json.Unmarshal(payload, &disEvent)
	if err != nil {
		fmt.Println("Unmarshal failed")
		return "invalid data", err
	}

	// Write the parsed event to the function log.
	ctx.GetLogger().Logf("payload:%s", disEvent.String())
	return "ok", nil
}

func main() {
	// Register the handler with the FunctionGraph Go runtime.
	runtime.Register(DisTest)
}
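
The handler above only logs the event. The `data` values in the example event are Base64 strings (for example, `d2VsY29tZQ==` decodes to `welcome`), so a function will usually need to decode them before processing. The following is a minimal sketch of that step using only the Go standard library. The `disRecord` and `disEvent` types and the `decodeRecords` helper are hypothetical names introduced for illustration; they mirror the JSON field names in the example event and are not the types exported by the runtime's `dis` package. The sketch also assumes that every record's `data` uses standard padded Base64, as in the example.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// disRecord and disEvent are hypothetical local types that mirror the field
// names of the example event. They are NOT the runtime's dis package types.
type disRecord struct {
	PartitionKey   string `json:"partition_key"`
	Data           string `json:"data"`
	SequenceNumber string `json:"sequence_number"`
}

type disEvent struct {
	ShardID string `json:"ShardID"`
	Message struct {
		Records []disRecord `json:"records"`
	} `json:"Message"`
	StreamName string `json:"StreamName"`
}

// decodeRecords parses a DIS event payload and returns each record's data
// field decoded from Base64, in the order the records appear in the event.
func decodeRecords(payload []byte) ([]string, error) {
	var ev disEvent
	if err := json.Unmarshal(payload, &ev); err != nil {
		return nil, fmt.Errorf("unmarshal event: %w", err)
	}
	out := make([]string, 0, len(ev.Message.Records))
	for _, r := range ev.Message.Records {
		// Assumption: data is standard (padded) Base64, as in the example event.
		raw, err := base64.StdEncoding.DecodeString(r.Data)
		if err != nil {
			return nil, fmt.Errorf("decode record %s: %w", r.SequenceNumber, err)
		}
		out = append(out, string(raw))
	}
	return out, nil
}

// sampleEvent is the example event with next_partition_cursor omitted for brevity.
const sampleEvent = `{
  "ShardID": "shardId-0000000000",
  "Message": {
    "records": [
      {"partition_key": "shardId_0000000000", "data": "d2VsY29tZQ==", "sequence_number": "0"},
      {"partition_key": "shardId_0000000000", "data": "dXNpbmc=", "sequence_number": "1"},
      {"partition_key": "shardId_0000000000", "data": "RnVuY3Rpb25TdGFnZQ==", "sequence_number": "2"},
      {"partition_key": "shardId_0000000000", "data": "c2VydmljZQ==", "sequence_number": "3"}
    ],
    "millis_behind_latest": ""
  },
  "Tag": "latest",
  "StreamName": "dis-swtest"
}`

func main() {
	words, err := decodeRecords([]byte(sampleEvent))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(words) // prints: [welcome using FunctionStage service]
}
```

Inside a real handler, the same loop could run over the records of the event parsed by the runtime's own type; the local structs are used here only so that the sketch compiles without the runtime dependency.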