Añadir almacenamiento para HDFS – Crear Partición en Linux

Como dijimos en el post «Montando un Cluster Big Data desde la a a la z«, es conveniente que los datos guardados en HDFS acaben en una partición de disco localizada en una unidad de almacenamiento externa a donde tenemos el sistema y las herramientas en cada datanode. Es por tanto que en este post, veremos como crear una partición y montarla en Linux para su posterior asignación a HDFS.

Una vez que tenemos la unidad de almacenamiento conectada al sistema, veremos que se ha añadido como sd<x> donde x toma letras a,b,c … según el número de discos que el sistema nos reconoce. Y de momento sin ninguna partición creada. Las particiones aparecen como sd<x><n> donde n es el número de la partición. Toda esta información la obtenemos como sigue:

lsblk

Esta estructura que vemos en la imagen, no es la ideal para un sistema Big Data, pues tenemos el disco sda particionado únicamente en dos particiones: sda1 para el boot lo cual está bien y sda2 para todo el resto desde la raiz /.
Es más conveniente disponer de particiones propias para los logs, y para /root. Pero dejamos ese tema para otro post donde hablemos de qué particionado es más conveniente para un datanode.

Para formatear el disco sdb y crear una partición en éste, que llamaremos /data/hdfs, usaremos la herramienta fdisk

fdisk /dev/sdb

Una vez tecleamos la opción n, nos preguntará si la partición es primary (p) o extended (e). tecleamos p para indicar que es partición primaria.

Nos preguntará número de la partición, de 1 a 4 pues máximo podemos tener 4 particiones primarias en un disco. Tecleamos 1.

Nos preguntará en qué sector queremos que comienze la partición, aquí lo sencillo es dejar el valor por defecto. Le damos intro.

Nos preguntará último sector, como queremos que la totalidad del disco se dedique a dicha partición, dejamos el valor por defecto que es hasta el máximo sector disponible. Le damos intro.

Partition 1 of type Linux and of size 8 GiB is set

Este mensaje nos indica que la partición se ha creado. Tecleamos w (write table to disk and exit) para guardar la tabla de particiones.

Ahora ya nos ha creado la partición sdb1 con la totalidad del espacio del disco sdb pero aún no esta formateada ni asignada a la ruta /data/hdfs.
Para ello, formateamos la partición con formato ext4 más conveniente para hadoop.

mkfs.ext4 /dev/sdb1

En este punto ya tenemos la partición formateada y un UUID asignado, que obtenemos con

blkid

Ahora creamos la ruta que asignaremos a la partición sdb1

cd /
mkdir /data
cd /data
mkdir hdfs

Finalmente para persistir los cambios y asignar la partición de forma permanente, modificamos el fichero /etc/fstab añadiendo:

echo UUID=f85d29e8-997f-45b6-908b-76d9cc9345d6 /data/hdfs ext4 defaults,noatime,nofail 0 2 >> /etc/fstab

noatime, nofail son opciones de montado de particiones que ayudan a mejorar el rendimiento de I/O y evitar errores. noatime evita guardar información relativa a cuándo se ha creado un fichero y cual es la última fecha de modificación. nofail va mejor combinada con x-systemd.device-timeout=5ms, pues dicha opción lo que hace es que en el inicio del sistema, si la unidad de almacenamiento falla, el sistema se inicia sin error, ya posteriormente podremos investigar el porqué ha fallado.

Para montar las paticiones:

mount -av
lsblk

Y esto seria todo, ya sería configuración de HDFS indicarle como ruta de data directory la ruta /data/hdfs.

Montando un Cluster Big Data desde la A a la Z. (Hortonworks)

El objetivo de este poste es el siguiente: tenemos máquinas conectadas a una red interna y queremos instalar y configurar herramientas Big Data para tener nuestro Cluster funcional usando Data Platforms que nos facilitan la integración de los servicios.


Inciso : En el curso Big Data Management iremos integrando las herramientas una a una según las vayamos necesitando y que en este post creamos un Cluster para tal fin: Management_prerrequisito01.


El paradigma en Big Data, y de sus frameworks y herramientas principales, es la distribución y el paralelismo. De ahí la necesidad de trabajar en una estructura de Cluster, máquinas conectadas entre sí que pueden trabajan juntas en la misma tarea. (Les recomiendo ver la sesión 01 del Curso de Management donde hablamos de estos temas: Management_sesión01)

Contenido

  1. Qué necesitamos antes de comenzar?
    1. Elección e instalación del sistema operativo de las máquinas.
    2. Configura la conectividad entre éstas.
    3. Arquitectura y servicios.
  2. Qué es Hortonworks Data Platform HDP?
  3. Instalar y configurar HDP.
  4. Crear un Cluster Big Data con HDP.
    1. Escoger los servicios que necesitamos.
    2. Configurar los servicios.
  5. Conclusiones.

1. Qué necesitamos antes de comenzar?

A la hora de escoger un sistema operativo para cada host de nuestro cluster, las empresas por lo general suelen decantarse por aquellas distribuciones Linux que poseen soporte y actualizaciones/parches garantizados a largo plazo, como por ejemplo Red Hat, SUSE, Ubuntu LTS. No obstante, no podemos afirmar qué distribución Linux es la mejor o estándar, cada caso de negocio casara mejor con alguna. Pero sí podemos afirmar que las mencionadas aquí son las que más mercado tienen.

En nuestro caso, y para la prueba que haremos aquí, usaremos Centos7 (1908) minimal iso. Centos (Community ENTerprise Operating System) Es un sistema operativo de código abierto, basado en la distribución Red Hat Enterprise Linux. Que usaremos por ser gratuita y que parte de Red Hat, una distribución empresarial ampliamente usada. Escoger la versión minimal es para reducir recursos y dado que no tendrá un uso de «desktop», nos comunicaremos con las máquinas via cliente ssh.

