Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Usare una tabella di controllo per guidare un
Potrebbe essere necessario acquisire dati da molte origini. Quando l'elenco cambia, impostarlo come hardcoding nella configurazione del processo significa modificare il codice e ridistribuirlo. Usare i metadati per risolvere questo problema archiviando l'elenco di origini in una tabella che viene letta e usata in fase di esecuzione. Aggiungere una sorgente come nuova riga e l'esecuzione successiva del processo la rileva automaticamente senza modifiche al processo stesso.
Questo tutorial illustra come creare un'attività usando questo approccio. Un'attività SQL legge la tabella di controllo e un'attività For each esegue l'iterazione su ogni riga in parallelo.
Come funziona
Il modello usa tre tipi di attività collegati insieme in sequenza:
| Attività | Tipo | Funzionamento |
|---|---|---|
read_markets |
SQL | Esegue una query su una tabella di configurazione e acquisisce il risultato come matrice di righe |
process_markets |
Per ogni | Esegue l'iterazione su {{tasks.read_markets.output.rows}}, eseguendo l'attività nidificata una volta per riga |
run_market_analysis_iteration |
Notebook o SQL (annidato all'interno di For each) | Viene eseguito una volta per riga, usando i valori di riga passati come parametri per eseguire la logica di business |
L'output dell'attività SQL, cioè un array JSON di oggetti riga, scorre direttamente nel For each campo Inputs dell'attività usando il riferimento al valore dinamico {{tasks.read_markets.output.rows}}. L'attività For each passa quindi ogni riga all'attività nidificata come parametri, disponibili come {{input.market}} e {{input.currency}}.
Prerequisiti
- Un'area di lavoro di Databricks con l'autorizzazione per creare processi e notebook
- Autorizzazione per la creazione di tabelle nel catalogo unity
- Schema del catalogo Unity in cui è possibile creare la tabella config (ad esempio,
config) - Un SQL Warehouse per eseguire le attività SQL
Passaggio 1: Creare la tabella di configurazione
La tabella config è il piano di controllo. Contiene l'elenco dei valori che il tuo lavoro elabora. Quando è necessario aggiungere o rimuovere attività, aggiornare questa tabella, non il compito.
Eseguire il codice SQL seguente per creare una markets tabella nello config schema:
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);
È possibile usare un notebook di Databricks, l'editor SQL o qualsiasi attività SQL per eseguire questa istruzione. Dopo questo passaggio, config.markets contiene tre righe, una per mercato, ognuna con il relativo codice valuta.
Passaggio 2: Scrivere il codice di elaborazione
L'attività nidificata all'interno dell'attività For each viene eseguita una volta per riga. Scegliere un'attività notebook o un'attività SQL a seconda della logica aziendale.
Attività del notebook
Creare un nuovo notebook in un percorso ad esempio /Workspace/Users/<username>/process_market. Questo notebook viene eseguito ad ogni iterazione del compito For each, ricevendo ogni volta un diverso valore di mercato.
Aggiungere il codice seguente al notebook:
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")
# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")
print(f"Processing market: {market} ({currency})")
# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)
Le dbutils.widgets.text() chiamate impostano i valori predefiniti in modo da poter eseguire il notebook direttamente nell'area di lavoro senza connetterlo a un processo. Quando il notebook viene eseguito come attività nidificata all'interno di un'attività For each , il processo esegue l'override delle impostazioni predefinite con i valori effettivi dei parametri per tale iterazione.
Chiamare dbutils.widgets.text() prima di dbutils.widgets.get(). Se get viene chiamato prima di text, il notebook genera un InputWidgetNotDefined errore quando viene eseguito al di fuori di un lavoro.
L'uso delle impostazioni predefinite consente di testare il notebook all'esterno di un processo, ma si tenga presente il compromesso: se l'attività For each non è configurata correttamente e non passa parametri, il notebook usa le impostazioni predefinite e ha esito positivo in modo silenzioso anziché fallire, rendendo così più difficile rilevare l'errore di configurazione.
Attività SQL
Le attività SQL supportano i parametri denominati usando la sintassi :param_name. Fare riferimento :market e :currency nella query ovunque si desideri usare i valori di iterazione:
SELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency
Questa query viene configurata direttamente nell'editor di attività nel passaggio 5. L'attività For each passa i valori dell'iterazione corrente ai parametri nominati :market e :currency in fase di esecuzione. A differenza delle attività del notebook, i parametri denominati SQL non supportano i valori predefiniti, se non viene passato un parametro, la query non riesce con un errore di risoluzione dei parametri. Per convalidare o impostare i parametri predefiniti prima dell'esecuzione della query, usare invece un'attività notebook.
Passaggio 3: Crea l'attività
Nell'area di lavoro di Databricks fare clic su Flussi di lavoro nella barra laterale e quindi su Crea processo. Assegnare al processo un nome descrittivo, ad esempio Market Analysis.
Passaggio 4: Configurare l'attività di ricerca SQL
L'attività SQL esegue la query di configurazione e ne rende disponibile l'output per le attività downstream.
Nell'editor di processi fare clic su Aggiungi attività.
Impostare Nome attività su
read_markets.Impostare Tipo su SQL.
Nel campo SQL immettere la query seguente:
SELECT market, currency FROM config.marketsImpostare SQL Warehouse su un warehouse nell'area di lavoro.
Fare clic su Crea attività.
Quando questa attività viene eseguita, Databricks esegue la query e acquisisce il risultato come matrice JSON in tasks.read_markets.output.rows. L'output dell'attività SQL viene sempre restituito come matrice JSON. Non è necessaria alcuna configurazione aggiuntiva. Il formato generico di questo riferimento è tasks.<task-name>.output.rows, dove <task-name> corrisponde alla chiave dell'attività impostata nell'editor di processi. L'output è simile al seguente:
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]
Passaggio 5: Configurare l'attività For Each
L'attività For each legge l'output SQL e avvia un'esecuzione di attività nidificata per ogni riga.
Fare clic su Aggiungi attività e impostare Depends on su
read_markets.Impostare Nome attività su
process_markets.Impostare Tipo su Per ogni.
Nel campo Input immettere:
{{tasks.read_markets.output.rows}}Questo fa riferimento alla matrice di righe acquisita dall'attività SQL.
Impostare Concorrenza su
2per consentire l'esecuzione in parallelo di due iterazioni. Aumentare questo valore per migliorare la produttività o se l'attività nidificata supporta un parallelismo superiore.Fare clic su Aggiungi un'attività da ciclare e configura l'attività annidata in base al tipo scelto nel passaggio 2:
Attività del notebook
Impostare Nome attività su
run_market_analysis_iteration.Imposta Tipo su Notebook.
Impostare Percorso sul percorso del notebook creato nel passaggio 2.
Fare clic su Parametri, quindi su Aggiungi per aggiungere ognuno dei parametri seguenti:
-
Chiave:
market, Valore:{{input.market}} -
Chiave:
currency, Valore:{{input.currency}}
Ogni
{{input.<key>}}riferimento viene risolto nel campo corrispondente dell'oggetto riga dell'iterazione corrente.-
Chiave:
Fare clic su Crea attività.
Attività SQL
Impostare Nome attività su
run_market_analysis_iteration.Impostare Tipo su SQL.
Nel campo SQL immettere la query con i parametri denominati, ad esempio:
SELECT * FROM sales.transactions WHERE market = :market AND currency_code = :currencyImpostare SQL Warehouse su un warehouse nell'area di lavoro.
Fare clic su Parametri, quindi su Aggiungi per aggiungere ognuno dei parametri seguenti:
-
Chiave:
market, Valore:{{input.market}} -
Chiave:
currency, Valore:{{input.currency}}
Ogni
{{input.<key>}}riferimento viene risolto nel campo corrispondente dell'oggetto riga dell'iterazione corrente.-
Chiave:
Fare clic su Crea attività.
Il DAG del tuo lavoro ora mostra il flusso di read_markets in process_markets, con l'attività nidificata visibile nel nodo For each.
Passaggio 6: Eseguire il processo e verificare
- Fare clic su Esegui ora per attivare il processo.
- Nella pagina di esecuzione dell'attività, fare clic sul
process_marketsnodo per espandere l'attivitàFor each. - La pagina di esecuzione del processo mostra una tabella di iterazioni, una riga per valore di mercato, ognuna con lo stato, l'ora di inizio e la durata.
- Fare clic su una riga di iterazione per aprire l'output dell'esecuzione dell'attività e verificare che abbia ricevuto il valore di mercato corretto.
Se un'iterazione specifica ha esito negativo, è possibile rieseguire solo l'iterazione dalla pagina di esecuzione del processo senza rieseguire l'intero processo.
Estendere il modello
Per aggiungere un nuovo mercato, inserire una riga nella tabella config:
INSERT INTO config.markets VALUES ('DE', 'EUR');
L'esecuzione del processo successivo include automaticamente Germania, senza modifiche alla configurazione del processo o modifiche al notebook necessarie.
Questo stesso modello funziona per qualsiasi caso d'uso in cui si vuole che i dati guidino l'iterazione.
- Elaborazione per cliente: una riga per ID cliente; il notebook applica trasformazioni specifiche del cliente o invía a destinazioni specifiche del cliente.
- Inserimento tabelle: una riga per ogni nome di tabella di origine; il notebook legge e inserisce ogni tabella.
- Elaborazione backfill: una riga per partizione di data; il notebook elabora nuovamente i dati cronologici per tale partizione.
- Esecuzione basata su flag di funzionalità: una riga per funzionalità o esperimento abilitato; il notebook attiva la logica corrispondente.
Per rimuovere un elemento dall'elaborazione, eliminare la riga o aggiungere una colonna flag active e filtrare con la query SQL:
SELECT market, currency FROM config.markets WHERE active = TRUE
Passaggi successivi
-
Usare un'attività
For eachper eseguire un'altra attività in un ciclo : riferimento completo per la configurazione delleFor eachattività, inclusi i tipi di parametri e le opzioni di concorrenza -
Usare una tabella di ricerca per matrici di parametri di grandi dimensioni in un'attività
For each: come gestire matrici di parametri di grandi dimensioni che superano il limite di 48 KB del valore dell'attività - Accesso valori dei parametri da un'attività - Tutti i metodi per accedere ai valori dei parametri nei notebook, negli script Python e nelle attività SQL