Freigeben über


Verwende eine Steuertabelle, um einen For each Auftrag zu steuern

Möglicherweise müssen Sie aus vielen Quellen aufnehmen. Wenn sich diese Liste ändert, bedeutet die Hartcodierung in der Auftragskonfiguration das Ändern von Code und die erneute Bereitstellung. Verwenden Sie Metadaten , um dies zu beheben, indem Sie die Liste der Quellen in einer Tabelle speichern, die zur Laufzeit gelesen und verwendet wird. Fügen Sie eine Quelle als neue Zeile hinzu und der nächste Auftrag übernimmt sie, ohne den Auftrag selbst zu ändern.

In diesem Lernprogramm erfahren Sie, wie Sie mit diesem Ansatz einen Auftrag erstellen. Eine SQL-Aufgabe liest die Steuerelementtabelle und eine For each Aufgabe durchläuft jede Zeile parallel.

So funktioniert es

Das Muster verwendet drei Aufgabentypen, die aufeinander verkabelt sind:

Aufgabe Typ Was es tut
read_markets SQL Fragt eine Konfigurationstabelle ab und erfasst das Ergebnis als Zeilenarray.
process_markets Für jede Durchläuft {{tasks.read_markets.output.rows}}, wobei die geschachtelte Aufgabe pro Zeile einmal ausgeführt wird
run_market_analysis_iteration Notebook oder SQL (verschachtelt in einer Schleife „Für jedes“) Wird einmal pro Zeile ausgeführt, wobei Zeilenwerte verwendet werden, die als Parameter übergeben werden, um Ihre Geschäftslogik auszuführen.

Die Ausgabe des SQL-Tasks – ein JSON-Array von Zeilenobjekten – fließt direkt mithilfe des dynamischen Wertverweises For each in das des {{tasks.read_markets.output.rows}}-Tasks. Der For each Vorgang übergibt dann jede Zeile als Parameter an den geschachtelten Vorgang, verfügbar als {{input.market}} und {{input.currency}}.

Voraussetzungen

  • Ein Databricks-Arbeitsbereich mit der Berechtigung zum Erstellen von Aufträgen und Notizbüchern
  • Berechtigung zum Erstellen von Tabellen im Unity-Katalog
  • Ein Unity-Katalogschema, in dem Sie die Konfigurationstabelle erstellen können (z. B config. )
  • Ein SQL-Warehouse zum Ausführen der SQL-Aufgaben

Schritt 1: Erstellen der Konfigurationstabelle

Die Konfigurationstabelle ist Ihre Steuerebene. Sie enthält die Liste der Werte, die Ihre Arbeitsprozesse verarbeiten. Wenn Sie Arbeit hinzufügen oder entfernen müssen, aktualisieren Sie diese Tabelle – nicht den Auftrag.

Führen Sie die folgende SQL-Datei aus, um eine markets Tabelle in Ihrem config Schema zu erstellen:

CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
  ('NL', 'EUR'),
  ('UK', 'GBP'),
  ('US', 'USD')
AS t(market, currency);

Sie können ein Databricks-Notizbuch, den SQL-Editor oder eine beliebige SQL-Aufgabe verwenden, um diese Anweisung auszuführen. Nach diesem Schritt enthält config.markets drei Zeilen, eine für jeden Markt, jeweils mit dessen Währungscode.

Schritt 2: Schreiben des Verarbeitungscodes

Die verschachtelte Aufgabe innerhalb der For each Aufgabe wird einmal pro Zeile ausgeführt. Wählen Sie je nach Geschäftslogik eine Notizbuchaufgabe oder eine SQL-Aufgabe aus.

Notebook-Aufgabe

Erstellen Sie ein neues Notizbuch unter einem Pfad wie /Workspace/Users/<username>/process_market. Dieses Notizbuch wird einmal pro Iteration der For each Aufgabe ausgeführt und erhält jedes Mal einen anderen Marktwert.

Fügen Sie dem Notizbuch den folgenden Code hinzu:

# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")

# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")

print(f"Processing market: {market} ({currency})")

# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
    f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)

Die dbutils.widgets.text() Aufrufe legen Standardwerte fest, damit Sie das Notizbuch direkt in Ihrem Arbeitsbereich ausführen können, ohne es mit einem Auftrag zu verbinden. Wenn das Notizbuch als geschachtelte Aufgabe innerhalb einer For each Aufgabe ausgeführt wird, überschreibt der Auftrag die Standardwerte mit den tatsächlichen Parameterwerten für diese Iteration.

