18.7. Kafka Index Configuration

GeoMesa exposes a variety of configuration options that can be used to customize and optimize a given installation. Because a Kafka data store maintains an in-memory cache of features, most of these options can be configured on a consumer data store and take immediate effect.

18.7.1. Initial Load (Replay)

By default, a Kafka consumer data store will start consuming from the end of a topic. This means that it will only see new updates that are written after it has spun up. Optionally, the consumer may start from earlier in the topic, by setting kafka.consumer.read-back to a duration, such as 1 hour, in the data store parameters. This allows a consumer to replay old messages and establish a baseline state. To read the entire message queue, use the value Inf.

Note that a feature store will not return any query results during this initial load. Results become available once the consumer has caught up to the head of the topic.
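As a minimal sketch of the replay setting above, the following builds a parameter map for a consumer data store. Only kafka.consumer.read-back comes from this section. The kafka.brokers key, the broker address and the ReplayParamsSketch class are assumptions added for illustration. The resulting map would typically be handed to the GeoTools DataStoreFinder, which is not shown here so that the example stays self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ReplayParamsSketch {

    // Builds consumer data store parameters with an optional read-back window.
    // A null readBack leaves the default behavior (start from the end of the topic).
    static Map<String, String> consumerParams(String brokers, String readBack) {
        Map<String, String> params = new LinkedHashMap<>();
        // Assumed connection parameter; it is not described in this section
        params.put("kafka.brokers", brokers);
        if (readBack != null) {
            // A duration such as "1 hour", or "Inf" to read the entire topic
            params.put("kafka.consumer.read-back", readBack);
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(consumerParams("localhost:9092", "1 hour"));
        System.out.println(consumerParams("localhost:9092", "Inf"));
    }
}
```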

Also see Kafka Topic Compaction for details on managing the size and history of the Kafka topic.

18.7.2. Feature Expiration

Generally, a Kafka consumer data store will keep any features that are written by a producer data store, until they are explicitly deleted by the producer using a modifying feature writer. Optionally, a consumer data store may expire features after a certain timeout, by specifying the kafka.cache.expiry data store parameter. When a producer writes an update to an existing feature, the consumer will reset the expiration timeout. Once the timeout is hit without any updates, the feature will be removed from the consumer cache and will no longer be returned when querying.

For advanced use cases, the kafka.cache.expiry.dynamic data store parameter can be used to expire features based on matching filter predicates. The value should be a TypeSafe config document where the keys are CQL filter strings and the values are expiration duration strings (for example, { "type = 'boat'": "100ms" }). The filters will be evaluated in the order they are declared. Any features which do not match one of the filters will use the default kafka.cache.expiry value, if provided.
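The ordering rule for dynamic expiry can be illustrated with a small sketch. This is not GeoMesa's implementation. It assumes that the first matching filter decides the expiry, and it models CQL filters as plain Java predicates. The DynamicExpirySketch class and its resolve method are hypothetical names.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

class DynamicExpirySketch {

    // Returns the expiry of the first rule (in declaration order) that matches the
    // feature, or the default expiry (which may be absent) if no rule matches.
    static <T> Optional<Duration> resolve(Map<Predicate<T>, Duration> rules,
                                          Optional<Duration> defaultExpiry,
                                          T feature) {
        for (Map.Entry<Predicate<T>, Duration> rule : rules.entrySet()) {
            if (rule.getKey().test(feature)) {
                return Optional.of(rule.getValue());
            }
        }
        return defaultExpiry;
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves declaration order, like the ordered config document
        Map<Predicate<Map<String, Object>>, Duration> rules = new LinkedHashMap<>();
        // Stands in for the CQL filter "type = 'boat'" with an expiry of "100ms"
        rules.put(f -> "boat".equals(f.get("type")), Duration.ofMillis(100));

        Map<String, Object> boat = Map.of("type", "boat");
        Map<String, Object> plane = Map.of("type", "plane");
        Optional<Duration> fallback = Optional.of(Duration.ofSeconds(30));

        System.out.println(resolve(rules, fallback, boat));
        System.out.println(resolve(rules, fallback, plane));
    }
}
```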

If the expiry is set to zero, features will not be indexed or searchable. However, they will still be passed to any feature listeners that are configured.

18.7.3. Feature Event Time

By default, expiration and updates are determined by Kafka message time. Feature updates will replace any prior feature message, and features will expire based on when they were read. Alternatively, one or both of these values may be based on feature attributes.

To enable event time, specify a property name or CQL expression using the kafka.cache.event-time data store parameter. This expression will be evaluated on a per-feature basis, and must evaluate to either a date or a number representing milliseconds since the Java epoch. This value will be combined with the kafka.cache.expiry value to set an expiration time for the feature.

To also enable event time ordering, set the kafka.cache.event-time.ordering data store parameter to true. When enabled, if a feature update is read that has an older event time than the current feature, the message will be discarded. This can be useful for handling irregular update streams.
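Event time ordering can be sketched as a cache that remembers the latest event time per feature and drops older updates. This is an illustration only, not GeoMesa's code. The EventTimeOrderingSketch class is hypothetical, and accepting an update with an equal event time is an assumption, since the text above only says that older updates are discarded.

```java
import java.util.HashMap;
import java.util.Map;

class EventTimeOrderingSketch {

    // Latest accepted event time (epoch milliseconds) per feature ID
    private final Map<String, Long> latestEventTime = new HashMap<>();

    // Returns true if the update is applied, false if it is discarded because
    // its event time is older than the one currently held for the feature.
    boolean offer(String featureId, long eventTimeMillis) {
        Long current = latestEventTime.get(featureId);
        if (current != null && eventTimeMillis < current) {
            return false;
        }
        latestEventTime.put(featureId, eventTimeMillis);
        return true;
    }

    public static void main(String[] args) {
        EventTimeOrderingSketch cache = new EventTimeOrderingSketch();
        System.out.println(cache.offer("f1", 2000L)); // first update for f1
        System.out.println(cache.offer("f1", 1000L)); // older event time, arrives late
        System.out.println(cache.offer("f1", 3000L)); // newer event time
    }
}
```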

18.7.4. Spatial Index Resolution

The Kafka consumer data store uses an in-memory spatial index for querying. The spatial index breaks the world up into a coarse grid, and then only examines the relevant grid cells when running a spatial query. The grid size can be modified by setting the kafka.index.resolution.x and/or kafka.index.resolution.y data store parameters. By default, the grid is 360 by 180 cells. Increasing the grid resolution may reduce the number of false-positive features that must be considered when querying, and can reduce contention between simultaneous updates, deletes and queries. However, it also requires more memory.
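To make the grid idea concrete, the sketch below maps a longitude/latitude pair to a grid cell for a given resolution. It assumes a uniform grid over the whole world (-180 to 180 by -90 to 90). It is not GeoMesa's actual index code, and the GridCellSketch class is a hypothetical name.

```java
class GridCellSketch {

    // Maps a value in [min, max] to a cell index in [0, cells - 1]
    static int cell(double value, double min, double max, int cells) {
        int i = (int) Math.floor((value - min) / (max - min) * cells);
        // Clamp so that the upper bound (e.g. longitude 180) falls in the last cell
        return Math.max(0, Math.min(cells - 1, i));
    }

    // Returns {xCell, yCell} for a point, given the grid resolution
    static int[] cellOf(double lon, double lat, int xResolution, int yResolution) {
        return new int[] {
            cell(lon, -180.0, 180.0, xResolution),
            cell(lat, -90.0, 90.0, yResolution)
        };
    }

    public static void main(String[] args) {
        // Default resolution: each cell covers one degree by one degree
        int[] coarse = cellOf(0.5, 0.5, 360, 180);
        // Doubling the resolution halves the cell size, so fewer features share a cell
        int[] fine = cellOf(0.5, 0.5, 720, 360);
        System.out.println(coarse[0] + "," + coarse[1]);
        System.out.println(fine[0] + "," + fine[1]);
    }
}
```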

18.7.5. Spatial Index Tiering

For geometries with extents (i.e. non-points), the Kafka consumer data store uses a tiered in-memory spatial index for querying. Geometries are stored in a tier based on their envelope size. The number and size (in degrees) of tiers can be modified by setting the kafka.index.tiers data store parameter. By default, four tiers are created of sizes 1x1, 4x4, 32x32 and 360x180. In general, you want the tiers to correspond to the size of the geometries you are indexing. Geometries which are larger than any of the available tiers will not be indexable; thus it is standard to include a ‘catch-all’ tier that encompasses the whole world.

Tiers may be specified by comma-separated pairs of numbers, where each pair is separated with a :. For example, the default tiers would be specified as 1:1,4:4,32:32,360:180.
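The tier format and the tier selection described above can be sketched as follows. The parsing follows the x:y,x:y format given in the text. Choosing the first tier whose width and height both cover the envelope is an assumption about how a tier might be picked, and it relies on the tiers being listed from smallest to largest. The TierSketch class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class TierSketch {

    // Parses a value such as "1:1,4:4,32:32,360:180" into {width, height} pairs
    static List<double[]> parse(String tiers) {
        List<double[]> result = new ArrayList<>();
        for (String pair : tiers.split(",")) {
            String[] xy = pair.trim().split(":");
            if (xy.length != 2) {
                throw new IllegalArgumentException("Expected x:y but got: " + pair);
            }
            result.add(new double[] {
                Double.parseDouble(xy[0].trim()),
                Double.parseDouble(xy[1].trim())
            });
        }
        return result;
    }

    // Returns the index of the first tier that can hold an envelope of the given
    // size in degrees, or -1 if the envelope is larger than every tier.
    static int tierFor(List<double[]> tiers, double width, double height) {
        for (int i = 0; i < tiers.size(); i++) {
            double[] tier = tiers.get(i);
            if (width <= tier[0] && height <= tier[1]) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<double[]> defaults = parse("1:1,4:4,32:32,360:180");
        System.out.println(tierFor(defaults, 2.5, 0.5));
        // Without a catch-all tier, a large geometry has nowhere to go
        System.out.println(tierFor(parse("1:1,4:4"), 10.0, 10.0));
    }
}
```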

18.7.6. CQEngine Indexing

By default, a Kafka consumer data store only creates a spatial index. Any other queries (for example, date queries) will have to iterate over all the features in the index. Generally the number of features is reasonable, and this is still a fast operation.

For more advanced use cases, additional in-memory index structures can be created to satisfy non-spatial queries. This can be enabled by setting the kafka.index.cqengine data store parameter. The value should be a comma-delimited list of name:type, where name is an attribute name and type is a CQEngine index type. See In-Memory Indexing for more information. Note that when using CQEngine, the default geometry will not be indexed without an explicit configuration (e.g. geom:geometry in the parameter value). In addition, CQEngine may require more processing than the standard index.

As an example, consider the schema name:String,age:Int,dtg:Date,*geom:Point:srid=4326. To create an index on each attribute, you could set kafka.index.cqengine to name:radix,age:default,dtg:navigable,geom:geometry. See In-Memory Indexing for an explanation of the index types.
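The parameter value for the example schema can be assembled from an ordered map of attribute names to index types, as in this small sketch. The CqEngineParamSketch class and its indexParam method are hypothetical helpers, not part of GeoMesa.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class CqEngineParamSketch {

    // Joins attribute/index-type pairs into the comma-delimited name:type format
    static String indexParam(Map<String, String> indexTypes) {
        return indexTypes.entrySet().stream()
            .map(e -> e.getKey() + ":" + e.getValue())
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        Map<String, String> indexTypes = new LinkedHashMap<>();
        indexTypes.put("name", "radix");
        indexTypes.put("age", "default");
        indexTypes.put("dtg", "navigable");
        // The default geometry must be listed explicitly to be indexed
        indexTypes.put("geom", "geometry");

        Map<String, String> params = new LinkedHashMap<>();
        params.put("kafka.index.cqengine", indexParam(indexTypes));
        System.out.println(params);
    }
}
```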

18.7.7. Lazy Deserialization

By default, a Kafka consumer data store will use lazy (on-demand) deserialization of feature attributes. For rendering maps (which usually only require the geometry attribute) or for write-heavy workflows, this can avoid the overhead of deserializing and instantiating attributes that may never be read. If writes are infrequent, or all features and attributes are consistently read, then lazy deserialization can be disabled by setting the kafka.serialization.lazy data store parameter to false. Lazy deserialization incurs a very small runtime (query) penalty, as each attribute has to be checked for deserialization before being returned.
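The trade-off described above can be illustrated with a generic lazy holder. This is a conceptual sketch, not GeoMesa's serialization code. The LazyAttributeSketch class, the decoder function and the decodeCount counter are all hypothetical and exist only for demonstration.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class LazyAttributeSketch<T> {

    private final byte[] raw;
    private final Function<byte[], T> decoder;
    private T value;
    private boolean decoded = false;
    int decodeCount = 0; // exposed only to make the behavior observable

    LazyAttributeSketch(byte[] raw, Function<byte[], T> decoder) {
        this.raw = raw;
        this.decoder = decoder;
    }

    // Every read pays for this check; the decode itself happens at most once
    T get() {
        if (!decoded) {
            value = decoder.apply(raw);
            decoded = true;
            decodeCount++;
        }
        return value;
    }

    public static void main(String[] args) {
        LazyAttributeSketch<String> name = new LazyAttributeSketch<>(
            "boat".getBytes(StandardCharsets.UTF_8),
            bytes -> new String(bytes, StandardCharsets.UTF_8));
        // Nothing is decoded until the attribute is first read
        System.out.println(name.decodeCount);
        System.out.println(name.get());
        System.out.println(name.decodeCount);
    }
}
```

If an attribute is never read, as with non-geometry attributes during map rendering, its decoder never runs. If every attribute is always read, the per-read check is pure overhead, which is the case where disabling lazy deserialization may help.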