miércoles, 4 de septiembre de 2013

MapReduce en el mundo real con C#

Durante el curso pasado estuve cursando el máster de Ingeniería y Tecnología del Software en la Universidad de Sevilla, durante el máster he tratado muchos temas en mayor o menor profundidad. De ellos, uno de los que más me interesó fue el de tratamiento de grandes cantidades de datos.

Hicimos un trabajo sobre Big Data, en el cual hicimos un pequeño estudio sobre tecnologías actuales como Hadoop, Disco y otras alternativas.

Muchas de ellas tienen potentes motores de MapReduce para el tratamiento de datos de manera paralela y distribuida. En este post no quiero entrar en profundidad sobre lo que es Hadoop o MapReduce, aunque si no sabes lo que es MapReduce y vas a continuar leyendo deberías echarle un ojo a esta presentación donde puedes entender el concepto en 1 minuto.

En el trabajo me asignaron un proyecto donde había que tratar unas cantidades de datos considerables. A continuación explicaré como lo enfoqué.

Contexto
El proyecto consistía en tratar grandes cantidades de facturas telefónicas, grandes cantidades es un concepto relativo, en este caso son unos 100 megabytes de texto plano, formato csv, sobre los que hay que realizar un intenso procesamiento.
Después del procesado había que generar un PDF con un resumen de la factura. Es decir, imaginaros vuestra factura mensual de teléfono fijo, pasarle una aplicación y generar un PDF donde se resumen el consumo que has tenido de voz (nacional, internacional), datos, roaming, etc etc. Obviamente esto es un poco más complicado porque en el caso a tratar existían unas tarifas "ad-hoc" que había que tener en cuenta y aplicar.

Enfoque
Obviamente 100 megabytes de texto plano no es problema que haya que abordar con Hadoop en un cluster de amazon con 50 instancias, pero si es cierto que aplicando técnicas convencionales de procesamiento los tiempos de ejecución se alargaban bastante (en torno a 30 minutos), ya que como he comentado anteriormente el procesamiento es intenso. Así que mi enfoque fue el implementar MapReduce usando hilos de ejecución.
Basicamente y para que se entienda mejor, implementar MapReduce en Hadoop viene a ser esto:


Donde cada nodo es un ordenador conectados por red (flechas). Mi enfoque fue que cada nodo fuera un hilo de ejecución, por tanto las flechas ya no representan la red, sino la memoria RAM.

Implementación
Es esta parte adjunto la clase MapReducer en la cual esta toda la lógica a aplicar sobre los datos. La clase no está completa puesto que hay mucha lógica que no tiene que ver con el post. Por ejemplo, dado que las facturas en realidad es un conjunto se subfacturas, hice la clase MapReducer genérica y los tipos se inyectan por reflexión en tiempo de ejecución, lo que hace que el código sea más complejo de leer, así que he omitido toda esa parte puesto que no tiene interés. Básicamente la clase tiene 3 métodos, Map, Reduce y MapReduce.
El único con visibilidad pública es MapReduce puesto que nunca se llaman a Map o Reduce por separado.

Adjunto el código simplificado y comentado.
 
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;


namespace MyNameSpace
{
    public class MapReducer
    {
        private const int NUM_CORES = 16;
  //Aquí es donde se añadiran los trozos durante la fase de "Map"
        private ConcurrentBag listBag = new ConcurrentBag();
  //Simplemente una envolutura para la "bolsa" de trozos que implementa toda la lógica de concurrencia
        private BlockingCollection listChunks;
  
  
  //Comparador por el cual decidiremos cuando dos trozos tiene la misma clave para poder actualizarlos (Fase "Reduce")
        private IEqualityComparer comparer;
  
  //Aquí es donde se reducirán los trozos que se generaron en la fase de "Map"
        private ConcurrentDictionary reportStore;
  
        
        List _tarifas;

        public MapReducer()
        {
   //Aquí se inicializan los objetos que controlaran la concurrencia
            listChunks = new BlockingCollection(listBag);
            comparer = new ReportComparer();
            reportStore = new ConcurrentDictionary(comparer);
            
            _tarifas = _tarLoader.BuildTarifas().GetTarifas(typeof(T));
        }

        private void Map(string filename)
        {
   //ProduceFileBlocks() parte el fichero en tantos trozos como hilos disponibles haya en el pool.
            Parallel.ForEach(ProduceFileBlocks(data as List), block =>
            {
    //Aquí se realiza el procesado de los datos
    Extractor extractor = new Extractor(_tarifas);
                var extracted = extractor.Extract(block);
    //Aquí se añaden al conjunto de trozos que posteriormente se reduciran
                foreach (var item in extracted)
                {
                    listChunks.Add(item);
                }

            });
   //Este es el mecanismo de sincronización, cuando todos los hilos hayan hecho 
   //CompleteAdding() empezarán a reducir
            listChunks.CompleteAdding();
        }

