Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Beim Umgang mit großen Datenmengen benötigen Sie eine Pipeline, die nur die neuen und geänderten Datensätze verarbeiten kann, anstatt das gesamte Dataset neu zu verarbeiten. Dies wird als inkrementelle ETL bezeichnet. In Databricks SQL können Sie inkrementelle ETL-Pipelines mithilfe von Streamingtabellen und materialisierten Ansichten erstellen, ohne prozeduralen Code zu schreiben oder manuelle Aktualisierungen zu planen.
Dieses Lernprogramm führt Sie durch ein gängiges Muster: Nachverfolgen von Produktänderungen im Laufe der Zeit. Sie erstellen eine Quelltabelle, erfassen Änderungsereignisse, erstellen eine Bemaßungstabelle, die den vollständigen Verlauf jedes Produkts bewahrt, und fügen sie oben eine aggregierte Berichtsschicht hinzu.
Das wichtigste Merkmal in diesem Tutorial ist AUTO CDC. In einem herkömmlichen Lagerhaus würden Sie komplexe MERGE INTO-Anweisungen schreiben, um Einfügungs-, Aktualisierungs- und Löschereignisse in eine Zieltabelle abzugleichen. Dieser Ansatz ist fehleranfällig, insbesondere wenn Ereignisse außerhalb der Reihenfolge ankommen.
AUTO CDC übernimmt das für Sie. Sie geben den Business Key, die Sequenzierungsspalte und an, ob Sie SCD Type 1 (nur neuester Wert) oder SCD Type 2 (vollständiger Verlauf) wünschen, und Azure Databricks wendet automatisch die richtige Zusammenführungslogik an. Eine Übersicht über CDC finden Sie in den AUTO CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines.
Am Ende dieses Lernprogramms haben Sie Folgendes:
- Erstellt eine Quelltabelle, die Änderungen mit dem Änderungsdatenfeed nachverfolgt.
- Überprüfte die rohen Änderungsdaten, um den CDC-Ereignisdatenstrom zu verstehen.
- Es wird
AUTO CDCverwendet, um eine SCD Type 2-Dimensionstabelle aus diesen Ereignissen zu erstellen. - Verarbeitete Löschereignisse inkrementell über die Pipeline.
- Eine materialisierte Ansicht wurde erstellt, die einen aggregierten Bericht schrittweise auf dem neuesten Stand hält.
- Konfiguriert
SCHEDULE REFRESH EVERY 1 DAY, sodass Änderungen automatisch über die Pipeline verteilt werden.
Anforderungen
Um dieses Tutorial abzuschließen, müssen Sie die folgenden Anforderungen erfüllen:
- Ein Azure Databricks Arbeitsbereich mit aktiviertem Unity Catalog.
- Ein SQL-Lagerhaus (serverlos oder pro).
- Verfügen Sie über die Berechtigung zum Erstellen einer Computeressource oder des Zugriffs auf eine Computeressource.
- Serverlose Berechnung für Ihr Konto aktiviert. Weitere Informationen finden Sie unter Features mit eingeschränkter regionaler Verfügbarkeit.
Schritt 1: Einrichten ihres Katalogs und Schemas
Öffnen Sie den SQL-Editor für Databricks , und legen Sie Ihren Arbeitskatalog und Ihr Schema fest. Sie müssen über die Berechtigung für USE den ausgewählten Katalog und das Schema verfügen, das Sie auswählen.
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;
Schritt 2: Erstellen einer Quelltabelle und Laden von Daten
Erstellen Sie eine products Tabelle mit aktiviertem Delta Lake Change Data Feed auf Azure Databricks (CDF). CDF ist ein Delta Lake-Feature, das jedes Einfügen, Aktualisieren und Löschen als abfragbares Änderungsprotokoll aufzeichnet. Dies ähnelt einem CDC-Datenstrom aus einem Transaktionsquellsystem, mit der Ausnahme, dass die Änderungen direkt in der Delta-Tabelle und nicht aus einem externen Protokoll erfasst werden. Hier verwenden Sie CDF, um die Änderungsereignisse zu generieren, die von der nachgeschalteten Pipeline genutzt werden.
Erstellen Sie die Tabelle, und laden Sie die ersten Datensätze:
CREATE OR REPLACE TABLE products ( product_id INT, product_name STRING, category STRING, warehouse STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true); INSERT INTO products VALUES (1, 'Spoon', 'Cutlery', 'Seattle'), (2, 'Fork', 'Cutlery', 'Portland'), (3, 'Knife', 'Cutlery', 'Denver'), (4, 'Chair', 'Furniture', 'Austin'), (5, 'Table', 'Furniture', 'Chicago'), (6, 'Lamp', 'Lighting', 'Boston'), (7, 'Mug', 'Kitchenware', 'Seattle'), (8, 'Plate', 'Kitchenware', 'Atlanta'), (9, 'Bowl', 'Kitchenware', 'Dallas'), (10, 'Glass', 'Kitchenware', 'Phoenix');Simulieren von vorgelagerten Änderungen, einschließlich neuer Produkte, Lagerverschiebungen und Einer Kategorieumstellung:
INSERT INTO products VALUES (11, 'Napkin', 'Dining', 'San Francisco'), (12, 'Coaster', 'Dining', 'New York'); UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1; UPDATE products SET category = 'Dining' WHERE product_id = 2;
Schritt 3: Abfragen des Änderungsdatenfeeds
Vor dem Erstellen der downstream-Pipeline hilft es, die rohen Änderungsereignisse zu betrachten, damit Sie verstehen können, was AUTO CDC verarbeitet wird. Die table_changes() Funktion liest das CDF-Protokoll und gibt alle erfassten Vorgänge zusammen mit Metadatenspalten zurück:
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;
Beispielsweise hat der Löffel drei Ereignisse: ein insert (Seattle), ein update_preimage (Seattle) und ein update_postimage (Los Angeles).
Beachten Sie, dass eine einzelne logische Änderung (z. B. das Verschieben des Löffels in ein anderes Lager) mehrere Ereignisse erzeugt: ein Vorimage und ein Postimage. In einem herkömmlichen Lager würden Sie eine MERGE Anweisung schreiben, um alle diese Ereignisse in einer Zieltabelle abzugleichen, Einfügungen, Aktualisierungen und Löschvorgänge mit separater Logik zu behandeln und sicherzustellen, dass Ereignisse in der richtigen Reihenfolge angewendet werden. Diese ist genau die Komplexität, die AUTO CDC im nächsten Schritt beseitigt.
Schritt 4: Erstellen einer SCD-Typ 2-Dimension mit AUTO CDC
Von Bedeutung
AUTO CDC ist in Der Betaversion enthalten. Erfordert Databricks Runtime 17.3 oder höher.
Eine Streamingtabelle verarbeitet Daten inkrementell. Bei jeder Aktualisierung liest sie nur die neuen Zeilen seit der letzten Ausführung, sodass das vollständige Dataset nicht erneut verarbeitet werden muss. Dies eignet sich gut für große oder häufig ändernde Quellen.
AUTO CDC fügt die Change Data Capture (CDC)-Verarbeitung zu einer Streamingtabelle hinzu. Anstatt eine MERGE INTO-Anweisung zu schreiben, die manuell Einfügungen, Aktualisierungen und Löschvorgänge behandelt, deklarieren Sie die Spalte "Business Key" und "Sequenzierung", und lassen Sie Azure Databricks die richtige Logik anwenden.
AUTO CDC behandelt auch Ereignisse außerhalb der Reihenfolge automatisch, was ein häufiges Problem ist, wenn MERGE INTO Ereignisse behandelt, die von verteilten Systemen oder Batch-Verarbeitungen mit überlappenden Zeitstempeln ankommen.
Die folgende Anweisung erstellt eine SCD Type 2-Tabelle, die den Vollversionsverlauf der einzelnen Produkte bewahrt. Jede Version erhält __START_AT und __END_AT Zeitstempel. A NULL in __END_AT markiert die aktuelle Version.
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
-
SCHEDULE REFRESH EVERY 1 DAY: aktualisiert die Tabelle täglich gemäß einem Zeitplan. -
FLOW AUTO CDC: deklariert dies als CDC-Fluss. Azure Databricks wendet automatisch Semantik zum Einfügen, Aktualisieren und Löschen an. -
KEYS (product_id): der Geschäftsschlüssel. Ereignisse mit demselben Schlüssel werden in Versionszeilen zusammengeführt. -
APPLY AS DELETE WHEN _change_type = 'delete': schließt die aktuelle Version, wenn ein Löschereignis eingeht. Auf diese Weise können Sie die Bedingung definieren, die ein Löschereignis identifiziert. -
SEQUENCE BY _commit_timestamp: etabliert die Reihenfolge von Ereignissen. Behandelt Ankünfte außerhalb der Reihenfolge korrekt. -
STORED AS SCD TYPE 2: behält den vollständigen Verlauf bei.AUTO CDCunterstützt sowohl SCD Type 1 als auch SCD Type 2.
Abfrage der Dimensionstabelle:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
- Löffel: zwei Versionen. Seattle (geschlossen,
__END_ATgesetzt) und Los Angeles (aktuell,__END_AT = NULL). - Gabelung: zwei Versionen. Kategorie "Besteck" (geschlossen) und "Speisen" (aktuell).
- Napkin und Coaster: jeweils eine Version (neu eingefügt,
__END_AT = NULL). - Alle anderen Produkte: jeweils eine Version (
__END_AT = NULL).
Schritt 5: Löschungen durch die Pipeline verarbeiten
Simulieren Sie nun zwei nicht mehr vorhandene Produkte, indem Sie sie aus der Quelltabelle löschen:
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;
Diese Löschereignisse werden im CDF-Protokoll aufgezeichnet, aber in der Streamingtabelle sind sie noch nicht aufgetaucht. Aktualisieren Sie die Streamingtabelle, um die neuen Ereignisse zu verarbeiten:
REFRESH STREAMING TABLE products_history;
Fragen Sie die Dimensionstabelle ab, um zu überprüfen, ob die Löschungen angewendet wurden:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
Schüssel und Glas sind jetzt mit __END_AT geschlossen und kennzeichnen sie als eingestellt. Alle anderen aktuellen Produkte bleiben unverändert. Die Streamingtabelle verarbeitete nur die neuen Löschereignisse, ohne die Einfügungen und Aktualisierungen aus der vorherigen Aktualisierung erneut zu verarbeiten.
Schritt 6: Erstellen einer aggregierten materialisierten Ansicht
Nachdem Sie nun über eine Dimensionstabelle verfügen, die mit Quelländerungen aktuell bleibt, können Sie darauf eine Berichtsschicht hinzufügen.
In einer materialisierten Ansicht werden vorab berechnete Abfrageergebnisse als physische Tabelle gespeichert. Im Gegensatz zu einer regulären Ansicht, die die Abfrage jedes Mal erneut ausführt, wenn Sie sie lesen, behält eine materialisierte Ansicht die Ergebnisse bei und berechnet nur die von vorgelagerten Änderungen bei jeder Aktualisierung betroffenen Zeilen. Dies eignet sich gut für Dashboards und Berichte, in denen die Abfrageleistung wichtig ist.
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;
SCHEDULE REFRESH EVERY 1 DAY bedeutet, dass diese Ansicht täglich aktualisiert wird. In Kombination mit demselben Zeitplan in der Streamingtabelle verfügen Sie jetzt über eine dreistufige Pipeline, in der Änderungen an der Quelltabelle durch die Dimension bis hin zum Aggregat in jedem Aktualisierungszyklus weitergeleitet werden. Es ist keine manuelle Aktualisierung erforderlich.
SELECT * FROM products_by_category ORDER BY active_products DESC;
Schritt 7: Überprüfen der End-to-End-Kaskade
Um die vollständige Pipelinekaskade zu überprüfen, nehmen Sie eine Änderung an der Quelltabelle vor:
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;
Das Messer wechselt von Denver nach Seattle. Diese einzelne DML-Änderung löst die vollständige Pipelinekaskade aus und veranschaulicht, wie die drei Stufen zusammenarbeiten:
-
productszeichnet das Änderungsereignis über CDF auf. -
products_historyverarbeitet das Ereignis und fügt eine neue Version für das Messer hinzu. -
products_by_categoryberechnet nur die betroffene Besteckreihe neu.
Überprüfen:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;
SELECT * FROM products_by_category ORDER BY active_products DESC;
Aufräumen
Um die von diesem Lernprogramm erstellten Ressourcen zu bereinigen, verwenden Sie die folgende SQL-Datei:
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;