BenBE's humble thoughts Thoughts the world doesn't need yet …

26.08.2012

Threads und Fibers

Filed under: Software — Schlagwörter: , , , — BenBE @ 01:40:34

Für ein Projekt, welches ich bereits seit etwas längerer Zeit vorbereite, benötige ich eine sehr flexible IO-Schicht, mit der ich eine Reihe verschiedener Tätigkeiten wie IO und anderer Events möglichst flexibel parallelisieren kann. Nun gibt es für solche Aufgaben zwar üblicherweise Threads, aber da die Aufgaben zum einen sehr kurzweilig sind, andererseits aber unter gewissen Umständen blockieren können, funktioniert der Ansatz über Thread Pools nur bedingt. Eine vollständige asynchrone Bearbeitung der Ereignisse scheided auf Grund der Komplexität aber auch aus, da das System leicht erweiterbar bleiben muss. Was also benötigt wurde, ist ein Mittelweg aus beiden Ansätzen.

Ein Ansatz für einen solchen Mittelweg bieten Fibers, die analog zu POSIX Threads dem Programm erlauben, mehrere Ausführungsstränge zu erzeugen und damit die Abläufe in der Anwendung zu parallelisieren. Fibers fungieren dabei vollständig im User Mode und sind dadurch gegenüber PThreads oder gar geforkten Prozessen wesentlich leichtgewichtiger beim Wechseln des Zustands.

Und genau diese Leichtgewichtigkeit wollte ich in meinem Projekt gerne nutzen: Jede Aufgabe soll als ein eigener Ausführungsstrang betrachtet werden, wobei spezifische Funktionen, die blockieren können, leicht in einen separaten Ausführungsstrang verlegt werden können sollten. Sollte hierbei festgestellt werden, dass für die Ausführung einer Aufgabe ggf. blockiert werden muss, so soll tas Blockieren der anfragenden Aufgabe nicht gleichzeitig den ausführenden Thread des Thread-Pools belegen, sondern stattdessen auf eine andere Aufgabe wechseln, die in der Zwischenzeit ausgeführt werden kann.

Ein kleines Beispiel hierzu: Wenn man in einer Netzwerk-Anwendung für die Behandlung von IO Rückfragen ausführen muss, muss man ein Event, welches man gerade frisch hereinbekommen hat, kurzzeitig beiseite legen. Gleichzeitig möchte man aber auch die Menge der benötigten Resourcen (z.B. geöffnete Threads) möglichst gering halten. Dies kann z.B. im Falle einer DNS-Anfrage der Fall sein, die für die Entscheidung, ob ein Client erlaubt wird, oder nicht, nötig ist. Würde man die Bearbeitung an dieser Stelle synchron durchführen, müsste die Verarbeitung weiterer Events ggf. warten, bis die DNS-Anfrage durch ist. Würde man die Abfrage asynchron durchführen, würde dies den Code für die Behandlung des Abfrage-Ergebnisses im Programm stark verstreuen. Also warum nicht so etwas?

bool Client::handleAccept() {
    string hostname = DNS::getHostNameByIP(this->ip);
    return "localhost" == hostname;
}

string DNS::getHostNameByIP(const IP& ip) {
    DNSQuery query(DNS::RecordType::PTR, ip);
    Scheduler::AddTask(DNS::internalResolve, query);
    Scheduler::yield();
    return query->success ? query->toString() : "";
}

Nun bringt es reichlich wenig, wenn man jede Teilaufgabe einfach nur so verzögern würde, wenn man jedoch die Zeit zwischen Stellen der Anfrage und deren Ausführung nutzt, um diese noch anzupassen (für einen Hostnamen wird zuerst ein A Record angefragt, ein anderer Task brauch aber auch den passenden AAAA und MX), so kann dies enorm die Effizienz steigern, ohne dass der Code unübersichtlicher wird.

Außerdem kann man sich auf diese Weise sehr elegant die eigentlich im Hintergrund stattfindende IO vom Hals halten, diese aber dennoch leicht bündeln, ohne sich an jeder Stelle mit der parallelen Ausführung befassen zu müssen. Hat man beispielsweise einen 1-zu-n-Filter, so lässt sich dieser einmalig anstoßen, und durch einfaches Synchronisieren auf alle Events auf dessen Abschluss warten:

bool Client::checkPermissions() {
    for(PermissionCheck c: permissionchecks) {
        Scheduler::WaitFor(Scheduler::AddTask(c->perform, this));
    }
    Scheduler::Yield();
    for(PermissionCheck c: permissionchecks) {
        if(!c->success) return false;
    }
    return true;
}