        private void Reduce()
        { 
            Parallel.ForEach(listChunks.GetConsumingEnumerable(), report =>
            {
    //Básicamente aquí lo que se hace es ver si cada trozo procesado en el Map está contenido en el la Dictionary,
    // y en caso de que si que esté, se actualiza con el valor antiguo más el nuevo.
                if (reportStore.ContainsKey(report))
                {
                    var old = reportStore[report];
                    var update = new Report();

                    update.Duracion = old.Duracion + report.Duracion;
                    update.Volumen = old.Volumen + report.Volumen;
                    update.Ocurrencias = old.Ocurrencias + report.Ocurrencias;
                    update.SubtotalVodafone = old.SubtotalVodafone + report.SubtotalVodafone;
                    update.SubtotalHeineken = old.SubtotalHeineken + report.SubtotalHeineken;
                    update.Diferencia = old.Diferencia + report.Diferencia;
     //Este método mira las claves de 'old' y 'update', y si son iguales, actualiza el diccioanrio con 'update'
     //Aquí es importante definir un comparador "custom" que dependerá de la lógica de cada implementación
                    reportStore.TryUpdate(old, update, update);
                }
                else
                {
                    //Si no hay ninguna clave en el Dictionary se añade el primero tal cual
                    reportStore.TryAdd(report, report);
                }

            });
        }
  //Método que engrana las dos fases. Es el único público, ni Map ni Reduce lo son puesto que no tiene sentido llamarlos aislados
        public void MapReduce(string filename)
        {
   //Inicialización previa en caso de que se ejecute dos veces y las "bolsas" estén llenas.
            if (listChunks.IsAddingCompleted)
            {
                listBag = new ConcurrentBag();
                listChunks = new BlockingCollection(listBag);
            }
   //Aquí se hace map en tantos hilos como hayas configurado. Yo tengo un pool de 16.
            System.Threading.ThreadPool.QueueUserWorkItem(delegate(object state)
            {  
                Map(filename);
            });
            Reduce();
        }
  //Método para ver los resultados
        public void PrintResults()
        {
            foreach (KeyValuePair item in reportStore)
            {
                Console.WriteLine(item.Value);
            }
        }
  
    }
}
Conclusiones

Bueno, básicamente la idea era explicar un poco cómo funciona esto y que puede ser aplicado en proyectos reales sin tener que currar en Facebook y sin tener que usar un cluster con 50 máquinas.

El feedback será apreciado y cualquier duda la responderé.

viernes, 30 de agosto de 2013

TORified .NET applications

Today I learned how to use TOR in a .NET application. It is pretty easy, I'm going to explain it step by step.
  1. 1. Download Privoxy and TOR and install them by the default way.
  2. 2. Open the config file in your Privoxy installation folder and unccomment the following line: "forward-socks5   /  127.0.0.1:9050". This will allow request routing over TOR.
  3. 3. Open Visual Studio and create a new console project and paste this piece of code:
        public static void Main(string[] args)
        {
            
            HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://whatismyipaddress.com/");
            request.Proxy = new WebProxy("127.0.0.1",8118);
            string s = "";
            using (var response = request.GetResponse())
            {
                using (var reader = new StreamReader(response.GetResponseStream(), Encoding.GetEncoding("utf-8")))
                {
                    s = reader.ReadToEnd();
                }
            }
        }

That's all. Now you can perform anonimous queries :)

sábado, 13 de agosto de 2011

Pasos a seguir para el reconocimiento de locutores

Bueno después de mucho pelearme y leer, he llegado a unas conclusiones.

Primero de todo, tener una base de datos de voces, lo interesante es construirla nosotros mismos con voces de personas de nuestro alrededor, pero para las tareas de testing yo voy a usar una base de datos, llamada BIOSECURE, que contiene 18 samples de audio de aproximadamente 10 segundos de duración. De cada sample solo hay unos 3 segundos aproximados de voz real, es decir, el resto es silencio, la manera de eliminar el silencio la veremos más adelante, para ello, usaremos una aplicación de la plataforma MISTRAL.

Lo segundo que tenemos que hacer es sacar el vector de caracteristicas de cada, para ello usaremos otra aplicación que está fuera de la plataforma MISTRAL, llamada Spro.

El tercer paso es quitar los silencios de cada sample de audio. Justo antes de suprimir el silencio, es interesante normalizar el vector de caracteristicas (MFCC en nuestro caso). Podemos normalizar de diferentes maneras, pero nosotros lo haremos siguiendo una distribución de probabilidad N(0,1).
Una vez normalizado, usaremos EnegyDetector para etiquetar las zonas con más energía de cada sample(Definimos un umbral, y si está por debajo de ese umbral es silencio y si está por encima es habla.)

