miércoles, 4 de septiembre de 2013

MapReduce en el mundo real con C#

Durante el curso pasado estuve cursando el máster de Ingeniería y Tecnología del Software en la Universidad de Sevilla, durante el máster he tratado muchos temas en mayor o menor profundidad. De ellos, uno de los que más me interesó fue el de tratamiento de grandes cantidades de datos.

Hicimos un trabajo sobre Big Data, en el cual hicimos un pequeño estudio sobre tecnologías actuales como Hadoop, Disco y otras alternativas.

Muchas de ellas tienen potentes motores de MapReduce para el tratamiento de datos de manera paralela y distribuida. En este post no quiero entrar en profundidad sobre lo que es Hadoop o MapReduce, aunque si no sabes lo que es MapReduce y vas a continuar leyendo deberías echarle un ojo a esta presentación donde puedes entender el concepto en 1 minuto.

En el trabajo me asignaron un proyecto donde había que tratar unas cantidades de datos considerables. A continuación explicaré como lo enfoqué.

Contexto
El proyecto consistía en tratar grandes cantidades de facturas telefónicas, grandes cantidades es un concepto relativo, en este caso son unos 100 megabytes de texto plano, formato csv, sobre los que hay que realizar un intenso procesamiento.
Después del procesado había que generar un PDF con un resumen de la factura. Es decir, imaginaros vuestra factura mensual de teléfono fijo, pasarle una aplicación y generar un PDF donde se resumen el consumo que has tenido de voz (nacional, internacional), datos, roaming, etc etc. Obviamente esto es un poco más complicado porque en el caso a tratar existían unas tarifas "ad-hoc" que había que tener en cuenta y aplicar.

Enfoque
Obviamente 100 megabytes de texto plano no es problema que haya que abordar con Hadoop en un cluster de amazon con 50 instancias, pero si es cierto que aplicando técnicas convencionales de procesamiento los tiempos de ejecución se alargaban bastante (en torno a 30 minutos), ya que como he comentado anteriormente el procesamiento es intenso. Así que mi enfoque fue el implementar MapReduce usando hilos de ejecución.
Basicamente y para que se entienda mejor, implementar MapReduce en Hadoop viene a ser esto:


Donde cada nodo es un ordenador conectados por red (flechas). Mi enfoque fue que cada nodo fuera un hilo de ejecución, por tanto las flechas ya no representan la red, sino la memoria RAM.

Implementación
Es esta parte adjunto la clase MapReducer en la cual esta toda la lógica a aplicar sobre los datos. La clase no está completa puesto que hay mucha lógica que no tiene que ver con el post. Por ejemplo, dado que las facturas en realidad es un conjunto se subfacturas, hice la clase MapReducer genérica y los tipos se inyectan por reflexión en tiempo de ejecución, lo que hace que el código sea más complejo de leer, así que he omitido toda esa parte puesto que no tiene interés. Básicamente la clase tiene 3 métodos, Map, Reduce y MapReduce.
El único con visibilidad pública es MapReduce puesto que nunca se llaman a Map o Reduce por separado.

Adjunto el código simplificado y comentado.
 
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;


namespace MyNameSpace
{
    public class MapReducer
    {
        private const int NUM_CORES = 16;
  //Aquí es donde se añadiran los trozos durante la fase de "Map"
        private ConcurrentBag listBag = new ConcurrentBag();
  //Simplemente una envolutura para la "bolsa" de trozos que implementa toda la lógica de concurrencia
        private BlockingCollection listChunks;
  
  
  //Comparador por el cual decidiremos cuando dos trozos tiene la misma clave para poder actualizarlos (Fase "Reduce")
        private IEqualityComparer comparer;
  
  //Aquí es donde se reducirán los trozos que se generaron en la fase de "Map"
        private ConcurrentDictionary reportStore;
  
        
        List _tarifas;

