Condividi tramite


Avvio rapido: Creare una data factory e una pipeline con Python

APPLICABILE A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Data Factory in Microsoft Fabric è la nuova generazione di Azure Data Factory, con un'architettura più semplice, un'intelligenza artificiale predefinita e nuove funzionalità. Se non si ha familiarità con l'integrazione dei dati, iniziare con Fabric Data Factory. I carichi di lavoro di Azure Data Factory esistenti possono eseguire l'aggiornamento a Fabric per accedere a nuove funzionalità tra data science, analisi in tempo reale e creazione di report.

In questo argomento di avvio rapido viene creata una data factory con Python. La pipeline in questa data factory copia i dati da una cartella a un'altra in archiviazione BLOB di Azure.

Azure Data Factory è un servizio di integrazione dei dati basato sul cloud che consente di creare flussi di lavoro basati sui dati per orchestrare e automatizzare lo spostamento e la trasformazione dei dati. Usando Azure Data Factory, è possibile creare e pianificare flussi di lavoro basati sui dati, denominati pipeline.

Le pipeline possono acquisire dati da archivi di dati disparati. Le pipeline elaborano o trasformano i dati usando servizi di calcolo come Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics e Azure Machine Learning. Le pipeline pubblicano i dati di output in archivi dati, ad esempio Azure Synapse Analytics per applicazioni di Business Intelligence (BI).

Prerequisiti

  • Un account Azure con una sottoscrizione attiva. Crearne una gratuitamente.

  • Python 3.6+.

  • An Archiviazione di Azure account.

  • Azure Storage Explorer (facoltativo).

  • Un'applicazione in Microsoft Entra ID. Creare l'applicazione seguendo i passaggi descritti in questo collegamento, usando l'opzione di autenticazione 2 (segreto dell'applicazione) e assegnando l'applicazione al ruolo Collaboratore seguendo le istruzioni riportate nello stesso articolo. Prendere nota dei valori seguenti, come illustrato nell'articolo da usare nei passaggi successivi: ID applicazione (client), valore del segreto client e ID tenant.

Creare e caricare un file di input

  1. Avviare il Blocco note. Copiare il testo seguente e salvarlo come file input.txt sul disco.

    John|Doe
    Jane|Doe
    
  2. Usare strumenti come Azure Storage Explorer per creare la cartella adfv2tutorial e input nel contenitore. Caricare quindi il file input.txt nella cartella input.

Installare il pacchetto Python

  1. Aprire un terminale o un prompt dei comandi con privilegi di amministratore. 

  2. Installare prima di tutto il pacchetto di Python per le risorse di gestione Azure:

    pip install azure-mgmt-resource
    
  3. Per installare il pacchetto Python per Data Factory, eseguire il comando seguente:

    pip install azure-mgmt-datafactory
    

    L'SDK Python per Data Factory supporta Python 2.7 e 3.6 e versioni successive.

  4. Per installare il pacchetto di Python per l'autenticazione Azure Identity, eseguire il comando seguente:

    pip install azure-identity
    

    Nota

    Il pacchetto "azure-identity" potrebbe avere conflitti con "azure-cli" per alcune dipendenze comuni. Se si verifica un problema di autenticazione, rimuovere "azure-cli" e le relative dipendenze oppure usare un computer pulito senza installare il pacchetto "azure-cli" per renderlo funzionante. Per i cloud sovrani, è necessario usare le appropriate costanti specifiche del cloud. Fare riferimento a Connettersi a tutte le regioni utilizzando le librerie di Azure per Python multi-cloud | Microsoft Docs per istruzioni su come connettersi a questi cloud sovrani con Python.

