Spark implementering av randomSplit ()
signaturfunktionen för randomSplit () innehåller en viktlista och en fröspecifikation. Viktlistan är att ange antalet splittringar och procentandel (ungefärlig) i varje och fröet är för Reproducerbarhet. Förhållandet är ungefärligt på grund av hur det beräknas.
till exempel skulle följande kod i Figur 3 Dela df i två dataramar,train_df
är 80% ochtest_df
är 20% av den ursprungliga datarammen. Genom att använda samma värde för random seed förväntar vi oss att samma datapunkter är i samma delning om vi skulle köra skriptet igen eller Spark internt återuppbygger delningarna.
under huven
följande process upprepas för att generera varje delad dataram: partitionering, sortering inom partitioner och Bernoulli sampling. Om den ursprungliga dataramen inte är cachad kommer data att hämtas, partitioneras om och sorteras om för varje delad beräkning. Detta är källan till potentiella avvikelser. Sammanfattningsvis är randomSplit() ekvivalent med att utföra prov () för varje delning med procentandelen till provbyte med delningen som utförs. Detta är uppenbart om du undersöker källkoden för randomSplit() i PySpark3. Den här bloggen ger också lite mer information och grafik om hur randomSplit() implementeras.
låt oss gå igenom ett exempel. Figur 4 är ett diagram över provet () för varje delning, som börjar med 0.80 split.
Spark använder Bernoulli-provtagning, som kan sammanfattas som att generera slumptal för ett objekt (datapunkt) och acceptera det i en delning om det genererade numret faller inom ett visst intervall, bestämt av delningsförhållandet. För en 0,8 delad dataram skulle acceptansintervallet för Bernoulli-cellprovtagaren vara .
samma provtagningsprocess följs för 0.20-uppdelningen i Figur 5, med bara gränserna för acceptans ändras till .
dataramen hämtas, partitioneras och sorteras igen inom partitioner. Du kan se i exemplet att RDD-partitioner är idempotenta. Vilket innebär att datapunkterna i varje partition i Figur 4 förblir i samma partition i Figur 5. Till exempel är punkterna b och c i Partition 1 i både figur 4 och 5. Dessutom förblir fröet associerat med varje partition alltid konstant, och ordningen inom partitioner är identisk. Alla tre av dessa punkter är grundläggande för både prov () och randomSplit (). Säkerställa att samma prov produceras med samma utsäde i det förra och garantera inga dubbletter eller försvinnande datapunkter i det senare.
lösningar för att undvika inkonsekvenser
åtgärda dessa problem ligger i att säkerställa att RDD-partitioner och sorteringsordning är idempotenta. Vilken som helst av följande tre metoder säkerställer detta och kan tillämpas: 1) cachning av dataramen före operationer 2) partitionering av en kolumn eller en uppsättning kolumner, och 3) med hjälp av aggregerade funktionersegl. Ett exempel på varje metod visas i Figur 6.
cachning av den ursprungliga dataramen leder till att partitionsinnehåll hålls i minnet. Så istället för att hämta data, partitionera och sortera, fortsätter Spark operationer med partitionerade data i minnet. Observera att cache () är ett alias för persist(pyspark.StorageLevel.memory_only)
som kanske inte är perfekt om du har minnesbegränsningar. Istället kan du överväga att använda persist(pyspark.StorageLevel.memory_and_disk_only)
. Om det inte finns något minne eller diskutrymme tillgängligt, kommer Spark att hämta och partitionera data från början, så det kan vara klokt att övervaka detta från Spark Web UI. Caching är den lösning jag valde i mitt fall.
sammanfattning och viktiga Takeaways
historiens Moral är: om oväntat beteende händer i Spark behöver du bara gräva lite djupare! Här är en sammanfattning av alla viktiga punkter i den här artikeln:
- randomSplit() motsvarar att använda prov() på din dataram flera gånger, där varje prov hämtar, partitionerar och sorterar din dataram inom partitioner.
- datafördelningen mellan partitioner och sorteringsordning är viktig för både randomSplit() och sample(). Om endera ändringen när data hämtas igen kan det finnas dubbletter eller saknade värden över splittringar och samma prov med samma frö kan ge olika resultat.
- dessa inkonsekvenser kan inte hända vid varje körning, men för att eliminera dem helt, kvarstår (aka cache) din dataram, partitionering på en kolumn(er) eller tillämpa aggregerade funktioner som groupBy.