A J# programozási nyelv

Párhuzamosság



Párhuzamosság

Szintaxis

Ahhoz, hogy egy objektumból létre tudjunk hozni egy új végrehajtási szálat, szükséges, hogy az objektum osztálya implementálja a Runnable interfészt. Itt a run() metódst kell megvalósítani. A szál indításakor ez a metódus fog dolgozni.

A szálkezeléshez a következő csomagot kell feltétlenül beimportálni: java.lang.Thread (importálható a System.Threading .NET-es namespace is). Itt található a Thread osztály, melyet a következőképpen kell használni:

public class ParhuzamosOsztaly implements Runnable { {osztály változók} public ParhuzamosOsztaly(){ inicializálás } public run() //ezt kell megvalósítani { ... A szálként futó kód ... } } ... private ParhuzamosOsztaly parhuzamosOsztaly; private Thread szal; parhuzamosOsztaly = new ParhuzamosOsztaly(); szal = new Thread(parhuzamosOsztaly); // szál indítása: szal.start();

Tehát új szál létrehozásakor a konstruktornak egy olyan osztály példányát kell megadni, mely implementálja a korábban említett Runnable interfészt. Ezzel a szál még nem indul el automatikusan, ezért meg kell hívni a start() metódust.

A java.lang.Thread helyett használható a már korábban említett System.Threading névtérben található Thread osztály. Ez kicsit több szabadságot enged meg, mivel nem kell implementálni semmilyen interfészt, hogy egy osztály metódusát szálként indítsuk el. Továbbá nagyon egyszerűen indíthatunk paraméterezett szálat:

package ThreadProba; import System.Windows.Forms.*; import System.Threading.*; public class TProba { protected ListBox sajat_log; private EventWaitHandle PauseEWH; /** @delegate */ protected delegate void LogChangeDelegate(Object szoveg, Object log); private Boolean fut; public TProba(ListBox _log) //ezt hívjuk majd paraméterrel { sajat_log = _log; PauseEWH = new EventWaitHandle(false, EventResetMode.ManualReset); //kezdetben hamis, azaz nem indul fut = new Boolean(true); } public void fut(Object ezt) { String szoveg = (String)ezt; //a kiírandó szöveg LogChangeDelegate lcd = new LogChangeDelegate(log_change); PauseEWH.WaitOne(); while (fut.booleanValue()) { PauseEWH.WaitOne(); if (fut.booleanValue()) { PauseEWH.WaitOne(); sajat_log.BeginInvoke(lcd, new Object[] { szoveg, sajat_log }); try { Thread.sleep(500); } catch (InterruptedException ex) { } } } } public void Start() { PauseEWH.Set(); } /** * PauseEWH-t "hamis"-ra állítja */ public void Pause() { PauseEWH.Reset(); } public void Continue() { PauseEWH.Set(); } /** * Leállítja a végrehajtást */ public void Stop() { PauseEWH.Reset(); fut = fut.FALSE; PauseEWH.Set(); } /** * szoveg-et beírja a log ListBox-ba */ protected void log_change(Object szoveg, Object log) { ((ListBox)log).get_Items().Add(((String)szoveg)); ((ListBox)log).set_SelectedIndex(((ListBox)log).get_Items().get_Count()-1); } } ... //inicializálás: private ListBox log; private TProba tp; private System.Threading.Thread Szal; ... //szál indítása tp = new TProba(log); Szal = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(tp.fut)); //szál inicializálása String sz = "ezt írd"; Szal.Start(sz); //paraméterezett hívás tp.Start(); ... //szál leállítása tp.Stop(); Szal.Join(); ...

Talán furcsa, de a paramétereket nem a szál inicializálásakor kell megadni, hanem akkor, amikor elindítjuk a szálat (Szal.Start(paraméter)). Ez a rövid program a paraméterben átadott szöveget írja ki egy ListBox-ba. A szinkronizációs utasításokról a következő alfejezetben lehet olvasni.

Felvetődhet a kérdés, hogy mire kell a delegate? Vegyük a következő esetet: egy szálból hozzá szeretnénk férni mondjuk egy ListBox-hoz. Tegyük fel, hogy egy új sort szeretnénk beszúrni. Abban az esetben, ha egyszerűen meghívnánk a ListBox objektum Add(Object item) metódusát kivételt kapnánk. Ennek az az oka, hogy a mi új szálunk nem ugyanabban a végrehajtási szálban fut, mint a ListBox objektumunk. Ezen a problémán segít a ListBox BeginInvoke metódusa. Ez egy delegate-et vár paraméterként. Abban az esetben, ha a delegate függvényünk paraméterekkel is rendelkezik, akkor a BeginInvoke hívásnál második paraméterben adhatjuk meg ezeket. Viszont arra ügyeljunk, hogy itt csak egy Object adható meg! Tehát, ahogy a fenti példából is kitűnik, ilyenkor célszerű egy Object-eket tartalmazó tömböt átadni paraméterként, majd a delegate metódus megvalósításánál ezeket a megfelelő típusra cast-olhatjuk. Tehát használva az aszinkron BeginInvoke a megfelelő metódussal és paraméterekkel, el tudjuk végezni új elem beszúrását.

Első körben tehát létre kell hozni egy delegate típust és a vele megegyező szignatúrájú metódust (itt történik az új elem hozzáadása):

/** @delegate */ protected delegate void LogChangeDelegate(Object szoveg, Object log); ... protected void log_change(Object szoveg, Object log) { ((ListBox)log).get_Items().Add(((String)szoveg)); ((ListBox)log).set_SelectedIndex(((ListBox)log).get_Items().get_Count()-1); }

Ezek segítségével már meg tudunk valósítani egy aszinkron BeginInvoke hívást a következőképpen:

LogChangeDelegate lcd = new LogChangeDelegate(log_change); ... sajat_log.BeginInvoke(lcd, new Object[] { szoveg, sajat_log });

Tehát létrehozunk egy LogChangeDelegate típusú delegate-et, természetesen arra ügyelve, hogy a szignatúra megegyezzen. Majd a feljbebb magyarázott módon meghívjuk a BeginInvoke metódust. A delegate-ekről a Típusok, típuskonstrukciók témakörön belül olvashat.

Szinkronizáció

Az előző példában található két metódus a Sleep és Join egyszerű blokkoló utasítások. A Sleep hatására a szál a paraméterben megadott miliszekundumig blokkolódik, azaz felfüggeszti a tevékenységét, majd az idő lejártával folytatja azt. A Join metódus egy szál bevárására használható. A metódus hívásának helyén a végrehajtás addig szünetel, amíg a bevárandó szál nem végzett.

Van egy kis különbség a java.lang.Thread és a A System.Threading.Thread száltípusok között. A java.lang.Thread esetében, amikor egy blokkoló utasítást hívunk, egy InterruptedException kivétel történik. Ezt feltétlenül le kell kezelni. A System.Threading.Thread esetében ilyenekkel nem kell vesződni.

Az egyik legfontosabb dolog párhuzamos környezetekben, hogy az erőforrásokhoz való hozzáférést szabályozzuk. Magyarul, ha egy szál dolgozik valamilyen erőforráson, akkor egy másik szál ne férhessen hozzá. Például:

... kivesz=puffer.Top(); puffer.Pop(); szoveg = "Kivesz: " + kivesz; puffer_log.BeginInvoke(lcd, new Object[] { szoveg, puffer_log }); szoveg = "Kivesz: " + kivesz; sajat_log.BeginInvoke(lcd, new Object[] { szoveg, sajat_log }); ...

Ez a rövid programrészlet a végrehajtási szál azon részét mutatja be, amikor egy veremből kivesszük a legfelső elemet, majd ki is töröljük. A működés közben azt várjuk el, hogy egy elemet csak egy szál vehet le. Látszólag ezzel nincs semmi probléma. De mi van abban az esetben, ha indítunk egy másik szálat ugyanezzel a tevékenységgel és az ütemezés a következőképpen történik (S1 és S2 a két szál):

Látható, hogy már a második lépésben rosszul fog működni a program, de a nagyobb probléma csak ezután következik be. Amikor S1 törli a legfelső elemet, akkor ezután nem sokkal az S2 is ugyanezt teszi. Ekkor két eset állhat fenn:

Ezt az eset úgy oldható meg, hogy kölcsönös kizárást alkalmazunk a puffer veremobjektumra. Erre több lehetőséget biztosít a .NET. Használhatók a beépített System.Threading.Monitor és System.Threading.Semaphore osztályok (ezeket itt most nem mutatjuk be). Ebben az esetben azonban egyszerű és kézenfekvő megoldás a synchronized utasítás (C#-ban lock a párja).

Vegyünk egy szinkronizációs objektumot. Ez bármilyen referencia típus lehet, jelen esetben használható a puffer veremobjektum is. Az utasítás szintaxisa a következő:

... synchronized (puffer) { kivesz=puffer.Top(); puffer.Pop(); szoveg = "Kivesz: " + kivesz; puffer_log.BeginInvoke(lcd, new Object[] { szoveg, puffer_log }); szoveg = "Kivesz: " + kivesz; sajat_log.BeginInvoke(lcd, new Object[] { szoveg, sajat_log }); } ...

Egyszerre csak egy szál veheti birtokába a szinkronizációs objektumot. Hangsúlyozzuk, hogy ez bármilyen referencia típus lehet, nem kötelező pont a puffer objektumot használni. Amikor egy szál megkapja az objektumot, akkor a többi szál, amelyek szintén szeretnék használni az erőforrásokat, blokkolódnak ezen a ponton. Amikor a szál végzett a munkájával, akkor a várakozó szálak közül valaki megkapja az erőforrást, majd a többi újból blokkolódik. Fontos: az, hogy használjuk a synchronized utasítást, még nem biztos, hogy teljesül a kölcsönös kizárás egy objektum esetében. Éppen ezért, ha azt szeretnénk, hogy egy objektumhoz egyszerre csak egy szál férhessen hozzá, akkor az objektum minden előfordulásánál használni kell a szinkronizáló utasítást.

Abban az esetben, ha két szálat szeretnénk szinkronizálni (például, ha az egyik végzett, csak utána induljon a másik), hasznos dolog az EventWaitHandle két alosztályának a használata. Ezek az AutoResetEvent és ManualResetEvent (mindkettő a System.Threading névtérben van). Az AutoResetEvent úgy működik mint egy forgóajtó, melyen csak érvényes jeggyel lehet bemenni. Az Auto azt jelenti az osztály nevében, hogy a forgóajtó automatikusan bezáródik, amikor valaki áthalad rajta. Egy szál a WaitOne metódus hívásával várakozhat a forgóajtónál addig, amíg újra ki nem nyílik. A Set metódus hívásával nyílik ki az ajtó, így adva lehetőséget azoknak a szálaknak, amelyek WaitOne-nál várakoznak. Példa AutoResetEvent-re (Vigyázat! - C# kód):

//inicializálásra két mód is van EventWaitHandle wh = new AutoResetEvent (false); EventWaitHandle wh = new EventWaitHandle (false, EventResetMode.Auto); class BasicWaitHandle { static EventWaitHandle wh = new AutoResetEvent (false); static void Main() { new Thread (Waiter).Start(); Thread.Sleep (1000); // várunk 1000ms-t wh.Set(); // megengedjük, hogy belépjen valaki az forgóajtón } static void Waiter() { Console.WriteLine ("Waiting..."); wh.WaitOne(); // Vár, hogy bemehessen az ajtón Console.WriteLine ("Notified"); } }

A ManualResetEvent annyiban különbözik az AutoResetEvent-től, hogy itt nem történik meg automatikusan a "forgóajtó" kinyitása. Ezt a Reset művelettel lehet megtenni. Például:

... protected EventWaitHandle PauseEWH; protected Boolean fut; ... PauseEWH = new EventWaitHandle(false, EventResetMode.ManualReset); //kezdetben hamis, azaz nem indul el fut = new Boolean(true); ... PauseEWH.WaitOne(); //addig blokkolódik, míg "igaz" nem lesz while (fut.booleanValue()) //megy, amíg fut=igaz { PauseEWH.WaitOne(); if (fut.booleanValue()) { ... Tevékenység ... } } ...

A fenti példában amikor a szál elindul, kezdetben blokkolódik PauseEWH miatt. Amikor valamelyik végrehajtási szálon belül meghívjuk a PauseEWH.Set() metódust, a végrehajtás elindul. Ha bármikor a végrehajtás alatt meghívjuk a PauseEWH.Reset() metódust, a futás felfüggesztődik. A Set() metódus újbóli meghívásával a futás folytatódik. A ciklus addig fut, míg a fut Boolean típusú változó értéke True. Ezt hamisra állítva a szál befejezi futását. Ha egy szálat le szeretnénk állítani, vagy csak szüneteltetni szeretnénk, akkor inkább alkalmazzuk ezen technikákat és kerüljük a Thread osztály Stop, Interrupt, Abort, Suspend és Release utasításait.