Hicimos un trabajo sobre Big Data, en el cual hicimos un pequeño estudio sobre tecnologías actuales como Hadoop, Disco y otras alternativas.
Muchas de ellas tienen potentes motores de MapReduce para el tratamiento de datos de manera paralela y distribuida. En este post no quiero entrar en profundidad sobre lo que es Hadoop o MapReduce, aunque si no sabes lo que es MapReduce y vas a continuar leyendo deberías echarle un ojo a esta presentación donde puedes entender el concepto en 1 minuto.
En el trabajo me asignaron un proyecto donde había que tratar unas cantidades de datos considerables. A continuación explicaré como lo enfoqué.
Contexto
El proyecto consistía en tratar grandes cantidades de facturas telefónicas, grandes cantidades es un concepto relativo, en este caso son unos 100 megabytes de texto plano, formato csv, sobre los que hay que realizar un intenso procesamiento.
Después del procesado había que generar un PDF con un resumen de la factura. Es decir, imaginaros vuestra factura mensual de teléfono fijo, pasarle una aplicación y generar un PDF donde se resumen el consumo que has tenido de voz (nacional, internacional), datos, roaming, etc etc. Obviamente esto es un poco más complicado porque en el caso a tratar existían unas tarifas "ad-hoc" que había que tener en cuenta y aplicar.
Enfoque
Obviamente 100 megabytes de texto plano no es problema que haya que abordar con Hadoop en un cluster de amazon con 50 instancias, pero si es cierto que aplicando técnicas convencionales de procesamiento los tiempos de ejecución se alargaban bastante (en torno a 30 minutos), ya que como he comentado anteriormente el procesamiento es intenso. Así que mi enfoque fue el implementar MapReduce usando hilos de ejecución.
Basicamente y para que se entienda mejor, implementar MapReduce en Hadoop viene a ser esto:
Donde cada nodo es un ordenador conectados por red (flechas). Mi enfoque fue que cada nodo fuera un hilo de ejecución, por tanto las flechas ya no representan la red, sino la memoria RAM.
Implementación
Es esta parte adjunto la clase MapReducer en la cual esta toda la lógica a aplicar sobre los datos. La clase no está completa puesto que hay mucha lógica que no tiene que ver con el post. Por ejemplo, dado que las facturas en realidad es un conjunto se subfacturas, hice la clase MapReducer genérica y los tipos se inyectan por reflexión en tiempo de ejecución, lo que hace que el código sea más complejo de leer, así que he omitido toda esa parte puesto que no tiene interés. Básicamente la clase tiene 3 métodos, Map, Reduce y MapReduce.
El único con visibilidad pública es MapReduce puesto que nunca se llaman a Map o Reduce por separado.
Adjunto el código simplificado y comentado.
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace MyNameSpace { public class MapReducerConclusiones{ private const int NUM_CORES = 16; //Aquí es donde se añadiran los trozos durante la fase de "Map" private ConcurrentBag listBag = new ConcurrentBag (); //Simplemente una envolutura para la "bolsa" de trozos que implementa toda la lógica de concurrencia private BlockingCollection listChunks; //Comparador por el cual decidiremos cuando dos trozos tiene la misma clave para poder actualizarlos (Fase "Reduce") private IEqualityComparer comparer; //Aquí es donde se reducirán los trozos que se generaron en la fase de "Map" private ConcurrentDictionary reportStore; List _tarifas; public MapReducer() { //Aquí se inicializan los objetos que controlaran la concurrencia listChunks = new BlockingCollection (listBag); comparer = new ReportComparer(); reportStore = new ConcurrentDictionary (comparer); _tarifas = _tarLoader.BuildTarifas().GetTarifas(typeof(T)); } private void Map(string filename) { //ProduceFileBlocks() parte el fichero en tantos trozos como hilos disponibles haya en el pool. Parallel.ForEach(ProduceFileBlocks(data as List ), block => { //Aquí se realiza el procesado de los datos Extractor extractor = new Extractor (_tarifas); var extracted = extractor.Extract(block); //Aquí se añaden al conjunto de trozos que posteriormente se reduciran foreach (var item in extracted) { listChunks.Add(item); } }); //Este es el mecanismo de sincronización, cuando todos los hilos hayan hecho //CompleteAdding() empezarán a reducir listChunks.CompleteAdding(); } private void Reduce() { Parallel.ForEach(listChunks.GetConsumingEnumerable(), report => { //Básicamente aquí lo que se hace es ver si cada trozo procesado en el Map está contenido en el la Dictionary, // y en caso de que si que esté, se actualiza con el valor antiguo más el nuevo. if (reportStore.ContainsKey(report)) { var old = reportStore[report]; var update = new Report(); update.Duracion = old.Duracion + report.Duracion; update.Volumen = old.Volumen + report.Volumen; update.Ocurrencias = old.Ocurrencias + report.Ocurrencias; update.SubtotalVodafone = old.SubtotalVodafone + report.SubtotalVodafone; update.SubtotalHeineken = old.SubtotalHeineken + report.SubtotalHeineken; update.Diferencia = old.Diferencia + report.Diferencia; //Este método mira las claves de 'old' y 'update', y si son iguales, actualiza el diccioanrio con 'update' //Aquí es importante definir un comparador "custom" que dependerá de la lógica de cada implementación reportStore.TryUpdate(old, update, update); } else { //Si no hay ninguna clave en el Dictionary se añade el primero tal cual reportStore.TryAdd(report, report); } }); } //Método que engrana las dos fases. Es el único público, ni Map ni Reduce lo son puesto que no tiene sentido llamarlos aislados public void MapReduce(string filename) { //Inicialización previa en caso de que se ejecute dos veces y las "bolsas" estén llenas. if (listChunks.IsAddingCompleted) { listBag = new ConcurrentBag (); listChunks = new BlockingCollection (listBag); } //Aquí se hace map en tantos hilos como hayas configurado. Yo tengo un pool de 16. System.Threading.ThreadPool.QueueUserWorkItem(delegate(object state) { Map(filename); }); Reduce(); } //Método para ver los resultados public void PrintResults() { foreach (KeyValuePair item in reportStore) { Console.WriteLine(item.Value); } } } }
Bueno, básicamente la idea era explicar un poco cómo funciona esto y que puede ser aplicado en proyectos reales sin tener que currar en Facebook y sin tener que usar un cluster con 50 máquinas.
El feedback será apreciado y cualquier duda la responderé.