# Quick Test : Daft With Ray In Fabric

<div data-node-type="callout">
<div data-node-type="callout-emoji">💡</div>
<div data-node-type="callout-text">This is not a benchmark. This is one specific instance of a test, on a specific dataset, using specific transformations. My goal was just to see how to set up Daft + Ray in Fabric and compare the engines for the sake of learning.</div>
</div>

Daft is a [distributed query engine](https://www.getdaft.io/projects/docs/en/latest/) that is gaining a lot of traction. You can [use a Ray cluster](https://www.getdaft.io/projects/docs/en/stable/user_guide/integrations/ray.html) to run Daft on multiple workers, similar to Spark. I have no experience with Ray, so this is an attempt to see how it works. Based on my limited understanding, I don't think you can provision a multi-node Ray cluster in Fabric, so in my test I am using a single node with all available CPUs. I used Fabric runtime 1.2 with the default medium-size pool.

The dataset is the `Common Government` dataset from Tableau's [BI Benchmark](https://github.com/cwida/public_bi_benchmark/tree/master/benchmark/CommonGovernment), which aims to mimic real-world data. This dataset has 54 columns and 160M rows. I saved it as a Delta table in a lakehouse.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1722735092009/2326d27b-0b49-4801-a7b5-197ecc7f2064.png align="center")

**Setup :**

I transformed the data using the same filters and aggregations for all engines and recorded the durations. All runs are cold cache.

```python
#Fabric runtime 1.2
%pip install getdaft[all] polars deltalake --q

import daft
import ray
import polars as pl
import time
import psutil
import pandas as pd 
import matplotlib.pyplot as plt

path = "/lakehouse/default/Tables/commongov"

durations={}
```
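
The cells that follow all repeat the same timing pattern: start `time.perf_counter()`, run the query, and store the rounded elapsed seconds. As a minimal sketch (not the code used for the results in this post), that pattern could be wrapped in a small context manager. The names `timed` and `demo_durations` are hypothetical, and the `sum(...)` call is only a stand-in for a real query.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, store):
    """Record the wall-clock seconds spent in the enclosed block in store[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # The duration is stored even if the enclosed block raises.
        store[label] = round(time.perf_counter() - start, 2)


demo_durations = {}  # hypothetical dict, kept separate from `durations`

with timed("demo", demo_durations):
    sum(range(1_000_000))  # stand-in for an engine query

print(demo_durations)
```

Using a `finally` clause means a failed query still leaves a duration behind, which may or may not be what you want when comparing engines.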

**Polars:**

```python
start = time.perf_counter()
result=(pl.scan_delta(path)
.filter(pl.col("_c55")==2011)  # same `_c55` filter as the other engines
.filter(pl.col("_c49")=="NAVY")
.filter(pl.col("_c9")<250)
.group_by(["_c3","_c35"])
.agg(pl.col("_c10").sum(), pl.col("_c37").sum())
.collect()
)

durations['polars']=round(time.perf_counter()-start)
```

**Daft (Default):**

```python
start = time.perf_counter()
table_uri = (path)
df = daft.read_delta_lake(table_uri)

df_agg = (df
            .where((df["_c55"]==2011) & 
                   (df["_c49"]=="NAVY") & 
                   (df["_c9"]<250)
                   )
            .groupby(["_c3","_c35"])
            .agg([(df["_c10"],"sum"),(df["_c37"],"sum")])
            .collect()
         )
durations['daft']=round(time.perf_counter()-start)
```

**Daft with Ray (Default):**

```python
table_uri = (path)

if not ray.is_initialized():
    ray.init(runtime_env={"pip": ["getdaft"]}, num_cpus=8)
resources = ray.cluster_resources()
print("Available resources:", resources)
print("Available CPUs (workers):", resources.get("CPU", "Not available"))

# print CPU usage
def print_cpu_usage():
    cpu_usage = psutil.cpu_percent(interval=1, percpu=True)
    print("CPU Usage per core:", cpu_usage)

# CPU usage
print_cpu_usage()

start = time.perf_counter()
df = daft.read_delta_lake(table_uri)

df_agg = (
    df.where((df["_c55"] == 2011) & (df["_c49"] == "NAVY") & (df["_c9"] < 250))
    .groupby(["_c3", "_c35"])
    .agg([(df["_c10"], "sum"), (df["_c37"], "sum")])
    .collect()
)

durations['daft + ray']=round(time.perf_counter()-start)
```
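
The `ray.init` call above hard-codes the CPU count. A hedged alternative, assuming you want Ray to use however many logical cores the node reports, is to look the count up at runtime. `detect_cpus` is a hypothetical helper; `os.cpu_count()` comes from the standard library and can return `None`, hence the fallback.

```python
import os


def detect_cpus(default=1):
    """Return the number of logical CPUs reported by the OS, or `default` if unknown."""
    count = os.cpu_count()  # may be None on some platforms
    return count if count else default


num_cpus = detect_cpus()
print("Logical CPUs detected:", num_cpus)

# The detected value could then replace the literal in the call above, e.g.:
# ray.init(runtime_env={"pip": ["getdaft"]}, num_cpus=num_cpus)
```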

**Daft with Ray (Partitioned data):**

I partitioned the data into 8 partitions just to see if that would help with parallelizing the tasks effectively.

```python
#ray.shutdown() #shutdown the previous ray cluster
table_uri = (path)
if not ray.is_initialized():
    ray.init(runtime_env={"pip": ["getdaft"]}, num_cpus=8)
resources = ray.cluster_resources()
print("Available resources:", resources)

print("Available CPUs (workers):", resources.get("CPU", "Not available"))

print_cpu_usage()

start = time.perf_counter()
df = daft.read_delta_lake(table_uri).repartition(num=8) #partitions

df_agg = (
    df.where((df["_c55"] == 2011) & (df["_c49"] == "NAVY") & (df["_c9"] < 250))
    .groupby(["_c3", "_c35"])
    .agg([(df["_c10"], "sum"), (df["_c37"], "sum")])
    .collect()
)

durations['daft + ray_partitioned']=round(time.perf_counter()-start)
```

**Spark:**

```python
from pyspark.sql.functions import col, sum as sum_
start = time.perf_counter()
result = (
    spark.table("commongov")
      .filter(col("_c55") == 2011)
      .filter(col("_c49") == "NAVY")
      .filter(col("_c9") < 250)
      .groupBy("_c3", "_c35")
      .agg(sum_("_c10").alias("sum_c10"), sum_("_c37").alias("sum_c37"))
).collect()
durations['spark']=round(time.perf_counter()-start)
```

**Result:**

```python
df = pd.DataFrame(list(durations.items()), columns=['engine', 'duration']).sort_values("duration")
plt.figure(figsize=(10, 6))
barplot = plt.bar(df['engine'], df['duration'], color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd'])
plt.title('Comparison of Engine Durations', fontsize=16)
plt.xlabel('Engine', fontsize=14)
plt.ylabel('Duration (s)', fontsize=14)

for bar in barplot:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, yval, round(yval, 1), va='bottom', ha='center', fontsize=12)

plt.show()
```

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1722735568339/bd42f707-f5fd-498d-8d50-a660b34bf88d.png align="center")
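
Each bar above comes from a single cold-cache run, so the numbers carry some run-to-run noise. One way to get a steadier figure, sketched here under the assumption that warm-cache repeats are acceptable (they measure something different from the cold-cache runs above), is to repeat a query a few times and keep the median. `median_seconds` is a hypothetical helper and the lambda is only a stand-in workload.

```python
import statistics
import time


def median_seconds(fn, repeats=3):
    """Call fn() `repeats` times and return the median wall-clock seconds."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


# Stand-in workload; a real test would pass a function that runs one engine's query.
elapsed = median_seconds(lambda: sum(range(100_000)), repeats=5)
print(f"median: {elapsed:.4f}s")
```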

**Notes:**

* Using the Ray cluster helped compared to Polars and default Daft, *in this case*.
    
* As I would with Spark, I experimented with various partitioning strategies to balance the load, but that didn't seem to improve performance.
    
* I don't know if there are any Ray configs that can be tuned to improve parallelization. I also don't know if these results are generalizable.
    
* I don't know how to use Ray for production workloads, or whether it can be used in Fabric at all. It should be interesting to find out.
    
* You can set up a Ray cluster [in Databricks](https://docs.databricks.com/en/machine-learning/ray/index.html).
    
* How Amazon migrated from Spark to Daft + Ray: [article](https://aws.amazon.com/blogs/opensource/amazons-exabyte-scale-migration-from-apache-spark-to-ray-on-amazon-ec2/)
    

If you spot anything odd or have suggestions, please let me know.
