scânteie sub capotă: randomSplit () și sample () lucrări interioare

implementarea Spark a randomSplit ()

funcția de semnătură a randomSplit () include o listă de greutate și o specificație de semințe. Lista de greutate este de a specifica numărul de despicături și procentul (aproximativ) în fiecare și sămânța este pentru reproductibilitate. Raportul este aproximativ datorită naturii modului în care este calculat.

de exemplu, următorul cod din Figura 3 ar împărți df în două cadre de date, train_df fiind 80% și test_df fiind 20% din cadrul de date original. Folosind aceeași valoare pentru semințe aleatorii, ne așteptăm ca aceleași puncte de date să fie în aceeași împărțire dacă ar fi să reluăm scriptul sau Spark reconstruiește intern împărțirile.

Figura 3: randomSplit () exemplu de funcție de semnătură

sub capotă

următorul proces se repetă pentru a genera fiecare cadru de date divizat: partiționarea, sortarea în partiții și eșantionarea Bernoulli. Dacă cadrul de date original nu este memorat în cache, atunci datele vor fi re-preluate, re-partiționate și re-sortate pentru fiecare calcul divizat. Aceasta este sursa anomaliilor potențiale. În rezumat, randomSplit() este echivalent cu efectuarea eșantionului () pentru fiecare împărțire, procentul de eșantion modificându-se odată cu împărțirea efectuată. Acest lucru este evident dacă examinați codul sursă pentru randomSplit() în PySpark3. Acest blog oferă, de asemenea, unele mai multe informații și efecte vizuale cu privire la modul în care randomSplit() este implementat.

să trecem printr-un exemplu. Figura 4 este o diagramă a eșantionului () pentru fiecare scindare, începând cu scindarea 0.80.

Figura 4: procesul de generare a împărțirii 0.8. Identic cu eșantion () punerea în aplicare.

Spark utilizează eșantionarea Bernoulli, care poate fi rezumată ca generând numere aleatorii pentru un articol (punct de date) și acceptându-l într-o împărțire dacă numărul generat se încadrează într-un anumit interval, determinat de raportul de împărțire. Pentru un cadru de date divizat de 0,8, intervalul de acceptare pentru eșantionatorul de celule Bernoulli ar fi .

același proces de eșantionare este urmat pentru împărțirea 0.20 din Figura 5, doar limitele acceptării schimbându-se în .

Figura 5: Procesul de generare a împărțirii 0.2. Identic cu proba (). punerea în aplicare. Conținutul partiției rămâne constant și ordinea de sortare este păstrată asigurând o împărțire validă.

cadrul de date este re-preluat, partiționat și sortat din nou în partiții. Puteți vedea în exemplu că partițiile RDD sunt idempotent. Ceea ce înseamnă că punctele de date din fiecare partiție din Figura 4 rămân în aceeași partiție din Figura 5. De exemplu, punctele b și c sunt în Partiția 1 atât în Figura 4, cât și în 5. În plus, sămânța asociată cu fiecare partiție rămâne întotdeauna constantă, iar ordinea din partiții este identică. Toate aceste trei puncte sunt fundamentale atât pentru sample (), cât și pentru randomSplit (). Asigurarea faptului că același eșantion este produs cu aceeași sămânță în prima și garantarea faptului că nu există duplicate sau puncte de date care dispar în cea de-a doua.

soluții pentru evitarea inconsecvențelor

remedierea acestor probleme constă în asigurarea faptului că partițiile RDD și ordinea de sortare sunt idempotente. Oricare dintre următoarele trei metode asigură acest lucru și poate fi aplicată: 1) cache-ul cadrului de date înainte de operații 2) repartizarea printr-o coloană sau un set de coloane și 3) Utilizarea funcțiilor agregate. Un exemplu al fiecărei metode este prezentat în Figura 6.

Figura 6: Trei metode diferite pentru a evita inconsecvențele în randomSplit () și sample ()

memorarea în cache a cadrului de date original duce la păstrarea conținutului partiției în memorie. Deci, în loc de re-preluarea datelor, partiționarea și sortarea, Spark continuă operațiunile folosind datele partiționate în memorie. Rețineți că cache () este un alias pentru persist(pyspark.StorageLevel.memory_only) care poate să nu fie ideal dacă aveți limitări de memorie. În schimb, puteți lua în considerare utilizarea persist(pyspark.StorageLevel.memory_and_disk_only). Dacă nu există memorie sau spațiu pe disc disponibil, Spark va prelua din nou și va partiționa datele de la zero, deci poate fi înțelept să monitorizați acest lucru din interfața web Spark. Caching-ul este soluția pe care am ales-o în cazul meu.

rezumat și Takeaways cheie

morală a poveștii este: dacă un comportament neașteptat se întâmplă în Spark, trebuie doar să sape un pic mai adânc! Iată un rezumat al tuturor punctelor cheie ale acestui articol:

  • randomSplit() este echivalent cu aplicarea eșantionului() pe cadrul dvs. de date de mai multe ori, fiecare eșantion re-preluând, partiționând și sortând cadrul de date în partiții.
  • distribuția datelor între partiții și ordinea de sortare este importantă atât pentru randomSplit (), cât și pentru sample (). Dacă fie se modifică la reluarea datelor, pot exista duplicate sau valori lipsă între împărțiri și același eșantion care utilizează aceeași sămânță poate produce rezultate diferite.
  • este posibil ca aceste inconsecvențe să nu se întâmple la fiecare rulare, dar pentru a le elimina complet, persistă (aka cache) cadrul dvs. de date, repartiția pe o coloană sau aplicați funcții agregate, cum ar fi groupBy.

Lasă un răspuns

Adresa ta de email nu va fi publicată. Câmpurile obligatorii sunt marcate cu *