18.8. Data Management
18.8.1. Kafka Topic Name
Each SimpleFeatureType (or schema) will be written to a unique Kafka topic. By default, the topic will be named based on the kafka.zk.path data store parameter and the SimpleFeatureType name, by appending the two together and replacing any / characters with -. For example, with the default ZooKeeper path (geomesa/ds/kafka), a SimpleFeatureType name of ‘foo’ would result in the topic geomesa-ds-kafka-foo.
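The naming rule above can be sketched in a few lines of Java. This is an illustration only, not GeoMesa's own code: the class and method names are hypothetical, and the "/" separator between the path and the type name is an assumption inferred from the example above. Edge cases such as leading or repeated slashes are not handled.

```java
// Illustrative sketch of the default topic naming rule; not GeoMesa's implementation.
final class TopicNames {
    private TopicNames() {}

    /**
     * Joins the ZooKeeper path and the type name (assumed separator: '/'),
     * then replaces every '/' with '-'.
     */
    static String defaultTopic(String zkPath, String typeName) {
        return (zkPath + "/" + typeName).replace('/', '-');
    }

    public static void main(String[] args) {
        // Default path and the 'foo' type name from the text above
        System.out.println(defaultTopic("geomesa/ds/kafka", "foo"));
    }
}
```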
If desired, the topic name can be set to an arbitrary value by setting the user data key geomesa.kafka.topic before calling createSchema:
SimpleFeatureType sft = ....;
sft.getUserData().put("geomesa.kafka.topic", "myTopicName");
For more information on how to set schema options, see Setting Schema Options.
18.8.2. Kafka Topic Configuration
The Kafka topic for a given SimpleFeatureType will be created when calling createSchema (if it doesn’t already exist). GeoMesa exposes a few configuration options through data store parameters. Additional options can be configured by setting the user data key kafka.topic.config before calling createSchema:
SimpleFeatureType sft = ....;
sft.getUserData().put("kafka.topic.config", "cleanup.policy=compact\nretention.ms=86400000");
The value should be in standard Java properties format. For a list of available configurations, refer to the Kafka documentation. For more information on how to set schema options, see Setting Schema Options.
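Because the value is in Java properties format, it can be checked with java.util.Properties before it is attached to the schema. The following is a minimal sketch; the class and method names are hypothetical, and it only confirms that the string parses, not that Kafka accepts the keys or values.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: parses a kafka.topic.config value to confirm it is valid properties text.
final class TopicConfigCheck {
    private TopicConfigCheck() {}

    static Properties parse(String config) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(config));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Same value as in the createSchema example above
        Properties props = parse("cleanup.policy=compact\nretention.ms=86400000");
        props.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```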
Parallelism in Kafka is achieved through the use of multiple topic partitions. Within a consumer group, each partition can only be read by a single Kafka consumer. The number of consumers can be controlled through the kafka.consumer.count data store parameter; however, this will have no effect if there is only a single topic partition. To create more than one partition, use the kafka.topic.partitions data store parameter.
Replication in Kafka protects against data loss by keeping copies of each partition on more than one broker. To enable replication, use the kafka.topic.replication data store parameter.
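As a rough sketch, the three parameters above could be collected into a data store parameter map as follows. The class and method names are hypothetical and the values are illustrative. Connection parameters are omitted, and passing the counts as integers is an assumption about what the data store accepts.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that gathers the partition, consumer and replication settings.
final class KafkaStoreParams {
    private KafkaStoreParams() {}

    static Map<String, Object> scalingParams(int partitions, int consumers, int replication) {
        Map<String, Object> params = new HashMap<>();
        // Number of partitions used when the topic is created
        params.put("kafka.topic.partitions", partitions);
        // Consumers beyond the partition count would have nothing to read
        params.put("kafka.consumer.count", consumers);
        // Number of copies Kafka keeps of each partition
        params.put("kafka.topic.replication", replication);
        return params;
    }

    public static void main(String[] args) {
        // Illustrative values only; connection parameters would be added before use
        System.out.println(scalingParams(4, 4, 3));
    }
}
```

A map like this would be merged with the connection parameters before being used to look up the data store.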
18.8.3. Kafka Topic Compaction
Kafka has various options for preventing data from growing unbounded. The simplest is to set a size or time-based retention policy. This will cause older messages to be deleted when the topic reaches a certain threshold.
Starting with GeoMesa 2.1.0, the Kafka data store supports Kafka log compaction. This allows the topic size to be managed while preserving the latest state for each feature. When combined with Initial Load (Replay), the persistent state of a system can be maintained through restarts and down-time. Note that when using log compaction, it is important to send an explicit delete for each feature that is no longer needed. Otherwise, the feature will never be compacted out of the log, and the log will grow without bound.
If upgrading from a version of GeoMesa prior to 2.1.0, the topic should be run for a while using a size or time-based retention policy before enabling compaction, as messages written with older versions of GeoMesa will never be compacted out.
18.8.4. Integration with Other Systems
Other systems can integrate with the Kafka data store by consuming the Kafka topic directly. The messages are a change log of updates. Message keys consist of the simple feature ID, as UTF-8 bytes. Message bodies are serialized simple features, or null to indicate deletion. When using Kafka 0.11.x or newer, the internal serialization version is set as a message header under the key "v".
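The change log semantics described above can be sketched as a small state holder that an external consumer might feed with each record's key and value. This is a minimal sketch with hypothetical names. It covers only the two cases described here (an update, or a null body for deletion), leaves the Kafka polling loop out, and keeps message bodies as undecoded bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the latest serialized state of each feature, keyed by feature ID.
final class ChangeLogState {
    private final Map<String, byte[]> latest = new HashMap<>();

    /** Applies one message: a null body removes the feature, anything else replaces it. */
    void apply(byte[] key, byte[] value) {
        // Keys are the simple feature ID as UTF-8 bytes
        String featureId = new String(key, StandardCharsets.UTF_8);
        if (value == null) {
            latest.remove(featureId);
        } else {
            latest.put(featureId, value);
        }
    }

    /** Number of features currently present. */
    int size() {
        return latest.size();
    }

    /** Read-only view of the current state; bodies still need to be deserialized. */
    Map<String, byte[]> snapshot() {
        return Collections.unmodifiableMap(latest);
    }
}
```

Replaying the topic from the beginning through apply would rebuild the current set of features, which is the same idea that log compaction relies on.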
By default, message bodies are serialized with a custom Kryo serializer. For Java/Scala clients, the org.locationtech.geomesa.features.kryo.KryoFeatureSerializer class may be used to decode messages, available in the geomesa-feature-kryo module through Maven. Alternatively, producers can be configured to send Avro-encoded messages through the kafka.serialization.type data store parameter. Avro libraries exist in many languages, and Avro messages follow a defined schema that allows for cross-platform parsing.
If you are using the Confluent platform to manage Kafka, please see Confluent Integration.