Al no disponer de máquinas reales, usaremos virtualización con VirtualBox.

  • Creamos una máquina Virtual con 30GB de disco virtual (Recomendable tamaño fijo para mejor rendimiento de la máquina virtual, tomará más tiempo en crearse pero si su computadora posee disco SDD no será demasiado), 3GB de RAM (que podremos manipular posteriormente en configuraciones)
    • En configuraciones de Red, para adaptador 1 escogemos NAT y en tipo adaptador Intel PRO/1000 MT Desktop. Adaptador que servirá para que la máquina virtual tenga acceso a Internet. Un segundo adaptador de tipo Red Nat para que las máquinas tengan una red interna entre sí y puedan comunicarse entre ellas (que previamente debemos añadir a la configuración global de VirtualBos en Herramientas -> Preferencias -> Red)
  • Instalamos el sistema Centos7 en la máquina montando la iso que nos hemos descargado previamente.

Una vez que tenemos el sistema operativo instalado, procedemos a configuraciones de red. Necesitamos modificar el hostname, definir una Ip estática para el adaptador Red Nat y añadir al resto de máquinas (que serán un clone de ésta cambiándoles la Ip) al fichero Hosts.

Para modificar el hostname:

hostnamectl set-hostname namenode
hostanem -f

Tendrá efecto el cambio al reiniciar la máquina.

Para definir IP estática:
Modificaremos ficheros asociados a cada adaptador localizados en la ruta /etc/sysconfig/network-scripts/ pero como me es más cómodo usar ifconfig para temas de red, lo tenemos que instalar

yum -y update
yum -y install net-tools
ifconfig -a

Veremos los nombres de los adaptadores de red

Modificaremos la IP para el adaptador enp0s8

cd /etc/sysconfig/network-scripts/
ls -al ifcfg*

La salida nos tiene que arrojar tantos ficheros como adaptadores, si nos falta alguno podemos copiar otro existente con el nombre correspondiente y modificarlo.

vi ifcfg-enp0s8
[root@localhost network-scripts]# cat ifcfg-enp0s8
TYPE=Ethernet
PROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=none # hemos modificado dhcp por none
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=enp0s8
UUID=1c3979b1-53f8-408d-bd40-64086876ac00
DEVICE=enp0s8
ONBOOT=yes #lo hemos domificado a yes
IPADDR=192.168.56.100 #IP fija
NETMASK=255.255.255.0 
GATEWAY=10.0.2.15 # Gateway IP adaptador enp0s3
DNS1=8.8.8.8
DNS2=8.8.4.4

Una vez guardado el fichero, y para que tenga efecto el cambio, refrescamos el NetworkManager:

systemctl stop NetworkManager
systemctl disable NetworkManager
systemctl restart network
chkconfig network on
ifconfig -a

Ya deberíamos tener la IP configurada y con ping google.com por ejemplo comprobamos que seguimos teniendo conexión a internet.

Añadir datanodes al fichero Hosts (Si poseen suficientes recursos pueden tener un elevado número de datanodes, por lo menos 3 si queremos respetar el factor de replicación de HDFS de 3 réplicas de cada bloque por defecto) pero en mi caso sólo tendré un datanode, máquina virtual clone de ésta con IP 192.168.56.101

vi /etc/hosts
[root@localhost network-scripts]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.56.100  namenode
192.168.56.101  datanode

Una vez que he clonado la máquina virtual namenode, para tener otra máquina datanode, he realizado los mismos pasos anteriores, primero modificar hostname y posteriormente IP fija para el adaptador enp0s8 y también cambié el UUID que cambia de máquina a otra para ambos adaptadores. Para modificar el UUID podemos generarlo con uuidgen y añadirlo al fichero borrando el uuid existente:

uuidgen enp0s8 >> /etc/sysconfig/network-scripts/enp0s8
uuidgen enp0s3 >> /etc/sysconfig/network-scripts/enp0s3

También he añadido un disco duro virtual adicional al datanode (en configuración -> almacenamieto -> nuevo disco vdi). En una arquitectura Big Data, el namenode ha de tener más RAM y en general no se encarga de guardar datos y por tanto no importa que tenga mucha capacidad de almacenamiento. Los datanodes en cambio, guardan datos por lo que necesitan más capacidad de almacenamiento. Por lo general es recomendable que los datos se guarden en un disco adicional en cada datanode, así los datos no se mezclen con el sistema y una migración a discos más potentes sea más fácil.

Para ver como montar el disco duro y formatearlo para ser usado como almacenamiento de datos, véase este post: anadir-almacenamiento-para-hdfs

Resumen de lo que tenemos hasta ahora:

En cuanto a la arquitectura y servicios, en nuestro ejemplo tenemos un NameNode y un DataNode debido a las limitada RAM de mi computadora, es un ejemplo que no usaremos para ningún fin analítico sino simplemente montar la arquitectura que es exportable a caso real con más computdoras físicas y por tanto más servicios/herramientas podemos usar.

Ambas máquinas harán el rol de NodeManager. el DataNode tomara también el rol de segundary namenode y Zookeeper manager.

Los servicios que añadiremos son los básicos, HDFS como sistema de almacenamiento formateado en la ruta del disco adicional del datanode, YARN, MapReduce y Zookeeper.

Pueden probar y testear otras herramientas como Kafka, Spark, Hbase, más datanodes si poseen 16GB o más de RAM.

