Introduction
PySpark is becoming the obvious choice for enterprises moving to Spark. In my understanding, this is primarily for two reasons:
- Many developers are, like me, experts in SQL but not in programming languages like Java or C#. So when it comes to picking one language for Spark, the existing team is usually more comfortable working with Python than Scala.
- Data Science is a natural extension of Data Analytics, and AI & Machine Learning projects are gaining a lot of attention from enterprises now. Python is the obvious choice here due to its rich library support for Machine Learning. The same team members can contribute to Data Engineering, Data Analytics and Data Science, and switching across teams is easy if you are comfortable with Python.
In this post, we will see how to learn PySpark from scratch. It should especially help people who are good with SQL and now wish to adopt Spark, so I will try to highlight the similarities between SQL and PySpark as well.
We will cover the below five points in this post:
- Check Hadoop/Python/Spark version
- Connect to PySpark CLI
- Read a CSV file into a Dataframe and check some/all columns & rows in it
- Check schema and copy schema from one dataframe to another
- Basic Metadata info of Dataframe
Check Hadoop/Python/Spark version
Check Hadoop version:

hadoop version
Hadoop 2.10.1-amzn-0

Check Python version:

python --version
Python 3.7.9

Check Spark version:

pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.7-amzn-0
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_272
Connect to PySpark CLI
If you have PySpark installed and configured correctly, just type "pyspark" and hit enter. This opens the PySpark shell in the terminal, and you should see a message like "SparkSession available as 'spark'". So you don't have to create a SparkSession explicitly; you can use 'spark' directly. We will see in later posts how to create and use a SparkSession when running ETL jobs in batch mode.
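For reference, here is a minimal sketch of creating a SparkSession yourself, as you would in a standalone script (the app name here is just an illustrative choice):

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession; the pyspark shell already does
# this for you and exposes it as 'spark'.
spark = (SparkSession.builder
         .appName("pyspark-demo")
         .getOrCreate())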
Read CSV file into a PySpark Dataframe
The next step is to read a CSV file into a Dataframe and run some select operations on it. For now, just think of a Dataframe as the equivalent of a table in an RDBMS.
You can download the data file used in the below examples for practice. I have uploaded the file into S3, and not everyone may have access to it, so please change the path to point to wherever you have placed the data file.
df_category = spark.read.csv("hdfs:///pyspark.demo/raw/category_pipe.txt")
df_category.show()

+--------------------+
|                 _c0|
+--------------------+
|1|Sports|MLB|Majo...|
|2|Sports|NHL|Nati...|
|3|Sports|NFL|Nati...|
|4|Sports|NBA|Nati...|
|5|Sports|MLS|Majo...|
|6|Shows|Musicals|...|
|7|Shows|Plays|All...|
|8|Shows|Opera|All...|
|9|Concerts|Pop|Al...|
|10|Concerts|Jazz|...|
|11|Concerts|Class...|
+--------------------+
So we have loaded the file into a Dataframe, but all the columns have gone into a single column. To fix this, we have to specify the proper delimiter while reading the CSV file.
df_category = spark.read.option("delimiter","|").csv("hdfs:///pyspark.demo/raw/category_pipe.txt")
df_category.show()

+---+--------+---------+--------------------+
|_c0|     _c1|      _c2|                 _c3|
+---+--------+---------+--------------------+
|  1|  Sports|      MLB|Major League Base...|
|  2|  Sports|      NHL|National Hockey L...|
|  3|  Sports|      NFL|National Football...|
|  4|  Sports|      NBA|National Basketba...|
|  5|  Sports|      MLS| Major League Soccer|
|  6|   Shows| Musicals|     Musical theatre|
|  7|   Shows|    Plays|All non-musical t...|
|  8|   Shows|    Opera|All opera and lig...|
|  9|Concerts|      Pop|All rock and pop ...|
| 10|Concerts|     Jazz|All jazz singers ...|
| 11|Concerts|Classical|All symphony, con...|
+---+--------+---------+--------------------+
Now the columns look ok; however, some values in the last column are truncated. To view the complete values without truncation, pass "truncate=False" to the show function.
df_category.show(truncate=False)

+---+--------+---------+------------------------------------------+
|_c0|_c1     |_c2      |_c3                                       |
+---+--------+---------+------------------------------------------+
|1  |Sports  |MLB      |Major League Baseball                     |
|2  |Sports  |NHL      |National Hockey League                    |
|3  |Sports  |NFL      |National Football League                  |
|4  |Sports  |NBA      |National Basketball Association           |
|5  |Sports  |MLS      |Major League Soccer                       |
|6  |Shows   |Musicals |Musical theatre                           |
|7  |Shows   |Plays    |All non-musical theatre                   |
|8  |Shows   |Opera    |All opera and light opera                 |
|9  |Concerts|Pop      |All rock and pop music concerts           |
|10 |Concerts|Jazz     |All jazz singers and bands                |
|11 |Concerts|Classical|All symphony, concerto, and choir concerts|
+---+--------+---------+------------------------------------------+
To show only N rows from a Dataframe in PySpark, pass the number to the show function. For example, to show 5 rows, pass 5 to show.
df_category.show(5, truncate=False)

+---+------+---+-------------------------------+
|_c0|_c1   |_c2|_c3                            |
+---+------+---+-------------------------------+
|1  |Sports|MLB|Major League Baseball          |
|2  |Sports|NHL|National Hockey League         |
|3  |Sports|NFL|National Football League       |
|4  |Sports|NBA|National Basketball Association|
|5  |Sports|MLS|Major League Soccer            |
+---+------+---+-------------------------------+
only showing top 5 rows
Case 1: Read all columns in the Dataframe in PySpark
df_category.select("*").show(5,truncate=False) +---+------+---+-------------------------------+ |_c0|_c1 |_c2|_c3 | +---+------+---+-------------------------------+ |1 |Sports|MLB|Major League Baseball | |2 |Sports|NHL|National Hockey League | |3 |Sports|NFL|National Football League | |4 |Sports|NBA|National Basketball Association| |5 |Sports|MLS|Major League Soccer | +---+------+---+-------------------------------+ only showing top 5 rows
Case 2: Read some columns in the Dataframe in PySpark
df_category.select('_c0','_c1').show(5, truncate=False)

+---+------+
|_c0|_c1   |
+---+------+
|1  |Sports|
|2  |Sports|
|3  |Sports|
|4  |Sports|
|5  |Sports|
+---+------+
only showing top 5 rows
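Since this post is aimed at SQL folks, here is a quick sketch (my own addition, not part of the original walkthrough) of the SQL equivalent: register the Dataframe as a temporary view, then project the same two columns with spark.sql.

# Register the Dataframe as a temp view so it can be queried with SQL.
df_category.createOrReplaceTempView("category")

# SQL equivalent of df_category.select('_c0','_c1').show(5)
spark.sql("SELECT _c0, _c1 FROM category LIMIT 5").show(truncate=False)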
Case 3: Pass list to Read some columns in the Dataframe in PySpark
You can also pass a list of columns to the select function in place of writing each column explicitly. This is really helpful when you have a fixed set of columns to populate repeatedly through your ETL pipeline.
column_list = ['_c0','_c1']
df_category.select(column_list).show(5, truncate=False)

+---+------+
|_c0|_c1   |
+---+------+
|1  |Sports|
|2  |Sports|
|3  |Sports|
|4  |Sports|
|5  |Sports|
+---+------+
only showing top 5 rows
Case 4: Renaming column names in the Dataframe in PySpark
One thing we can further improve in the Dataframe output is the column header. Presently, Spark names the columns _c0, _c1 and so on by default. To give meaningful names to the columns, we can pass a list of new column names to the toDF() function.
column_list = ['catid','catgroup','catname','catdesc']
df_category = df_category.toDF(*column_list)
df_category.show(truncate=False)

+-----+--------+---------+------------------------------------------+
|catid|catgroup|catname  |catdesc                                   |
+-----+--------+---------+------------------------------------------+
|1    |Sports  |MLB      |Major League Baseball                     |
|2    |Sports  |NHL      |National Hockey League                    |
|3    |Sports  |NFL      |National Football League                  |
|4    |Sports  |NBA      |National Basketball Association           |
|5    |Sports  |MLS      |Major League Soccer                       |
|6    |Shows   |Musicals |Musical theatre                           |
|7    |Shows   |Plays    |All non-musical theatre                   |
|8    |Shows   |Opera    |All opera and light opera                 |
|9    |Concerts|Pop      |All rock and pop music concerts           |
|10   |Concerts|Jazz     |All jazz singers and bands                |
|11   |Concerts|Classical|All symphony, concerto, and choir concerts|
+-----+--------+---------+------------------------------------------+
Case 5: Check column names in Dataframe in PySpark
Many times in an ETL pipeline, you may want to dynamically fetch the column names of a Dataframe, for instance to apply a transformation on a specific column only if it exists. You can check the column names with the dataframe.columns attribute.
df_category.columns
['catid', 'catgroup', 'catname', 'catdesc']
Note: The return type is a list, so you can traverse it with a for loop if required. Also, to count the columns in the Dataframe, you can apply the "len" function to the output of the above command.
len(df_category.columns)
4
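As an aside, here is a small sketch (my own illustration) of how this helps in a pipeline: apply a transformation only when the column actually exists.

from pyspark.sql.functions import upper

# Uppercase the 'catgroup' column only if the Dataframe has it.
if 'catgroup' in df_category.columns:
    df_category = df_category.withColumn('catgroup', upper('catgroup'))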
Case 6: Check column name, data type of each column in Dataframe in PySpark
Sometimes you may want to check the column names along with the data type of each column; for this you can use the dataframe.dtypes attribute.
df_category.dtypes
[('catid', 'string'), ('catgroup', 'string'), ('catname', 'string'), ('catdesc', 'string')]
Note: The return type of this command is also a list, so you can traverse it using a for loop if required.
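For instance, a small sketch (my own illustration) that picks out the string-typed columns by looping over dtypes:

# Collect the names of all string-typed columns.
string_cols = [name for name, dtype in df_category.dtypes if dtype == 'string']
print(string_cols)
# ['catid', 'catgroup', 'catname', 'catdesc']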
Case 7: Print Schema information for Dataframe in PySpark
If you want to visualise the Dataframe schema in a nice tree format, you can use the dataframe.printSchema() function.
df_category.printSchema()
root
 |-- catid: string (nullable = true)
 |-- catgroup: string (nullable = true)
 |-- catname: string (nullable = true)
 |-- catdesc: string (nullable = true)
You can also use dataframe.schema to get the same information in a different format. This is especially useful if you want to copy the schema from one Dataframe and apply it to another.
df_category.schema
StructType(List(StructField(catid,StringType,true),StructField(catgroup,StringType,true),StructField(catname,StringType,true),StructField(catdesc,StringType,true)))
Case 8: Copy Schema from One Dataframe to another
You can save the schema of one Dataframe into an object and apply it to another Dataframe using the schema option.
dfschema = df_category.schema
df_category2 = spark.read.schema(dfschema).option("delimiter","|").csv("s3://pyspark.demo/raw/category_pipe.txt")
df_category2.printSchema()

root
 |-- catid: string (nullable = true)
 |-- catgroup: string (nullable = true)
 |-- catname: string (nullable = true)
 |-- catdesc: string (nullable = true)
Hope this helps. In the next post, we will see how to:
- Fetch unique values from a Dataframe in PySpark
- Use filter to select records from a Dataframe in PySpark with:
  - AND
  - OR
  - LIKE
  - IN
  - BETWEEN
  - NULL
- Sort data on the basis of one or more columns in ascending or descending order
Interview question:
How can we remove 2 or more lines as a header while reading a file into a Dataframe in PySpark?
Sample file:
prod,daily,impress
id,name,country
01,manish,USA
02,jhon,UK
03,willson,Africa
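One common approach (a sketch, not an official answer): read the file as an RDD, number each line with zipWithIndex(), drop the first two lines, and convert the rest to a Dataframe. The path and column names below are assumptions for illustration.

rdd = spark.sparkContext.textFile("/tmp/sample.csv")  # hypothetical path

# zipWithIndex() pairs each line with its position in the file; keep
# only lines with index >= 2, i.e. drop the first two header lines.
data_rdd = (rdd.zipWithIndex()
               .filter(lambda pair: pair[1] >= 2)
               .map(lambda pair: tuple(pair[0].split(","))))

df = data_rdd.toDF(["id", "name", "country"])
df.show()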
Hi,
I have the below data in an abc.csv file:
S.NO NAME AGE SEX ADDRESS SAMPLE_SENT_DATE SAMPLE_RESULT_DATE
1 CH.KEERTHI 55 F ZARUGUMALLI 08.08.2020 08.08.2020
2 P SURESH 57 F Zarugumalli 05-08-2020 11.08.2020
3 P HEMASRI 35 MALE Zarugumalli 05-08-2020 11.08.2020
4 CH.DEEPTHI 32 FEMALE Y PALEM 11.08.2020 11.08.2020
5 CH.KARTHIK 24 FEMALE Y PALEM 11.08.2020 11.08.2020
6 D.subbarao 23 M Y PALEM 11.08.2020 11.08.2020
7 iethakshi 40 M Y PALEM 11.08.2020 11.08.2020
8 irajeswari 50 M Y PALEM 11.08.2020 11.08.2020
9 CH.KEERTHI 58 MALE Volepalem 31-07-2020 11.08.2020
10 irajeswari 22 FEMALE Volepalem 30-07-2020 11.08.2020
Here SAMPLE_SENT_DATE and SAMPLE_RESULT_DATE have dates in different formats.
How can I convert SAMPLE_SENT_DATE and SAMPLE_RESULT_DATE to DD-MM-YYYY format?
Hi Vivek,
Try the sketch below. I would also recommend keeping dates in the standard "yyyy-MM-dd" format to avoid format conversions in future date operations. Since both of your formats differ only in the separator, you can simply replace "." with "-" in the string data.
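A minimal sketch of that idea, assuming the data is already loaded into a Dataframe named df with the column names from the sample above:

from pyspark.sql.functions import regexp_replace

# Both formats differ only in the separator ("." vs "-"), so
# normalising "." to "-" leaves every value in DD-MM-YYYY form.
df = (df.withColumn("SAMPLE_SENT_DATE",
                    regexp_replace("SAMPLE_SENT_DATE", r"\.", "-"))
        .withColumn("SAMPLE_RESULT_DATE",
                    regexp_replace("SAMPLE_RESULT_DATE", r"\.", "-")))
df.show(truncate=False)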