Azure Databricks Series: Connect Function App to Export Cluster Configuration via Visual Studio Code

👉 You can also watch this as a YouTube video here: https://www.youtube.com/watch?v=X_z25rh-Ids

In this blog, we will explore how to use Azure Function Apps to connect to selected Azure Databricks workspaces and retrieve the configuration details of available clusters. We'll use Visual Studio Code as the development environment and configure our Function App with Python scripts to automate this process.

This is a step-by-step guide to exporting cluster configurations in CSV format for better monitoring and analysis.


🔹 Prerequisites

Before starting, make sure you have:

  • An active Azure Subscription
  • Azure Databricks Workspace(s) created
  • Personal Access Tokens (PATs) generated for each workspace
  • Visual Studio Code with Azure Functions extension installed
  • Python environment ready

🔹 Step 1: Setup Local Settings

Add your workspace URLs and PAT tokens to local.settings.json:

"DATABRICKS_WORKSPACE_URLS": "https://adb-26709042233374857.17.azuredatabricks.net,https://adb-1311525322452571.11.azuredatabricks.net,https://adb-32008745334111.11.azuredatabricks.net",
"DATABRICKS_PAT_TOKENS": "dapixxxxxxxxxxxxxxxxxxxxxxxxx,dapiyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy,dapizzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"

โš ๏ธ Ensure the number of URLs and tokens match.


🔹 Step 2: Requirements File

Create a requirements.txt with the following dependencies:

azure-functions
azure-identity
azure-mgmt-resource
requests

This ensures the Azure Functions runtime has all required packages.


🔹 Step 3: Python Script

Below is the main Function App script that retrieves Databricks cluster details, flattens the JSON output, and generates a downloadable CSV file:

import os
import logging
import azure.functions as func
import requests
from datetime import datetime

# Function App definition with anonymous HTTP access
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# Databricks credentials from environment variables
workspace_urls = os.environ.get("DATABRICKS_WORKSPACE_URLS", "")
pat_tokens = os.environ.get("DATABRICKS_PAT_TOKENS", "")
WORKSPACES = [url.strip() for url in workspace_urls.split(",") if url.strip()]
PAT_TOKENS = [tok.strip() for tok in pat_tokens.split(",") if tok.strip()]

if len(WORKSPACES) != len(PAT_TOKENS):
    logging.warning("The number of workspace URLs and PAT tokens do not match. Please check app settings.")

@app.route(route="JBadbcostanalysistrigger")
def JBadbcostanalysistrigger(req: func.HttpRequest) -> func.HttpResponse:
    logging.info("✅ Databricks Cluster Filtered Report Triggered")
    if not WORKSPACES:
        return func.HttpResponse("No Databricks workspaces configured.", status_code=400)

    selected_headers = [
        "workspace_url", "cluster_name", "autotermination_minutes", "is_single_node",
        "num_workers", "state", "start_time", "terminated_time", "last_activity_time",
        "termination_reason.code", "termination_reason.parameters", "data_security_mode",
        "driver_healthy", "driver_node_type_id", "effective_spark_version", "node_type_id",
        "release_version", "spark_version"
    ]
    all_rows = []
    for i, workspace_url in enumerate(WORKSPACES):
        token = PAT_TOKENS[i] if i < len(PAT_TOKENS) else PAT_TOKENS[0]
        clusters = list_clusters(workspace_url, token)
        for cluster in clusters:
            flat = flatten_cluster(cluster, workspace_url)
            all_rows.append(flat)

    # Build CSV content
    csv_lines = [",".join(selected_headers)]
    for row in all_rows:
        csv_line = []
        for h in selected_headers:
            value = row.get(h, "")
            if isinstance(value, str):
                # Escape embedded quotes and wrap any value containing a comma,
                # newline, or quote so the CSV stays well-formed
                value = value.replace('"', '""')
                if ',' in value or '\n' in value or '"' in value:
                    value = f'"{value}"'
            csv_line.append(str(value))
        csv_lines.append(",".join(csv_line))
    csv_output = "\n".join(csv_lines)

    logging.info("✅ Filtered cluster details prepared in CSV format.")
    return func.HttpResponse(csv_output, status_code=200, mimetype="text/csv")

def list_clusters(workspace_url, pat_token):
    api_url = f"{workspace_url.rstrip('/')}/api/2.0/clusters/list"
    headers = {"Authorization": f"Bearer {pat_token}"}
    try:
        res = requests.get(api_url, headers=headers, timeout=30)  # timeout avoids hanging on an unreachable workspace
    except Exception as e:
        logging.error("HTTP request to %s failed: %s", workspace_url, e)
        return []
    if res.status_code != 200:
        logging.error("Non-200 response from %s: %s %s", workspace_url, res.status_code, res.text)
        return []
    return res.json().get("clusters", [])

def convert_epoch_to_datetime(ms):
    # Databricks reports timestamps in epoch milliseconds; return the raw value if it is missing or not numeric
    try:
        return datetime.utcfromtimestamp(ms / 1000).strftime('%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError):
        return ms

def flatten_cluster(cluster: dict, workspace_url: str) -> dict:
    flat = {
        "workspace_url": workspace_url,
        "cluster_name": cluster.get("cluster_name", ""),
        "autotermination_minutes": cluster.get("autotermination_minutes", ""),
        "is_single_node": cluster.get("is_single_node", ""),
        "num_workers": cluster.get("num_workers", ""),
        "state": cluster.get("state", ""),
        "start_time": convert_epoch_to_datetime(cluster.get("start_time", "")),
        "terminated_time": convert_epoch_to_datetime(cluster.get("terminated_time", "")),
        "last_activity_time": convert_epoch_to_datetime(cluster.get("last_activity_time", "")),
        "termination_reason.code": cluster.get("termination_reason", {}).get("code", ""),
        "termination_reason.parameters": cluster.get("termination_reason", {}).get("parameters", ""),
        "data_security_mode": cluster.get("data_security_mode", ""),
        "driver_healthy": cluster.get("driver_healthy", ""),
        "driver_node_type_id": cluster.get("driver_node_type_id", ""),
        "effective_spark_version": cluster.get("effective_spark_version", ""),
        "node_type_id": cluster.get("node_type_id", ""),
        "release_version": cluster.get("release_version", ""),
        "spark_version": cluster.get("spark_version", "")
    }
    return flat

🔹 Step 4: Deploy and Test

  1. Deploy the Function App to Azure.
  2. Trigger the HTTP endpoint (by default exposed at /api/JBadbcostanalysistrigger).
  3. A CSV file containing all Databricks cluster configurations from the selected workspaces will be returned (a small scripted test is sketched below).
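If you prefer to script the test instead of using a browser, a minimal sketch like the following downloads the report and saves it locally. It uses the requests library; the Function App name in the URL is a placeholder, and no function key is needed because this version of the script uses AuthLevel.ANONYMOUS:

import requests

# Placeholder URL: replace <your-function-app> with your deployed Function App name.
url = "https://<your-function-app>.azurewebsites.net/api/JBadbcostanalysistrigger"

response = requests.get(url, timeout=60)
response.raise_for_status()

# Save the returned CSV for analysis in Excel, Power BI, etc.
with open("databricks_clusters.csv", "w", encoding="utf-8") as f:
    f.write(response.text)
print("Saved databricks_clusters.csv")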

🎯 Conclusion

In this blog, we demonstrated how to:

  • Connect an Azure Function App to multiple Databricks workspaces
  • Retrieve cluster configurations via the Databricks REST API
  • Export the details to a CSV for analysis

This approach helps automate cluster monitoring and cost analysis across multiple workspaces efficiently.

👉 Don't forget to check the full Azure Databricks Series playlist for step-by-step tutorials:
https://www.youtube.com/playlist?list=PLNj2XeCNjFeosTuxZLjfYvnW4H1hsPH07

Thank You,
Vivek Janakiraman

Disclaimer:
The views expressed on this blog are mine alone and do not reflect the views of my company or anyone else. All postings on this blog are provided "AS IS" with no warranties, and confer no rights.

Azure Databricks Series: Connect Function App to Workspace & Export Cluster Config via Azure Portal

📺 You can also watch this as a YouTube video here:
👉 https://www.youtube.com/watch?v=4Og3btWBNT0


🔹 Introduction

In this blog, we'll walk through how to use Azure Function Apps to connect to selected Azure Databricks workspaces and export the configuration details of available clusters.

This is especially useful for:

  • Auditing cluster usage
  • Tracking configuration changes
  • Exporting cluster details for compliance and reporting

We'll be using:

  • Azure Portal to configure the Function App
  • Python script to call Databricks REST APIs
  • Environment variables to manage credentials securely

🔹 Prerequisites

Before we begin, ensure you have:
โœ”๏ธ Access to an Azure subscription
โœ”๏ธ One or more Azure Databricks workspaces with valid PAT tokens
โœ”๏ธ A basic understanding of Azure Function Apps
โœ”๏ธ Python installed locally if you want to test before deployment


🔹 Step 1: Create a Function App in Azure Portal

  1. Go to the Azure Portal
  2. Create a Function App with:
    • Runtime stack: Python
    • Version: 3.11
    • Hosting: Consumption Plan or Premium Plan (based on needs)
  3. Deploy and wait for the Function App to be ready.

🔹 Step 2: Configure Environment Variables

Inside your Function App → Configuration → Application settings, add the following:

DATABRICKS_WORKSPACE_URLS

https://adb-2670904240043557.17.azuredatabricks.net,https://adb-1311525333242571.11.azuredatabricks.net,https://adb-320087374534111.11.azuredatabricks.net

DATABRICKS_PAT_TOKENS

dapixxxxxxxxxxxxxxxxxxxxxxxxxxxxx,dapiyyyyyyyyyyyyyyyyyyyyyyyyy,dapizzzzzzzzzzzzzzzzzzzzzzzzzzzz

These variables will allow the function to securely authenticate with Databricks workspaces.


🔹 Step 3: Add Dependencies

In your Function App project, create a requirements.txt file with:

azure-functions
azure-identity
azure-mgmt-resource
requests

This ensures your function has the right libraries to run.


🔹 Step 4: Function Definition

Inside your project, create the function.json to define HTTP trigger bindings:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": ["get"]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

This makes the function accessible via HTTP GET requests. Because authLevel is set to function, callers must supply the function key when invoking the endpoint (for example, via the code query-string parameter).


🔹 Step 5: Python Code to Retrieve Cluster Configurations

Now, add the following Python script to your Function App:

import os
import logging
import azure.functions as func
import requests
from datetime import datetime

# Databricks credentials from environment variables
workspace_urls = os.environ.get("DATABRICKS_WORKSPACE_URLS", "")
pat_tokens = os.environ.get("DATABRICKS_PAT_TOKENS", "")
WORKSPACES = [url.strip() for url in workspace_urls.split(",") if url.strip()]
PAT_TOKENS = [tok.strip() for tok in pat_tokens.split(",") if tok.strip()]

if len(WORKSPACES) != len(PAT_TOKENS):
    logging.warning("The number of workspace URLs and PAT tokens do not match. Please check app settings.")

def list_clusters(workspace_url, pat_token):
    api_url = f"{workspace_url.rstrip('/')}/api/2.0/clusters/list"
    headers = {"Authorization": f"Bearer {pat_token}"}
    try:
        res = requests.get(api_url, headers=headers, timeout=30)  # timeout avoids hanging on an unreachable workspace
    except Exception as e:
        logging.error("HTTP request to %s failed: %s", workspace_url, e)
        return []
    if res.status_code != 200:
        logging.error("Non-200 response from %s: %s %s", workspace_url, res.status_code, res.text)
        return []
    return res.json().get("clusters", [])

def convert_epoch_to_datetime(ms):
    # Databricks reports timestamps in epoch milliseconds; return the raw value if it is missing or not numeric
    try:
        return datetime.utcfromtimestamp(ms / 1000).strftime('%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError):
        return ms

def flatten_cluster(cluster: dict, workspace_url: str) -> dict:
    flat = {
        "workspace_url": workspace_url,
        "cluster_name": cluster.get("cluster_name", ""),
        "autotermination_minutes": cluster.get("autotermination_minutes", ""),
        "is_single_node": cluster.get("is_single_node", ""),
        "num_workers": cluster.get("num_workers", ""),
        "state": cluster.get("state", ""),
        "start_time": convert_epoch_to_datetime(cluster.get("start_time", "")),
        "terminated_time": convert_epoch_to_datetime(cluster.get("terminated_time", "")),
        "last_activity_time": convert_epoch_to_datetime(cluster.get("last_activity_time", "")),
        "termination_reason.code": cluster.get("termination_reason", {}).get("code", ""),
        "termination_reason.parameters": cluster.get("termination_reason", {}).get("parameters", ""),
        "data_security_mode": cluster.get("data_security_mode", ""),
        "driver_healthy": cluster.get("driver_healthy", ""),
        "driver_node_type_id": cluster.get("driver_node_type_id", ""),
        "effective_spark_version": cluster.get("effective_spark_version", ""),
        "node_type_id": cluster.get("node_type_id", ""),
        "release_version": cluster.get("release_version", ""),
        "spark_version": cluster.get("spark_version", "")
    }
    return flat

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info("✅ Databricks Cluster Filtered Report Triggered")

    if not WORKSPACES:
        return func.HttpResponse("No Databricks workspaces configured.", status_code=400)

    selected_headers = [
        "workspace_url", "cluster_name", "autotermination_minutes", "is_single_node",
        "num_workers", "state", "start_time", "terminated_time", "last_activity_time",
        "termination_reason.code", "termination_reason.parameters", "data_security_mode",
        "driver_healthy", "driver_node_type_id", "effective_spark_version", "node_type_id",
        "release_version", "spark_version"
    ]

    all_rows = []
    for i, workspace_url in enumerate(WORKSPACES):
        token = PAT_TOKENS[i] if i < len(PAT_TOKENS) else PAT_TOKENS[0]
        clusters = list_clusters(workspace_url, token)
        for cluster in clusters:
            flat = flatten_cluster(cluster, workspace_url)
            all_rows.append(flat)

    # Build CSV content
    csv_lines = [",".join(selected_headers)]
    for row in all_rows:
        csv_line = []
        for h in selected_headers:
            value = row.get(h, "")
            if isinstance(value, str):
                # Escape embedded quotes and wrap any value containing a comma,
                # newline, or quote so the CSV stays well-formed
                value = value.replace('"', '""')
                if ',' in value or '\n' in value or '"' in value:
                    value = f'"{value}"'
            csv_line.append(str(value))
        csv_lines.append(",".join(csv_line))

    csv_output = "\n".join(csv_lines)
    logging.info("✅ Filtered cluster details prepared in CSV format.")

    return func.HttpResponse(csv_output, status_code=200, mimetype="text/csv")

🔹 Step 6: Test the Function

  • Deploy your Function App from Azure Portal
  • Copy the function URL
  • Open a browser or Postman → send a GET request (this version uses authLevel "function", so include the function key)
  • You'll get a CSV output containing cluster details across all configured workspaces 🎉 (a scripted version of this test is sketched below)
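As a rough sketch, the same test can be scripted with the requests library. The Function App name, function name, and function key below are placeholders; the key is passed via the code query-string parameter, which Azure Functions accepts for function-level auth:

import requests

# Placeholders: replace with your Function App name, function name, and function key.
url = "https://<your-function-app>.azurewebsites.net/api/<function-name>"
params = {"code": "<function-key>"}

response = requests.get(url, params=params, timeout=60)
response.raise_for_status()

# Save the returned CSV for review or further processing.
with open("databricks_clusters.csv", "w", encoding="utf-8") as f:
    f.write(response.text)
print("Saved databricks_clusters.csv")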

🔹 Conclusion

With this setup, you've automated the process of retrieving cluster configurations from multiple Azure Databricks workspaces. This makes it easy to:
✔️ Export data for audits
✔️ Track usage patterns
✔️ Maintain compliance records

You can further enhance this by storing CSVs in Azure Blob Storage or sending outputs to Power BI for dashboards.
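As a minimal sketch of the Blob Storage idea, the helper below uploads the generated CSV. It assumes you add azure-storage-blob to requirements.txt, store a storage connection string in an app setting, and call the helper with csv_output before returning the HTTP response; the setting name AZURE_STORAGE_CONNECTION_STRING and the container name databricks-reports are only illustrative:

import os
from datetime import datetime, timezone
from azure.storage.blob import BlobServiceClient  # requires azure-storage-blob in requirements.txt

def upload_csv_to_blob(csv_output: str) -> str:
    # Illustrative setting and container names; adjust to your environment.
    conn_str = os.environ["AZURE_STORAGE_CONNECTION_STRING"]
    service = BlobServiceClient.from_connection_string(conn_str)
    blob_name = f"cluster-config-{datetime.now(timezone.utc):%Y%m%d-%H%M%S}.csv"
    blob_client = service.get_blob_client(container="databricks-reports", blob=blob_name)
    blob_client.upload_blob(csv_output, overwrite=True)  # keep a dated copy of each report
    return blob_name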

📺 Don't forget to check out the full video walkthrough here:
๐Ÿ‘‰ https://www.youtube.com/watch?v=4Og3btWBNT0

Thank You,
Vivek Janakiraman

Disclaimer:
The views expressed on this blog are mine alone and do not reflect the views of my company or anyone else. All postings on this blog are provided "AS IS" with no warranties, and confer no rights.