Condividi tramite


Attività di Flusso di dati in Azure Data Factory e Azure Synapse Analytics

APPLICABILE A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Data Factory in Microsoft Fabric è la nuova generazione di Azure Data Factory, con un'architettura più semplice, un'intelligenza artificiale predefinita e nuove funzionalità. Se non si ha familiarità con l'integrazione dei dati, iniziare con Fabric Data Factory. I carichi di lavoro di Azure Data Factory esistenti possono eseguire l'aggiornamento a Fabric per accedere a nuove funzionalità tra data science, analisi in tempo reale e creazione di report.

Usare l'attività Flusso di dati per trasformare e spostare i dati tramite flussi di dati di mapping. Se non si ha familiarità con i flussi di dati, vedere panoramica di Mapping Flusso di dati

Creare un'attività Flusso di dati con l'interfaccia utente

Per usare un'attività Flusso di dati in una pipeline, completare la procedura seguente:

  1. Cercare Flusso di dati nel riquadro Attività della pipeline, quindi trascinare un'attività Flusso di dati nel canvas della pipeline.

  2. Selezionare la nuova attività Flusso di dati nell'area di disegno se non è già selezionata e la relativa scheda Settings per modificarne i dettagli.

     Mostra l'interfaccia utente per un'attività Flusso di dati.

  3. La chiave di checkpoint viene usata per impostare il checkpoint quando si usa il flusso di dati per Change Data Capture. È possibile sovrascriverlo. Le attività Flusso di dati usano un valore GUID come chiave di checkpoint anziché "nome pipeline e nome attività" per tenere sempre traccia dello stato della funzione Change Data Capture del cliente, anche se sono presenti azioni di ridenominazione. Tutte le attività del flusso di dati esistenti usano la chiave del modello precedente per garantire la compatibilità con le versioni precedenti. L'opzione per la chiave di checkpoint dopo la pubblicazione di una nuova attività Flusso di dati con la risorsa del flusso di dati abilitata per Change Data Capture è illustrata di seguito.

    Mostra l'interfaccia utente per un'attività di flusso di dati con chiave di checkpoint.

  4. Selezionare un flusso di dati esistente o crearne uno nuovo usando il pulsante Nuovo. Selezionare altre opzioni necessarie per completare la configurazione.

Sintassi

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

Proprietà del tipo

Proprietà Descrizione Valori consentiti Richiesto
flusso di dati Riferimento al Flusso di dati in esecuzione DataFlowReference
integrationRuntime L'ambiente di calcolo in cui viene eseguito il flusso di dati. Se non specificato, viene usato il runtime di integrazione Azure autoresolve. IntegrationRuntimeReference No
compute.coreCount Numero di core usati nel cluster Spark. Può essere specificato solo se viene usato il runtime di integrazione Azure autoresolve 8, 16, 32, 48, 80, 144, 272 No
compute.computeType Tipo di calcolo usato nel cluster Spark. Può essere specificato solo se viene usato il runtime di integrazione Azure autoresolve "General" No
staging.linkedService Se si usa un'origine o un sink di Azure Synapse Analytics, specificare l'account di archiviazione usato per la gestione temporanea di PolyBase.

Se il tuo Archiviazione di Azure è configurato con un endpoint del servizio VNet, devi utilizzare l'autenticazione dell'identità gestita con l'opzione "consenti il servizio Microsoft attendibile" abilitata nell'account di archiviazione. Fare riferimento a Impatto dell'uso degli endpoint del servizio VNet con Archiviazione di Azure. Informazioni anche sulle configurazioni necessarie per Azure BLOB e Azure Data Lake Storage Gen2 rispettivamente.
LinkedServiceReference Solo se il flusso di dati legge o scrive in Azure Synapse Analytics
staging.folderPath Se si usa un'origine o un sink di Azure Synapse Analytics, il percorso della cartella è nell'account di archiviazione BLOB usato per l'archiviazione temporanea di PolyBase String Solo se il flusso di dati legge o scrive in Azure Synapse Analytics
traceLevel Impostare il livello di registrazione dell'esecuzione dell'attività del flusso di dati Fine, grossolana, nessuna No

