11.7. GeoMesa PySpark¶
GeoMesa provides integration with the Spark Python API for accessing data in GeoMesa data stores.
11.7.1. Prerequisites¶
Spark 2.4.x, 3.0.x or 3.1.x should be installed.
Python 2.7 or 3.x should be installed.
pip or
pip3
should be installed.conda-pack is optional.
11.7.2. Installation¶
The geomesa_pyspark
package is not available for download. Build the artifact locally with the profile
-Ppython
. Then install using pip
or pip3
as below. You will also need an appropriate
geomesa-spark-runtime
JAR. We assume the use of Accumulo here, but you may alternatively use any of
the providers outlined in Spatial RDD Providers.
mvn clean install -Ppython
pip3 install geomesa-spark/geomesa_pyspark/target/geomesa_pyspark-$VERSION.tar.gz
cp geomesa-accumulo/geomesa-accumulo-spark-runtime-accumulo2/target/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar /path/to/
Alternatively, you can use conda-pack
to bundle the dependencies for your project. This may be more appropriate if
you have additional dependencies.
export ENV_NAME=geomesa-pyspark
conda create --name $ENV_NAME -y python=3.7
conda activate $ENV_NAME
pip install geomesa-spark/geomesa_pyspark/target/geomesa_pyspark-$VERSION.tar.gz
# Install additional dependencies using conda or pip here
conda pack -o environment.tar.gz
cp geomesa-accumulo/geomesa-accumulo-spark-runtime-accumulo2/target/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar /path/to/
Warning
conda-pack
currently has issues with Python 3.8, and pyspark
has issues with Python 3.9, hence the explicit
use of Python 3.7
11.7.3. Using GeoMesa PySpark¶
You may then access Spark using a Yarn master by default. Importantly, because of the way the geomesa_pyspark
library interacts with the underlying Java libraries, you must set up the GeoMesa configuration before referencing
the pyspark
library.
import geomesa_pyspark
conf = geomesa_pyspark.configure(
jars=['/path/to/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar'],
packages=['geomesa_pyspark','pytz'],
spark_home='/path/to/spark/').\
setAppName('MyTestApp')
conf.get('spark.master')
# u'yarn'
from pyspark.sql import SparkSession
spark = ( SparkSession
.builder
.config(conf=conf)
.enableHiveSupport()
.getOrCreate()
)
Alternatively, if you used conda-pack
then you do not need to set up the GeoMesa configuration as above, but you
must start pyspark
or your application as follows, updating paths as required:
PYSPARK_DRIVER_PYTHON=/opt/anaconda3/envs/$ENV_NAME/bin/python PYSPARK_PYTHON=./environment/bin/python pyspark \
--jars /path/to/geomesa-accumulo-spark-runtime_${VERSION}.jar \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--master yarn --deploy-mode client --archives environment.tar.gz#environment
At this point you are ready to create a dict of connection parameters to your Accumulo data store and get a spatial data frame.
params = {
"accumulo.instance.id": "myInstance",
"accumulo.zookeepers": "zoo1,zoo2,zoo3",
"accumulo.user": "user",
"accumulo.password": "password",
"accumulo.catalog": "myCatalog"
}
feature = "mySchema"
df = ( spark
.read
.format("geomesa")
.options(**params)
.option("geomesa.feature", feature)
.load()
)
df.createOrReplaceTempView("tbl")
spark.sql("show tables").show()
# Count features in a bounding box.
spark.sql("""
select count(*)
from tbl
where st_contains(st_makeBBOX(-72.0, 40.0, -71.0, 41.0), geom)
""").show()
GeoMesa PySpark can also be used in the absence of a GeoMesa data store. Registering user-defined types and functions
can be done manually by invoking geomesa_pyspark.init_sql()
on the Spark session object:
geomesa_pyspark.init_sql(spark)
You can terminate the Spark job on YARN using spark.stop()
.
11.7.4. Using Geomesa UDFs in PySpark¶
There are 3 different ways to use the Geomesa UDFs from PySpark: from the SQL API, from the Fluent API via SQL expressions, or from the Fluent API via Python wrappers. These approaches are equivalent performance-wise, so choosing the best approach for your project comes down to preference.
11.7.4.1. 1. Accessing the Geomesa UDFs from the SQL API¶
We can access the Geomesa UDFs via the SQL API by simply including the functions in our SQL expressions.
df.createOrReplaceTempView("tbl")
spark.sql("""
select count(*) from tbl
where st_contains(st_makeBBOX(-72.0, 40.0, -71.0, 41.0), geom)
""").show()
11.7.4.2. 2. Accessing the Geomesa UDFs from the Fluent API via SQL Expressions¶
We can also access the Geomesa UDFs from the Fluent API via the pyspark.sql.functions module. This module has an expr function that we can use to access the Geomesa UDFs.
import pyspark.sql.functions as F
# add a new column
df = df.withColumn("geom_wkt", F.expr("st_asText(geom)"))
# filter using SQL where expression
df = df.select("*").where("st_area(geom) > 0.001")
df.show()
11.7.4.3. 3. Accessing the Geomesa UDFs from the Fluent API via Python Wrappers¶
We also support using the Geomesa UDFs as standalone functions through the use of Python wrappers. The Python wrappers for the Geomesa UDFs run on the JVM and are faster than logically equivalent Python UDFs.
from geomesa_pyspark.scala.functions import st_asText, st_area
df = df.withColumn("geom_wkt", st_asText("geom"))
df = df.withColumn("geom_area", st_area("geom"))
df.show()
11.7.5. Using Custom Scala UDFs from PySpark¶
We provide some utility functions in geomesa_pyspark that allow you to use your own Scala UDFs as standalone functions from PySpark. The advantage here is that you can write your UDFs in java or scala (so they run on the JVM), but can be used naturally from PySpark as if it were part of the Fluent API. This gives us the ability to write and use performant UDFs from PySpark without having to rely on Python UDFs, which can often be prohibitively slow for larger datasets.
from functools import partial
from geomesa_pyspark.scala.udf import build_scala_udf, scala_udf, ColumnOrName
from pyspark import SparkContext
from pyspark.sql.column import Column
sc = SparkContext.getOrCreate()
custom_udfs = sc._jvm.path.to.your.CustomUserDefinedFunctions
# use the helper function for building your udf
def my_scala_udf(col: ColumnOrName) -> Column:
"""helpful docstring that explains what col is"""
return build_scala_udf(sc, custom_udfs.my_scala_udf)(col)
# or alternatively, build it directly by partially applying the scala udf
my_other_udf = partial(scala_udf, sc, custom_udfs.my_other_udf())
df.withColumn("edited_field_1", my_scala_udf("field_1")).show()
df.withColumn("edited_field_2", my_other_udf("field_2")).show()
Recall that these UDFs can actually take either a pyspark.sql.column.Column or the string name of the column we wish to operate on, so the following are equivalent:
# this is more readable
df.withColumn("edited_field_1", my_scala_udf("field_1")).show()
# but we can also do this
df.withColumn("edited_field_1", my_scala_udf(col("field_1"))).show()
11.7.6. Jupyter¶
To use the geomesa_pyspark
package within Jupyter, you only needs a Python2 or Python3 kernel, which is
provided by default. Substitute the appropriate Spark home and runtime JAR paths in the above code blocks. Be sure
the GeoMesa Accumulo client and server side versions match, as described in Installing GeoMesa Accumulo.