A Quick Comparison Of Fabric Spark Configuration Settings
Comparison of Fabric Spark Runtime with the default Spark config
I compared the default Spark configurations in the Fabric Spark runtime with those of standard Spark. I excluded configurations that were identical between the two, as well as those that were irrelevant (for example, paths and credentials). I thought sharing this might be useful to others. I have also provided brief explanations of a couple of key configurations and their impact on performance. Keep in mind that each Spark job is unique, so evaluate the requirements based on the specific job.
Code
This code extracts the default Spark configuration from the Spark documentation and compares it with the Spark config in Fabric. I have tried to make it dynamic, so based on the Spark version in Fabric it fetches the configs from the corresponding Spark documentation. Note that the comparison below is for Fabric Spark runtime 1.1 (Spark 3.3, Delta 2.2). I found 34 configs that are different.
from pyspark import SparkConf
import pandas as pd

def compare_sparkconf():
    '''
    Author : Sandeep Pawar | Fabric.guru | Sept 9, 2023
    Compares spark configurations from the current Fabric runtime with the
    default configurations from the Spark documentation.
    '''
    # `spark` is the SparkSession provided by the Fabric notebook
    ver = spark.version[:5]

    try:
        url = f'https://spark.apache.org/docs/{ver}/configuration.html'
        dfs = pd.read_html(url)
    except Exception as e:
        return f"Check the URL. Error: {e}"

    # Label each scraped table; any tables beyond this list are marked 'Unknown'
    table_names = ['Application Properties', 'Runtime Environment', 'Shuffle Behavior', 'Spark UI',
                   'Spark Streaming', 'Spark MLlib', 'GraphX', 'Cluster Manager', 'Deploy', 'Kubernetes', 'Other']
    for idx, df in enumerate(dfs):
        df['Table_Name'] = table_names[idx] if idx < len(table_names) else 'Unknown'

    final_df = pd.concat(dfs, ignore_index=True)

    # Configuration of the current (Fabric) session
    conf = SparkConf()
    spark_configs_df = pd.DataFrame(conf.getAll(), columns=['Property Name', 'Fabric'])

    joined_df = pd.merge(final_df, spark_configs_df, on='Property Name', how='inner')

    # Excluding (none), paths and credentials
    excl = ('Default != "(none)" and not Default.fillna("").str.contains("/") '
            'and not Fabric.fillna("").str.contains("/") '
            'and not Default.fillna("").str.contains("password") '
            'and not Default == "None"')
    filtered_df = joined_df.query(excl, engine='python')

    return filtered_df[filtered_df.Default != filtered_df.Fabric][['Property Name', 'Default', 'Fabric']].reset_index(drop=True)

compare_sparkconf()
| Property Name | Default | Fabric |
|---|---|---|
| spark.driver.cores | 1 | 8 |
| spark.driver.maxResultSize | 1g | 4096m |
| spark.driver.memory | 1g | 56g |
| spark.driver.memoryOverhead | driverMemory * spark.driver.memoryOverheadFactor, with minimum of 384 | 384 |
| spark.executor.memory | 1g | 56g |
| spark.executor.memoryOverhead | executorMemory * spark.executor.memoryOverheadFactor, with minimum of 384 | 384 |
| spark.shuffle.file.buffer | 32k | 1m |
| spark.shuffle.io.backLog | -1 | 8192 |
| spark.shuffle.service.enabled | false | true |
| spark.eventLog.enabled | false | true |
| spark.eventLog.buffer.kb | 100k | 4k |
| spark.ui.port | 4040 | 0 |
| spark.io.compression.lz4.blockSize | 32k | 128kb |
| spark.kryoserializer.buffer.max | 64m | 128m |
| spark.rdd.compress | false | true |
| spark.serializer | org.apache.spark.serializer.JavaSerializer | org.apache.spark.serializer.KryoSerializer |
| spark.executor.cores | 1 in YARN mode, all the available cores on the worker in standalone and Mesos coarse-grained modes. | 8 |
| spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version | 1 | 2 |
| spark.locality.wait | 3s | 1 |
| spark.scheduler.minRegisteredResourcesRatio | 0.8 for KUBERNETES mode; 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode | 0.0 |
| spark.scheduler.mode | FIFO | FAIR |
| spark.dynamicAllocation.enabled | false | true |
| spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors | 1 |
| spark.dynamicAllocation.maxExecutors | infinity | 9 |
| spark.dynamicAllocation.minExecutors | 0 | 1 |
| spark.sql.autoBroadcastJoinThreshold | 10MB | 26214400 |
| spark.sql.cbo.enabled | false | true |
| spark.sql.cbo.joinReorder.enabled | false | true |
| spark.sql.execution.arrow.pyspark.enabled | (value of spark.sql.execution.arrow.enabled) | true |
| spark.sql.execution.arrow.pyspark.fallback.enabled | (value of spark.sql.execution.arrow.fallback.enabled) | true |
| spark.sql.files.maxPartitionBytes | 128MB | 134217728 |
| spark.sql.optimizer.runtime.bloomFilter.enabled | false | true |
| spark.sql.statistics.fallBackToHdfs | false | true |
| spark.sql.hive.metastore.version | 2.3.9 | 2.3.2 |
Here are a few that are worth noting:
Cost Based Optimizer:
spark.sql.cbo.enabled is enabled by default. CBO optimizes query performance by creating more efficient query plans than the rule-based optimizer, especially for queries involving multiple joins. However, CBO only works if you gather statistics using ANALYZE TABLE (which doesn't work with Delta v2 tables). Once statistics are available, optimization is automatic: the Catalyst optimizer selects the plan with the lowest estimated cost (I/O, CPU, etc.) for execution. Be sure to update the statistics if your data changes frequently to take advantage of the optimization. I will write a blog on this.
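As a rough sketch of that workflow (assuming a live spark session as in a Fabric notebook, and a placeholder non-Delta table named sales that is not from this post; given the Delta v2 caveat above, treat it as a Parquet/Hive table):

# Hedged sketch: 'sales' is a placeholder table name.
# CBO is already on in Fabric; setting it here only for completeness.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

# Gather table-level and column-level statistics; CBO uses them to estimate plan costs.
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR ALL COLUMNS")

# Inspect the recorded statistics.
spark.sql("DESCRIBE EXTENDED sales").show(truncate=False)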
Broadcast Join Threshold:
spark.sql.autoBroadcastJoinThreshold is set to 25MB instead of 10MB. When you join two tables with significant skew, you can broadcast the smaller table to avoid shuffling data. If AQE is on, Spark will automatically switch from a sort-merge join to a broadcast join when the smaller side falls under the threshold. Although the threshold is 2.5x higher in Fabric, consider that with 56g of executor memory and 8 cores, roughly 25% of the total is used for execution, which leaves about 1.75GB per core. I think you can certainly bump it up to 150MB depending on the data and other tasks (this assumes the 56g is available). Setting it to -1 turns off broadcast joins. If Autotune is used, it will tune this setting automatically. Note that the Autotune documentation incorrectly lists the default as 10MB.
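As a minimal sketch, assuming a live spark session (the 150MB value and the toy DataFrames below are illustrative only, not from the original post):

from pyspark.sql.functions import broadcast

# Fabric default: 26214400 bytes = 25 * 1024 * 1024 = 25MB
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Raise the threshold for this session, e.g. to 150MB; evaluate against your data and memory headroom
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(150 * 1024 * 1024))

# Or force a broadcast explicitly with a hint, regardless of the threshold
fact_df = spark.range(1_000_000).withColumnRenamed("id", "key")
dim_df = spark.range(100).withColumnRenamed("id", "key")
fact_df.join(broadcast(dim_df), "key").explain()  # plan should show a BroadcastHashJoin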
Serializer:
Note that the Kryo serializer is used instead of the default Java serializer. Synapse also uses Kryo, which is faster and more compact, but for some reason the product team never updated the documentation, which still lists Java as the default. I spoke about Spark serialization here if you are interested in learning more.
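A quick check of what your own session is using (assuming a live spark session; the second argument is just a fallback in case the property is not set):

print(spark.conf.get("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
print(spark.conf.get("spark.kryoserializer.buffer.max", "64m"))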
Partition Size:
spark.sql.files.maxPartitionBytes is perhaps one of the most important settings in Spark tuning. It's not actually different: it is still 128MB, just shown in bytes instead of MB. This is the maximum number of bytes to pack into a single partition when reading files. Autotune will tune this as well. Typically you want to keep it <= 200MB, but it depends on the data size and cluster size to maximize parallelization and resource utilization.
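A sketch of checking and overriding it per session (assuming a live spark session; the 200MB value is just the rule-of-thumb ceiling mentioned above, not a recommendation):

# 134217728 bytes = 128 * 1024 * 1024 = 128MB, the same value expressed in bytes
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))

# Override for this session if your file sizes and cluster warrant it
spark.conf.set("spark.sql.files.maxPartitionBytes", str(200 * 1024 * 1024))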
Bloom Filter:
The bloom filter in the config (spark.sql.optimizer.runtime.bloomFilter.enabled) is not the same as the Bloom Filter Indexes in Databricks. As far as I know, Fabric does not offer bloom filter indexing on Delta tables.
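If you want to see or toggle the runtime bloom filter in your own session (a sketch, assuming a live spark session):

# This is the optimizer's runtime bloom filter for pruning join inputs,
# not a bloom filter index on Delta/Parquet files
print(spark.conf.get("spark.sql.optimizer.runtime.bloomFilter.enabled"))

# It can be disabled per session if it does not help a particular workload
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "false")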
Other than these, nothing else stands out. The other configs are standard and depend on the Spark cluster you set up in the workspace settings. Fabric also has certain improvements/optimizations in the Spark engine, but that would be another blog. I just wanted to quickly identify whether any of the standard configs are different in Fabric. Hope this helps.