Vonk onder de motorkap: randomSplit() en sample () binnenwerk

Vonk implementatie van randomSplit ()

de signature functie van randomSplit () omvat een gewichtslijst en een seed specificatie. De gewichtslijst moet het aantal splitsingen en percentage (bij benadering) in elk specificeren en het zaad is voor reproduceerbaarheid. De verhouding is bij benadering te wijten aan de aard van de manier waarop het wordt berekend.

bijvoorbeeld, de volgende code in Figuur 3 zou DF splitsen in twee dataframes, train_df is 80% en test_df is 20% van het oorspronkelijke dataframe. Door dezelfde waarde te gebruiken voor random seed, verwachten we dat dezelfde gegevenspunten in dezelfde split zitten als we het script opnieuw uitvoeren of de vonk intern de splits herbouwt.

Figuur 3: randomSplit () signature function example

onder de motorkap

het volgende proces wordt herhaald om elk gesplitst gegevensframe te genereren: partitioneren, Sorteren binnen partities, en Bernoulli sampling. Als het oorspronkelijke dataframe niet in de cache is opgeslagen, worden de gegevens opnieuw opgehaald, opnieuw gepartitioneerd en opnieuw gesorteerd voor elke splitberekening. Dit is de bron van potentiële anomalieën. Samenvattend is randomSplit() gelijk aan het uitvoeren van sample() voor elke splitsing waarbij het percentage van de steekproef verandert naarmate de splitsing wordt uitgevoerd. Dit is duidelijk als je de broncode voor randomSplit() in PySpark3 onderzoekt. Deze blog⁴ biedt ook wat meer informatie en visuals over hoe randomSplit() is geïmplementeerd.

laten we een voorbeeld doornemen. Figuur 4 is een diagram van het monster () voor elke splitsing, te beginnen met de 0,80 splitsing.

Figuur 4: proces voor het genereren van de 0,8-splitsing. Identiek aan sample () implementatie.

Spark maakt gebruik van Bernoulli-bemonstering, die kan worden samengevat als het genereren van willekeurige getallen voor een item (gegevenspunt) en het accepteren in een splitsing als het gegenereerde getal binnen een bepaald bereik valt, bepaald door de splitsingsverhouding. Voor een 0,8 split data frame zou het acceptatiebereik voor de Bernoulli cell sampler zijn .

hetzelfde bemonsteringsproces wordt gevolgd voor de 0,20 splitsing in Figuur 5, waarbij alleen de grenzen van de acceptatie veranderen in .

Figuur 5: proces voor het genereren van de 0,2-splitsing. Identiek aan Monster (). uitvoering. De partitieinhoud blijft constant en de sorteervolgorde wordt behouden om een geldige splitsing te garanderen.

het dataframe wordt opnieuw opgehaald, gepartitioneerd en opnieuw gesorteerd binnen partities. Je kunt in het voorbeeld zien dat RDD partities idempotent zijn. Wat betekent dat de gegevenspunten in elke partitie in Figuur 4, blijven in dezelfde partitie in Figuur 5. Bijvoorbeeld, de punten b en c staan in partitie 1 in zowel Figuur 4 als 5. Bovendien blijft het zaad dat bij elke partitie hoort altijd constant, en de volgorde binnen partities is identiek. Alle drie deze punten zijn fundamenteel voor zowel sample () als randomSplit (). Ervoor zorgen dat hetzelfde monster in het eerste monster met hetzelfde zaad wordt geproduceerd, en ervoor zorgen dat in het tweede monster geen duplicaten of verdwijnpunten worden geproduceerd.

oplossingen om inconsistenties te vermijden

het oplossen van deze problemen ligt in ervoor te zorgen dat RDD-partities en sorteervolgorde idempotent zijn. Elk van de volgende drie methoden zorgen voor dit en kan worden toegepast: 1) caching het gegevenskader voor operaties 2) repartitioning door een kolom of een reeks kolommen, en 3) het gebruik van geaggregeerde functies⁵. Een voorbeeld van elke methode is weergegeven in Figuur 6.

Figuur 6: Drie verschillende methoden om inconsistenties te voorkomen in randomSplit() en sample()

het cachen van het oorspronkelijke gegevensframe leidt tot partitie-inhoud die in het geheugen wordt bewaard. Dus in plaats van het opnieuw ophalen van gegevens, partitioneren en sorteren, Spark blijft operaties met behulp van de gepartitioneerde gegevens in het geheugen. Merk op dat cache() een alias is voor persist(pyspark.StorageLevel.memory_only) wat misschien niet ideaal is als je geheugenbeperkingen hebt. In plaats daarvan kunt u overwegen om persist(pyspark.StorageLevel.memory_and_disk_only)te gebruiken. Als er geen geheugen of schijfruimte beschikbaar is, zal Spark gegevens opnieuw ophalen en partitioneren vanaf het begin, dus het kan verstandig zijn om dit te controleren vanaf de Spark web UI. Caching is de oplossing die ik in mijn geval heb gekozen.

samenvatting en belangrijkste afhaalmaaltijden

moraal van het verhaal is: als er onverwacht gedrag gebeurt in Spark, moet je gewoon wat dieper graven! Hier is een samenvatting van alle belangrijke punten van dit artikel:

  • randomSplit() is gelijk aan het meerdere keren toepassen van sample() op uw dataframe, waarbij elke sample opnieuw wordt opgehaald, partitioneerd en uw dataframe binnen partities wordt gesorteerd.
  • de gegevensdistributie over partities en sorteervolgorde is belangrijk voor zowel randomSplit () als sample (). Als een van beide wordt gewijzigd bij het opnieuw ophalen van gegevens, kunnen er duplicaten of ontbrekende waarden zijn over splitsingen en kan hetzelfde monster met hetzelfde zaad verschillende resultaten opleveren.
  • deze inconsistenties kunnen niet bij elke run voorkomen, maar om ze volledig te elimineren, persist (aka cache) uw dataframe, herpartitioneer op een kolom(s), of voeg samengevoegde functies toe zoals groupBy.

Geef een antwoord

Het e-mailadres wordt niet gepubliceerd. Vereiste velden zijn gemarkeerd met *