UDFS en Spark SQL

Si quieres saber qué es una UDF y cómo usarlas en Spark SQL, te lo contamos en este interesante video.

Qué es una UDF

UDF (User Defined Functions) son las funciones de usuario, y son sistemas para definir nuevos métodos SQL que operan sobre las columnas de un DataFrame.

Spark SQL ya tiene operaciones sobre columnas, como filtrar valores en un rango, pero podemos utilizar las UDFs para definir la lógica de nuestro negocio.

Por ejemplo, imaginemos que tenemos una columna que es un producto, y queremos una UDF que devuelva la categoría de producto.

Cómo definir una UDF en Scala

Para definir una UDF en Scala es necesario:

  • Importar org.apache.spark.sql.functions
  • Para definir una UDF tenemos un método que se llama “udf”, que recibe como argumento una lista de entradas, el carácter clave igual y mayor que, para separarlos de la lógica de la función, y después ponemos una lógica con la secuencia de operaciones hasta el retorno deseado: val miUdf = udf((x1: Tipo1, x2: Tipo2, …. , xN: TipoN) => … lógica de la función…)

Usar una UDF en un DataFrame

Una vez que hemos definido una UDF lo que hay que hacer es generar una columna en nuestro DataFrame, con el resultado de aplicar esa UDF.

Para ello se utiliza withColumn, que se le pasa como primer parámetro el nombre de la nueva columna general, y como segundo parámetro el nombre de la UDF y la serie de columnas sobre las que se aplica: miDataFrame.withColumn(“NuevaCol”, miUDF(col(“c1”), …, col(“cN”)))

Ejemplo

Vamos a ver lo anterior con un ejemplo.

En primer lugar sería generar una DataFrame, una tabla de valores. Podemos, por ejemplo, tomar las ventas de una empresa por días de la semana y el total de ventas. Una vez tenemos esa secuencia, con el método parallelize generamos un RDD y lo convertimos a un DataFrame con el método toDF, en el que tenemos dos columnas, el día y las ventas:

val ventasDias=
Seq((“lunes”, 100), (“martes”, 150), (“miércoles”, 140), (“jueves”, 145), (“viernes”, 155), (“sábado”, 90),
(“domingo”, 85))
val miRDD=spark.sparkContext.parallelize(ventasDias)
val tablaVentas=
spark.createDataFrame (miRDD).toDF(“dia”,”ventas”)
tablaVentas.show

La UDF que vamos a tomar como ejemplo es distinguir entre si el día de la semana es laboral o festivo.

Tendría una única entrada, que llamamos x, que es de tipo String. La lógica es que tenemos un if, en el que se comprueban dos condiciones, si es sábado o es domingo, y en ese caso devuelve “festivo”, y en otro caso devuelve “laboral”.

Una vez que hemos registrado la UDF, en la tabla de ventas invocamos a withColumn y definimos la nueva columna, que se llama “TipoDia”, y que es el resultado de aplicar la udfTipoDia sobre la columna “dia”:


val udfTipoDia = udf((x: String) => if (
(x.equals(“sabado”)) || (x.equals(“domingo”)) )
“festivo” else “laboral” )
val ventasTipoDia=tableVentas.withColumn(“TipoDia”, udfTipoDia(col(“dia”)))
ventasTipoDia.show
Una vez que tenemos esa columna, la podríamos utilizar en una agregación, por ejemplo podríamos
hacer un groupBy sobre TipoDia y sumando las ventas para días festivos y días laborales:
ventasTipoDia.groupBy(“TipoDia”).sum(“ventas”).show

Aprende a programar scripts de procesamiento de datos capaces de ejecutarse de forma clusterizada mediante el framework Apache Spark.

Recuerda que puedes comenzar este curso con tu suscripción de OpenWebinars. Si todavía no estás suscrito, aprovecha para hacerlo ahora.

Las cookies nos permiten ofrecer nuestros servicios. Al utilizar nuestros servicios, aceptas el uso que hacemos de las cookies. Más Información