        public MapReducer()
        {
   //Aquí se inicializan los objetos que controlaran la concurrencia
            listChunks = new BlockingCollection(listBag);
            comparer = new ReportComparer();
            reportStore = new ConcurrentDictionary(comparer);
            
            _tarifas = _tarLoader.BuildTarifas().GetTarifas(typeof(T));
        }

        private void Map(string filename)
        {
   //ProduceFileBlocks() parte el fichero en tantos trozos como hilos disponibles haya en el pool.
            Parallel.ForEach(ProduceFileBlocks(data as List), block =>
            {
    //Aquí se realiza el procesado de los datos
    Extractor extractor = new Extractor(_tarifas);
                var extracted = extractor.Extract(block);
    //Aquí se añaden al conjunto de trozos que posteriormente se reduciran
                foreach (var item in extracted)
                {
                    listChunks.Add(item);
                }

            });
   //Este es el mecanismo de sincronización, cuando todos los hilos hayan hecho 
   //CompleteAdding() empezarán a reducir
            listChunks.CompleteAdding();
        }

        private void Reduce()
        { 
            Parallel.ForEach(listChunks.GetConsumingEnumerable(), report =>
            {
    //Básicamente aquí lo que se hace es ver si cada trozo procesado en el Map está contenido en el la Dictionary,
    // y en caso de que si que esté, se actualiza con el valor antiguo más el nuevo.
                if (reportStore.ContainsKey(report))
                {
                    var old = reportStore[report];
                    var update = new Report();

                    update.Duracion = old.Duracion + report.Duracion;
                    update.Volumen = old.Volumen + report.Volumen;
                    update.Ocurrencias = old.Ocurrencias + report.Ocurrencias;
                    update.SubtotalVodafone = old.SubtotalVodafone + report.SubtotalVodafone;
                    update.SubtotalHeineken = old.SubtotalHeineken + report.SubtotalHeineken;
                    update.Diferencia = old.Diferencia + report.Diferencia;
     //Este método mira las claves de 'old' y 'update', y si son iguales, actualiza el diccioanrio con 'update'
     //Aquí es importante definir un comparador "custom" que dependerá de la lógica de cada implementación
                    reportStore.TryUpdate(old, update, update);
                }
                else
                {
                    //Si no hay ninguna clave en el Dictionary se añade el primero tal cual
                    reportStore.TryAdd(report, report);
                }

            });
        }
  //Método que engrana las dos fases. Es el único público, ni Map ni Reduce lo son puesto que no tiene sentido llamarlos aislados
        public void MapReduce(string filename)
        {
   //Inicialización previa en caso de que se ejecute dos veces y las "bolsas" estén llenas.
            if (listChunks.IsAddingCompleted)
            {
                listBag = new ConcurrentBag();
                listChunks = new BlockingCollection(listBag);
            }
   //Aquí se hace map en tantos hilos como hayas configurado. Yo tengo un pool de 16.
            System.Threading.ThreadPool.QueueUserWorkItem(delegate(object state)
            {  
                Map(filename);
            });
            Reduce();
        }
  //Método para ver los resultados
        public void PrintResults()
        {
            foreach (KeyValuePair item in reportStore)
            {
                Console.WriteLine(item.Value);
            }
        }
  
    }
}
Conclusiones

Bueno, básicamente la idea era explicar un poco cómo funciona esto y que puede ser aplicado en proyectos reales sin tener que currar en Facebook y sin tener que usar un cluster con 50 máquinas.

El feedback será apreciado y cualquier duda la responderé.

3 comentarios:

  1. De momento te lo copio para verlo más adelante, te hago un dofollow XD . Estoy volviendo al .net a ver que tal.

    ResponderEliminar
  2. Por curiosidad, en el post comentas que el proceso original tardaba unos 30 minutos, ¿cuánto tarda el proceso después de aplicar esta técnica?

    ResponderEliminar
  3. Cierto, no lo comenté. Pues antes de de hacer MapReduce intenté hacer una optimizacion de lo que había pero tardaba en torno a 20 minutos. Después de implementar MapReduce tarda entre 4 y 7 minutos,según el input.

    Un saludo!

    ResponderEliminar