2016. december 3., szombat

Multithreading

      A thread (folyamatszál, eseményszál), a folyamat részét képezi. A folyamat az alkalmazás futó példánya, mely több szálra szakadhat. A folyamat indulásakor először mindig a fő eseményszál indul el, majd abból kisebb mellékszálak ágazhatnak ki és haladhatnak a főszállal párhuzamosan. Eleinte csak folyamat létezett, azaz egyszerre csak egy eseménnyel tudott a processzor foglalkozni. Minden feladat (task), amit a processzor el kellett végezzen külön folyamatot jelentett. Hogy a feladatokat ne szerre, hanem egyszerre végezze a processzor, bevezették a multitasking módszert, amely abból állt, hogy a futó folyamatok ideiglenesen megállnak felszabadítván az erőforrásokat más folyamatok számára, majd idővel visszatérnek és befejezik a feladatot. Mindez nagyon gyorsran és sűrűn van ütemezve és úgy tűnik, hogy párhuzamosan dolgoznak a folyamatok, pedig váltogatják egymást. Gyakorlatilag csak akkor futhat egyszerre több folyamat, ha a számítógép több processzort tartalmaz. Hogy egy folyamat mennyi ideig használhatja a processzort és a memóriát, az az ütemező algoritmustól függ, amit többnyire a platform alapján választanak ki. A multitasking jelentősen javította a számítógépek hatékonyságát, azonban a bonyolult programoknál szükségessé vált elválasztani az adatgyűjtő, adatfeldolgozó és eredményszolgáltató folyamatokat. Ettől fogva hatékonyabb egytüttműködésre volt szükség a folyamatok között az adatok kommunikálása szempontjából, amit csak úgy lehetett megvalósítani ha a teljes memóriaterületen osztozhattak. Az ilyen típusú folyamatokat nevezik szálaknak vagy thread-eknek. A szálak tehát olyan folyamatok melyek egyazon memória kontextusban dolgoznak megosztván erőforrásaikat a fő folyamattal. A folyamatok nem osztanak meg semmit, csupán felszabadítanak és újból lefoglalnak. A szálakat is ütemezők kapcsolgatják, viszont az átkapcsoláshoz nem tartozik hozzá a memória kontextus átkapcsolása is. Ez azt jelenti, hogy a szálaknak közös változóik, adatstruktúráik lehetnek, melyekkel kommunikálhatnak egymással. Az ütemezés preemptív, azaz a szál vagy folyamat futási jogát az ütemező (operációs rendszer) bármikor felülbírálhatja. Ha egy felhasználói szinten futó szál váratlanul blokkolódik, akkor az ütemező az egész folyamatot blokkolja. Kivételes eset, amikor az adott szálnak van belső ütemezője (akár egy kernel szintű szálnak), ilyenkor átadja a vezérlést a folyamaton belül egy másik szálnak. A preemptív ellentéte a kooperatív ütemezés, amelyben a folyamatok futása nem áll meg, míg véget nem ér vagy önként le nem mond. Ennek hátránya, hogy egy folyamat teljesen lefoglalhatja a processzort ami bezavarhat az operációs rendszer működésébe is. Az ilyen folyamatokat fiber-nek nevezik és a szálaktól csupán az ütemezés típusában különböznek.

      A szálak standardként a C++11-ben jelentek meg először, előtte a Boost, Poco vagy egyéb könyvtárat illetve a WinAPI vagy a POSIX függvényeket lehetett használni szálak létrehozására. Éppen ezért a korábbi C++ verziókkal fordított forráskódok (amelyeket pl. a Visual Studio 6, 2005 vagy 2008-ban írtak) többnyire platform függőek. A C++11 programozási nyelv olyan platformfüggetlen interfészt nyújt, amely garantálja a RAII elvet (például megállítja az ott felejtett szálakat) és a függvényobjektumokat használ függvénymutatók helyett.

      Minden olyan alkalmazás amely egynél több szálat használ, többszálas (Multithread) alkalmazásnak tekinthető. Például amikor egy szervernek több kapcsolatot kell fenntartani, akkor egyszerűbb ha minden kapcsolathoz külön szálat rendelünk így mindenik szál új socket-et hoz létre a hozzá tartozó kapcsolatnak. Egy másik lehetséges felasználás a GUI alkalmazásokban lehet. Sok esetben egy folyamat feladata, hogy várja a felhasználótól az adatbevitelt. Ilyenkor nem szabad a többi folyamatnak megállnia és várnia a felhasználóra. Ha nem túl bonyolultak a folyamatok, akkor érdemes egy folyamattá süríteni és azt szálakra bontani. Ezek a szálak kommunikálnak egymással és folyamatosan firssen tartják a felhasználói felület státuszát. Mivel a kommunikációt a közös memóriaterület elérése révén végzik szükség van némi erőforrás-védő mechanizmusra (mutexek, szemaforok, várakozási feltételek stb.), hogy véletlenül se próbálja meg két szál ugyanazt a területet egyszerre használni. A folyamatok (melyek nem osztoznak semmiben) is tudnak egymás között kommunikálni, ám az sokkal bonyolultabb (pipe-ok, fájlok, socket-ek használatát igénylik). Ennek is megvan a maga előnye a szálakhoz képest, mert működhetnek fizikailag különálló gépeken. Ilyen osztott program az NFS szerver, az FTP kliens-szerver, a Telnet, a chat-programok és a böngészők.

      A folyamatok nem tudnak más folyamatokat létrehozni, mégcsak meg sem tudják szakítani saját magukat. Az a folyamat, amely meghívja a fork() függvényt csupán egy másolatot készít önmagáról, melynek saját változói és PID azonosítója (Process ID) lesz. Ezt az ütemező is külön kezeli és szinte függetlenül kezeli a szülő folyamattól. Ezzel szemben, amikor a folyamat egy thread-et készít, akkor bár annak is meglesz a saját stack-je (helyi változói), de a globális változók, a fájlleírók, a jelkezelők és a könyvtárállapot közös marad a szülőfolyamattal.

WinAPI szálak (windows.h)

      Minden folyamat rendelkezik egy fő szállal (main thread). A következő programban a főszál feladata lesz, hogy várja a billentyűzetről érkező adatot és írja ki a counter változót, ha a bemenet különbözik a „q” karaktertől. Ebből leágazik egy másik szál, amely folyamatosan növeli a counter változót a beolvasástól függetlenül. A szál létrehozásáért a CreateThread függvény felelős, melynek a következő paraméterei vannak:
  1. lpThreadAttributes (típusa LPSECURITY_ATTRIBUTES): opcionális paraméter, amely egy mutató amivel megadható, hogy a szál által visszatérített handle örökölhető legyen-e a gyermekfolyamatok számára (amiket a CreateProcess függvénnyel hozhatunk létre). A NULL mutató nem örökölhető handle-t jelent.
  2. dwStackSize (típusa SIZE_T): a szál stack-jének kezdeti méretét lehet itt megadni. Ha nem pontos, akkor a rendszer kerekíti, de ha zéró, akkor a szál a folyamat méretét kapja.
  3. lpStartAddress (típusa LPTHREAD_START_ROUTINE): itt lehet megadni azt a mutatót, amely a szál által futtatni kívánt függény címére mutat (függvénymutató). Ez a szál kezdő címe, ahol egy speciális felépítésű függvény található: DWORD WINAPI myThread(LPVOID lpParameter);
  4. lpParameter (típusa LPVOID): ez egy olyan opcionális bemenő paraméter, amit a myThread függvény módosíthat.
  5. dwCreationFlags (típusa DWORD): egy flag, ami jelzi, hogy a szál elkészült.
  6. lpThreadId (típusa LPDWORD): kimenő paraméter, amelybe a szál azonosítója kerül.

