Management_sesión03: Hadoop Distributed File System (HDFS)

En esta sesión veremos:

  • Qué es HDFS – capacidades y utilidad
  • Arquitectura HDFS – cómo se organiza internamente Hadoop
    • Fragmentación, Réplicas y balanceamiento
  • File Formats en HDFS – diseños de particionado
    • Diseño Horizontal: SequenceFile
    • Diseño Horizontal: Avro
    • Diseño Híbrido: Parquet
    • Comparación
    • Compresión de datos en Hadoop
    • Cómo escoger file format y tipo de compresión

Esta sesión contiene asociada 3 sesiones prácticas:

  1. Management_sesion03_hands-on01: Comencemos con Hadoop-parte01
  2. Management_sesión03_Hands-on01: Comencemos con Hadoop_parte02
  3. Management_sesión03_Hands-on02: Block size & balanceamiento
  4. Management_sesion03_Hands-on03: FileFormats & Compresión

Management_sesion03_Hands-on03: FileFormats & Compresión

En esta sesión el objetivo es escribir y leer ficheros en distintos formatos (PlainText, SequenceFile, Avro y Parquet) y distintos algoritmos de compresión y ver como se comporta cada tipo en cuanto a tiempos y memoria guiándose por las cuestiones que adjunto aquí abajo.

Los ficheros los generamos usando un programa Java que han de descargar e importar como proyecto Maven a vuestro IDE y posteriormente compilar.
Se os pedirá completar el código añadiendo dos clases que faltan y adaptar la clase Main:

  • En el Package org.adilazh1.hdfs.reader añadir las clases
    • MyHDFSavroReader: que implemente la interfaz MyReader para poder leer ficheros con formato Avro.
      Como pista investiguen las clases DataFileReader y GenericDatumReader.
    • MyHDFSparquetReader: que implemente la interfaz MyReader para poder leer ficheros con formato Parquet.
      Como pista investiguen las clases ParquetReader y GenericRecord.
  • Adaptar la clase Main para que podamos llamar a las clases creadas y deserializar los ficheros Avro y Parquet para que sean legibles para nosotros.

Descargue el proyecto aquí HDFS-1.0.


Nota: Hay un error en el código, en la clase Main, en la parte de read, para -plainText usen un objeto de la clase MyHDFSPlainFileReader en lugar de MyHDFSSequenceFileReader.


Una vez completado el proyecto (puede hallar una posible solución aquí) y compilado, úselo para responder a las cuestiones:

A qué conclusiones ha llegado?

Habrá notado que una de las características diferenciales a simple vista entre los diversos formatos es la legibilidad. Debido a los metadatos, únicamente el formato de plainText es legible para nosotros, todos los demás necesitan ser deserializados. Pero dicha característica no es la que nos hará escoger un formato u otro, nos interesara según los intereses que tengamos, ahorrar espacio o explotar paralelismo para escoger un formato u otro.

Hemos notado también que al definir SequenceFile, de estructura key-value, hemos tenido que implementar la clave. No es algo automático. Lo que nos permite concluir que un fichero puede ser almacenado en diversos formatos pero somos nosotros los que, conociendo su estructura, adaptamos el fichero al formato si es preciso.

En cuanto a los tamaños de los ficheros comparados con plainText, observamos que SequenceFile para ficheros grandes tiende a ocupar lo mismo que plainText pues sólo añade cierta metadata y clave que relativamente no supone mucho espacio. Avro, de diseño horizontal, guarda el esquema del fichero en formato JSON adjunta al fichero y eso aumenta ligeramente el espacio ocupado. Parquet en cambio observamos que el espacio ocupado es mucho menor que plainText aún que añada header y footer por cada bloque y se debe a que Parquet compacta las columnas aprovechando su diseño híbrido, así se queda con datos menos repetetivos. También representa los valores numéricos del fichero en formato numérico minimal.

