PySpark

PySpark apply function to column

PySpark apply function to column in dataframe to get desired transformation as output. In this post, we will see 2 of the most common ways of applying function to column in PySpark. First is applying spark built-in functions to column and second is applying user defined custom function to columns in Dataframe.

PySpark apply spark built-in function to column

In this example, we will apply spark built-in function “lower()” to column to convert string value into lowercase. We can add a new column or even overwrite existing column using withColumn method in PySpark.

PySpark apply function to column to create a new column

In this example, we will add a new column “marketplace_lower” which will be derived from existing column “marketplace”. We will apply lower function to existing value to convert string to lowercase.

>>> df_shoes = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Shoes/")
>>> df_shoes.select("marketplace","star_rating").distinct().show(5)
+-----------+-----------+
|marketplace|star_rating|
+-----------+-----------+
|         JP|          3|
|         US|          2|
|         DE|          2|
|         US|          3|
|         FR|          1|
+-----------+-----------+
only showing top 5 rows

>>> from pyspark.sql.functions import lower, col
>>> df_shoes.withColumn("marketplace_lower",lower(col("marketplace"))).select("marketplace","marketplace_lower","star_rating").distinct().show(5)
+-----------+-----------------+-----------+
|marketplace|marketplace_lower|star_rating|
+-----------+-----------------+-----------+
|         FR|               fr|          4|
|         UK|               uk|          5|
|         JP|               jp|          2|
|         JP|               jp|          1|
|         FR|               fr|          3|
+-----------+-----------------+-----------+
only showing top 5 rows

Apply function to update column in PySpark

In this example, we will update value of existing column “marketplace” by applying lower function to convert string to lowercase.

>>> df_shoes.select("marketplace","star_rating").distinct().show(5)
+-----------+-----------+
|marketplace|star_rating|
+-----------+-----------+
|         JP|          3|
|         US|          2|
|         DE|          2|
|         US|          3|
|         FR|          1|
+-----------+-----------+
only showing top 5 rows

>>> from pyspark.sql.functions import lower, col
>>> df_shoes.withColumn("marketplace",lower(col("marketplace"))).select("marketplace","star_rating").distinct().show(5)
+-----------+-----------+
|marketplace|star_rating|
+-----------+-----------+
|         jp|          1|
|         de|          4|
|         fr|          5|
|         de|          5|
|         fr|          4|
+-----------+-----------+
only showing top 5 rows

PySpark apply custom UDF to column

In this example we will create a custom UDF (User Defined Function) and will apply to column in dataframe in PySpark. This custom UDF basically reads “star_rating” given to any product in data and will return corresponding description to it.

def star_rating_description(v_star_rating):
	if v_star_rating == 1:
		return "Poor"
	elif v_star_rating == 2:
		return "Fair"
	elif v_star_rating == 3:
		return "Average"
	elif v_star_rating == 4:
		return "Good"
	else:
		return "Excellent"

We will use this Python UDF in PySpark code and will apply to column.

Apply UDF to create a new column in PySpark

In this example , we will convert Python UDF to PySpark UDF and will use it in dataframe code.

from pyspark.sql.functions import udf,col
from pyspark.sql.types import StringType
udf_star_desc = udf(lambda x:star_rating_description(x),StringType() )

We have converted python UDF “star_rating_description” to PySpark UDF “udf_star_desc”. Now we can use it in PySpark code. “StringType” is the return type of PySpark function.

>>> df_shoes.withColumn("rating_description",udf_star_desc(col("star_rating"))).select("marketplace","star_rating","rating_description").distinct().show(5)
+-----------+-----------+------------------+
|marketplace|star_rating|rating_description|
+-----------+-----------+------------------+
|         FR|          1|              Poor|
|         DE|          3|           Average|
|         US|          1|              Poor|
|         US|          4|              Good|
|         UK|          3|           Average|
+-----------+-----------+------------------+
only showing top 5 rows

I have also created a post that shares 25 examples to teach you everything about FILTERS in PySpark. You can read that detailed post here:
PySpark Filter – 25 examples to teach you everything

Apply UDF to update existing column in PySpark

You can update existing column value in PySpark using withColumn method and passing same name as column name to update its value.

>>> df_shoes.withColumn("star_rating",udf_star_desc(col("star_rating"))).select("marketplace","star_rating").distinct().show(5)
+-----------+-----------+
|marketplace|star_rating|
+-----------+-----------+
|         JP|  Excellent|
|         US|       Poor|
|         DE|       Good|
|         UK|       Fair|
|         DE|    Average|
+-----------+-----------+
only showing top 5 rows

Leave a Reply