Spark Transformations And Actions Cheat Sheet



(Part 1) Cluster Mode

  • Actions return a value to the driver program after running a computation on the dataset. For example, reduce(f: (T, T) => T): T aggregates the elements of the dataset using a function f, which takes two arguments and returns one.
  • Apache Spark RDD operations come in two types: transformations and actions. A transformation is a function that produces a new RDD from existing RDDs; an action is performed when we want to work with the actual dataset. Unlike a transformation, an action does not produce a new RDD when it is triggered; it returns a result instead.

This post covers settings specific to cluster mode. For settings specific to client mode, see Part 2.

The Problem

One morning, while doing some back-of-an-envelope calculations, I discovered that we could lower our AWS costs by using clusters of fewer, more powerful machines.

More cores, more memory, lower costs – it’s not every day a win/win/win comes along.

As you might expect, there was a catch. We had been using the AWS maximizeResourceAllocation setting to automatically set the size of our Spark executors and driver.

maximizeResourceAllocation allocates an entire node and its resources for the Spark driver. This worked well for us before. Our previous cluster of 10 nodes had been divided into 9 executors and 1 driver. 90% of our resources were processing data while 10% were dedicated to the various housekeeping tasks the driver performs.

However, allocating an entire node to the driver with our new cluster design wasted resources egregiously. A full 33% of our resources were devoted to the driver, leaving only 67% for processing data. Needless to say, our driver was significantly over-allocated.

Clearly, maximizeResourceAllocation wasn’t going to work for our new cluster. We were going to have to roll up our sleeves and manually configure our Spark jobs. Like any developer, I consulted the sacred texts (Google, Stack Overflow, Spark Docs). Helpful information abounded, but most of it was overly general. I had difficulty finding definite answers as to what settings I should choose.

The Solution

While calculating the specifics for our setup, I knew that the cluster specs might change again in the future. I wanted to build a spreadsheet that would make this process less painful. With a generous amount of guidance gleaned from this Cloudera blogpost, How to Tune Your Apache Spark Jobs Part 2, I built the following spreadsheet:

If you would like an easy way to calculate the optimal settings for your Spark cluster, download the spreadsheet from the link above. Below, I’ve listed the fields in the spreadsheet and described how each is intended to be used.

A couple of quick caveats:

  • The generated configs are optimized for running Spark jobs in cluster deploy-mode.
  • The generated configs result in the driver being allocated as many resources as a single executor.

Configurable Fields

The fields shown above are configurable. The green-shaded fields should be changed to match your cluster’s specs. It is not recommended that you change the yellow-shaded fields, but some use cases might require customization. More information about the default, recommended values for the yellow-shaded fields can be found in the Cloudera post.

Number of Nodes

The number of worker machines in your cluster. This can be as low as one machine.

Memory Per Node (GB)

The amount of RAM per node that is available for Spark’s use. If using YARN, this will be the amount of RAM per machine managed by the YARN Resource Manager.

Cores Per Node

The number of cores per node that are available for Spark’s use. If using YARN, this will be the number of cores per machine managed by the YARN Resource Manager.

Memory Overhead Coefficient
Recommended value: 0.1

The fraction of each executor's memory that will be reserved for spark.yarn.executor.memoryOverhead.

Executor Memory Upper Bound (GB)
Recommended value: 64

The upper bound for executor memory. Each executor runs in its own JVM. With more than 64 GB of memory, garbage collection pauses can slow the executor down.

Executor Core Upper Bound
Recommended value: 5

The upper bound for cores per executor. More than 5 cores per executor can degrade HDFS I/O throughput. I believe this value can safely be increased if writing to a web-based “file system” such as S3, but significant increases to this limit are not recommended.

OS Reserved Cores
Recommended value: 1

Cores per machine to reserve for OS processes. Should be zero if only a percentage of the machine’s cores were made available to Spark (i.e. entered in the Cores Per Node field above).

OS Reserved Memory (GB)
Recommended value: 1

The amount of RAM per machine to reserve for OS processes. Should be zero if only a percentage of the machine’s RAM was made available to Spark (i.e. entered in the Memory Per Node field above).

Parallelism Per Core
Recommended value: 2

The level of parallelism per allocated core. This field is used to determine the spark.default.parallelism setting. The generally recommended value is 2, which sets the default parallelism to double the number of allocated cores.

Note: Cores Per Node and Memory Per Node could also be used to optimize Spark for local mode. If your local machine has 8 cores and 16 GB of RAM and you want to allocate 75% of your resources to running a Spark job, setting Cores Per Node and Memory Per Node to 6 and 12 respectively will give you optimal settings. You would also want to zero out the OS Reserved settings. If Spark is limited to using only a portion of your system, there is no need to set aside resources specifically for the OS.

Reference Table

Once the configurable fields on the left-hand side of the spreadsheet have been set to the desired values, the resultant cluster configuration will be reflected in the reference table.

There is some degree of subjectivity in selecting the Executors Per Node setting that will work best for your use case, so I elected to use a reference table rather than selecting the number automatically.

A good rule of thumb for selecting the optimal number of Executors Per Node is to choose the setting that minimizes Unused Memory Per Node and Unused Cores Per Node while keeping Total Memory Per Executor below the Executor Memory Upper Bound and Cores Per Executor below the Executor Core Upper Bound.

For example, take the reference table shown above:

  • Executors Per Node: 1
    • Unused Memory Per Node: 0
    • Unused Cores Per Node: 0
    • Warning: Total Memory Per Executor exceeds the Executor Memory Upper Bound
    • Warning: Cores Per Executor exceeds Executor Core Upper Bound
    • (That row has been greyed out since it has exceeded one of the upper bounds)
  • Executors Per Node: 5
    • Unused Memory Per Node: 0
    • Unused Cores Per Node: 1
    • Warning: Cores Per Executor exceeds the Executor Core Upper Bound.
  • Executors Per Node: 6
    • Unused Memory Per Node: 1
    • Unused Cores Per Node: 1
    • Total Memory Per Executor and Cores Per Executor are both below their respective upper bounds.
  • Executors Per Node: All others
    • Either exceed the Executor Memory Upper Bound, exceed the Executor Core Upper Bound, or waste more resources than Executors Per Node = 6

Executors Per Node = 6 is the optimal setting.
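
The reasoning above can be reproduced with a few lines of Python. This is only a minimal sketch, not the spreadsheet's actual formulas: it assumes whole-gigabyte and whole-core division after subtracting the OS reservations, and the node size (116 GB, 32 cores) is a hypothetical value chosen because it yields the same warnings and unused figures as the rows for 1, 5, and 6 executors listed above. The names Row and reference_table are illustrative.

```python
from dataclasses import dataclass


@dataclass
class Row:
    executors_per_node: int
    total_memory_per_executor_gb: int
    cores_per_executor: int
    unused_memory_gb: int
    unused_cores: int
    exceeds_memory_bound: bool
    exceeds_core_bound: bool


def reference_table(memory_per_node_gb, cores_per_node,
                    os_reserved_memory_gb=1, os_reserved_cores=1,
                    executor_memory_upper_bound_gb=64,
                    executor_core_upper_bound=5):
    """Build one row per candidate Executors Per Node value (assumed formulas)."""
    usable_memory = memory_per_node_gb - os_reserved_memory_gb
    usable_cores = cores_per_node - os_reserved_cores
    rows = []
    for n in range(1, usable_cores + 1):
        memory = usable_memory // n   # whole GB per executor (assumption)
        cores = usable_cores // n     # whole cores per executor
        rows.append(Row(
            executors_per_node=n,
            total_memory_per_executor_gb=memory,
            cores_per_executor=cores,
            unused_memory_gb=usable_memory - memory * n,
            unused_cores=usable_cores - cores * n,
            exceeds_memory_bound=memory > executor_memory_upper_bound_gb,
            exceeds_core_bound=cores > executor_core_upper_bound,
        ))
    return rows


# Hypothetical node: 116 GB of RAM and 32 cores.
rows = {row.executors_per_node: row for row in reference_table(116, 32)}
# Rows that stay within both upper bounds; the final choice is still manual.
valid = [n for n, row in rows.items()
         if not (row.exceeds_memory_bound or row.exceeds_core_bound)]
```

The sketch deliberately stops at listing the rows that respect both bounds. Like the spreadsheet, it leaves the final trade-off between unused memory and unused cores to the reader.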

Spark Configs

Now that we have selected an optimal number of Executors Per Node, we are ready to generate the Spark configs with which we will run our job. We enter the optimal number of executors in the Selected Executors Per Node field. The correct settings will be generated automatically.

spark.executor.instances
(Number of Nodes * Selected Executors Per Node) - 1

This is the total number of executors in your cluster. We subtract one to account for the driver, which will consume as many resources as a single executor on one, and only one, of our nodes.

spark.yarn.executor.memoryOverhead
Equal to Overhead Memory Per Executor

The memory to be allocated for the memoryOverhead of each executor, in MB. Calculated from the values from the row in the reference table that corresponds to our Selected Executors Per Node.

spark.executor.memory
Equal to Memory Per Executor

The memory to be allocated for each executor. Calculated from the values from the row in the reference table that corresponds to our Selected Executors Per Node.

spark.yarn.driver.memoryOverhead
Equal to spark.yarn.executor.memoryOverhead

The memory to be allocated for the memoryOverhead of the driver, in MB.

spark.driver.memory
Equal to spark.executor.memory

The memory to be allocated for the driver.

spark.executor.cores
Equal to Cores Per Executor

The number of cores allocated for each executor. Calculated from the values from the row in the reference table that corresponds to our Selected Executors Per Node.

spark.driver.cores
Equal to spark.executor.cores

The number of cores allocated for the driver.

spark.default.parallelism
spark.executor.instances * spark.executor.cores * Parallelism Per Core

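The default number of partitions for RDD operations such as join and reduceByKey when none is specified. DataFrame shuffles are governed by spark.sql.shuffle.partitions instead.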
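
Taken together, the formulas in this section amount to a small calculation. The sketch below is one possible reading of them, not the spreadsheet itself: it assumes the overhead is the Memory Overhead Coefficient times the total memory per executor, rounded up to a whole MB, and that the remainder becomes spark.executor.memory. The function name spark_configs and the example inputs (3 nodes, 6 executors per node, 19 GB and 5 cores per executor) are hypothetical.

```python
import math


def spark_configs(number_of_nodes, executors_per_node,
                  total_memory_per_executor_gb, cores_per_executor,
                  memory_overhead_coefficient=0.1, parallelism_per_core=2):
    """Derive the configs described above from one reference-table row."""
    # One executor-sized slot is left free for the driver.
    instances = number_of_nodes * executors_per_node - 1
    if instances < 1:
        raise ValueError("no executors left after reserving the driver slot")

    total_mb = total_memory_per_executor_gb * 1024
    # Assumption: overhead is a fraction of the total, rounded up to a whole MB.
    overhead_mb = math.ceil(total_mb * memory_overhead_coefficient)
    memory_mb = total_mb - overhead_mb

    return {
        "spark.executor.instances": str(instances),
        "spark.yarn.executor.memoryOverhead": str(overhead_mb),
        "spark.executor.memory": f"{memory_mb}m",
        # The driver is sized exactly like a single executor.
        "spark.yarn.driver.memoryOverhead": str(overhead_mb),
        "spark.driver.memory": f"{memory_mb}m",
        "spark.executor.cores": str(cores_per_executor),
        "spark.driver.cores": str(cores_per_executor),
        "spark.default.parallelism": str(
            instances * cores_per_executor * parallelism_per_core),
    }


# Hypothetical cluster: 3 nodes, 6 executors per node, 19 GB and 5 cores each.
configs = spark_configs(3, 6, 19, 5)
```

With these inputs the sketch yields 17 executors and a default parallelism of 170 (17 executors * 5 cores * 2).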

Using Configs

Now that we have the proper numbers for our configs, using them is fairly simple. Below are three different ways the configs might be used:

Add to spark-defaults.conf

Note: Will be used for submitted jobs unless overridden by spark-submit args

Pass as software settings to an AWS EMR Cluster

Note: Will be added to spark-defaults.conf

Pass as args with spark-submit
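
The three options above differ only in how the same key/value pairs are written down. As a rough sketch, the Python helpers below render one set of values into each form: whitespace-separated lines for spark-defaults.conf, a JSON document using the spark-defaults classification for EMR, and repeated --conf arguments for spark-submit. The helper names and the config values are hypothetical placeholders, not output from the spreadsheet.

```python
import json

# Hypothetical values; substitute the numbers generated for your own cluster.
configs = {
    "spark.executor.instances": "17",
    "spark.yarn.executor.memoryOverhead": "1946",
    "spark.executor.memory": "17510m",
    "spark.yarn.driver.memoryOverhead": "1946",
    "spark.driver.memory": "17510m",
    "spark.executor.cores": "5",
    "spark.driver.cores": "5",
    "spark.default.parallelism": "170",
}


def to_spark_defaults(configs):
    """Lines for spark-defaults.conf: key and value separated by whitespace."""
    return "\n".join(f"{key} {value}" for key, value in configs.items())


def to_emr_configuration(configs):
    """JSON software settings for EMR, using the spark-defaults classification."""
    return json.dumps(
        [{"Classification": "spark-defaults", "Properties": dict(configs)}],
        indent=2,
    )


def to_spark_submit_args(configs):
    """Argument list for spark-submit in cluster deploy-mode."""
    args = ["spark-submit", "--deploy-mode", "cluster"]
    for key, value in configs.items():
        args += ["--conf", f"{key}={value}"]
    return args


defaults_text = to_spark_defaults(configs)
emr_json = to_emr_configuration(configs)
submit_args = to_spark_submit_args(configs)
```

The application jar or script and its own arguments would still have to be appended to submit_args before running the command.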

This article is a part of my '100 data engineering tutorials in 100 days' challenge. (25/100)

The concepts of actions and transformations in Apache Spark are not something we think about every day while using Spark, but occasionally it is good to know the distinction.

First of all, we may be asked that question during a job interview. It is a good question because it helps spot people who did not bother reading the documentation of the tool they are using. Fortunately, after reading this article, you will know the difference.

In addition to that, we may need to know the difference to explain why the code we are reviewing is too slow. For example, some data engineers have strange ideas like calculating counts in the middle of a Spark job to log the number of rows or store them as a metric. While a well-placed count may help to debug and tremendously speed up problem-solving, counting the number of rows in every other line of code is massive overkill. The difference between a transformation and an action explains why: every count is an action, so each one forces Spark to execute the transformations that lead up to it.

What is a transformation?

A transformation is any Spark operation that returns a DataFrame, Dataset, or RDD. When we build a chain of transformations, we add building blocks to the Spark job, but no data gets processed. That is possible because transformations are lazily executed. Spark will calculate the value only when it is necessary.

Spark Transformation Example

Of course, this also means that Spark needs to recompute the values when we re-use the same transformations. We can avoid that by using the persist or cache functions.

What is an action?

Actions, on the other hand, are not lazily executed. When we put an action in the code and Spark reaches that line of code when running the job, it will have to perform all of the transformations that lead to that action to produce a value.

Producing a value is the key concept here. While transformations return one of the Spark data types, actions return a count of elements (for example, the count function), a list of them (collect, take, etc.), or store the data in external storage (write, saveAsTextFile, and others).
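
The interplay of lazy transformations, actions, and caching can be illustrated without a cluster. The class below is a toy stand-in written in plain Python, not Spark's API: LazyDataset and its methods are hypothetical names that merely mimic map, filter, cache, count, and collect. It shows that chaining transformations runs nothing, that every action re-runs the whole chain, and that caching an intermediate result avoids the recomputation.

```python
class LazyDataset:
    """Toy model of a lazily evaluated dataset (not a Spark class)."""

    def __init__(self, compute):
        self._compute = compute          # zero-argument callable returning an iterator
        self._cache_requested = False
        self._cached_rows = None

    @classmethod
    def from_list(cls, rows):
        return cls(lambda: iter(rows))

    def _rows(self):
        if not self._cache_requested:
            return self._compute()       # recompute from scratch every time
        if self._cached_rows is None:
            self._cached_rows = list(self._compute())  # filled by the first action
        return iter(self._cached_rows)

    # "Transformations": return a new LazyDataset and process nothing.
    def map(self, fn):
        return LazyDataset(lambda: (fn(row) for row in self._rows()))

    def filter(self, predicate):
        return LazyDataset(lambda: (row for row in self._rows() if predicate(row)))

    def cache(self):
        self._cache_requested = True
        return self

    # "Actions": force the chain to run and return a plain Python value.
    def count(self):
        return sum(1 for _ in self._rows())

    def collect(self):
        return list(self._rows())


calls = []  # records every invocation of the mapped function


def double(x):
    calls.append(x)
    return x * 2


doubled = LazyDataset.from_list([1, 2, 3, 4]).map(double)
large = doubled.filter(lambda x: x > 4)
calls_after_transformations = len(calls)   # nothing has run yet

row_count = large.count()                  # first action runs the chain
collected = large.collect()                # second action runs it again
calls_without_cache = len(calls)

doubled.cache()
large.count()                              # computes and stores the doubled rows
large.collect()                            # reuses the stored rows
calls_with_cache = len(calls) - calls_without_cache
```

In this toy, two actions without caching invoke double eight times for four rows, while the same two actions after cache() invoke it only four more times.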


How to tell the difference


When we look at the Spark API, we can easily spot the difference between transformations and actions. If a function returns a DataFrame, Dataset, or RDD, it is a transformation. If it returns anything else or does not return a value at all (or returns Unit in the case of the Scala API), it is an action.
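
The rule can be written down as a simple lookup. The sketch below hand-lists the return kinds of a few well-known RDD operations and classifies each one by that kind alone. The table is a small illustrative sample rather than a complete catalogue of the API, and the names RETURN_KINDS and classify are made up for this example.

```python
# Return kinds of a few common RDD operations (illustrative sample only).
RETURN_KINDS = {
    "map": "RDD",
    "filter": "RDD",
    "flatMap": "RDD",
    "distinct": "RDD",
    "count": "int",
    "collect": "list",
    "take": "list",
    "reduce": "element",        # a single value of the element type
    "saveAsTextFile": "None",   # writes to storage, returns nothing
}

SPARK_DATA_TYPES = {"RDD", "DataFrame", "Dataset"}


def classify(operation):
    """Transformation if the operation returns a Spark data type, else action."""
    if RETURN_KINDS[operation] in SPARK_DATA_TYPES:
        return "transformation"
    return "action"


classification = {name: classify(name) for name in RETURN_KINDS}
transformations = sorted(n for n, kind in classification.items()
                         if kind == "transformation")
actions = sorted(n for n, kind in classification.items() if kind == "action")
```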