En cuanto a tiempos de lectura, si no deserializamos, las conclusiones son propias de la práctica anterior, depende del tamaño y número de bloques. En cambio, si se desea deserializar y hacer el contenido legible, ahí notamos que Avro y Parquet toman más tiempo que plainText y SequenceFile dado que se necesita mayor cómputo.

La compresión nos permite ahorrar espacio, y si es de tipo splittable nos permitirá reducir el shuffling por la red en nuestros jobs distribuidos. Aquí solo hemos hecho pruebas a modo de ejemplo con SequenceFile y los tres (dos) niveles de compresión que permite: RECORD, compresión a nivel de registros (values) y BLOCK, compresión a nivel de bloques. Una compresión a nivel de bloques reduce el tamaño dado que aplica sobre más cantidad de datos, reduciendo así número de bloques y tiempo de inserción. En cuanto a tiempo de lectura, la compresión introduce cierta sobrecarga que es mayor en el nivel RECORD debido a más «partes» a descomprimir que el nivel BLOCK.

En definitiva, no existe una regla definitiva de cómo escoger un formato u otro, una compresión u otra. Depende del problema, forma de datos, utilidad de los datos, son más del tipo Hot Data o Cloud Data,… Pero lo que hemos podido observar es que Parquet ahorra bastante en espacio sin aplicar una compresión específica, pues aprovecha compactación de columnas y la manera de guardad los valores numéricos. Añadiendo una compresión, deberíamos saber si nuestros datos están más enfocados a almacenarse con poca consulta o más bien a ser explotados por algoritmos en paralelo. Para el primer caso una compresión Snappy (no splittable) sería útil. En cambio, para aprovechar al máximo el paralelismo en jobs distribuidos y usando compresión, un algoritmo a considerar sería LZO.

Management_sesión03_Hands-on01: Comencemos con Hadoop_parte02

Está entrada es continuación de la práctica Management_sesión03_Hands-on01_parte01, donde veremos comándos básicos que nos permitirán insertar ficheros en HDFS y leerlos, así como conocer número de bloques, distribución de éstos, uso de disco, etc. También crearemos un proyecto en JAVA que en el que veremos como interactuar con nuestro Cluster Hadoop desde lenguajes de programación.

Algunos comándos:

  • Insertar fichero en HDFS
    Descargamos un fichero de prueba, por ejemplo este : https://opendata-ajuntament.barcelona.cat/data/es/dataset/culturailleure-bibliotequesimuseus
    Un csv del portal open data de Barcelona, que muestra lista de bibliotecas y museos de la ciudad de Barcelona que incluye las bibliotecas y las bibliotecas municipales, museos, salas de estudio…
    Recuerden que para descargarlo pueden usar wget o una vez bajado a la computadora, usar csp para transferirlo al máster.
    • -D indica que a continuación tenemos un parámetro
    • dfs.blocksize: permite definir el tamaño de bloques
    • dfs.replication: permite definir número de replicas (por defecto 3)
    • copyFromLocal: una de las posibles maneras de insertar ficheros que indica que el origen del fichero es nuestro sistema local. Chequean el comando -put
hadoop-2.10.0/bin/hdfs dfs -D dfs.blocksize=1m -D dfs.replication=2 -copyFromLocal ficheroOrigen destino


UPS!! un error.

at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:710)
copyFromLocal: File /user/adilazh1/C001_Biblioteques_i_museus.csv._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1).  
There are 0 datanode(s) running and no node(s) are excluded in this operation.

Nos aparece un mensaje indicándonos que no encuentra los DataNodes. Se debe a que nos hemos olvidado de configurar bien los ficheros hosts. Las máquinas virtuales llevan el fichero hosts de máster configurado pero no los slaves. Les dejo el link a stackoverflow para que puedan leer mi respuesta. Basicamente tenemos que modificar los ficheros /etc/hosts para añadir las IPs pertinentes.


  • Leer ficheros desde HDFS
    En las próximas sesiones donde hablemos de FileFormats veremos que no todos los formatos son legibles para nosotros debido a los metadatos y su estructura. Por tanto en muchas ocasiones necesitaremos de escribir programas para parsear los ficheros y poder tenerlos de forma legible. No obstante a efectos de pruebas usaremos el comando cat para leer ficheros.
hadoop-2.10.0/bin/hdfs dfs -cat C001_Biblioteques_i_museus.csv

Si no marcamos ninguna ruta si no únicamente el nombre del fichero que queremos leer, hadoop accede a la ruta por defecto que es /user/$USER que nuestro caso es /user/adilazh1

  • Disck usage
    Si queremos ver el uso de disco de cada slave podemos ejecutar la orden du:
hadoop-2.10.0/bin/hdfs dfs -du -h ruta_en_HDFS
  • Estadísticas y bloques
    Si queremos consultar estadísticas sobre un fichero, como número de bloques, tamaño, número de réplicas,… Lo podemos consultar via web en la interfaz que crea Hadoop en el puerto 50070 http://192.168.56.1:50070/explorer.html#/ o también por linea de comandos:
hadoop-2.10.0/bin/hdfs fsck fichero

Notese que nos marca Missing replicas, aquí nos mostrará el número de réplicas que no ha podido llevar a cabo dado que no existen DataNodes suficientes. Y aún que hayamos indicado un número superior de réplicas y en la GUI Hadoop así nos aparece, en realidad solo hará las replicas que son lógicamente posibles, tantas como DataNodes en nuestro Cluster.

Nos suele interesar a veces conocer número de bloques de un fichero que hay en cada slave, para ello:

hadoop-2.10.0/bin/hdfs fsck /user/adilazh1/C001_Biblioteques_i_museus.csv -files -blocks -locations | grep 192.168.56.101 | wc -l

con el comando anterior consultamos número de bloques del fichero «C001_Biblioteques_i_museus.csv» que hay en el slave de IP 192.168.56.101 que es slave1.
Si nos fijamos nos damos cuenta de que la suma de bloques en cada slave no corresponde con el número de bloques que posee el fichero pero es un múltiplo, de hecho es numero bloques * factor de replicación real.

Pasemos a programar!

A continuación el objetivo es comenzar a interactuar con Hadoop desde su API JAVA. Hadoop dispone de API para diversos lenguajes, podríamos decir que los más usados son JAVA, Scala y Python, así que hallaran una infinidad de documentación en esos lenguajes para conectar con Hadoop.

Como ejercicio os propongo crear un programa que nos permita indicarle por medio de argumentos si queremos insertar fichero o leer fichero. En caso de insertar fichero, indicarle en un segundo y tercero argumento el fichero a insertar y el nombre a darle en HDFS respectivamente. Y en caso de leer, pues le indicaríamos únicamente el nombre del fichero a recuperar de HDFS.

Una Posible solución?

  1. Creamos un proyecto Maven, al que incorporamos la dependencia
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.10.0</version>
		</dependency>

Maven ya se encarga por nosotros de incorporar las librerías compiladas. De las cuales usaremos FSDataOutputStream y FSDataInputStream principalmente. Y también necesitaremos un Objeto de la clase Configuration al cual le indicaremos la URL HDFS de nuestro máster.