Ezek az adattípusok WinAPI típusok és tulajdonképpen típusdefiníciók vagy álnevek különböző struktúrákra vagy standard adattípusokra. Például az LPVOID a WinDef.h-ban van definiálva mint typedef void *LPVOID.

#include <windows.h>
#include <iostream>
using std::cout;
using std::endl;

DWORD WINAPI myThread(LPVOID lpParameter)
{
     int& counter = *((int*)lpParameter);    // cast az eredeti int típusra
     while (counter < (size_t)(INT_MAX))     // 2.147.483.647-ig számol
           ++counter;
     return 0;
}

int main()
{
     int myCounter = 0; // a szál ezt fogja növelni amikor elindul
     char myChar = ' '; // kezdeti karakter
     DWORD ID;          // thread ID
                        //          1     2    3           4      5    6
     HANDLE myHandle = CreateThread(NULL, 0, myThread, &myCounter, 0, &ID);

     while (myChar != 'q') {
           cout << myCounter << endl;
           myChar = getchar();
     }

     CloseHandle(myHandle);
     return 0;
}

A kimenet:
0

901616379

1409959160

2057832213

2147483647

2147483647
q

A szál csak addig él, míg a myThread függvény vissza nem tér, azaz amíg a counter el nem ér az int maximális értékéig. A program minden billentyűnyomáskor kiírja a myCounter aktuális értékét, és ha a lenyomott billentyű a „q” karakter, akkor az ismétlő ciklus véget ér és elindul a CloseHandle függvény. Ennek paramétere a CreateThread handle-je, ami egyedi akár a létrehozott szál ID-ja, csakhogy ha nem sikerül létrehozni a szálat, akkor a handle az, ami biztosan nulla lesz.
      A CRT (C Run-Time) függvények nagy része remekül működik a CreateThread függvénnyel létrehozott szálakban, azonban vannak olyan függvények, melyek memóriavesztést produkálnak a szál bezárásakor. Ilyen például sz strlen() vagy a signal() függvény, ami nem készteti a szálat a CRT inicializálására, így a CRT függvény által használt memória nem szabadul majd fel amikor a szál bezár (70-80 Byte memóriáról van szó minden záráskor). Hogy ez ne történjen meg, érdemes a _beginthread vagy a _beginthreadex függvényt használni. Ezek első sorban a pramétereik számában különböznek:
- _beginthread(start_address, stack_size, arglist)
- _beginthreadex(security, stack_size, start_address, arglist, initflag, thrdaddr)
A paraméterek a következők:
  • start_address (típusa void*): annak a függvénynek a címe, ahonnan a szál indul.
  • stack_size (típusa unsigned): a szál stack-jének kezdeti mérete.
  • arglist (típusa void*): a szálnak átadott argumentumok listája.
  • security (típusa void*): a szál által visszatérített handle örökölhető legyen-e a gyermekfolyamatok számára. (NULL = nem örökölhető)
  • initflag (típusa unsigned): a szál kezdeti státusza (azonnal induljon, legyen felfüggesztve stb.). Felfüggesztett státuszban létrehozott szál a ResumeThread függvénnyel indítható be.
  • thraddr (típusa unsigned*): a szál azonosítója kerül ebbe a változóba.

A _beginthread és a _beginthreadex még abban is különbözik, hogy míg a _beginthread() bezárja a handle-jét miután véget ér a szál, addig a _beginthreadex()-nek szüksége van a CloseHandle()-re a handle bezárásához, akár a CreateThread esetén.

A két függvény a szálat képviselő függvények hívásában is különbözik: a  _beginthread() a _cdecl (natív, operációs rendszertől függő, alapértelmezett) vagy a _clrcall (menedzselt, operációs rendszertől független, virtuális függvény) híváskonvencióval hívja meg a start_address címen lévő függvényt, a _beginthreadex() pedig az _stdcall (natív, operációs rendszertől függő, Win32 API függvény) vagy a _clrcall konvenciókat használja.

A következő program háromféleképp alkot szálakat: CreateThread, _beginthreadex és _beginthread.

#include <Windows.h>
#include <process.h>
#include <iostream>
using std::cout;
using std::endl;

DWORD WINAPI mythreadA(LPVOID lpParameter)
{
      cout << "CreateThread ID: " <<  GetCurrentThreadId() << endl;
      return 0;
}

unsigned int __stdcall mythreadB(void* data)
{
      cout << "_beginthreadex ID: " << GetCurrentThreadId() << endl;
      return 0;
}

void mythreadC(void* data) //alapértelmezetten _cdecl híváskonvenció
{
      cout << "_beginthread ID: " << GetCurrentThreadId() << endl;
}

int main()
{
      HANDLE myhandleA, myhandleB, myhandleC;

      myhandleA = CreateThread(0, 0, mythreadA, 0, 0, 0);
      WaitForSingleObject(myhandleA, INFINITE);
      CloseHandle(myhandleA);

      myhandleB = (HANDLE)_beginthreadex(0, 0, &mythreadB, 0, 0, 0);
      WaitForSingleObject(myhandleB, INFINITE);
      CloseHandle(myhandleB);

      myhandleC = (HANDLE)_beginthread(&mythreadC, 0, 0);
      WaitForSingleObject(myhandleC, INFINITE);

      return 0;
}

A CreateTthread() által meghívott függvény a standarad DWORD WINAPI típust kell, hogy használja, a _beginthreadex() függvényének típusa unsigned int __stdcall és a _beginthread() függvénye void típusú. Mindhárom függvény kiírja a saját azonosítóját. A WaitForSingleObject függvény egy objektumra vár, hogy az jelezze készenlétét. Ebben az esetben egy HANDLE típusú objektumra vár, ami akkor lesz kész, amikor az általa által képviselt szál a végéhez ér (ezt jelzi az INFINITE paraméter). Ez egyfajta szünetet jelent a processzornak, hogy ne lépjen tovább a következő utasításra, amíg a szál be nem fejeződik. Mindhárom szál létrehozása után szerepel, így azok egymást megvárva sorban fognak elindulni, mint mikor nincs szükség párhuzamos futásra.

A kimenet:
CreateThread ID: 8100
_beginthreadex ID: 6176
_beginthread ID: 7204

A _beginthread() egyszerűbb, mert nincs szüksége annyi paraméterre és nem kell a handle-jétt a CloseHandle() függvénnyel bezárni, ellenben mégis a _beginthreadex() függvényt előnyösebb használni. Amikor a _beginthread() szál véget ér, a visszatérített handle érvénytelen lehet vagy pedig újra felhasználható. Ez akkor jelent problémát, mikor egy _beginthread() szál véget ér, és egy másik indul el, amely ugyanazt a handle értéket kapja mint a véget ér szál. Ha ezután ellenőrizni szeretnénk az első szál handle-jét, akkor valójában a másik szál handle-jét ellenőrizzük. Ilyenkor a WaitForSingleObject() sem biztos, hogy a megfelelő szál befejezésére vár. Mivel a _beginthreadex() esetében a handle-t csakis kézzel lehet lezárni, ilyen hiba nem fordulhat elő.

A WaitForSingleObject() társa a WaitForMultipleObject(), mely egyszerre több szálra várakozik.

