Azure Databricks Series: Connect Function App to Export Cluster Configuration via Visual Studio Code

👉 You can also watch this as a YouTube video here: https://www.youtube.com/watch?v=X_z25rh-Ids

In this blog, we will explore how to use an Azure Function App to connect to selected Azure Databricks workspaces and retrieve the configuration details of the available clusters. We’ll use Visual Studio Code as the development environment and write the Function App in Python to automate this process.

This is a step-by-step guide to exporting cluster configurations in CSV format for better monitoring and analysis.


🔹 Prerequisites

Before starting, make sure you have:

  • An active Azure Subscription
  • Azure Databricks Workspace(s) created
  • Personal Access Tokens (PATs) generated for each workspace
  • Visual Studio Code with Azure Functions extension installed
  • Python environment ready

🔹 Step 1: Setup Local Settings

Add your workspace URLs and PAT tokens in local.settings.json:

"DATABRICKS_WORKSPACE_URLS": "https://adb-26709042233374857.17.azuredatabricks.net,https://adb-1311525322452571.11.azuredatabricks.net,https://adb-32008745334111.11.azuredatabricks.net",
"DATABRICKS_PAT_TOKENS": "dapixxxxxxxxxxxxxxxxxxxxxxxxx,dapiyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy,dapizzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"

⚠️ Ensure the number of URLs matches the number of PAT tokens — they are paired by position.
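
For reference, here is a minimal sketch of what the full local.settings.json might look like with these settings in place. The workspace URLs and tokens below are placeholders, and the storage setting depends on your own environment:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "DATABRICKS_WORKSPACE_URLS": "https://adb-1111111111111111.1.azuredatabricks.net,https://adb-2222222222222222.2.azuredatabricks.net",
    "DATABRICKS_PAT_TOKENS": "dapixxxxxxxxxxxxxxxxxxxxxxxxx,dapiyyyyyyyyyyyyyyyyyyyyyyyyy"
  }
}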


🔹 Step 2: Requirements File

Create a requirements.txt with the following dependencies:

azure-functions
azure-identity
azure-mgmt-resource
requests

This ensures the Azure Functions runtime has all the required packages.
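
If you are developing locally, these dependencies can be installed into your Python environment with pip install -r requirements.txt; when you deploy from Visual Studio Code, the packages listed here are installed on the Azure side as part of the default remote build.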


🔹 Step 3: Python Script

Below is the main Function App script. It retrieves the Databricks cluster details, flattens the JSON output, and returns the result as a downloadable CSV in the HTTP response:

import os
import logging
import azure.functions as func
import requests
from datetime import datetime

# Function App definition (anonymous HTTP trigger)
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# Databricks credentials read from environment variables (comma-separated lists)
workspace_urls = os.environ.get("DATABRICKS_WORKSPACE_URLS", "")
pat_tokens = os.environ.get("DATABRICKS_PAT_TOKENS", "")
WORKSPACES = [url.strip() for url in workspace_urls.split(",") if url.strip()]
PAT_TOKENS = [tok.strip() for tok in pat_tokens.split(",") if tok.strip()]

if len(WORKSPACES) != len(PAT_TOKENS):
    logging.warning("The number of workspace URLs and PAT tokens do not match. Please check app settings.")

@app.route(route="JBadbcostanalysistrigger")
def JBadbcostanalysistrigger(req: func.HttpRequest) -> func.HttpResponse:
    logging.info("✅ Databricks Cluster Filtered Report Triggered")
    if not WORKSPACES:
        return func.HttpResponse("No Databricks workspaces configured.", status_code=400)

    # Columns to include in the CSV report
    selected_headers = [
        "workspace_url", "cluster_name", "autotermination_minutes", "is_single_node",
        "num_workers", "state", "start_time", "terminated_time", "last_activity_time",
        "termination_reason.code", "termination_reason.parameters", "data_security_mode",
        "driver_healthy", "driver_node_type_id", "effective_spark_version", "node_type_id",
        "release_version", "spark_version"
    ]
    all_rows = []
    for i, workspace_url in enumerate(WORKSPACES):
        # Pair each workspace with its token by position; fall back to the first token if fewer tokens were supplied
        token = PAT_TOKENS[i] if i < len(PAT_TOKENS) else PAT_TOKENS[0]
        clusters = list_clusters(workspace_url, token)
        for cluster in clusters:
            flat = flatten_cluster(cluster, workspace_url)
            all_rows.append(flat)

    # Build CSV content; quote any field that contains a comma, quote, or newline
    csv_lines = [",".join(selected_headers)]
    for row in all_rows:
        csv_line = []
        for h in selected_headers:
            value = str(row.get(h, ""))
            if ',' in value or '"' in value or '\n' in value:
                value = '"' + value.replace('"', '""') + '"'
            csv_line.append(value)
        csv_lines.append(",".join(csv_line))
    csv_output = "\n".join(csv_lines)

    logging.info("✅ Filtered cluster details prepared in CSV format.")
    return func.HttpResponse(csv_output, status_code=200, mimetype="text/csv")

def list_clusters(workspace_url, pat_token):
    """Call the Databricks Clusters API for one workspace and return its list of clusters."""
    api_url = f"{workspace_url.rstrip('/')}/api/2.0/clusters/list"
    headers = {"Authorization": f"Bearer {pat_token}"}
    try:
        res = requests.get(api_url, headers=headers, timeout=30)
    except requests.RequestException as e:
        logging.error("HTTP request to %s failed: %s", workspace_url, e)
        return []
    if res.status_code != 200:
        logging.error("Non-200 response from %s: %s %s", workspace_url, res.status_code, res.text)
        return []
    return res.json().get("clusters", [])

def convert_epoch_to_datetime(ms):
    """Convert epoch milliseconds to a readable UTC timestamp; return the raw value if it cannot be converted."""
    try:
        return datetime.utcfromtimestamp(ms / 1000).strftime('%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError, OSError):
        return ms

def flatten_cluster(cluster: dict, workspace_url: str) -> dict:
    """Flatten one cluster object from the API response into a single-level dict of the selected fields."""
    flat = {
        "workspace_url": workspace_url,
        "cluster_name": cluster.get("cluster_name", ""),
        "autotermination_minutes": cluster.get("autotermination_minutes", ""),
        "is_single_node": cluster.get("is_single_node", ""),
        "num_workers": cluster.get("num_workers", ""),
        "state": cluster.get("state", ""),
        "start_time": convert_epoch_to_datetime(cluster.get("start_time", "")),
        "terminated_time": convert_epoch_to_datetime(cluster.get("terminated_time", "")),
        "last_activity_time": convert_epoch_to_datetime(cluster.get("last_activity_time", "")),
        "termination_reason.code": cluster.get("termination_reason", {}).get("code", ""),
        "termination_reason.parameters": cluster.get("termination_reason", {}).get("parameters", ""),
        "data_security_mode": cluster.get("data_security_mode", ""),
        "driver_healthy": cluster.get("driver_healthy", ""),
        "driver_node_type_id": cluster.get("driver_node_type_id", ""),
        "effective_spark_version": cluster.get("effective_spark_version", ""),
        "node_type_id": cluster.get("node_type_id", ""),
        "release_version": cluster.get("release_version", ""),
        "spark_version": cluster.get("spark_version", "")
    }
    return flat
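
To sanity-check the flattening logic outside the Function App, a minimal local sketch like the one below can run flatten_cluster against a hand-written cluster object; the field values here are made up purely for illustration:

# Minimal local test of the flattening logic (illustrative values only)
sample_cluster = {
    "cluster_name": "analytics-cluster",
    "autotermination_minutes": 30,
    "num_workers": 2,
    "state": "TERMINATED",
    "start_time": 1735689600000,   # epoch milliseconds
    "terminated_time": 1735693200000,
    "termination_reason": {"code": "INACTIVITY", "parameters": {"inactivity_duration_min": "30"}},
    "node_type_id": "Standard_DS3_v2",
    "spark_version": "15.4.x-scala2.12"
}

row = flatten_cluster(sample_cluster, "https://adb-1111111111111111.1.azuredatabricks.net")
print(row["cluster_name"], row["start_time"], row["termination_reason.code"])
# Prints something like: analytics-cluster 2025-01-01 00:00:00 INACTIVITY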

🔹 Step 4: Deploy and Test

  1. Deploy the Function App to Azure.
  2. Trigger the HTTP endpoint /JBadbcostanalysistrigger (a sample client call is shown below the list).
  3. A CSV file will be returned containing all Databricks cluster configurations from the selected workspaces.
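
As a quick post-deployment test, a small client script like the sketch below can call the endpoint and save the response as a CSV file. The Function App name is a placeholder, and the URL assumes the default /api route prefix and the anonymous auth level used in the script above:

import requests

# Replace <your-function-app> with your own Function App name
url = "https://<your-function-app>.azurewebsites.net/api/JBadbcostanalysistrigger"

resp = requests.get(url, timeout=60)
resp.raise_for_status()

# Save the returned cluster report locally
with open("databricks_clusters.csv", "w", encoding="utf-8") as f:
    f.write(resp.text)

print("Saved", max(len(resp.text.splitlines()) - 1, 0), "cluster rows to databricks_clusters.csv")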

🎯 Conclusion

In this blog, we demonstrated how to:

  • Connect Azure Function App to multiple Databricks Workspaces
  • Retrieve cluster configurations via Databricks REST API
  • Export the details into a CSV for analysis

This approach helps automate cluster monitoring and cost analysis across multiple workspaces efficiently.

👉 Don’t forget to check the full Azure Databricks Series playlist for step-by-step tutorials:
https://www.youtube.com/playlist?list=PLNj2XeCNjFeosTuxZLjfYvnW4H1hsPH07

Thank You,
Vivek Janakiraman

Disclaimer:
The views expressed on this blog are mine alone and do not reflect the views of my company or anyone else. All postings on this blog are provided “AS IS” with no warranties, and confer no rights.