Durch diese Art der kooperativen Aufgabenverteilung ist es möglich, Aufgaben parallel zu stellen, die ansonsten seriel abgearbeitet werden würden. Durch diese Parallelisierung können lange Wartezeiten, die bspw. durch Netzwerk-Latenzen verursacht werden, abgefedert werden, ohne dass man sich um diese Details kümmern muss.

Um genau diesen Ansatz einmal zu testen, ist nun folgende Testimplementierung des Ansatzes entstanden (dezeit noch ohne Multithreading). Die Erweiterung um Multithreading bei der Abarbeitung der Tasks kann hierbei z.B. mittels eines Thread-Pools oder anderen Mechanismen realisiert werden. Aber hier einmal der (blanke) Proof-of-Concept:

#include <list>

#include <stdio.h>

#include <ucontext.h>

ucontext_t ctx_default;

class Fiber {

    static const int stack_size = 8192;

public:

    typedef void (* fiber_proc_t)(Fiber& fiber);

    Fiber(fiber_proc_t proc): active(true) {
        this->proc = proc;

        ctx = new ucontext_t();
        getcontext(ctx);
        ctx->uc_stack.ss_size = Fiber::stack_size;
        ctx->uc_stack.ss_sp = new char[Fiber::stack_size];
        ctx->uc_link = &ctx_default;
        makecontext(ctx, reinterpret_cast<void (*)()>(Fiber::runInternal), 1, this);
    }

    virtual ~Fiber() {
        delete[] (char *)ctx->uc_stack.ss_sp;
        delete ctx;
        ctx = 0;
    }

    bool active;

    static std::list<Fiber *> fibers;

private:
    ucontext_t *ctx;
    fiber_proc_t proc;

    static void runInternal(Fiber *f) {
        if(f) f->run();
    }

    void run() {
        if(proc) {
            proc(*this);
        }

        active = false;
    }

public:
    void yield() {
        swapcontext( this->ctx, &ctx_default );
    }

    friend void mainfiber(void);
};

std::list<Fiber *> Fiber::fibers;

void test1(Fiber& fiber);
void test2(Fiber& fiber);

void test1(Fiber& fiber) {
    printf("t1: start: %p\n", &fiber);
    Fiber *sub = new Fiber(test2);
    Fiber::fibers.push_back(sub);
    fiber.yield();
    printf("t1: mid:   %p\n", &fiber);
    fiber.yield();
    printf("t1: stop:  %p\n", &fiber);
}

void test2(Fiber& fiber) {
    printf("t2: start: %p\n", &fiber);
    fiber.yield();
    printf("t2: mid:   %p\n", &fiber);
    fiber.yield();
    printf("t2: stop:  %p\n", &fiber);
}

void mainfiber(void) {
    while(!Fiber::fibers.empty()) {
        Fiber *f = Fiber::fibers.front();
        Fiber::fibers.pop_front();
        if(!f->active) {
            delete f;
            continue;
        }
        Fiber::fibers.push_back(f);

        swapcontext(&ctx_default, f->ctx);
    }
}

int main (void) {
    Fiber *f;

    f = new Fiber(test1);
    Fiber::fibers.push_back(f);

    f = new Fiber(test1);
    Fiber::fibers.push_back(f);

    f = new Fiber(test1);
    Fiber::fibers.push_back(f);

    mainfiber();

    return 0;
}

Wer sich nun wundert, wo hier die Behandlung von Abhängigkeiten, Warten auf Events und ähnliches ist, der sei beruhigt: Die Sachen fehlen im PoC, weil sie für den einfachsten Fall unabhängiger Aufgaben nicht benötigt werden. Das Scheduling wird übrigens von mainfiber realisiert, test1 und test2 sind zwei Aufgaben, die u.U. lange IO-Events haben könnten. Dieser Code-Stand ist zwar bei weitem nicht optimal und bei weitem noch nicht vollständig, da aber der grundlegende Ansatz bereits funktioniert, lässt sich damit gut weiterarbeiten.

Bliebe jetzt nur noch die Realisierung des Schedulers und der Event-Verarbeitung für IO und Timing übrig. Aber dafür hab ich auch schon Ansätze 😉 Auch wenn jetzt nicht alles klar geworden sein sollte, kann man sich zumindest folgendes im Hinterkopf behalten: Mit Threads kann man parallelisieren, mit Fibers kann man Warten inerhalb von Threads mit sinnvollen Aufgaben überbrücken.

P.S.: Beim vorliegenden Programm beschwert sich Valgrind über den wandernden Stack. Nunja, das Programm ist trotzdem korrekt und ohne Speicherlöcher ;-P

Flattr this!

Keine Kommentare »

No comments yet.

RSS feed for comments on this post. TrackBack URL

Leave a comment

Powered by WordPress