BenBE's humble thoughts Thoughts the world doesn't need yet …

30.09.2011

Thread Pools

Filed under: Software — Schlagwörter: , , — BenBE @ 10:38:24

Neuere Prozessoren bieten immer mehr Leistung durch immer mehr parallele Kerne bei aber seit längerem nahezu gleich gebliebener Taktrate. Somit bleibt einem ohne Anpassung seiner Programme diese zusätzliche Leistung verwehrt. Nur in dem man sein Programm in mehrere Teile spaltet, die parallel ablaufen können, kann man sein Programm auch auf heutigen Prozessoren in optimaler Geschwindigkeit ausführen.

Für die Parallelisierung gibt es hierbei je nach Betriebssystem verschiedene Mittel. Die wohl bekanntesten Mittel stellen hierbei Threads unter Windows, bzw. das Forken unter Linux dar. Aber auch etwas exotischere Mittel wie PThreads oder Fibers bieten gute Möglichkeiten, um ein Programm auf mehrere CPUs zu verteilen.

Die verschiedenen Ansätze haben dabei gemeinsam, dass für verschiedene Aufgaben jeweils mehr oder weniger Umfangreiche Objekte erzeugt werden müssen, die für jede Aufgabe einen Zwischenstand enthalten, den sogenannten Kontext. Dieser ist bei Fibers (Windows) sehr klein, bei Multitasking mittels Prozessen aber durchaus sehr groß. Je größer dabei solch ein Kontext wird, desto langsamer wird das Umschalten zwischen mehreren Aufgaben. Außerdem steigt mit der Größe des Kontext oftmals auch der Aufwand für die Erzeugung eines neuen Kontextes, was insbesondere, wenn man viele kleinere Aufgaben parallelisieren möchte von großer Bedeutung ist.

Eine relativ gute Einführung (leider hinter einer via BugMeNot umgehbaren Paywall *sigh*) gibt es beim Linux Magazine. Auch die beiden Videos mit der Erklärung kann ich wärmstens ans Herz legen.

Nach diesem kurzen Abstecher nun wieder zurück zum eigentlichen Thema dieses Beitrages, denn wie auch im verlinkten Beitrag, sowie den beiden darin enthaltenen Videos erklärt, möchte man in aller Regel den Setup-Aufwand für viele kleine Aufgaben möglichst gering halten. Zu diesem Zweck bietet Windows sogenannte Threadpools. Etwas ähnliches bietet Linux nicht von Haus aus; doch das lässt sich recht einfach ändern.

Für ein eigenes Projekt unter Linux war ich nämlich selber auf der Suche und wurde nach etwas Suchen auch bereits fündig. Die gefundene Implementation ist zwar funktional und minimalistisch gehalten, hat jedoch ein paar kleinere Ecken und Kanten, weshalb ich sie für meine Zwecke noch einmal überarbeitet habe.

Da der Source somit eh einmal komplett bearbeitet wurde, nutze ich die Chance (insbesondere auf Grund des vielfachen Wunsches eines einzelnen Herren), um die Funktionsweise einmal direkt am Source des Thread Pools zu erklären. Eine Vorwarnung möchte ich aber bereits jetzt geben: Das wird etwas technisch 😉

Okay, Ihr wolltet es so! Als ersten Schritt auf dem Weg zu unserem Thread Pool schauen wir uns die benötigten Informationen an. Hierzu werfen wir zuerst einen Blick in den Header des Threadpools:

// Type to tell if threads in the pool should keep running or should terminate
typedef enum threadpool_state {
    ALL_RUN,
    ALL_EXIT
} threadpool_state_t;

typedef struct threadpool {
    size_t                          size;           // Number of entries in array.
    size_t                          live;           // Number of live threads in pool (when
                                                    //   pool is being destroyed, live<=size)
    threadpool_state_t              state;          // Threads check this before getting job.
    pthread_t *                     array;          // The threads themselves.

    threadpool_queue_head_t *       queue;          // queue of work orders

    pthread_mutex_t                 mutex;          // protects all vars declared below.
    pthread_cond_t                  job_posted;     // dispatcher: "Hey guys, there's a job!"
    pthread_cond_t                  job_taken;      // a worker: "Got it!"

    struct timeval                  created;        // When the threadpool was created.
} threadpool_t;

Die Struktur ist an sich nicht weiter spannend. Neben der Größe und der Anzahl laufender Threads im Pool enthält diese Struktur noch den Status (ob der Pool aktiv ist, oder beendet werden soll), sowie die Liste aller Threads, die zu diesem Pool gehören. Als letztes ist hier noch ein Feld für den Zeitpunkt, wann dieser Pool erzeugt wurde. An sich also recht unspektakulär.

Die vier gerade nicht erwähnten Felder machen nun die gesamte Magie des Thread Pools aus: Während in queue die gesamten Arbeitsaufträge stehen, sorgen mutex, job_posted und job_taken für die Synchronisation der Arbeit. Aber zur Synchronisation nachher gleich mehr, denn zuerst werfen wir einmal einen kurzen Blick auf die Auftragswarteschlange.

// "dispatch_func_t" declares a typed function pointer.
// A variable of type "dispatch_func_t" points to a 
// function with the following signature:
//     void dispatch_function(void *arg);
typedef void (* dispatch_func_t)(void *);

typedef struct threadpool_queue_node {
    struct threadpool_queue_node *  next;
    struct threadpool_queue_node *  prev;

    dispatch_func_t                 job_func;
    void *                          job_arg;
    dispatch_func_t                 cleanup_func;
    void *                          cleanup_arg;
} threadpool_queue_node_t;

typedef struct threadpool_queue_head {
    threadpool_queue_node_t *       head;
    threadpool_queue_node_t *       tail;

    threadpool_queue_node_t *       freeHead;
    threadpool_queue_node_t *       freeTail;

    size_t                          capacity_cur;
    size_t                          capacity_max;
} threadpool_queue_head_t;

