Qué es Structured Streaming
Structured Streaming es un mecanismo que se construye encima de SparkSQL que nos permite la ingesta de datos en tiempo real y poder procesarlos.
Esto nos da muchos beneficios, porque es muy interesante ingestar datos en tiempo real y transformarlos para obtener resultados.
Características principales
Sus características principales son:
- Podemos realizar las mismas operaciones que realizamos sobre SparkSQL, sobre los DataFrame y los DataSet.
- La herramienta es la que se encarga de gestionar el streaming, es decir, la herramienta va creando pequeños procesos microbatch con cada uno de los datos que va recibiendo.
- Se asegura el end-to-end.
Práctica con Structured Streaming
Vamos a ver un pequeño ejemplo para verlo de forma más clara.
Tenemos un notebook en el que hemos la Row y el Spark Implicits, que es necesario para este tipo de transformaciones o de ingestas.
import org.apache.spark.sql.Row
import spark.implicits.
Ahora prepararemos un pequeño programa en el cual iremos ingestando información a través de un socket de la máquina, simplemente para probarlo. También implementaremos un "cuenta palabras", que irá contando y mostrando en pantalla las palabras que vayamos introduciendo.
Para ello vamos a ir creando las líneas que iremos leyendo.
Añadimos a continuación varias configuraciones. En primer lugar indicamos que como formato haga un socket, aunque lo más normal es hacerlo desde un fichero, pero para este ejemplo nos viene mejor hacerlo así. En segundo lugar indicamos dónde se encuentra ese socket y el puerto. Finalmente indicamos que comience la carga de datos:
val lineas = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
Esto genera un DataFrame de string con cada uno de los elementos que le enviemos.
Esas líneas, cuando las mandemos, las dividiremos en palabras con el carácter espacio, y después las contaremos. Esto se hace así:
val palabras = lineas.as[String].flatMap(_).split(" "))
val numPalabras = palabras.groupBy("value").count()
Así ya tendríamos preparadas esas dos instrucciones para escuchar una vez comencemos a lanzar las palabras.
Nuestro DataFrame va a tener un valor, que va a ser la palabra, y una cuenta, que será el número.
Finalmente vamos a hacer una query sobre ese DataFrame. Indicamos que nos la muestre en tipo update, es decir, las palabras que vayan cambiando, elegimos que nos lo muestre en consola.
Lanzamos la query e indicamos que acabe en 60 segundos:
val query = numPalabras.writeStream
.outputMode("update")
.format("console")
.start
query.awaitTermination(60000)
Al lanzar lo anterior comenzaría a leer a partir de nuestro socket. Abriremos nuestra máquina virtual, en nuestro caso una máquina virtual con Vagrant, y lanzamos el comando:
nc -lk 999
Ya tenemos nuestro "cuenta palabras" escuchando, así que comenzamos a introducir frases y palabras para que vaya efectuando el conteo de las mismas.
Como vemos, a medida que introducimos palabras, se van contando las mismas y se va incrementando el valor numérico de las que se repiten.