Spark implementacja randomSplit()
funkcja podpisu randomSplit() zawiera listę wagi i specyfikację nasion. Lista wagowa ma określać liczbę podziałów i procent (przybliżony) w każdym, a materiał siewny ma na celu odtwarzalność. Stosunek ten jest przybliżony ze względu na charakter sposobu jego obliczania.
na przykład poniższy kod na rysunku 3 podzieliłby df na dwie ramki danych,train_df
=80%, atest_df
= 20% oryginalnej ramki danych. Używając tej samej wartości dla random seed, spodziewamy się, że te same punkty danych będą w tym samym Podziale, jeśli ponownie uruchomimy skrypt lub Spark wewnętrznie przebuduje podział.
pod maską
powtarzany jest następujący proces generowania każdej podzielonej ramki danych: partycjonowanie, sortowanie wewnątrz partycji i próbkowanie Bernoulliego. Jeśli oryginalna ramka danych nie jest buforowana, dane zostaną ponownie pobrane, ponownie podzielone na partycje i ponownie posortowane dla każdego obliczenia podziału. To źródło potencjalnych anomalii. Podsumowując, randomSplit() jest odpowiednikiem wykonywania sample () dla każdego podziału, przy czym procent do próbki zmienia się wraz z wykonywanym podziałem. Jest to oczywiste, jeśli zbadasz kod źródłowy randomSplit () w PySpark3. Ten blog⁴ zawiera również więcej informacji i wizualizacji na temat implementacji funkcji randomSplit ().
przejdźmy przez przykład. Rysunek 4 to schemat próbki() dla każdego podziału, zaczynając od podziału 0,80.
Spark wykorzystuje próbkowanie Bernoulliego, które można podsumować jako generowanie liczb losowych dla elementu (punktu danych) i przyjmowanie go do podziału, jeśli wygenerowana liczba mieści się w pewnym zakresie, określonym przez współczynnik podziału. Dla dzielonej ramki danych 0.8 zakres akceptacji dla próbnika komórek Bernoulliego byłby równy .
ten sam proces pobierania próbek jest wykonywany dla podziału 0,20 na rysunku 5, przy czym tylko granice akceptacji zmieniają się na .
ramka danych jest ponownie pobierana, dzielona i sortowana na partycjach. W przykładzie widać, że partycje RDD są idempotentne. Co oznacza, że punkty danych w każdej partycji na rysunku 4 pozostają w tej samej partycji na rysunku 5. Na przykład punkty b I c znajdują się w partycji 1 na rysunku 4 i 5. Dodatkowo zalążek związany z każdą partycją zawsze pozostaje stały, a kolejność wewnątrz partycji jest identyczna. Wszystkie trzy z tych punktów są fundamentalne zarówno dla sample (), jak i randomSplit (). Zapewnienie, aby ta sama próbka została wyprodukowana z tego samego materiału siewnego w pierwszym z nich oraz zagwarantowanie braku duplikatów lub zniknięcia punktów danych w drugim z nich.
rozwiązania mające na celu uniknięcie niespójności
naprawienie tych problemów polega na zapewnieniu, że partycje RDD i kolejność sortowania są idempotentne. Zapewnia to jedna z trzech następujących metod i można je zastosować: 1) buforowanie ramki danych przed operacjami 2) Ponowne dzielenie przez kolumnę lub zestaw kolumn oraz 3) użycie funkcji agregujących⁵. Przykład każdej z metod przedstawiono na rysunku 6.
buforowanie oryginalnej ramki danych prowadzi do przechowywania zawartości partycji w pamięci. Zamiast więc ponownie pobierać dane, partycjonować i sortować, Spark kontynuuje operacje przy użyciu partycjonowanych danych w pamięci. Zauważ, że cache() jest aliasem dla persist(pyspark.StorageLevel.memory_only)
, który może nie być idealny, jeśli masz ograniczenia pamięci. Zamiast tego możesz rozważyć użycie persist(pyspark.StorageLevel.memory_and_disk_only)
. Jeśli nie ma dostępnej pamięci lub miejsca na dysku, Spark ponownie pobierze Dane i partycje od zera,więc rozsądnie będzie monitorować je z interfejsu internetowego Spark. Buforowanie jest rozwiązaniem wybrałem w moim przypadku.
Podsumowanie i najważniejsze spostrzeżenia
Morał z tej historii jest taki: jeśli w Spark dzieje się nieoczekiwane zachowanie, trzeba tylko zagłębić się nieco głębiej! Oto podsumowanie wszystkich kluczowych punktów tego artykułu:
- randomSplit() jest równoważna wielokrotnemu zastosowaniu sample() na ramce danych, przy czym każda próbka pobiera ponownie, partycjonuje i sortuje ramkę danych wewnątrz partycji.
- rozkład danych na partycjach i kolejność sortowania jest ważna zarówno dla randomSplit (), jak i sample(). Jeśli zmiana nastąpi po ponownym pobraniu danych, mogą wystąpić duplikaty lub brakujące wartości w podziale, a ta sama próbka przy użyciu tego samego materiału siewnego może przynieść różne wyniki.
- te niespójności mogą nie występować przy każdym uruchomieniu, ale aby je całkowicie wyeliminować, zachowaj (aka cache) ramkę danych, podziel kolumnę(kolumny) lub zastosuj funkcje agregujące, takie jak groupBy.