A szálakat többféleképp be lehet zárni, de a legjobb, ha abban a függvényben ér véget, amit kezdetben elindított. A fenti példákban az ID kiírása után rögtön véget is értek a szálak. Létezik az ExitThread() vagy a TerminateThread() függvény is, ám ezek használata félbeszakíthat egy olyan műveletet, amely a programot meghatározatlan állapotba hozza, például nem szabadulnak fel memóriaterületek vagy a közös változókba érvénytelen értékek kerülnek. A _beginthread() és _beingthreadex() társai az _endthread() és az _endthreadex() függvények. Ezek úgy vannak megírva, hogy felszabadítják a lefoglalt memóriát. Az _endthreadex() után a handle-t továbbra is kézzel kell bezárni.

#include <Windows.h>
#include <process.h>
#include <stdio.h>

unsigned int __stdcall mythread(void* data)
{
      printf("Thread %d\n",GetCurrentThreadId());
      return 0;
}

int main()
{
      HANDLE myhandle[2];

      myhandle[0] = (HANDLE)_beginthreadex(0, 0, &mythread, 0, 0, 0);
      myhandle[1] = (HANDLE)_beginthreadex(0, 0, &mythread, 0, 0, 0);

      WaitForMultipleObjects(2, myhandle, true, INFINITE);

      CloseHandle(myhandle[0]);
      CloseHandle(myhandle[1]);

      return 0;
}

A kimenet:

Thread 8044
Thread 6276

A cout() helyett a printf() kiíró függvény volt használva, hogy egyetlen utasításként legyen a „Thread” szöveg és az ID kiírása értelmezve. A WaitForMultipleObjects() első paramétere a szálak számát határozza meg, a második egy mutató a handle objektumokat tartalmazó tömbre. A harmadik paramétert ha igazra állítjuk, akkor addig vár, amíg minden szál be nem fejeződik, viszont a hamis azt jelentené, hogy elég az egyik szál befejezését megvárni. Az utolsó paraméter a várakozási idő a szál(ak) befejezéséig.

Ha a szálat felfüggesztett állapotúra inicializáljuk a létrehozásánál, akkor nem fog rögtön elindulni. A már futó szálat is fel lehet függeszteni a SuspendThread() függvénnyel, ám ez veszélyesebb művelet, mert az általa használt erőforrások (pl. mutexek) is felfüggesztődnek. A felfüggesztett állapotból a ResumeThread() hozza vissza a szálat futó állapotba. A következő program egy billentyűnyomásra vár, hogy elindíthassa a szálat.

#include <Windows.h>
#include <process.h>
#include <iostream>
using std::cout;
using std::endl;

unsigned int __stdcall mythread(void* data)
{
      cout << "Thread ID: " <<  GetCurrentThreadId() << endl;
      return 0;
}

int main()
{
      HANDLE myhandle;

      myhandle = (HANDLE)_beginthreadex(0, 0, &mythread, 0, CREATE_SUSPENDED, 0);
     
      getchar();
      ResumeThread(myhandle);
     
      WaitForSingleObject(myhandle, INFINITE);
      CloseHandle(myhandle);

      return 0;
}

A ResumeThread() függvények száma mindig azonos kell legyen a SuspendThread() függvények számával. Minden szálnak van egy számlálója, ami a felfüggesztés mértékét adja. Mindenik SuspendThread() növel egyet ezen, mindenik ResumeThread() csökkent a számlálón. Az adott szálat tehát többszörösen is fel lehet függeszteni és csak akkor indul el újra, ha a számláló nulláig csökken.

Objektumok és handle-ek

Az objektum a struktúrák továbbfejlesztett változata. Olyan erőforrásokat tartalmaz (fájlok, szálak), melyekhez a külső alkalmazások nem férnek hozzá az objektum révén. A handle egy típus-független referencia az objektumra. Ez azt jelenti, hogy a handle típusának nincs köze az objektum típusához, amire hivatkozik. Úgy is mondják, hogy absztrakt hivatkozás, mert elrejti az objektum valós memóriacímét az API felhasználótól. Olyan akár az index egy memóriacímekkel teli táblázatban: az API rendszerhívás az adott erőforrásnak az indexét (handle-jét) használja, de nem tudja, hogy milyen memóriacímen van pontosan. A Windows API függvények is többnyire handle-eket térítenek vissza és ezzel egy erőforrást alkotnak a kernel-space-ben. Az API felhasználók egyszerűen indexelik (a handle-ek révén) ezeket az erőforrásokat, majd a CloseHandle() függvénnyel jelzik a kernelnek, hogy szabadítsa fel az adott indexen lévő fizikai memóriaterületet. A handle-el rendelkező erőforrásokon a folyamatok osztozhatnak. Az erőforrásnak a handle-je viszont csak abban a folyamatban érvényes, amelyik éppen használja. Ugyanazt handle-t egy másik folyamat nem tudja használni egy időben. A kernelnek előbb hozzáférést kell adnia az erőforráshoz és egy új handle-t kell létrehoznia a folyamat számára. Példa erre, amikor egy fájlt egyszerre több programmal is megnyitunk.

Szinkronizálás a szálak között

#include <Windows.h>
#include <process.h>
#include <stdio.h>
#include <string>

volatile int counter = 0;

unsigned int __stdcall mythread(void*)
{
      std::string s;
      while (counter < 10)
      {
            counter++;
            if(counter % 2 == 0) s = "Igen"; else s = "Nem";
            printf("A %d szalban a(z) %d paros-e: %s\n",GetCurrentThreadId(), counter, s.c_str());
      }
      return 0;
}

int main()
{
      HANDLE myhandleA, myhandleB;

      myhandleA = (HANDLE)_beginthreadex(0, 0, &mythread, 0, 0, 0);
      myhandleB = (HANDLE)_beginthreadex(0, 0, &mythread, 0, 0, 0);

      WaitForSingleObject(myhandleA, INFINITE);
      WaitForSingleObject(myhandleB, INFINITE);

      CloseHandle(myhandleA);
      CloseHandle(myhandleB);

      return 0;
}

A kimenet:
A 4784 szalban a(z) 1 paros-e: Igen
A 392 szalban a(z) 2 paros-e: Nem
A 392 szalban a(z) 4 paros-e: Nem
A 392 szalban a(z) 5 paros-e: Igen
A 392 szalban a(z) 6 paros-e: Nem
A 392 szalban a(z) 7 paros-e: Igen
A 392 szalban a(z) 8 paros-e: Nem
A 392 szalban a(z) 9 paros-e: Igen
A 392 szalban a(z) 10 paros-e: Nem
A 4784 szalban a(z) 3 paros-e: Igen

