PySpark Read Write Parquet Files

In this post, we will see how to read and write parquet files using pyspark, along with common options and challenges you must consider while reading or writing them.

What is the Parquet File Format?

Parquet is a columnar file format that is becoming very popular because of the optimisations it brings to spark data analytics applications. Parquet stores data column-wise rather than row-wise, as traditional formats like CSV do.

Advantages of Parquet

  • Parquet files are highly compressed, so file sizes are smaller. This means less I/O when transferring files over the network.
  • Smaller file size also means lower cost for storage space.
  • The columnar format allows reading only the required columns rather than entire rows. This further speeds up reads, and more values can be accommodated in a single block.
  • Parquet files also store metadata about the file's data. This ensures that the proper data type is assigned to each column while reading.

Disadvantages of Parquet

  • Not all applications support reading and writing parquet files, although spark does.
  • Parquet has 2 formats – legacy and non-legacy. Many downstream applications may not be able to read the non-legacy format, so if you have parquet files in non-legacy format you may want to convert them to legacy for such cases.
  • You cannot merge and read legacy and non-legacy parquet files together. All file parts must be either legacy or non-legacy.
  • Parquet supports only partial schema evolution. If an existing column's datatype is changed over time, reading file parts with different datatypes for the same column will result in an error.
  • You cannot cast parquet columns into other datatypes while reading. For example, a column stored as integer cannot be read as a string.
  • Debugging parquet files is a challenge. Unlike CSV files, you cannot read the content directly, so if you encounter parquet file issues it is difficult to investigate data problems in the files.
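
One small thing you can check without any tooling is whether a file is parquet at all: every valid parquet file begins and ends with the 4-byte magic marker PAR1 (the footer metadata sits just before the trailing marker). A minimal pure-Python sketch, no Spark required; `looks_like_parquet` is a hypothetical helper name:

```python
def looks_like_parquet(path):
    """Return True if the file carries the Parquet magic bytes.

    Valid parquet files start and end with the 4-byte marker b"PAR1".
    This is a quick sanity check, not a full validation of contents.
    """
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-4, 2)  # 2 = os.SEEK_END: jump to 4 bytes before EOF
        tail = f.read(4)
    return head == b"PAR1" and tail == b"PAR1"
```

This will not tell you why a parquet file is corrupt, but it quickly separates "not parquet at all" from deeper data issues.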

PySpark Read Parquet file

You can read parquet files from multiple sources like S3 or HDFS. To read a parquet file, just pass its location to spark.read.parquet along with any other options.
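
The interactive examples below assume a `spark` object already exists, as it does in the pyspark shell or a notebook. In a standalone script you would build it yourself; a minimal sketch (`get_spark` and the app name are my own choices, not part of the examples):

```python
def get_spark(app_name="parquet-demo"):
    """Create (or fetch) the SparkSession that the REPL examples assume.

    The import is deferred so this sketch can be defined even where
    pyspark is not installed; calling it of course requires pyspark.
    """
    from pyspark.sql import SparkSession
    return SparkSession.builder.appName(app_name).getOrCreate()
```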

In this example we will read parquet files from an S3 location. This is an open dataset shared by Amazon.

[hadoop@ip-10-0-0-87 ~]$ aws s3 ls s3://amazon-reviews-pds/parquet/product_category=Books/ --human-readable
2018-04-09 06:35:58    1.0 GiB part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 06:35:59    1.0 GiB part-00001-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 06:36:00    1.0 GiB part-00002-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 06:36:00    1.0 GiB part-00003-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 06:36:00    1.0 GiB part-00004-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 06:36:33    1.0 GiB part-00005-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 06:36:35    1.0 GiB part-00006-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 06:36:35    1.0 GiB part-00007-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 06:36:35    1.0 GiB part-00008-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 06:36:35    1.0 GiB part-00009-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet

PySpark Read Parquet File from S3

You can read a single parquet file from S3 by specifying its absolute path.

>>> df_books = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet")
>>> df_books.count()
2074682
>>> df_books.printSchema()
root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)

PySpark Read Parquet Folder from S3

You can read all the parquet files in a folder on S3 by specifying the path to the prefix that contains all the parquet file parts.

>>> df_books = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books/")
>>> df_books.count()
20726160
>>> df_books.printSchema()
root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)

PySpark Read multiple Parquet Files from S3

You can also read multiple file parts together by passing comma separated absolute paths. In the below example we read 4 different file parts by passing the paths to the individual files.

>>> df_books = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet","s3://amazon-reviews-pds/parquet/product_category=Books/part-00001-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet","s3://amazon-reviews-pds/parquet/product_category=Books/part-00002-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet","s3://amazon-reviews-pds/parquet/product_category=Books/part-00003-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet")

>>> df_books.count()
8290353

>>> df_books.printSchema()
root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)
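
Typing out long absolute paths by hand is error-prone; you can build the list programmatically and unpack it into spark.read.parquet, which accepts multiple path arguments. A sketch, assuming the bucket layout above; `part_paths` is a hypothetical helper, and the uuid portion of the file name is taken from the listing:

```python
def part_paths(prefix, part_numbers, suffix):
    """Build absolute S3 paths for the given part numbers.

    Part numbers are zero-padded to five digits, matching the
    part-00000-... naming Spark uses for file parts.
    """
    return [f"{prefix}/part-{n:05d}-{suffix}" for n in part_numbers]

paths = part_paths(
    "s3://amazon-reviews-pds/parquet/product_category=Books",
    range(4),
    "495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
)
# df_books = spark.read.parquet(*paths)  # same as passing them comma separated
```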

PySpark Write Parquet Files

You can write a dataframe into one or more parquet file parts. By default the output is snappy-compressed. In the below examples we will see multiple write options while writing parquet files using pyspark.
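
Snappy is only the default; the codec can be changed through the parquet compression option on the writer. A hedged sketch (`write_parquet` is a hypothetical wrapper, not part of the examples below):

```python
def write_parquet(df, path, codec="snappy"):
    """Write df as parquet with an explicit compression codec.

    Spark's parquet source accepts codecs such as snappy, gzip,
    zstd, and none; snappy is the default.
    """
    df.write.option("compression", codec).mode("append").parquet(path)
```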

Case 1: Spark write Parquet file into HDFS

>>> df_books = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet")
>>>
>>> df_books.count()
2074682
>>>
>>> df_books.write.mode("append").parquet("hdfs:///data/books/case1/")

You can also use "overwrite" in place of "append" while writing data to the target location. Let's look at the HDFS location to see the parquet file parts.

[hadoop@ip-10-0-1-223 ~]$ hdfs dfs -ls -h hdfs:///data/books/case1/
Found 10 items
-rw-r--r--   1 hadoop hadoop          0 2021-05-13 08:25 hdfs:///data/books/case1/_SUCCESS
-rw-r--r--   1 hadoop hadoop    128.1 M 2021-05-13 08:25 hdfs:///data/books/case1/part-00000-7bf4d4ec-ff90-4d8f-ba9a-92625dc3b676-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    126.8 M 2021-05-13 08:25 hdfs:///data/books/case1/part-00001-7bf4d4ec-ff90-4d8f-ba9a-92625dc3b676-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    128.0 M 2021-05-13 08:25 hdfs:///data/books/case1/part-00002-7bf4d4ec-ff90-4d8f-ba9a-92625dc3b676-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    128.1 M 2021-05-13 08:25 hdfs:///data/books/case1/part-00003-7bf4d4ec-ff90-4d8f-ba9a-92625dc3b676-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    128.1 M 2021-05-13 08:25 hdfs:///data/books/case1/part-00004-7bf4d4ec-ff90-4d8f-ba9a-92625dc3b676-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    128.1 M 2021-05-13 08:25 hdfs:///data/books/case1/part-00005-7bf4d4ec-ff90-4d8f-ba9a-92625dc3b676-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    129.0 M 2021-05-13 08:25 hdfs:///data/books/case1/part-00006-7bf4d4ec-ff90-4d8f-ba9a-92625dc3b676-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    128.0 M 2021-05-13 08:25 hdfs:///data/books/case1/part-00007-7bf4d4ec-ff90-4d8f-ba9a-92625dc3b676-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     35.5 M 2021-05-13 08:25 hdfs:///data/books/case1/part-00008-7bf4d4ec-ff90-4d8f-ba9a-92625dc3b676-c000.snappy.parquet

You can observe that the input file was ~1 GB and, since Spark plans one read partition per 128 MB by default, it was split into 9 file parts. The last file part is less than 128 MB while the others are roughly the same size.

You can also confirm this by checking the number of partitions created while reading this parquet file.

>>> print(df_books.rdd.getNumPartitions())
9
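
The partition count can be predicted from the input size: Spark plans roughly one read partition per 128 MB of a splittable file (the default of spark.sql.files.maxPartitionBytes). A back-of-the-envelope check in pure Python, ignoring Spark's small per-file open cost:

```python
import math

def expected_partitions(input_bytes, max_partition_bytes=128 * 1024 * 1024):
    """Approximate number of read partitions Spark plans for a
    splittable file: one per max_partition_bytes chunk."""
    return math.ceil(input_bytes / max_partition_bytes)

# The Books part file is roughly 8 full 128 MB chunks plus a
# 35.5 MB remainder, per the HDFS listing above:
size = 8 * 128 * 1024 * 1024 + int(35.5 * 1024 * 1024)
print(expected_partitions(size))  # 9
```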

Case 2: Spark write parquet file into hdfs in legacy format

First we check the current value of the legacy format option, then set it to true and run the same write again.

>>> spark.sql("set spark.sql.parquet.writeLegacyFormat").show(truncate=False)
+-----------------------------------+-----+
|key                                |value|
+-----------------------------------+-----+
|spark.sql.parquet.writeLegacyFormat|false|
+-----------------------------------+-----+

>>> spark.sql("set spark.sql.parquet.writeLegacyFormat=true").show(truncate=False)
+-----------------------------------+-----+
|key                                |value|
+-----------------------------------+-----+
|spark.sql.parquet.writeLegacyFormat|true |
+-----------------------------------+-----+

>>> df_books = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet")
>>>
>>> df_books.count()
2074682
>>>
>>> df_books.write.mode("append").parquet("hdfs:///data/books/case2/")
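
In a script you can set the same flag through the runtime conf API instead of SQL. A sketch (`write_legacy` is a hypothetical helper; it assumes you want the flag restored to its default afterwards so later writes are unaffected):

```python
def write_legacy(spark, df, path):
    """Write df in legacy parquet format, restoring the flag afterwards.

    spark.conf.set(...) is the runtime-conf equivalent of the SQL
    "set spark.sql.parquet.writeLegacyFormat=true" shown above.
    """
    spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
    try:
        df.write.mode("append").parquet(path)
    finally:
        spark.conf.set("spark.sql.parquet.writeLegacyFormat", "false")
```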

Case 3: Spark write parquet file partition by column

You can also specify a partition column while writing data into HDFS. Partitioning optimises read operations on parquet data via partition pruning: queries that filter on the partition column scan only the matching sub-directories. It speeds up reading significantly if properly planned. Parquet files are commonly partitioned by date columns, but it depends on your use-case.

>>> df_books.write.partitionBy("review_date").mode("append").parquet("hdfs:///data/books/case3/")
[hadoop@ip-10-0-1-223 ~]$ hdfs dfs -ls -h hdfs:///data/books/case3/|head
Found 7000 items
-rw-r--r--   1 hadoop hadoop          0 2021-05-13 08:48 hdfs:///data/books/case3/_SUCCESS
drwxr-xr-x   - hadoop hadoop          0 2021-05-13 08:45 hdfs:///data/books/case3/review_date=1995-07-22
drwxr-xr-x   - hadoop hadoop          0 2021-05-13 08:45 hdfs:///data/books/case3/review_date=1995-07-24
drwxr-xr-x   - hadoop hadoop          0 2021-05-13 08:45 hdfs:///data/books/case3/review_date=1995-08-31
drwxr-xr-x   - hadoop hadoop          0 2021-05-13 08:45 hdfs:///data/books/case3/review_date=1995-10-21
drwxr-xr-x   - hadoop hadoop          0 2021-05-13 08:45 hdfs:///data/books/case3/review_date=1995-10-24
drwxr-xr-x   - hadoop hadoop          0 2021-05-13 08:45 hdfs:///data/books/case3/review_date=1995-11-13
drwxr-xr-x   - hadoop hadoop          0 2021-05-13 08:45 hdfs:///data/books/case3/review_date=1995-11-15
drwxr-xr-x   - hadoop hadoop          0 2021-05-13 08:45 hdfs:///data/books/case3/review_date=1995-11-19

[hadoop@ip-10-0-1-223 ~]$ hdfs dfs -ls -h hdfs:///data/books/case3/review_date=1995-07-22/
Found 1 items
-rw-r--r--   1 hadoop hadoop      5.1 K 2021-05-13 08:45 hdfs:///data/books/case3/review_date=1995-07-22/part-00002-5662d664-8ff5-42d0-8fa5-d4e81fe2def7.c000.snappy.parquet

Notice that the review_date column is used as the directory path. The data is distributed based on the review_date column value, and a sub-directory is created for each unique value.
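
Once data is laid out like this, a filter on the partition column lets spark skip whole directories. A sketch of a pruned read, assuming the case 3 layout; `read_one_day` is a hypothetical helper:

```python
def read_one_day(spark, base_path, day):
    """Read only the rows for one review_date partition.

    Because review_date is a partition column, Spark prunes the scan
    to the single review_date=<day> sub-directory instead of touching
    all ~7000 directories under base_path.
    """
    df = spark.read.parquet(base_path)
    return df.where(df.review_date == day)
```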

Case 4: Spark write parquet file using coalesce

Spark coalesce merges existing partitions into the number requested in its argument, without shuffling data. Because it can only merge partitions, never split them, coalesce cannot produce more file parts than the current partition count. In this case, even if we pass a value greater than 9 to coalesce, spark won't generate more than 9 file parts.
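
The cap can be modeled in one line. This is a tiny pure-Python illustration of the behaviour described above, not the Spark API itself:

```python
def coalesce_output_parts(current_parts, requested):
    """Coalesce only merges partitions, so the output file-part count
    is the smaller of the requested value and the current count."""
    return min(current_parts, requested)

print(coalesce_output_parts(9, 2))   # 2
print(coalesce_output_parts(9, 15))  # 9
```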

>>> df_books.coalesce(2).write.mode("append").parquet("hdfs:///data/books/case4/")
[hadoop@ip-10-0-1-223 ~]$ hdfs dfs -ls -h hdfs:///data/books/case4/
Found 3 items
-rw-r--r--   1 hadoop hadoop          0 2021-05-13 08:54 hdfs:///data/books/case4/_SUCCESS
-rw-r--r--   1 hadoop hadoop    548.6 M 2021-05-13 08:54 hdfs:///data/books/case4/part-00000-a155aa53-2671-4099-af08-8b29c4ec3443-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    512.1 M 2021-05-13 08:54 hdfs:///data/books/case4/part-00001-a155aa53-2671-4099-af08-8b29c4ec3443-c000.snappy.parquet

In the above example, the output has just 2 file parts instead of the 9 created by default in case 1. Let's see what happens when we pass a value greater than 9 to coalesce.

>>> df_books.coalesce(15).write.mode("append").parquet("hdfs:///data/books/case4_2/")
[hadoop@ip-10-0-1-223 ~]$ hdfs dfs -ls -h hdfs:///data/books/case4_2/
Found 10 items
-rw-r--r--   1 hadoop hadoop          0 2021-05-13 09:00 hdfs:///data/books/case4_2/_SUCCESS
-rw-r--r--   1 hadoop hadoop    128.1 M 2021-05-13 09:00 hdfs:///data/books/case4_2/part-00000-b960c5b3-c682-4a71-8210-d3c9a80ef885-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    128.1 M 2021-05-13 09:00 hdfs:///data/books/case4_2/part-00001-b960c5b3-c682-4a71-8210-d3c9a80ef885-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    128.0 M 2021-05-13 09:00 hdfs:///data/books/case4_2/part-00002-b960c5b3-c682-4a71-8210-d3c9a80ef885-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    126.8 M 2021-05-13 09:00 hdfs:///data/books/case4_2/part-00003-b960c5b3-c682-4a71-8210-d3c9a80ef885-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    129.0 M 2021-05-13 09:00 hdfs:///data/books/case4_2/part-00004-b960c5b3-c682-4a71-8210-d3c9a80ef885-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    128.0 M 2021-05-13 09:00 hdfs:///data/books/case4_2/part-00005-b960c5b3-c682-4a71-8210-d3c9a80ef885-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    128.1 M 2021-05-13 09:00 hdfs:///data/books/case4_2/part-00006-b960c5b3-c682-4a71-8210-d3c9a80ef885-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop    128.1 M 2021-05-13 09:00 hdfs:///data/books/case4_2/part-00007-b960c5b3-c682-4a71-8210-d3c9a80ef885-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     35.5 M 2021-05-13 09:00 hdfs:///data/books/case4_2/part-00008-b960c5b3-c682-4a71-8210-d3c9a80ef885-c000.snappy.parquet

So even though we asked for 15, the output has only 9 file parts.

Case 5: Spark write parquet file using repartition

Spark repartition creates exactly the number of partitions requested. Repartition shuffles data, so it can take more time and resources, but it generates as many file parts as the number passed to the function. With repartition you can increase as well as decrease the number of output file parts, whereas coalesce can only decrease it.

>>> df_books.repartition(15).write.mode("append").parquet("hdfs:///data/books/case5/")

You can verify below that the output has 15 file parts instead of the default 9 we saw in case 1.

[hadoop@ip-10-0-1-223 ~]$ hdfs dfs -ls -h hdfs:///data/books/case5/
Found 16 items
-rw-r--r--   1 hadoop hadoop          0 2021-05-13 09:04 hdfs:///data/books/case5/_SUCCESS
-rw-r--r--   1 hadoop hadoop     70.6 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00000-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.8 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00001-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.5 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00002-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.6 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00003-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.5 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00004-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.4 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00005-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.4 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00006-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.6 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00007-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.2 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00008-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.7 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00009-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.5 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00010-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.7 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00011-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.5 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00012-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.3 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00013-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     70.3 M 2021-05-13 09:04 hdfs:///data/books/case5/part-00014-dbcebc2a-c44f-40df-8a8e-caa2cf5e4b0e-c000.snappy.parquet
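
The nearly uniform ~70 MB part sizes are what you would expect: repartition's shuffle distributes rows roughly evenly, so each part holds about one fifteenth of the data. A quick pure-Python check, using the approximate total size from case 1 (compressed sizes won't match exactly, so this is only a ballpark):

```python
total_mib = 8 * 128 + 35.5  # approximate input size from case 1, in MiB
per_part = total_mib / 15   # expected size of each of the 15 parts
print(round(per_part, 1))   # 70.6
```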

Case 6: Spark write parquet as one file

You can use either coalesce(1) or repartition(1) to generate the output as a single parquet file part.

>>> df_books.coalesce(1).write.mode("append").parquet("hdfs:///data/books/case6/")
[hadoop@ip-10-0-1-223 ~]$ hdfs dfs -ls -h hdfs:///data/books/case6/
Found 2 items
-rw-r--r--   1 hadoop hadoop          0 2021-05-13 09:11 hdfs:///data/books/case6/_SUCCESS
-rw-r--r--   1 hadoop hadoop      1.0 G 2021-05-13 09:11 hdfs:///data/books/case6/part-00000-37117530-65e6-42a9-9180-903ad5ee25c9-c000.snappy.parquet

In the next post, we will see Parquet file Schema Evolution and Schema Merge.