Execute Flusso di dati

Ridimensionare dinamicamente il calcolo del flusso di dati in fase di esecuzione

Le proprietà Core Count e Compute Type possono essere impostate in modo dinamico per adattarsi alle dimensioni dei dati di origine in ingresso in fase di esecuzione. Usare attività della pipeline come Ricerca o Recupera metadati per trovare le dimensioni dei dati del set di dati di origine. Usare quindi "Aggiungi contenuto dinamico" nelle proprietà dell'attività di flusso di dati. È possibile scegliere dimensioni di calcolo piccole, medie o grandi. Facoltativamente, selezionare "Personalizzato" e configurare manualmente i tipi di calcolo e il numero di core.

Dynamic Flusso di dati

Ecco una breve esercitazione video che illustra questa tecnica

Runtime di integrazione del flusso di dati

Scegliere quale Integration Runtime usare per l'esecuzione dell'attività Flusso di dati. Per impostazione predefinita, il servizio usa il runtime di integrazione automatica Azure con quattro core di lavoro. Questo IR ha un tipo di calcolo generico e funziona nella stessa regione della tua istanza del servizio. Per le pipeline operative, è consigliabile creare runtime di integrazione personalizzati Azure che definiscono aree, tipo di calcolo, numero di core e TTL per l'esecuzione dell'attività del flusso di dati.

Un tipo di elaborazione minimo di Uso Generale con una configurazione di 8+8 (totale di 16 v-core) e un Time to Live (TTL) di 10 minuti è la raccomandazione minima per la maggior parte dei carichi di lavoro di produzione. Impostando una durata breve, Azure IR può gestire un cluster attivo senza che siano necessari i diversi minuti di avvio di un cluster inattivo. Per altre informazioni, vedere Azure integration runtime.

Azure Integration Runtime

Importante

La selezione di Integration Runtime nell'attività di Flusso di dati si applica solo alle esecuzioni attivate della pipeline. Il debug della pipeline con i flussi di dati viene eseguito nel cluster specificato nella sessione di debug.

PolyBase

Se si usa Azure Synapse Analytics come sink o come origine, è necessario scegliere un percorso di gestione temporanea per il caricamento in patch PolyBase. PolyBase consente il caricamento a blocchi invece di caricare i dati riga per riga. PolyBase riduce drasticamente il tempo di caricamento in Azure Synapse Analytics.

Chiave di checkpoint

Quando si usa l'opzione Change Capture per le origini del flusso di dati, Azure Data Factory mantiene e gestisce automaticamente il checkpoint. La chiave del checkpoint predefinita è un hash del nome del flusso di dati e del nome della pipeline. Se si usa un modello dinamico per le tabelle o le cartelle di origine, è possibile eseguire l'override di questo hash e impostare qui il proprio valore della chiave di checkpoint.

Livello di registrazione

Se non è necessario che ogni esecuzione delle attività del flusso di dati nella pipeline registri dettagliatamente tutti i log di telemetria, è possibile impostare opzionalmente il livello di registrazione su "Basic" o "Nessuno". Quando si eseguono i flussi di dati in modalità dettagliata (impostazione predefinita), si richiede al servizio di registrare completamente l'attività a ogni singolo livello di partizione durante la trasformazione dei dati. Può trattarsi di un'operazione costosa, quindi l'abilitazione della modalità dettagliata solo quando si risolvono i problemi migliora il flusso di dati complessivo e le prestazioni della pipeline. La modalità "Basic" registra solo le durate della trasformazione mentre "Nessuno" fornisce solo un riepilogo delle durate.

Livello di registrazione

Proprietà del ricettore

