Condividi tramite


Ottimizzazione dell'elaborazione dati per Apache Spark

Questo articolo illustra come ottimizzare la configurazione del cluster Apache Spark per ottenere prestazioni ottimali in Azure HDInsight.

Informazioni generali

Se sono presenti processi lenti in un'operazione Join o di riproduzione casuale, la causa è probabilmente dovuta a un'asimmetria dei dati, Per asimmetria dei dati si intende uno squilibrio nei dati del processo. Ad esempio, un job di mappa può richiedere 20 secondi. Tuttavia, l'esecuzione di un processo in cui i dati vengono aggiunti o riorganizzati richiede ore. Per correggere lo skew dei dati, è necessario aggiungere un salt all'intera chiave o usare un salt isolato solo per un sottoinsieme di chiavi. Se si usa un salting isolato, è necessario filtrare ulteriormente per isolare il subset di chiavi archiviate con salting nei join di mapping. È anche possibile introdurre una colonna di bucket ed eseguire una prima aggregazione in bucket.

Un altro fattore che rallenta i join potrebbe essere il tipo di join. Per impostazione predefinita, Spark usa il SortMerge tipo di join. Questo tipo di join è più adatto per set di dati di grandi dimensioni. Ma è altrimenti costoso dal momento che deve prima ordinare i lati sinistro e destro dei dati prima di unirli.

Un Broadcast join è più adatto per set di dati più piccoli o in cui un lato del join è molto più piccolo dell'altro lato. Questo tipo di join trasmette un solo lato a tutti gli executor e pertanto richiede più memoria per le trasmissioni in generale.

È possibile modificare il tipo di join nella configurazione impostando spark.sql.autoBroadcastJoinThresholdoppure è possibile impostare un hint di join usando le API dataframe (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Se si usano tabelle in bucket, si dispone di un terzo tipo di join, il Merge join. Un set di dati correttamente pre-partizionato e preordinato ignorerà la fase costosa di ordinamento da un join SortMerge.

L'ordine dei join è importante, in particolare nelle query più complesse. Iniziare con i join più selettivi. Inoltre, quando possibile, spostare i join che portano a un incremento nel numero di righe dopo le aggregazioni.

Per gestire il parallelismo per i join cartesiani, è possibile aggiungere strutture annidate, finestre e ad esempio ignorare uno o più passaggi nel processo Spark.

Ottimizzare l'esecuzione del lavoro

  • Memorizza nella cache, ad esempio, se usi i dati due volte, allora memorizzali nella cache.
  • Trasmettere le variabili a tutti gli executor. Le variabili vengono serializzate una sola volta, con conseguente ricerca più rapida.
  • Usare il pool di thread sul driver, determinando operazioni più veloci per molte attività.

Monitorare regolarmente i processi in esecuzione per individuare i problemi di prestazioni. Se sono necessarie altre informazioni su alcuni problemi, prendere in considerazione uno degli strumenti di profilatura delle prestazioni seguenti:

La chiave per le prestazioni delle query di Spark 2.x è il motore al tungsteno, che dipende dalla generazione di codici whole-stage. In alcuni casi, la generazione di codice in fase intera potrebbe essere disabilitata. Ad esempio, se si usa un tipo non modificabile (string) nell'espressione di aggregazione, SortAggregate viene visualizzato anziché HashAggregate. Ad esempio, per prestazioni migliori, provare a eseguire le operazioni seguenti e quindi riabilitare la generazione del codice:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Passaggi successivi