In PySpark, you apply a function to a column in a DataFrame to get the desired transformation as output. In this post, we will look at the two most common ways of applying a function to a column in PySpark: first, applying Spark built-in functions to a column, and second, applying a user-defined custom function (UDF) to columns in a DataFrame.
PySpark apply Spark built-in function to column
In this example, we will apply the Spark built-in function lower() to a column to convert its string values to lowercase. We can add a new column, or even overwrite an existing one, using the withColumn method in PySpark.
PySpark apply function to column to create a new column
In this example, we will add a new column "marketplace_lower", derived from the existing column "marketplace" by applying the lower function to convert its string values to lowercase.
>>> df_shoes = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Shoes/")
>>> df_shoes.select("marketplace","star_rating").distinct().show(5)
+-----------+-----------+
|marketplace|star_rating|
+-----------+-----------+
|         JP|          3|
|         US|          2|
|         DE|          2|
|         US|          3|
|         FR|          1|
+-----------+-----------+
only showing top 5 rows

>>> from pyspark.sql.functions import lower, col
>>> df_shoes.withColumn("marketplace_lower",lower(col("marketplace"))).select("marketplace","marketplace_lower","star_rating").distinct().show(5)
+-----------+-----------------+-----------+
|marketplace|marketplace_lower|star_rating|
+-----------+-----------------+-----------+
|         FR|               fr|          4|
|         UK|               uk|          5|
|         JP|               jp|          2|
|         JP|               jp|          1|
|         FR|               fr|          3|
+-----------+-----------------+-----------+
only showing top 5 rows
Apply function to update column in PySpark
In this example, we will update the value of the existing column "marketplace" by applying the lower function to convert its string values to lowercase.
>>> df_shoes.select("marketplace","star_rating").distinct().show(5)
+-----------+-----------+
|marketplace|star_rating|
+-----------+-----------+
|         JP|          3|
|         US|          2|
|         DE|          2|
|         US|          3|
|         FR|          1|
+-----------+-----------+
only showing top 5 rows

>>> from pyspark.sql.functions import lower, col
>>> df_shoes.withColumn("marketplace",lower(col("marketplace"))).select("marketplace","star_rating").distinct().show(5)
+-----------+-----------+
|marketplace|star_rating|
+-----------+-----------+
|         jp|          1|
|         de|          4|
|         fr|          5|
|         de|          5|
|         fr|          4|
+-----------+-----------+
only showing top 5 rows
PySpark apply custom UDF to column
In this example, we will create a custom UDF (User Defined Function) and apply it to a column in a PySpark DataFrame. This UDF reads the "star_rating" given to a product in the data and returns the corresponding description.
def star_rating_description(v_star_rating):
    if v_star_rating == 1:
        return "Poor"
    elif v_star_rating == 2:
        return "Fair"
    elif v_star_rating == 3:
        return "Average"
    elif v_star_rating == 4:
        return "Good"
    else:
        return "Excellent"
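As an aside, the same mapping can also be expressed as a dictionary lookup, which some find easier to extend and to unit-test locally before wrapping it in a UDF. This is just an alternative sketch, not what the rest of the post uses:

```python
# Alternative sketch: the same rating-to-description mapping as a dict lookup.
# dict.get's default covers 5 and any unexpected value, matching the else
# branch of the if/elif chain above.
RATING_DESCRIPTIONS = {1: "Poor", 2: "Fair", 3: "Average", 4: "Good"}

def star_rating_description_v2(v_star_rating):
    return RATING_DESCRIPTIONS.get(v_star_rating, "Excellent")

print([star_rating_description_v2(r) for r in (1, 3, 5)])
# ['Poor', 'Average', 'Excellent']
```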
We will now use this Python function in PySpark code by applying it to a column.
Apply UDF to create a new column in PySpark
In this example, we will convert the Python function to a PySpark UDF and use it in DataFrame code.
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

udf_star_desc = udf(lambda x: star_rating_description(x), StringType())
We have converted the Python function "star_rating_description" to the PySpark UDF "udf_star_desc". "StringType" is the return type of the PySpark function. Note that the lambda is not strictly required here; udf(star_rating_description, StringType()) is equivalent. Now we can use the UDF in PySpark code.
>>> df_shoes.withColumn("rating_description",udf_star_desc(col("star_rating"))).select("marketplace","star_rating","rating_description").distinct().show(5)
+-----------+-----------+------------------+
|marketplace|star_rating|rating_description|
+-----------+-----------+------------------+
|         FR|          1|              Poor|
|         DE|          3|           Average|
|         US|          1|              Poor|
|         US|          4|              Good|
|         UK|          3|           Average|
+-----------+-----------+------------------+
only showing top 5 rows
I have also written a post with 25 examples that teach you everything about filters in PySpark. You can read that detailed post here:
PySpark Filter – 25 examples to teach you everything
Apply UDF to update existing column in PySpark
You can update an existing column's value in PySpark by using the withColumn method and passing the existing column's name as the name of the column to update.
>>> df_shoes.withColumn("star_rating",udf_star_desc(col("star_rating"))).select("marketplace","star_rating").distinct().show(5)
+-----------+-----------+
|marketplace|star_rating|
+-----------+-----------+
|         JP|  Excellent|
|         US|       Poor|
|         DE|       Good|
|         UK|       Fair|
|         DE|    Average|
+-----------+-----------+
only showing top 5 rows