Gnist Implementering av randomSplit ()
signaturfunksjonen til randomSplit () inkluderer en vektliste og en frøspesifikasjon. Vektlisten er å spesifisere antall deler og prosentandel (omtrentlig) i hver og frøet er for reproduserbarhet. Forholdet er omtrentlig på grunn av arten av hvordan det beregnes.
for Eksempel vil følgende kode i Figur 3 dele df i to datarammer, train_df
være 80% og test_df
være 20% av den opprinnelige datarammen. Ved å bruke samme verdi for tilfeldig frø, forventer vi at de samme datapunktene er i samme splitt hvis vi skulle kjøre skriptet eller Gnist internt gjenoppbygger splittene.
under Hetten
følgende prosess gjentas for å generere hver delt dataramme: partisjonering, sortering i partisjoner og Bernoulli-prøvetaking. Hvis den opprinnelige datarammen ikke bufres, blir dataene hentet på nytt, partisjonert og sortert på nytt for hver delt beregning. Dette er kilden til potensielle anomalier. Oppsummert er randomSplit () ekvivalent med å utføre prøve () for hver splitt med prosentandelen til prøve endring med splittelsen som utføres. Dette er tydelig hvis du undersøker kildekoden for randomSplit () I PySpark3. Denne bloggen gir også litt mer informasjon og visualer om hvordan randomSplit () er implementert.
La oss gå gjennom et eksempel. Figur 4 er et diagram over prøven () for hver splitt, som begynner med 0,80-splittelsen.
Spark benytter Bernoulli sampling, som kan oppsummeres som å generere tilfeldige tall for et element (datapunkt) og akseptere det i en splitt hvis det genererte tallet faller innenfor et bestemt område, bestemt av delingsforholdet. For en 0,8 delt dataramme vil akseptområdet for Bernoulli-celleprøven være .
den samme prøveprosessen følges for 0,20-splittelsen I Figur 5, med bare grensene for aksept som endres til .
datarammen hentes på nytt, partisjoneres og sorteres i partisjoner igjen. Du kan se i eksemplet AT RDD-partisjoner er idempotente. Hvilket betyr at datapunktene i Hver partisjon i Figur 4, forblir i samme partisjon i Figur 5. For eksempel er punktene b og c I Partisjon 1 i Både Figur 4 og 5. I tillegg forblir frøet knyttet til hver partisjon alltid konstant, og rekkefølgen i partisjoner er identisk. Alle tre av disse punktene er grunnleggende for både prøve() og randomSplit(). Sikre at den samme prøven er produsert med samme frø i den tidligere, og garanterer ingen duplikater eller forsvinner datapunkter i sistnevnte.
Løsninger For Å Unngå Inkonsekvenser
Å Fikse disse problemene ligger i å sikre AT RDD-partisjoner og sorteringsrekkefølge er idempotente. En av de følgende tre metodene sikrer dette og kan brukes: 1) bufring av datarammen før operasjoner 2) partisjonering av en kolonne eller et sett med kolonner, og 3) bruk av aggregatfunksjonerenheten fra signalserver. Et eksempel på hver metode er vist i Figur 6.
Caching den opprinnelige datarammen fører til at partisjonsinnholdet holdes i minnet. Så I stedet for å hente data, partisjonere og sortere, Fortsetter Spark operasjoner ved hjelp av partisjonerte data i minnet. Merk at cache () er et alias for persist(pyspark.StorageLevel.memory_only)
som kanskje ikke er ideelt hvis du har minnebegrensninger. I stedet kan du vurdere å bruke persist(pyspark.StorageLevel.memory_and_disk_only)
. Hvis det ikke er noe minne eller diskplass tilgjengelig, Vil Spark hente og partisjonere data fra bunnen av, så det kan være lurt å overvåke dette fra Spark Web UI. Caching er løsningen jeg valgte i mitt tilfelle.
Sammendrag og Viktige Takeaways
Historiens Moral er: hvis uventet oppførsel skjer I Spark, trenger du bare å grave litt dypere! Her er et sammendrag av alle hovedpunktene i denne artikkelen:
- randomSplit() tilsvarer å bruke prøve () på datarammen flere ganger, med hver prøve på nytt, partisjonering og sortering av datarammen i partisjoner.
- datafordelingen på tvers av partisjoner og sorteringsrekkefølge er viktig for både randomSplit() og sample(). Hvis enten endre på data re-hente, kan det være duplikater eller manglende verdier på tvers av deler og den samme prøven ved hjelp av samme frø kan gi ulike resultater.disse inkonsekvensene kan ikke skje på hvert løp, Men for å eliminere dem helt, fortsett (aka cache) datarammen din, partisjonere på en kolonne( r), eller bruk aggregatfunksjoner som groupBy.