🔷 Azure Databricks Series: Displaying All Clusters in a Databricks Workspace – SQL Warehouses, All-Purpose Compute, Job Clusters & More

In a modern data landscape, keeping track of all compute resources in your Azure Databricks Workspace—including SQL Warehouses, Job Clusters, and All-Purpose Clusters—is crucial for monitoring costs, performance, and resource utilization. 🚀

In this post, we’ll walk through a PySpark + REST API solution to dynamically list all clusters in your workspace, categorize them by type, and store the results in a Delta table for easy access and reporting.


📌 Why Is This Important?

Databricks provides different types of compute environments:

  • SQL Warehouses (formerly SQL Endpoints) for BI workloads
  • ⚙️ All-Purpose Clusters for interactive analysis
  • 📦 Job Clusters for scheduled or triggered pipelines
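
These compute types are surfaced by two different REST APIs, which is why the script below makes two separate calls. As a quick reference (the SQL path shown is the one used in this post; newer workspaces also expose `/api/2.0/sql/warehouses`):

```python
# Which REST API lists which compute type; job and all-purpose clusters
# share one endpoint and are told apart by the `cluster_source` field.
COMPUTE_LIST_APIS = {
    "SQL Warehouse": "/api/2.0/sql/endpoints",
    "All-Purpose": "/api/2.0/clusters/list",
    "Job": "/api/2.0/clusters/list",
}
```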

Tracking these clusters helps you:

  • Audit usage & track ownership 👀
  • Understand memory footprint & scale ⛽
  • Identify unused or idle clusters for optimization 💸

🛠️ Prerequisites

Make sure you have the following ready:

  • An active Azure Databricks Workspace
  • A Personal Access Token with workspace read permissions
  • A Spark session running in a notebook

📄 Code

import requests

# `spark` and `display` are pre-defined in a Databricks notebook,
# so no SparkSession import is needed

# Databricks workspace config: base URL (no trailing slash, to avoid
# "//" in the request URLs) and a personal access token
instance = "https://adb-13115258385123.34.azuredatabricks.net"
token = "dapiaXXXXXXXX"

headers = {
    "Authorization": f"Bearer {token}"
}

def size_to_memory(cluster_size):
    # approximate memory for each SQL Warehouse t-shirt size
    mapping = {
        "2X-Small": "64 GB",
        "X-Small": "128 GB",
        "Small": "256 GB",
        "Medium": "512 GB",
        "Large": "1 TB",
        "X-Large": "2 TB",
        "2X-Large": "4 TB",
        "3X-Large": "8 TB",
        "4X-Large": "16 TB"
    }
    return mapping.get(cluster_size, "Unknown")

# build a node_type_id -> memory map so cluster memory can be reported
node_url = f"{instance}/api/2.0/clusters/list-node-types"
node_response = requests.get(node_url, headers=headers)
node_response.raise_for_status()
node_types = node_response.json().get("node_types", [])

node_memory_map = {}
for node in node_types:
    node_id = node.get("node_type_id")
    mem_gb = node.get("memory_mb", 0) // 1024
    node_memory_map[node_id] = f"{mem_gb} GB"

# list SQL Warehouses (formerly SQL Endpoints)
sql_url = f"{instance}/api/2.0/sql/endpoints"
sql_response = requests.get(sql_url, headers=headers)
sql_response.raise_for_status()
sql_data = sql_response.json()

records = []
for endpoint in sql_data.get("endpoints", []):
    records.append({
        "name": endpoint.get("name", ""),
        "id": endpoint.get("id", ""),
        "cluster_size_or_node_type": endpoint.get("cluster_size", ""),
        "approx_memory": size_to_memory(endpoint.get("cluster_size", "")),
        "auto_stop_mins": str(endpoint.get("auto_stop_mins", "")),
        "creator": endpoint.get("creator_name", ""),
        "state": endpoint.get("state", ""),
        "cluster_type": "SQL Warehouse"
    })

# list all-purpose and job clusters (shared endpoint; distinguished
# below by cluster_source)
cluster_url = f"{instance}/api/2.0/clusters/list"
cluster_response = requests.get(cluster_url, headers=headers)
cluster_response.raise_for_status()
cluster_data = cluster_response.json()

for cluster in cluster_data.get("clusters", []):
    node_type_id = cluster.get("node_type_id", "")
    mem = node_memory_map.get(node_type_id, "Unknown")
    
    # autoscaling clusters report a min-max worker range; fixed-size ones a count
    autoscale = cluster.get("autoscale", {})
    if autoscale:
        workers = f'{autoscale.get("min_workers")} - {autoscale.get("max_workers")}'
    else:
        workers = str(cluster.get("num_workers", 0))

    # cluster_source records how the cluster was created (UI, API, or a job run)
    source = cluster.get("cluster_source", "").upper()
    if source == "JOB":
        cluster_type = "Job"
    elif source in ["UI", "API"]:
        cluster_type = "All-Purpose"
    else:
        cluster_type = "Unknown"

    records.append({
        "name": cluster.get("cluster_name", ""),
        "id": cluster.get("cluster_id", ""),
        "cluster_size_or_node_type": node_type_id,
        "approx_memory": mem,
        "auto_stop_mins": str(cluster.get("autotermination_minutes", "")),
        "creator": cluster.get("creator_user_name", ""),
        "state": cluster.get("state", ""),
        "cluster_type": cluster_type
    })

# combine both listings into a DataFrame and persist as a Delta table
df = spark.createDataFrame(records)

df.write.format("delta").mode("overwrite").saveAsTable("default.all_clusters_summary")

display(spark.table("default.all_clusters_summary"))
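
With the inventory in hand, spotting idle candidates is just a filter. A minimal sketch in plain Python over the same record shape the script builds (the sample rows and the 120-minute threshold below are made up for illustration):

```python
def idle_candidates(records, max_auto_stop_mins=120):
    """Flag terminated compute, plus warehouses/clusters whose auto-stop
    window is long or disabled (0 means never auto-stop)."""
    flagged = []
    for r in records:
        state = r.get("state", "").upper()
        try:
            auto_stop = int(r.get("auto_stop_mins") or 0)
        except ValueError:
            auto_stop = 0
        if state == "TERMINATED" or auto_stop == 0 or auto_stop > max_auto_stop_mins:
            flagged.append(r["name"])
    return flagged

sample = [
    {"name": "bi-warehouse", "state": "RUNNING", "auto_stop_mins": "15"},
    {"name": "dev-cluster", "state": "TERMINATED", "auto_stop_mins": "60"},
    {"name": "always-on", "state": "RUNNING", "auto_stop_mins": "0"},
]
print(idle_candidates(sample))  # ['dev-cluster', 'always-on']
```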

🔒 Security Note

👉 Always keep your token safe. Never expose it in version control or public notebooks. Consider storing it securely in Databricks secrets for production use.
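
In a notebook, that can look like the sketch below. The scope and key names ("blog-scope", "databricks-pat") are examples — create your own with the Databricks CLI or UI. The environment-variable fallback just makes the function usable outside a notebook:

```python
import os

def get_token():
    """Fetch the PAT from a Databricks secret scope when running in a
    notebook, falling back to an environment variable elsewhere."""
    try:
        # `dbutils` is only defined inside a Databricks notebook
        return dbutils.secrets.get(scope="blog-scope", key="databricks-pat")  # noqa: F821
    except NameError:
        # outside Databricks, read the token from the environment instead
        return os.environ.get("DATABRICKS_TOKEN", "")
```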


🧠 Final Thoughts

With this solution, you can:

  • Get real-time inventory of all Databricks compute environments
  • Ensure accountability and governance
  • Optimize resource usage and cost

🔁 Automate this with a scheduled job or dashboard, and you’ve got yourself a powerful monitoring solution!
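
One way to schedule it is the Jobs API 2.1. A hedged sketch of the request payload — the job name, notebook path, and cron expression are placeholders, and depending on your workspace you may also need a cluster spec in the task:

```python
# Jobs API 2.1 payload to run the inventory notebook every morning.
job_payload = {
    "name": "cluster-inventory-refresh",
    "schedule": {
        "quartz_cron_expression": "0 0 6 * * ?",  # daily at 06:00
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED",
    },
    "tasks": [
        {
            "task_key": "refresh_inventory",
            "notebook_task": {"notebook_path": "/Shared/cluster_inventory"},
        }
    ],
}

# submit with the same headers used earlier in this post:
# requests.post(f"{instance}/api/2.1/jobs/create", headers=headers, json=job_payload)
```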


💬 Got Questions?

Let me know in the comments if you have any questions or suggested enhancements!

Thank You,
Vivek Janakiraman

Disclaimer:
The views expressed on this blog are mine alone and do not reflect the views of my company or anyone else. All postings on this blog are provided “AS IS” with no warranties and confer no rights.
