1. Szintaxis
A Ruby támogatja a konkurens programok készítését. Minden programnak van egy fő szála - ha ez terminál, a többi szál is terminál. Ez azért is érdekes, mert a USER INTERRUPT exception mindig a fő szálnak küldődik.
A párhuzamosság jellege tehát nyelvi szintű, de mivel tetszőleges rendszerhívást végrehajthatunk, az operációs rendszer által nyújtott lehetőségeket is használhatnánk ilyen célokra.
A nyelv által kínált párhuzamosítási lehetőségeket a Thread, a Mutex és a Queue osztály segítségével vehetjük igénybe. Az aktuális szállal párhuzamosan a Thread osztály new metódusával tudunk utasításokat végrehajtani. A new paramétere egy utasításblokk, ami konkurrens módon fog végrehajtódni.
2. Kommunikáció
Az egyes szálak elsősorban osztott változók segítségével kommunikálnak egymással. A Queue osztály segítségével megvalósított aszinkron üzenetküldéses kommunikáció is igényel osztott változót: ilyenkor ugyanis egy Queue osztálybeli objektumot osztott változón keresztül kell, hogy lásson a két szál.
Kölcsönös kizárásra a Ruby a Mutex osztályt ajánlja. Ez egy speciális szemafor, de általános szemaforokat a nyelv nem bocsát a rendelkezésünkre (a programozó készíthet ilyet a Mutex osztály felhasználásával). A Mutex típusú objektumok két lényeges metódusa a lock és az unlock, melyeket a szál kritikus szakaszának elején és végén kell hívni.
A következő séma használata ajánlott:
begin
m.lock
#
# ...a kritikus szakasz...
#
ensure
m.unlock
end
Monitorok és hasonló magas szintű nyelvi elemek nincsenek.
Mivel a kommunikáció osztott változókon keresztül történik és a szálak az őket létrehozó szálakkal hierarchikus rendszerbe szerveződnek, nincs szükség a szálak megnevezésére. Rubyban a folyamat azonosítja a kommunikációs közeget.
Szálak és processzek
A Ruby tehát alapvetően kétféle megoldást nyújt a program különböző részeinek "egyidejű" futtatására. Nyelvi szinten: el lehet ágazni a programon belül, több szálat létrehozva. Operációs rendszer szinten: fel lehet osztani a feladatokat a különböző programok között, több processz használatával. Nézzük ezeket a módszereket külön-külön.
Többszálúsítás (Multithreading)
Általában a legegyszerűbb módja a párhuzamosításnak a Ruby thread-ek használata. Ezek a szálak teljes mértékben egy processzhez tartoznak, megvalósításuk a Ruby interpreterben van implementálva. Ezáltal a Ruby szálak teljes mértékben portálhatók - nincs támaszkodás az operációs rendszerre - ám ekkor le kell mondanunk a natív szálak bizonyos előnyeiről. Előfordulhat a szál kiéheztetése (amikor is egy alacsony prioritású szál nem kap lehetőséget a futásra). Ha egy szál holtpontba kerül, az egész processz futása leállhat. Ha egy szál az operációs rendszernek egy olyan rutinját hívja meg, amely hosszabb idő alatt fejeződik be, akkor az összes szál várakozni fog, amíg az interpreter nem kapja vissza az irányítást. Bár ezek a problémák első látásra riasztónak tűnnek, a Ruby-szálak használata könnyű és hatékony módja a párhozamosításnak.
Ruby-szálak létrehozása
Egy új szál létrehozása igen egyszerű. Az alábbi programrészlet több weboldalt tölt le párhuzamosan. A program minden egyes lehíváshoz létrehoz egy új szálat, amely a http-átvitelt kezeli.
require 'net/http'
pages = %w( www.rubycentral.com
www.awl.com
www.pragmaticprogrammer.com
)
Eredmény:
Fetching: www.rubycentral.com
Fetching: www.awl.com
Fetching: www.pragmaticprogrammer.com
Got www.rubycentral.com: OK
Got www.pragmaticprogrammer.com: OK
Most vizsgáljuk meg a kódot részletesen.
Új szálak a Thread.new hívással jönnek létre. Ez egy olyan blokkot kap paraméterként, amely az új szálban futó kódot tartalmazza. Esetünkben a blokk a net/http könyvtárat használja a felsorolt webhelyek nyitóoldalának letöltéséhez. A nyomkövetésünk tisztán mutatja, hogy ez párhuzamosan történik.
A szál létrehozásakor a kért címet adjuk át paraméterként. Ez a myPage nevű változóban adódik át a blokknak. Miért nem használjuk inkább egyszerűen a page változó értékét a blokkon belül?
Egy szál alapból lát minden olyan globális, példányszintű és lokális változót, amely a szál indulásakor már létezik. Bárki, akinek kistestvére van, elmondhatja, hogy a megosztás nem mindig jó dolog. Esetünkben mind a három szál látja a page változót. Az első szál elindul és a page értéke http://www.rubycentral.com lesz.
Ugyanakkor a ciklus, amely elindította a szálat, még mindig fut és a második menetben a page értéke http://www.awl.com-ra változik. Az első szál még nem futott le és miközben használja a page változót, az hirtelen új értéket kap. Az ebből fakadó hibákat utólag nehéz korrigálni.
Ugyanakkor a szál blokkján belül létrehozott lokális változók tényleg lokálisak - minden szál saját példánnyal rendelkezik belőlük. Példánkban a myPage változó a szál indulásakor jön létre és minden szálnak saját példánya lesz belőle.
Szálak kezelése
még egy ravaszságot vehetünk észre az utolsó sorban. Miért hívjuk meg a join eljárást minden létrehozott szál végén?
Amikor egy Ruby-program terminál, minden futó szál befejeződik - függetlenül attól, hogy éppen milyen állapotban van. Ugyanakkor meg is várhatjuk a szál befejeződését a szál Thread#join metódusának meghívásával. Ekkor a hívó szál blokkolódni fog, amíg a megadott blokk futása be nem fejeződik. A join minden szál végén való meghívásával biztosíthatjuk, hogy mindhárom szál lefusson, mielőtt a program terminálna.
A joinon kívül még van néhány hasznos rutin a szálak kezelésére. Mindenekelőtt az aktuális thread mindig elérhető a Thread.current változón keresztül. Az összes szál listája lekérhető a Thread.list használatával, amely tartalmaz minden olyan Thread-objektumot, amelynek állapota futtatható(runnable) vagy leállt(stopped). Egy szál állapotának lekérdezéséhez a Thread#status és a Thread#alive? metódusokat használhatjuk.
Egy szál prioritását a Thread#priority= beállításával változtathatjuk meg. A nagyobb prioritású szálak előbb lefutnak mint az alacsonyabb prioritású szálak. Később még lesz szó a szálak ütemezéséről, indításáról és leállításáról.
Szálváltozók
Ahogy az már az előző pontban említésre került, egy szál alapból elér minden változót abban a scope-ban, amelyből létrehozták. A szálhoz tartozó blokk lokális változói csak a szálon belül láthatók, kívülről nem lehet őket elérni.
De mi van akkor, ha egy olyan szálhoz tartozó változót akarunk létrehozni, amelyet más szálakból is el tudunk érni, beleértve a fő szálat? A Thread osztály lehetővé tesz egy speciális módszert, amely megengedi a lokális változókra névvel történő hivatkozást. A szálobjektumot Hash-ként kezelve a []= szintaxissal írhatunk a változóba és a [] használatával olvashatunk belőle. Az alábbi példában minden szál eltárolja a count változó aktuális értékét egy lokális mycount változóban. (Ebben a kódban már van versenyhelyzet, de még nem esett szó szinkronizációról, ezért ezt egyelőre hagyjuk figyelmen kívül.)
count = 0
arr = []
10.times do |i|
arr[i] = Thread.new {
sleep(rand(0)/10.0)
Thread.current["mycount"] = count
Eredmény:
8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10
A fő szál megvárja, amíg a többi szál befejeződik, ezután véletlenszerűen meghatározott ideig vár, majd kiírja az általuk eltárolt count-értéket.
Szálak és kivételek
Mi történik, ha egy szálban kiváltódik egy kivétel és nem kezelődik le? Ez attól függ, hogy az abort_on_exception attribútum értéke mire van beállítva.
Ha hamis, akkor alapértelmezett módon egy lekezeletlen kivételszál befejeződését jelenti. A többi szál tovább fut. Az alábbi példában a 3-as számú szál elszáll és semmilyen outputot nem produkál. Ugyanakkor a többi szál követését továbbra is látjuk.
threads = []
6.times { |i|
threads << Thread.new(i) {
raise "Boom!" if i == 3
puts i
}
}
Eredmény:
01
2
45prog.rb:4: Boom! (RuntimeError)
from prog.rb:8:in `join'
from prog.rb:8
from prog.rb:8:in `each'
Ha az abort_on_exception értéke igaz, akkor egy lekezeletlen kivétel az összes szálat megszakítja. Miután a 3-as szál megszakadt, a program nem ad több outputot.
Thread.abort_on_exception = true
threads = []
6.times { |i|
threads << Thread.new(i) {
raise "Boom!" if i == 3
puts i
Eredmény:
01
2
prog.rb:5: Boom! (RuntimeError)
from prog.rb:7:in `initialize'
from prog.rb:7:in `new'
from prog.rb:7
3. Szinkronizáció
A szálütemező (Thread Scheduler) kezelése
Egy jól megtervezett alkalmazásban csak hagyni kell a szálakat, hogy magukban fussanak. Időfüggőségeket beleépíteni egy többszálú programba általában célszerűtlen.
Időnként azonban szükség van a szálak futásának befolyásolására. Például ha egy zenegép egy szála produkálja a fényhatásokat, szükség lehet ennek a szálnak az ideiglenes felfüggesztésére, amikor a zene megáll. Egy másik példa a klasszikus termelő-fogyasztó kapcsolat - ha a termelő lemarad, akkor a fogyasztónak várnia kell.
A Thread osztály ad néhány metódust a szálütemező kezelésére. A Thread.stop meghívásával megállíthatjuk az aktuális szálat, míg a Thread#run folytatja megadott szál futtatását. A Thread.pass kiütemezi az aktuális szálat, hogy csak a többi fusson, a Thread#join és a Thread#value pedig felfüggeszti a hívó szálat, amíg a megadott szál be nem fejeződik.
A fenti metódusok bemutatására szolgál az alábbi – egyébként teljesen céltalan - program.
t = Thread.new { sleep .1; Thread.pass; Thread.stop; }
t.status > "sleep"
t.run
t.status > "run"
t.run
t.status > false
Kizárólag ezeknek a metódusoknak a használatával nem garantált, hogy megfelelő lesz a szinkronizáció, mindig felléphetnek bosszantó versenyhelyzetek (különösen osztott változók használatakor) – márpedig ez megnyújtja a debugolási időt. Szerencsére a szálak rendelkeznek még egy fontos képességgel - a kritikus szekció megvalósításának lehetőségével. Ennek felhasználásával már megvalósítható a biztonságos szinkronizáció.
Kölcsönös kizárás
A többi szál kizárásának legalacsonyabb szintű módszere egy globális "kritikus szál" állapotának (condition) használatán alapul. Ha az állapot értéke igaz (beállítás a Thread.critical= szintaxissal), akkor az ütemező nem fog egyetlen más szálat sem futtatni. Ez azonban nem akadályozza meg újabb szálak létrehozását és futtatását. Bizonyos száloperátorok (mint a stop vagy a kill, az aktuális szálon való altatás vagy kivétel)
eredményeképpen a kritikus szakaszban is beütemeződhet egy másik szál.
A Thread.critical= használatára a Ruby több alternatívát is megenged. Ezek közül a két legjobban használható a Mutex és a ConditionVariable osztály, amelyek a thread könyvtármodulban találhatók.
A Mutex osztály
A Mutex osztály egy egyszerű szemafort implementál, amellyel megvalósítható a kölcsönös kizárás valamely osztott erőforrásra - vagyis egyszerre csak egy szál használhatja az adott erőforrást. A többi szál vagy várakozik, amíg a szemafor szabad nem lesz, vagy azonnali visszajelzést kap a foglaltságról.
A Mutex osztályt gyakran használják olyan esetekben, ahol egy osztott adat beírása elemi művelet kell, hogy legyen. Tegyük fel, hogy két változót kell felülírni egy tranzakció során. Ezt szimulálhatjuk egy triviális programmal, amely számlálókat növel. A felülírásnak eleminek kell lennie - a külvilág nem találhat a számlálókban egymástól eltérő értékeket. Mutex használata nélkül ez nem megy.
count1 = count2 = 0
difference = 0
counter = Thread.new do
loop do
count1 += 1
count2 += 1
end
A példában látszik, hogy a "kém" szálunk igen sokszor futott le úgy, hogy a count1 és a count2 értéke különböző volt. Szerencsére Mutex-szel ez javítható.
require 'thread'
mutex = Mutex.new
count1 = count2 = 0
difference = 0
counter = Thread.new do
loop do
mutex.
Eredmény:
sleep 1
mutex.lock
count1 > 21192
count2 > 21192
difference > 0
Ha minden osztott változóhoz egy mutexet rendelünk, biztosak lehetünk a konzisztenciában. Sajnos - amint az a számokból is látható - ez jelentős teljesítménycsökkenéssel is jár.
Állapotváltozók (Condition Variables)
Néha a mutex használata sem elég ahhoz, hogy megvédjünk egy kritikus adatot. Tegyük fel, hogy kritikus szakaszban vagyunk, de várnunk kell valamilyen erőforrás felszabadulására. Ha a szál elalszik, miközben erre az erőforrásra vár, előfordulhat, hogy egyetlen más szál sem lesz képes felszabadítani az erőforrást, mert nem léphet kritikus szakaszba - azt még mindig az eredeti szál tartja zárva. Ideiglenesen fel kell oldani a kritikus szakaszt és közölni a többiekkel, hogy egy erőforrásra várunk. Amikor az erőforrás felszabadult, rögtön meg kell tudni szerezni és visszatérni a kritikus szakaszba - egy lépésben.
Itt jönnek be az állapotváltozók. Egy állapotváltozó egy egyszerű szemafor, amely egy erőforráshoz van hozzárendelve és védettségét egy mutex biztosítja. Ha egy olyan erőforrásra van szükség, amely éppen foglalt, akkor a hozzá rendelt állapotváltozóra kell várni. Amikor egy másik szál jelzi, hogy az erőforrás felszabadult, az eredeti szál befejezi a várakozást és egyidejűleg a kritikus szakaszba is visszatér.
require 'thread'
mutex = Mutex.new
cv = ConditionVariable.new
a = Thread.new {
mutex.synchronize {
puts "A: Kritikus szakaszban
Eredmény:
A: Kritikus szakaszban vagyok, de várok cv-re(Később a tanyán...)
B: Most kritikus szakaszban vagyok, de már végeztem cv-vel
B: Még mindig kritikus szakaszban vagyok, befejezés
Egyéb szinkronizációs módszerek implementációja megtalálható a monitor.rb és a sync.rb könyvtárakban, a Ruby disztribúció lib alkönyvtárában.
Több processz futtatása
Néha egy taskot több processzre akarunk bontani - vagy éppen egy olyan külön processzt akarunk futtatni, amely nem Rubyban íródott. Semmi gond: Rubyban több metódus is van, amellyel processzeket lehet indítani és kezelni.
Új processz indítása
Több módja is van egy processz indításának, a legegyszerűbb egy parancs futtatása és a futás végének megvárása. Ezt általában akkor használjuk, ha egyetlen parancsot akarunk lefuttatni vagy az operációs rendszertől akarunk valamit lekérdezni. A Ruby ehhez a feladathoz a system és a fordított aposztróf eljárásokat kínálja.
system("tar xzf test.tgz") > tar: test.tgz: Cannot open: No such file or directory\n
tar: Error is not recoverable: exiting now\n
A Kernel::system metódus hajtja végre a megadott parancsot egy alprocesszben; visszatérési értéke true, ha a parancs értelmezhető volt és végre is hajtódott, ellenkező esetben false. Sikertelenség esetén az alprocessz kilépési kódja megtalálható a $? globális változóban.
A system eljárás egyik problémája, hogy a parancs kimenete egyszerűen a programunk kimenetére fog irányítódni és nem biztos, hogy nekünk erre van szükségünk. Egy alprocessz standard outputjának kiolvasásához használhatjuk a fordított aposztrófokat, mint a `date` esetében az előző példában. Érdemes megjegyezni, hogy esetleg szükség lehet a String#chomp metódusra a sorvége karakterek kiszűréséhez.
Eddig egyszerű esetekkel foglalkoztunk - futtathatunk egy másik processzt és megkapjuk az eredményét. Néha azonban valamivel több irányítást szeretnénk megkövetelni. Szeretnénk kommunikálni az alprocesszel, esetleg adatokat küldeni neki és fogadni tőle. Az IO.popen metódus pontosan erre ad lehetőséget. A popen metódus alprocesszként elindít egy parancsot és a parancs standard inputját és outputját átirányítja egy IO objektumba. Ha írunk ebbe az objektumba, az alprocessz azt be tudja olvasni a standard inputjáról és minden, amit az alprocessz kiír, kiolvasható az objektumból.
Például rendszerünk egyik hasznos programja a pig, ami a standard inputról beolvasott angol szavakat igpay atinlay nyelven írja ki. (Roppant hasznos, ha nem akarjuk, hogy a hétéves gyermekünk megértse, amit a programunk kiír...)
pig = IO.popen("pig", "w+")
pig.puts "ice cream after they go to bed"
pig.close_write<
puts pig.gets
Eredmény:
iceway eamcray afterway eythay ogay otay edbay
Ez a példa egyszerre példázza az alprocesszek pipe-okon keresztül történő kezelésének egyszerű megvalósítását és gyakorlati bonyolultságát. A kód valóban elég egyszerűnek látszik: megnyitjuk a pipe-ot, írunk bele, majd kiolvassuk a választ. Ám kiderül, hogy a pig nem üríti ki az outputot, amelyre ír. Az eredeti megközelítésünk ebben a példában, amelyben egy pig.puts után rögtön egy pig.gets hívás szerepelt, beragadt, mert a pig ugyan feldolgozta a bemenetét, de a választ nem írta bele a pipe-ba. Ezért be kellett szúrnunk egy pig.close_write utasítást. Ez egy sorvége jelet küld a pig standard inputjára és az output, amelyre várunk, ürítésre kerül, amikor a pig terminál.
Van még egy különlegessége a popen metódusnak. Ha a parancs, amelyet átadunk, egyetlen mínuszjel ("-"), a popen elindít egy új Ruby interpreter processzt. Mind az eredeti, mind az új interpreter folytatja a futást a popen-ből való visszatéréssel. Az eredeti processz egy IO-objektumot kap vissza, míg az új nilt.
pipe = IO.popen("-","w+")
if pipe
pipe.puts "Szerezz állást!"
$stderr.puts "Fiú mondja '#{pipe.gets.chomp}'"
else
$stderr.puts "Apa"
Eredmény:
Apa mondja 'Szerezz állást!'
Fiú mondja 'OK'
A popen mellett olyan megszokott Unix-hívások is elérhetőek, mint a Kernel::fork, Kernel::exec és az IO.pipe, ha a rendszer támogatja ezeket. Több IO-metódusnál és a Kernel::open esetén is a Ruby egy új alprocesszt hoz létre, ha a file nevének első karaktere egy "|" (pipe). Megjegyzendő, hogy pipe-ot nem lehet létrehozni File.new paranccsal, az csak valódi file-okra működik.
Független gyerekprocesszek
Előfordulhat, hogy nincs szükségünk ilyen szoros együttműködésre: megmondjuk az alprocessznek, hogy mi a dolga, aztán magára hagyjuk és folytatjuk a saját tennivalónkat, majd később ellenőrizzük, hogy befejeződött-e. Például ha egy hosszú rendezést akarunk lefuttatni.
exec("sort testfile > output.txt") if fork == nil
# most fut a rendezés mint gyerekprocessz
# folytassuk a futást a főprogramban
# várjuk meg amíg a rendezés lefut
A Kernel::fork egy processz-azonosítót ad vissza a szülőnek és nilt a gyereknek, így a gyerekprocessz fogja elvégezni a Kernel::exec hívást és lefuttatni a rendezést. Később meghívjuk a Process::wait-et, amely megvárja, amíg a rendezés lefut (és visszaadja a processz-azonosítót).
Ha várakozás helyett inkább jelzést szeretnénk kapni a gyerek befejeződéséről, akkor beállíthatunk egy jelzéskezelőt a Kernel::trap metódussal. Itt egy SIGCLD jelzéskezelőt állítunk be, amely a gyerek processz terminálását kezeli le.
trap("CLD") {
pid = Process.wait
puts "Child pid #{pid}: terminated"
exit
}
exec("sort testfile > output.txt") if fork == nil
Eredmény:
Child pid 31842: terminated
Blokkok és alprocesszek
Az IO.popen ugyanúgy működik egy blokkal, mint a File.open metódus. Ha a popen paraméterben kap egy parancsot, mint például date, akkor az IO-objektum megkapja a blokkot is paraméterként.
IO.popen ("date") { |f| puts "Date is #{f.gets}" }
Eredmény:
Date is Sun Jun 9 00:08:50 CDT 2002
Az IO-objektum automatikusan lezáródik, amikor a blokk kilép, akárcsak a File.open esetében.
Ha a Kernel::fork-hoz rendelünk hozzá egy blokkot, akkor a blokkban lévő kód fog lefutni az alprocesszben, a szülő futása pedig a blokk után folytatódik.
fork do
puts "In child, pid = #$$"
exit 99
end
pid = Process.wait
puts "Child terminated, pid = #{pid}, exit code = #{$? >> 8}"
Eredmény:
In child, pid = 31849
Child terminated, pid = 31849, exit code = 99
Egy utolsó megjegyzés: a $?-ben található kilépési kódot azért csúsztatjuk 8 bittel jobbra kiírás előtt, mert az alsó 8 bit tartalmazza a program kilépésének okát és a felső 8 bit a valódi kilépési kódot. Ez a Posix-rendszerek egyik jellegzetessége.