We will process our data in an AWS environment using Spark (on EMR), with object storage (S3) for persistence.
We should prefer Datasets for storing and processing data in memory, for the following reasons:
- Static typing and type safety. With a DataFrame, you can select a nonexistent column and notice your mistake only when you run your code; with a Dataset, you get a compile-time error.
- Datasets benefit from Catalyst query optimization and from Tungsten's efficient bytecode generation, thanks to the Encoders they use.
- Datasets have helpers called encoders, which are smart and efficient encoding utilities that convert the data inside each user-defined object into a compact binary format. Because Spark understands the structure of data in Datasets, it can create a more optimal layout in memory when caching Datasets. This translates into a reduction of memory usage if and when a Dataset is cached in memory, as well as a reduction in the number of bytes Spark needs to transfer over the network during shuffling.
- These internal encoders store every row of a Dataset as a flat binary object, which is more than 10x faster than Kryo serialization of JVM objects.
Ref: Heather Miller's Course
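To make the type-safety and encoder points concrete, here is a minimal Scala sketch. The `Event` case class, its fields, and the S3 path are illustrative placeholders, not our actual schema:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative schema; the fields and the S3 path are placeholders.
case class Event(id: Long, userId: String, amount: Double)

object DatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dataset-example").getOrCreate()
    import spark.implicits._

    // DataFrame (= Dataset[Row]): a typo in a column name fails only at runtime.
    val df = spark.read.parquet("s3://my-bucket/events/")
    // df.select("usr_id")   // AnalysisException, but only when the job runs

    // Dataset[Event]: the same mistake is caught at compile time.
    val ds = df.as[Event]
    // ds.map(_.usrId)       // does not compile: usrId is not a member of Event

    // A cached Dataset uses the compact encoder format described above.
    ds.cache()
    val perUser = ds.filter(_.amount > 0).groupByKey(_.userId).count()
    perUser.show()

    spark.stop()
  }
}
```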
While processing billions of records (terabytes of data) we faced multiple hurdles. This wiki documents the errors the team faced while processing large datasets with Spark jobs, and how to resolve them.
The Spark job and cluster optimizations for processing large datasets are also explained below.
Note: Please go through the reference links provided to fully understand how the Spark options affect data processing.
Executor resource calculation:

Specs per CORE or TASK node of the r4.12xlarge instance type:
- Cores = 48
- Memory = (384 GiB * 1000) / 1024 ~= 375 GB

- By reserving 1 core and 1 GB for YARN, we are left with 47 cores per node.
- We allocated 5 cores per executor for maximum HDFS throughput.
- Number of executors per node = (48 - 1) / 5 ~= 9
- MemoryStore and BlockManagerMaster consume 12 GB per node.
- Memory per executor = (374 - 12 - 12) / 9 ~= 40 GB (374 GB is the 375 GB above minus the 1 GB reserved for YARN)

Note: If the EMR cluster is configured to use task nodes, do not exceed a CORE node to TASK node ratio of 2:1 (task nodes do not have HDFS storage; also allocate more HDFS storage on the core nodes to compensate for the lack of HDFS storage on task nodes).
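For illustration, the sizing derived above could be applied when building a session roughly as follows (a spark-shell style sketch). The value `36g` is an assumption that leaves headroom for spark.executor.memoryOverhead out of the ~40 GB per executor; it is not a benchmarked setting:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: numbers follow the r4.12xlarge calculation above.
// With dynamic allocation enabled (see the options table below), the executor
// count scales automatically, so spark.executor.instances is not pinned here.
val spark = SparkSession.builder()
  .appName("large-dataset-job")
  .config("spark.executor.cores", "5")     // 5 cores per executor for HDFS throughput
  .config("spark.executor.memory", "36g")  // ~40 GB minus assumed overhead headroom
  .getOrCreate()
```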
Spark submit options:
Spark executor memory allocation layout and calculations:
Approx. (spark.memory.storageFraction * spark.executor.memory) memory is available for cache, broadcast variables, and accumulators.
| Parameter | Value | Explanation | Benefits |
| --- | --- | --- | --- |
| spark.dynamicAllocation.enabled and spark.shuffle.service.enabled | TRUE | Allocates executors dynamically. Set yarn.scheduler.capacity.resource-calculator = org.apache.hadoop.yarn.util.resource.DominantResourceCalculator so that containers are scheduled on CPU as well as memory. The Spark shuffle service maintains the shuffle files generated by all Spark executors that ran on that node (by default the executors themselves write and manage the shuffle data). | Scales the number of executors based on CPU and memory requirements. Because the shuffle service preserves the shuffle files written by executors, the executors can be safely removed. Resolves error: java.io.IOException: All datanodes are bad. |
| — | — | The -XX:+UseG1GC option specifies that the G1GC garbage collector should be used (the default is -XX:+UseParallelGC). To understand the frequency and execution time of garbage collection, use -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps. To initiate garbage collection sooner, set -XX:InitiatingHeapOccupancyPercent to 35 (the default is 45). This helps avoid a garbage collection of the total memory, which can take a significant amount of time. | Better garbage collection: G1 suits large heaps, helping resolve out-of-memory issues and reducing the long GC pauses that cause high latency and low throughput. |
| spark.driver.maxResultSize | — | — | Resolves error: serialized results of x tasks is bigger than spark.driver.maxResultSize |
| spark.yarn.maxAppAttempts | 2 | Maximum number of attempts for running the application. | — |
| spark.rpc.message.maxSize | 2048 | Increases the maximum allowed remote procedure call (RPC) message size (MB). | Resolves error: exceeds max allowed: spark.rpc.message.maxSize |
| spark.worker.timeout | 240 | Gives tasks working on skewed data more time to finish. Proper re-partitioning (with salting) on the join or groupBy column reduces execution time. | Resolves: Lost executor xx on slave1.cluster: Executor heartbeat timed out after xxxxx ms; WARN TransportChannelHandler: Exception in connection from /172.31.3.245:46014 |
| — | — | Reduces the number of times shuffle data overflows to disk during the shuffle write, which reduces disk I/O and improves performance. | — |
| spark.locality.wait | 15s | Waiting longer for data-local task placement reduces large data transfers over the network (shuffling). | — |
| spark.shuffle.io.connectionTimeout | 3000 | — | Resolves error: "org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]" |
| — | — | Reduces serialized data size by ~50%, resulting in less spill (memory and disk), storage I/O, and network I/O, at the cost of 2-5% more CPU overhead, which is acceptable when processing large datasets. | Used by spark.sql.inMemoryColumnarStorage.compressed, spark.rdd.compress, spark.shuffle.compress, spark.shuffle.spill.compress, spark.checkpoint.compress, and spark.broadcast.compress. Allows broadcasting a table with 2x the records, spilling less (memory and disk), and reducing disk and network I/O. |
| — | — | Optimization for a custom ShuffleHash join implementation. Note that sort-merge join is the default method and is better for large datasets due to memory limitations. | — |
| spark.reducer.maxSizeInFlight | 96 | Reducers request "map" task outputs in bigger chunks, which improves performance. | — |
| — | — | — | Resolves error: ERROR scheduler.LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. |
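A spark-shell style sketch of how a subset of the options above could be set programmatically. In practice we pass them via spark-submit --conf or an EMR configuration classification, and the selection here is illustrative rather than the full list:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative subset of the tuning table above.
val spark = SparkSession.builder()
  .appName("tuned-large-dataset-job")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.yarn.maxAppAttempts", "2")
  .config("spark.rpc.message.maxSize", "2048")
  .config("spark.locality.wait", "15s")
  .config("spark.reducer.maxSizeInFlight", "96m")  // the table's "96" interpreted as MB
  .getOrCreate()
```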
Data-based Spark optimizations:

Let's assume we have two tables whose raw/CSV sizes are 3 TB and 500 GB respectively, and they need to be joined on particular columns. The following are ways to optimize the join and prevent job failures as the data grows gradually after each refresh.

- Set spark.sql.files.maxPartitionBytes to 128 MB so that the files are split into partitions of roughly 128 MB each when they are read.
- If the fill rate of the join column is not 100%, filter out the records whose join key is null and perform the join only on the non-null records; then union the output with the null-key records (see the sketch after this list).
- Set spark.sql.shuffle.partitions to re-partition the data and increase the number of tasks during the join operation, resulting in more parallel processing. The partition size should be ~128 MB, corresponding to the block size in EMRFS (ref. AWS docs).
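A minimal spark-shell style sketch of the null-handling join from the second bullet above. The table paths, the `join_key` column name, and the left join type are placeholders/assumptions, not our actual schema:

```scala
import org.apache.spark.sql.functions.col

// Placeholder inputs for the ~3 TB and ~500 GB tables.
val big   = spark.read.parquet("s3://my-bucket/big_table/")
val small = spark.read.parquet("s3://my-bucket/small_table/")

// Rows with a null join key can never match, so join only the non-null rows.
val bigNonNull = big.filter(col("join_key").isNotNull)
val bigNull    = big.filter(col("join_key").isNull)

val joined = bigNonNull.join(small, Seq("join_key"), "left")

// Re-attach the null-key rows so no records are lost (allowMissingColumns needs Spark 3.1+;
// on older versions, add the missing columns as null literals before the union).
val result = joined.unionByName(bigNull, allowMissingColumns = true)
```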
To see the amount of data processed and the time taken by each task, open the stage summary metrics in the Application Master:
If the 25th percentile of tasks takes < 100 ms but the MAX task time is > 5 min, the data is skewed. The data can be evenly distributed by adding a salt column:
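One common way to add the salt column, sketched here by reusing the `big`/`small` placeholders from the earlier join example; the bucket count of 100 is arbitrary and should be tuned to the skew observed in the stage metrics:

```scala
import org.apache.spark.sql.functions.{array, col, concat_ws, explode, floor, lit, rand}

val saltBuckets = 100  // illustrative; tune to the observed skew

// Big (skewed) side: append a random salt bucket to the join key.
val bigSalted = big
  .withColumn("salt", floor(rand() * saltBuckets).cast("int"))
  .withColumn("salted_key",
    concat_ws("_", col("join_key").cast("string"), col("salt").cast("string")))

// Small side: replicate every row once per salt bucket so each salted key can match.
val smallSalted = small
  .withColumn("salt", explode(array((0 until saltBuckets).map(i => lit(i)): _*)))
  .withColumn("salted_key",
    concat_ws("_", col("join_key").cast("string"), col("salt").cast("string")))

// Join on the salted key, then drop the helper columns.
val skewSafeJoined = bigSalted
  .join(smallSalted.drop("join_key"), Seq("salted_key"))
  .drop("salt", "salted_key")
```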
Note: The default resource calculator, org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator, uses only memory information for allocating containers; CPU scheduling is not enabled by default. This is why yarn.scheduler.capacity.resource-calculator is set to DominantResourceCalculator above.