Freigeben über


Migrieren von Daten von Amazon S3 zu Azure Data Lake Storage Gen2

Gilt für: Azure Data Factory Azure Synapse Analytics

Tipp

Data Factory in Microsoft Fabric ist die nächste Generation von Azure Data Factory mit einer einfacheren Architektur, integrierter KI und neuen Features. Wenn Sie mit der Datenintegration noch nicht vertraut sind, beginnen Sie mit Fabric Data Factory. Vorhandene ADF-Workloads können auf Fabric aktualisiert werden, um auf neue Funktionen in der Datenwissenschaft, Echtzeitanalysen und Berichterstellung zuzugreifen.

Verwenden Sie die Vorlagen, um Petabytes von Daten zu migrieren, die aus Hunderten von Millionen Dateien von Amazon S3 zu Azure Data Lake Storage Gen2 bestehen.

Hinweis

Wenn Sie kleine Datenvolumen von AWS S3 auf Azure kopieren möchten (z. B. weniger als 10 TB), ist es effizienter und einfacher, das tool Azure Data Factory Copy Data zu verwenden. Die in diesem Artikel beschriebene Vorlage ist umfangreicher als erforderlich.

Informationen zu den Lösungsvorlagen

Die Datenpartition ist besonders zu empfehlen, wenn mehr als 10 TB an Daten migriert werden. Nutzen Sie zum Partitionieren der Daten die Einstellung „Präfix“, um die Ordner und Dateien in Amazon S3 nach Name zu filtern. So kann dann mit jedem ADF-Kopierauftrag jeweils eine Partition kopiert werden. Sie können mehrere ADF-Kopieraufträge gleichzeitig ausführen, um einen besseren Durchsatz zu erzielen.

Bei der Datenmigration ist normalerweise eine einmalige historische Datenmigration erforderlich, und die Änderungen von AWS S3 zu Azure werden regelmäßig synchronisiert. Es gibt zwei Vorlagen unten, in denen eine Vorlage die einmalige Migration von historischen Daten und eine andere Vorlage die Synchronisierung der Änderungen von AWS S3 zu Azure behandelt.

Für die Vorlage zum Migrieren von historischen Daten von Amazon S3 zu Azure Data Lake Storage Gen2

Diese Vorlage (Template-Name: Migrieren von historischen Daten von AWS S3 zu Azure Data Lake Storage Gen2) geht davon aus, dass Sie eine Partitionsliste in einer externen Steuerelementtabelle in Azure SQL-Datenbank geschrieben haben. Sie verwendet daher eine Lookup-Aktivität zum Abrufen der Partitionsliste aus der externen Steuertabelle, durchläuft jede Partition und kopiert dann mit jedem ADF-Kopierauftrag jeweils eine Partition. Nach Abschluss eines Kopierauftrags wird die Aktivität Gespeicherte Prozedur verwendet, um den Status des Kopiervorgangs für jede Partition in der Steuertabelle zu aktualisieren.

Die Vorlage enthält fünf Aktivitäten:

  • Lookup ruft die Partitionen ab, die nicht aus einer externen Steuerelementtabelle in Azure Data Lake Storage Gen2 kopiert wurden. Der Tabellenname lautet s3_partition_control_table und die Abfrage zum Laden von Daten aus der Tabelle "SELECT PartitionPrefix FROM s3_partition_control_table WHERE SuccessOrFailure = 0" .
  • ForEach ruft die Partitionsliste aus der Lookup-Aktivität ab und iteriert über jede Partition zur TriggerCopy-Aktivität. Sie können einen Wert für batchCount festlegen, um mehrere ADF-Kopieraufträge gleichzeitig auszuführen. Es wurde der Wert 2 in dieser Vorlage festgelegt.
  • ExecutePipeline führt die Pipeline CopyFolderPartitionFromS3 aus. Die zusätzliche Pipeline, die dafür sorgt, dass bei jedem Kopierauftrag eine Partition kopiert wird, wird hinzugefügt, um den nicht erfolgreichen Kopierauftrag mühelos erneut ausführen und so die spezifische Partition erneut aus AWS S3 laden zu können. Alle anderen Kopieraufträge, bei denen andere Partitionen geladen werden, werden nicht beeinträchtigt.
  • Copy kopiert jede Partition von AWS S3 in Azure Data Lake Storage Gen2.
  • SqlServerStoredProcedure aktualisiert den Status des Kopiervorgangs für jede Partition in der Steuertabelle.

Die Vorlage enthält zwei Parameter:

  • AWS_S3_bucketName ist der Name des Buckets in AWS S3, aus dem Daten migriert werden sollen. Wenn Sie Daten aus mehreren Buckets in AWS S3 migrieren möchten, können Sie in Ihrer externen Steuertabelle eine weitere Spalte hinzufügen, um den Bucketnamen für jede Partition zu speichern, und darüber hinaus Ihre Pipeline aktualisieren, um Daten aus dieser Spalte abzurufen.
  • Azure_Storage_fileSystem ist Ihr Dateisystemname auf Azure Data Lake Storage Gen2, in den Sie Daten migrieren möchten.

Damit die Vorlage geänderte Dateien nur von Amazon S3 in Azure Data Lake Storage Gen2 kopieren kann

Diese Vorlage (Template-Name: Kopieren von Delta-Daten aus AWS S3 in Azure Data Lake Storage Gen2) verwendet LastModifiedTime jeder Datei, um die neuen oder aktualisierten Dateien nur von AWS S3 in Azure zu kopieren. Beachten Sie, dass, wenn Ihre Dateien oder Ordner auf AWS S3 bereits mit Zeitabschnittsinformationen als Teil des Datei- oder Ordnernamens (z.B. /jjjj/mm/tt/Datei.csv) zeitlich partitioniert wurden, Sie in diesem Tutorial einen leistungsfähigeren Ansatz zum inkrementellen Laden neuer Dateien finden können. Bei dieser Vorlage wird davon ausgegangen, dass Sie eine Partitionsliste in einer Externen Steuerelementtabelle in Azure SQL-Datenbank geschrieben haben. Sie verwendet daher eine Lookup-Aktivität zum Abrufen der Partitionsliste aus der externen Steuertabelle, durchläuft jede Partition und kopiert dann mit jedem ADF-Kopierauftrag jeweils eine Partition. Wenn der Kopierauftrag mit dem Kopieren der Dateien aus AWS S3 beginnt, werden anhand der Eigenschaft „LastModifiedTime“ nur die neuen oder aktualisierten Dateien ermittelt und kopiert. Nach Abschluss eines Kopierauftrags wird die Aktivität Gespeicherte Prozedur verwendet, um den Status des Kopiervorgangs für jede Partition in der Steuertabelle zu aktualisieren.

Die Vorlage enthält sieben Aktivitäten:

  • Lookup ruft die Partitionen aus einer externen Steuertabelle ab. Der Tabellenname lautet s3_partition_delta_control_table und die Abfrage zum Laden von Daten aus der Tabelle "select distinct PartitionPrefix from s3_partition_delta_control_table".
  • ForEach ruft die Partitionsliste aus der Lookup-Aktivität ab und iteriert jede Partition in der TriggerDeltaCopy-Aktivität. Sie können einen Wert für batchCount festlegen, um mehrere ADF-Kopieraufträge gleichzeitig auszuführen. Es wurde der Wert 2 in dieser Vorlage festgelegt.
  • ExecutePipeline führt die Pipeline DeltaCopyFolderPartitionFromS3 aus. Die zusätzliche Pipeline, die dafür sorgt, dass bei jedem Kopierauftrag eine Partition kopiert wird, wird hinzugefügt, um den nicht erfolgreichen Kopierauftrag mühelos erneut ausführen und so die spezifische Partition erneut aus AWS S3 laden zu können. Alle anderen Kopieraufträge, bei denen andere Partitionen geladen werden, werden nicht beeinträchtigt.
  • Mit der Lookup-Aktivität wird der Zeitpunkt der letzten Ausführung des Kopierauftrags aus der externen Steuertabelle abgerufen, damit die neuen oder aktualisierten Dateien anhand von „LastModifiedTime“ ermittelt werden können. Der Tabellenname lautet s3_partition_delta_control_table und die Abfrage zum Laden von Daten aus der Tabelle "select max(JobRunTime) as LastModifiedTime from s3_partition_delta_control_table where PartitionPrefix = '@{pipeline().parameters.prefixStr}' and SuccessOrFailure = 1".
  • Copy kopiert neue oder geänderte Dateien nur für jede Partition von AWS S3 in Azure Data Lake Storage Gen2. Die Eigenschaft modifiedDatetimeStart wird auf den Zeitpunkt der letzten Ausführung des Kopierauftrags festgelegt. Die Eigenschaft modifiedDatetimeEnd wird auf den Zeitpunkt der aktuellen Ausführung des Kopierauftrags festgelegt. Beachten Sie, dass die Uhrzeit der UTC-Zeitzone entspricht.
  • SqlServerStoredProcedure aktualisiert bei erfolgreicher Ausführung den Status des Kopiervorgangs für jede Partition und den Ausführungszeitpunkt des Kopiervorgangs in der Steuertabelle. Die Spalte „SuccessOrFailure“ wird auf 1 festgelegt.
  • SqlServerStoredProcedure aktualisiert bei fehlerhafter Ausführung den Status des Kopiervorgangs für jede Partition und den Ausführungszeitpunkt des Kopiervorgangs in der Steuertabelle. Die Spalte „SuccessOrFailure“ wird auf 0 festgelegt.

Die Vorlage enthält zwei Parameter:

  • AWS_S3_bucketName ist der Name des Buckets in AWS S3, aus dem Daten migriert werden sollen. Wenn Sie Daten aus mehreren Buckets in AWS S3 migrieren möchten, können Sie in Ihrer externen Steuertabelle eine weitere Spalte hinzufügen, um den Bucketnamen für jede Partition zu speichern, und darüber hinaus Ihre Pipeline aktualisieren, um Daten aus dieser Spalte abzurufen.
  • Azure_Storage_fileSystem ist Ihr Dateisystemname auf Azure Data Lake Storage Gen2, in den Sie Daten migrieren möchten.

Informationen zur Verwendung dieser beiden Lösungsvorlagen

Für die Vorlage zum Migrieren von historischen Daten von Amazon S3 zu Azure Data Lake Storage Gen2

  1. Erstellen Sie eine Steuerelementtabelle in Azure SQL-Datenbank, um die Partitionsliste von AWS S3 zu speichern.

    Hinweis

    Der Tabellenname lautet „s3_partition_control_table“. Das Schema der Steuerelementtabelle ist PartitionPrefix und SuccessOrFailure, wobei PartitionPrefix die Präfixeinstellung in S3 ist, um die Ordner und Dateien in Amazon S3 nach Namen zu filtern, und SuccessOrFailure ist der Status des Kopierens jeder Partition: 0 bedeutet, dass diese Partition nicht in Azure kopiert wurde und 1 bedeutet, dass diese Partition erfolgreich in Azure kopiert wurde. In der Steuertabelle sind fünf Partitionen definiert, und der Standardstatus des Kopiervorgangs für die einzelnen Partitionen lautet 0.

    CREATE TABLE [dbo].[s3_partition_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [SuccessOrFailure] [bit] NULL
    )
    
    INSERT INTO s3_partition_control_table (PartitionPrefix, SuccessOrFailure)
    VALUES
    ('a', 0),
    ('b', 0),
    ('c', 0),
    ('d', 0),
    ('e', 0);
    
  2. Erstellen Sie eine gespeicherte Prozedur auf derselben Azure SQL-Datenbank für die Kontrolltabelle.

    Hinweis

    Der Name der gespeicherten Prozedur lautet „sp_update_partition_success“. Sie wird von der Aktivität „SqlServerStoredProcedure“ in Ihrer ADF-Pipeline aufgerufen.

    CREATE PROCEDURE [dbo].[sp_update_partition_success] @PartPrefix varchar(255)
    AS
    BEGIN
    
        UPDATE s3_partition_control_table
        SET [SuccessOrFailure] = 1 WHERE [PartitionPrefix] = @PartPrefix
    END
    GO
    
  3. Gehen Sie zur Vorlage Historische Daten von AWS S3 zu Azure Data Lake Storage Gen2 migrieren. Geben Sie die Verbindungen zu Ihrer Externen Kontrolltabelle, AWS S3 als Datenquellenspeicher und Azure Data Lake Storage Gen2 als Zielspeicher ein. Achten Sie darauf, dass die externe Steuertabelle und die gespeicherte Prozedur auf dieselbe Verbindung verweisen.

    Screenshot, der die Vorlage zum Migrieren historischer Daten von AWS S3 zu Azure Data Lake Storage Gen2 zeigt.

  4. Klicken Sie auf Diese Vorlage verwenden.

    Screenshot: Hervorhebung der Schaltfläche „Diese Vorlage verwenden“

  5. Im folgenden Beispiel sehen Sie, dass zwei Pipelines und drei Datasets erstellt wurden:

    Screenshot: Zwei Pipelines und drei Datasets, die mithilfe der Vorlage erstellt wurden

  6. Wechseln Sie zur Pipeline „BulkCopyFromS3“, wählen Sie Debug aus, und geben Sie die Parameter ein. Wählen Sie Fertig stellen aus.

    Screenshot, der zeigt, wo Sie „Debuggen“ auswählen und die Parameter eingeben, bevor Sie „Fertig stellen“ auswählen

  7. Ihnen werden Ergebnisse angezeigt, die dem folgenden Beispiel ähneln:

    Screenshot, der die angezeigten Ergebnisse zeigt.

Damit die Vorlage geänderte Dateien nur von Amazon S3 in Azure Data Lake Storage Gen2 kopieren kann

  1. Erstellen Sie eine Steuerelementtabelle in Azure SQL-Datenbank, um die Partitionsliste von AWS S3 zu speichern.

    Hinweis

    Der Tabellenname lautet „s3_partition_delta_control_table“. Das Schema der Steuerelementtabelle ist PartitionPrefix, JobRunTime und SuccessOrFailure, wobei PartitionPrefix die Präfixeinstellung in S3 ist, um die Ordner und Dateien in Amazon S3 nach Namen zu filtern, JobRunTime ist der Datetime-Wert, wenn Kopieraufträge ausgeführt werden, und SuccessOrFailure ist der Status des Kopierens jeder Partition: 0 bedeutet, dass diese Partition nicht in Azure kopiert wurde und 1 bedeutet, dass diese Partition erfolgreich in Azure kopiert wurde. In der Steuertabelle sind fünf Partitionen definiert. Als Standardwert für „JobRunTime“ kann die Zeit festgelegt werden, zu der die einmalige Migration der historischen Daten beginnt. Die ADF-Kopieraktivität kopiert die Dateien in AWS S3, die nach diesem Zeitpunkt zuletzt geändert wurden. Der Standardstatus zum Kopieren der einzelnen Partitionen ist 1.

    CREATE TABLE [dbo].[s3_partition_delta_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [JobRunTime] [datetime] NULL,
        [SuccessOrFailure] [bit] NULL
        )
    
    INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
    VALUES
    ('a','1/1/2019 12:00:00 AM',1),
    ('b','1/1/2019 12:00:00 AM',1),
    ('c','1/1/2019 12:00:00 AM',1),
    ('d','1/1/2019 12:00:00 AM',1),
    ('e','1/1/2019 12:00:00 AM',1);
    
  2. Erstellen Sie eine gespeicherte Prozedur in derselben Azure SQL-Datenbank für die Kontrolltabelle.

    Hinweis

    Der Name der gespeicherten Prozedur lautet „sp_insert_partition_JobRunTime_success“. Sie wird von der Aktivität „SqlServerStoredProcedure“ in Ihrer ADF-Pipeline aufgerufen.

    CREATE PROCEDURE [dbo].[sp_insert_partition_JobRunTime_success] @PartPrefix varchar(255), @JobRunTime datetime, @SuccessOrFailure bit
    AS
    BEGIN
        INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
        VALUES
            (@PartPrefix,@JobRunTime,@SuccessOrFailure)
    END
    GO
    
  3. Wechseln Sie zur Vorlage Delta-Daten von AWS S3 zu Azure Data Lake Storage Gen2 kopieren. Geben Sie die Verbindungen zu Ihrer Externen Kontrolltabelle, AWS S3 als Datenquellenspeicher und Azure Data Lake Storage Gen2 als Zielspeicher ein. Achten Sie darauf, dass die externe Steuertabelle und die gespeicherte Prozedur auf dieselbe Verbindung verweisen.

    Erstellen einer neuen Verbindung

  4. Klicken Sie auf Diese Vorlage verwenden.

    „Diese Vorlage verwenden“

  5. Im folgenden Beispiel sehen Sie, dass zwei Pipelines und drei Datasets erstellt wurden:

    Die Pipeline überprüfen

  6. Wechseln Sie zur Pipeline „DeltaCopyFromS3“, wählen Sie Debug aus, und geben Sie die Parameter ein. Wählen Sie Fertig stellen aus.

    Auf **Debuggen** klicken

  7. Ihnen werden Ergebnisse angezeigt, die dem folgenden Beispiel ähneln:

    Überprüfen des Ergebnisses

  8. Sie können die Ergebnisse aus der Steuertabelle auch mithilfe der Abfrage "select * from s3_partition_delta_control_table" überprüfen. Die Ausgabe ähnelt dem folgenden Beispiel:

    Screenshot: Ergebnisse aus der Steuertabelle nach dem Ausführen der Abfrage