Lenguajes de Programación

Cómo usar Thread Pool en C++

Aprende a crear tus propios thread pool en C++, primero de forma teórica y después con un caso práctico.

Publicado el 03 de Octubre de 2018
Compartir

Aprende a crear tus propios thread pool en C++, primero de forma teórica y después con un caso práctico.

Cómo crear un thread pool en C++

Vamos a ver cómo nos podemos crear nuestro propio thread pool en C++.

En otros lenguajes de programación como Python entre otros, disponemos de una clase por si queremos paralelizar partes de nuestro código, en la cual pasamos una tarea o una función y una lista con los argumentos que va a recibir, y esa tarea se paraleliza y obtenemos unos resultados.

Vamos a intentar recrear lo mismo con C++ con nuestro propio código.

Para ello vamos a necesitar las cabeceras thread, atomic, mutex, functional, vector y queue.

Después creamos la clase ThreadPool, que va a tener un mutex para para gestionar la cola de tareas, un mutex para gestionar el vector de resultados, un vector con los threads que vamos a tener que manejar, y un atributo para saber cuántos threads estamos usando.

Creamos un constructor, al que le pasamos el tamaño del pool, que es cuántos threads queremos que use, que en nuestro caso será el número de cores que tenga la máquina en la que se está usando el programa.

Después pasamos a la función map, que es una función que recibe otra función y en este caso un vector con argumentos. Lo que va a hacer será aplicar esa función a cada uno de los elementos de la lista por separado, recoger el resultado, almacenarlo en otro vector y devolvérnoslo.

Esto lo hacemos creando una cola, en la que vamos a ir almacenando las funciones con las que queremos trabajar las tareas.

Una tarea, va a ser aplicar la función que nos han pasado a uno de los elementos. Para ello lo que hacemos es crear un wrapper, en el cual accedemos a la función y al argumento en un bucle, nos creamos una función lambda y la pusheamos a la cola de tareas.

Cuando lleguemos al número máximo de threads, creamos un thread que ejecutará una función while, que lo hace que es intentar acceder al recurso de la cola. Os recordamos que los recursos hay que controlarlos para evitar que varios threads accedan a la vez al mismo recurso.

Mientras la cola tenga elementos que haya que ejecutar, recogemos la tarea, eliminamos la misma de la cola, liberamos el recurso de la cola, ejecutamos la tarea, almacenamos el resultado en una variable temporal y nos reservamos el acceso al recurso de los resultados.

Tanto la cola de tareas como el vector de resultados es compartido entre threads, por lo cual hay que gestionarlo, pusheamos el resultado y pasamos a la siguiente.

En el momento en el que en el que la cola de tareas esté vacía, significa que ya no vamos a ejecutar más tareas y el thread termina su ejecución.

Finalmente almacenamos cada uno de los threads y una vez que hemos terminado de crear todos los threads que necesitamos, esperamos que terminen sus tareas y devolvemos los resultados.

class ThreadPool
{
private:
    std::mutex 			m_queue_mutex;
    std::mutex 			m_results_mutex;
    std::vector<std::thread> 	m_threads;
    unsigned int 			m_threads_count;
public:
    ThreadPool(unsigned int pool_size=std::thread::hardware_concurrency()) :
	m_queue_mutex		();
	m_thread_count		(pool_size),
	m_threads		()
{

};
template <class F, class R, class A>
std::vector<R> map(const F& f, std::vector<A> args)
{
    std::queue <std::function<R()>> tasks;
    std::vector<R> results;
    for(auto& arg: args)
    {
        auto task = [&f, &arg]()
        {
            return f(arg);
        };
        tasks.push(std::move(task));
    }
    for (unsigned int i = 0; i < m_thread_count; ++i)
    {
        std::thread t	
        (
                [this, &tasks, &results}()
                {
                    while(true)
                    {
                        if(m_queue_mutex.try_lock())
                        {
                                if (!tasks.empty() {
                                    auto task = tasks.front();
                                    tasks.pop();
                                    m_queue_mutex.unlock();
                                    auto res = task();
                                    std::lock_guard<std::mutex> results_lock(m_results_mutex);
                                    results.push_back(res);
                            }
                            else
                            {
                            m_queue_mutex.unlock();
                            break;
                            }
                        }
    }
            }
        );
        m_threads.push_back(std::move(t));
    }

};	

Ejemplo práctico

int main() {
//
    std::cout << “Threading map example” << std::endl;
    std::cout << “Concurrent threads: ” << std::thread::hardware_concurrency() << std::endl;
    ThreadPool pool;
    std::vector<int> args{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
    auto res = pool.map<std::function>, int, int>
    (
        [](int i)
        {
            std::this_thread::sleep_for(std::chrono::seconds(rand()%2));
            return i +10;
        }
        Args
    );
    for (int& r: res)
    {
        std::cout << r << std::endl;
    }
    return 0;
}

En el ejemplo hemos creado un ThreadPool, tenemos unos argumentos a los que les vamos a sumar 1, después de hacer un sleep aleatorio del thread entre entre 0 y 1 segundos.

Si lo ejecutamos, vemos como los threads van trabajando en ello, y que además nuestra máquina tiene cuatro cores, que son los que están trabajando.

Comprobamos que la adición no es secuencial, ya que no sabemos qué thread se está ejecutando en cada momento, ni cuál va a terminar antes que otro, no solo por el por el random que le hemos añadido en la función, sino porque no podemos controlar el orden de ejecución en sí de los threads una vez creados.


Compartir este post

También te puede interesar...

Tecnología

CLion: Una vista rápida a este IDE para C++

10 Septiembre 2018 Daniel Sánchez Quirós
Tecnología

Uso de argumentos variables en C++

01 Octubre 2018 Daniel Sánchez Quirós
Programación genérica y concurrente

Curso de Programación genérica y concurrente con C++

1 hora y 58 minutos · Curso

Conoce los conceptos fundamentales de la programación genérica y concurrente para crear código reusable e independiente.

  • Metodologías
Artículos
Ver todos