PySpark Tutorial – Day 1

PySpark is becoming the obvious choice for enterprises when it comes to moving to Spark. As per my understanding, this is primarily for two reasons:

  1. Many developers are, like me, experts in SQL but not in programming languages such as Java or C#. So when it comes to picking one language for Spark, the existing team is more comfortable working with Python than Scala.
  2. Data Science is the natural extension of Data Analytics, and AI & Machine Learning projects are also gaining a lot of attention from enterprises now. Python is the obvious choice here due to its rich library support for Machine Learning. The same team members can contribute to Data Engineering, Data Analytics & Data Science, and switching across teams is easy if you are comfortable with Python.

In this post, we will see how to learn PySpark from scratch. It should especially help people who are good with SQL and now wish to adopt Spark, so I will try to highlight the similarities between SQL and PySpark as well.

We will cover the below 5 points in this post:

  1. Check Hadoop/Python/Spark version
  2. Connect to PySpark CLI
  3. Read CSV file into Dataframe and check some/all columns & rows in it.
  4. Check schema and copy schema from one dataframe to another
  5. Basic Metadata info of Dataframe

Check Hadoop/Python/Spark version

Check Hadoop Version:
hadoop version

Hadoop 2.10.1-amzn-0


Check Python Version:
python --version

Python 3.7.9

Check Spark Version:
pyspark --version

Welcome to
  ____              __
 / __/__  ___ _____/ /__
_\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 2.4.7-amzn-0
  /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_272

Connect to PySpark CLI

If you have pyspark installed and configured correctly, just type “pyspark” and hit enter. This will open pyspark in the terminal, and you should see a message like “SparkSession available as ‘spark'”. So you don’t have to create a SparkSession explicitly and can use ‘spark’ directly. We will see in later posts how to create and use a SparkSession when running ETL jobs in batch mode.
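
For reference, a minimal sketch of creating a SparkSession yourself (for example in a script submitted in batch mode) could look like the below; the application name is just an example.

from pyspark.sql import SparkSession

# Build a new SparkSession, or reuse the existing one if already running
spark = SparkSession.builder.appName("pyspark-day1").getOrCreate()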

Read CSV file into a PySpark Dataframe

The next step is to read a CSV file into a Dataframe and run some select operations on it. For now, just assume a Dataframe to be equivalent to a table in an RDBMS.

You can download the data file used in the below examples for practice. I have uploaded the file into S3 and not everyone may have access to it, so please change the path to point to the location where you have placed the data file.

df_category = spark.read.csv("s3://pyspark.demo/raw/category_pipe.txt")
df_category.show()
+--------------------+
|                 _c0|
+--------------------+
|1|Sports|MLB|Majo...|
|2|Sports|NHL|Nati...|
|3|Sports|NFL|Nati...|
|4|Sports|NBA|Nati...|
|5|Sports|MLS|Majo...|
|6|Shows|Musicals|...|
|7|Shows|Plays|All...|
|8|Shows|Opera|All...|
|9|Concerts|Pop|Al...|
|10|Concerts|Jazz|...|
|11|Concerts|Class...|
+--------------------+

So we have loaded the file into a dataframe, but all the values are going into a single column because the default delimiter is a comma. To fix this, we have to specify the proper delimiter while reading the CSV file.

df_category = spark.read.option("delimiter","|").csv("s3://pyspark.demo/raw/category_pipe.txt")
df_category.show()
+---+--------+---------+--------------------+
|_c0|     _c1|      _c2|                 _c3|
+---+--------+---------+--------------------+
|  1|  Sports|      MLB|Major League Base...|
|  2|  Sports|      NHL|National Hockey L...|
|  3|  Sports|      NFL|National Football...| 
|  4|  Sports|      NBA|National Basketba...|
|  5|  Sports|      MLS| Major League Soccer|
|  6|   Shows| Musicals|     Musical theatre|
|  7|   Shows|    Plays|All non-musical t...|
|  8|   Shows|    Opera|All opera and lig...|
|  9|Concerts|      Pop|All rock and pop ...|
| 10|Concerts|     Jazz|All jazz singers ...|
| 11|Concerts|Classical|All symphony, con...|
+---+--------+---------+--------------------+

Now the columns look ok; however, we can see that some values in the last column are truncated. To view the complete values without truncation in a PySpark Dataframe, we pass “truncate=False” to the show function.

df_category.show(truncate=False)
+---+--------+---------+------------------------------------------+
|_c0|_c1     |_c2      |_c3                                       |
+---+--------+---------+------------------------------------------+
|1  |Sports  |MLB      |Major League Baseball                     |
|2  |Sports  |NHL      |National Hockey League                    |
|3  |Sports  |NFL      |National Football League                  |
|4  |Sports  |NBA      |National Basketball Association           |
|5  |Sports  |MLS      |Major League Soccer                       |
|6  |Shows   |Musicals |Musical theatre                           |
|7  |Shows   |Plays    |All non-musical theatre                   |
|8  |Shows   |Opera    |All opera and light opera                 |
|9  |Concerts|Pop      |All rock and pop music concerts           |
|10 |Concerts|Jazz     |All jazz singers and bands                |
|11 |Concerts|Classical|All symphony, concerto, and choir concerts|
+---+--------+---------+------------------------------------------+

To show only N rows from a Dataframe in PySpark, you can pass the number to the show function. For example, to show 5 rows, pass 5 to show.

df_category.show(5,truncate=False)
+---+------+---+-------------------------------+
|_c0|_c1   |_c2|_c3                            |
+---+------+---+-------------------------------+
|1  |Sports|MLB|Major League Baseball          |
|2  |Sports|NHL|National Hockey League         |
|3  |Sports|NFL|National Football League       |
|4  |Sports|NBA|National Basketball Association|
|5  |Sports|MLS|Major League Soccer            |
+---+------+---+-------------------------------+
only showing top 5 rows

Case 1: Read all columns in the Dataframe in PySpark

df_category.select("*").show(5,truncate=False)
+---+------+---+-------------------------------+
|_c0|_c1   |_c2|_c3                            |
+---+------+---+-------------------------------+
|1  |Sports|MLB|Major League Baseball          |
|2  |Sports|NHL|National Hockey League         |
|3  |Sports|NFL|National Football League       |
|4  |Sports|NBA|National Basketball Association|
|5  |Sports|MLS|Major League Soccer            |
+---+------+---+-------------------------------+
only showing top 5 rows

Case 2: Read some columns in the Dataframe in PySpark

df_category.select('_c0','_c1').show(5,truncate=False)
+---+------+
|_c0|_c1   |
+---+------+
|1  |Sports|
|2  |Sports|
|3  |Sports|
|4  |Sports|
|5  |Sports|
+---+------+
only showing top 5 rows

Case 3: Pass list to Read some columns in the Dataframe in PySpark

You can also pass a list of columns to the select function instead of writing each column explicitly. This is really helpful when you have a few identified columns to reference repeatedly throughout your ETL pipeline.

column_list = ['_c0','_c1']
df_category.select(column_list).show(5,truncate=False)
+---+------+
|_c0|_c1   |
+---+------+
|1  |Sports|
|2  |Sports|
|3  |Sports|
|4  |Sports|
|5  |Sports|
+---+------+
only showing top 5 rows

Case 4: Renaming column names in the Dataframe in PySpark

One more thing we can improve in the Dataframe output is the column headers. Presently, Spark names the columns _c0, _c1 and so on by default. To give meaningful names to the columns, we can pass a list of new column names to the toDF() function.

column_list=['catid','catgroup','catname','catdesc']
df_category=df_category.toDF(*column_list)
df_category.show(truncate=False)

+-----+--------+---------+------------------------------------------+
|catid|catgroup|catname  |catdesc                                   |
+-----+--------+---------+------------------------------------------+
|1    |Sports  |MLB      |Major League Baseball                     |
|2    |Sports  |NHL      |National Hockey League                    |
|3    |Sports  |NFL      |National Football League                  |
|4    |Sports  |NBA      |National Basketball Association           |
|5    |Sports  |MLS      |Major League Soccer                       |
|6    |Shows   |Musicals |Musical theatre                           |
|7    |Shows   |Plays    |All non-musical theatre                   |
|8    |Shows   |Opera    |All opera and light opera                 |
|9    |Concerts|Pop      |All rock and pop music concerts           |
|10   |Concerts|Jazz     |All jazz singers and bands                |
|11   |Concerts|Classical|All symphony, concerto, and choir concerts|
+-----+--------+---------+------------------------------------------+
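
As a side note, when you only need to rename one or two columns rather than all of them, withColumnRenamed is an alternative to toDF(). A small sketch is below; the new name “category_id” is only for illustration.

# Rename a single column; the new name is only an example
df_renamed = df_category.withColumnRenamed('catid', 'category_id')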

Case 5: Check column names in Dataframe in PySpark

Many times during an ETL pipeline, you may want to dynamically fetch the column names of a dataframe to apply a transformation on a specific column if it exists. You can check the column names in a Dataframe by using the dataframe.columns attribute.

df_category.columns 
['catid', 'catgroup', 'catname', 'catdesc']

Note: The return type is a list, so you may traverse it with a for loop if required (see the sketch after the len example below). Also, to check how many columns are in the dataframe, you can apply the “len” function to the output of the above command.

len(df_category.columns)
4
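
Since dataframe.columns is a plain Python list, a minimal sketch of looping over it and checking whether a particular column exists could look like this; the column name 'catname' is just an example.

# Print each column name, then select a column only if it exists
for col_name in df_category.columns:
    print(col_name)

if 'catname' in df_category.columns:
    df_names = df_category.select('catname')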

Case 6: Check column name, data type of each column in Dataframe in PySpark

Sometimes you may want to check the column names along with the data type of each column; for that, you can use the dataframe.dtypes attribute.

df_category.dtypes 
[('catid', 'string'), ('catgroup', 'string'), ('catname', 'string'), ('catdesc', 'string')]

Note: The return type of this command is also a list, so you can traverse it using a for loop if required, as in the sketch below.
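
For example, a minimal sketch of looping over dtypes to find all string columns could look like this (in this dataframe every column is a string, so all four names would be printed).

# Print the names of all columns whose data type is string
for col_name, col_type in df_category.dtypes:
    if col_type == 'string':
        print(col_name)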

Case 7: Print Schema information for Dataframe in PySpark

If you want to visualise the dataframe schema in a nice tree format, you can use the dataframe.printSchema() function.

df_category.printSchema()
root
 |-- catid: string (nullable = true)
 |-- catgroup: string (nullable = true)
 |-- catname: string (nullable = true)
 |-- catdesc: string (nullable = true)

You can also use dataframe.schema to get the same information in a different format. This is especially useful if you want to copy the schema from one dataframe and apply it to another.

df_category.schema
StructType(List(StructField(catid,StringType,true),StructField(catgroup,StringType,true),StructField(catname,StringType,true),StructField(catdesc,StringType,true)))

Case 8: Copy Schema from One Dataframe to another

You can copy the schema of one dataframe into an object and apply it to another dataframe using the schema option.

dfschema=df_category.schema
df_category2 = spark.read.schema(dfschema).option("delimiter","|").csv("s3://pyspark.demo/raw/category_pipe.txt")

df_category2.printSchema()
root
 |-- catid: string (nullable = true)
 |-- catgroup: string (nullable = true)
 |-- catname: string (nullable = true)
 |-- catdesc: string (nullable = true)


Hope this helps. In the next post, we will see how to:

  • Fetch unique values from dataframe in PySpark
  • Use Filter to select a few records from a Dataframe in PySpark
    • AND 
    • OR
    • LIKE
    • IN
    • BETWEEN
    • NULL
  • SORT data on the basis of one or more columns in ascending or descending order.