In the last post, we discussed basic operations on RDDs in PySpark. In this post, we will see other common operations you can perform on RDDs in PySpark. Let's quickly go through the syntax and examples for various RDD operations: read a file into an RDD, convert each record into a LIST of elements, remove the header record, check the count of records in the RDD, check the first element in the RDD, check the partitions of the RDD, define a custom function for RDD operations, and apply the custom function to the RDD and see the result. If you want to convert all the columns to UPPER case, create another function which accepts a LIST… Read More →
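A minimal sketch of those operations, assuming a comma-delimited HDFS file whose first record is a header row (the path /user/data/usa_prez.txt and the delimiter are placeholders, not from the original post):

# read file, split records, drop header, inspect, then apply a custom function
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd-ops").getOrCreate()
sc = spark.sparkContext

rdd = sc.textFile("/user/data/usa_prez.txt")             # read a file into an RDD
rdd_list = rdd.map(lambda line: line.split(","))         # convert each record into a LIST of elements

header = rdd_list.first()                                # header is the first record
rdd_data = rdd_list.filter(lambda row: row != header)    # remove the header record

print(rdd_data.count())                                  # count of records in the RDD
print(rdd_data.first())                                  # first element in the RDD
print(rdd_data.getNumPartitions())                       # partitions of the RDD

def to_upper(row):
    # custom function: upper-case every column in the LIST
    return [col.upper() for col in row]

print(rdd_data.map(to_upper).take(3))                    # apply the custom function and see the result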

A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. In other words, it is the most common structure that holds data in Spark. An RDD is distributed, immutable, fault-tolerant, and optimized for in-memory computation. Let's see some basic examples of RDDs in PySpark: load a file into an RDD (the path should be an HDFS path, not a local one), check the count of records in the RDD, check sample records from the RDD, and traverse each record in the RDD. You cannot directly iterate through records in an RDD; to bring all the records to the DRIVER you can use the collect() action. Apply a map function to convert all columns to upper… Read More →
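A hedged example of those basic actions; the HDFS path is assumed for illustration:

# load, count, sample, collect to the driver, and map to upper case
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd-basics").getOrCreate()
sc = spark.sparkContext

rdd = sc.textFile("hdfs:///user/data/sample.txt")        # load file into RDD (HDFS path, not local)

print(rdd.count())                                       # count of records in the RDD
print(rdd.take(5))                                       # sample records from the RDD

# You cannot iterate an RDD directly; collect() brings all records to the DRIVER.
for record in rdd.collect():                             # use only on small datasets
    print(record)

upper_rdd = rdd.map(lambda line: line.upper())           # map to convert records to upper case
print(upper_rdd.take(5))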

In PySpark, you can do almost all the date operations you can think of using built-in functions. Let's quickly jump to an example and see it one by one. Create a dataframe with sample date values: >>> df_1 = spark.createDataFrame([('2019-02-20','2019-10-18',)],['start_dt','end_dt']) Check the dataframe info: >>> df_1 DataFrame[start_dt: string, end_dt: string] The problem here is that the columns start_dt & end_dt are of type string and not date. So let's quickly convert them to date: >>> df_2 = df_1.select(df_1.start_dt.cast('date'),df_1.end_dt.cast('date')) >>> df_2 DataFrame[start_dt: date, end_dt: date] Now we are good. We have a dataframe with 2 columns, start_dt & end_dt, both of datatype 'date'. Let's… Read More →
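Building on df_2 above, here is a short, hedged sketch of a few built-in date functions (datediff, months_between, add_months, date_add); the specific operations shown are illustrative and not necessarily the ones covered in the full post:

# a few common date operations using pyspark.sql.functions
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("date-ops").getOrCreate()

df_1 = spark.createDataFrame([('2019-02-20', '2019-10-18',)], ['start_dt', 'end_dt'])
df_2 = df_1.select(df_1.start_dt.cast('date'), df_1.end_dt.cast('date'))

df_2.select(
    F.datediff('end_dt', 'start_dt').alias('diff_days'),          # days between the two dates
    F.months_between('end_dt', 'start_dt').alias('diff_months'),  # months between the two dates
    F.add_months('start_dt', 3).alias('start_plus_3_months'),     # add months to a date
    F.date_add('start_dt', 10).alias('start_plus_10_days'),       # add days to a date
).show()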

Recently I was working on a project to convert Teradata BTEQ to PySpark code. Since it was mostly SQL queries, we were asked to transform them into Spark SQL and run them using PySpark. We used sqlContext mostly for SQL queries; however, in Teradata you have constructs like ACTIVITYCOUNT which can help decide whether or not to run subsequent queries. These conditional constructs cannot be directly converted to equivalent Spark SQL, so in PySpark we converted a dataframe to an array and then checked the array index value to implement the IF condition. Be very careful while converting a dataframe to an array. It… Read More →
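A minimal sketch of that pattern, assuming a hypothetical staging table and using spark.sql in place of sqlContext.sql (the two are interchangeable for this purpose):

# ACTIVITYCOUNT-style check: collect a one-row count to the driver and branch on it
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bteq-activitycount").getOrCreate()

cnt_df = spark.sql("select count(*) as cnt from stage_db.usa_prez")   # table name is a placeholder

# collect() returns a list of Row objects on the driver; be careful, as collecting
# a large dataframe this way will not scale. Here it is a single aggregated row.
cnt_arr = cnt_df.collect()
activity_count = cnt_arr[0]['cnt']

if activity_count > 0:
    # equivalent of running the subsequent BTEQ step only when rows exist
    spark.sql("insert into target_db.usa_prez select * from stage_db.usa_prez")
else:
    print("No rows found; skipping subsequent step.")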

One of the most common operations in any data analytics environment is generating sequences. There are multiple ways of generating SEQUENCE numbers; however, I find zipWithIndex the best one in terms of simplicity and performance combined, especially when the requirement is to generate consecutive numbers without any gap. Below is the detailed code which shall help in generating surrogate keys/natural keys/sequence numbers. Step 1: Create a dataframe with all the required columns from the table. df_0 = sqlContext.sql("select pres_name,pres_dob,pres_bp,pres_bs,pres_in,pres_out from usa_prez") df_0.printSchema() root |-- pres_name: string (nullable = true) |-- pres_dob: date (nullable = true) |-- pres_bp: string (nullable = true) |-- pres_bs: string… Read More →
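A hedged sketch of how the sequence step could continue from df_0 using zipWithIndex; the pres_id column name and the +1 offset (so the sequence starts at 1) are assumptions for illustration, not necessarily the steps in the full post:

# attach a consecutive, gap-free sequence number via zipWithIndex, then rebuild a dataframe
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("zipwithindex-seq").getOrCreate()

df_0 = spark.sql(
    "select pres_name,pres_dob,pres_bp,pres_bs,pres_in,pres_out from usa_prez"
)

# zipWithIndex pairs each Row with a 0-based index: (Row, idx)
rdd_indexed = df_0.rdd.zipWithIndex()

# flatten (Row, idx) back into a single Row with a new pres_id sequence column
df_1 = rdd_indexed.map(
    lambda pair: Row(pres_id=pair[1] + 1, **pair[0].asDict())
).toDF()

df_1.printSchema()
df_1.show(5)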