Spark randomSplit()の実装
randomsplit()のシグネチャ関数には、重みリストとシード仕様が含まれています。 ウェイトリストは、それぞれの分割数とパーセンテージ(近似)を指定することであり、シードは再現性のためのものです。 比率は、それがどのように計算されるかの性質のためにおおよそのものです。たとえば、図3の次のコードは、dfを2つのデータフレームに分割し、train_df
test_df
は元のデータフレームの20%です。 Random seedに同じ値を使用することで、スクリプトを再実行するか、Sparkが内部的に分割を再構築する場合、同じデータポイントが同じ分割にあることが期待
フードの下
パーティション分割、パーティション内のソート、およびベルヌーイサンプリング:次のプロセスが繰り返 元のデータフレームがキャッシュされていない場合、データは分割計算ごとに再フェッチされ、再パーティション化され、再ソートされます。 これが潜在的な異常の原因です。 要約すると、randomSplit()は、各分割に対してsample()を実行することと同等であり、分割が実行されるにつれてサンプルに対する割合が変化します。 これは、Pyspark3のrandomSplit()のソースコードを調べると明らかです。 このブログでは、randomSplit()がどのように実装されているかについて、より多くの情報とビジュアルも提供しています。
例を見てみましょう。 図4は、0.80分割から始まる各分割のサンプル()の図です。
Sparkは、アイテム(データポイント)の乱数を生成し、生成された数が分割比によって決定される特定の範囲内にある場合に分割にそれを受 0.8分割データフレームの場合、ベルヌーイセルサンプラーの許容範囲は次のようになります。
図5の0.20分割についても同じサンプリングプロセスが続き、受け入れの境界だけが変更されます。
データフレームが再フェッチされ、パーティション化され、パーティション内で再びソートされます。 この例では、RDDパーティションがべき等であることがわかります。 これは、図4の各パーティション内のデータポイントが、図5の同じパーティションに残ることを意味します。 たとえば、点bとcは、図4と図5の両方でパーティション1にあります。 さらに、各パーティションに関連付けられているシードは常に一定のままであり、パーティション内の順序は同じです。 これらの3つの点はすべて、sample()とrandomSplit()の両方にとって基本的なものです。 前者では同じシードで同じサンプルが生成されることを保証し、後者では重複または消失するデータポイントがないことを保証します。
不整合を回避するための解決策
これらの問題を修正するには、RDDパーティションと並べ替え順序が冪等であることを保証することにあ 1)操作の前にデータフレームをキャッシュする2)列または列のセットによる再パーティション化、および3)集計関数⁵を使用する。 各方法の例を図6に示します。
元のデータフレームをキャッシュすると、パーティションコンテンツがメモリに保持され そのため、データの再取得、パーティション分割、ソートの代わりに、Sparkはメモリ内のパーティション化されたデータを使用して操作を続行します。 Cache()はpersist(pyspark.StorageLevel.memory_only)
persist(pyspark.StorageLevel.memory_and_disk_only)
の使用を検討できます。 利用可能なメモリまたはディスク領域がない場合、Sparkはデータを最初から再取得してパーティション化するため、Spark Web UIからこれを監視するのが賢明 キャッシングは私が私の場合に選んだ解決策です。Sparkで予期しない動作が起こっている場合は、もう少し深く掘り下げる必要があります!
要約と重要な注意事項
物語の道徳的なものです:Sparkで予期しない動作が起こっている場合は、もう少し深く掘り下げる必要があります! この記事のすべての重要なポイントの概要は次のとおりです。
- randomSplit()は、データフレームにsample()を複数回適用することと同じです。
- パーティション間のデータ分布と並べ替え順序は、randomSplit()とsample()の両方にとって重要です。 データの再フェッチ時にいずれかが変更された場合、分割間で重複または欠損値が存在し、同じシードを使用する同じサンプルでは異なる結果が生成される可能性があります。
- これらの不整合はすべての実行で発生するわけではありませんが、それらを完全に排除するには、データフレームを永続化(別名キャッシュ)するか、列