Implementación de Spark de randomSplit()
La función de firma de randomSplit() incluye una lista de pesos y una especificación de semilla. La lista de pesos es para especificar el número de divisiones y el porcentaje (aproximado) en cada una y la semilla es para reproducibilidad. La relación es aproximada debido a la naturaleza de cómo se calcula.
Por ejemplo, el siguiente código de la Figura 3 dividiría df en dos marcos de datos, train_df
siendo el 80% y test_df
siendo el 20% del marco de datos original. Al usar el mismo valor para semilla aleatoria, esperamos que los mismos puntos de datos estén en la misma división si fuéramos a volver a ejecutar el script o Spark reconstruye internamente las divisiones.
Bajo el capó
Se repite el siguiente proceso para generar cada marco de datos dividido: particionamiento, ordenación dentro de particiones y muestreo Bernoulli. Si el marco de datos original no se almacena en caché, los datos se recuperarán, se repartirán y se ordenarán de nuevo para cada cálculo dividido. Esta es la fuente de anomalías potenciales. En resumen, randomSplit() es equivalente a realizar sample () para cada división con el porcentaje a la muestra cambiando con la división que se realiza. Esto es evidente si examina el código fuente de randomSplit () en PySpark3. Este blog provides también proporciona más información e imágenes sobre cómo se implementa randomSplit ().
Veamos un ejemplo. La Figura 4 es un diagrama de la muestra() para cada división, comenzando con la división 0.80.
Spark utiliza muestreo Bernoulli, que se puede resumir como generar números aleatorios para un elemento (punto de datos) y aceptarlo en una división si el número generado cae dentro de un cierto rango, determinado por la relación de división. Para un marco de datos dividido de 0,8, el rango de aceptación para el muestreador de celdas Bernoulli sería .
Se sigue el mismo proceso de muestreo para la división de 0,20 en la Figura 5, con solo los límites de aceptación cambiando a .
El marco de datos se recupera, se particiona y se ordena de nuevo dentro de particiones. Puede ver en el ejemplo que las particiones RDD son idempotentes. Lo que significa que los puntos de datos en cada partición en la Figura 4, permanecen en la misma partición en la Figura 5. Por ejemplo, los puntos b y c se encuentran en la partición 1 tanto en la Figura 4 como en la 5. Además, la semilla asociada a cada partición siempre permanece constante, y el orden dentro de las particiones es idéntico. Estos tres puntos son fundamentales tanto para sample() como para randomSplit(). Garantizar que la misma muestra se produzca con la misma semilla en la primera, y garantizar que no haya duplicados ni puntos de datos que desaparezcan en la segunda.
Soluciones para Evitar Inconsistencias
La solución de estos problemas radica en garantizar que las particiones RDD y el orden de clasificación sean idempotentes. Cualquiera de los tres métodos siguientes garantiza esto y se puede aplicar: 1) almacenar en caché el marco de datos antes de las operaciones 2) volver a particionar por una columna o un conjunto de columnas, y 3) usar funciones agregadas⁵. En la Figura 6 se muestra un ejemplo de cada método.
El almacenamiento en caché del marco de datos original conduce a que el contenido de la partición se mantenga en memoria. Por lo tanto, en lugar de volver a recuperar datos, particionar y ordenar, Spark continúa las operaciones utilizando los datos particionados en memoria. Tenga en cuenta que cache() es un alias para persist(pyspark.StorageLevel.memory_only)
que puede no ser ideal si tiene limitaciones de memoria. En su lugar, puede considerar usar persist(pyspark.StorageLevel.memory_and_disk_only)
. Si no hay memoria o espacio en disco disponible, Spark recuperará y particionará los datos desde cero, por lo que puede ser aconsejable supervisar esto desde la interfaz de usuario web de Spark. El almacenamiento en caché es la solución que elegí en mi caso.
Resumen y conclusiones clave
La moraleja de la historia es: si está ocurriendo un comportamiento inesperado en Spark, ¡solo necesita profundizar un poco más! Aquí hay un resumen de todos los puntos clave de este artículo:
- randomSplit() es equivalente a aplicar sample() en su marco de datos varias veces, con cada muestra de re-obtención, partición y ordenación de su marco de datos dentro de particiones.
- La distribución de datos entre particiones y el orden de clasificación es importante tanto para randomSplit () como para sample (). Si se cambia al volver a obtener los datos, puede haber duplicados o valores faltantes entre divisiones y la misma muestra que usa la misma semilla puede producir resultados diferentes.
- Es posible que estas inconsistencias no ocurran en cada ejecución, pero para eliminarlas por completo, mantenga (también conocido como caché) su marco de datos, repartición en una(s) columna (s) o aplique funciones agregadas como groupBy.