25. GeoMesa Stream Processing¶
The GeoMesa Stream library (geomesa-stream
in the source distribution)
provides tools to process streams of
SimpleFeatures
. The library can be used to instantiate a
DataStore
either in GeoServer or in a user’s application to serve as
a constant source of SimpleFeatures
. For example, you can
instantiate a DataStore
that will connect to Twitter and show the
most recent tweets in a spatial context. The timeout for the
DataStore
is configurable. A stream can be defined against any
source that can be processed by Apache Camel. A
SimpleFeatureConverter
can be attached to the stream to translate
the underlying data into SimpleFeatures
.
25.1. Modules¶
geomesa-stream-api
- the stream source and processing APIsgeomesa-stream-generic
- definition of the Camel generic sourcegeomesa-stream-datastore
-DataStore
implementationgeomesa-stream-gs-plugin
- GeoServer hooks for stream sources
25.2. Usage¶
To illustrate usage, assume we are processing a stream of Twitter data as a csv. The configuration in GeoServer is as follows:
{
type = "generic"
source-route = "netty4:tcp://localhost:5899?textline=true"
sft = {
type-name = "twitter"
fields = [
{ name = "user", type = "String" }
{ name = "msg", type = "String" }
{ name = "geom", type = "Point", index = true, srid = 4326, default = true }
{ name = "dtg", type = "Date", index = true }
]
}
converter = {
id-field = "md5(string2bytes($0))"
type = "delimited-text"
format = "DEFAULT"
fields = [
{ name = "user", transform = "$0" }
{ name = "msg", transform = "$1" }
{ name = "geom", transform = "point($2::double, $3::double)" }
{ name = "dtg", transform = "datetime($4)" }
]
}
}
This defines a stream source that will listen on port 5899 for csv
messages that have the following columns: user
, msg
, lon
,
lat
, dtg
. To instantiate a DataStore
for this type that
keeps the last 30 seconds of tweets, use the following code.
val ds = DataStoreFinder.getDataStore(
Map(
StreamDataStoreParams.STREAM_DATASTORE_CONFIG.key -> sourceConf,
StreamDataStoreParams.CACHE_TIMEOUT.key -> Integer.valueOf(30)
))
To query this stream source, use a FilterFactory
from
org.geotools.factory.CommonFactoryFinder
. To receive notifications
on new SimpleFeatures
, use a StreamListener
:
val listener =
new StreamListener {
def onNext(sf: SimpleFeature) = println(s"Received a new feature: ${sf.getID}")
}
ds.asInstanceOf[org.locationtech.geomesa.stream.datastore.StreamDataStore].registerListener(listener)
25.3. UDP¶
The generic source can be used with UDP as well, although there are some caveats:
- If you are sending text, the source route must include ‘?textline=true’, even though the Camel docs say that only applies to TCP
- Each UDP packet data must end with a newline character
- Each UDP packet data must contain exactly one line - everything after the newline will be dropped
- Maximum text line size can be controlled by the route parameter ‘decoderMaxLineLength’ with a maximum value of 2048
- If the message is longer than the line size then the message will be dropped
- Default maximum text line length is 1024
- Note that technically the line length can be longer, but Camel does not expose the Netty UDP RCVBUF_ALLOCATOR option, which causes messages to be truncated at 2048 bytes.