Spark Under the Hood: randomSplit() e sample () Meccanismi interni

Spark Implementazione di randomSplit ()

La funzione di firma di randomSplit () include un elenco di pesi e una specifica seed. La lista dei pesi deve specificare il numero di spaccature e la percentuale (approssimativa) in ciascuna e il seme è per la riproducibilità. Il rapporto è approssimativo a causa della natura di come viene calcolato.

Ad esempio, il seguente codice nella Figura 3 suddivide df in due frame di dati,train_dfpari all ‘ 80% etest_df pari al 20% del frame di dati originale. Usando lo stesso valore per il seme casuale, ci aspettiamo che gli stessi punti dati si trovino nella stessa divisione se dovessimo rieseguire lo script o Spark ricostruisce internamente le divisioni.

Figura 3: randomSplit () signature function example

Sotto il cofano

Il seguente processo viene ripetuto per generare ogni frame di dati diviso: partizionamento, ordinamento all’interno delle partizioni e campionamento di Bernoulli. Se il frame di dati originale non viene memorizzato nella cache, i dati verranno recuperati, ri-partizionati e ri-ordinati per ogni calcolo di divisione. Questa è la fonte di potenziali anomalie. In sintesi, randomSplit() equivale a eseguire sample () per ogni divisione con la percentuale di campionamento che cambia con la divisione eseguita. Questo è evidente se si esamina il codice sorgente per randomSplit () in PySpark3. Questo blog provides fornisce anche ulteriori informazioni e immagini su come randomSplit() è implementato.

Esaminiamo un esempio. La figura 4 è un diagramma del campione () per ogni divisione, che inizia con la divisione 0.80.

Figura 4: Processo di generazione dello 0,8 spalato. Identico all’implementazione di sample ().

Spark utilizza il campionamento di Bernoulli, che può essere riassunto come generare numeri casuali per un elemento (punto dati) e accettarlo in una divisione se il numero generato rientra in un certo intervallo, determinato dal rapporto di divisione. Per un frame di dati diviso 0,8, l’intervallo di accettazione per il campionatore di celle di Bernoulli sarebbe .

Lo stesso processo di campionamento viene seguito per la divisione 0.20 in Figura 5, con solo i limiti di accettazione che cambiano in .

Figura 5: Processo di generazione 0.2 spalato. Identico a sample (). attuazione. Il contenuto della partizione rimane costante e l’ordinamento viene mantenuto garantendo una divisione valida.

Il frame di dati viene nuovamente recuperato, partizionato e ordinato all’interno delle partizioni. Puoi vedere nell’esempio che le partizioni RDD sono idempotenti. Il che significa che i punti dati in ogni partizione in Figura 4, rimangono nella stessa partizione in Figura 5. Ad esempio, i punti b e c si trovano nella partizione 1 sia in Figura 4 che in Figura 5. Inoltre, il seme associato a ciascuna partizione rimane sempre costante e l’ordine all’interno delle partizioni è identico. Tutti e tre questi punti sono fondamentali sia per sample() che per randomSplit(). Garantire che lo stesso campione sia prodotto con lo stesso seme nel primo e garantire l’assenza di duplicati o punti dati scomparsi nel secondo.

Soluzioni per evitare incongruenze

Risolvere questi problemi consiste nel garantire che le partizioni RDD e l’ordinamento siano idempotenti. Uno qualsiasi dei seguenti tre metodi garantisce questo e può essere applicato: 1) memorizzazione nella cache del frame di dati prima delle operazioni 2) ripartizionamento da una colonna o un insieme di colonne e 3) utilizzo di funzioni aggregate⁵. Un esempio di ciascun metodo è mostrato in Figura 6.

Figura 6: Tre diversi metodi per evitare incongruenze in randomSplit() e sample()

La memorizzazione nella cache del frame di dati originale porta al mantenimento in memoria del contenuto della partizione. Quindi, invece di recuperare i dati, partizionare e ordinare, Spark continua le operazioni utilizzando i dati partizionati in memoria. Si noti che cache () è un alias per persist(pyspark.StorageLevel.memory_only) che potrebbe non essere l’ideale se si hanno limitazioni di memoria. Invece, puoi considerare di utilizzare persist(pyspark.StorageLevel.memory_and_disk_only). Se non c’è memoria o spazio su disco disponibile, Spark recupererà e partizionerà i dati da zero, quindi potrebbe essere saggio monitorare questo dall’interfaccia utente Spark Web. Il caching è la soluzione che ho scelto nel mio caso.

Riepilogo e Takeaways chiave

Morale della storia è: se un comportamento inaspettato sta accadendo in Spark, devi solo scavare un po ‘ più a fondo! Ecco un riepilogo di tutti i punti chiave di questo articolo:

  • randomSplit() equivale ad applicare sample() sul frame di dati più volte, con ogni campione che recupera, partiziona e ordina il frame di dati all’interno delle partizioni.
  • La distribuzione dei dati tra partizioni e ordinamento è importante sia per randomSplit() che per sample(). Se si cambia dopo il recupero dei dati, potrebbero esserci duplicati o valori mancanti tra le suddivisioni e lo stesso campione utilizzando lo stesso seme può produrre risultati diversi.
  • Queste incongruenze potrebbero non verificarsi ad ogni esecuzione, ma per eliminarle completamente, persistono (ovvero cache) il frame di dati, la ripartizione su una / e colonna / e, o applicano funzioni di aggregazione come groupBy.

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *