Spatial join with Spark pandas_udf

Áron Asztalos
Jun 29, 2021

I am a data scientist. In my work, I often meet huge datasets that contain spatial information, for example coordinates.

Introduction

Spark is one of the most commonly used tools in the world of Big Data. I won't claim it is the easiest one. When you start using Spark, there is the typical moment when you realize that data science and data engineering are not separable: you should know what is under the hood. When you set up a Spark session, you need to be aware of what the cluster and your database look like. But that will be another post.

I will talk about the following:

  • What is a Spark UDF?
  • What is pandas_udf?
  • How can we create a spatial join function in PySpark?

I assume you know what a spatial join is; if you don't, I warmly recommend reading an introduction to spatial joins first.

What is a Spark UDF?

A Spark UDF (User Defined Function) executes your Python function on the worker nodes (executors). You will find a lot of articles about why you should avoid UDFs. Easy to say, hard to do.

For example, if you want to know whether your coordinates fall within a chosen area, you need a spatial query, which is not available among Spark's built-in functions.
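As a tiny illustration (the polygon and points here are made up), this is the kind of point-in-polygon predicate we need, shown with Shapely:

from shapely.geometry import Point, Polygon

# illustrative only: the point-in-polygon test that Spark lacks natively
area = Polygon([(0, 0), (4, 0), (4, 4), (0, 4)])
print(Point(1, 1).within(area))   # True
print(Point(9, 9).within(area))   # False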

Important! Every Python package that you use in a UDF must be installed on the worker nodes, or you have to ship an environment archive to the workers!

import os
from pyspark.sql import SparkSession

# point the workers at the Python interpreter inside the shipped environment
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
# ship a packed conda environment (e.g. built with conda-pack) to the workers
spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "pyspark_conda_env.tar.gz#environment").getOrCreate()

What is pandas_udf?

pandas_udf is a PySpark User Defined Function whose input is one or more pandas Series and whose output is a single pandas Series. It exchanges data with the JVM in batches via Apache Arrow, which makes it considerably faster than a plain row-by-row UDF.

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def own_pandas_func(x, y):  # one or more pandas Series arguments
    """Transform the input Series into one output Series."""
    pandas_series = ...  # your batch-wise logic here
    return pandas_series

own_pandas_udf = pandas_udf(own_pandas_func, returnType=StringType())

The output length must equal the input length.
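For instance, here is a minimal, self-contained sketch (the double function and the column names are mine, just for illustration):

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import DoubleType

# a Series-to-Series pandas_udf that doubles a numeric column
def double(v: pd.Series) -> pd.Series:
    return v * 2.0

double_udf = pandas_udf(double, returnType=DoubleType())
# usage: some_df.withColumn('x2', double_udf(col('x')))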

How can we create a spatial join function in PySpark?

Let’s see an example code:

Create a GeoPandas dataframe with two polygons.

import geopandas as gpd
from shapely.geometry import Polygon

polys1 = gpd.GeoSeries([Polygon([(0, 0), (2, 0), (2.5, 0.5), (2, 2), (0.5, 1.5)]),
                        Polygon([(1, 1), (3, 1), (3, 3), (1, 3)])])
df1 = gpd.GeoDataFrame({'geometry': polys1, 'poly_ID': [1, 2]})
df1.head()
df1.plot(color=['red', 'green'], alpha=0.5)

Figure: the df1 GeoDataFrame with its two polygons

Generate random points

import pandas as pd
from random import random

range_num = 10000
data = [[x, random() * 3, random() * 3] for x in range(range_num)]
df = pd.DataFrame(data, columns=['id', 'x', 'y'])

%matplotlib inline
ax = gpd.GeoDataFrame(data=df, geometry=gpd.points_from_xy(df.x, df.y)) \
       .plot(figsize=(10, 10), color='black', markersize=1)
ax = df1.plot(color=['red', 'green'], alpha=0.5, ax=ax)

Figure: the two polygons with 10,000 random points

Create a Spark dataframe

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder.enableHiveSupport() \
    .appName('Pandas_udf_example') \
    .getOrCreate()
spark_df = spark.createDataFrame(df)
spark_df.show()

Figure: the Spark dataframe with id, x and y columns

Define the spatial join function

This has a few tricky parts: capturing the GeoPandas dataframe in a closure, using a left join, grouping the results, etc.

The raw code:

from pyspark.sql.types import StringType
from pyspark.sql.functions import pandas_udf

def create_sjoin_udf(gdf_with_poly, join_column_name):
    def sjoin_settlement(x, y):
        # build a point GeoDataFrame from the two coordinate Series
        gdf_temp = gpd.GeoDataFrame(data=[[i] for i in range(len(x))],
                                    geometry=gpd.points_from_xy(x, y), columns=['id'])
        # the left join keeps every point, even those outside all polygons
        # (newer GeoPandas versions take predicate='within' instead of op=)
        settlement = gpd.sjoin(gdf_temp, gdf_with_poly, how='left', op='within')
        # a point can fall into several polygons, so collect the IDs into a list per point
        return (settlement.groupby('id').agg({join_column_name: list})
                .reset_index().sort_values(by='id')
                .loc[:, join_column_name].astype('str'))
    return pandas_udf(sjoin_settlement, returnType=StringType())

sjoin_udf = create_sjoin_udf(df1, 'poly_ID')
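One caveat: because the UDF declares StringType, the polygon IDs come back as a stringified Python list. A hedged variant, assuming you would rather work with a real array column, is to declare an ArrayType return type and drop the astype('str') step (sketch only, same join logic as above):

from pyspark.sql.types import ArrayType, DoubleType

def create_sjoin_array_udf(gdf_with_poly, join_column_name):
    def sjoin_points(x, y):
        gdf_temp = gpd.GeoDataFrame(data=[[i] for i in range(len(x))],
                                    geometry=gpd.points_from_xy(x, y), columns=['id'])
        joined = gpd.sjoin(gdf_temp, gdf_with_poly, how='left', op='within')
        # keep the grouped lists as-is; Arrow maps them to an array<double> column
        # (the left join upcasts the IDs to float because of NaN fills)
        return (joined.groupby('id').agg({join_column_name: list})
                .reset_index().sort_values(by='id').loc[:, join_column_name])
    return pandas_udf(sjoin_points, returnType=ArrayType(DoubleType()))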

Run the sjoin_udf on the Spark dataframe:

spark_df = spark_df.withColumn('poly_ID',sjoin_udf(spark_df.x,spark_df.y))
spark_df.show()
Figure: spark_df with the new poly_ID column
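As a quick sanity check (this assumes the stringified-list format produced by sjoin_udf above, where unmatched points become the string '[nan]'), you can count the points that fell outside both polygons:

from pyspark.sql.functions import col

# points outside both polygons were filled with NaN by the left join
spark_df.filter(col('poly_ID') == '[nan]').count()
spark_df.filter(col('poly_ID') != '[nan]').show(5)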

I hope this post was useful for you! If you have any questions about it, use the comment section.


Áron Asztalos

I am a Data Scientist with a physics and GIS background. In the world of big data, I learn something about data engineering every day.