package org.adilazh1.read_write;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class App {
	public static void main(String[] args) throws IOException {
		// Definimos configuración hacia nuestro Cluster
		Configuration config = new Configuration();
		config.set("fs.defaultFS", "hdfs://192.168.56.100:27000");
		
		if (args[0].equals("-write")) {
			writeToHDSF(config, args[1], args[2]);
		}
		if (args[0].equals("-read")) {
			readFromHDFS(config, args[1]);
		}
	}

	public static void writeToHDSF(Configuration config, String filePath, String file) throws IOException {
		FileSystem fs = FileSystem.get(config);

		Path hdfsWritePath = new Path("/user/adilazh1/" + file);
		FSDataOutputStream fsDataOutputStream = fs.create(hdfsWritePath, true);

		BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
		BufferedReader br = new BufferedReader(new FileReader(filePath));

		String line;
		while ((line = br.readLine()) != null) {
			bw.write(line);
			bw.newLine();
		}

		bw.close();
		fs.close();
		br.close();
	}

	public static void readFromHDFS(Configuration config, String file) throws IOException {
		FileSystem fs = FileSystem.get(config);

		FSDataInputStream inputStream = fs.open(new Path("/user/adilazh1/" + file));
		BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));

		String line = null;
		while ((line = br.readLine()) != null) {
			System.out.println(line);
		}

		inputStream.close();
		fs.close();
	}
}

  1. Para probar nuestro código necesitamos compilarlo y subirlo al máster.
    Para compilar depende del IDE que usamos, si usamos Eclipse primero creamos una configuración de ejecución: «Run» –> «Run Configuration» –> «Java Application» botón derecho y crear nueva configuración. Ahí damos nombre al proyecto y seleccionamos la clase Main

Y posteriormente «File»–>»Exportar» –> «Runnable Jar file» y escogemos la configuración que hayamos creado antes. (Más adelante veremos maneras más eligantes de hacerlo, usando el propio fichero .pom y Maven.)

  1. Una vez que tengamos el fichero .jar, lo subemos al Cluster y ejecutamos
scp read.jar adilazh1@127.0.0.1:./recursos/
java -jar read.jar -write ./C001_Biblioteques_i_museus.csv miFichero
java -jar read.jar -read miFichero

Management_sesion03_hands-on01: Comencemos con Hadoop-parte01

En esta sesión práctica el objetivo es descargar y configurar Hadoop por una parte y por otra, practicar ciertos comándos básicos como insertar y leer datos hacia y desde HDFS.
También tendremos que desarrollar un proyecto Maven con programa escrito en JAVA que nos permita escribir y leer ficheros interaccionando con HDFS desde JAVA.

Vamos a necesitar:

  • Tener en mente la arquitectura de nuestro Cluster virtual y la teoría sobre HDFS vista en la sesión03.
  • Un cliente ssh: Putty o mobaXterm
  • Un IDE de desarrollo: Eclipse o IntelliJ IDEA

Comencemos:

Parte 1: instalar y configurar Hadoop.
Iniciamos las tres máquinas virtuales y nos logeamos

usuario: adilazh1
password: azh1

Nos conectamos al master via ssh y una vez dentro creamos un directorio «Descargas»

ssh adilazh1@127.0.0.1
cd
mkdir Descargas

Y ahora descargamos Hadoop 2.10.0 y Java 8 dentro de dicha carpeta

cd Descargas
wget http://apache.uvigo.es/hadoop/common/hadoop-2.10.0/hadoop-2.10.0.tar.gz
tar xf hadoop-2.10.0.tar.gz -C ~/
wget https://www.dropbox.com/s/x1kyrrhmdiwaiyv/jdk-8u241-linux-x64.tar.gz?dl=1
mv jdk-8u241-linux-x64.tar.gz?dl=1 jdk-8u241-linux-x64.tar.gz
tar xf jdk-8u241-linux-x64.tar.gz -C ~/

Inciso: hemos usado jdk-8u241 de Oracle que para descargarlo se necesita logearse, para saltarnos esté paso, he dejado el link en Dropbox. Existen diversos openjdk que pueden usar como por ejemplo amazon corretto.
También los enlaces pueden romperse y dejar de funcionar, en ese caso consultando las webs oficiales podremos obtener nuevos links de descarga.


Ahora pasamos a la verdadera configuración:
1. En el fichero ~/hadoop-2.10.0/etc/hadoop/slaves incluimos los hostnames de los DataNodes, como ya están sus IPs de Red NAT incluidos en el fichero hosts del máster, incluimos únicamente sus nombres. Suprimiendo localhosts que viene por defecto. Quedando el contenido del fichero como sigue:

slave1
slave2

2. Modificamos el fichero ~/hadoop/etc/hadoop/core-site.xml (fichero que permite configurar hadoop) añadiedo dos propiedades. La primera para definir la URL HDFS para el NameNode. La segunda indicaremos donde HDFS guardará los datos:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master:27000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/adilazh1/data</value>
    </property>
</configuration>

3. HDFS contiene algunas configuraciones por defecto, que podemos consultar en la documentación. Vemos que define el tamaño mínimo de bloque y también el porcentaje mínimo de bloques que deben satisfacer el requisito mínimo de replicación para no entrar en savemode al iniciarse. Para evitar estas configuraciones y poder probar diferentes valores de block size y replicación, desactivamos dichos límites. Para ello modificamos el fichero ~/hadoop-2.10.0/etc/hadoop/hdfs-site.xml añadiendo las siguientes propiedades:

<configuration>
    <property>
        <name>dfs.namenode.fs-limits.min-block-size</name>
        <value>0</value>
    </property>
    <property>
        <name>dfs.namenode.safemode.threshold-pct</name>
        <value>0</value>
    </property>
</configuration>

4. Indicamos a Hadoop donde hemos descargado JAVA, modificando el fichero ~/hadoop-2.10.0/etc/hadoop/hadoop-env.sh modificando los valores de JAVA_HOME.
En dicho fichero también hallamos la variable HADOOP_HEAPSIZE que por defecto está en 1000MB que define JVM heap size. Si posen una computadora más potente con 16GB de RAM, y podéis dedicar más RAM a las VM, podéis indicar un valor superior a la variable HADOOP_HEAPSIZE y así los procesos que corren en JVM tomarán más memoria.

export JAVA_HOME=/home/adilazh1/jdk1.8.0_241

5. Copiamos la configuración a los slaves usando scp

scp -r hadoop-2.10.0 jdk1.8.0_241 adilazh1@slave1:.
scp -r hadoop-2.10.0 jdk1.8.0_241 adilazh1@slave2:.

Parte2: iniciar y apagar cluster hadoop
Ahora que ya hemos configurado hadoop y hemos copiado la configuración a los slaves, podemos iniciar el cluster hadoop y comprobar que todo funcione correctamente:

  1. Formateamos NameNode al tratarse de la primera vez
hadoop-2.10.0/bin/hdfs namenode -format
  1. Iniciamos hadoop!!!
hadoop-2.10.0/sbin/start-dfs.sh

Nos pedirá introducir contraseñas del usuario adilazh1.

Creamos nuestro primer directorio en HDFS

hadoop-2.10.0/bin/hdfs dfs -mkdir /user
hadoop-2.10.0/bin/hdfs dfs -mkdir /user/adilazh1

Hadoop crea una interfaz WEB en el puerto 50070 por defecto, pero esto en el máster que tenemos en versión minimal sin navegadores. Al estar usando una máquina virtual, vamos a poder redirigir puertos a nuestra máquina host y así acceder a la interfaz WEB con el navegador de nuestra máquina. Para ello creamos una regla en virtualbox:

  1. Sobre la máquina «master» –> «configuración» –>»red» –> «Adaptador 1″ –>»avanzados» –> «reenvío de puertos» –>»añadir regla como la imagen y aceptar»

Y recordemos que el adaptador que instala VirtualBox en nuestra máquina host, por defecto tiene como IP 192.168.56.1 (consultar en cmd con el comando ipconfig /all), así que podemos acceder con nuestro navegador usando la URL 192.168.56.1:50070

  1. Parar el Cluster de hadoop
hadoop-2.10.0/sbin/stop-dfs.sh

Para no hacer está práctica muy extensa, dejamos el resto que nos queda «comándos básicos de escritura y lectura desde consola y desde JAVA» para una segunda parte: Management_sesion03_hands-on01: Comencemos con Hadoop-parte02