Mindkét szál ugyanazt a függvényt futtatja, amely sorban veszi a számokat és kiírja, hogy párosak-e vagy sem. A közös függvény egy közös változóval, a counter-el dolgozik. Az ütemező nem engedheti, hogy mindkét szál ugyanazt a fizikai erőforrást használja egyszerre, ezért a szálak között versengés folyik. A program kimenetén látható, hogy éppen melyik szál volt az élelmesebb. Az ilyen viselkedés megengedhetetlen és súlyos következményekkel járhat, ezért valami védekezési mechanizmusra lesz szükség, ami megvédi a counter változót, hogy más szál ne módosítsa. Ha a védelem működik, akkor a szálak szinkronban vannak. A Win32 négy lehetőséget szolgáltat a szálak szinkronizálására: kölcsönös kizárás (mutex), kritikusszakasz, szemafor és esemény (event).
  •  Mutex (Mutual Exclusion): a legáltalánosabb adatvédő mechanizmus. Az a szál, amelyik előbb fér hozzá az erőforráshoz, blokkolja az adathoz tartozó mutexet, majd mikor végzett felszabadítja azt. Emiatt a többi szál nem tudja blokkolni ezt a mutexet amíg az eredeti szál fel nem szabadítja.
  • Kritikus szakasz: ugyanúgy működik, mint a mutex, viszont csak egyetlen folyamatban érvényes (ezért gyorsabb is a mutexnél). A mutex ellenben egy algoritmus, mely lehetővé teszi, hogy a folyamatok osztozzanak rajta, azaz kernel szintű erőforrásokkal dolgozik.
  • Szemafor: szintén hasonló a mutex működéséhez, viszont számláló tulajdonsága is van, azaz lehet növelni és csökkenteni jelezvén, hogy mennyi szál férhet hozzá az erőforráshoz ugyanazon időben. Ez már engedékenyebb mechanizmus, de jól jön ha például egy nagy adatbázisból kell adatokat kiolvasni. Az adatbázis egy erőforrást képvisel, amit gyorsabban át lehet járni, ha egyszerre több szál dolgozik rajta.
  • Eseményjelző (event): egy jelzés, amivel a szálak jelezhetnek egymásnak, például jelezhetik, hogy az erőforrás szabad.

Az előző példa alapján a kritikus szakasz a következőképp iktatható be:

#include <Windows.h>
#include <process.h>
#include <stdio.h>
#include <string>

volatile int counter = 0;
CRITICAL_SECTION critical;                // kritikus szakasz típusú változó deklarálása

unsigned int __stdcall mythread(void*)
{
      std::string s;
      while (counter < 10)
      {
            EnterCriticalSection(&critical); // kritikus szakasz kezdete
            counter++;
            LeaveCriticalSection(&critical); // kritikus szakasz vége
            if(counter % 2 == 0) s = "Igen"; else s = "Nem";
            printf("A %d szalban a %d paros-e: %s\n",GetCurrentThreadId(), counter, s.c_str());
      }
      return 0;
}

int main()
{
      HANDLE myhandleA, myhandleB;
      InitializeCriticalSection(&critical); // kritikus szakasz inicializálása

      myhandleA = (HANDLE)_beginthreadex(0, 0, &mythread, 0, 0, 0);
      myhandleB = (HANDLE)_beginthreadex(0, 0, &mythread, 0, 0, 0);

      WaitForSingleObject(myhandleA, INFINITE);
      WaitForSingleObject(myhandleB, INFINITE);

      CloseHandle(myhandleA);
      CloseHandle(myhandleB);
     
      DeleteCriticalSection(&critical);      // kritikus szakasz törlése

      return 0;
}

A program kimenete hasonló az előző kimenethez, azaz a szálak továbbra már nem versengenek a counter változóért, ugyanis a versengő szál várakozó (alvó) állapotb kerül, míg az erőforrás fel nem szabadul. Az altatás és ébresztés időigényes, ezért az EnterCricitalSection() helyett a TryEnterCriticalSection() függvény is választható. Ez igazat térít vissza, ha az erőforrás szabad, hamisat, ha nem. Kicserélve a EnterCriticalSection(&critical) parancsot a while(!TryEnterCriticalSection(&critical)){} paranccsal ugyan kiiktatja az alvó állapotba kerülést, viszont a while folyamatos ellenőrizgetése dolgoztatja a processzort megfosztván a többi szálat a processzor idejétől. Legrosszabb esetben teljesen leállhat az a szál, amelyik a kritikus szakaszban van. Egy megoldás erre a problémára, ha az ellenőrzés kevesebbszer ismétlődik annak reményében, hogy a kritikus szakaszban lévő szál hamar végez. Ha a beállított ismétlések száma lejárt és még mindig nem szabadult fel az erőforrás, akkor a szál alvó állapotba tér, míg az erőforrás fel nem szabadul. Ez a megoldás egyfajta kompromisszum a végtelen ismétlés és az alvás között. Kétféleképp is meg lehet adni, hogy a szál mennyiszer ellenőrizze és próbáljon hozzáférni az erőforráshoz:
  • InitializeCriticalSectionAndSpinCount(&critical, 1000) - rögtön az inicializálásnál meg lehet adni az ismétlések számát is.
  • SetCriticalSectionSpinCount(&critical, 1000) - az inicializálás után is be lehet állítani az ismétlések számát.

Mutexek

A Mutexek kernel objektumok, melyeken a folyamatok osztozhatnak. A CreateMutex() vagy a CreateMutexEx() függvényekkel hozhatóak létre, amik egy handle-t térítenek vissza a létrehozott a mutex objektumról. A CreateMutex() függvénynek három paramétere van: az első egy mutató egy biztonsági attribútumhoz (nulla ha nincs erre szükség), a második egy boolean, amit ha igazra állítunk, akkor az őt használó szál azonnal átveszi az erőforrás feletti hatalmat; a harmadik paraméter a mutex neve (nulla ha nincs rá szükség). Mivel handle-je van a mutexnek, azt a CloseHandle() függvénnyel lehet bezárni, amely egyúttal felszabadítja a mutexet.

#include <Windows.h>
#include <process.h>
#include <stdio.h>
#include <string>

volatile int counter = 0;
HANDLE mutex;                       // mutex objektum deklarálása

unsigned int __stdcall mythread(void*)
{
      std::string s;
      while (counter < 10)
      {
            WaitForSingleObject(mutex, INFINITE); // vár míg a mutex szabad lesz
            counter++;
            ReleaseMutex(mutex);                  // mutex felszabadítása
            if(counter % 2 == 0) s = "Igen"; else s = "Nem";
            printf("A %d szalban a %d paros-e: %s\n",GetCurrentThreadId(), counter, s.c_str());
      }
      return 0;
}

int main()
{
      HANDLE myhandleA, myhandleB;
     
      mutex = CreateMutex(0, 0, 0); // mutex inicializálása

      myhandleA = (HANDLE)_beginthreadex(0, 0, &mythread, 0, 0, 0);
      myhandleB = (HANDLE)_beginthreadex(0, 0, &mythread, 0, 0, 0);

      WaitForSingleObject(myhandleA, INFINITE);
      WaitForSingleObject(myhandleB, INFINITE);

      CloseHandle(myhandleA);
      CloseHandle(myhandleB);
     
      CloseHandle(mutex);           // mutex törlése

      return 0;
}

A WaitForSingleObject() visszatér amint a mutex szabad lesz, vagy ha lejár egy beállított idő. Ez egy plusz tulajdonság a kritikus szakaszhoz képest, valamint az is új, hogy a mutex létrehozásakor megadható, hogy az a szál legyen-e az első felhasználója az erőforrásnak amelyikben létrehoztuk a mutexet.

Szemaforok

A szemafor hasonló a mutexhez, azonban nem feltétlenül ugyanaz a szál szabadítja fel az erőforrást, mint amelyik lefoglalta, mert lehetővé teszi, hogy az erőforráson több szál is dolgozzék. Ez akkor hasznos mikor az erőforrás elég nagy ahhoz, hogy több szál is használja, például egy szerver, amelyen egyszerre x felhasználó lehet bejelentkezve. A szemaforokat háromféleképp lehet létrehozni:
  • CreateSemaphore(lpSemaphoreAttributes, InitialCount, lMaximumCount, lpName):
    •  lpSemaphoreAttributes: mutató a SECURITY_ATTRIBUTES struktúrára, mely eldönti, hogy a függvény handle-jét örökölhetik-e a gyerekfolyamatok.
    • InitialCount: kezdetben mennyi szál férhet hozzá az erőforráshoz
    •  lMaximumCount: legfeljebb mennyi szál férhet hozzá az erőforráshoz
    •  lpName: a szemafor objektum neve
  • CreateSemaphoreEx(lpSemaphoreAttributes, InitialCount, lMaximumCount, lpName, dwFlags, dwDesiredAccess):
    • dwFlags: lefoglalt paraméter, mindig 0
    • dwDesiredAccess: ezzel határozzuk meg, hogy a szemafor hozzáférhető lehet-e más folyamatok számára
  • OpenSemaphore(dwDesiredAccess, bInheritHandle, lpName):
    • bInheritHandle: egy boolean típus, mely megadja, hogy a handle öröklődjék-e (ha örökölhető)

A szemaforok is kernel objektumok, így az őket létrehozó függvény handle-t térít vissza a létrehozott objektum számára, amit végül a CloseHandle() függvénnyel lehet bezárni. A szemafor számlálója növekedik valahányszor egy szál befejezi használatát a ReleaseSemaphore() függvény révén és csökken valahányszor a WaitForSingleObject() visszatéríti őt. A ReleaseSemaphore()-nak három paramétere van: a szemafor handle-je amit fel kell szabadítani, a növelés száma és egy opcionális változó, amibe az előző értéke szerepel a számlálónak. Ha a szemafor számlálója legfeljebb 1 lehet, akkor a működése megegyezik a mutex működésével.

#include <Windows.h>
#include <process.h>
#include <stdio.h>
#include <string>

volatile int counter = 0;
HANDLE szemafor;                    // szemafor objektum deklarálása

unsigned int __stdcall mythread(void*)
{
      std::string s;
      while (counter < 10)
      {
            WaitForSingleObject(szemafor, INFINITE); // vár míg a szemafor szabad lesz
            counter++;
            ReleaseSemaphore(szemafor, 1, 0);        // szemafor felszabadítása
            if(counter % 2 == 0) s = "Igen"; else s = "Nem";
            printf("A %d szalban a %d paros-e: %s\n",GetCurrentThreadId(), counter, s.c_str());
      }
      return 0;
}

int main()
{
      HANDLE myhandleA, myhandleB;
     
      szemafor = CreateSemaphore(0, 1, 1, 0); // szemafor inicializálása

      myhandleA = (HANDLE)_beginthreadex(0, 0, &mythread, 0, 0, 0);
      myhandleB = (HANDLE)_beginthreadex(0, 0, &mythread, 0, 0, 0);

      WaitForSingleObject(myhandleA, INFINITE);
      WaitForSingleObject(myhandleB, INFINITE);

      CloseHandle(myhandleA);
      CloseHandle(myhandleB);
     
      CloseHandle(szemafor);        // mutex törlése

      return 0;
}

Kondícióváltozók

A Windows Vista-tól kezdve bevezették a kondícióváltozókat, melyeket a kritikus szakaszoknál vagy sz SRW lock-nál (Slim Reader Wirter) lehet használni. Az SRW lock egyfajta optimizált kritikus szakasz. Arra a problémára volt kitalálva, amikor egyes szálak csak olvasnak, mások csak írnak az erőforrásba. Ilyenkor a hagyományos kritikus szakasz vagy mutex használata nem elegendő, mert mi történik, ha például többen olvasnak, mint írnak. Ilyenkor az erőforrás nem frissül megfelelően és több olvasó szál is ugyanazt az információt fogja kiolvasni. Az erőforrást felváltva kell használják az író és olvasó szálak, melyre két módszer létezik:
·         Osztott hozzáférés: az olvasó szálaknak megengedi a versengést, viszont az olvasások száma nem lehet nagyobb az írások számánál.
·         Kizárásos hozzáférés: addig az erőforrás nem hozzáférhető az olvasó (és író) szálak számára, míg az író szál nem végzett vele.
Az erőforrás SRW lock-ja mindkét módszerrel teljesül: az olvasó szálak az osztott módszerrel, az író szálak a kizárásos módszerrel szerezhetik meg. A kondícióváltozó dolga, hogy leállítja a kritikus szakaszban lévő szálat, amíg egy adott kondíció nem teljesül. Például nem engedi az olvasó szálat tovább lépni, míg az író szál nem végez. A szál várakozását kritikus szakasz esetén a SleepConditionVariableCS(), SRW lock esetén a SleepConditionVariableSRW() függvény idézi elő. Amikor a szál újból felébred, a kritikus szakaszból vagy az SRW lock-ból folytatja. Ezeknek három fő paraméterük van:
  • ConditionVariable: mutató a kondícióváltozóra, melyet az InitializeConditionVariable() függvénnyel lehet inicalizálni.
  • CriticalSection: mutató a kritikus szakasz változóra.
  • dwMiliseconds: időkorlát milliszekundumban, ami ha eltelik, akkor a függvény átveszi a kritikus szakaszt és nullát térít vissza.

A SleepConditionVariableSRW() rendelkezik még egy negyedik paraméterrel (Flags), amely azt jelzi, hogy osztott vagy kizárásos módban működik az SRW lock.

A következő program egy termelő-fogyasztó probléma révén mutatja be a kondícióváltozók használatát. A termelő szál betesz egy elemet egy sorba, a fogyasztó szál kivesz egy elemet a sorból. A hozzáadás és a kivevés is kritikus szakaszban történik. Amikor a program elindul, nincs semmi a sorban, így a fogyasztó szálak kondícióváltozóra várnak, hogy jelezze a termelő szál végét.

#include <windows.h>
#include <process.h>
#include <queue>
#include <stdio.h>

std::queue<int> myqueue;

CONDITION_VARIABLE NotEmpty;
CONDITION_VARIABLE NotFull;
CRITICAL_SECTION critical;
BOOL ExitPressed;

unsigned int __stdcall Produce (void* p)
{
    ULONG ID = (ULONG)(ULONG_PTR)p;

    while (true) // folyamatosan termel
    {
        Sleep (rand() % 500); // termelési idő (gyorsabb mint a fogyasztási)
        EnterCriticalSection (&critical); // kritikus szakasz kezdete
        while (myqueue.size() == 10 && ExitPressed == FALSE) // amíg teli van addig leáll
            SleepConditionVariableCS (&NotFull, &critical, INFINITE);
        if (ExitPressed == TRUE) // ha kézzel megszakítjuk, akkor kritikus szakasz vége
        {
            LeaveCriticalSection (&critical);
            break;
        }
            myqueue.push(1); // elem hozzáadása
        printf ("Producer %u: queue size %d\r\n", ID, myqueue.size());
        LeaveCriticalSection (&critical);  // kritikus szakasz vége
        WakeConditionVariable (&NotEmpty); // jelzés a várakozó fogyasztónak
    }
    printf ("Producer %u exiting\r\n", ID);
    return 0;
}

unsigned int __stdcall Consume (void* p)
{
    ULONG ID = (ULONG)(ULONG_PTR)p;

    while (true) // folyamatosan fogyaszt
    {
        EnterCriticalSection (&critical); // kritikus szakasz kezdete
        while (myqueue.size() == 0 && ExitPressed == FALSE) // amíg üres addig leáll
            SleepConditionVariableCS (&NotEmpty, &critical, INFINITE);
        if (myqueue.size() == 0 && ExitPressed == TRUE)
        {
            LeaveCriticalSection (&critical);
            break;
        }
            myqueue.pop(); // elem kivétele
        printf ("Consumer %u: queue size %d\r\n", ID, myqueue.size());
        LeaveCriticalSection (&critical); // kritikus szakasz vége
        WakeConditionVariable (&NotFull); // jelzés a várakozó termelőnek
        Sleep (rand() % 2000); // fogyasztási idő (lassabb mint a termelési)
    }
    printf ("Consumer %u exiting\r\n", ID);
    return 0;
}

int main ( void )
{
    HANDLE hProducer1, hConsumer1, hConsumer2;

    InitializeConditionVariable (&NotEmpty);
    InitializeConditionVariable (&NotFull);
    InitializeCriticalSection (&critical);

    hProducer1 = (HANDLE)_beginthreadex(0, 0, &Produce, (void*)1, 0, 0);
    hConsumer1 = (HANDLE)_beginthreadex(0, 0, &Consume, (void*)1, 0, 0);
    hConsumer2 = (HANDLE)_beginthreadex(0, 0, &Consume, (void*)2, 0, 0);

    puts ("Press ENTER to stop...");
    getchar(); // a billentyűnyomás mindegy mikor történik
    EnterCriticalSection (&critical);
    ExitPressed = TRUE; // de a feldolgozása kritikus, nem maradhat el
    LeaveCriticalSection (&critical);

    WakeAllConditionVariable (&NotFull);
    WakeAllConditionVariable (&NotEmpty);

    WaitForSingleObject (hProducer1, INFINITE);
    WaitForSingleObject (hConsumer1, INFINITE);
    WaitForSingleObject (hConsumer2, INFINITE);

    CloseHandle(hProducer1);
    CloseHandle(hConsumer1);
    CloseHandle(hConsumer2);

    return 0;
}

A program két kondícióváltozót használ: egyet a termelőnek (NotFull), egyet a fogyasztónak (NotEmpty). Ezeket az InitialiyeConditionVariable() függvény inicializálja külön-külön. A fogyasztó szálak (hConsumer1, hConsumer2) a SleepConditionVariableCS() függvénnyel várakoznak, hogy elemek kerüljenek a sorba és a WakeConditionVariable() függvénnyel jelzik a termelőnek, hogy készek újabb termékek befogadására. A termelő szál is a SleepConditionVariableCS() függvénnyel vár a fogyasztóra, hogy kivegye az elemet a sorból és a WakeConditionVariable() függvénnyel jelez neki, hogy újabb termék áll rendelkezésre a sorban. A termelő gyorsabban dolgozik mint ahogy a két fogyasztó fogyaszt, így előbb utóbb megtelik a sor. Ha a program futását megszakítjuk, akkor a termelő leáll, a fogyasztó szálak pedig kiürítik a sort, hiszen csak akkor hagyják abba a futást, amikor a sor kiürült.

A program kimenete:
Press ENTER to stop...
Producer 1: queue size 1
Consumer 1: queue size 0
Producer 1: queue size 1
Producer 1: queue size 2
Producer 1: queue size 3
Consumer 2: queue size 2
Producer 1: queue size 3
Consumer 1: queue size 2
Producer 1: queue size 3
Producer 1: queue size 4
Producer 1: queue size 5
Consumer 2: queue size 4

Producer 1 exiting
Consumer 1: queue size 3
Consumer 2: queue size 2
Consumer 1: queue size 1
Consumer 2: queue size 0
Consumer 1 exiting
Consumer 2 exiting

Eventek

Egy szál egy eseményjelző (event) Win32 objektum segítségével jelezni tudja más szálaknak, ha egy esemény végbement. Ezt követően az eseményjelző kiinduló állapotba való állítása manuálisan és automatikusan is történhet. Az állapot lehet jelzett vagy nem-jelzett. Az állapotot és a visszaállítási módot az eseményjelző inicializálásánál határozzuk meg. Ha az eseményjelző jelzett állapotban van, akkor a ResetEvent() függvénnyel állítható vissza nem-jelzett állapotba. Ugyanezt kondícióváltozókkal, szemaforokkal és mutexekkel is véghez lehet vinni. Ha több szál vár ugyanarra az eseményre, akkor az első szál amely észreveszi, rögtön nem-jelzett állapotra teszi az eseményváltozót, így a többi várakozó szál is felszabadul. Az eseményjelzők kernel objektumok, ezért a CreateEvent() függvény egy handle-t térít vissza. Négy paramétere van:
  • lpEventAttributes: biztonsági attribútum, mely eldönti hogy a handle örökölhető-e a gyerekfolyamatok számára
  • bManualReset: boolean típus, ami ha igaz, akkor az eseményjelzőt kézzel kell újraállítani
  • bInitialState: boolean típus, ami ha igaz, akkor a kezdő állapot jelzett állapot
  • lpName: az eseményjelző neve

