12.7. Listening for Feature Events¶
The GeoTools API includes a mechanism to fire off a FeatureEvent object each time
that there is an “event,” which occurs when data is added, changed, or deleted in a
SimpleFeatureSource. A client may implement a FeatureListener, which has a single
method called changed()
that is invoked each time that each FeatureEvent is
fired.
Three types of messages are produced by a GeoMesa Kafka producer. Each message will
cause a FeatureEvent to be fired when read by a GeoMesa Kafka consumer. All feature
event classes extend org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent
and are
contained in the companion object of the same name.
Message read | Class of event fired | FeatureEvent.Type | Filter |
---|---|---|---|
CreateOrUpdate | KafkaFeatureChanged |
CHANGED |
IN (<id>) |
A single feature with a given id has been added; this may be a new feature or an update of an existing feature | |||
Delete | KafkaFeatureRemoved |
REMOVED |
IN (<id>) |
The feature with the given id has been removed | |||
Clear | KafkaFeatureCleared |
REMOVED |
Filter.INCLUDE |
All features have been removed |
In addition to the normal information in a FeatureEvent, CreateOrUpdate messages expose the
relevant SimpleFeature
with the method feature()
. Delete messages expose the feature ID
with the method id()
, and also include the SimpleFeature
if it is available (it may be null).
All events expose the original Kafka timestamp with the method time()
.
To register a FeatureListener, create the SimpleFeatureSource from a GeoMesa
Kafka consumer data store, and use the addFeatureListener()
method. For example, the
following listener simply prints out the events it receives:
import org.geotools.data.FeatureEvent;
import org.geotools.data.FeatureListener;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent.KafkaFeatureChanged;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent.KafkaFeatureRemoved;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent.KafkaFeatureCleared;
// unless specified, the consumer will only read data written after its instantiation
SimpleFeatureSource source = ds.getFeatureSource(sftName);
FeatureListener listener = new FeatureListener() {
@Override
public void changed(FeatureEvent featureEvent) {
if (featureEvent instanceof KafkaFeatureChanged) {
KafkaFeatureChanged event = ((KafkaFeatureChanged) featureEvent);
System.out.println("Received add/update for " + event.feature() +
" at " + new java.util.Date(event.time()));
} else if (featureEvent instanceof KafkaFeatureRemoved) {
KafkaFeatureRemoved event = ((KafkaFeatureRemoved) featureEvent);
System.out.println("Received delete for " + event.id() + " " + event.feature() +
" at " + new java.util.Date(event.time()));
} else if (featureEvent instanceof KafkaFeatureCleared) {
KafkaFeatureCleared event = ((KafkaFeatureCleared) featureEvent);
System.out.println("Received clear at " + new java.util.Date(event.time()));
}
}
};
store.addFeatureListener(listener);
At cleanup time, it is important to unregister the feature listener with removeFeatureListener()
.
For example, for code run in a bean in GeoServer, the javax.annotation.PreDestroy
annotation may
be used to mark the method that does the deregistration:
@PreDestroy
public void dispose() throws Exception {
store.removeFeatureListener(listener);
// other cleanup
}