Cuarto paso: World model training (no me gusta la traducción), este paso consiste en crear un modelo que representa el espacio total de posibles alternativas dado un hipotetico locutor.
Este es un modelo de aprendizaje, hemos elegido usar Gaussian Mixture Model (GMM). Con esto conseguimos un modelo universal con el que a posteriori compararemos  con el modelo objetivo, es decir, con el locutor que queremos identificar.

Quinto paso: Entrenar el modelo objetico o Target model training. El modelo del locutor se obtiene adaptando los parametros del modelo del cuarto paso usando una estimación MAP (Maximum a posteriori)


NOTA: Esto es un blog, y como he dicho, estoy haciendo un resumen "abstracto" de los pasos a seguir, aquí se nombran muchas técnicas para pasar de un paso a otro, muchas de ellas tienen una fuerte componente matemática/estadística, pero también es cierto que es asequible para alguien que tenga un mínimo conocimiento de inferencia estadística y contraste de hipótesis (nociones que "causalmente" se estudian en nuestra titulación)

martes, 26 de julio de 2011

Mistral open source platform

Hi, these days I was reading about speaker recognition in some papers (I'll give some links in other posts) and my conclusions can be resumed in this image:

General steps for speaker recognition.
So, I have been doing some proof of concept using CoMIRVA library but I didn't reach anything useful, but exploring Microsoft Speech API (SAPI) I saw an interesting topic in StackOverflow where I found Mistral platform. 
I didn't use Mistral, for now, but I'm reading its wiki and in this days I'll be testing this platform in deep.
Talking about Mistral, I can tell that it's written in C++, so it's cool because I hope to implement Mistral in an Android application using Android NDK.

So, as I go by testing I'm going to write here my progress.

miércoles, 6 de julio de 2011

Comienzo mi proyecto fin de carrera.

Bueno, después de días de pereza, hoy empiezo DE VERDAD mi proyecto fin de carrera, es el último "obstaculo" que me queda para ser ingeniero, así que, a partir de hoy, comienzo al 100% con él.
Voy a explicar un poco en qué consistirá: El proyecto se llama "Identificación de voces en dispositivos Android", aunque mi enfoque será el siguiente, voy a intentar hacer un módulo genérico de identificación de voces, y luego haré una implementación concreta, en una aplicación para la plataforma Android por ejemplo.

El primer paso a seguir es leer, leer muchos papers sobre identificación de voces, para ello, estoy haciendo uso de una herramienta muy interesante, Mendeley, se trata de un gestor de artículos, desde el cual puedes sincronizar tus artículos para tenerlos siempre disponibles desde cualquier ubicación. Aparte de esto, tiene otras muchas herramientas, como notas, subrayar texto, compartir y editar artículos con otros usuarios... etc.
El caso, es una herramienta muy completa y que pienso que me será muy útil.

Respecto al contenido de los papers, ahora mismo me encuentro leyendo este paper: "Real-Time Speaker Identification and Verification"

Pues bueno, este es el primer post sobre mi PFC y voy a escribir periodicamente mis progresos.

domingo, 5 de diciembre de 2010

Recuerda esto en el futuro.

Hi current Jesús, i'm writing from the future. Now you think that you are clear about C, pointer, strings, etc. But i know you so well, and i know that in the future, you'll need this code to open your mind and remember what you knew in the past about pointers.

For example, when you want to declare a string in C, i usually do:

char* foo = "myString";

or i can do:

char foo[] = "myString";

Ok, but, what about lists of strings? I mean, if i want to declare a list of strings, i could do this:

char* foo[] = {"myString1", "myString2"};


But now i was in the tube coming back to home and i was thinking about how to do this with pointers (i know that it is simply but i'm excited because i'm learning C and i discover that my thoughts weren't so far from reality)

So, this is another way to do the same, but knowledge is free :)

char** foo = (char**) malloc(sizeof(char*)*numberOfStrings);
foo[0] = "myString1";
foo[1] = "myString2";

Thanks Jesús from the future!!!!!!! :D

martes, 19 de octubre de 2010

using gcc

This is a reminder for my own in the future =D

1)
-gcc -o myprogram myprogram.c; for compiling and linking just a source file
2)
-gcc -c file1.c file2.c ......... fileN.c; for compile some files.
-gcc -o myprogram file1.o file2.o ....... fileN.o; for linking and generate the binary file
3)
if you want to see asm code generated in compilation time, you have to do:
-gcc -S -c file1.c file2.c ......... fileN.c; and then just open them with your favourite text editor