
Reading Delta Tables With ColumnMapping Using Polars

Reading Warehouse delta tables in a Python notebook using Polars


There was a question a couple of days ago on the r/MicrosoftFabric subreddit about reading Data Warehouse tables shortcutted into a Lakehouse. You can easily query these with Spark or T-SQL in a notebook; the question was how to do it with Polars, since delta tables created by the Data Warehouse have Column Mapping enabled. Polars relies on delta-rs, which does not support reading tables with Column Mapping yet.
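For context, this is roughly what the direct read looks like; the path below is just a placeholder for your shortcut's abfss path, and on a column-mapped table the call errors out in delta-rs instead of returning data.

import polars as pl

# Placeholder: abfss path of the shortcutted Warehouse table in OneLake
path = "abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>/Tables/dbo/<table>"

# Straightforward read; fails when column mapping is enabled on the table
df = pl.read_delta(path)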

Below is a crude approach I came up with to map the logical column names to physical column names.

💡
Before you proceed, please note that this is a very inefficient solution and comes with many performance limitations. So, unless the data is very small and you can verify the output, I would advise using Spark or T-SQL instead. Verify and validate.

The logic:

  • Get the logical column names and physical column names to make a dictionary

  • Get the parquet files from the delta transaction log

  • Apply column mapping

  • Read and union

As you can see from above, you lose the parallelization and efficiency in the process.

Code

# Python notebook
import polars as pl
from deltalake import DeltaTable
import os

def scan_delta_cm(path: str) -> pl.LazyFrame:
    delta_table = DeltaTable(path)

    # Step 1: build a logical -> physical column name dictionary from the schema metadata
    colmaps: dict[str, str] = dict()
    for field in delta_table.schema().fields:
        logical_name = field.name
        physical_name = field.metadata.get("delta.columnMapping.physicalName", field.name)
        colmaps[logical_name] = physical_name

    # Step 2: get the parquet files referenced by the delta transaction log
    all_lazy_frames = []
    for add_action in delta_table.get_add_actions(flatten=True).to_pylist():
        file_path = os.path.join(delta_table.table_uri, add_action["path"])
        lazy_df = pl.scan_parquet(file_path)

        file_schema = lazy_df.collect_schema()
        available_columns = file_schema.names()

        # Step 3: apply the column mapping, renaming physical columns to logical names
        select_exprs = []
        for logical_name, physical_name in colmaps.items():
            if physical_name in available_columns:
                select_exprs.append(pl.col(physical_name).alias(logical_name))

        if select_exprs:
            lazy_df = lazy_df.select(select_exprs)
            all_lazy_frames.append(lazy_df)

    # Step 4: union the per-file frames into a single LazyFrame
    if all_lazy_frames:
        return pl.concat(all_lazy_frames)
    else:
        raise ValueError("No data files found")

# path is the abfss path of the table
df = scan_delta_cm(path).collect()
# df.head()

Think of this more as an experiment than a solution; it will only work in limited cases (tables with deletion vectors won't work either, as expected). For any business-critical job, I would advise using Spark in such scenarios.
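For reference, in a Spark notebook (rather than the pure Python notebook) the same read is a one-liner, since Spark handles column mapping natively; path is the same placeholder as above.

# Spark reads column-mapped delta tables directly
df_spark = spark.read.format("delta").load(path)
df_spark.show(10)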

The other, easier alternative is to use DuckDB, which supports tables with column mapping.

If you must use Polars, you can zero-copy the DuckDB result into a Polars DataFrame.
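A minimal sketch of that route, assuming DuckDB's delta extension is available in the environment (and, for abfss paths, that authentication to OneLake is already configured):

import duckdb

con = duckdb.connect()
con.sql("INSTALL delta; LOAD delta;")  # may auto-load on recent DuckDB versions

# delta_scan reads the table through DuckDB's delta extension, column mapping included
rel = con.sql(f"SELECT * FROM delta_scan('{path}')")

# Arrow-backed handoff to a Polars DataFrame (effectively zero-copy)
df_pl = rel.pl()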