Spark pod maską: randomSplit () i sample () wewnętrzne działanie

Spark implementacja randomSplit()

funkcja podpisu randomSplit() zawiera listę wagi i specyfikację nasion. Lista wagowa ma określać liczbę podziałów i procent (przybliżony) w każdym, a materiał siewny ma na celu odtwarzalność. Stosunek ten jest przybliżony ze względu na charakter sposobu jego obliczania.

na przykład poniższy kod na rysunku 3 podzieliłby df na dwie ramki danych,train_df=80%, atest_df = 20% oryginalnej ramki danych. Używając tej samej wartości dla random seed, spodziewamy się, że te same punkty danych będą w tym samym Podziale, jeśli ponownie uruchomimy skrypt lub Spark wewnętrznie przebuduje podział.

Rysunek 3: randomSplit () signature function example

pod maską

powtarzany jest następujący proces generowania każdej podzielonej ramki danych: partycjonowanie, sortowanie wewnątrz partycji i próbkowanie Bernoulliego. Jeśli oryginalna ramka danych nie jest buforowana, dane zostaną ponownie pobrane, ponownie podzielone na partycje i ponownie posortowane dla każdego obliczenia podziału. To źródło potencjalnych anomalii. Podsumowując, randomSplit() jest odpowiednikiem wykonywania sample () dla każdego podziału, przy czym procent do próbki zmienia się wraz z wykonywanym podziałem. Jest to oczywiste, jeśli zbadasz kod źródłowy randomSplit () w PySpark3. Ten blog⁴ zawiera również więcej informacji i wizualizacji na temat implementacji funkcji randomSplit ().

przejdźmy przez przykład. Rysunek 4 to schemat próbki() dla każdego podziału, zaczynając od podziału 0,80.

Rysunek 4: proces generowania podziału 0.8. Identyczna z implementacją sample ().

Spark wykorzystuje próbkowanie Bernoulliego, które można podsumować jako generowanie liczb losowych dla elementu (punktu danych) i przyjmowanie go do podziału, jeśli wygenerowana liczba mieści się w pewnym zakresie, określonym przez współczynnik podziału. Dla dzielonej ramki danych 0.8 zakres akceptacji dla próbnika komórek Bernoulliego byłby równy .

ten sam proces pobierania próbek jest wykonywany dla podziału 0,20 na rysunku 5, przy czym tylko granice akceptacji zmieniają się na .

Rysunek 5: proces generowania podziału 0.2. Identyczny z sample (). wdrożenie. Zawartość partycji pozostaje stała, a kolejność sortowania jest zachowana, zapewniając prawidłowy podział.

ramka danych jest ponownie pobierana, dzielona i sortowana na partycjach. W przykładzie widać, że partycje RDD są idempotentne. Co oznacza, że punkty danych w każdej partycji na rysunku 4 pozostają w tej samej partycji na rysunku 5. Na przykład punkty b I c znajdują się w partycji 1 na rysunku 4 i 5. Dodatkowo zalążek związany z każdą partycją zawsze pozostaje stały, a kolejność wewnątrz partycji jest identyczna. Wszystkie trzy z tych punktów są fundamentalne zarówno dla sample (), jak i randomSplit (). Zapewnienie, aby ta sama próbka została wyprodukowana z tego samego materiału siewnego w pierwszym z nich oraz zagwarantowanie braku duplikatów lub zniknięcia punktów danych w drugim z nich.

rozwiązania mające na celu uniknięcie niespójności

naprawienie tych problemów polega na zapewnieniu, że partycje RDD i kolejność sortowania są idempotentne. Zapewnia to jedna z trzech następujących metod i można je zastosować: 1) buforowanie ramki danych przed operacjami 2) Ponowne dzielenie przez kolumnę lub zestaw kolumn oraz 3) użycie funkcji agregujących⁵. Przykład każdej z metod przedstawiono na rysunku 6.

Rysunek 6: Trzy różne metody, aby uniknąć niespójności w randomSplit () i sample ()

buforowanie oryginalnej ramki danych prowadzi do przechowywania zawartości partycji w pamięci. Zamiast więc ponownie pobierać dane, partycjonować i sortować, Spark kontynuuje operacje przy użyciu partycjonowanych danych w pamięci. Zauważ, że cache() jest aliasem dla persist(pyspark.StorageLevel.memory_only), który może nie być idealny, jeśli masz ograniczenia pamięci. Zamiast tego możesz rozważyć użycie persist(pyspark.StorageLevel.memory_and_disk_only). Jeśli nie ma dostępnej pamięci lub miejsca na dysku, Spark ponownie pobierze Dane i partycje od zera,więc rozsądnie będzie monitorować je z interfejsu internetowego Spark. Buforowanie jest rozwiązaniem wybrałem w moim przypadku.

Podsumowanie i najważniejsze spostrzeżenia

Morał z tej historii jest taki: jeśli w Spark dzieje się nieoczekiwane zachowanie, trzeba tylko zagłębić się nieco głębiej! Oto podsumowanie wszystkich kluczowych punktów tego artykułu:

  • randomSplit() jest równoważna wielokrotnemu zastosowaniu sample() na ramce danych, przy czym każda próbka pobiera ponownie, partycjonuje i sortuje ramkę danych wewnątrz partycji.
  • rozkład danych na partycjach i kolejność sortowania jest ważna zarówno dla randomSplit (), jak i sample(). Jeśli zmiana nastąpi po ponownym pobraniu danych, mogą wystąpić duplikaty lub brakujące wartości w podziale, a ta sama próbka przy użyciu tego samego materiału siewnego może przynieść różne wyniki.
  • te niespójności mogą nie występować przy każdym uruchomieniu, ale aby je całkowicie wyeliminować, zachowaj (aka cache) ramkę danych, podziel kolumnę(kolumny) lub zastosuj funkcje agregujące, takie jak groupBy.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *