18.8. Data Management

18.8.1. Kafka Topic Name

Each SimpleFeatureType (or schema) will be written to a unique Kafka topic. By default, the topic will be named based on the kafka.zk.path data store parameter and the SimpleFeatureType name, by appending the two together and replacing any / characters with -. For example, with the default zookeeper path (geomesa/ds/kafka), a SimpleFeatureType name of ‘foo’ would result in the topic geomesa-ds-kafka-foo.

If desired, the topic name can be set to an arbitrary value by setting the user data key geomesa.kafka.topic before calling createSchema:

SimpleFeatureType sft = ....;
sft.getUserData().put("geomesa.kafka.topic", "myTopicName");

For more information on how to set schema options, see Setting Schema Options.

18.8.2. Kafka Topic Configuration

The Kafka topic for a given SimpleFeatureType will be created when calling createSchema (if it doesn’t already exist). GeoMesa exposes a few configuration options through data store parameters. Additional options can be configured by setting the user data key kafka.topic.config before calling createSchema:

SimpleFeatureType sft = ....;
sft.getUserData().put("kafka.topic.config", "cleanup.policy=compact\nretention.ms=86400000");

The value should be in standard Java properties format. For a list of available configurations, refer to the Kafka documentation. For more information on how to set schema options, see Setting Schema Options.

Parallelism in Kafka is achieved through the use of multiple topic partitions. Each partition can only be read by a single Kafka consumer. The number of consumers can be controlled through the kafka.consumer.count data store parameter; however, this will have no effect if there is only a single topic partition. To create more than one partition, use the kafka.topic.partitions data store parameter.

Replication in Kafka ensures that data is not lost. To enable replication, use the kafka.topic.replication data store parameter.

18.8.3. Kafka Topic Compaction

Kafka has various options for preventing data from growing unbounded. The simplest is to set a size or time-based retention policy. This will cause older messages to be deleted when the topic reaches a certain threshold.

Starting with GeoMesa 2.1.0, the Kafka data store supports Kafka log compaction. This allows for the topic size to be managed, while preserving the latest state for each feature. When combined with Initial Load (Replay), the persistent state of a system can be maintained through restarts and down-time. Note that when using log compaction, it is important to send explicit deletes for each feature, otherwise the feature will never be compacted out from the log, and the log size will start to grow unbounded.

If upgrading from a version of GeoMesa prior to 2.1.0, the topic should be run for a while using a size or time-based retention policy before enabling compaction, as messages written with older versions of GeoMesa will never be compacted out.

18.8.4. Integration with Other Systems

The Kafka data store is easy to integrate with by consuming the Kafka topic. The messages are a change log of updates. Message keys consist of the simple feature ID, as UTF-8 bytes. Message bodies are serialized simple features, or null to indicate deletion. The internal serialization version is set as a message header under the key "v".

By default, message bodies are serialized with a custom Kryo serializer. For Java/Scala clients, the org.locationtech.geomesa.features.kryo.KryoFeatureSerializer class may be used to decode messages, available in the geomesa-feature-kryo module through Maven. Alternatively, producers can be configured to send Avro-encoded messages through the kafka.serialization.type data store parameter. Avro libraries exist in many languages, and Avro messages follow a defined schema that allows for cross-platform parsing.

If you are using the Confluent platform to manage Kafka, please see Confluent Integration.