Okay, dieser Teil besteht aus 2 Teilen: Zum Einen aus einer doppelt verketteten Liste und zum anderen aus den Aufträgen, die in dieser verwaltet werden. Die Art der Nutzung der Warteschlange sorgt dafür, dass alle Arbeitsaufträge in der Liste zyklisch beschrieben werden und somit eine Art Ringpuffer entsteht. Die Kapazität wird hierbei von zwei Feldern geregelt: capacity_cur (aktuelle Anzahl genutzter Einträge) und capacity_max (Maximal verfügbare Queue-Länge). Diese Werte werden beim Erzeugen des Threadpools gesetzt, bzw. dynamisch in bestimmten Grenzen angepasst, wenn die Warteschlange vergrößert werden muss. Auch hierzu gleich mehr.

Denn viel spannender als eine stumpfe doppelt verkettete Liste ist deren Inhalt. In unserem Fall handelt es sich dabei um die auszuführenden Aufträge. Diese sind aus 2 Callbacks und den dazugehörigen Argumenten aufgebaut. Ein Callback ist dabei eine einfache Funktion, der ein Void-Pointer übergeben werden kann: Quasi also beliebige Daten 😉

Wer sich nun wundert, warum es neben einer Job-Funktion auch immer eine Cleanup-Funktion gibt, dem sei gesagt, dass Speicher gerne aufgeräumt werden möchte. Gleiches gilt auch für andere Ressourcen, die man sich vom Betriebssystem holt. Und selbst wenn man das für seinen Arbeitsauftrag an sich tut, kann es passieren, dass dieser auf Grund des Multithreading abgebrochen wird, bevor er die angeforderten Ressourcen wieder korrekt freigeben konnte. Und genau an dieser Stelle springt einem die Cleanup-Funktion dann zur Seite, da diese automatisch ausgeführt wird, wenn der Arbeitsauftrag abgebrochen wurde.

Nach dem wir nun den prinzipiellen Aufbau des Threadpools im Speicher geklärt haben, können wir uns dem wichtigen Teil zuwenden: Der eigentlichen Implementierung. Bei dieser beginnen wir der Einfachheit halber mit dem Erstellen des Pools, da dieses noch vergleichsweise Straight Forward ist.

/*
 * Create a thread pool.
 */
threadpool_t *threadpool_create(int poolsize) {
    threadpool_t *pool;     // pool we create and hand back
    int i;                  // work var

    // sanity check the argument
    if ((poolsize <= 0) || (poolsize > THREADPOOL_MAX_SIZE)) {
        return NULL;
    }

    // create the threadpool_t struct
    pool = (threadpool_t *)malloc(sizeof(threadpool_t));
    if (pool == NULL) {
        fprintf(stderr, "\n\nOut of memory creating a new threadpool!\n");
        return NULL;
    }

    // initialize everything but the array and live thread count
    gettimeofday(&pool->created, NULL);
    pool->size = poolsize;
    pool->state = ALL_RUN;
    pthread_mutex_init(&(pool->mutex), NULL);
    pthread_cond_init(&(pool->job_posted), NULL);
    pthread_cond_init(&(pool->job_taken), NULL);

    // create the array of threads within the pool
    pool->array = (pthread_t *) malloc(pool->size * sizeof(pthread_t));
    if (!pool->array) {
        fprintf(stderr, "\n\nOut of memory allocating thread array!\n");
        free(pool);
        pool = NULL;
        return NULL;
    }

    pool->queue = _threadpool_makeQueue(poolsize);
    if (!pool->queue) {
        fprintf(stderr, "\n\nOut of memory allocating task queue!\n");
        free(pool->array);
        free(pool);
        pool = NULL;
        return NULL;
    }

    // bring each thread to life (update counters in loop so threads can
    //   access pool->live to find out their ID#
    for (i = 0; i < pool->size; ++i) {
        if (0 != pthread_create(pool->array + i, NULL, (void *(*)(void *))_threadpool_dowork, (void *) pool)) {
            perror("\n\nThread creation failed:");
            // TODO: Do proper finalization!
            exit(EXIT_FAILURE);
            return NULL;
        }

        // automatic cleanup when thread exits.
        pthread_detach(pool->array[i]);
    }

    return pool;
}

Ich denke, weite Teile dieser Routine dürften selbsterklärend sein. Einzig die letzte Schleife, in der unsere Arbeitsthreads erzeugt werden bedarf wahrscheinlich etwas Erläuterung:

    // bring each thread to life (update counters in loop so threads can
    //   access pool->live to find out their ID#
    for (i = 0; i < pool->size; ++i) {
        if (0 != pthread_create(pool->array + i, NULL, (void *(*)(void *))_threadpool_dowork, (void *) pool)) {
            perror("\n\nThread creation failed:");
            // TODO: Do proper finalization!
            exit(EXIT_FAILURE);
            return NULL;
        }

        // automatic cleanup when thread exits.
        pthread_detach(pool->array[i]);
    }

Mit pthread_create wird hierbei ein neuer pthread als Kind des aktuellen Threads erzeugt. Dieser würde normalerweise erst dann finalisiert werden, wenn unser Thread auch beendet wird. Um den Arbeitsthread unabhängig von unserem aktuellen Thread (ja, das Hauptprogramm ist auch nur ein Thread), auszuführen, müssen wir diesen abkoppeln. Dies geschieht in der letzten Zeile der Schleife.

Im Kontext der Create-Methode werden nun noch zwei weitere Methoden referenziert. Die erste ist _threadpool_makeQueue, der die gewünschte Größe des Thread Pools übergeben wird und die entsprechend die oben erwähnte, doppelt verkettete Liste initialisiert. Bliebe noch die zweite Funktion zu erwähnen: _threadpool_dowork.

Wie der Name dieser Funktion bereits vermuten lässt, erledigt sie die ganze Arbeit unseres Thread Pools. Werfen wir also einfach einen Blick drauf:

/*
 * Define the life of a working thread.
 */
void *_threadpool_dowork(threadpool_t *pool) {
    // Remember my creation sequence number
//    int myid =
    __sync_add_and_fetch(&pool->live, 1);

    // When we get a posted job, we copy it into these local vars.
    dispatch_func_t     job_func;
    void *              job_arg;

    dispatch_func_t     cleanup_func;
    void *              cleanup_arg;

    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &pool->mutex);

    // Grab mutex so we can begin waiting for a job
    if (0 != pthread_mutex_lock(&pool->mutex)) {
        perror("\nMutex lock failed!:");
        exit(EXIT_FAILURE);
    }

    // Main loop: wait for job posting, do job(s) ... forever
    while(1) {

        while(!_threadpool_jobAvailable(pool->queue)) {
            pthread_cond_wait(&pool->job_posted, &pool->mutex);
        }

        // We've just woken up and we have the mutex.  Check pool's state
        if (ALL_EXIT == pool->state) {
            break;
        }

        // while we find work to do
        _threadpool_getWorkOrder(pool->queue, &job_func, &job_arg, &cleanup_func, &cleanup_arg);
        pthread_cond_signal(&pool->job_taken);

        // Yield mutex so other jobs can be posted
        if (0 != pthread_mutex_unlock(&pool->mutex)) {
            perror("\n\nMutex unlock failed!:");
            exit(EXIT_FAILURE);
        }

        // Run the job we've taken
        if(cleanup_func) {
            pthread_cleanup_push(cleanup_func, cleanup_arg);
        }

        //GCC fails if I DON'T write a while loop here. Let's make it happy!
        while(0);

        job_func(job_arg);

        if(cleanup_func) {
            pthread_cleanup_pop(1);
        }

        // Grab mutex so we can grab posted job, or (if no job is posted)
        //   begin waiting for next posting.
        if (0 != pthread_mutex_lock(&pool->mutex)) {
            perror("\n\nMutex lock failed!:");
            exit(EXIT_FAILURE);
        }
    }

    // If we get here, we broke from loop because state is ALL_EXIT.
    __sync_sub_and_fetch(&pool->live, 1);

    // We're not really taking a job ... but this signals the destroyer
    //   that one thread has exited, so it can keep on destroying.
    pthread_cond_signal(&pool->job_taken);

    if (0 != pthread_mutex_unlock(&pool->mutex)) {
        perror("\n\nMutex unlock failed!:");
        exit(EXIT_FAILURE);
    }

    pthread_cleanup_pop(1);
    return NULL;
}

Auch hier ist der Ablauf an sich relativ einfach, aber wie versprochen, wollte ich ja noch mal auf das Thema Synchronisation zu sprechen kommen. Und genau diese Funktion bietet einen wunderbaren Ansatz dafür, weil sie alles enthält, was es an Synchronisationsaufgaben zu erledigen gibt.

Das Grundproblem bei nebenläufigen Anwendungen ist nämlich, wie man sich auf einen bestimmten Stand einigt, den alle Beteiligten kohärent für ihre Berechnungen benutzen. Nehmen wir hierzu einmal an, auf dem Tisch vor uns steht eine bereits geschnittene Torte mit X Tortenstücken drauf. Ferner können wir für jedes der Stücken nur entweder feststellen, ob es noch da ist, oder uns ein Stück Torte wegnehmen. Wenn wir nur einen Berechtigten haben, der die Stücken der Torte verteilt, ist die Aufgabe relativ klar und die Situation unproblematisch: Derjenige guckt nach, ob ein Stück noch da ist und nimmt sich dieses Stück im nächsten Zeitschritt einfach. Nun ist so eine Torte unter Party-Gästen sehr begehrt und das DRM beim Zugriff auf die einzelnen Stücken natürlich nicht existent. Auch erfolgt der Zugriff auf die Torte durch jeden Gast üblicherweise gleichzeitig, denn alle wollen die leckere Torte gern essen.

Nehmen wir an, die Tortenstücke sind unterscheidbar (die Menge der Tortenstücke brauch jedoch nicht abzählbar sein ;-)), so kann es passieren, dass zwei Gäste das selbe Tortenstück essen möchten. Da der Zugriff der Gäste aber nicht synchronisiert erfolgt, können folgende 3 Situationen auftreten:

  1. Gast 1 prüft, dass ein Tortenstück t∈Torte von Gast 1 auf Verfügbarkeit geprüft wird und Gast 2 erst im nächsten Schritt die Verfügbarkeit prüft. Je nachdem, ob diese Prüfung von Gast 2 nun schneller vorgenommen wird, als Gast 1 brauch, um das Stück Torte vom Tisch zu nehmen, ist für Gast 2 das Stück Torte da oder nicht. Das Ergebnis der Überprüfung ist also nicht immer konsistent. Zudem kann es passieren, dass Gast 2 im Folgeschritt versucht, sich ein Stück Torte zu nehmen, welches gar nicht mehr da ist.
  2. Gast 1 und 2 fangen gleichzeitig an, zu prüfen, ob das selbe Stück Torte noch da ist, sehen beide, dass dieses noch da ist und werden also im nächsten Schritt gleichzeitig versuchen, sich das Selbe Stück Torte zu nehmen.
  3. Gast 1 prüft vor Gast 2 die Verfügbarkeit des Stück Torte, kann sich aber nicht im nächsten Schritt das Stück Torte holen, weil ihm noch ein Tortenheber fehlt. Während er sich diesen aus der Küche holt, prüft Gast 2 für das selbe Stück Torte die Verfügbarkeit und nimmt sich dieses weg, bevor Gast 1 mit seinem Tortenheber zurück ist, um vergeblich zu versuchen, das nun nicht mehr vorhandene Stück Torte zu nehmen.

In allen 3 Fällen dürfte der Spaß der Gäste durch die unzureichende Synchronisation des Zugriffs auf die Torte getrübt werden. Was können wir also tun? Die erste Variante wäre, nur genau einen die Tortenstücke austeilen zu lassen. Da wir dann aber nicht mehr sehr effizient die Torte verteilen könnten (wir nehmen einmal an, von unseren N Gästen können fast alle gleichzeitig mindestens ein Tortenstück erreichen), weil sonst jeder sich anstellen müsste, verwerfen wir diesen Vorschlag: Wir wollen das Verteilen der Torte ja schließlich parallelisieren, so dass jeder schnellstmöglich sein Stück bekommt.

Ein Ansatz, der hier auffallen dürfte, ist das Problem, dass wir es irgendwie schaffen müssen, dass die Gäste koordinieren können, welche Stücken bereits „vergeben“ sind, da wir dann Zugriffe auf das selbe Stück Kuchen unterbinden können. Führen wir dazu eine Operation „Nachschauen und Markieren“ ein, die auf dem Stück Torte ein Fähnchen setzt, sobald das Stück Torte von jemandem als seines markiert wird. Zusätzlich sind die Fähnchen so beschaffen, dass immer nur eines gesetzt werden kann, aber jeder weiß, wenn er sein eigenes Fähnchen korrekt gesetzt hat: Wenn Zwei ein Fähnchen versuchen zu setzen, hat einer von beiden (z.B. vom Universum ausgewürfelt ;-)) Glück, während der andere eindeutig weiß, dass dieses Stück nicht verfügbar ist (unabhängig, davon, ob er weiß, ob ein Stück Kuchen da ist oder nicht).

Für die drei Fälle oben heißt das Folgendes:

  1. Gast 1 und 2 bekommen vom Universum ausgewürfelt, wer seine Markierung setzen darf. Der Gast, dessen Fähnchen nun auf dem Stück Torte prangt, nimmt sich dieses und entfernt damit auch das Fähnchen wieder. Der unterlegene Gast kann nun sein Fähnchen ins Nix stellen, prüfen, dass da nix mehr ist und sucht sich ein aderes Stück Torte.
  2. Gast 1 setzt sein Fähnchen. Gast 2 sieht das Fähnchen und kann seines daher nicht setzen. Er wartet also auf Gast 1, bis dieser das Fähnchen wieder entfernt hat (und das Stück Torte).
  3. Gast 1 setzt sein Fähnchen ungestört, Gast 2 kann seines nicht setzen und muss warten. Gast 1 nimmt sich irgendwann sein Stück Torte und gibt den Zugriff für Gast 2 frei, der feststellt, dass da kein Stück Torte mehr ist. Gast 2 sucht sich ein anderes.

Und schon sind alle unsere Gäste zufrieden, weil sie immer garantiert bekommen, dass sie ein Stück Torte auch bekommen, wenn sie einmal festgestellt haben, dass eines da ist, bzw. sie sich ein anderes suchen müssen, wenn keines mehr am gewünschten Ort liegt.

Was bedeutet das nun für unser Problem im Thread Pool? Nun, wie auch unsere Gäste mit den Tortenstücken haben unsere Threads im Thread Pool das Problem, dass auf Speicherbereiche zugegriffen werden muss. Zusätzlich zu Speicher vorhanden oder nicht, müssen unsere Threads jedoch auch mit Veränderungen zurecht kommen, um einen kohärenten Stand der Abarbeitung zu gewährleisten, d.h. so dass niemand zwischen dem Lesen einer Speicherstelle und deren Veränderung eine weitere Veränderung vornehmen kann.

Bei Threads wird diese Synchronisation der Zugriffe typischer Weise durch mehrere Techniken realisiert:

  • Critical Sections: Code-Bereiche, die immer nur von einem Thread gleichzeitig durchlaufen werden können.
  • Semaphoren: Zähler, die die Anzahl freier Ressourcen zählen. Sind keine Ressourcen verfügbar, muss man auf die Freigabe warten.
  • Mutex (AKA binärer Semaphor): Wie ein Semaphor, nur halt begrenzt auf die Werte 1 und 0 ;-), also Verfügbar oder belegt.
  • Locks: Sperren für Lese- oder Schreibvorgänge, bzw. ggf. auch andere Vorgänge

Möchte man, dass die Threads untereinander kooperieren, ist das Signalisieren von Statusänderungen oftmals von Bedeutung. Hierfür kann man Events (Ereignisse, auf deren Eintreten passiv gewartet werden kann) bzw. Condition Variablen (Variablen, die über einen Mutex synchronisiert werden) verwenden.

Nach dem wir nun die Grundlagen für die Erklärung der verwendeten Funktionalitäten gelegt haben, gehen wir einfach einmal der Reihe nach durch:

/*
 * Define the life of a working thread.
 */
void *_threadpool_dowork(threadpool_t *pool) {
    // Remember my creation sequence number
//    int myid =
    __sync_add_and_fetch(&pool->live, 1);

Kaum in der Routine und wir haben es bereits mit Synchronisation zu tun 😛 Was diese Zeile macht ist an sich etwas ganz triviales: Wir zählen, wieviele Threads grad aktiv sind. Und da wir grade zum Leben erweckt wurden, teilen wir dies der Pool-Verwaltung mit. Nun machen das außer uns noch X andere Threads, was zum oben erwähnten Problem mit dem gleichzeitigen Zugriff auf den Speicher führt. Damit dies nicht passiert nutzen wir einen sogenannten atomaren Inkrement. Dies ist eine Spezielle Art eine Variable zu inkrementieren, bei der uns vom Compiler garantiert wird, dass der erzeugte Code auf der Zielplattform ohne Unterbrechung, also in einem einzigen Schritt, ausgeführt wird. Auf diese Weise kann also immer nur ein Thread gleichzeitig sein Auferstehen mitteilen, während seine Konkurrenten warten müssen.

    // When we get a posted job, we copy it into these local vars.
    dispatch_func_t     job_func;
    void *              job_arg;

    dispatch_func_t     cleanup_func;
    void *              cleanup_arg;

In diesen 4 Variablen merken wir uns später, was wir derzeit zu haben. Die Job-Funktion ist dabei die regulär auszuführende Funktion, während die Cleanup-Funktion nur ausgeführt wird, wenn wir während der Ausführung unseres Arbeitsauftrags unterbrochen werden.

    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &pool->mutex);

Und die erste Magie in unserer Routine. Diese ist nötig, um dem Betriebssystem mitzuteilen, dass es unseren Thread jederzeit unterbrechen bzw. beenden darf. Zusätzlich teilen wir dem Betriebssystem mit, dass wir im Falle einer Unterbrechung bitte ein Unlock des Pool-Mutex ausführen möchten. Diesen holen wir uns nämlich im nächsten Schritt und müssen daher dafür sorgen, dass wir im Falle einer Unterbrechung diesen freigeben, damit wir in keine Deadlocks oder Starving-Situationen laufen können.

    // Grab mutex so we can begin waiting for a job
    if (0 != pthread_mutex_lock(&pool->mutex)) {
        perror("\nMutex lock failed!:");
        exit(EXIT_FAILURE);
    }

Dadurch, dass wir uns den Mutex geholt haben, stellen wir sicher, dass sich die Threads im Pool in eine gewisse Ordnung beim Warten auf Arbeit begeben und keine zwei Threads gleichzeitig versuchen aus der Arbeitswarteschlange zu lesen.

    // Main loop: wait for job posting, do job(s) ... forever
    while(1) {

        while(!_threadpool_jobAvailable(pool->queue)) {
            pthread_cond_wait(&pool->job_posted, &pool->mutex);
        }

Die Aufgabe unseres Workers ist, sich bis an sein Lebensende Aufgaben zu holen. Dazu fragen wir ab, ob in der Warteschlange Arbeit vorhanden ist. Da die Warteschlange diese Information nicht atomar liefern kann, darf immer nur ein Thread gleichzeitig nachschauen. Da wir zu diesem Zeitpunkt jedoch den Pool-Mutex (siehe vorigen Abschnitt) haben, ist dies gegeben. Da wir aber für den Fall, dass wir keine Arbeit finden anderen eine Chance geben müssen, uns Arbeit zu hinterlegen, geben wir mit Hilfe einer Condition-Variable den Pool-Mutex frei. Dies friert unseren eigenen Thread und gibt gleichzeitig den Mutex für andere frei. Sobald jemand neue Arbeit signalisiert, wird unser Thread wieder aus seinem Schlaf erweckt und prüft erneut die Verfügbarkeit von Arbeit. Beim Aufwecken wird uns zudem wieder der Besitz des Mutex zugestanden, so dass wir wieder garantiert der einzige sind, der auf den Pool und seinen Inhalt zugreift.

        // We've just woken up and we have the mutex.  Check pool's state
        if (ALL_EXIT == pool->state) {
            break;
        }

An dieser Stelle überprüfen wir, bevor wir versuchen, wirklich etwas zu tun, ob unsere anstehende Aufgabe ist, uns selber zu beenden. Wenn dies der Fall ist, dann verlassen wir einfach unsere Arbeitsschleife und starten die Aufräumarbeiten. Dazu aber gleich mehr. Denn aller Voraussicht wird es was zu tun geben.

        // while we find work to do
        _threadpool_getWorkOrder(pool->queue, &job_func, &job_arg, &cleanup_func, &cleanup_arg);
        pthread_cond_signal(&pool->job_taken);

Und da wir gewissenhafte Arbeiter sind, fragen wir einfach nach, was unser Auftrag ist und teilen unserer Verwaltung mit, dass wir uns einen Arbeitsauftrag genommen haben. Das Bescheidgeben erfolgt hierbei mit dem Gegenstück zur weiter oben bereits gesehenen Funktion pthread_cond_wait. Die Gegenstelle werden wir an anderer Stelle noch genauer erläutern.

        // Yield mutex so other jobs can be posted
        if (0 != pthread_mutex_unlock(&pool->mutex)) {
            perror("\n\nMutex unlock failed!:");
            exit(EXIT_FAILURE);
        }

Da wir nun unsere Arbeit haben, können wir auch andere auf die Verwaltungsstruktur zugreifen lassen, da wir bis zur Beendigung unserer Arbeit keinen weiteren Zugriff auf die Verwaltungsstruktur benötigen.

        // Run the job we've taken
        if(cleanup_func) {
            pthread_cleanup_push(cleanup_func, cleanup_arg);
        }

        job_func(job_arg);

        if(cleanup_func) {
            pthread_cleanup_pop(1);
        }

Aufmerksamen Lesern meines Blogs dürfte dieser Code-Schnipsel bekannt vorkommen. Ja, an dieser Stelle hatte der C-Compiler nen Bug, da ich aber die Funktionsweise an sich erklären möchte, verzichte ich auf den zugehörigen Bugfix, da der eh rausoptimiert wird 😉

Die beiden If-Abfragen klären hierbei ab, ob im Falle eines Abbruchs der zu erledigenden Aufgabe etwas Spezielles zu tun ist. Ist dies der Fall, wird ähnlich wie bereits oben mit dem Thread Pool eine entsprechende Cleanup-Routine gesetzt, bzw. diese wieder entfernt, wenn deren Ausführung doch nicht notwendig gewesen sein sollte. Und zwischendurch wird halt der eigentliche Arbeitsauftrag ausgeführt.

        // Grab mutex so we can grab posted job, or (if no job is posted)
        //   begin waiting for next posting.
        if (0 != pthread_mutex_lock(&pool->mutex)) {
            perror("\n\nMutex lock failed!:");
            exit(EXIT_FAILURE);
        }
    }

Okay, und damit währen wir auch schon fast durch durch einen Arbeitszyklus. Am Ende müssen wir uns nämlich nur noch die alleinige Kontrolle über den Thread Pool wiederholen und wir können uns beruhigt auf die Lauer nach neuer Arbeit begeben.

Nun aber zum Teil des Aufräumens:

    // If we get here, we broke from loop because state is ALL_EXIT.
    __sync_sub_and_fetch(&pool->live, 1);

Wer oben gut aufgepasst hat, kann sich diese Zeile hoffentlich recht einfach herleiten: Wir sagen einfach bescheid, dass wir nicht mehr aktiv sind. Dies ist nötig, um in der Pool-Verwaltung zu wissen, wann keiner mehr am Arbeiten ist.

    // We're not really taking a job ... but this signals the destroyer
    //   that one thread has exited, so it can keep on destroying.
    pthread_cond_signal(&pool->job_taken);

An dieser Stelle erklärt hoffentlich der Code-Kommentar alles. Also weiter im Source!

    if (0 != pthread_mutex_unlock(&pool->mutex)) {
        perror("\n\nMutex unlock failed!:");
        exit(EXIT_FAILURE);
    }

Und schließlich müssen wir noch den Zugriff auf den Thread Pool wieder freigeben, damit auch wieder andere auf diesen zugreifen dürfen.

    pthread_cleanup_pop(1);
    return NULL;
}

Da wir aber vom Anfang immer noch die Freigabe des Mutex als eine der zu erledigenden Aufgaben beim Beenden unseres Threads gesetzt haben, müssen wir dies noch als erledigt kennzeichnen, d.h. diese Aufgabe entfernen.

Womit wir auch das Erledigen von Arbeit behandelt hätten! Einfach, oder? Können wir also fortsetzen. 😉 Hierzu gehen wir einfach zur Funktion, die uns mitteilt, ob es Arbeit gibt. Diese sieht wie folgt aus:

int _threadpool_jobAvailable(threadpool_queue_head_t *queue)
{
    return queue->tail != NULL;
}

An sich also nicht’s wirklich Spektakuläres. Bei der Queue handelt es sich um eine doppelt verkette Liste, wodurch die Zugriffe auf diese relativ normal realisiert werden können (und in konstanter Zeit ablaufen):

threadpool_queue_head_t *_threadpool_makeQueue(int initial_cap) {
    int max_cap = THREADPOOL_MAX_QUEUE / (sizeof(threadpool_queue_node_t));
    int i;

    threadpool_queue_head_t *queue = (threadpool_queue_head_t *) malloc(sizeof(threadpool_queue_head_t));

    threadpool_queue_node_t *temp;

    if(!queue) {
        perror("Out of memory on malloc\n");
        exit(EXIT_FAILURE);
    }

    if(initial_cap > max_cap) {
        initial_cap = max_cap;
    }

    if(initial_cap < 1) {
        perror("Attempting to create a queue that holds no work orders\n");
        free(queue);
        exit(EXIT_FAILURE);
        return NULL;
    }

    queue->capacity_cur = initial_cap;
    queue->capacity_max = max_cap;

    queue->head = NULL;
    queue->tail = NULL;

    queue->freeHead = (threadpool_queue_node_t *) malloc(sizeof(threadpool_queue_node_t));

    if(!queue->freeHead) {
        perror("Out of memory on malloc\n");
        free(queue);
        exit(EXIT_FAILURE);
        return NULL;
    }

    queue->freeTail = queue->freeHead;

    //populate the free queue
    for(i = 1; i <= initial_cap; i++) {
        temp = (threadpool_queue_node_t *) malloc(sizeof(threadpool_queue_node_t));
        if(!temp) {
            perror("Out of memory on malloc\n");
            // TODO Properly free all memory
            free(queue);
            exit(EXIT_FAILURE);
            return NULL;
        }

        temp->next = queue->freeHead;
        temp->prev = NULL;
        queue->freeHead->prev = temp;
        queue->freeHead = temp;
    }

    return queue;
}

void _threadpool_addWorkOrder(threadpool_queue_head_t *queue, dispatch_func_t job_func, void *job_arg, dispatch_func_t cleanup_func, void *cleanup_arg) {
    threadpool_queue_node_t *temp;

    if(!queue->freeTail) {
        temp = (threadpool_queue_node_t *) malloc(sizeof(threadpool_queue_node_t));
        if(!temp) {
            perror("Out of memory on malloc\n");
            exit(2);
        }

        temp->next = NULL;
        temp->prev = NULL;
        queue->freeHead = temp;
        queue->freeTail = temp;
        queue->capacity_cur++;
    }

    temp = queue->freeTail;
    if(!queue->freeTail->prev) {
        queue->freeTail = NULL;
        queue->freeHead = NULL;
    } else {
        queue->freeTail->prev->next = NULL;
        queue->freeTail = queue->freeTail->prev;
        queue->freeTail->next = NULL;
    }

    temp->job_func = job_func;
    temp->job_arg = job_arg;
    temp->cleanup_func = cleanup_func;
    temp->cleanup_arg = cleanup_arg;

    temp->prev = NULL;
    if(!queue->head) {
        queue->tail = temp;
        queue->head = temp;
    } else {
        temp->next = queue->head;
        queue->head->prev = temp;
        queue->head = temp;
    }
}

void _threadpool_getWorkOrder(threadpool_queue_head_t *queue, dispatch_func_t *job_func, void **job_arg, dispatch_func_t *cleanup_func, void **cleanup_arg) {
    threadpool_queue_node_t *temp;

    temp = queue->tail;
    if(!temp) {
        perror("Attempting to getWorkOrder from an empty queue.\n");
        exit(2);
    }

    if(!queue->tail->prev) {
        queue->tail = NULL;
        queue->head = NULL;
    } else {
        queue->tail->prev->next = NULL;
        queue->tail = queue->tail->prev;
        queue->tail->next = NULL;
    }

    *job_func = temp->job_func;
    *job_arg  = temp->job_arg;
    *cleanup_func = temp->cleanup_func;
    *cleanup_arg  = temp->cleanup_arg;

    temp->next = NULL;
    if(!queue->freeHead) {
        queue->freeTail = temp;
        queue->freeHead = temp;
        temp->prev = NULL;
    } else {
        temp->next = queue->freeHead;
        queue->freeHead->prev = temp;
        queue->freeHead = temp;
    }
}

Die erste Funktion ist hierbei für die Initialisierung der Warteschlange für die Arbeitsaufträge zuständig und wird in der oben bereits erläuterten Funktion threadpool_create aufgerufen, um den Speicher für die Queue zu initialisieren.

Die Funktion darunter fügt in unsere Warteschleife einen neuen Arbeitsauftrag ein. Da wir am Anfang nicht wissen, wieviele dies werden, wird die Queue ggf. dynamisch vergrößert, jedoch nie über ein verher festgelegtes Maximum, um einen übermäßigen Rückstau zu vermeiden und das Erschöpfen der Speicherressourcen zu vermeiden. Schließlich erledigt die dritte Funktion das Abholen eines Arbeitsauftrags aus der Warteschlange. Auch hier ist der Quelltext ziemlich einfach zu verstehen.

Wer sich nun fragt, warum keine der Funktionen irgendwas synchronisiert: Weil das nicht notwendig ist: Jede der Funktionen wirdd nur an Stellen aufgerufen, wo garantiert ist, dass immer nur exakt ein Thread Kontrolle über die zu manipulierende Datenstruktur besitzt. Im Falle der ersten Funktion ist dies threadpool_create, bei der der Pointer auf diese Struktur als einzige Stelle im Programm bekannt ist; bei den anderen beiden Funktionen haben wir den gelockten Pool-Mutex, der den Zugriff auf die Warteschlange regelt.

Somit ist die Verteilung der Tortenstückchen im Thread Pool eindeutig geklärt und jeder teilnehmende Thread ist glücklich 😉

Vielleicht noch nicht ganz, denn einen Einzeiler haben wir noch nicht behandelt ;-):

int _threadpool_canAcceptWork(threadpool_queue_head_t *queue)
{
    return (queue->freeTail != NULL) ||
        (queue->capacity_cur <= queue->capacity_max);
}

Diese Funktion wird von der Funktion zum Hinzufügen von Arbeitsaufträgen aufgerufen und prüft, ob wir noch Platz haben. Der Ablauf ist dabei relativ einfach: Sind freigegebene Einträge von ehemaligen Arbeitsaufträgen vorhanden oder sind wir noch unter unserer Maximalkapazität steht dem Hinzufügen weiterer Arbeit nichts im Wege.

Nachdem wir nun das Abarbeiten von Arbeitsaufträgen geklärt haben, können wir uns dem Anzetteln von Arbeit zuwenden. Dies geschieht über die folgenden beiden Funktionen:

/*----------------------------------------------------------------------
 * Dispatch a thread
 */
void threadpool_dispatch(threadpool_t *pool, dispatch_func_t func, void *arg) {
    threadpool_dispatch_cleanup(pool, func, arg, NULL, NULL);
}

void threadpool_dispatch_cleanup(threadpool_t *pool, dispatch_func_t job_func, void *job_arg,
    dispatch_func_t cleanup_func, void* cleanup_arg) {

    if(pool == (threadpool_t *) job_arg) {
        return;
    }

    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &pool->mutex);

    // Grab the mutex
    if (0 != pthread_mutex_lock(&pool->mutex)) {
        perror("Mutex lock failed (!!):");
        exit(-1);
    }

    while(!_threadpool_canAcceptWork(pool->queue)) {
        pthread_cond_signal(&pool->job_posted);
        pthread_cond_wait(&pool->job_taken, &pool->mutex);
    }

    // Finally, there's room to post a job. Do so and signal workers.
    _threadpool_addWorkOrder(pool->queue, job_func, job_arg, cleanup_func, cleanup_arg);

    pthread_cond_signal(&pool->job_posted);

    // Yield mutex so a worker can pick up the job
    if (0 != pthread_mutex_unlock(&pool->mutex)) {
        perror("\n\nMutex unlock failed!:");
        exit(EXIT_FAILURE);
    }

    pthread_cleanup_pop(1);
}

Die erste dieser beiden Funktionen ist sehr schnell erklärt: Sie betrachtet das Hinzufügen eines einfachen Jobs ohne hinterher aufräumen einfach als den Fall, bei dem zum Aufräumen nichts zu tun ist. Einfach, nicht? 😛

void threadpool_dispatch_cleanup(threadpool_t *pool, dispatch_func_t job_func, void *job_arg,
    dispatch_func_t cleanup_func, void* cleanup_arg) {

    if(pool == (threadpool_t *) job_arg) {
        return;
    }

Um einen Arbeitsauftrag hinzuzufügen prüfen wir zuerst, ob dieser überhaupt eine auszuführende Routine beinhaltet. Wenn nicht, ignorieren wir den Auftrag einfach.

    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &pool->mutex);

    // Grab the mutex
    if (0 != pthread_mutex_lock(&pool->mutex)) {
        perror("Mutex lock failed (!!):");
        exit(-1);
    }

Diese Zeilen dürften bekannt vorkommen: Wir geben bescheid, dass wir im Fall eines Abbruchs etwas aufräumen müssen und holen uns den Pool Mutex, um mit dem Thread Pool arbeiten zu können.

    while(!_threadpool_canAcceptWork(pool->queue)) {
        pthread_cond_signal(&pool->job_posted);
        pthread_cond_wait(&pool->job_taken, &pool->mutex);
    }

Wobei wir zuerst einmal um das Wohlergehen unserer Arbeiter besorgt sind und überprüfen, ob diese mit der Arbeitslast überhaupt zurecht kommen. Wenn zu viel Arbeit ansteht, warten wir einfach, bis zumindest ein Teil erledigt wurde, so dass in der Warteschlange wieder Platz frei ist.

    // Finally, there's room to post a job. Do so and signal workers.
    _threadpool_addWorkOrder(pool->queue, job_func, job_arg, cleanup_func, cleanup_arg);

    pthread_cond_signal(&pool->job_posted);

Und sobald dann Platz ist, fügen wir den Arbeitsauftrag einfach ein und geben bescheid, dass es wieder Arbeit gibt.. Ganz einfach! 🙂

    // Yield mutex so a worker can pick up the job
    if (0 != pthread_mutex_unlock(&pool->mutex)) {
        perror("\n\nMutex unlock failed!:");
        exit(EXIT_FAILURE);
    }

    pthread_cleanup_pop(1);
}

Abschließend brauchen wir nur noch den Mutex auf den Thread Pool freigeben, damit andere wieder damit arbeiten können und in unerem Thread das Aufräumen als Hinfällig zu kennzeichnen. Auch hier also wenig Spannendes.

Und zu guter Letzt, und nicht nur, weil wir ordentliche Menschen sind, müssen wir irgendwann auch einmal aufräumen. Dies wird mit der letzten unserer Funktionen realisiert:

/*----------------------------------------------------------------------
 * Destroy a thread pool.  If there is a job still waiting for a thread
 *   to execute it, tough.  We set the ALL_EXIT flag anyways.
 */
void threadpool_destroy(threadpool_t *pool)
{
    int oldtype;

    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype);
    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &pool->mutex);

    // Cause all threads to exit. Because they were detached when created,
    //   the underlying memory for each is automatically reclaimed.

    // Grab the mutex
    if (0 != pthread_mutex_lock(&pool->mutex)) {
        perror("Mutex lock failed (!!):");
        exit(-1);
    }

    pool->state = ALL_EXIT;

    while (pool->live > 0) {
        // get workers to check in ...
        pthread_cond_signal(&pool->job_posted);

        // ... and wake up when they check out.
        pthread_cond_wait(&pool->job_taken, &pool->mutex);
    }

    // Null-out entries in pool's thread array; free array.
    memset(pool->array, 0, pool->size * sizeof(pthread_t));
    free(pool->array);

    // Destroy the mutex and condition variables in the pool.
    pthread_cleanup_pop(0);
    if (0 != pthread_mutex_unlock(&pool->mutex)) {
        perror("\n\nMutex unlock failed!:");
        exit(EXIT_FAILURE);
    }

    if (0 != pthread_mutex_destroy(&pool->mutex)) {
        perror("\nMutex destruction failed!:");
        exit(EXIT_FAILURE);
    }

    if (0 != pthread_cond_destroy(&pool->job_posted)) {
        perror("\nCondition Variable 'job_posted' destruction failed!:");
        exit(EXIT_FAILURE);
    }

    if (0 != pthread_cond_destroy(&pool->job_taken)) {
        perror("\nCondition Variable 'job_taken' destruction failed!:");
        exit(EXIT_FAILURE);
    }

    // Zero out all bytes of the pool
    memset(pool, 0, sizeof(threadpool_t));

    // Free the pool and null out the pointer to it
    free(pool);
    pool = NULL;
}

Auch an dieser Stelle ist eine kurze Erläuterung vielleicht recht hilfreich, auch wenn vieles recht einfach zu überblicken ist.

/*----------------------------------------------------------------------
 * Destroy a thread pool.  If there is a job still waiting for a thread
 *   to execute it, tough.  We set the ALL_EXIT flag anyways.
 */
void threadpool_destroy(threadpool_t *pool)
{
    int oldtype;

    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype);
    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &pool->mutex);

Hier wird zum Einen mitgeteilt, dasswir nur zu definierten Zeitpunkten abgebrochen werden möchten und teilen zudem mit, dass wir beim Aufräumen den Mutex des Thread Pools freigeben möchten.

    // Cause all threads to exit. Because they were detached when created,
    //   the underlying memory for each is automatically reclaimed.

    // Grab the mutex
    if (0 != pthread_mutex_lock(&pool->mutex)) {
        perror("Mutex lock failed (!!):");
        exit(-1);
    }

Zu diesem Abschnitt spare ich mir einen dedizierten Kommentar, da wir das schon mehrfach hatten.

    pool->state = ALL_EXIT;

    while (pool->live > 0) {
        // get workers to check in ...
        pthread_cond_signal(&pool->job_posted);

        // ... and wake up when they check out.
        pthread_cond_wait(&pool->job_taken, &pool->mutex);
    }

Und die eigentliche Magie: Das Beenden aller Threads im Thread Pool. Wie bereits oben beim Bearbeiten eines Arbeitsauftrages gesehen, hat jeder Thread Pool ein Flag, welches angibt, ob der Thread Pool noch Aktiv ist, oder beendet werden soll. Dieses wird als erstes auf „Bitte beenden“ gesetzt. Anschließend sagen wir solange, wie noch Threads am Leben sind einfach, bescheid, dass sich ein Thread beenden soll.

    // Null-out entries in pool's thread array; free array.
    memset(pool->array, 0, pool->size * sizeof(pthread_t));
    free(pool->array);

    // Destroy the mutex and condition variables in the pool.
    pthread_cleanup_pop(0);
    if (0 != pthread_mutex_unlock(&pool->mutex)) {
        perror("\n\nMutex unlock failed!:");
        exit(EXIT_FAILURE);
    }

    if (0 != pthread_mutex_destroy(&pool->mutex)) {
        perror("\nMutex destruction failed!:");
        exit(EXIT_FAILURE);
    }

    if (0 != pthread_cond_destroy(&pool->job_posted)) {
        perror("\nCondition Variable 'job_posted' destruction failed!:");
        exit(EXIT_FAILURE);
    }

    if (0 != pthread_cond_destroy(&pool->job_taken)) {
        perror("\nCondition Variable 'job_taken' destruction failed!:");
        exit(EXIT_FAILURE);
    }

    // Zero out all bytes of the pool
    memset(pool, 0, sizeof(threadpool_t));

    // Free the pool and null out the pointer to it
    free(pool);
    pool = NULL;
}

Anschließend geben wir noch die benötigten Ressourcen frei und sorgen dabei dafür, dass diese sicher aus dem Speicher gelöscht werden, um sicher zu gehen, dass keiner auf nicht mehr vorhandene Daten zugreift.

Und damit wären wir auch bereits am Ende: Mit nur diesen wenigen Code-Zeilen können wir bequem unsere Arbeit auf mehrere Prozessorkerne verteilen, ohne uns allzu viele Gedanken über das Wie zu machen, vorausgesetzt, die Aufgaben können unabhängig voneinander ausgeführt werden. Wichtig ist zudem auch, dass Aufgaben, die sich Gegenseitig blockieren können zur Vermeidung von Deadlocks nicht im gleichen Thread Pool abgearbeitet werden sollten. Solange nur eine einseitige Abhängigkeit besteht ist dies zwar in der Regel unkritisch, sollte aber dennoch vermieden werden, da der blockierte Thread im Thread Pool belegt wird, ohne dass dieser Arbeit erledigen kann und damit die Leistungsfähigkeit herabgesetzt wird.

Ansonsten wünsch ich mit der hier vorgestellten Implementierung viel Spaß, möchte aber vor eventuell auftretenden Fehlern warnen, da der Source zwar an sich Schlüssig ist, im Detail aber noch kleinere Stolpersteine enthalten kann, die sich wie eigentlich alle Thread-bezogenen Probleme wahrscheinlich erst im Produktiv-Betrieb äußern werden. Daher gilt wie immer D. E. Knuth: Beware of bugs in the above code; I have only proved it correct, not tried it.

Flattr this!

Keine Kommentare »

No comments yet.

RSS feed for comments on this post. TrackBack URL

Leave a comment

Powered by WordPress