2. Qué es Hortonworks Data Platform HDP?

Las herramientas Big Data por lo general son software libre, pero la cuestión es que unas tienen cierta dependencia de otras lo que requiere de integración entre sí. La integración de las herramientas pasa por configuraciones en diversos ficheros lo que supone conocer bien las herramientas y su arquitectura para una integración óptima.

Otro punto que surge de forma natural de estas dependencias entre las herramientas es que para iniciar y usar una de ellas, se necesita iniciar, lanzando los scripts correspondientes, las herramientas de las que depende la que queremos usar. Como tampoco podemos dejarlos iniciados, sobre todo si tenemos nuestro negocio en el Cloud por cuestión de minimizar costes, surge una complejidad creciente en temas de integración.

Probablemente necesitaremos crear scripts que nos ayuden a iniciar las herramientas en el orden adecuado y tener al día los ficheros de configuración de cada herramienta. Si añadimos nuevas herramientas, tenemos que sumarlas a la integración del sistema.

En definitiva, existe una complejidad en integración y de ahí que surjan modelos de negocio basados en automatizar al cliente parte de dicha integración. Estos modelos de negocio, los Data Platform, como lo es Hortonworks, son un servicio que proporciona un ecosistema big data de manera más sencilla ocupándose de la parte de integración y Managment.

Hortonworks es una compañía de software que comenzó ofreciendo software libre (Y por tanto sólo integra software del ecosistema Hadoop opensource) para paliar esa necesidad de integración de herramientas y ofrecer un Data Platform. Últimamente fue adquirido por Cloudera, empresa que se encarga de ofrecer Data Platforms con soporte y otros más productos segmentando por ejemplo entre CDP (Cloudera Data Platform), CDF (Cloudera Data Flow para escenarios más enfocados a Stremaing), soluciones Híbridas entre OnPremis y Cloud, etc.

Aquí usaremos la Versión HDP 3.1.0 que nos da posibilidad de añadir a nuestro Cluster diversas herramientas Apache como HDFS, MapReduce, Yarn, Zookeeper, Hbase, Hive, Kafka, Spark,Pig, Oozie,Ranger, Tez, entre otras. Vease la documentación oficial en Cloudera HDP.

Para descargar y monitorizar las herramientas que incluye HDP, usaremos Apache Amabri, en su versión 2.7.4 . Donde tenemos acceso a la guía oficial en la documentación de Cloudera.

3. Instalar y configurar HDP

En este punto ya tenemos que tener todos los hosts configurados de modo que pueden comunicarse entre sí y poseen conexión a internet.

Para instala HDP mediante Ambari-server tenemos dos vías de hacerlo, una es mediante conectividad ssh, y otra es instalando previamente Ambari-agent en cada host. La primera opción necesita de nosotros crear public ssh-keys de cada host y añadirla al fichero /.ssh/authorized-keys de cada host e indicarle a Ambari-server la clave del namenode, entonces Ambari se encarga por nosotros de bajar los Ambari-agents en cada host mediante ssh. Pero aquí lo que haremos es evitar la conectividad ssh sin contraseñas e instalaremos primero Amabri-agent en cada host «a mano».

En todo lo que sigue, se ha de usar usuario root

  1. Añadir el repositorio de Hortonworks en cada host:
yum -y install wget
wget -nv http://public-repo-1.hortonworks.com/ambari/centos7/2.x/updates/2.7.4.0/ambari.repo -O /etc/yum.repos.d/ambari.repo
  1. En cada host, instalar Amabari-agent:
yum install ambari-agent
  1. En cada host, modificamos el fichero ambari-agent.ini para indicarle el hostname del datanode:
vi /etc/ambari-agent/conf/ambari-agent.ini
  1. En namenode, instalar Amabari-server:
yum install ambari-server
  1. En cada host, desactivar temporalmente el firewall. Ambari necesitará conectarse a diversos puertos, por simplicidad desactivamos el firewall. En un entorno real, o bien configuras una política de puertos, o bien añades una capa de VPN.
systemctl disable firewalld
systemctl stop firewalld
  1. Configuramos Amabari-server, en namenode:
ambari-server setup
  1. Configuraciones:
    1. SELinux is set to ‘permissive’ mode and temporarily disabled – tecleamos y
    2. Customize user account for ambari-server daemon [y/n] (n)? – intro
    3. Para el JDK escojemos opción 1 si no tenemos java instalado aún como es el caso, nos bajara open-jdk. Si no opción 2 y le damos la ruta donde tenemos Java.
    4. Si escogimos opción 1 en fase anterior, nos pedirá aceptar licencia Oracle, le damos inter
    5. Enable Ambari Server to download and install GPL Licensed LZO packages [y/n] (n)? – le decimos y
    6. Enter advanced database configuration [y/n] (n)? – le damos intro ( nos bajará PostgreSQL como base de datos «embebida» para almacenar configuraciones y metadata que necesita)

Nos aparecerá un mensaje indicando que la configuración ha acabado con éxito

Ambari Server 'setup' completed successfully.

Iniciamos Ambari-agent en cada host y comprobamos que los logs no arrojan errores:

ambari-agent start

Iniciamos Amabari-server en namenode:

ambari-server start

Entonces aquí ya tenemos Ambari-server escuchando en el puerto por defecto IP:8080. (En virtualBox he añadido en red la re-dirección de puertos para poder acceder desde el navegador de la máquina anfitrión). Así que, podemos acceder la interfaz gráfica de Ambari para comenzar a instalar en nuestro Cluster la plataforma HDP.
Las credenciales por defecto son admin admin.

4. Crear un Cluster Big Data con HDP

