Condividi tramite


create_sink

Importante

L'API della pipeline create_sink è disponibile in anteprima pubblica.

La funzione create_sink() scrive in un servizio di streaming di eventi, ad esempio Apache Kafka o Hub eventi di Azure o in una tabella Delta da una pipeline dichiarativa. Dopo aver creato un sink con la funzione create_sink(), si usa il sink in un flusso di accodamento per scrivere i dati nel sink. Il flusso di aggiunta è l'unico tipo di flusso supportato con la funzione create_sink(). Altri tipi di flusso, ad esempio create_auto_cdc_flow, non sono supportati. Per informazioni dettagliate su altri tipi di sink nelle pipeline dichiarative di Lakeflow Spark, vedere Sink in Pipeline dichiarative di Lakeflow Spark.

Il sink Delta supporta le tabelle esterne e gestite di Unity Catalog e le tabelle gestite del metastore Hive. I nomi delle tabelle devono essere completamente qualificati. Ad esempio, le tabelle del catalogo Unity devono usare un identificatore a tre livelli: <catalog>.<schema>.<table>. Le tabelle del metastore Hive devono usare <schema>.<table>.

Annotazioni

  • L'esecuzione di un aggiornamento completo non cancella i dati dai raccoglitori. Tutti i dati elaborati verranno aggiunti al sink e i dati esistenti non verranno modificati.
  • Le aspettative non sono supportate con l'API sink .

Sintassi

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Parametri

Parametro TIPO Description
name str Obbligatorio. Stringa che identifica il sink e viene utilizzata per fare riferimento e gestire lo sink. I nomi del sink devono essere univoci per la pipeline, inclusi tutti i file di codice sorgente che fanno parte della pipeline.
format str Obbligatorio. Stringa che definisce il formato di output, kafka o delta.
options dict Elenco di opzioni sink, formattate come {"key": "value"}, in cui la chiave e il valore sono entrambe stringhe. Sono supportate tutte le opzioni di Databricks Runtime supportate dai sink Kafka e Delta.

Esempi

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)