La funzionalità di raggruppamento nei flussi di dati consente di impostare l'ordine di esecuzione dei sink e di raggruppare i sink usando lo stesso numero di gruppo. Per gestire i gruppi, è possibile richiedere al servizio di eseguire i sink nello stesso gruppo, per eseguirli in parallelo. È possibile anche impostare il gruppo di sink affinché l'esecuzione continui anche dopo che in un sink si verifica un errore.

Il comportamento predefinito dei sink del flusso di dati consiste nell'eseguire ogni sink in modo sequenziale e nel terminare il flusso di dati quando si verifica un errore nel sink. Inoltre, per impostazione predefinita, tutti i sink vengono impostati sullo stesso gruppo, a meno che non si entrino nelle proprietà del flusso di dati e si impostino priorità diverse per i sink.

Proprietà sink

Solo prima riga

Questa opzione è disponibile solo per i flussi di dati per cui i sink della cache sono abilitati per "Output all'attività". L'output del flusso di dati inserito direttamente nella pipeline è limitato a 2 MB. L'impostazione "Solo prima riga" consente di limitare l'output del flusso di dati quando si inserisce l'output dell'attività del flusso di dati direttamente nella pipeline.

Parametrizzazione di Flusso di dati

Set di dati con parametri

Se il flusso di dati usa set di dati con parametri, impostare i valori dei parametri nella scheda Impostazioni .

Esegui Parametri del Flusso di Dati

Flussi di dati con parametri

Se il flusso di dati è con parametri, impostare i valori dinamici dei parametri del flusso di dati nella scheda Parametri . È possibile usare il linguaggio delle espressioni della pipeline o il linguaggio delle espressioni del flusso di dati per assegnare valori di parametro dinamici o letterali. Per altre informazioni, vedere parametri Flusso di dati.

Proprietà di calcolo con parametri.

È possibile parametrizzare il numero di core o il tipo di calcolo se si usa il runtime di integrazione automatica Azure e si specificano i valori per compute.coreCount e compute.computeType.

Esempio di parametro del flusso di dati

Debug per pipeline dell'attività Flusso di dati

Per eseguire una pipeline di debug con un'attività di Flusso di dati, è necessario attivare la modalità di debug tramite il dispositivo di scorrimento Flusso di dati Debug sulla barra superiore. La modalità di debug consente di eseguire il flusso di dati su un cluster Spark attivo. Per altre informazioni, vedere Modalità di debug.

Screenshot che mostra dove è il pulsante Debug

La pipeline di debug viene eseguita nel cluster di debug attivo, non nell'ambiente di runtime di integrazione specificato nelle impostazioni dell'attività Flusso di dati. È possibile scegliere l'ambiente di calcolo di debug quando si avvia la modalità di debug.

Monitoraggio dell'attività Flusso di dati

L'attività Flusso di dati offre un'esperienza di monitoraggio speciale in cui è possibile visualizzare le informazioni di partizionamento, fase e derivazione dei dati. Aprire il riquadro di monitoraggio tramite l'icona degli occhiali in Azioni. Per altre informazioni, vedere Monitoraggio delle Flusso di dati.

Usare i risultati dell'attività Flusso di dati in un'attività successiva

L'attività Flusso di dati restituisce metriche relative al numero di righe scritte in ogni sink e di righe lette da ogni origine. Questi risultati vengono restituiti nella output sezione del risultato dell'esecuzione dell'attività. Le metriche restituite sono nel formato del codice JSON seguente.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

Ad esempio, per ottenere il numero di righe scritte in un sink denominato "sink1" in un'attività denominata "dataflowActivity", usare @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten.

Per ottenere il numero di righe lette da un'origine denominata "source1" usata in tale sink, usare @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead.

Nota

Se il sink non contiene alcuna riga scritta, non verrà visualizzato nelle metriche. L'esistenza può essere verificata usando la contains funzione . Ad esempio, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') controlla se eventuali righe sono state scritte in sink1.

Consulta le attività del flusso di controllo supportate: