In this post, we will see how you can run a Spark application on an existing EMR cluster using Apache Airflow. The most basic way of scheduling jobs on EMR is crontab. But if you have worked with crontab, you know how painful it is to manage and secure. I will not go into depth about Airflow or EMR in this post, because there is already good content on both on the web. Instead, I want to clearly show how you can trigger a PySpark application on an existing EMR cluster using Airflow.
Step 1: Spark Application
The PySpark code below reads data from an S3 path and writes the first 100 records to a local HDFS path. It also displays 10 records from both the S3 data and the HDFS copy.
# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import sys,logging
from datetime import datetime
# Logging configuration
formatter = logging.Formatter('[%(asctime)s] %(levelname)s @ line %(lineno)d: %(message)s')
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# current time variable to be used for logging purpose
dt_string = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
# change it to your app name
AppName = "SparkDummyApp"
# adding dummy function. change or remove it.
def some_function1():
    logger.info("Inside some_function 1")
# adding dummy function. change or remove it.
def some_function2():
    logger.info("Inside some_function 2")
def main():
    # start spark code
    spark = SparkSession.builder.appName(AppName+"_"+str(dt_string)).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    logger.info("Starting spark application")
    #calling function 1
    some_function1()
    #calling function 2
    some_function2()
    #reading and writing some dummy data
    logger.info("Reading Parquet File")
    df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Shoes/")
    logger.info("Previewing Parquet File Data")
    df.select("marketplace","customer_id","review_id").show(10,truncate=False)
    logger.info("Writing Parquet File Data")
    df.limit(100).write.mode("overwrite").parquet("hdfs:///var/data/shoes/")
    logger.info("Writing Parquet File Data - completed")
    df = spark.read.parquet("hdfs:///var/data/shoes/")
    df.select("marketplace","customer_id","review_id").show(10,truncate=False)
    logger.info("Ending spark application")
    # end spark code
    spark.stop()
    return None
# Starting point for PySpark
if __name__ == '__main__':
    main()
    sys.exit()
Save the file as “spark_app.py”. The most basic command to run this PySpark file with default settings is:
spark-submit spark_app.py
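The script hardcodes its S3 input path, HDFS output path and row limit. If you want to reuse it for other data, one option is to accept these as command-line arguments: anything placed after the script path on the spark-submit command line reaches the script through sys.argv. Below is a minimal sketch using argparse. The option names --input, --output and --limit are my own hypothetical choices, and main() would still need to be changed to use the parsed values.

```python
import argparse


def parse_args(argv=None):
    """Parse hypothetical --input/--output/--limit options for spark_app.py."""
    parser = argparse.ArgumentParser(description="SparkDummyApp")
    # Defaults mirror the values hardcoded in spark_app.py.
    parser.add_argument(
        "--input",
        default="s3://amazon-reviews-pds/parquet/product_category=Shoes/",
        help="Parquet path to read",
    )
    parser.add_argument(
        "--output",
        default="hdfs:///var/data/shoes/",
        help="Parquet path to write",
    )
    parser.add_argument(
        "--limit", type=int, default=100, help="number of rows to write"
    )
    # With argv=None, argparse falls back to sys.argv[1:].
    return parser.parse_args(argv)
```

With this in place, a call such as spark-submit spark_app.py --input s3://my-bucket/data/ --limit 50 (bucket name hypothetical) would override the defaults, while plain spark-submit spark_app.py keeps the original behaviour.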
Step 2: Create Airflow DAG to call EMR Step
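We will use the Airflow EMR operators to add a step to the existing EMR cluster: EmrAddStepsOperator submits the step and EmrStepSensor waits for it to finish.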
I have used an Airflow variable named CLUSTER_ID in the code. You can create it or, if you are just testing Airflow, replace Variable.get("CLUSTER_ID") with a hardcoded value. It should hold the cluster ID of your EMR cluster, i.e. the ID shown on the cluster's Summary tab. I have also set schedule_interval to None because I want to run this job manually, on an ad hoc basis. You can change the interval as per your requirement.
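A common mistake is to paste the cluster name instead of its ID. If you want the DAG to fail fast in that case, you can add a small sanity check around the variable lookup. This is a minimal sketch. The pattern is an assumption based on the IDs I have seen ("j-" followed by uppercase letters and digits) rather than a documented format, and check_cluster_id is a hypothetical helper name.

```python
import re

# Assumption: EMR cluster IDs look like "j-" followed by uppercase letters/digits.
CLUSTER_ID_PATTERN = re.compile(r"j-[A-Z0-9]+")


def check_cluster_id(cluster_id):
    """Return the stripped cluster ID, or raise ValueError if it does not look like one."""
    cleaned = cluster_id.strip()
    if not CLUSTER_ID_PATTERN.fullmatch(cleaned):
        raise ValueError(
            f"{cluster_id!r} does not look like an EMR cluster ID (expected j-XXXXXXXX)"
        )
    return cleaned
```

In the DAG file you would then write cluster_id = check_cluster_id(Variable.get("CLUSTER_ID")), so a bad value shows up as a DAG import error instead of a failed task.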
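Also adjust SPARK_TASK as per your requirement: change the file name, and add or edit spark-submit parameters as you like. Because the step runs spark-submit in client mode through command-runner.jar, the script path (/home/hadoop/pyspark/spark_app.py here) must exist on the EMR master node.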
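If you find yourself editing the Args list by hand often, a small helper can build the step dictionary for you. The sketch below produces exactly the same structure as the SPARK_TASK entry in the DAG, but splits spark-submit options (placed before the script) from application arguments (placed after it). build_spark_step is a hypothetical name, and the --executor-memory value is just a placeholder.

```python
def build_spark_step(name, script_path, spark_args=None, app_args=None,
                     action_on_failure="CONTINUE"):
    """Build one EMR step dict in the same shape as the SPARK_TASK entry.

    spark_args go between spark-submit and the script (e.g. --conf ...);
    app_args go after the script and reach it through sys.argv.
    """
    args = ["spark-submit", "--deploy-mode", "client"]
    args += list(spark_args or [])
    args.append(script_path)
    args += list(app_args or [])
    return {
        "Name": name,
        # CONTINUE keeps the cluster running if this step fails.
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": args,
        },
    }


# Equivalent to the DAG's SPARK_TASK, plus one placeholder spark-submit option.
SPARK_TASK = [
    build_spark_step(
        "spark_app",
        "/home/hadoop/pyspark/spark_app.py",
        spark_args=["--executor-memory", "2g"],
    )
]
```

Calling build_spark_step("spark_app", "/home/hadoop/pyspark/spark_app.py") with no extra arguments yields the same Args list as the hand-written SPARK_TASK.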
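# import modules/libraries (the airflow.contrib paths below are for Airflow 1.10.x; in Airflow 2 these operators ship in the Amazon provider package)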
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago
from airflow.models import Variable
from datetime import timedelta
import os
# setting up default args
DEFAULT_ARGS = {
    'owner': 'sqlhadoop',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}
# add & edit the args as per your requirement. Change the pyspark file name.
SPARK_TASK = [
    {
        'Name': 'spark_app',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ["spark-submit", "--deploy-mode", "client", "/home/hadoop/pyspark/spark_app.py"],
        },
    }
]
# set the variables.
DAG_ID = os.path.basename(__file__).replace(".py", "")
cluster_id  = Variable.get("CLUSTER_ID")
with DAG(
    dag_id=DAG_ID,
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(1),
    schedule_interval=None,
    tags=['pyspark'],
) as dag:
    # first step to register EMR step
    step_first = EmrAddStepsOperator(
        task_id='add_emr_step',
        job_flow_id=cluster_id,
        aws_conn_id='aws_default',
        steps=SPARK_TASK,
    )
    # second step to keep track of previous step.
    step_second = EmrStepSensor(
        task_id='watch_emr_step',
        job_flow_id=cluster_id,
        step_id="{{ task_instance.xcom_pull(task_ids='add_emr_step', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )
    # set the order: add the step first, then watch it.
    step_first >> step_second
Save the file as spark_app_dag.py and put it in your Airflow DAG folder. Refresh the Airflow UI and you should see a new DAG named spark_app_dag, because DAG_ID is derived from the file name.
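Because the DAG ID is computed from the file name, it is worth knowing exactly what that line produces. One subtlety: str.replace(".py", "") removes every ".py" substring, so a file named my.pyspark_dag.py would become "myspark_dag". The sketch below uses os.path.splitext instead, which strips only the final extension. The helper name and the /opt/airflow/dags path are hypothetical.

```python
import os


def dag_id_from_path(path):
    """Derive a DAG ID from a DAG file path by dropping the directory and the extension."""
    # splitext strips only the final extension, unlike str.replace(".py", "").
    return os.path.splitext(os.path.basename(path))[0]


print(dag_id_from_path("/opt/airflow/dags/spark_app_dag.py"))  # spark_app_dag
```

For a normally named file like spark_app_dag.py both approaches give the same result; the difference only matters for names that contain ".py" elsewhere.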

Enable the DAG and trigger it. This should create a new step in EMR, which you can also verify on the EMR console.
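It helps to know what the two tasks do once triggered. EmrAddStepsOperator returns the list of new step IDs, which is why the sensor pulls element [0] from XCom. EmrStepSensor then keeps checking that step's state until it reaches a terminal one. The code below is a simplified, stdlib-only imitation of that polling loop, not the sensor's actual implementation. describe_state stands in for an EMR DescribeStep call, and the state names are EMR step states.

```python
import time

# EMR step states, grouped roughly the way EmrStepSensor treats them.
RUNNING_STATES = {"PENDING", "RUNNING", "CANCEL_PENDING"}
FAILED_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(describe_state, step_id, poke_interval=30.0, timeout=7200.0,
                  sleep=time.sleep):
    """Poll describe_state(step_id) until the step completes, fails, or times out."""
    waited = 0.0
    while True:
        state = describe_state(step_id)
        if state == "COMPLETED":
            return state
        if state in FAILED_STATES:
            raise RuntimeError(f"EMR step {step_id} ended in state {state}")
        if state not in RUNNING_STATES:
            raise RuntimeError(f"Unexpected state {state!r} for step {step_id}")
        if waited >= timeout:
            raise TimeoutError(f"Step {step_id} still {state} after {waited:.0f}s")
        sleep(poke_interval)
        waited += poke_interval


# Usage with a fake describe function that replays a sequence of states.
fake_states = iter(["PENDING", "RUNNING", "RUNNING", "COMPLETED"])
result = wait_for_step(lambda _sid: next(fake_states), "s-EXAMPLE", sleep=lambda _s: None)
print(result)  # COMPLETED
```

Injecting describe_state and sleep keeps the loop testable without AWS access. In Airflow itself you do not write this loop; the sensor's poke interval and timeout play the same role.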

If the Airflow task fails to add the step, the most likely cause is a permissions error. Make sure the role Airflow uses has privileges to add steps to EMR and, for the sensor, to read their status.
You can attach a custom policy to the Airflow role to grant the required privileges.
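As a starting point for that custom policy, here is a sketch that generates a minimal IAM policy document scoped to a single cluster. It assumes that AddJobFlowSteps (called when adding the step) and DescribeStep (called by the sensor) are the only EMR API calls this DAG makes. The region, account ID and cluster ID are placeholders. Check the IAM documentation for EMR resource-level permissions before relying on the cluster-scoped Resource.

```python
import json


def emr_step_policy(region, account_id, cluster_id):
    """Build a minimal IAM policy allowing a principal to add and describe steps on one cluster.

    Assumption: AddJobFlowSteps and DescribeStep are the only EMR calls the DAG needs.
    """
    cluster_arn = f"arn:aws:elasticmapreduce:{region}:{account_id}:cluster/{cluster_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AirflowEmrSteps",
                "Effect": "Allow",
                "Action": [
                    "elasticmapreduce:AddJobFlowSteps",
                    "elasticmapreduce:DescribeStep",
                ],
                "Resource": cluster_arn,
            }
        ],
    }


# Placeholder values; substitute your own region, account ID and cluster ID.
print(json.dumps(emr_step_policy("us-east-1", "123456789012", "j-EXAMPLE"), indent=2))
```

You can paste the printed JSON into the IAM console when creating the policy, then attach it to the role Airflow runs under.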
Step 3: Verify Spark Logs
You can easily check the log files for the step's execution on the EMR master node. Get the step ID from the Airflow log or the EMR console and run the command below:
cat /mnt/var/log/hadoop/steps/STEPID/stdout
Replace STEPID with your EMR step ID (it starts with "s-").
> cat /mnt/var/log/hadoop/steps/s-2Z77NQGX6348N/stdout
[2021-06-10 09:49:59,547] INFO @ line 33: Starting spark application
[2021-06-10 09:49:59,547] INFO @ line 23: Inside some_function 1
[2021-06-10 09:49:59,547] INFO @ line 27: Inside some_function 2
[2021-06-10 09:49:59,547] INFO @ line 42: Reading Parquet File
[2021-06-10 09:50:03,778] INFO @ line 44: Previewing Parquet File Data
+-----------+-----------+--------------+
|marketplace|customer_id|review_id     |
+-----------+-----------+--------------+
|US         |12093441   |R7M6D2QDT449X |
|US         |36076731   |R2ZT8HS7YD5C72|
|US         |28401579   |R2BDJHQ8CJLI42|
|US         |50221708   |R1GKLWAOQQSFSU|
|US         |48894478   |R24MJI3RX5GQUR|
|US         |11623697   |R3F3ZYUSI5E0IG|
|US         |42749207   |R3R7TP5T52I1MS|
|US         |34015308   |RUZVAXEAYIYXI |
|US         |43599735   |RQR8NHGH1YPR2 |
|US         |50636206   |RCTIBCLIL6NFI |
+-----------+-----------+--------------+
only showing top 10 rows
[2021-06-10 09:50:06,182] INFO @ line 46: Writing Parquet File Data
[2021-06-10 09:50:15,858] INFO @ line 48: Writing Parquet File Data - completed
+-----------+-----------+--------------+
|marketplace|customer_id|review_id     |
+-----------+-----------+--------------+
|US         |12093441   |R7M6D2QDT449X |
|US         |36076731   |R2ZT8HS7YD5C72|
|US         |28401579   |R2BDJHQ8CJLI42|
|US         |50221708   |R1GKLWAOQQSFSU|
|US         |48894478   |R24MJI3RX5GQUR|
|US         |11623697   |R3F3ZYUSI5E0IG|
|US         |42749207   |R3R7TP5T52I1MS|
|US         |34015308   |RUZVAXEAYIYXI |
|US         |43599735   |RQR8NHGH1YPR2 |
|US         |50636206   |RCTIBCLIL6NFI |
+-----------+-----------+--------------+
only showing top 10 rows
[2021-06-10 09:50:16,158] INFO @ line 51: Ending spark application
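The step's stdout mixes the application's own log records with the df.show() tables. Because the formatter in Step 1 writes every record as "[time] LEVEL @ line N: message", you can pull out just those records with a small regex. This is a minimal sketch. The function name is hypothetical, and the regex assumes the formatter is left unchanged.

```python
import re

# Matches the Step 1 formatter:
# '[%(asctime)s] %(levelname)s @ line %(lineno)d: %(message)s'
LOG_LINE = re.compile(
    r"^\[(?P<time>[^\]]+)\] (?P<level>[A-Z]+) @ line (?P<line>\d+): (?P<message>.*)$"
)


def app_log_records(lines):
    """Yield (time, level, line_no, message) for lines written by the app's logger."""
    for raw in lines:
        match = LOG_LINE.match(raw.rstrip("\n"))
        if match:  # skips df.show() tables and any other stdout content
            yield (match["time"], match["level"], int(match["line"]), match["message"])


sample = [
    "[2021-06-10 09:49:59,547] INFO @ line 33: Starting spark application\n",
    "|US         |12093441   |R7M6D2QDT449X |\n",
    "[2021-06-10 09:50:16,158] INFO @ line 51: Ending spark application\n",
]
for record in app_log_records(sample):
    print(record)
```

On the master node you could pass an open file instead of the sample list, e.g. open("/mnt/var/log/hadoop/steps/STEPID/stdout") with STEPID replaced as above.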
Why I prefer Airflow?
There are many options for scheduling jobs, from crontab to Control-M to other schedulers. I have recently started using Airflow and I expect to keep using it for some time. Since most of the standard scheduler features are available in Airflow, I would like to share a few key reasons why I prefer it:
I am a data engineer who is comfortable in Python, not a member of a code deployment team. So a scalable orchestration framework that needs minimal effort is highly desirable for me. Airflow gives me the freedom to quickly create DAGs and schedule jobs as required.
Automating DAG creation is very straightforward. For example, if I have to run similar Spark applications for different data sources, I can automate DAG creation and execution with a few lines of code.
Since Airflow is all Python, I can use it to handle a few standard Python operations as well, rather than depending entirely on my application code to handle them.
More and more enterprises are contributing Airflow operators, so integrating with other applications is very quick, especially when an operator is directly available. In this case, for example, I used the EMR operators for Airflow, thereby reducing the coding effort drastically.
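To illustrate the point about automating DAG creation: since a DAG file is plain Python, one file can loop over a list of sources and produce a set of step definitions for each. The sketch below covers only that config-generation half and uses only the standard library. The source names, the s3://my-bucket path and the --input/--output script arguments are hypothetical. In a real DAG file you would wrap each entry in a DAG with the EmrAddStepsOperator/EmrStepSensor pair from Step 2, and register it with globals()[dag_id] = dag so that Airflow discovers it.

```python
# Hypothetical data sources; each gets its own DAG ID and EMR step.
SOURCES = {
    "shoes": "s3://amazon-reviews-pds/parquet/product_category=Shoes/",
    "books": "s3://my-bucket/reviews/books/",
}


def steps_for_source(name, input_path):
    """Return a SPARK_TASK-style step list for one source."""
    return [
        {
            "Name": f"spark_app_{name}",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit", "--deploy-mode", "client",
                    "/home/hadoop/pyspark/spark_app.py",
                    # Assumes spark_app.py was changed to accept these arguments.
                    "--input", input_path,
                    "--output", f"hdfs:///var/data/{name}/",
                ],
            },
        }
    ]


# One entry per DAG: dag_id -> steps. A DAG file would build one DAG per entry.
DAG_CONFIGS = {f"spark_app_{name}": steps_for_source(name, path)
               for name, path in SOURCES.items()}

for dag_id, steps in DAG_CONFIGS.items():
    print(dag_id, steps[0]["HadoopJarStep"]["Args"][-4:])
```

Adding a new source then means adding one line to SOURCES rather than copying a DAG file.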
I hope this tutorial helps you adopt Airflow easily. Let me know your feedback in the comments below.
