{"id":1109,"date":"2011-09-30T10:38:24","date_gmt":"2011-09-30T08:38:24","guid":{"rendered":"http:\/\/blog.benny-baumann.de\/?p=1109"},"modified":"2011-10-06T19:56:50","modified_gmt":"2011-10-06T17:56:50","slug":"thread-pools","status":"publish","type":"post","link":"https:\/\/blog.benny-baumann.de\/?p=1109","title":{"rendered":"Thread Pools"},"content":{"rendered":"<p>Neuere Prozessoren bieten immer mehr Leistung durch immer mehr parallele Kerne bei aber seit l\u00e4ngerem nahezu gleich gebliebener Taktrate. Somit bleibt einem ohne Anpassung seiner Programme diese zus\u00e4tzliche Leistung verwehrt. Nur in dem man sein Programm in mehrere Teile spaltet, die parallel ablaufen k\u00f6nnen, kann man sein Programm auch auf heutigen Prozessoren in optimaler Geschwindigkeit ausf\u00fchren.<\/p>\n<p>F\u00fcr die Parallelisierung gibt es hierbei je nach Betriebssystem verschiedene Mittel. Die wohl bekanntesten Mittel stellen hierbei Threads unter Windows, bzw. das Forken unter Linux dar. Aber auch etwas exotischere Mittel wie PThreads oder Fibers bieten gute M\u00f6glichkeiten, um ein Programm auf mehrere CPUs zu verteilen.<\/p>\n<p>Die verschiedenen Ans\u00e4tze haben dabei gemeinsam, dass f\u00fcr verschiedene Aufgaben jeweils mehr oder weniger Umfangreiche Objekte erzeugt werden m\u00fcssen, die f\u00fcr jede Aufgabe einen Zwischenstand enthalten, den sogenannten Kontext. Dieser ist bei Fibers (Windows) sehr klein, bei Multitasking mittels Prozessen aber durchaus sehr gro\u00df. Je gr\u00f6\u00dfer dabei solch ein Kontext wird, desto langsamer wird das Umschalten zwischen mehreren Aufgaben. Au\u00dferdem steigt mit der Gr\u00f6\u00dfe des Kontext oftmals auch der Aufwand f\u00fcr die Erzeugung eines neuen Kontextes, was insbesondere, wenn man viele kleinere Aufgaben parallelisieren m\u00f6chte von gro\u00dfer Bedeutung ist.<\/p>\n<p>Eine relativ gute Einf\u00fchrung (leider hinter einer <a href=\"http:\/\/www.bugmenot.com\/view\/linux-mag.com\">via BugMeNot umgehbaren<\/a> Paywall *sigh*) gibt es beim <a href=\"http:\/\/www.linux-mag.com\/id\/792\/\">Linux Magazine<\/a>. Auch die beiden Videos mit der Erkl\u00e4rung kann ich w\u00e4rmstens ans Herz legen.<\/p>\n<p>Nach diesem kurzen Abstecher nun wieder zur\u00fcck zum eigentlichen Thema dieses Beitrages, denn wie auch im verlinkten Beitrag, sowie den beiden darin enthaltenen Videos erkl\u00e4rt, m\u00f6chte man in aller Regel den Setup-Aufwand f\u00fcr viele kleine Aufgaben m\u00f6glichst gering halten. Zu diesem Zweck bietet Windows sogenannte <a href=\"http:\/\/msdn.microsoft.com\/en-us\/library\/ms686766(v=vs.85).aspx\">Threadpools<\/a>. Etwas \u00e4hnliches bietet Linux nicht von Haus aus; doch das l\u00e4sst sich recht einfach \u00e4ndern.<\/p>\n<p>F\u00fcr ein eigenes Projekt unter Linux war ich n\u00e4mlich selber auf der Suche und wurde nach etwas Suchen auch bereits f\u00fcndig. Die <a href=\"http:\/\/people.clarkson.edu\/~jmatthew\/cs644.archive\/cs644.fa2001\/proj\/pthreadsPool\/\">gefundene Implementation<\/a> ist zwar funktional und minimalistisch gehalten, hat jedoch ein paar kleinere Ecken und Kanten, weshalb ich sie f\u00fcr meine Zwecke noch einmal \u00fcberarbeitet habe.<\/p>\n<p>Da der Source somit eh einmal komplett bearbeitet wurde, nutze ich die Chance (insbesondere auf Grund des vielfachen Wunsches eines einzelnen Herren), um die Funktionsweise einmal direkt am Source des Thread Pools zu erkl\u00e4ren. Eine Vorwarnung m\u00f6chte ich aber bereits jetzt geben: Das wird etwas technisch \ud83d\ude09<!--more--><\/p>\n<p>Okay, Ihr wolltet es so! Als ersten Schritt auf dem Weg zu unserem Thread Pool schauen wir uns die ben\u00f6tigten Informationen an. Hierzu werfen wir zuerst einen Blick in den Header des Threadpools:<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n\/\/ Type to tell if threads in the pool should keep running or should terminate\r\ntypedef enum threadpool_state {\r\n    ALL_RUN,\r\n    ALL_EXIT\r\n} threadpool_state_t;\r\n\r\ntypedef struct threadpool {\r\n    size_t                          size;           \/\/ Number of entries in array.\r\n    size_t                          live;           \/\/ Number of live threads in pool (when\r\n                                                    \/\/   pool is being destroyed, live&lt;=size)\r\n    threadpool_state_t              state;          \/\/ Threads check this before getting job.\r\n    pthread_t *                     array;          \/\/ The threads themselves.\r\n\r\n    threadpool_queue_head_t *       queue;          \/\/ queue of work orders\r\n\r\n    pthread_mutex_t                 mutex;          \/\/ protects all vars declared below.\r\n    pthread_cond_t                  job_posted;     \/\/ dispatcher: \"Hey guys, there's a job!\"\r\n    pthread_cond_t                  job_taken;      \/\/ a worker: \"Got it!\"\r\n\r\n    struct timeval                  created;        \/\/ When the threadpool was created.\r\n} threadpool_t;\r\n<\/pre>\n<p>Die Struktur ist an sich nicht weiter spannend. Neben der Gr\u00f6\u00dfe und der Anzahl laufender Threads im Pool enth\u00e4lt diese Struktur noch den Status (ob der Pool aktiv ist, oder beendet werden soll), sowie die Liste aller Threads, die zu diesem Pool geh\u00f6ren. Als letztes ist hier noch ein Feld f\u00fcr den Zeitpunkt, wann dieser Pool erzeugt wurde. An sich also recht unspektakul\u00e4r.<\/p>\n<p>Die vier gerade nicht erw\u00e4hnten Felder machen nun die gesamte Magie des Thread Pools aus: W\u00e4hrend in queue die gesamten Arbeitsauftr\u00e4ge stehen, sorgen mutex, job_posted und job_taken f\u00fcr die Synchronisation der Arbeit. Aber zur Synchronisation nachher gleich mehr, denn zuerst werfen wir einmal einen kurzen Blick auf die Auftragswarteschlange.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n\/\/ \"dispatch_func_t\" declares a typed function pointer.\r\n\/\/ A variable of type \"dispatch_func_t\" points to a \r\n\/\/ function with the following signature:\r\n\/\/     void dispatch_function(void *arg);\r\ntypedef void (* dispatch_func_t)(void *);\r\n\r\ntypedef struct threadpool_queue_node {\r\n    struct threadpool_queue_node *  next;\r\n    struct threadpool_queue_node *  prev;\r\n\r\n    dispatch_func_t                 job_func;\r\n    void *                          job_arg;\r\n    dispatch_func_t                 cleanup_func;\r\n    void *                          cleanup_arg;\r\n} threadpool_queue_node_t;\r\n\r\ntypedef struct threadpool_queue_head {\r\n    threadpool_queue_node_t *       head;\r\n    threadpool_queue_node_t *       tail;\r\n\r\n    threadpool_queue_node_t *       freeHead;\r\n    threadpool_queue_node_t *       freeTail;\r\n\r\n    size_t                          capacity_cur;\r\n    size_t                          capacity_max;\r\n} threadpool_queue_head_t;\r\n<\/pre>\n<p>Okay, dieser Teil besteht aus 2 Teilen: Zum Einen aus einer doppelt verketteten Liste und zum anderen aus den Auftr\u00e4gen, die in dieser verwaltet werden. Die Art der Nutzung der Warteschlange sorgt daf\u00fcr, dass alle Arbeitsauftr\u00e4ge in der Liste zyklisch beschrieben werden und somit eine Art Ringpuffer entsteht. Die Kapazit\u00e4t wird hierbei von zwei Feldern geregelt: capacity_cur (aktuelle Anzahl genutzter Eintr\u00e4ge) und capacity_max (Maximal verf\u00fcgbare Queue-L\u00e4nge). Diese Werte werden beim Erzeugen des Threadpools gesetzt, bzw. dynamisch in bestimmten Grenzen angepasst, wenn die Warteschlange vergr\u00f6\u00dfert werden muss. Auch hierzu gleich mehr.<\/p>\n<p>Denn viel spannender als eine stumpfe doppelt verkettete Liste ist deren Inhalt. In unserem Fall handelt es sich dabei um die auszuf\u00fchrenden Auftr\u00e4ge. Diese sind aus 2 Callbacks und den dazugeh\u00f6rigen Argumenten aufgebaut. Ein Callback ist dabei eine einfache Funktion, der ein Void-Pointer \u00fcbergeben werden kann: Quasi also beliebige Daten \ud83d\ude09<\/p>\n<p>Wer sich nun wundert, warum es neben einer Job-Funktion auch immer eine Cleanup-Funktion gibt, dem sei gesagt, dass Speicher gerne aufger\u00e4umt werden m\u00f6chte. Gleiches gilt auch f\u00fcr andere Ressourcen, die man sich vom Betriebssystem holt. Und selbst wenn man das f\u00fcr seinen Arbeitsauftrag an sich tut, kann es passieren, dass dieser auf Grund des Multithreading abgebrochen wird, bevor er die angeforderten Ressourcen wieder korrekt freigeben konnte. Und genau an dieser Stelle springt einem die Cleanup-Funktion dann zur Seite, da diese automatisch ausgef\u00fchrt wird, wenn der Arbeitsauftrag abgebrochen wurde.<\/p>\n<p>Nach dem wir nun den prinzipiellen Aufbau des Threadpools im Speicher gekl\u00e4rt haben, k\u00f6nnen wir uns dem wichtigen Teil zuwenden: Der eigentlichen Implementierung. Bei dieser beginnen wir der Einfachheit halber mit dem Erstellen des Pools, da dieses noch vergleichsweise Straight Forward ist.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n\/*\r\n * Create a thread pool.\r\n *\/\r\nthreadpool_t *threadpool_create(int poolsize) {\r\n    threadpool_t *pool;     \/\/ pool we create and hand back\r\n    int i;                  \/\/ work var\r\n\r\n    \/\/ sanity check the argument\r\n    if ((poolsize &lt;= 0) || (poolsize &gt; THREADPOOL_MAX_SIZE)) {\r\n        return NULL;\r\n    }\r\n\r\n    \/\/ create the threadpool_t struct\r\n    pool = (threadpool_t *)malloc(sizeof(threadpool_t));\r\n    if (pool == NULL) {\r\n        fprintf(stderr, \"\\n\\nOut of memory creating a new threadpool!\\n\");\r\n        return NULL;\r\n    }\r\n\r\n    \/\/ initialize everything but the array and live thread count\r\n    gettimeofday(&amp;pool-&gt;created, NULL);\r\n    pool-&gt;size = poolsize;\r\n    pool-&gt;state = ALL_RUN;\r\n    pthread_mutex_init(&amp;(pool-&gt;mutex), NULL);\r\n    pthread_cond_init(&amp;(pool-&gt;job_posted), NULL);\r\n    pthread_cond_init(&amp;(pool-&gt;job_taken), NULL);\r\n\r\n    \/\/ create the array of threads within the pool\r\n    pool-&gt;array = (pthread_t *) malloc(pool-&gt;size * sizeof(pthread_t));\r\n    if (!pool-&gt;array) {\r\n        fprintf(stderr, \"\\n\\nOut of memory allocating thread array!\\n\");\r\n        free(pool);\r\n        pool = NULL;\r\n        return NULL;\r\n    }\r\n\r\n    pool-&gt;queue = _threadpool_makeQueue(poolsize);\r\n    if (!pool-&gt;queue) {\r\n        fprintf(stderr, \"\\n\\nOut of memory allocating task queue!\\n\");\r\n        free(pool-&gt;array);\r\n        free(pool);\r\n        pool = NULL;\r\n        return NULL;\r\n    }\r\n\r\n    \/\/ bring each thread to life (update counters in loop so threads can\r\n    \/\/   access pool-&gt;live to find out their ID#\r\n    for (i = 0; i &lt; pool-&gt;size; ++i) {\r\n        if (0 != pthread_create(pool-&gt;array + i, NULL, (void *(*)(void *))_threadpool_dowork, (void *) pool)) {\r\n            perror(\"\\n\\nThread creation failed:\");\r\n            \/\/ TODO: Do proper finalization!\r\n            exit(EXIT_FAILURE);\r\n            return NULL;\r\n        }\r\n\r\n        \/\/ automatic cleanup when thread exits.\r\n        pthread_detach(pool-&gt;array[i]);\r\n    }\r\n\r\n    return pool;\r\n}\r\n<\/pre>\n<p>Ich denke, weite Teile dieser Routine d\u00fcrften selbsterkl\u00e4rend sein. Einzig die letzte Schleife, in der unsere Arbeitsthreads erzeugt werden bedarf wahrscheinlich etwas Erl\u00e4uterung:<\/p>\n<pre lang=\"c\" escaped=\"true\" highlight=\"4,12\">\r\n    \/\/ bring each thread to life (update counters in loop so threads can\r\n    \/\/   access pool-&gt;live to find out their ID#\r\n    for (i = 0; i &lt; pool-&gt;size; ++i) {\r\n        if (0 != pthread_create(pool-&gt;array + i, NULL, (void *(*)(void *))_threadpool_dowork, (void *) pool)) {\r\n            perror(\"\\n\\nThread creation failed:\");\r\n            \/\/ TODO: Do proper finalization!\r\n            exit(EXIT_FAILURE);\r\n            return NULL;\r\n        }\r\n\r\n        \/\/ automatic cleanup when thread exits.\r\n        pthread_detach(pool-&gt;array[i]);\r\n    }\r\n<\/pre>\n<p>Mit pthread_create wird hierbei ein neuer pthread als Kind des aktuellen Threads erzeugt. Dieser w\u00fcrde normalerweise erst dann finalisiert werden, wenn unser Thread auch beendet wird. Um den Arbeitsthread unabh\u00e4ngig von unserem aktuellen Thread (ja, das Hauptprogramm ist auch nur ein Thread), auszuf\u00fchren, m\u00fcssen wir diesen abkoppeln. Dies geschieht in der letzten Zeile der Schleife.<\/p>\n<p>Im Kontext der Create-Methode werden nun noch zwei weitere Methoden referenziert. Die erste ist _threadpool_makeQueue, der die gew\u00fcnschte Gr\u00f6\u00dfe des Thread Pools \u00fcbergeben wird und die entsprechend die oben erw\u00e4hnte, doppelt verkettete Liste initialisiert. Bliebe noch die zweite Funktion zu erw\u00e4hnen: _threadpool_dowork.<\/p>\n<p>Wie der Name dieser Funktion bereits vermuten l\u00e4sst, erledigt sie die ganze Arbeit unseres Thread Pools. Werfen wir also einfach einen Blick drauf:<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n\/*\r\n * Define the life of a working thread.\r\n *\/\r\nvoid *_threadpool_dowork(threadpool_t *pool) {\r\n    \/\/ Remember my creation sequence number\r\n\/\/    int myid =\r\n    __sync_add_and_fetch(&amp;pool-&gt;live, 1);\r\n\r\n    \/\/ When we get a posted job, we copy it into these local vars.\r\n    dispatch_func_t     job_func;\r\n    void *              job_arg;\r\n\r\n    dispatch_func_t     cleanup_func;\r\n    void *              cleanup_arg;\r\n\r\n    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);\r\n    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &amp;pool-&gt;mutex);\r\n\r\n    \/\/ Grab mutex so we can begin waiting for a job\r\n    if (0 != pthread_mutex_lock(&amp;pool-&gt;mutex)) {\r\n        perror(\"\\nMutex lock failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    \/\/ Main loop: wait for job posting, do job(s) ... forever\r\n    while(1) {\r\n\r\n        while(!_threadpool_jobAvailable(pool-&gt;queue)) {\r\n            pthread_cond_wait(&amp;pool-&gt;job_posted, &amp;pool-&gt;mutex);\r\n        }\r\n\r\n        \/\/ We've just woken up and we have the mutex.  Check pool's state\r\n        if (ALL_EXIT == pool-&gt;state) {\r\n            break;\r\n        }\r\n\r\n        \/\/ while we find work to do\r\n        _threadpool_getWorkOrder(pool-&gt;queue, &amp;job_func, &amp;job_arg, &amp;cleanup_func, &amp;cleanup_arg);\r\n        pthread_cond_signal(&amp;pool-&gt;job_taken);\r\n\r\n        \/\/ Yield mutex so other jobs can be posted\r\n        if (0 != pthread_mutex_unlock(&amp;pool-&gt;mutex)) {\r\n            perror(\"\\n\\nMutex unlock failed!:\");\r\n            exit(EXIT_FAILURE);\r\n        }\r\n\r\n        \/\/ Run the job we've taken\r\n        if(cleanup_func) {\r\n            pthread_cleanup_push(cleanup_func, cleanup_arg);\r\n        }\r\n\r\n        \/\/GCC fails if I DON'T write a while loop here. Let's make it happy!\r\n        while(0);\r\n\r\n        job_func(job_arg);\r\n\r\n        if(cleanup_func) {\r\n            pthread_cleanup_pop(1);\r\n        }\r\n\r\n        \/\/ Grab mutex so we can grab posted job, or (if no job is posted)\r\n        \/\/   begin waiting for next posting.\r\n        if (0 != pthread_mutex_lock(&amp;pool-&gt;mutex)) {\r\n            perror(\"\\n\\nMutex lock failed!:\");\r\n            exit(EXIT_FAILURE);\r\n        }\r\n    }\r\n\r\n    \/\/ If we get here, we broke from loop because state is ALL_EXIT.\r\n    __sync_sub_and_fetch(&amp;pool-&gt;live, 1);\r\n\r\n    \/\/ We're not really taking a job ... but this signals the destroyer\r\n    \/\/   that one thread has exited, so it can keep on destroying.\r\n    pthread_cond_signal(&amp;pool-&gt;job_taken);\r\n\r\n    if (0 != pthread_mutex_unlock(&amp;pool-&gt;mutex)) {\r\n        perror(\"\\n\\nMutex unlock failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    pthread_cleanup_pop(1);\r\n    return NULL;\r\n}\r\n<\/pre>\n<p>Auch hier ist der Ablauf an sich relativ einfach, aber wie versprochen, wollte ich ja noch mal auf das Thema Synchronisation zu sprechen kommen. Und genau diese Funktion bietet einen wunderbaren Ansatz daf\u00fcr, weil sie alles enth\u00e4lt, was es an Synchronisationsaufgaben zu erledigen gibt.<\/p>\n<p>Das Grundproblem bei nebenl\u00e4ufigen Anwendungen ist n\u00e4mlich, wie man sich auf einen bestimmten Stand einigt, den alle Beteiligten koh\u00e4rent f\u00fcr ihre Berechnungen benutzen. Nehmen wir hierzu einmal an, auf dem Tisch vor uns steht eine bereits geschnittene Torte mit X Tortenst\u00fccken drauf. Ferner k\u00f6nnen wir f\u00fcr jedes der St\u00fccken nur entweder feststellen, ob es noch da ist, oder uns ein St\u00fcck Torte wegnehmen. Wenn wir nur einen Berechtigten haben, der die St\u00fccken der Torte verteilt, ist die Aufgabe relativ klar und die Situation unproblematisch: Derjenige guckt nach, ob ein St\u00fcck noch da ist und nimmt sich dieses St\u00fcck im n\u00e4chsten Zeitschritt einfach. Nun ist so eine Torte unter Party-G\u00e4sten sehr begehrt und das DRM beim Zugriff auf die einzelnen St\u00fccken nat\u00fcrlich nicht existent. Auch erfolgt der Zugriff auf die Torte durch jeden Gast \u00fcblicherweise gleichzeitig, denn alle wollen die leckere Torte gern essen.<\/p>\n<p>Nehmen wir an, die Tortenst\u00fccke sind unterscheidbar (die Menge der Tortenst\u00fccke brauch jedoch nicht abz\u00e4hlbar sein ;-)), so kann es passieren, dass zwei G\u00e4ste das selbe Tortenst\u00fcck essen m\u00f6chten. Da der Zugriff der G\u00e4ste aber nicht synchronisiert erfolgt, k\u00f6nnen folgende 3 Situationen auftreten:<\/p>\n<ol>\n<li>Gast 1 pr\u00fcft, dass ein Tortenst\u00fcck t\u2208Torte von Gast 1 auf Verf\u00fcgbarkeit gepr\u00fcft wird und Gast 2 erst im n\u00e4chsten Schritt die Verf\u00fcgbarkeit pr\u00fcft. Je nachdem, ob diese Pr\u00fcfung von Gast 2 nun schneller vorgenommen wird, als Gast 1 brauch, um das St\u00fcck Torte vom Tisch zu nehmen, ist f\u00fcr Gast 2 das St\u00fcck Torte da oder nicht. Das Ergebnis der \u00dcberpr\u00fcfung ist also nicht immer konsistent. Zudem kann es passieren, dass Gast 2 im Folgeschritt versucht, sich ein St\u00fcck Torte zu nehmen, welches gar nicht mehr da ist.<\/li>\n<li>Gast 1 und 2 fangen gleichzeitig an, zu pr\u00fcfen, ob das selbe St\u00fcck Torte noch da ist, sehen beide, dass dieses noch da ist und werden also im n\u00e4chsten Schritt gleichzeitig versuchen, sich das Selbe St\u00fcck Torte zu nehmen.<\/li>\n<li>Gast 1 pr\u00fcft vor Gast 2 die Verf\u00fcgbarkeit des St\u00fcck Torte, kann sich aber nicht im n\u00e4chsten Schritt das St\u00fcck Torte holen, weil ihm noch ein Tortenheber fehlt. W\u00e4hrend er sich diesen aus der K\u00fcche holt, pr\u00fcft Gast 2 f\u00fcr das selbe St\u00fcck Torte die Verf\u00fcgbarkeit und nimmt sich dieses weg, bevor Gast 1 mit seinem Tortenheber zur\u00fcck ist, um vergeblich zu versuchen, das nun nicht mehr vorhandene St\u00fcck Torte zu nehmen.<\/li>\n<\/ol>\n<p>In allen 3 F\u00e4llen d\u00fcrfte der Spa\u00df der G\u00e4ste durch die unzureichende Synchronisation des Zugriffs auf die Torte getr\u00fcbt werden. Was k\u00f6nnen wir also tun? Die erste Variante w\u00e4re, nur genau einen die Tortenst\u00fccke austeilen zu lassen. Da wir dann aber nicht mehr sehr effizient die Torte verteilen k\u00f6nnten (wir nehmen einmal an, von unseren N G\u00e4sten k\u00f6nnen fast alle gleichzeitig mindestens ein Tortenst\u00fcck erreichen), weil sonst jeder sich anstellen m\u00fcsste, verwerfen wir diesen Vorschlag: Wir wollen das Verteilen der Torte ja schlie\u00dflich parallelisieren, so dass jeder schnellstm\u00f6glich sein St\u00fcck bekommt.<\/p>\n<p>Ein Ansatz, der hier auffallen d\u00fcrfte, ist das Problem, dass wir es irgendwie schaffen m\u00fcssen, dass die G\u00e4ste koordinieren k\u00f6nnen, welche St\u00fccken bereits &#8222;vergeben&#8220; sind, da wir dann Zugriffe auf das selbe St\u00fcck Kuchen unterbinden k\u00f6nnen. F\u00fchren wir dazu eine Operation &#8222;Nachschauen und Markieren&#8220; ein, die auf dem St\u00fcck Torte ein F\u00e4hnchen setzt, sobald das St\u00fcck Torte von jemandem als seines markiert wird. Zus\u00e4tzlich sind die F\u00e4hnchen so beschaffen, dass immer nur eines gesetzt werden kann, aber jeder wei\u00df, wenn er sein eigenes F\u00e4hnchen korrekt gesetzt hat: Wenn Zwei ein F\u00e4hnchen versuchen zu setzen, hat einer von beiden (z.B. vom Universum ausgew\u00fcrfelt ;-)) Gl\u00fcck, w\u00e4hrend der andere eindeutig wei\u00df, dass dieses St\u00fcck nicht verf\u00fcgbar ist (unabh\u00e4ngig, davon, ob er wei\u00df, ob ein St\u00fcck Kuchen da ist oder nicht).<\/p>\n<p>F\u00fcr die drei F\u00e4lle oben hei\u00dft das Folgendes:<\/p>\n<ol>\n<li>Gast 1 und 2 bekommen vom Universum ausgew\u00fcrfelt, wer seine Markierung setzen darf. Der Gast, dessen F\u00e4hnchen nun auf dem St\u00fcck Torte prangt, nimmt sich dieses und entfernt damit auch das F\u00e4hnchen wieder. Der unterlegene Gast kann nun sein F\u00e4hnchen ins Nix stellen, pr\u00fcfen, dass da nix mehr ist und sucht sich ein aderes St\u00fcck Torte.<\/li>\n<li>Gast 1 setzt sein F\u00e4hnchen. Gast 2 sieht das F\u00e4hnchen und kann seines daher nicht setzen. Er wartet also auf Gast 1, bis dieser das F\u00e4hnchen wieder entfernt hat (und das St\u00fcck Torte).<\/li>\n<li>Gast 1 setzt sein F\u00e4hnchen ungest\u00f6rt, Gast 2 kann seines nicht setzen und muss warten. Gast 1 nimmt sich irgendwann sein St\u00fcck Torte und gibt den Zugriff f\u00fcr Gast 2 frei, der feststellt, dass da kein St\u00fcck Torte mehr ist. Gast 2 sucht sich ein anderes.<\/li>\n<\/ol>\n<p>Und schon sind alle unsere G\u00e4ste zufrieden, weil sie immer garantiert bekommen, dass sie ein St\u00fcck Torte auch bekommen, wenn sie einmal festgestellt haben, dass eines da ist, bzw. sie sich ein anderes suchen m\u00fcssen, wenn keines mehr am gew\u00fcnschten Ort liegt.<\/p>\n<p>Was bedeutet das nun f\u00fcr unser Problem im Thread Pool? Nun, wie auch unsere G\u00e4ste mit den Tortenst\u00fccken haben unsere Threads im Thread Pool das Problem, dass auf Speicherbereiche zugegriffen werden muss. Zus\u00e4tzlich zu Speicher vorhanden oder nicht, m\u00fcssen unsere Threads jedoch auch mit Ver\u00e4nderungen zurecht kommen, um einen koh\u00e4renten Stand der Abarbeitung zu gew\u00e4hrleisten, d.h. so dass niemand zwischen dem Lesen einer Speicherstelle und deren Ver\u00e4nderung eine weitere Ver\u00e4nderung vornehmen kann.<\/p>\n<p>Bei Threads wird diese Synchronisation der Zugriffe typischer Weise durch mehrere Techniken realisiert:<\/p>\n<ul>\n<li>Critical Sections: Code-Bereiche, die immer nur von einem Thread gleichzeitig durchlaufen werden k\u00f6nnen.<\/li>\n<li>Semaphoren: Z\u00e4hler, die die Anzahl freier Ressourcen z\u00e4hlen. Sind keine Ressourcen verf\u00fcgbar, muss man auf die Freigabe warten.<\/li>\n<li>Mutex (AKA bin\u00e4rer Semaphor): Wie ein Semaphor, nur halt begrenzt auf die Werte 1 und 0 ;-), also Verf\u00fcgbar oder belegt.<\/li>\n<li>Locks: Sperren f\u00fcr Lese- oder Schreibvorg\u00e4nge, bzw. ggf. auch andere Vorg\u00e4nge<\/li>\n<\/ul>\n<p>M\u00f6chte man, dass die Threads untereinander kooperieren, ist das Signalisieren von Status\u00e4nderungen oftmals von Bedeutung. Hierf\u00fcr kann man Events (Ereignisse, auf deren Eintreten passiv gewartet werden kann) bzw. Condition Variablen (Variablen, die \u00fcber einen Mutex synchronisiert werden) verwenden.<\/p>\n<p>Nach dem wir nun die Grundlagen f\u00fcr die Erkl\u00e4rung der verwendeten Funktionalit\u00e4ten gelegt haben, gehen wir einfach einmal der Reihe nach  durch:<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n\/*\r\n * Define the life of a working thread.\r\n *\/\r\nvoid *_threadpool_dowork(threadpool_t *pool) {\r\n    \/\/ Remember my creation sequence number\r\n\/\/    int myid =\r\n    __sync_add_and_fetch(&amp;pool-&gt;live, 1);\r\n<\/pre>\n<p>Kaum in der Routine und wir haben es bereits mit Synchronisation zu tun \ud83d\ude1b Was diese Zeile macht ist an sich etwas ganz triviales: Wir z\u00e4hlen, wieviele Threads grad aktiv sind. Und da wir grade zum Leben erweckt wurden, teilen wir dies der Pool-Verwaltung mit. Nun machen das au\u00dfer uns noch X andere Threads, was zum oben erw\u00e4hnten Problem mit dem gleichzeitigen Zugriff auf den Speicher f\u00fchrt. Damit dies nicht passiert nutzen wir einen sogenannten atomaren Inkrement. Dies ist eine Spezielle Art eine Variable zu inkrementieren, bei der uns vom Compiler garantiert wird, dass der erzeugte Code auf der Zielplattform ohne Unterbrechung, also in einem einzigen Schritt, ausgef\u00fchrt wird. Auf diese Weise kann also immer nur ein Thread gleichzeitig sein Auferstehen mitteilen, w\u00e4hrend seine Konkurrenten warten m\u00fcssen.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    \/\/ When we get a posted job, we copy it into these local vars.\r\n    dispatch_func_t     job_func;\r\n    void *              job_arg;\r\n\r\n    dispatch_func_t     cleanup_func;\r\n    void *              cleanup_arg;\r\n<\/pre>\n<p>In diesen 4 Variablen merken wir uns sp\u00e4ter, was wir derzeit zu haben. Die Job-Funktion ist dabei die regul\u00e4r auszuf\u00fchrende Funktion, w\u00e4hrend die Cleanup-Funktion nur ausgef\u00fchrt wird, wenn wir w\u00e4hrend der Ausf\u00fchrung unseres Arbeitsauftrags unterbrochen werden.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);\r\n    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &amp;pool-&gt;mutex);\r\n<\/pre>\n<p>Und die erste Magie in unserer Routine. Diese ist n\u00f6tig, um dem Betriebssystem mitzuteilen, dass es unseren Thread jederzeit unterbrechen bzw. beenden darf. Zus\u00e4tzlich teilen wir dem Betriebssystem mit, dass wir im Falle einer Unterbrechung bitte ein Unlock des Pool-Mutex ausf\u00fchren m\u00f6chten. Diesen holen wir uns n\u00e4mlich im n\u00e4chsten Schritt und m\u00fcssen daher daf\u00fcr sorgen, dass wir im Falle einer Unterbrechung diesen freigeben, damit wir in keine Deadlocks oder Starving-Situationen laufen k\u00f6nnen.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    \/\/ Grab mutex so we can begin waiting for a job\r\n    if (0 != pthread_mutex_lock(&amp;pool-&gt;mutex)) {\r\n        perror(\"\\nMutex lock failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n<\/pre>\n<p>Dadurch, dass wir uns den Mutex geholt haben, stellen wir sicher, dass sich die Threads im Pool in eine gewisse Ordnung beim Warten auf Arbeit begeben und keine zwei Threads gleichzeitig versuchen aus der Arbeitswarteschlange zu lesen.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    \/\/ Main loop: wait for job posting, do job(s) ... forever\r\n    while(1) {\r\n\r\n        while(!_threadpool_jobAvailable(pool-&gt;queue)) {\r\n            pthread_cond_wait(&amp;pool-&gt;job_posted, &amp;pool-&gt;mutex);\r\n        }\r\n<\/pre>\n<p>Die Aufgabe unseres Workers ist, sich bis an sein Lebensende Aufgaben zu holen. Dazu fragen wir ab, ob in der Warteschlange Arbeit vorhanden ist. Da die Warteschlange diese Information nicht atomar liefern kann, darf immer nur ein Thread gleichzeitig nachschauen. Da wir zu diesem Zeitpunkt jedoch den Pool-Mutex (siehe vorigen Abschnitt) haben, ist dies gegeben. Da wir aber f\u00fcr den Fall, dass wir keine Arbeit finden anderen eine Chance geben m\u00fcssen, uns Arbeit zu hinterlegen, geben wir mit Hilfe einer Condition-Variable den Pool-Mutex frei. Dies friert unseren eigenen Thread und gibt gleichzeitig den Mutex f\u00fcr andere frei. Sobald jemand neue Arbeit signalisiert, wird unser Thread wieder aus seinem Schlaf erweckt und pr\u00fcft erneut die Verf\u00fcgbarkeit von Arbeit. Beim Aufwecken wird uns zudem wieder der Besitz des Mutex zugestanden, so dass wir wieder garantiert der einzige sind, der auf den Pool und seinen Inhalt zugreift.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n        \/\/ We've just woken up and we have the mutex.  Check pool's state\r\n        if (ALL_EXIT == pool-&gt;state) {\r\n            break;\r\n        }\r\n<\/pre>\n<p>An dieser Stelle \u00fcberpr\u00fcfen wir, bevor wir versuchen, wirklich etwas zu tun, ob unsere anstehende Aufgabe ist, uns selber zu beenden. Wenn dies der Fall ist, dann verlassen wir einfach unsere Arbeitsschleife und starten die Aufr\u00e4umarbeiten. Dazu aber gleich mehr. Denn aller Voraussicht wird es was zu tun geben.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n        \/\/ while we find work to do\r\n        _threadpool_getWorkOrder(pool-&gt;queue, &amp;job_func, &amp;job_arg, &amp;cleanup_func, &amp;cleanup_arg);\r\n        pthread_cond_signal(&amp;pool-&gt;job_taken);\r\n<\/pre>\n<p>Und da wir gewissenhafte Arbeiter sind, fragen wir einfach nach, was unser Auftrag ist und teilen unserer Verwaltung mit, dass wir uns einen Arbeitsauftrag genommen haben. Das Bescheidgeben erfolgt hierbei mit dem Gegenst\u00fcck zur weiter oben bereits gesehenen Funktion pthread_cond_wait. Die Gegenstelle werden wir an anderer Stelle noch genauer erl\u00e4utern.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n        \/\/ Yield mutex so other jobs can be posted\r\n        if (0 != pthread_mutex_unlock(&amp;pool-&gt;mutex)) {\r\n            perror(\"\\n\\nMutex unlock failed!:\");\r\n            exit(EXIT_FAILURE);\r\n        }\r\n<\/pre>\n<p>Da wir nun unsere Arbeit haben, k\u00f6nnen wir auch andere auf die Verwaltungsstruktur zugreifen lassen, da wir bis zur Beendigung unserer Arbeit keinen weiteren Zugriff auf die Verwaltungsstruktur ben\u00f6tigen.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n        \/\/ Run the job we've taken\r\n        if(cleanup_func) {\r\n            pthread_cleanup_push(cleanup_func, cleanup_arg);\r\n        }\r\n\r\n        job_func(job_arg);\r\n\r\n        if(cleanup_func) {\r\n            pthread_cleanup_pop(1);\r\n        }\r\n<\/pre>\n<p>Aufmerksamen Lesern meines Blogs d\u00fcrfte <a href=\"http:\/\/blog.benny-baumann.de\/?p=1078\">dieser Code-Schnipsel bekannt vorkommen<\/a>. Ja, an dieser Stelle hatte der C-Compiler nen Bug, da ich aber die Funktionsweise an sich erkl\u00e4ren m\u00f6chte, verzichte ich auf den zugeh\u00f6rigen Bugfix, da der eh rausoptimiert wird \ud83d\ude09<\/p>\n<p>Die beiden If-Abfragen kl\u00e4ren hierbei ab, ob im Falle eines Abbruchs der zu erledigenden Aufgabe etwas Spezielles zu tun ist. Ist dies der Fall, wird \u00e4hnlich wie bereits oben mit dem Thread Pool eine entsprechende Cleanup-Routine gesetzt, bzw. diese wieder entfernt, wenn deren Ausf\u00fchrung doch nicht notwendig gewesen sein sollte. Und zwischendurch wird halt der eigentliche Arbeitsauftrag ausgef\u00fchrt.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n        \/\/ Grab mutex so we can grab posted job, or (if no job is posted)\r\n        \/\/   begin waiting for next posting.\r\n        if (0 != pthread_mutex_lock(&amp;pool-&gt;mutex)) {\r\n            perror(\"\\n\\nMutex lock failed!:\");\r\n            exit(EXIT_FAILURE);\r\n        }\r\n    }\r\n<\/pre>\n<p>Okay, und damit w\u00e4hren wir auch schon fast durch durch einen Arbeitszyklus. Am Ende m\u00fcssen wir uns n\u00e4mlich nur noch die alleinige Kontrolle \u00fcber den Thread Pool wiederholen und wir k\u00f6nnen uns beruhigt auf die Lauer nach neuer Arbeit begeben.<\/p>\n<p>Nun aber zum Teil des Aufr\u00e4umens:<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    \/\/ If we get here, we broke from loop because state is ALL_EXIT.\r\n    __sync_sub_and_fetch(&amp;pool-&gt;live, 1);\r\n<\/pre>\n<p>Wer oben gut aufgepasst hat, kann sich diese Zeile hoffentlich recht einfach herleiten: Wir sagen einfach bescheid, dass wir nicht mehr aktiv sind. Dies ist n\u00f6tig, um in der Pool-Verwaltung zu wissen, wann keiner mehr am Arbeiten ist.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    \/\/ We're not really taking a job ... but this signals the destroyer\r\n    \/\/   that one thread has exited, so it can keep on destroying.\r\n    pthread_cond_signal(&amp;pool-&gt;job_taken);\r\n<\/pre>\n<p>An dieser Stelle erkl\u00e4rt hoffentlich der Code-Kommentar alles. Also weiter im Source!<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    if (0 != pthread_mutex_unlock(&amp;pool-&gt;mutex)) {\r\n        perror(\"\\n\\nMutex unlock failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n<\/pre>\n<p>Und schlie\u00dflich m\u00fcssen wir noch den Zugriff auf den Thread Pool wieder freigeben, damit auch wieder andere auf diesen zugreifen d\u00fcrfen.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    pthread_cleanup_pop(1);\r\n    return NULL;\r\n}\r\n<\/pre>\n<p>Da wir aber vom Anfang immer noch die Freigabe des Mutex als eine der zu erledigenden Aufgaben beim Beenden unseres Threads gesetzt haben, m\u00fcssen wir dies noch als erledigt kennzeichnen, d.h. diese Aufgabe entfernen.<\/p>\n<p>Womit wir auch das Erledigen von Arbeit behandelt h\u00e4tten! Einfach, oder? K\u00f6nnen wir also fortsetzen. \ud83d\ude09 Hierzu gehen wir einfach zur Funktion, die uns mitteilt, ob es Arbeit gibt. Diese sieht wie folgt aus:<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\nint _threadpool_jobAvailable(threadpool_queue_head_t *queue)\r\n{\r\n    return queue-&gt;tail != NULL;\r\n}\r\n<\/pre>\n<p>An sich also nicht&#8217;s wirklich Spektakul\u00e4res. Bei der Queue handelt es sich um eine doppelt verkette Liste, wodurch die Zugriffe auf diese relativ normal realisiert werden k\u00f6nnen (und in konstanter Zeit ablaufen):<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\nthreadpool_queue_head_t *_threadpool_makeQueue(int initial_cap) {\r\n    int max_cap = THREADPOOL_MAX_QUEUE \/ (sizeof(threadpool_queue_node_t));\r\n    int i;\r\n\r\n    threadpool_queue_head_t *queue = (threadpool_queue_head_t *) malloc(sizeof(threadpool_queue_head_t));\r\n\r\n    threadpool_queue_node_t *temp;\r\n\r\n    if(!queue) {\r\n        perror(\"Out of memory on malloc\\n\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    if(initial_cap &gt; max_cap) {\r\n        initial_cap = max_cap;\r\n    }\r\n\r\n    if(initial_cap &lt; 1) {\r\n        perror(\"Attempting to create a queue that holds no work orders\\n\");\r\n        free(queue);\r\n        exit(EXIT_FAILURE);\r\n        return NULL;\r\n    }\r\n\r\n    queue-&gt;capacity_cur = initial_cap;\r\n    queue-&gt;capacity_max = max_cap;\r\n\r\n    queue-&gt;head = NULL;\r\n    queue-&gt;tail = NULL;\r\n\r\n    queue-&gt;freeHead = (threadpool_queue_node_t *) malloc(sizeof(threadpool_queue_node_t));\r\n\r\n    if(!queue-&gt;freeHead) {\r\n        perror(\"Out of memory on malloc\\n\");\r\n        free(queue);\r\n        exit(EXIT_FAILURE);\r\n        return NULL;\r\n    }\r\n\r\n    queue-&gt;freeTail = queue-&gt;freeHead;\r\n\r\n    \/\/populate the free queue\r\n    for(i = 1; i &lt;= initial_cap; i++) {\r\n        temp = (threadpool_queue_node_t *) malloc(sizeof(threadpool_queue_node_t));\r\n        if(!temp) {\r\n            perror(\"Out of memory on malloc\\n\");\r\n            \/\/ TODO Properly free all memory\r\n            free(queue);\r\n            exit(EXIT_FAILURE);\r\n            return NULL;\r\n        }\r\n\r\n        temp-&gt;next = queue-&gt;freeHead;\r\n        temp-&gt;prev = NULL;\r\n        queue-&gt;freeHead-&gt;prev = temp;\r\n        queue-&gt;freeHead = temp;\r\n    }\r\n\r\n    return queue;\r\n}\r\n\r\nvoid _threadpool_addWorkOrder(threadpool_queue_head_t *queue, dispatch_func_t job_func, void *job_arg, dispatch_func_t cleanup_func, void *cleanup_arg) {\r\n    threadpool_queue_node_t *temp;\r\n\r\n    if(!queue-&gt;freeTail) {\r\n        temp = (threadpool_queue_node_t *) malloc(sizeof(threadpool_queue_node_t));\r\n        if(!temp) {\r\n            perror(\"Out of memory on malloc\\n\");\r\n            exit(2);\r\n        }\r\n\r\n        temp-&gt;next = NULL;\r\n        temp-&gt;prev = NULL;\r\n        queue-&gt;freeHead = temp;\r\n        queue-&gt;freeTail = temp;\r\n        queue-&gt;capacity_cur++;\r\n    }\r\n\r\n    temp = queue-&gt;freeTail;\r\n    if(!queue-&gt;freeTail-&gt;prev) {\r\n        queue-&gt;freeTail = NULL;\r\n        queue-&gt;freeHead = NULL;\r\n    } else {\r\n        queue-&gt;freeTail-&gt;prev-&gt;next = NULL;\r\n        queue-&gt;freeTail = queue-&gt;freeTail-&gt;prev;\r\n        queue-&gt;freeTail-&gt;next = NULL;\r\n    }\r\n\r\n    temp-&gt;job_func = job_func;\r\n    temp-&gt;job_arg = job_arg;\r\n    temp-&gt;cleanup_func = cleanup_func;\r\n    temp-&gt;cleanup_arg = cleanup_arg;\r\n\r\n    temp-&gt;prev = NULL;\r\n    if(!queue-&gt;head) {\r\n        queue-&gt;tail = temp;\r\n        queue-&gt;head = temp;\r\n    } else {\r\n        temp-&gt;next = queue-&gt;head;\r\n        queue-&gt;head-&gt;prev = temp;\r\n        queue-&gt;head = temp;\r\n    }\r\n}\r\n\r\nvoid _threadpool_getWorkOrder(threadpool_queue_head_t *queue, dispatch_func_t *job_func, void **job_arg, dispatch_func_t *cleanup_func, void **cleanup_arg) {\r\n    threadpool_queue_node_t *temp;\r\n\r\n    temp = queue-&gt;tail;\r\n    if(!temp) {\r\n        perror(\"Attempting to getWorkOrder from an empty queue.\\n\");\r\n        exit(2);\r\n    }\r\n\r\n    if(!queue-&gt;tail-&gt;prev) {\r\n        queue-&gt;tail = NULL;\r\n        queue-&gt;head = NULL;\r\n    } else {\r\n        queue-&gt;tail-&gt;prev-&gt;next = NULL;\r\n        queue-&gt;tail = queue-&gt;tail-&gt;prev;\r\n        queue-&gt;tail-&gt;next = NULL;\r\n    }\r\n\r\n    *job_func = temp-&gt;job_func;\r\n    *job_arg  = temp-&gt;job_arg;\r\n    *cleanup_func = temp-&gt;cleanup_func;\r\n    *cleanup_arg  = temp-&gt;cleanup_arg;\r\n\r\n    temp-&gt;next = NULL;\r\n    if(!queue-&gt;freeHead) {\r\n        queue-&gt;freeTail = temp;\r\n        queue-&gt;freeHead = temp;\r\n        temp-&gt;prev = NULL;\r\n    } else {\r\n        temp-&gt;next = queue-&gt;freeHead;\r\n        queue-&gt;freeHead-&gt;prev = temp;\r\n        queue-&gt;freeHead = temp;\r\n    }\r\n}\r\n<\/pre>\n<p>Die erste Funktion ist hierbei f\u00fcr die Initialisierung der Warteschlange f\u00fcr die Arbeitsauftr\u00e4ge zust\u00e4ndig und wird in der oben bereits erl\u00e4uterten Funktion threadpool_create aufgerufen, um den Speicher f\u00fcr die Queue zu initialisieren.<\/p>\n<p>Die Funktion darunter f\u00fcgt in unsere Warteschleife einen neuen Arbeitsauftrag ein. Da wir am Anfang nicht wissen, wieviele dies werden, wird die Queue ggf. dynamisch vergr\u00f6\u00dfert, jedoch nie \u00fcber ein verher festgelegtes Maximum, um einen \u00fcberm\u00e4\u00dfigen R\u00fcckstau zu vermeiden und das Ersch\u00f6pfen der Speicherressourcen zu vermeiden. Schlie\u00dflich erledigt die dritte Funktion das Abholen eines Arbeitsauftrags aus der Warteschlange. Auch hier ist der Quelltext ziemlich einfach zu verstehen.<\/p>\n<p>Wer sich nun fragt, warum keine der Funktionen irgendwas synchronisiert: Weil das nicht notwendig ist: Jede der Funktionen wirdd nur an Stellen aufgerufen, wo garantiert ist, dass immer nur exakt ein Thread Kontrolle \u00fcber die zu manipulierende Datenstruktur besitzt. Im Falle der ersten Funktion ist dies threadpool_create, bei der der Pointer auf diese Struktur als einzige Stelle im Programm bekannt ist; bei den anderen beiden Funktionen haben wir den gelockten Pool-Mutex, der den Zugriff auf die Warteschlange regelt. <\/p>\n<p>Somit ist die Verteilung der Tortenst\u00fcckchen im Thread Pool eindeutig gekl\u00e4rt und jeder teilnehmende Thread ist gl\u00fccklich \ud83d\ude09<\/p>\n<p>Vielleicht noch nicht ganz, denn einen Einzeiler haben wir noch nicht behandelt ;-):<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\nint _threadpool_canAcceptWork(threadpool_queue_head_t *queue)\r\n{\r\n    return (queue-&gt;freeTail != NULL) ||\r\n        (queue-&gt;capacity_cur &lt;= queue-&gt;capacity_max);\r\n}\r\n<\/pre>\n<p>Diese Funktion wird von der Funktion zum Hinzuf\u00fcgen von Arbeitsauftr\u00e4gen aufgerufen und pr\u00fcft, ob wir noch Platz haben. Der Ablauf ist dabei relativ einfach: Sind freigegebene Eintr\u00e4ge von ehemaligen Arbeitsauftr\u00e4gen vorhanden oder sind wir noch unter unserer Maximalkapazit\u00e4t steht dem Hinzuf\u00fcgen weiterer Arbeit nichts im Wege.<\/p>\n<p>Nachdem wir nun das Abarbeiten von Arbeitsauftr\u00e4gen gekl\u00e4rt haben, k\u00f6nnen wir uns dem Anzetteln von Arbeit zuwenden. Dies geschieht \u00fcber die folgenden beiden Funktionen:<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n\/*----------------------------------------------------------------------\r\n * Dispatch a thread\r\n *\/\r\nvoid threadpool_dispatch(threadpool_t *pool, dispatch_func_t func, void *arg) {\r\n    threadpool_dispatch_cleanup(pool, func, arg, NULL, NULL);\r\n}\r\n\r\nvoid threadpool_dispatch_cleanup(threadpool_t *pool, dispatch_func_t job_func, void *job_arg,\r\n    dispatch_func_t cleanup_func, void* cleanup_arg) {\r\n\r\n    if(pool == (threadpool_t *) job_arg) {\r\n        return;\r\n    }\r\n\r\n    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &amp;pool-&gt;mutex);\r\n\r\n    \/\/ Grab the mutex\r\n    if (0 != pthread_mutex_lock(&amp;pool-&gt;mutex)) {\r\n        perror(\"Mutex lock failed (!!):\");\r\n        exit(-1);\r\n    }\r\n\r\n    while(!_threadpool_canAcceptWork(pool-&gt;queue)) {\r\n        pthread_cond_signal(&amp;pool-&gt;job_posted);\r\n        pthread_cond_wait(&amp;pool-&gt;job_taken, &amp;pool-&gt;mutex);\r\n    }\r\n\r\n    \/\/ Finally, there's room to post a job. Do so and signal workers.\r\n    _threadpool_addWorkOrder(pool-&gt;queue, job_func, job_arg, cleanup_func, cleanup_arg);\r\n\r\n    pthread_cond_signal(&amp;pool-&gt;job_posted);\r\n\r\n    \/\/ Yield mutex so a worker can pick up the job\r\n    if (0 != pthread_mutex_unlock(&amp;pool-&gt;mutex)) {\r\n        perror(\"\\n\\nMutex unlock failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    pthread_cleanup_pop(1);\r\n}\r\n<\/pre>\n<p>Die erste dieser beiden Funktionen ist sehr schnell erkl\u00e4rt: Sie betrachtet das Hinzuf\u00fcgen eines einfachen Jobs ohne hinterher aufr\u00e4umen einfach als den Fall, bei dem zum Aufr\u00e4umen nichts zu tun ist. Einfach, nicht? \ud83d\ude1b<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\nvoid threadpool_dispatch_cleanup(threadpool_t *pool, dispatch_func_t job_func, void *job_arg,\r\n    dispatch_func_t cleanup_func, void* cleanup_arg) {\r\n\r\n    if(pool == (threadpool_t *) job_arg) {\r\n        return;\r\n    }\r\n<\/pre>\n<p>Um einen Arbeitsauftrag hinzuzuf\u00fcgen pr\u00fcfen wir zuerst, ob dieser \u00fcberhaupt eine auszuf\u00fchrende Routine beinhaltet. Wenn nicht, ignorieren wir den Auftrag einfach.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &amp;pool-&gt;mutex);\r\n\r\n    \/\/ Grab the mutex\r\n    if (0 != pthread_mutex_lock(&amp;pool-&gt;mutex)) {\r\n        perror(\"Mutex lock failed (!!):\");\r\n        exit(-1);\r\n    }\r\n<\/pre>\n<p>Diese Zeilen d\u00fcrften bekannt vorkommen: Wir geben bescheid, dass wir im Fall eines Abbruchs etwas aufr\u00e4umen m\u00fcssen und holen uns den Pool Mutex, um mit dem Thread Pool arbeiten zu k\u00f6nnen.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    while(!_threadpool_canAcceptWork(pool-&gt;queue)) {\r\n        pthread_cond_signal(&amp;pool-&gt;job_posted);\r\n        pthread_cond_wait(&amp;pool-&gt;job_taken, &amp;pool-&gt;mutex);\r\n    }\r\n<\/pre>\n<p>Wobei wir zuerst einmal um das Wohlergehen unserer Arbeiter besorgt sind und \u00fcberpr\u00fcfen, ob diese mit der Arbeitslast \u00fcberhaupt zurecht kommen. Wenn zu viel Arbeit ansteht, warten wir einfach, bis zumindest ein Teil erledigt wurde, so dass in der Warteschlange wieder Platz frei ist.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    \/\/ Finally, there's room to post a job. Do so and signal workers.\r\n    _threadpool_addWorkOrder(pool-&gt;queue, job_func, job_arg, cleanup_func, cleanup_arg);\r\n\r\n    pthread_cond_signal(&amp;pool-&gt;job_posted);\r\n<\/pre>\n<p>Und sobald dann Platz ist, f\u00fcgen wir den Arbeitsauftrag einfach ein und geben bescheid, dass es wieder Arbeit gibt.. Ganz einfach! \ud83d\ude42<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    \/\/ Yield mutex so a worker can pick up the job\r\n    if (0 != pthread_mutex_unlock(&amp;pool-&gt;mutex)) {\r\n        perror(\"\\n\\nMutex unlock failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    pthread_cleanup_pop(1);\r\n}\r\n<\/pre>\n<p>Abschlie\u00dfend brauchen wir nur noch den Mutex auf den Thread Pool freigeben, damit andere wieder damit arbeiten k\u00f6nnen und in unerem Thread das Aufr\u00e4umen als Hinf\u00e4llig zu kennzeichnen. Auch hier also wenig Spannendes.<\/p>\n<p>Und zu guter Letzt, und nicht nur, weil wir ordentliche Menschen sind, m\u00fcssen wir irgendwann auch einmal aufr\u00e4umen. Dies wird mit der letzten unserer Funktionen realisiert:<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n\/*----------------------------------------------------------------------\r\n * Destroy a thread pool.  If there is a job still waiting for a thread\r\n *   to execute it, tough.  We set the ALL_EXIT flag anyways.\r\n *\/\r\nvoid threadpool_destroy(threadpool_t *pool)\r\n{\r\n    int oldtype;\r\n\r\n    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &amp;oldtype);\r\n    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &amp;pool-&gt;mutex);\r\n\r\n    \/\/ Cause all threads to exit. Because they were detached when created,\r\n    \/\/   the underlying memory for each is automatically reclaimed.\r\n\r\n    \/\/ Grab the mutex\r\n    if (0 != pthread_mutex_lock(&amp;pool-&gt;mutex)) {\r\n        perror(\"Mutex lock failed (!!):\");\r\n        exit(-1);\r\n    }\r\n\r\n    pool-&gt;state = ALL_EXIT;\r\n\r\n    while (pool-&gt;live &gt; 0) {\r\n        \/\/ get workers to check in ...\r\n        pthread_cond_signal(&amp;pool-&gt;job_posted);\r\n\r\n        \/\/ ... and wake up when they check out.\r\n        pthread_cond_wait(&amp;pool-&gt;job_taken, &amp;pool-&gt;mutex);\r\n    }\r\n\r\n    \/\/ Null-out entries in pool's thread array; free array.\r\n    memset(pool-&gt;array, 0, pool-&gt;size * sizeof(pthread_t));\r\n    free(pool-&gt;array);\r\n\r\n    \/\/ Destroy the mutex and condition variables in the pool.\r\n    pthread_cleanup_pop(0);\r\n    if (0 != pthread_mutex_unlock(&amp;pool-&gt;mutex)) {\r\n        perror(\"\\n\\nMutex unlock failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    if (0 != pthread_mutex_destroy(&amp;pool-&gt;mutex)) {\r\n        perror(\"\\nMutex destruction failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    if (0 != pthread_cond_destroy(&amp;pool-&gt;job_posted)) {\r\n        perror(\"\\nCondition Variable 'job_posted' destruction failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    if (0 != pthread_cond_destroy(&amp;pool-&gt;job_taken)) {\r\n        perror(\"\\nCondition Variable 'job_taken' destruction failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    \/\/ Zero out all bytes of the pool\r\n    memset(pool, 0, sizeof(threadpool_t));\r\n\r\n    \/\/ Free the pool and null out the pointer to it\r\n    free(pool);\r\n    pool = NULL;\r\n}\r\n<\/pre>\n<p>Auch an dieser Stelle ist eine kurze Erl\u00e4uterung vielleicht recht hilfreich, auch wenn vieles recht einfach zu \u00fcberblicken ist.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n\/*----------------------------------------------------------------------\r\n * Destroy a thread pool.  If there is a job still waiting for a thread\r\n *   to execute it, tough.  We set the ALL_EXIT flag anyways.\r\n *\/\r\nvoid threadpool_destroy(threadpool_t *pool)\r\n{\r\n    int oldtype;\r\n\r\n    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &amp;oldtype);\r\n    pthread_cleanup_push((dispatch_func_t)pthread_mutex_unlock, (void *) &amp;pool-&gt;mutex);\r\n<\/pre>\n<p>Hier wird zum Einen mitgeteilt, dasswir nur zu definierten Zeitpunkten abgebrochen werden m\u00f6chten und teilen zudem mit, dass wir beim Aufr\u00e4umen den Mutex des Thread Pools freigeben m\u00f6chten.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    \/\/ Cause all threads to exit. Because they were detached when created,\r\n    \/\/   the underlying memory for each is automatically reclaimed.\r\n\r\n    \/\/ Grab the mutex\r\n    if (0 != pthread_mutex_lock(&amp;pool-&gt;mutex)) {\r\n        perror(\"Mutex lock failed (!!):\");\r\n        exit(-1);\r\n    }\r\n<\/pre>\n<p>Zu diesem Abschnitt spare ich mir einen dedizierten Kommentar, da wir das schon mehrfach hatten.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    pool-&gt;state = ALL_EXIT;\r\n\r\n    while (pool-&gt;live &gt; 0) {\r\n        \/\/ get workers to check in ...\r\n        pthread_cond_signal(&amp;pool-&gt;job_posted);\r\n\r\n        \/\/ ... and wake up when they check out.\r\n        pthread_cond_wait(&amp;pool-&gt;job_taken, &amp;pool-&gt;mutex);\r\n    }\r\n<\/pre>\n<p>Und die eigentliche Magie: Das Beenden aller Threads im Thread Pool. Wie bereits oben beim Bearbeiten eines Arbeitsauftrages gesehen, hat jeder Thread Pool ein Flag, welches angibt, ob der Thread Pool noch Aktiv ist, oder beendet werden soll. Dieses wird als erstes auf &#8222;Bitte beenden&#8220; gesetzt. Anschlie\u00dfend sagen wir solange, wie noch Threads am Leben sind einfach, bescheid, dass sich ein Thread beenden soll.<\/p>\n<pre lang=\"c\" escaped=\"true\">\r\n    \/\/ Null-out entries in pool's thread array; free array.\r\n    memset(pool-&gt;array, 0, pool-&gt;size * sizeof(pthread_t));\r\n    free(pool-&gt;array);\r\n\r\n    \/\/ Destroy the mutex and condition variables in the pool.\r\n    pthread_cleanup_pop(0);\r\n    if (0 != pthread_mutex_unlock(&amp;pool-&gt;mutex)) {\r\n        perror(\"\\n\\nMutex unlock failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    if (0 != pthread_mutex_destroy(&amp;pool-&gt;mutex)) {\r\n        perror(\"\\nMutex destruction failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    if (0 != pthread_cond_destroy(&amp;pool-&gt;job_posted)) {\r\n        perror(\"\\nCondition Variable 'job_posted' destruction failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    if (0 != pthread_cond_destroy(&amp;pool-&gt;job_taken)) {\r\n        perror(\"\\nCondition Variable 'job_taken' destruction failed!:\");\r\n        exit(EXIT_FAILURE);\r\n    }\r\n\r\n    \/\/ Zero out all bytes of the pool\r\n    memset(pool, 0, sizeof(threadpool_t));\r\n\r\n    \/\/ Free the pool and null out the pointer to it\r\n    free(pool);\r\n    pool = NULL;\r\n}\r\n<\/pre>\n<p>Anschlie\u00dfend geben wir noch die ben\u00f6tigten Ressourcen frei und sorgen dabei daf\u00fcr, dass diese sicher aus dem Speicher gel\u00f6scht werden, um sicher zu gehen, dass keiner auf nicht mehr vorhandene Daten zugreift.<\/p>\n<p>Und damit w\u00e4ren wir auch bereits am Ende: Mit nur diesen wenigen Code-Zeilen k\u00f6nnen wir bequem unsere Arbeit auf mehrere Prozessorkerne verteilen, ohne uns allzu viele Gedanken \u00fcber das Wie zu machen, vorausgesetzt, die Aufgaben k\u00f6nnen unabh\u00e4ngig voneinander ausgef\u00fchrt werden. Wichtig ist zudem auch, dass Aufgaben, die sich Gegenseitig blockieren k\u00f6nnen zur Vermeidung von Deadlocks <strong>nicht<\/strong> im gleichen Thread Pool abgearbeitet werden sollten. Solange nur eine einseitige Abh\u00e4ngigkeit besteht ist dies zwar in der Regel unkritisch, sollte aber dennoch vermieden werden, da der blockierte Thread im Thread Pool belegt wird, ohne dass dieser Arbeit erledigen kann und damit die Leistungsf\u00e4higkeit herabgesetzt wird.<\/p>\n<p>Ansonsten w\u00fcnsch ich mit der hier vorgestellten Implementierung viel Spa\u00df, m\u00f6chte aber vor eventuell auftretenden Fehlern warnen, da der Source zwar an sich Schl\u00fcssig ist, im Detail aber noch kleinere Stolpersteine enthalten kann, die sich wie eigentlich alle Thread-bezogenen Probleme wahrscheinlich erst im Produktiv-Betrieb \u00e4u\u00dfern werden. Daher gilt wie immer D. E. Knuth: Beware of bugs in the above code; I have only proved it correct, not tried it.<\/p>\n<p class=\"wp-flattr-button\"><a href=\"https:\/\/blog.benny-baumann.de\/?flattrss_redirect&amp;id=1109&amp;md5=8c0fe8bd8957434b18beee61bb24d5a1\" title=\"Flattr\" target=\"_blank\"><img src=\"http:\/\/blog.benny-baumann.de\/wp-content\/plugins\/flattr\/img\/flattr-badge-large.png\" srcset=\"http:\/\/blog.benny-baumann.de\/wp-content\/plugins\/flattr\/img\/flattr-badge-large.png\" alt=\"Flattr this!\"\/><\/a><\/p>","protected":false},"excerpt":{"rendered":"<p>Neuere Prozessoren bieten immer mehr Leistung durch immer mehr parallele Kerne bei aber seit l\u00e4ngerem nahezu gleich gebliebener Taktrate. Somit bleibt einem ohne Anpassung seiner Programme diese zus\u00e4tzliche Leistung verwehrt. Nur in dem man sein Programm in mehrere Teile spaltet, die parallel ablaufen k\u00f6nnen, kann man sein Programm auch auf heutigen Prozessoren in optimaler Geschwindigkeit [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"ngg_post_thumbnail":0,"footnotes":""},"categories":[29],"tags":[323,98,322],"class_list":["post-1109","post","type-post","status-publish","format-standard","hentry","category-software","tag-c","tag-developement","tag-threads"],"_links":{"self":[{"href":"https:\/\/blog.benny-baumann.de\/index.php?rest_route=\/wp\/v2\/posts\/1109","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/blog.benny-baumann.de\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/blog.benny-baumann.de\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/blog.benny-baumann.de\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/blog.benny-baumann.de\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1109"}],"version-history":[{"count":14,"href":"https:\/\/blog.benny-baumann.de\/index.php?rest_route=\/wp\/v2\/posts\/1109\/revisions"}],"predecessor-version":[{"id":1130,"href":"https:\/\/blog.benny-baumann.de\/index.php?rest_route=\/wp\/v2\/posts\/1109\/revisions\/1130"}],"wp:attachment":[{"href":"https:\/\/blog.benny-baumann.de\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1109"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/blog.benny-baumann.de\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1109"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/blog.benny-baumann.de\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1109"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}