Una vez iniciado Ambari-server y estando en la interfaz web IP:8080, nos da la opción de crear un Cluster:

Etapa 0: Launch install Wizard, le danos nombre al Cluster.
Etapa 1: escogemos la versión HDP, nos indicará las herramientas que incorpora. También nos preguntará de qué repositorio bajar las herramientas, eliminamos todos menos los repositorios que corresponden a nuestro OS.
Etapa 2: dejamos los hostnames de cada host en diferentes lineas tal y como nos arroja el comando <hostname -f>. Escogemos la opción «perform manual registration» dado que hemos instalado e iniciado ambari-agent en cada host manualmente.
Etapa 3: después de aceptar las alertas de la etapa anterior, Ambari comprobara que tiene acceso a cada máquina sin errores y si todo es Ok, activará el botón next.
Etapa 4: escoger las herramientas que queremos instalar. Algunas herramientas como Hive, necesitan guardar sus metadatos y configuraciones en bases de datos como MySQL, PostgreSQL. Si escogemos dichos servicios nos pedirá bajar los correspondientes conectores y añadirlos a la configuración de Ambari-server setup.
Etapa 5: definir los roles de cada host. Aquí definimos la arquitectura de nuestro Cluster. En caso real, SnameNode suele ser una máquina que no hace el rol de Datanode como es el caso, pues se reserva para nameNode en caso de que el NameNode principal falle, pero aquí se lo asignamos al único datanode que tenemos. También, se recomienda tener ResourceManager asignado a NameNode en Clusters pequeños y un número impar de servicios Zookeeper. Es importante investigar esté tema según los servicios que necesita y recursos que dispone para definir una arquitectura de Cluster óptima en cuanto a disponibilidad y tolerancia a fallos. Doc HortonWorks
Etapa 6: asignar datanodes, namenodes, nodeManager y Clients. En Cluster pequeño, Datanode y NodeManager se asignan a los slaves. Clients tanto a Namenode como Slaves. El nameNode, no toma el rol de datanode, no guardará datos.
Etapa 7: en esta etapa se configuran los servicios, definir rutas donde dejar logs, datos, contraseñas para aquellos servicios que necesitan credenciales, conectividad a BBDD si hay servicios que lo necesitan etc. Aquí lo único que cambiaremos es el directorio HDFS que por defecto es /hadoop/hdfs/* a la ruta /data/hdfs/* que hemos creado en datanode en el almacenamiento añadido. En está etapa también podremos definir Java Heap Size y configuraciones de los ficheros de configuración de cada herramienta.

Una vez que hemos acabado de configurar los parámetros que nos interesan de cada fichero de configuración, le damos a next. Nos indicará un resumen, si todo nos parece coherente le damos Deploy y en la etapa 9 comenzará a instalar los servicios. Las configuraciones se pueden modificar posteriormente en caso de error o afinar algún parámetro, así como añadir datanodes o suprimir alguno.

Finalmente en la etapa 10, nos mostrará un resumen de la situación y posteriormente la interfaz de Ambari, donde podemos monitorizar el Cluster.

Ambari nos añade al PATH variables para acceder por consola a los servicios, podemos por ejemplo acceder a HDFS tecleando hdfs, a Hive usando hive etc.

5. Conclusiones

Hemos visto que un data platform nos permite bajar las herramientas, configurar sus ficheros de configuración, definir la arquitectura de nuestro Cluster y monitorizar cada servicio. Con sencillos pasos tenemos un Cluster funcional, dejando de lado la complejidad de gestionar cada herramienta por separado.

Nos permite tener los ficheros de configuración de cada herramienta bien organizdos, con la posiblidad de modificar dichos ficheros desde una interfaz web amigable. También nos permite iniciar y parar los servicios sin necesidad de lanzar los scripts correspondientes a ello, reduciendo esta labor a acceder al servicio en la interfaz web y seleccionar inicar o parar.

En cambio, notaremos que el consumo de recursos es mayor, y que quizás perdamos la habitud de conocer más a fondo nuestro entorno. Qué servicios iniciar y en qué orden, crear scripts para ellos, acceder a las rutas de configuración en caso de querer modificar algún parámetro y se genera la sensación de perder cierto control.

Para Clusters pequeños, personalmente prefiero integrar las herramientas sin usar Data platforms, en cambio para infraestructuras más complejas y con más recursos, un entorno basado en Data platfrom, puede simplificarnos en gran medida el tiempo dedicado al manegement, y también por lo general sólo las grandes empresas necesitan una estructura más compleja y les es rentable costear los precios de un Data platform que cada vez más tiende a soluciones en el Cloud o Híbridas.

Management_sesión04: Big Data Processing

En esta sesión veremos:

  • Escalabilidad, elasticidad y sus límites – Universal Scalability Law y como modela/mide la escalabilidad.
  • Desafíos en los procesamientos distribuidos – aspectos principales que se tienen en cuenta en las implementaciones de bases NoSQL.
  • Fases de una query distribuida – global query optimizer y local query optimizer
  • Data shipping, Query shipping – dos maneras de asignar query processing
  • Tipos de paralelismo – intra-query y inter-query parallelism
  • Criterios para escoger un acces plan

Management_sesión03: Hadoop Distributed File System (HDFS)

En esta sesión veremos:

  • Qué es HDFS – capacidades y utilidad
  • Arquitectura HDFS – cómo se organiza internamente Hadoop
    • Fragmentación, Réplicas y balanceamiento
  • File Formats en HDFS – diseños de particionado
    • Diseño Horizontal: SequenceFile
    • Diseño Horizontal: Avro
    • Diseño Híbrido: Parquet
    • Comparación
    • Compresión de datos en Hadoop
    • Cómo escoger file format y tipo de compresión

Esta sesión contiene asociada 3 sesiones prácticas:

  1. Management_sesion03_hands-on01: Comencemos con Hadoop-parte01
  2. Management_sesión03_Hands-on01: Comencemos con Hadoop_parte02
  3. Management_sesión03_Hands-on02: Block size & balanceamiento
  4. Management_sesion03_Hands-on03: FileFormats & Compresión

Management_sesion03_Hands-on03: FileFormats & Compresión

En esta sesión el objetivo es escribir y leer ficheros en distintos formatos (PlainText, SequenceFile, Avro y Parquet) y distintos algoritmos de compresión y ver como se comporta cada tipo en cuanto a tiempos y memoria guiándose por las cuestiones que adjunto aquí abajo.

Los ficheros los generamos usando un programa Java que han de descargar e importar como proyecto Maven a vuestro IDE y posteriormente compilar.
Se os pedirá completar el código añadiendo dos clases que faltan y adaptar la clase Main:

  • En el Package org.adilazh1.hdfs.reader añadir las clases
    • MyHDFSavroReader: que implemente la interfaz MyReader para poder leer ficheros con formato Avro.
      Como pista investiguen las clases DataFileReader y GenericDatumReader.
    • MyHDFSparquetReader: que implemente la interfaz MyReader para poder leer ficheros con formato Parquet.
      Como pista investiguen las clases ParquetReader y GenericRecord.
  • Adaptar la clase Main para que podamos llamar a las clases creadas y deserializar los ficheros Avro y Parquet para que sean legibles para nosotros.

Descargue el proyecto aquí HDFS-1.0.


Nota: Hay un error en el código, en la clase Main, en la parte de read, para -plainText usen un objeto de la clase MyHDFSPlainFileReader en lugar de MyHDFSSequenceFileReader.


Una vez completado el proyecto (puede hallar una posible solución aquí) y compilado, úselo para responder a las cuestiones:

A qué conclusiones ha llegado?

Habrá notado que una de las características diferenciales a simple vista entre los diversos formatos es la legibilidad. Debido a los metadatos, únicamente el formato de plainText es legible para nosotros, todos los demás necesitan ser deserializados. Pero dicha característica no es la que nos hará escoger un formato u otro, nos interesara según los intereses que tengamos, ahorrar espacio o explotar paralelismo para escoger un formato u otro.

Hemos notado también que al definir SequenceFile, de estructura key-value, hemos tenido que implementar la clave. No es algo automático. Lo que nos permite concluir que un fichero puede ser almacenado en diversos formatos pero somos nosotros los que, conociendo su estructura, adaptamos el fichero al formato si es preciso.

En cuanto a los tamaños de los ficheros comparados con plainText, observamos que SequenceFile para ficheros grandes tiende a ocupar lo mismo que plainText pues sólo añade cierta metadata y clave que relativamente no supone mucho espacio. Avro, de diseño horizontal, guarda el esquema del fichero en formato JSON adjunta al fichero y eso aumenta ligeramente el espacio ocupado. Parquet en cambio observamos que el espacio ocupado es mucho menor que plainText aún que añada header y footer por cada bloque y se debe a que Parquet compacta las columnas aprovechando su diseño híbrido, así se queda con datos menos repetetivos. También representa los valores numéricos del fichero en formato numérico minimal.

En cuanto a tiempos de lectura, si no deserializamos, las conclusiones son propias de la práctica anterior, depende del tamaño y número de bloques. En cambio, si se desea deserializar y hacer el contenido legible, ahí notamos que Avro y Parquet toman más tiempo que plainText y SequenceFile dado que se necesita mayor cómputo.

La compresión nos permite ahorrar espacio, y si es de tipo splittable nos permitirá reducir el shuffling por la red en nuestros jobs distribuidos. Aquí solo hemos hecho pruebas a modo de ejemplo con SequenceFile y los tres (dos) niveles de compresión que permite: RECORD, compresión a nivel de registros (values) y BLOCK, compresión a nivel de bloques. Una compresión a nivel de bloques reduce el tamaño dado que aplica sobre más cantidad de datos, reduciendo así número de bloques y tiempo de inserción. En cuanto a tiempo de lectura, la compresión introduce cierta sobrecarga que es mayor en el nivel RECORD debido a más «partes» a descomprimir que el nivel BLOCK.

En definitiva, no existe una regla definitiva de cómo escoger un formato u otro, una compresión u otra. Depende del problema, forma de datos, utilidad de los datos, son más del tipo Hot Data o Cloud Data,… Pero lo que hemos podido observar es que Parquet ahorra bastante en espacio sin aplicar una compresión específica, pues aprovecha compactación de columnas y la manera de guardad los valores numéricos. Añadiendo una compresión, deberíamos saber si nuestros datos están más enfocados a almacenarse con poca consulta o más bien a ser explotados por algoritmos en paralelo. Para el primer caso una compresión Snappy (no splittable) sería útil. En cambio, para aprovechar al máximo el paralelismo en jobs distribuidos y usando compresión, un algoritmo a considerar sería LZO.

Management_sesión02:Fundamentos de las bases de datos

En esta sesión veremos:

  • Data Files e Índices – cómo se organizan por dentro las bases de datos.
  • B+ Tree – como construir un índice B+ Tree y para qué casos es más eficiente.
  • Índice Hash – como construir un índiceHash y para qué casos es más eficiente
  • Query Optimizer – qué mecanismos usan las bases de datos (relacionales) para optimizar las consultas.
    • Semantic Optimizer
    • Syntactic Optimiser
    • Physical Optimisation

Management_sesion01:Introducción

En está sesión introductoria al Big Data veremos:

  • Big Data, una manera de gestionar datos? – Introducción a las necesidad que surgen en Big Data.
  • Business Intelligence – «El workflow» entre los datos y departamentos en un ecosistema Big Data.
  • Qué es el Big Data – posible definición de esté área de conocimiento.
  • Challenges – algunos problemas en el ecosistema Big Data y como se intentan resolver.
  • Paradigma – paradigma Big Data: distribución, paralelismo, NoSQL …
  • Cloud & On premise – ventajas vs inconvenientes. Big Data y Cloud
  • Salidas profesionales – comentamos ciertos perfiles Big Data y sus roles principales.

Management_sesión03_Hands-on02: Block size & balanceamiento

En esta práctica el objetivo es profundizar en tamaño de bloques, ver cómo afecta a tiempos de lectura, qué papel juegan en paralelismo de tareas y en definitiva cómo y a base de qué escoger un tamaño de bloque para nuestro fichero. Aprovechando las prácticas para observar como es el balanceamiento.

Primero nos bajamos y descomprimimos un fichero que les dejo en Google Drive

wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1aJ-yAvSkIvQy4mkosumgZIXdOAHTXIxM' -O wines.rar
unrar wines.rar

Y posteriormente hagamos los ejercicios adjuntos.

A qué conclusiones ha llegado?

Unos de los objetivos de estos ejercicios es responder a la pregunta inicial de cómo escoger el tamaño de bloques óptimo para el fichero que vamos a insertar. Habrán visto que un elevado número de bloques puede mejorar el balanceamiento pero genera sobrecarga sobre los nodos y los tiempos de lectura son mayores, cosa que tampoco queremos. También al tener un número pequeño de slaves fijo a 2, disponer de muchos bloques no ayuda al paralelismo ya que las tareas serán realizadas por los mismos slaves y sólo les estamos añadiendo más sobrecarga, digamos que para componer un trozo del fichero han de leer muchos bloques.
En definitiva escoger número de bloques depende de diversos factores, uno de los más importantes es la cantidad de workers que tenemos en nuestro Cluster, es decir, cantidad de núcleos totales de éstos. Esto nos permite considerar un tamaño de bloque que genere un número similar de bloques que núcleos tienen nuestros workers, así aprovechamos más el paralelismo sin generar sobrecarga.
También recordar que el número de bloques que se generan es el mínimo número capaz de albergar todo nuestro fichero, y si tenemos bloques de 1GB y se nos queda 1KB fuera, se va a generar un bloque que ocupa 1GB para almacenar 1KB, cosa también a tener en cuenta aún que premiamos perder almacenamiento a cambio de mejorar paralelismo.

En cuanto a añadir DataNodes a nuestro cluster con la finalidad de más escalabilidad, tenemos que tener en cuenta el ejercicio 3, los ficheros ya insertados han de redistribuirse para aprovechar el nuevo nodo.

Management_sesión03_Hands-on01: Comencemos con Hadoop_parte02

Está entrada es continuación de la práctica Management_sesión03_Hands-on01_parte01, donde veremos comándos básicos que nos permitirán insertar ficheros en HDFS y leerlos, así como conocer número de bloques, distribución de éstos, uso de disco, etc. También crearemos un proyecto en JAVA que en el que veremos como interactuar con nuestro Cluster Hadoop desde lenguajes de programación.

Algunos comándos:

  • Insertar fichero en HDFS
    Descargamos un fichero de prueba, por ejemplo este : https://opendata-ajuntament.barcelona.cat/data/es/dataset/culturailleure-bibliotequesimuseus
    Un csv del portal open data de Barcelona, que muestra lista de bibliotecas y museos de la ciudad de Barcelona que incluye las bibliotecas y las bibliotecas municipales, museos, salas de estudio…
    Recuerden que para descargarlo pueden usar wget o una vez bajado a la computadora, usar csp para transferirlo al máster.
    • -D indica que a continuación tenemos un parámetro
    • dfs.blocksize: permite definir el tamaño de bloques
    • dfs.replication: permite definir número de replicas (por defecto 3)
    • copyFromLocal: una de las posibles maneras de insertar ficheros que indica que el origen del fichero es nuestro sistema local. Chequean el comando -put
hadoop-2.10.0/bin/hdfs dfs -D dfs.blocksize=1m -D dfs.replication=2 -copyFromLocal ficheroOrigen destino


UPS!! un error.

at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:710)
copyFromLocal: File /user/adilazh1/C001_Biblioteques_i_museus.csv._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1).  
There are 0 datanode(s) running and no node(s) are excluded in this operation.

Nos aparece un mensaje indicándonos que no encuentra los DataNodes. Se debe a que nos hemos olvidado de configurar bien los ficheros hosts. Las máquinas virtuales llevan el fichero hosts de máster configurado pero no los slaves. Les dejo el link a stackoverflow para que puedan leer mi respuesta. Basicamente tenemos que modificar los ficheros /etc/hosts para añadir las IPs pertinentes.


  • Leer ficheros desde HDFS
    En las próximas sesiones donde hablemos de FileFormats veremos que no todos los formatos son legibles para nosotros debido a los metadatos y su estructura. Por tanto en muchas ocasiones necesitaremos de escribir programas para parsear los ficheros y poder tenerlos de forma legible. No obstante a efectos de pruebas usaremos el comando cat para leer ficheros.
hadoop-2.10.0/bin/hdfs dfs -cat C001_Biblioteques_i_museus.csv

Si no marcamos ninguna ruta si no únicamente el nombre del fichero que queremos leer, hadoop accede a la ruta por defecto que es /user/$USER que nuestro caso es /user/adilazh1

  • Disck usage
    Si queremos ver el uso de disco de cada slave podemos ejecutar la orden du:
hadoop-2.10.0/bin/hdfs dfs -du -h ruta_en_HDFS
  • Estadísticas y bloques
    Si queremos consultar estadísticas sobre un fichero, como número de bloques, tamaño, número de réplicas,… Lo podemos consultar via web en la interfaz que crea Hadoop en el puerto 50070 http://192.168.56.1:50070/explorer.html#/ o también por linea de comandos:
hadoop-2.10.0/bin/hdfs fsck fichero

Notese que nos marca Missing replicas, aquí nos mostrará el número de réplicas que no ha podido llevar a cabo dado que no existen DataNodes suficientes. Y aún que hayamos indicado un número superior de réplicas y en la GUI Hadoop así nos aparece, en realidad solo hará las replicas que son lógicamente posibles, tantas como DataNodes en nuestro Cluster.

Nos suele interesar a veces conocer número de bloques de un fichero que hay en cada slave, para ello:

hadoop-2.10.0/bin/hdfs fsck /user/adilazh1/C001_Biblioteques_i_museus.csv -files -blocks -locations | grep 192.168.56.101 | wc -l

con el comando anterior consultamos número de bloques del fichero «C001_Biblioteques_i_museus.csv» que hay en el slave de IP 192.168.56.101 que es slave1.
Si nos fijamos nos damos cuenta de que la suma de bloques en cada slave no corresponde con el número de bloques que posee el fichero pero es un múltiplo, de hecho es numero bloques * factor de replicación real.

Pasemos a programar!

A continuación el objetivo es comenzar a interactuar con Hadoop desde su API JAVA. Hadoop dispone de API para diversos lenguajes, podríamos decir que los más usados son JAVA, Scala y Python, así que hallaran una infinidad de documentación en esos lenguajes para conectar con Hadoop.

Como ejercicio os propongo crear un programa que nos permita indicarle por medio de argumentos si queremos insertar fichero o leer fichero. En caso de insertar fichero, indicarle en un segundo y tercero argumento el fichero a insertar y el nombre a darle en HDFS respectivamente. Y en caso de leer, pues le indicaríamos únicamente el nombre del fichero a recuperar de HDFS.

Una Posible solución?

  1. Creamos un proyecto Maven, al que incorporamos la dependencia
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.10.0</version>
		</dependency>

Maven ya se encarga por nosotros de incorporar las librerías compiladas. De las cuales usaremos FSDataOutputStream y FSDataInputStream principalmente. Y también necesitaremos un Objeto de la clase Configuration al cual le indicaremos la URL HDFS de nuestro máster.

package org.adilazh1.read_write;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class App {
	public static void main(String[] args) throws IOException {
		// Definimos configuración hacia nuestro Cluster
		Configuration config = new Configuration();
		config.set("fs.defaultFS", "hdfs://192.168.56.100:27000");
		
		if (args[0].equals("-write")) {
			writeToHDSF(config, args[1], args[2]);
		}
		if (args[0].equals("-read")) {
			readFromHDFS(config, args[1]);
		}
	}

	public static void writeToHDSF(Configuration config, String filePath, String file) throws IOException {
		FileSystem fs = FileSystem.get(config);

		Path hdfsWritePath = new Path("/user/adilazh1/" + file);
		FSDataOutputStream fsDataOutputStream = fs.create(hdfsWritePath, true);

		BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
		BufferedReader br = new BufferedReader(new FileReader(filePath));

		String line;
		while ((line = br.readLine()) != null) {
			bw.write(line);
			bw.newLine();
		}

		bw.close();
		fs.close();
		br.close();
	}

	public static void readFromHDFS(Configuration config, String file) throws IOException {
		FileSystem fs = FileSystem.get(config);

		FSDataInputStream inputStream = fs.open(new Path("/user/adilazh1/" + file));
		BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));

		String line = null;
		while ((line = br.readLine()) != null) {
			System.out.println(line);
		}

		inputStream.close();
		fs.close();
	}
}

  1. Para probar nuestro código necesitamos compilarlo y subirlo al máster.
    Para compilar depende del IDE que usamos, si usamos Eclipse primero creamos una configuración de ejecución: «Run» –> «Run Configuration» –> «Java Application» botón derecho y crear nueva configuración. Ahí damos nombre al proyecto y seleccionamos la clase Main

Y posteriormente «File»–>»Exportar» –> «Runnable Jar file» y escogemos la configuración que hayamos creado antes. (Más adelante veremos maneras más eligantes de hacerlo, usando el propio fichero .pom y Maven.)

  1. Una vez que tengamos el fichero .jar, lo subemos al Cluster y ejecutamos
scp read.jar adilazh1@127.0.0.1:./recursos/
java -jar read.jar -write ./C001_Biblioteques_i_museus.csv miFichero
java -jar read.jar -read miFichero

Management_sesion03_hands-on01: Comencemos con Hadoop-parte01

En esta sesión práctica el objetivo es descargar y configurar Hadoop por una parte y por otra, practicar ciertos comándos básicos como insertar y leer datos hacia y desde HDFS.
También tendremos que desarrollar un proyecto Maven con programa escrito en JAVA que nos permita escribir y leer ficheros interaccionando con HDFS desde JAVA.

Vamos a necesitar:

  • Tener en mente la arquitectura de nuestro Cluster virtual y la teoría sobre HDFS vista en la sesión03.
  • Un cliente ssh: Putty o mobaXterm
  • Un IDE de desarrollo: Eclipse o IntelliJ IDEA

Comencemos:

Parte 1: instalar y configurar Hadoop.
Iniciamos las tres máquinas virtuales y nos logeamos

usuario: adilazh1
password: azh1

Nos conectamos al master via ssh y una vez dentro creamos un directorio «Descargas»

ssh adilazh1@127.0.0.1
cd
mkdir Descargas

Y ahora descargamos Hadoop 2.10.0 y Java 8 dentro de dicha carpeta

cd Descargas
wget http://apache.uvigo.es/hadoop/common/hadoop-2.10.0/hadoop-2.10.0.tar.gz
tar xf hadoop-2.10.0.tar.gz -C ~/
wget https://www.dropbox.com/s/x1kyrrhmdiwaiyv/jdk-8u241-linux-x64.tar.gz?dl=1
mv jdk-8u241-linux-x64.tar.gz?dl=1 jdk-8u241-linux-x64.tar.gz
tar xf jdk-8u241-linux-x64.tar.gz -C ~/

Inciso: hemos usado jdk-8u241 de Oracle que para descargarlo se necesita logearse, para saltarnos esté paso, he dejado el link en Dropbox. Existen diversos openjdk que pueden usar como por ejemplo amazon corretto.
También los enlaces pueden romperse y dejar de funcionar, en ese caso consultando las webs oficiales podremos obtener nuevos links de descarga.


Ahora pasamos a la verdadera configuración:
1. En el fichero ~/hadoop-2.10.0/etc/hadoop/slaves incluimos los hostnames de los DataNodes, como ya están sus IPs de Red NAT incluidos en el fichero hosts del máster, incluimos únicamente sus nombres. Suprimiendo localhosts que viene por defecto. Quedando el contenido del fichero como sigue:

slave1
slave2

2. Modificamos el fichero ~/hadoop/etc/hadoop/core-site.xml (fichero que permite configurar hadoop) añadiedo dos propiedades. La primera para definir la URL HDFS para el NameNode. La segunda indicaremos donde HDFS guardará los datos:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master:27000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/adilazh1/data</value>
    </property>
</configuration>

3. HDFS contiene algunas configuraciones por defecto, que podemos consultar en la documentación. Vemos que define el tamaño mínimo de bloque y también el porcentaje mínimo de bloques que deben satisfacer el requisito mínimo de replicación para no entrar en savemode al iniciarse. Para evitar estas configuraciones y poder probar diferentes valores de block size y replicación, desactivamos dichos límites. Para ello modificamos el fichero ~/hadoop-2.10.0/etc/hadoop/hdfs-site.xml añadiendo las siguientes propiedades:

<configuration>
    <property>
        <name>dfs.namenode.fs-limits.min-block-size</name>
        <value>0</value>
    </property>
    <property>
        <name>dfs.namenode.safemode.threshold-pct</name>
        <value>0</value>
    </property>
</configuration>

4. Indicamos a Hadoop donde hemos descargado JAVA, modificando el fichero ~/hadoop-2.10.0/etc/hadoop/hadoop-env.sh modificando los valores de JAVA_HOME.
En dicho fichero también hallamos la variable HADOOP_HEAPSIZE que por defecto está en 1000MB que define JVM heap size. Si posen una computadora más potente con 16GB de RAM, y podéis dedicar más RAM a las VM, podéis indicar un valor superior a la variable HADOOP_HEAPSIZE y así los procesos que corren en JVM tomarán más memoria.

export JAVA_HOME=/home/adilazh1/jdk1.8.0_241

5. Copiamos la configuración a los slaves usando scp

scp -r hadoop-2.10.0 jdk1.8.0_241 adilazh1@slave1:.
scp -r hadoop-2.10.0 jdk1.8.0_241 adilazh1@slave2:.

Parte2: iniciar y apagar cluster hadoop
Ahora que ya hemos configurado hadoop y hemos copiado la configuración a los slaves, podemos iniciar el cluster hadoop y comprobar que todo funcione correctamente:

  1. Formateamos NameNode al tratarse de la primera vez
hadoop-2.10.0/bin/hdfs namenode -format
  1. Iniciamos hadoop!!!
hadoop-2.10.0/sbin/start-dfs.sh

Nos pedirá introducir contraseñas del usuario adilazh1.

Creamos nuestro primer directorio en HDFS

hadoop-2.10.0/bin/hdfs dfs -mkdir /user
hadoop-2.10.0/bin/hdfs dfs -mkdir /user/adilazh1

Hadoop crea una interfaz WEB en el puerto 50070 por defecto, pero esto en el máster que tenemos en versión minimal sin navegadores. Al estar usando una máquina virtual, vamos a poder redirigir puertos a nuestra máquina host y así acceder a la interfaz WEB con el navegador de nuestra máquina. Para ello creamos una regla en virtualbox:

  1. Sobre la máquina «master» –> «configuración» –>»red» –> «Adaptador 1″ –>»avanzados» –> «reenvío de puertos» –>»añadir regla como la imagen y aceptar»

Y recordemos que el adaptador que instala VirtualBox en nuestra máquina host, por defecto tiene como IP 192.168.56.1 (consultar en cmd con el comando ipconfig /all), así que podemos acceder con nuestro navegador usando la URL 192.168.56.1:50070

  1. Parar el Cluster de hadoop
hadoop-2.10.0/sbin/stop-dfs.sh

Para no hacer está práctica muy extensa, dejamos el resto que nos queda «comándos básicos de escritura y lectura desde consola y desde JAVA» para una segunda parte: Management_sesion03_hands-on01: Comencemos con Hadoop-parte02