11.3. Spark Core
geomesa-spark-core is used to work directly with RDDs of features from GeoMesa and other geospatial data stores.
11.3.1. Example
The following is a complete Scala example of creating an RDD via a geospatial query against a GeoMesa data store:
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.geotools.api.data.Query
import org.geotools.filter.text.ecql.ECQL
import org.locationtech.geomesa.spark.GeoMesaSpark

// DataStore params to a hypothetical GeoMesa Accumulo table
val dsParams = Map(
"accumulo.instance.name" -> "instance",
"accumulo.zookeepers" -> "zoo1,zoo2,zoo3",
"accumulo.user" -> "user",
"accumulo.password" -> "*****",
"accumulo.catalog" -> "geomesa_catalog",
"geomesa.security.auths" -> "USER,ADMIN")
// set SparkContext
val conf = new SparkConf().setMaster("local[*]").setAppName("testSpark")
val sc = SparkContext.getOrCreate(conf)
// create RDD with a geospatial query using GeoMesa functions
val spatialRDDProvider = GeoMesaSpark(dsParams)
val filter = ECQL.toFilter("CONTAINS(POLYGON((0 0, 0 90, 90 90, 90 0, 0 0)), geom)")
val query = new Query("chicago", filter)
val resultRDD = spatialRDDProvider.rdd(new Configuration, sc, dsParams, query)
resultRDD.collect
// Array[org.geotools.api.feature.simple.SimpleFeature] = Array(
// ScalaSimpleFeature:4, ScalaSimpleFeature:5, ScalaSimpleFeature:6,
// ScalaSimpleFeature:7, ScalaSimpleFeature:9)
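The result is an ordinary RDD[SimpleFeature], so standard Spark operations apply. As a minimal sketch (not part of the original example), the features can be counted or their IDs collected using only the Spark RDD and GeoTools SimpleFeature APIs:
// count the matching features and pull out their feature IDs
val count = resultRDD.count()
val ids = resultRDD.map(_.getID).collect()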
11.3.2. Configuration
geomesa-spark-core provides an API for accessing geospatial data in Spark by defining an interface called SpatialRDDProvider. Different implementations of this interface connect to different input sources. These providers are described in more detail in Usage below.
GeoMesa provides several JARs-with-dependencies to simplify setting up the Spark classpath. To use these libraries in Spark, the appropriate shaded JAR can be passed (for example) to the spark-submit command via the --jars option:
--jars file://path/to/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar
or passed to Spark via the appropriate mechanism in notebook servers such as Jupyter (see Deploying GeoMesa Spark with Jupyter Notebook) or Zeppelin.
Note
See Spatial RDD Providers for details on choosing the correct GeoMesa Spark runtime JAR.
The shaded JAR should also provide the dependencies needed for the Converter RDD Provider and GeoTools RDD Provider, so these JARs may simply be added to --jars as well (though in the latter case, additional JARs may be needed to implement the GeoTools data store being accessed). A hypothetical spark-submit invocation is sketched after this paragraph.
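For example, a spark-submit invocation might look like the following (the master URL, main class, and application JAR below are placeholders):
spark-submit --master yarn \
  --class com.example.CountSimpleFeatures \
  --jars file://path/to/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar \
  my-geomesa-job.jar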
11.3.3. Simple Feature Serialization
To serialize RDDs of SimpleFeatures between nodes of a cluster, Spark must be configured with a Kryo serialization registrator provided in geomesa-spark-core.
Note
Configuring Kryo serialization is not needed when running Spark in local
mode, as jobs will be executed within a single JVM.
Add these two entries to $SPARK_HOME/conf/spark-defaults.conf (or pass them as --conf arguments to spark-submit):
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator
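Equivalently, when submitting a job, the same settings can be supplied on the command line as --conf arguments, for example (the application JAR is a placeholder):
spark-submit \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator \
  my-geomesa-job.jar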
Note
Alternatively, these may be set in the SparkConf object used to create the SparkContext:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName)
When using Spark in a notebook server, this will require disabling the automatic creation of a SparkContext.
After setting the configuration options, RDDs created by the GeoMesa SpatialRDDProvider implementations will be properly registered with the serializer provider.
11.3.4. Usage
The main point of entry for the functionality provided by geomesa-spark-core is the GeoMesaSpark object:
val spatialRDDProvider = GeoMesaSpark(params)
GeoMesaSpark loads a SpatialRDDProvider implementation via SPI when the appropriate JAR is included on the classpath. The implementation returned by GeoMesaSpark is chosen based on the parameters passed as an argument, as shown in the Scala code below:
// parameters to pass to the SpatialRDDProvider implementation
val params = Map(
"param1" -> "foo",
"param2" -> "bar")
// GeoTools Query; may be used to filter results retrieved from the data store
val query = new Query("foo")
// val query = new Query("foo", ECQL.toFilter("name like 'A%'"))
// get the RDD, using the SparkContext configured as above
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
To save features, use the save() method:
GeoMesaSpark(params).save(rdd, params, "gdelt")
Warning
The save() method executes an appending write, and does not currently support updating existing features.
Reusing feature IDs is a logical error, and may produce inconsistencies in your data.
Note that some providers may be read-only.
See Spatial RDD Providers for details on specific provider implementations.
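As a sketch of a common read-then-write pattern, an RDD read from one provider can be appended into another; the parameter maps sourceParams and targetParams and the "gdelt" type name below are hypothetical:
// read "gdelt" features from a source store and append them to a target store
// (sourceParams and targetParams are placeholder parameter maps)
val gdeltRdd = GeoMesaSpark(sourceParams).rdd(new Configuration(), sc, sourceParams, new Query("gdelt"))
GeoMesaSpark(targetParams).save(gdeltRdd, targetParams, "gdelt")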
11.3.5. GeoJSON Output
The geomesa-spark-core module provides a means of exporting an RDD[SimpleFeature] to a GeoJSON string. This allows for quick visualization of the data in many front-end mapping libraries that support GeoJSON input, such as Leaflet or OpenLayers.
To convert an RDD, import the implicit conversion and invoke the asGeoJSONString method:
import org.apache.spark.rdd.RDD
import org.geotools.api.feature.simple.SimpleFeature
import org.locationtech.geomesa.spark.SpatialRDD._

val rdd: RDD[SimpleFeature] = ???
val geojson = rdd.asGeoJSONString
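Since the result is a plain String, it can be used anywhere a String can. For example, the following sketch (with a hypothetical output path) writes it to a local file for loading into a Leaflet or OpenLayers map:
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// write the GeoJSON string to a local file (the output path is a placeholder)
Files.write(Paths.get("/tmp/features.geojson"), geojson.getBytes(StandardCharsets.UTF_8))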