Rufen Sie dbutils.widgets.text() vor dbutils.widgets.get() an. Wenn get vor text aufgerufen wird, löst das Notizbuch einen Fehler der Art InputWidgetNotDefined aus, wenn Sie es außerhalb eines Auftrags ausführen.

Mithilfe von Standardeinstellungen können Sie das Notizbuch außerhalb eines Auftrags testen, aber beachten Sie den Kompromiss: Wenn die For each Aufgabe falsch konfiguriert ist und keine Parameter übergibt, verwendet das Notizbuch die Standardwerte und bleibt erfolgreich im Hintergrund, anstatt fehlzuschlagen – was es erschweren kann, die Fehlkonfiguration zu erkennen.

SQL-Aufgabe

SQL-Aufgaben unterstützen benannte Parameter mithilfe der :param_name Syntax. Verweisen Sie :market und :currency in Ihrer Abfrage überall dort, wo Sie die Iterationswerte verwenden möchten:

SELECT *
FROM sales.transactions
WHERE market = :market
  AND currency_code = :currency

Sie konfigurieren diese Abfrage direkt im Aufgaben-Editor in Schritt 5. Die For each Aufgabe übergibt die Werte der aktuellen Iteration an die zur Laufzeit benannten :market und :currency Parameter. Im Gegensatz zu Notizbuchaufgaben unterstützt SQL benannte Parameter keine Standardwerte – wenn ein Parameter nicht übergeben wird, schlägt die Abfrage mit einem Fehler bei der Parameterauflösung fehl. Verwenden Sie stattdessen eine Notizbuch-Task, um Parameter zu validieren oder Standardwerte festzulegen, bevor die Abfrage ausgeführt wird.

Schritt 3: Erstellen des Auftrags

Klicken Sie in Ihrem Databricks-Arbeitsbereich auf "Workflows " in der Randleiste und dann auf " Auftrag erstellen". Geben Sie dem Auftrag einen beschreibenden Namen, z. B Market Analysis. .

Schritt 4: Konfigurieren der SQL-Nachschlageaufgabe

Die SQL-Aufgabe führt Ihre Konfigurationsabfrage aus und stellt die Ausgabe für nachgeschaltete Aufgaben zur Verfügung.

  1. Klicken Sie im Auftrags-Editor auf "Aufgabe hinzufügen".

  2. Legen Sie den Aufgabennamen auf read_markets.

  3. Setzen Sie Type auf SQL.

  4. Geben Sie im SQL-Feld die folgende Abfrage ein:

    SELECT market, currency FROM config.markets
    
  5. Stellen Sie das SQL-Lager in Ihrem Arbeitsbereich ein.

  6. Klicken Sie auf Aufgabe erstellen.

Wenn diese Aufgabe ausgeführt wird, führt Databricks die Abfrage aus und erfasst das Ergebnis als JSON-Array in tasks.read_markets.output.rows. Die SQL-Aufgabenausgabe wird immer als JSON-Array zurückgegeben – es ist keine zusätzliche Konfiguration erforderlich. Die generische Form dieses Verweises ist tasks.<task-name>.output.rows, wo <task-name> mit dem Aufgabenschlüssel übereinstimmt, den Sie im Auftrags-Editor festgelegt haben. Die Ausgabe sieht wie folgt aus:

[
  { "market": "NL", "currency": "EUR" },
  { "market": "UK", "currency": "GBP" },
  { "market": "US", "currency": "USD" }
]

Schritt 5: Konfigurieren für jede Aufgabe

Die For each Aufgabe liest die SQL-Ausgabe und startet eine geschachtelte Aufgabe pro Zeile.

  1. Klicken Sie auf "Aufgabe hinzufügen", und legen Sie "Abhängig von" fest.read_markets

  2. Legen Sie den Aufgabennamen auf process_markets.

  3. Legen Sie Typ auf Jeweils fest.

  4. Geben Sie im Feld "Eingaben " Folgendes ein:

    {{tasks.read_markets.output.rows}}
    

    Dadurch wird auf das Zeilenarray verwiesen, das von der SQL-Aufgabe erfasst wird.

  5. Legen Sie "Nebenläufigkeit" fest, um 2 zwei Iterationen parallel auszuführen. Erhöhen Sie diesen Wert, um den Durchsatz zu verbessern oder wenn Ihre geschachtelte Aufgabe eine höhere Parallelität unterstützt.

  6. Klicken Sie auf "Aufgabe hinzufügen", um eine Schleife zu durchlaufen und die geschachtelte Aufgabe basierend auf dem typ zu konfigurieren, den Sie in Schritt 2 ausgewählt haben:

Notebook-Aufgabe

  1. Legen Sie den Aufgabennamen auf run_market_analysis_iteration.

  2. Legen Sie "Typ " auf " Notizbuch" fest.

  3. Legen Sie "Pfad " auf den Pfad des Notizbuchs fest, das Sie in Schritt 2 erstellt haben.

  4. Klicken Sie auf "Parameter", und klicken Sie dann auf "Hinzufügen" , um die folgenden Parameter hinzuzufügen:

    • Schlüssel: , market: {{input.market}}
    • Schlüssel: , currency: {{input.currency}}

    Jeder {{input.<key>}} Verweis wird aus dem Zeilenobjekt der aktuellen Iteration in das entsprechende Feld aufgelöst.

  5. Klicken Sie auf Aufgabe erstellen.

SQL-Aufgabe

  1. Legen Sie den Aufgabennamen auf run_market_analysis_iteration.

  2. Setzen Sie Type auf SQL.

  3. Geben Sie im SQL-Feld Ihre Abfrage mit den benannten Parametern ein, z. B.:

    SELECT *
    FROM sales.transactions
    WHERE market = :market
      AND currency_code = :currency
    
  4. Stellen Sie das SQL-Lager in Ihrem Arbeitsbereich ein.

  5. Klicken Sie auf "Parameter", und klicken Sie dann auf "Hinzufügen" , um die folgenden Parameter hinzuzufügen:

    • Schlüssel: , market: {{input.market}}
    • Schlüssel: , currency: {{input.currency}}

    Jeder {{input.<key>}} Verweis wird aus dem Zeilenobjekt der aktuellen Iteration in das entsprechende Feld aufgelöst.

  6. Klicken Sie auf Aufgabe erstellen.

Ihr Job-DAG zeigt nun read_markets in den Fluss zu process_markets, wobei die geschachtelte Aufgabe innerhalb des For each-Knotens sichtbar ist.

Schritt 6: Ausführen des Auftrags und Überprüfen

  1. Klicken Sie auf "Jetzt ausführen" , um den Auftrag auszulösen.
  2. Klicken Sie auf der Auftragsausführungsseite auf den process_markets Knoten, um die For each Aufgabe zu erweitern.
  3. Auf der Seite "Auftragsausführung" wird eine Tabelle mit Iterationen angezeigt – eine Zeile pro Marktwert – jeweils mit dem Status, der Startzeit und der Dauer.
  4. Klicken Sie auf eine beliebige Iterationszeile, um das Ergebnis der Aufgabenausführung zu öffnen und zu bestätigen, dass das Ergebnis den richtigen Marktwert erhalten hat.

Wenn eine bestimmte Iteration fehlschlägt, können Sie nur diese Iteration von der Auftragsausführungsseite erneut ausführen, ohne den gesamten Auftrag erneut auszuführen.

Erweitern des Musters

Um einen neuen Markt hinzuzufügen, fügen Sie eine Zeile in die Konfigurationstabelle ein:

INSERT INTO config.markets VALUES ('DE', 'EUR');

Der nächste Auftrag wird automatisch in Deutschland ausgeführt, ohne dass Auftragskonfigurationsänderungen oder Notizbuchbearbeitungen erforderlich sind.

Dieses Muster funktioniert für jeden Anwendungsfall, in dem Daten die Iteration fördern sollen:

  • Kundenspezifische Verarbeitung: Eine Zeile pro Kunden-ID; Das Notizbuch wendet kundenspezifische Transformationen an oder liefert sie an kundenspezifische Ziele.
  • Tabellenaufnahme: Eine Zeile pro Quelltabellenname; das Notizbuch liest und nimmt jede Tabelle ein.
  • Backfill-Verarbeitung: Eine Zeile pro Datumspartition; das Notizbuch verarbeitet historische Daten für diese Partition erneut.
  • Feature-Flag-gesteuerte Ausführung: Eine Zeile pro aktiviertes Feature oder Experiment; das Notebook aktiviert die entsprechende Logik.

Wenn Sie ein Element aus der Verarbeitung entfernen möchten, löschen Sie die Zeile, oder fügen Sie in der SQL-Abfrage eine active Kennzeichnungsspalte und einen Filter hinzu:

SELECT market, currency FROM config.markets WHERE active = TRUE

Nächste Schritte