Kafka Route
Supported Kafka Clients
The currently supported Kafka client version is 0.10.2.0
EC uses Apache Camel to route messages to an external Kafka destination.
Parameter Name | Description | Default |
---|---|---|
Kafka Brokers | Comma separated list of Kafka brokers in the format host:port (or ip:port) | 127.0.0.1:9092 |
Header Property | Header property name in the Camel message where Camel Kafka component will find the composed URL representing the endpoint to invoke. For most of the use cases the default configuration works good and should not to be changed. | evaluatedKafkaEndPoint |
Original Topic | The name of the topic where the message was originally sent to by the device. By default it is read from the originalTopic property of the Camel message. It is used by Kafka to calculate the partition key. For most of the use cases the default configuration works good and should not to be changed. | $simple{in.header.originalTopic} |
Message Converter | Message converter to conver the incoming message before forwarding it to Kafka Available options are: - none: no message converter will be used so the message forwarded to Kafka will be the same received by the broker - json: the incoming message will be converted to a Kura payload then to Json (see 1). | none |
Key Serializer | Kafka key serializer. For the serializers provided by Kafka please refer to the Kafka documentation. | org.apache.kafka.common.serialization.StringSerializer |
Serializer | Kafka message serializer. For the serializers provided by Kafka please refer to the Kafka documentation. | org.apache.kafka.common.serialization.StringSerializer |
URL Options | Additional Kafka endpoint options. This field allows to specify more parameters to be used by the Kafka client. The options are added to the composed URL. For the available client parameters please refer to the Kafka client documentation (see 3). | none |
Topic Matching | Describes how to map the incoming messages topics to Kafka topics (see 2). The field expects a list of rows composed by comma separeted key value pair. The key is the regex to match the incoming topics. The value is the corresponding destination Kafka topic. The destination Kafka topic can contain variables recognized by the routing service (currently only ACCOUNT variable is supported). The default value maps all the incoming topics into the ${ACCOUNT} (account name) topic. | .*,${ACCOUNT} |
1) Output Json example
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"scopeId": "AQ",
"deviceId": "RyO9_XKGRsw",
"clientId": "client-id-0",
"receivedOn": 1543486113,
"sentOn": 1543486113,
"capturedOn": 1543486113,
"position": {
"longitude": 0.0,
"latitude": 0.0,
"precision": 50,
"heading": 240,
"speed": 110,
"timestamp": 1543486113,
"satellites": 2,
"status": 0
},
"channel": {
"semanticParts": [ "one", "two", "three" ]
},
"payload": {
"metrics": {
"metric_one": {
"value": "string_value",
"valueType": "string"
},
"metric_two": {
"value": 123,
"valueType": "integer"
},
"metric_three": {
"value": true,
"valueType": "boolean"
}
},
"body": "VGhpcyBpcyBhIGJvZHkgY29udGVudA=="
}
}
2) Topic matching example
This is a topic matching example.
Please note that the incoming message use the JMS syntax (so the topic levels are . sepated instead of / separated)
(.*\.){2}topic1\.topic1_1,kafka_topic_1
(.*\.){2}topic2,kafka_topic_2
(.*\.){2}topic3\.topic3_1,kafka_topic_3
This configuration forwards the messages for all the account's clients to different destination topics depending on 3 semantic topics:
MQTT semantic topic | Kafka topic |
---|---|
topic1/topic1_1 | kafka_topic_1 |
topic2 | kafka_topic_2 |
opic3/topic3_1 | kafka_topic_3 |
Topic Names
Kafka allows only few characters in the topic (regex coming from the Kafka source code "[a-zA-Z0-9._-]"). Please take care of this limitation while setting the topic name route field.
3) URL Options
The route supports various configuration options. For example these are various combinations used for different authentication methods.
No authentication without SSL | &securityProtocol=PLAINTEXT |
SASL/PLAIN without SSL | &securityProtocol=SASL_PLAINTEXT&saslMechanism=PLAIN |
SASL/PLAIN with SSL | &securityProtocol=SASL_SSL&saslMechanism=PLAIN |
SASL/GSSAPI without SSL | &securityProtocol=SASL_PLAINTEXT&saslMechanism=GSSAPI&saslKerberosServiceName=kafka |
SASL/GSSAPI with SSL | &securityProtocol=SASL_SSL&saslMechanism=GSSAPI&saslKerberosServiceName=kafka |
For more details regarding security configuration refer to section Kafka Integration
Updated over 5 years ago