12.2. Using the Kafka Data Store Programmatically
12.2.1. Creating a Data Store
An instance of a Kafka data store can be obtained through the normal GeoTools discovery methods,
assuming that the GeoMesa code is on the classpath. Creating a KafkaDataStore requires two
properties: kafka.brokers, the Apache Kafka connection, and kafka.zookeepers, the Apache
Zookeeper connection. An optional parameter, kafka.zk.path, specifies a path in Zookeeper
under which schemas are stored; if no path is specified, a default path is used. The
configuration parameters are described fully below.
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

Map<String, Serializable> parameters = new HashMap<>();
parameters.put("kafka.brokers", "localhost:9092");
parameters.put("kafka.zookeepers", "localhost:2181");
// may throw java.io.IOException
org.geotools.data.DataStore dataStore =
    org.geotools.data.DataStoreFinder.getDataStore(parameters);
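To store schemas under a specific Zookeeper path rather than the default, the optional
kafka.zk.path parameter can be added before looking up the store. A minimal sketch extending
the example above; the namespace shown is hypothetical:

```java
// hypothetical namespace; schemas will be stored under this Zookeeper path
parameters.put("kafka.zk.path", "myorg/geomesa");
```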
12.2.2. Kafka Data Store Parameters
The Kafka Data Store takes several parameters; required parameters are marked with *. A sketch
combining several of these options follows the table.
| Parameter | Type | Description |
|---|---|---|
| kafka.brokers * | String | Kafka brokers, e.g. “localhost:9092” |
| kafka.zookeepers * | String | Kafka zookeepers, e.g. “localhost:2181” |
| kafka.zk.path | String | Zookeeper discoverable path (namespace) |
| kafka.producer.config | String | Configuration options for the Kafka producer, in Java properties format. See Producer Configs |
| kafka.consumer.config | String | Configuration options for the Kafka consumer, in Java properties format. See New Consumer Configs |
| kafka.consumer.from-beginning | Boolean | Start reading from the beginning of the topic (instead of ignoring existing messages). If enabled, features will not be available for query until all existing messages are processed; however, feature listeners will still be invoked as normal |
| kafka.consumer.count | Integer | Number of Kafka consumers used per feature type. Set to 0 to disable consuming (i.e. producer only) |
| kafka.topic.partitions | Integer | Number of partitions to use in Kafka topics |
| kafka.topic.replication | Integer | Replication factor to use in Kafka topics |
| kafka.cache.expiry | String | Expire features from the in-memory cache after this delay, e.g. “10 minutes” |
| kafka.cache.cleanup | String | Remove expired cache entries at this interval, e.g. “60 seconds”. If not specified, expired features are cleaned up incrementally during reads and writes |
| kafka.cache.consistency | String | Check the feature cache for consistency at this interval, e.g. “1 hour” |
| kafka.cache.cqengine | Boolean | Use the CQEngine-based implementation of the in-memory feature cache |
| geomesa.query.loose-bounding-box | Boolean | Use loose bounding boxes, which offer improved performance but are not exact |
| geomesa.query.audit | Boolean | Audit incoming queries. By default, audits are written to a log file |
| geomesa.security.auths | String | Default authorizations used to query data, comma-separated |
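As a sketch of how these options combine, the following configures a store that replays existing
messages, expires cached features after ten minutes, and passes additional options through to the
underlying Kafka consumer in Java properties format. The property values shown are illustrative
assumptions, not requirements:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

Map<String, Serializable> parameters = new HashMap<>();
parameters.put("kafka.brokers", "localhost:9092");
parameters.put("kafka.zookeepers", "localhost:2181");
// read the topic from the beginning instead of ignoring existing messages
parameters.put("kafka.consumer.from-beginning", Boolean.TRUE);
// expire features from the in-memory cache after this delay
parameters.put("kafka.cache.expiry", "10 minutes");
// extra Kafka consumer options, passed through in Java properties format
parameters.put("kafka.consumer.config",
    "group.id=geomesa-example\nfetch.max.wait.ms=1000");

org.geotools.data.DataStore dataStore =
    org.geotools.data.DataStoreFinder.getDataStore(parameters);
```

Setting kafka.consumer.count to 0 instead would create a producer-only store, which can write
features but will not consume them for querying.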
More information on using GeoTools can be found in the GeoTools user guide.