A létrehozott eseményjelzőt az OpenEvent() függvénnyel lehet létrehozni, melynek három paramétere van:
  • dwDesiredAccess: hozzáférési jogok. NULL az alapértelmezett
  • bInheritHandle: egy boolean típus, mely megadja, hogy a handle öröklődjék-e (ha örökölhető
  • lpName: az eseményjelző neve, amelyiket megnyitunk

A SetEvent() függvény az eseményjelzőt jelzett állapotba teszi. Ettől a WaitForSingleObject() függvénnyel várakozó szálak, melyek az eseményre várakoznak felszabadulnak.

#include <Windows.h>
#include <process.h>
#include <stdio.h>

HANDLE myEvent;

unsigned int __stdcall mythreadA(void* param)
{
      WaitForSingleObject(myEvent, INFINITE); // eseményre vár
      printf("Thread A finished.\n");         // majd üzenetet ír ki
      return 0;
}

unsigned int __stdcall mythreadB(void* param)
{
      printf("Thread B finished.\n"); // üzenetet ír ki
      SetEvent(myEvent);             // majd eseményt kreál
      return 0;
}

int main(int argc, char* argv[])
{
      HANDLE myhandleA, myhandleB;
     
      myEvent = CreateEvent(0, 0, 0, 0); // nem örökölhető, manuális úrjaállítás, nem-jelzett, névtelen

      myhandleA = (HANDLE)_beginthreadex(0, 0, &mythreadA, (void*)0, 0, 0);
      myhandleB = (HANDLE)_beginthreadex(0, 0, &mythreadB, (void*)1, 0, 0);

      WaitForSingleObject(myhandleA, INFINITE);
      WaitForSingleObject(myhandleB, INFINITE);

      CloseHandle(myhandleA);
      CloseHandle(myhandleB);
      CloseHandle(myEvent);

      return 0;
}
A fenti példában az első szál elindítja a myThreadA() függvényt, majd egy eseményre vár. A második szál elindítja a myThreadB() függvényt, jelzetté teszi az eseményjelzőt és ezzel véget is ért a szál futása. Az első szál észleli az eseményjelző állapotát és folytatja a függvényt, azaz kiírja a saját üzenetét.

A kimenet:
Thread B finished.
Thread A finished.

A következő programban egy szál és két eseményjelző szerepel. A program azt az eseményt írja ki, amelyiket a szál jelzettre állított.

#include <windows.h>
#include <process.h>
#include <stdio.h>

HANDLE ghEvents[2]; // két eseményobjektum

unsigned int __stdcall ThreadProc(void* param)
{
    SetEvent(ghEvents[1]); // a második eseményt állítja jelzettre
    return 0;
}

int main( void )
{
    HANDLE hThread;
    DWORD dwEvent;

    ghEvents[0] = CreateEvent( 0, 0, 0, 0);
    ghEvents[1] = CreateEvent( 0, 0, 0, 0);
    hThread = (HANDLE)_beginthreadex(0, 0, &ThreadProc, (void*)0, 0, 0);
    dwEvent = WaitForMultipleObjects(2,ghEvents,FALSE,5000);
      // 5 másodpercet vár hogy valamelyik eseményobjektum jelzést kapjon

    switch (dwEvent) // a visszatérített érték a jelzett esemény indexe
    {
      
        case WAIT_OBJECT_0 + 0:  // ghEvents[0] volt jelzettre állítva
            printf("Az elso esemeny van jelzett allapotban.\n");
            break;

        case WAIT_OBJECT_0 + 1:  // ghEvents[1] volt jelzettre állítva
            printf("A masodik esemeny van jelzett allapotban.\n");
            break;

        case WAIT_TIMEOUT:
            printf("Az 5 masodperc a jelzes elott letelt.\n");
            break;

        default:
            printf("Hiba: %d\n", GetLastError());
            ExitProcess(0);
    }

    CloseHandle(ghEvents[0]);
    CloseHandle(ghEvents[1]);
    CloseHandle(hThread);
   
    return 0;  
}

A kimenet: A masodik esemeny van jelzett allapotban.

std::thread

A C++11 szabványban meghatározott szál nagyon hasonló a Boost szabványban lévőhöz, de van pár különbség is köztük. Nincs viszont olyan az egyikben, amit a másikban nem lehetne megvalósítani, ezért ha a projekt, ami szálakat igényel, nem használja a Boost könyvtárakat, akkor érdemes az std szálakkal kezdeni. A WinAPI szálakkal szemben rengeteg különbség van, melyek az std szálak javára szolgálnak. Többnyire azért, mert az std szálak egy magasabb absztrakciós szinten működnek, azaz Windows alatt futtatva a WinAPI szálakat, Linux alatt futtatva pedig a pthread szálakat használják. Emiatt nem platform függő, ami elég indok ahhoz, hogy ezt válasszuk a fejleszthetőség érdekében.

#include <iostream>
#include <thread>

void fuggveny() // a szál által elindított függvény
{
    std::cout << "Thread" << std::endl;
}

int main()
{
    std::thread t(fuggveny); // a szál elindítása
    t.join(); // a szál csatlakozása a főszálhoz
    return 0;
}

A t a thread osztálynak az objektuma, melynek és egyetlen paramétere az a függvény, amit végre kell hajtson. Ha több paraméter is van a zárójelben, akkor ezek bemenő értékek lesznek a függvény számára. A t.join() utasítás arra utasítja a fő szálat, hogy várja meg, míg a t1 szál végez és csak utána lépjen tovább. Ha ez nincs ott, akkor a fő szál hamarabb végezhet ami hibát eredményez. Hasonló a WinAPI WaitforSingleObject() függvényéhez.

A join() függvényt követően az adott szál nem lesz többé join()-olható. Ez azt jelenti, hogy már véget ért a szál futása, többé nincs miért várni rá. Tulajdonképpen a függvény meghívása előtt ezt le is kell ellenőrizni, mert előfordulhat, hogy véget ér a szál mielőtt a join() utasításhoz eljut a fő szál.

if (t.joinable()) t.join();

A joinable() függvény bool típusú és igazat térít vissza, ha a szál még join()-olható (azaz fut). Ha nem szeretnénk, hogy a program várjon a t szál befejezésére, de ez mégse okozzon hibát, akkor a t.detach() utasítással leválasztható a t mellékszál a fő száltól. Ilyenkor a mellékszál függetlenül fut és az általa használt erőforrások felszabadulnak, amint véget ért. A leválasztott szálat (daemon szálat) többé nem lehet join()-olni és szinkronizálni.

A következő program 10 szálat hoz létre és vár befejezésükig:

#include <thread>
#include <iostream>

void fuggveny(int k) // a szál által elindított függvény
{
    std::cout << "Thread " << k << std::endl;
}

int main()
{
    std::thread t[10];

    for (int i = 0; i < 10; ++i) // 10 szál elindítása
      {
        t[i] = std::thread(fuggveny,i);
    }

    std::cout <<"Main Thread" << std::endl;

    for (int i = 0; i < 10; ++i) // várakozás a szálak befejezésére
    {
        t[i].join();
    }

    return 0;
}

A kimenet:
Thread Thread 3Thread 2
Main Thread
Thread 6
0
Thread Thread 7Thread 4Thread 5Thread 9

8



Thread 1

A kimenetnek nincs sorrendje, a szálak nem a létrehozásuk sorrendjében indulnak el. A befejezésük sem az elindulásuk sorrendjétől függ, van hogy az egyik szál még nem írta ki teljesen a szövegét miközben egy másik is elkezdte kiírni a sajátját. Minden futtatásnál más az eredmény, van hogy teljesen olvashatatlan, mert mind a 11 szál (1 fő + 10 mellék) verseng a kimenő folyamra (stdout) való hozzáféréshez.

Szinkronizálás

Akár a WinAPI esetén, az erőforrásokért való versengést szinkronizálással (pl. mutexszel) lehet szabályozni.

#include <thread>
#include <iostream>
#include <mutex>

static std::mutex barrier;

void fuggveny(int k) // a szál által elindított függvény
{
    barrier.lock();
    std::cout << "Thread " << k << std::endl;
    barrier.unlock();
}

int main()
{
    std::thread t[10];

    for (int i = 0; i < 10; ++i) // 10 szál elindítása
    {
        t[i] = std::thread(fuggveny,i);
    }

    std::cout <<"Main Thread" << std::endl;

    for (int i = 0; i < 10; ++i) // várakozás a szálak befejezésére
    {
        t[i].join();
    }

    return 0;
}

A kimenet:
Thread 0
Thread 1
Thread 2
Main Thread
Thread 3
Thread 4
Thread 5
Thread 6
Thread 8
Thread 7
Thread 9

A barrier egy globális mutex objektum, melyet a szálak ellenőriznek. A függvény csak akkor lépik tovább a kiírásra, ha a barrier szabaddá válik (azaz lehet lock()-olni).

Funktor objektumok

Az objektum orientált programozás elemei is alkalmazhatók a szálakban. Ahhoz, hogy egy objektumot adjunk át paraméterként egy szálnak, az objektum az osztályának rendelkeznie kell egy függvénnyel, amely felülírja (túlterheli) a zárójel operátort. Az ilyen osztályok objektumait functor-nak nevezik:

#include <iostream>
#include <thread>

class FunctorOsztaly
{
public:
      void operator()()
      {
            std::cout << "A () operator tulterhelve." << std::endl;
      }
};

int main()
{
    FunctorOsztaly FunctorObj;
    std::thread t(FunctorObj); // a szál elindítása
    t.join(); // a szál csatlakozása a fõszálhoz
    return 0;
}

A szálakat tagfüggvényekkel is lehet inicializálni. Ilyenkor nem kötelező az operátorfüggvény jelen legyen:

#include <iostream>
#include <thread>

class FunctorOsztaly
{
public:
      void Fuggveny(int a)
      {
            std::cout << "Publikus fuggveny: " << a << std::endl;
      }
};

int main()
{
    FunctorOsztaly FunctorObj;
    std::thread t(FunctorOsztaly::Fuggveny,FunctorObj,6); // a szál elindítása
    t.join(); // a szál csatlakozása a fõszálhoz
    return 0;
}

A kimenet: Publikus fuggveny: 6

Ha az inicializálásban csak a functort használjuk, akkor az utána felsorolt paraméterek az operátorfüggvény paraméterei lesznek.

Azonosítók

Akár a WinAPI szálaknál, az std szálaknak is van egyéni azonosítója, amit a get_id() függvény térít vissza.

#include <thread>
#include <iostream>
#include <mutex>

static std::mutex barrier;

void fuggveny() // a szál által elindított függvény
{
    std::lock_guard<std::mutex> block_threads_until_finish_this_job(barrier);
    std::cout << "Thread " << std::this_thread::get_id() << " started\n";
}

int main()
{
      std::cout << "Main Thread " << std::this_thread::get_id() << " started\n";
     
    std::thread t[10];
    std::thread::id id[10];

    for (int i = 0; i < 10; ++i) // 10 szál elindítása
    {
        t[i] = std::thread(fuggveny);
        id[i]=t[i].get_id();
    }

    for (int i = 0; i < 10; ++i) // várakozás a szálak befejezésére
    {
        t[i].join();
        std::lock_guard<std::mutex> block_threads_until_finish_this_job(barrier);
        std::cout << "Thread " << id[i] << " terminated\n";
    }
   
    std::cout << "Main Thread " << std::this_thread::get_id() << " terminated\n";

    return 0;
}

A kimenet:
Main Thread 1 started
Thread 2 started
Thread 4 started
Thread 5 started
Thread 3 started
Thread 6 started
Thread 7 started
Thread 8 started
Thread 9 started
Thread 2 terminated
Thread 3 terminated
Thread 10 started
Thread 4 terminated
Thread 11 started
Thread 5 terminated
Thread 6 terminated
Thread 7 terminated
Thread 8 terminated
Thread 9 terminated
Thread 10 terminated
Thread 11 terminated
Main Thread 1 terminated

Az std::lock_guard<std::mutex> block_threads_until_finish_this_job(barrier) utasítás addig blokkolja a továbblépést, míg a barrier szabaddá nem válik. Ugyanaz a hatása, mint a barrier.lock() és barrier.unlock() függvényeknek.

Amikor a join() vagy detach() függvényeket csakis az std::thread objektum lebontása előtt használhatjuk. Emellett biztosítani kell, hogy a program eljusson a join() utasításig és semmilyen kivétellel sem ugorhassa át. Ezt a legegyszerűbben úgy lehet elkerülni, ha a kivételekben is meghívjuk a join() függvényt:

      try
      {
            current_thread_task();
      }
      catch(...)
      {
            cout << "catch(...)\n";
            t.join();
            throw;
      }
      t.join();

Emiatt biztos, hogy végezni fog a szál mielőtt a program tovább lépne.

A következő példában a szál egy függvénnyel van inicializálva, majd rögtön lecsatlakozik a főszálról:

#include <thread>

void CallableObj(std::string const& s) {}

void fuggv()
{
      char buf[] = "randomszoveg\n";
      std::thread t(CallableObj,buf);
      t.detach();
}

int main()
{
      fuggv();
      //következő utasítás
      return 0;
}

A főprogram (fő szál) meghívja a fuggv() függvényt, majd tovább lépik. A fuggv() létrehoz egy szálat, melynek paramétere egy std::string típust váró függvény. A CallableObj() függvény a stringet mint char const* típusú paramétert (a buf[] mutatója) kapja meg és stringgé csakis a szál kontextusán belül alakítja. Az átalakítás viszont félbe szakadhat ha a fuggv() közben véget ér. Ez elkerülhető, ha már az inicializálásnál stringgé alakítjuk a buf változót:

std::thread t(CallableObj, std::string(buf));

Move konstruktor

Gyakran szükség van olyan programra, amelyben egy mellékszál által használt erőforrásra szüksége lesz a főszálnak, még mielőtt a mellékszál végezne vele. Ilyenkor az erőforrás tulajdonjogát kell áthelyezni egyik szálból a másikba. Az ilyen esetek miatt az std::thread osztálynak van egy áthelyező (move) konstruktora, ami lehetővé teszi, hogy a példányokat (memóriában tárolt objektumokat) át lehessen helyezni egyik memóriacímről egy másikra. A tulajdonjog nem másolható, csak áthelyezhető. Ugyanilyen osztály az std::ifstream és az std::unique_ptr is. A move konstruktor annyit tesz, hogy az adatok mutatóit átmásolja magának, a helyükre pedig üres értéket rak. Nincs adatmásolás és nincs memóriafoglalás, ezért sokkal gyorsabb a másoló konstruktornál. A következő program létrehoz három szálat, melyek között többször is áthelyezi az erőforrások tulajdonjogát.

#include <iostream>
#include <thread>

using namespace std;

void f1() { cout << "f1()\n"; }
void f2() { cout << "f2()\n"; }


int main()
{
     
      thread t1(f1);         // t1 elindul az f1 függvénnyel
      thread t2 = move(t1);  // t2 átveszi t1 erõforrásának (f1) tulajdonjogát
      t1 = thread(f2);       // t1-nek nincs dolga, ezért megjapja f2 függvényt
      thread t3;             // t3 elindul, de nincs dolga
      t3 = move(t2);         // t3 átveszi t2-tõl az erõforrás (f1) tulajdonjogát.
      //t1.detach();
      t1 = move(t3);         // t1, aki még az f2-t futtatja megkapja t3 erõforrását (f1).
                             // Ettõl kiakad a program.
      t1.join();                  
     
      return 0;
}
A move utasítás a C++11 szabványban jelent meg és elérhetővé teszi a thread osztály move konstruktorát. Egy dolgozó szálhoz történő áthelyezés során (t1 = move(t3)) a program összeomolhat, mert arra kényszerül, hogy állítson meg egy dolgozó szálat. Ez elkerülhető, ha előtte lekapcsoljuk a szálat a főszálról (t1.detach()), így a definiált t1 most új szálként kaphatja meg t3 erőforrásait.

Az erőforrás tulajdonjoga függvénynek is átadható, ha annak paraméteri között szál is szerepel:

#include <iostream>
#include <thread>

using namespace std;

void fnc()
{
      cout << "thread task fnc()\n";
}

void f(thread t)
{
      cout << "f(thread t)\n";
      t.join();
}

int main()
{
      f(thread(fnc)); // kiírja az f() majd a fnc() szövegét
      thread t(fnc);  // kiírná a fnc() szövegét de előbb
      f(move(t));     // kiírja az f() szövegét
      return 0;
}

Az f() egy thread típusú paramétert vár, ami lehet éppen a meghívásnál vagy azelőtt definiálva. Ha már definiálva volt, akkor muszáj a move() szemantikát használni, hisz különben másolás történne, ami a szálak esetén nem lehetséges.

Konténerek

A move() a konténereknek is lehetővé teszi, hogy szálakkal dolgozzanak. Például létrehozható egy std::vector<> konténer osztály, melybe szálakat lehet pakolni, így azok kezelése is egyszerűbbé válik (csoportként lehet velük bánni, nem kell külön deklarálni mindegyiket).

#include <iostream>
#include <thread>
#include <algorithm>
#include <vector>
#include <mutex>

using namespace std;
static std::mutex barrier;

void doTask(unsigned id)
{
      barrier.lock();
      cout << "thread " << id << endl;
      barrier.unlock();
}

int main()
{
      vector<thread> threads;

      for(unsigned i = 1; i <= 10; ++i)
      {
            // 10 darab szál elindítása
            threads.push_back(thread(doTask, i));
      }

      // mindenik szál join()-olása
      for_each(threads.begin(), threads.end(), mem_fn(&thread::join));
     
      return 0;
}

A kimenet:
thread 1
thread 2
thread 3
thread 4
thread 5
thread 6
thread 7
thread 8
thread 9
thread 10



Nincsenek megjegyzések:

Megjegyzés küldése