Creare un client di Data Factory

  1. Creare un file denominato datafactory.py. Aggiungere le seguenti dichiarazioni per aggiungere riferimenti ai namespace.

    from azure.identity import ClientSecretCredential 
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. Aggiungere le funzioni seguenti, che consentono di stampare informazioni.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. Aggiungere il codice seguente al metodo Main per creare un'istanza della classe DataFactoryManagementClient. Usare questo oggetto per creare la data factory, il servizio collegato, i set di dati e la pipeline. È possibile usare questo oggetto anche per monitorare i dettagli sull'esecuzione della pipeline. Impostare la variabile subscription_id come ID della sottoscrizione di Azure. Per un elenco di aree Azure in cui Data Factory è attualmente disponibile, selezionare le aree a cui si è interessati nella pagina seguente e quindi espandere Analytics per individuare Data Factory: Products disponibile in base all'area. Gli archivi dati (Archiviazione di Azure, database SQL di Azure e così via) e i calcoli (HDInsight e così via) usati dalla data factory possono trovarsi in altre aree.

    def main():
    
        # Azure subscription ID
        subscription_id = '<subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = '<resource group>'
    
        # The data factory name. It must be globally unique.
        df_name = '<factory name>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') 
    
        # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect.
        # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD
        # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id)
    
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'westus'}
        df_params = {'location':'westus'}
    

Creare una data factory

Aggiungere il codice seguente al metodo Main per creare una data factory. Se il gruppo di risorse esiste già, impostare come commento la prima istruzione create_or_update.

    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

Creare un servizio collegato

Aggiungere il codice seguente al metodo Main che crea un servizio collegato Archiviazione di Azure.

Si creano servizi collegati in una data factory per collegare gli archivi dati e i servizi di calcolo alla data factory. In questa guida introduttiva, è necessario creare un solo servizio collegato di Archiviazione di Azure sia come origine che come destinazione di copia, denominato "AzureStorageLinkedService" nell'esempio. Sostituire <storageaccountname> e <storageaccountkey> con il nome e la chiave dell'account Archiviazione di Azure.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

Creare set di dati

In questa sezione vengono creati due set di dati: uno per l'origine e l'altro per il sink.

Creare un set di dati per l'origine Azure BLOB

Aggiungere il codice seguente al metodo Main che crea un set di dati BLOB Azure. Per informazioni sulle proprietà del set di dati BLOB Azure, vedere l'articolo Azure connettore BLOB.

Si definisce un set di dati che rappresenta i dati di origine in Azure BLOB. Questo set di dati BLOB fa riferimento al servizio collegato Archiviazione di Azure creato nel passaggio precedente.

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) 
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

Creare un set di dati per il sink Azure BLOB

Aggiungere il codice seguente al metodo Main che crea un set di dati BLOB Azure. Per informazioni sulle proprietà del set di dati BLOB Azure, vedere l'articolo Azure connettore BLOB.

Si definisce un set di dati che rappresenta i dati di origine in Azure BLOB. Questo set di dati BLOB fa riferimento al servizio collegato Archiviazione di Azure creato nel passaggio precedente.

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

Creare una pipeline

Aggiungere il codice seguente al metodo Main per creare una pipeline con un'attività di copia.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    
    #Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
    #Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
    
    p_name = 'copyPipeline'
    params_for_pipeline = {}

    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

Creare un'esecuzione della pipeline

Aggiungere il codice seguente al metodo Main per attivare un'esecuzione della pipeline.

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

Monitorare un'esecuzione della pipeline

Per monitorare l'esecuzione della pipeline, aggiungere il codice seguente al metodo Main:

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])

Aggiungere quindi l'istruzione seguente per richiamare il metodo main quando viene eseguito il programma:

# Start the main method
main()

Script completo

Di seguito è riportato il codice completo Python:

from azure.identity import ClientSecretCredential 
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<resource group>'

    # The data factory name. It must be globally unique.
    df_name = '<factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location':'westus'}
    df_params = {'location':'westus'}
 
    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

Eseguire il codice

Compilare e avviare l'applicazione, quindi verificare l'esecuzione della pipeline.

La console stampa lo stato di creazione della data factory, del servizio collegato, dei set di dati, della pipeline e dell'esecuzione della pipeline. Attendere fino a quando non vengono visualizzati i dettagli sull'esecuzione dell'attività di copia con le dimensioni dei dati letti/scritti. Usare quindi strumenti come Archiviazione di Azure Explorer per verificare che i BLOB vengano copiati in "outputBlobPath" da "inputBlobPath" come specificato nelle variabili.

Di seguito è riportato l'output di esempio:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

Pulire le risorse

Per eliminare la data factory, aggiungere il codice seguente al programma:

adf_client.factories.delete(rg_name, df_name)

La pipeline in questo esempio copia i dati da una posizione a un'altra in un archivio BLOB Azure. Per informazioni sull'uso di Data Factory in più scenari, fare riferimento alle esercitazioni.