A Quick Comparison Of Fabric Spark Configuration Settings

Comparison of Fabric Spark Runtime with the default Spark config

I compared the default Spark configurations in the Fabric Spark runtime with those of standard Apache Spark. I excluded configurations that were identical between the two, as well as those that were irrelevant. I thought sharing this information might be useful to others. Additionally, I have provided brief explanations of a couple of key configurations and their impact on performance. Keep in mind that each Spark job is unique, so it's essential to evaluate these settings against the requirements of the specific job.

💡 The configs below are for a Fabric F64 capacity with the Starter Pool. I will update the post later with other capacities and pools as well.

Code

This code extracts the default Spark configuration from the Spark documentation and compares it with the Spark config in Fabric. I have tried to make it dynamic so that, based on the Spark version in Fabric, it fetches the configs from the corresponding version of the Spark documentation. Note that the comparison below is for Fabric Spark Runtime 1.1 (Spark 3.3, Delta 2.2). I found 33 configs that are different.

from pyspark import SparkConf
import pandas as pd

def compare_sparkconf():

    '''
    Author : Sandeep Pawar   |   Fabric.guru   | Sept 9, 2023
    Compares spark configurations from current Fabric runtime with the
    default configurations from spark documentation. 

    '''

    # Runtime Spark version, e.g. "3.3.1", used to build the matching docs URL
    ver = spark.version[:5]

    try:
        url = f'https://spark.apache.org/docs/{ver}/configuration.html'
        dfs = pd.read_html(url)
    except Exception as e:
        return f"Check the URL. Error: {e}"

    # Approximate labels (by position) for the tables scraped from the docs page;
    # they are not used in the final output, and any extra tables are marked 'Unknown'
    table_names = ['Application Properties', 'Runtime Environment', 'Shuffle Behavior', 'Spark UI', 'Spark Streaming', 'Spark MLlib', 'GraphX', 'Cluster Manager', 'Deploy', 'Kubernetes', 'Other']

    for idx, df in enumerate(dfs):
        df['Table_Name'] = table_names[idx] if idx < len(table_names) else 'Unknown'

    final_df = pd.concat(dfs, ignore_index=True)

    # Spark properties of the current Fabric session, as seen by the driver
    conf = SparkConf()
    spark_configs_df = pd.DataFrame(conf.getAll(), columns=['Property Name', 'Fabric'])

    joined_df = pd.merge(final_df, spark_configs_df, on='Property Name', how='inner')

    ## Excluding none, paths and credentials 
    excl = 'Default != "(none)"  and not Default.fillna("").str.contains("/") and not Fabric.fillna("").str.contains("/") and not Default.fillna("").str.contains("password") and not Default == "None"'
    filtered_df = joined_df.query( excl, engine='python')

    return filtered_df[filtered_df.Default != filtered_df.Fabric][['Property Name', 'Default', 'Fabric']].reset_index(drop=True)

compare_sparkconf()

Output:

| Property Name | Default | Fabric |
| --- | --- | --- |
| spark.driver.cores | 1 | 8 |
| spark.driver.maxResultSize | 1g | 4096m |
| spark.driver.memory | 1g | 56g |
| spark.driver.memoryOverhead | driverMemory * spark.driver.memoryOverheadFactor, with minimum of 384 | 384 |
| spark.executor.memory | 1g | 56g |
| spark.executor.memoryOverhead | executorMemory * spark.executor.memoryOverheadFactor, with minimum of 384 | 384 |
| spark.shuffle.file.buffer | 32k | 1m |
| spark.shuffle.io.backLog | -1 | 8192 |
| spark.shuffle.service.enabled | false | true |
| spark.eventLog.enabled | false | true |
| spark.eventLog.buffer.kb | 100k | 4k |
| spark.ui.port | 4040 | 0 |
| spark.io.compression.lz4.blockSize | 32k | 128kb |
| spark.kryoserializer.buffer.max | 64m | 128m |
| spark.rdd.compress | false | true |
| spark.serializer | org.apache.spark.serializer.JavaSerializer | org.apache.spark.serializer.KryoSerializer |
| spark.executor.cores | 1 in YARN mode, all the available cores on the worker in standalone and Mesos coarse-grained modes. | 8 |
| spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version | 1 | 2 |
| spark.locality.wait | 3s | 1 |
| spark.scheduler.minRegisteredResourcesRatio | 0.8 for KUBERNETES mode; 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode | 0.0 |
| spark.scheduler.mode | FIFO | FAIR |
| spark.dynamicAllocation.enabled | false | true |
| spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors | 1 |
| spark.dynamicAllocation.maxExecutors | infinity | 9 |
| spark.dynamicAllocation.minExecutors | 0 | 1 |
| spark.sql.autoBroadcastJoinThreshold | 10MB | 26214400 |
| spark.sql.cbo.enabled | false | true |
| spark.sql.cbo.joinReorder.enabled | false | true |
| spark.sql.execution.arrow.pyspark.enabled | (value of spark.sql.execution.arrow.enabled) | true |
| spark.sql.execution.arrow.pyspark.fallback.enabled | (value of spark.sql.execution.arrow.fallback.enabled) | true |
| spark.sql.files.maxPartitionBytes | 128MB | 134217728 |
| spark.sql.optimizer.runtime.bloomFilter.enabled | false | true |
| spark.sql.statistics.fallBackToHdfs | false | true |
| spark.sql.hive.metastore.version | 2.3.9 | 2.3.2 |
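
If you want to spot-check any of these against your own session, the standard spark.conf API works in a Fabric notebook. Below is a minimal sketch; it assumes a live spark session (same as the code above) and only reads values, so it is safe to run as-is.

# Spot-check a few of the properties from the table above in the current session
props = ["spark.sql.autoBroadcastJoinThreshold",
         "spark.sql.files.maxPartitionBytes",
         "spark.serializer"]

for prop in props:
    print(prop, "=", spark.conf.get(prop, "<not set in this session>"))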

Here are a few that are worth noting:

  • Cost Based Optimizer:

    spark.sql.cbo.enabled is on by default in Fabric (it is off in standard Spark). CBO improves query performance by producing more efficient query plans than the rule-based optimizer, especially for queries involving multiple joins. However, CBO only works if you gather statistics using ANALYZE TABLE (which doesn't work with Delta v2 tables); see the sketch after this list for the syntax. Once statistics are available, the optimization is automatic: the Catalyst optimizer selects the plan with the lowest estimated cost (I/O, CPU, etc.) for execution. Be sure to update the statistics if your data changes frequently, otherwise the optimizer works with stale estimates. I will write a blog on this.

  • Broadcast Join Threshold:

    spark.sql.autoBroadcastJoinThreshold is set to 25MB (26214400 bytes) instead of 10MB. When you join two tables with significant skew, you can broadcast the smaller table to avoid shuffling data. If AQE is on, Spark will automatically switch from a sort-merge join to a broadcast join. Although the threshold is 2.5x higher in Fabric, consider the executor memory: with 56g and 8 cores, roughly 25% of the total is used for execution, so each core has about 1.75GB of memory. I think you can certainly bump the threshold up to 150MB depending on the data and the other tasks (this assumes the 56g is actually available); see the sketch after this list. Setting it to -1 turns broadcast joins off. If Autotune is used, it will tune this setting automatically. Note that the Autotune documentation incorrectly mentions the default as 10MB.

  • Serializer:

    Note that the Kryo serializer is used instead of the default Java serializer. Synapse also uses Kryo, which is faster and more compact, but for some reason the product team never updated the documentation, which still lists Java as the default. I spoke about Spark serialization here if you are interested in learning more.

  • Partition Size:

    Perhaps one of the most important settings in Spark tuning. It's not actually different here: it is still 128MB, just shown in bytes (134217728) instead of MB. This is the maximum number of bytes packed into a single partition when reading files. Autotune will tune this as well. Typically you want to keep it <= 200MB, but it depends on the data size and the cluster size; the goal is to maximize parallelism and resource utilization (see the sketch after this list).

  • Bloom Filter

    The bloom filter in this config (spark.sql.optimizer.runtime.bloomFilter.enabled) is a runtime filter applied during joins and is not the same as the Bloom Filter Indexes in Databricks. As far as I know, Fabric does not offer bloom filter indexing on Delta tables.
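
To make the notes above concrete, here is a minimal sketch of how these settings could be adjusted at the session level in a notebook. The values and the table name dim_small are hypothetical illustrations, not recommendations; as discussed above, whether they help depends entirely on your data and cluster.

# Raise the broadcast join threshold for this session only (illustrative value)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "150MB")

# Adjust the maximum bytes packed into a single input partition (Fabric default: 128MB)
spark.conf.set("spark.sql.files.maxPartitionBytes", "256MB")

# CBO needs table/column statistics to do anything useful; ANALYZE TABLE works on
# tables that support it (per the note above, not on Delta v2 tables).
# dim_small is a hypothetical table name.
spark.sql("ANALYZE TABLE dim_small COMPUTE STATISTICS FOR ALL COLUMNS")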

Other than these, nothing else stands out. The remaining configs are standard and depend on the Spark cluster you set up in the workspace settings. Fabric also introduces certain improvements/optimizations in the Spark engine, but that would be another blog. I just wanted to quickly identify whether any of the standard configs are different in Fabric. Hope this helps.
