Freigeben über


Lernprogramm: Erstellen einer ETL-Pipeline mit Apache Spark auf der Databricks-Plattform

In diesem Lernprogramm erfahren Sie, wie Sie Ihre erste ETL-Pipeline (Extrahieren, Transformieren und Laden) für die Daten-Orchestrierung mit Apache Spark entwickeln und bereitstellen. Obwohl in diesem Lernprogramm databricks all-purpose compute verwendet werden, können Sie auch serverlose Berechnung verwenden, wenn sie für Ihren Arbeitsbereich aktiviert ist.

Sie können auch Lakeflow Spark Declarative Pipelines verwenden, um ETL-Pipelines zu erstellen. Databricks Lakeflow Spark Declarative Pipelines reduziert die Komplexität der Erstellung, Bereitstellung und Wartung von ETL-Produktionspipelines. Siehe Lernprogramm: Erstellen einer ETL-Pipeline mit Lakeflow Spark Declarative Pipelines.

Am Ende dieses Artikels wissen Sie, wie Sie:

  1. Starten Sie eine Databricks Allzweck-Compute-Ressource.
  2. Erstellen Sie ein Databricks-Notizbuch.
  3. Konfigurieren Sie die inkrementelle Datenaufnahme in Delta Lake mithilfe von Auto Loader.
  4. Verarbeiten und Interagieren mit Daten.
  5. Planen Sie ein Notizbuch als Databricks-Auftrag.

In diesem Lernprogramm werden interaktive Notizbücher verwendet, um allgemeine ETL-Aufgaben in Python oder Scala auszuführen.

Sie können auch den Databricks Terraform-Anbieter verwenden, um die Ressourcen dieses Artikels zu erstellen. Weitere Informationen finden Sie unter Erstellen von Clustern, Notebooks und Aufträgen mit Terraform.

Anforderungen

Hinweis

Wenn Sie nicht über Rechensteuerungsprivilegien verfügen, können Sie dennoch die meisten der nachstehenden Schritte ausführen, solange Sie Zugriff auf eine Computersequelle haben.

Schritt 1: Erstellen einer Computeressource

Erstellen Sie eine Rechenressource zum Ausführen von Befehlen, um explorative Datenanalysen und Data Engineering durchzuführen.

  1. Klicken Sie auf der Seitenleiste auf ComputesymbolCompute.
  2. Klicken Sie auf der Compute-Seite auf Compute erstellen.
  3. Geben Sie einen eindeutigen Namen für die Computeressource an, lassen Sie die verbleibenden Werte im Standardzustand, und klicken Sie auf "Berechnen erstellen".

Weitere Informationen zur Berechnung von Databricks finden Sie unter Compute.

Schritt 2: Erstellen eines Databricks-Notebooks

Wenn Sie ein Notebook in Ihrem Arbeitsbereich erstellen möchten, wählen Sie in der Randleiste Neues SymbolNeu aus, und wählen Sie dann Notebook aus. Im Arbeitsbereich wird ein leeres Notebook geöffnet.

Weitere Informationen zum Erstellen und Verwalten von Notebooks finden Sie unter Verwalten von Notebooks.

Schritt 3: Konfigurieren des Autoloaders zum Erfassen von Daten in Delta Lake

Databricks empfiehlt die Verwendung des Autoloaders für die inkrementelle Datenerfassung. Der Autoloader erkennt und verarbeitet automatisch neue Dateien, sobald sie im Cloudobjektspeicher empfangen werden.

Databricks empfiehlt das Speichern von Daten mit Delta Lake. Delta Lake ist eine Open Source Speicherebene, die ACID-Transaktionen bereitstellt und das Data Lakehouse ermöglicht. Delta Lake ist das Standardformat für Tabellen, die in Databricks erstellt wurden.

Wenn Sie Autoloader zum Erfassen von Daten in einer Delta Lake-Tabelle konfigurieren möchten, kopieren Sie den folgenden Code in eine leere Zelle in Ihrem Notebook:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Hinweis

Die in diesem Code definierten Variablen sollten Ihnen eine sichere Ausführung ermöglichen, ohne dass es zu Konflikten mit bestehenden Arbeitsbereichsressourcen oder anderen Benutzern kommt. Eingeschränkte Netzwerk- oder Speicherberechtigungen lösen beim Ausführen dieses Codes Fehler aus. Wenden Sie sich an Ihren Arbeitsbereichsadministrator, um diese Einschränkungen zu behandeln.

Um mehr über Auto Loader zu erfahren, siehe Was ist Auto Loader?.

Schritt 4: Verarbeiten von und Interagieren mit Daten

Notebooks führen Logik Zelle für Zelle aus. Führen Sie die folgenden Schritte aus, um die Logik in Ihrer Zelle auszuführen:

  1. Wenn Sie die im vorherigen Schritt abgeschlossene Zelle ausführen möchten, wählen Sie die Zelle aus, und drücken Sie UMSCHALT+EINGABE.

  2. Um die soeben erstellte Tabelle abzufragen, kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Zelle ein, und drücken Sie dann UMSCHALT+EINGABETASTE , um die Zelle auszuführen.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Wenn Sie eine Vorschau der Daten im Datenrahmen anzeigen möchten, kopieren Sie den folgenden Code in eine leere Zelle, und drücken Sie dann UMSCHALT+EINGABE, um die Zelle auszuführen.

    Python

    display(df)
    

    Scala

    display(df)
    

Weitere Informationen zu interaktiven Optionen zum Visualisieren von Daten finden Sie unter Visualisierungen in Databricks-Notizbüchern und SQL-Editor.

Schritt 5: Planen eines Auftrags

Sie können Databricks-Notebooks als Produktionsskripts ausführen, indem Sie sie als Aufgabe in einem Databricks-Auftrag hinzufügen. In diesem Schritt erstellen Sie einen neuen Auftrag, den Sie manuell auslösen können.

So planen Sie Ihr digitales Notizbuch als Aufgabe

  1. Klicken Sie auf der rechten Seite der Kopfleiste auf Planen.
  2. Geben Sie einen eindeutigen Namen unter Auftragsname ein.
  3. Klicken Sie auf Manuell.
  4. Wählen Sie in der Dropdownliste "Compute " die Computeressource aus, die Sie in Schritt 1 erstellt haben.
  5. Klicken Sie auf Erstellen.
  6. Klicken Sie im angezeigten Fenster auf Jetzt ausführen.
  7. Klicken Sie auf das External Link-Symbol neben dem Zeitstempel der Letzten Ausführung, um die Ergebnisse der Auftragsausführung anzuzeigen.

Weitere Informationen zu Aufträgen finden Sie unter Was sind Aufträge?.

Zusätzliche Integrationen

Erfahren Sie mehr über Integrationen und Tools für das Daten engineering mit Azure Databricks: