A Scala programozási nyelv

Párhuzamosság

Szignálok és monitorok

A monitor valósítja meg a kölcsönös kizárást a Scala-ban. Ez a következőképpen van definiálva:

class Monitor with { def synchronized [a] (def e: a): a; }

A synchronized metódus végrehajtja a paraméterként átadott számítást kölcsönös kizárással.
Szálak felfüggesztésre kerülhetnek a monitor belsejében, ha egy szignálon várakoznak. A Signal osztály biztosít egy notify és egy wait metódust. Az a folyamat, amely wait-et hívott egészen addig vár, amíg egy másik szál meg nem hívja a notify-t ugyanebben a monitorban. A specifikáció a következő:

class Signal with { def wait: Unit; def wait(msec: Long): Unit; def notify: Unit; def notifyAll: Unit; }

A szignál egy időzített formáját is megvalósítja a wait metódusnak. Ez maximum a megadott ideig várakozik. A notifyAll az összes várakozó szálat felébreszti. A Signal és a Monitor primitív osztályok a Scala-ban.
Egy példa monitorra:

class BoundedBuffer[a](N: int) extends Monitor with { var in = 0; out = 0; n = 0; val elems = new Array[a](N); val nonEmpty = new Signal; val nonFull = new Signal; def put(x: a) = synchronized { if (n == N) nonFull.wait; elems(in) = x; in = (in + 1) % N; n = n + 1; if (n == N - 1) nonFull.send; x } }

A használó program:

val buf = new BoundedBuffer[String](10) fork { while (True) { val s = produceString; buf.put(s) } } fork { while (True) { val s = buf.get; consumeString(s) } }

A fork metódus létrehoz egy új szálat, amely a paraméterként megadott kifejezés kiértékelését hajtja végre:

def fork(def e: Unit) = { val p = new Thread with { def run = e; } }

SyncVar-ok

A szinkronizált változók egy get és egy put műveletet biztosítanak a változó értékének olvasására és beállítására. A get egészen addig blokkolódik, amíg a változó értéke nem definiált. Az unset művelet a változót definiálatlan állapotba állítja.
Implementációjuk:

class SyncVar[a] extends Monitor with { private val defined = new Signal; private var isDefined: Boolean = False; private var value: a; def get = synchronized { if (!isDefined) defined.wait; value } def set(x: a) = synchronized { value = x; isDefined = True; defined.send; } def isSet: Boolean = isDefined; def unset = synchronized { isDefined = False; } }

Future-ök

A future egy olyan érték, amit több más kliens állít elő párhuzamosan. Egy jó megoldás párhuzamosan feldolgozható erőforrások használatára. Egy tipikus használat:

val x = future(someLengthyComputation); anotherLengthyComputation; val y = f(x()) + g(x());

Implementációjuk:

def future[a](def p: a): (Unit)a = { val result = new SyncVar[a]; fork { result.set(p) } (=> result.get) }

A future metódus paraméterként kap egy p számítást, melyet el kell végezni. A típus tetszőleges lehet, ezt a típus paraméter határozza meg. Létrejön egy új szál, ami kiszámítja az eredményt, és meghívja a resultot, ha készen van. Ezzel a szállal párhuzamosan a függvény visszaad egy névtelen függvényt a típussal. Meghíváskor ez a függvény vár a result meghívására, és ekkor visszaadja az eredményt. Ezzel párhuzamosan a függvény újból meghívja a result-ot ugyanazzal az argumentummal.

Szemaforok

A folyamatok szinkronizálására egy lehetőség a lock. Ez két atomi műveletet tesz szükségessé: foglalás és elengedés. A lock implementációja a Scala-ban:

class Lock extends Monitor with Signal with { var available = True; def acquire = { if (!available) wait; available = False } def release = { available = True; notify } }

Írók-olvasók

Egy összetettebb szinkronizációs formát igényel az írók és olvasók problémája. A szinkronizációhoz szükségünk van startRead, startWrite, endRead, endWrite műveletekre. Az író műveleteknek prioritásuk van az olvasó műveletekkel szemben.
Az üzenettér elvre alapozva egy lehetséges megoldás:

class ReadersWriters with { val m = new MessageSpace; private case class Writers(n: Int), Readers(n: Int); Writers(0); Readers(0); def startRead = m receive { case Writers(n) if n == 0 => m receive { case Readers(n) => Writers(0); Readers(n+1); } } def startWrite = m receive { case Writers(n) => Writers(n+1); m receive { case Readers(n) if n == 0 => } } def endRead = receive { case Readers(n) => Readers(n-1) } def endWrite = receive { case Writers(n) => Writers(n-1); if (n == 0) Readers(0) } }

Aszinkron csatornák

Az implementációhoz szükség van a láncolt listák bevezetésére:

class LinkedList[a](x: a) with { val elem: a = x; var next: LinkedList[a] = null; }

A csatorna osztálya egy láncolt listát használ az adatok tárolására, melyeket még nem olvastak ki. Van egy moreData szignál, mely az adatra várakozó szálakfelébresztésére használható:

class Channel[a] with { private val written = new LinkedList[a](null); private var lastWritten = written; private val moreData = new Signal; def write(x: a) = { lastWritten.next = new LinkedList(x); lastWritten = lastWritten.next; moreData.notify; } def read: a = { if (written.next == null) moreData.wait; written = written.next; written.elem; } }

Szinkron csatornák

Az üzenet küldője blokkolódik, míg az üzenet meg nem érkezik a címzetthez. Csak egy változóra van szükség, de három szignál koordinálja a működést:

class SyncChannel[a] with { val data = new SyncVar[a]; def write(x: a): Unit = synchronized { val empty = new Signal, full = new Signal, idle = new Signal; if (data.isSet) idle.wait; data.put(x); full.send; empty.wait; data.unset; idle.send; } def read: a = synchronized { if (!(data.isSet)) full.wait; x = data.get; empty.send; x } }

Üzenetterek

Magas szintű konstrukció a kommunikáció és a szinkronizáció megvalósítására. Az üzenet egy tetszőleges objektum. Van egy speciális TIMEOUT üzenet is:

case class TIMEOUT;

Üzenetterek a következő szignatúrájúak:

class MessageSpace with { def send(msg: Any): Unit; def receive[a](f: PartialFunction[Any, a]): a; def receiveWithin[a](msec: Long)(f: PartialFunction[Any, a]): a; }

Az üzenettér állapotát az üzenetek halmaza határozza meg. A send metódussal küldhetünk új üzenetet, a receive-vel lehet kiolvasni és törölni őket. Utóbbi esetében az üzenet az f üzenet feldolgozóhoz kerül, ami egy parciális függvény az üzenetek teréről egy tetszőleges típushalmazra. Ez tipikusan egy mintaillesztést jelent. A receive blokkolódik, amíg egy megfelelő üzenet rendelkezésre nem áll.
Egy példa a használatra:

class OnePlaceBuffer with { private val m = new MessageSpace; private case class Empty, Full(x: Int); m send Empty; def write(x: Int): Unit = m receive { case Empty => m send Full(x) } def read: Int = m receive { case Full(x) => m send Empty; x } }

Egy lehetséges implementáció:

class MessageSpace with { private abstract class Receiver extends Signal with { def isDefined(msg: Any): Boolean; var msg = null; }

A fogadók számára definiálunk egy közbülső osztályt az isDefined metódussal, mely megmutatja, hogy a fogadó definiált-e egy adott üzenetre. A fogadó örököl egy notify metódust a Signal osztálytól, amelyet a fogadó felébresztésére lehet használni. Az msg változó használható az üzenet tárolására:

private val sent = new LinkedList[Any](null); private val lastSent = sent; private var receivers = new LinkedList[Receiver](null); private var lastReceiver = receivers;

Két láncolt listát alkalmaz az üzenettér. Egyet a küldött, de még nem fogadott üzeneteknek, egyet a várakozó fogadóknak.
A send metódus először ellenőrzi, hogy van-e megfelelő várakozó. Ha van, akkor a fogadó értesítésre kerül, egyébkén az üzenet a lista végére kerül:

def send(msg: Any): Unit = synchronized { var r = receivers, r1 = r.next; while (r1 != null && !r1.elem.isDefined(msg)) { r = r1; r1 = r1.next; } if (r1 != null) { r.next = r1.next; r1.elem.msg = msg; r1.elem.notify; } else { l = new LinkedList(msg); lastSent.next = 1; lastSent = 1; } }

A receive metódus először ellenőrzi, hogy az üzenetre alkalmazható-e a függvény. Ha igen, akkor megtörténik az alkalmazás. Ha nem, akkor egy másik fogadó jön létre és a lista végére kerül:

def receive[a](f: PartialFunction[Any, a]): a = { val msg: Any = synchronized { var s = sent, s1 = s.next; while (s1 != null && !f.isDefined(s1.elem)) { s = s1; s1 = s1.next } if (s1 != null) { s.next = s1.next; s1.elem } else { val r = new LinkedList( new Receiver with { def isDefined(msg: Any) = f.isDefined(msg); }); lastReceiver.next = r; lastReceiver = r; r.elem.wait; r.elem.msg } } f(msg) }

Az üzenettérnek van egy receiveWithin metódusa is, ami csak egy adott ideig blokkolódik. Ha nincs megfelelő üzenet adott időn belül, akkor egy TIMEOUT üzenet megy a feldolgozónak:

def receiveWithin[a](msec: Long)(f: PartialFunction[Any, a]): a = { val msg: Any = synchronized { var s = sent, s1 = s.next; while (s1 != null && !f.isDefined(s1.elem)) { s = s1; s1 = s1.next; } if (s1 != null) { s.next = s1.next; s1.elem } else { val r = new LinkedList( new Receiver with { def isDefined(msg: Any) = f.isDefined(msg); } ) lastReceiver.next = r; lastReceiver = r; r.elem.wait(msec); if (r.elem.msg == null) r.elem.msg = TIMEOUT; r.elem.msg } } f(msg) } } //end MessageSpace

Aktorok

Programozási nyelvekben hagyományosan szálak segítségével történik a párhuzamosság megvalósítása. Ebben a modellben a program futását konkurrensen futó taszkokra bontjuk fel. A közös memória használata és egyéb okok miatt számos, már ismert problémája van ennek a megoldásnak. Ilyen probléma, mikor két processz felülírja, vagy egymás után kétszer írja a közös memória egy változóját; zárolás használata esetén pedig felléphet a holtpont probléma.

Szálakkal való megoldáshoz képsest egy más megközelítésen alapul az aktor modell. Ebben a modellben minden objektum egy aktor. Egy ilyen entitás rendelkezik egy postaládával és saját viselkedése van. Az aktorok üzenetet küldhetnek egymásnak, amik a ládájukban lesznek pufferelve. Egy üzenet fogadásakor végrehajtásra kerül az aktor működése, amiben az aktor küldhet üzenetet másoknak, létrehozhat új aktorokat, és megváltoztathatja, hogy reagál a következő üzenetekre. A modell egy fontos tulajdonsága, hogy minden kommunikáció aszinkron módon történik. A küldő tehát nem várja meg, amíg üzenete kézbesítésre kerül, hanem elküldés után rögtön folytatja működését. Az üzenetek érkezési sorrendje nem meghatározott, csak az biztos, hogy megérkeznek.

Második fő tulajdonság, hogy minden kommunikáció kizárólag üzenet-küldéssel történik, nincs megosztott állapot az aktorok közt. Ha egy aktor valamilyen inforációt meg szeretne tudni egy másik aktor belső állapotáról, üzenetet kell küldeni és fogadni az adatok igényléséhez. Ezzel lehetőséget biztosít az állapothoz való hozzáféréshez, anélkül, hogy egy osztott memóriánál fellépő problémába ütköznénk. Ehhez hasonlóan, a belső állapot megváltoztatása is üzeneteken keresztül történik.

Minden aktor konkurresen fut a többi aktorral, tehát kis, önállóan futó processzekként képzelhetjük el őket.

A scala nyelv lehetőséget ad ilyen magasszintű konstrukciók, aktorok használatára. Ennek eszközei az Actor osztályban (trait), illetve a scala.actors könyvtárban találhatók meg. Az aktorok közötti kommunikációnál a megszokott kérés/válasz mintát kell használni.

class ExampleActor extends Actor { def act() = { //inicializalas while (true) { receive { case MessageType1 => {...} case MessageType2 => {...} ... } } }

Egy aktor működésének megadásához az Actor trait act() függvényét kell felüldefiniálni, ez fog elindulni az Actor start() metódusának hatására (hasonlóan a Java Threadhez). Az inicializálás után egy végtelen ciklusba ágyazott receive segítségével tud üzenetekre reagálni az Actor. Ez megfelel az Ada Entry pointjainak, és megvalósítja az úgynevezett active object absztrakciót, amikor az üzenetküldések hasonló hatást váltanak ki az objektumból mint a metódushívások (csak párhuzamos környezetben).
A receive egy parciális függvény melynek argumentuma egy másik függvény, jelen esetben egy mintaillesztés. Ebből is látszik, hogy a Scalaban új nyelvi elemek helyett library-kkel is lehet magasszintű és jól használható konstrukciókat létrehozni.
Az üzenet küldése a ! operátor segítésével történik, illetve van egy olyan beépített lehetőség is a nyelvben, amivel a címzett megadása nélkül egyből válaszolhatunk annak az objektumnak, akitől az épp feldolgozott üzenetet kaptuk. Ez a reply utasítás segítségével történik. Ennek a konstrukciónak megvan az az előnye, hogy nem kell mindig elküldeni a küldőnek „önmagát” is, mert ez implicit módón megtörténik. Hátránya viszont, hogy teljesítmény kritikus alkalmazásoknál nagy overhead-et is jelenthet.

A fenti példában a megoldással csak egyetlen probléma van: a receive szálblokkoló utasítás, vagyis várakozásra kényszeríti azt a Thread-et ahol meghívásra kerül. Ez azért nem jó mert a Scala 4 szállal indítja a threadpoolját, ezen fut az összes aktor (legyen 4 vagy 1000 darab) és egy receive hívás ebből blokkol 1-et. Ez a threadpool konfigurációjával kezelhető ugyan, de következzen egy jobb megoldás.

Az Actor könyvtár tervezői azt szerették volna elérni, hogy a különböző konkurrens paradigmák együtt jelenjenek meg az Actor megvalósításában. Ezért az előbb ismertetett szálblokkoló metódusok mellett lehetőség van szálmentes, az eseménykezelés technikáját követő üzenetfogadásra is. A szintaxis csak némiképp módosul:

class ExampleActor extends Actor { def act() { //inicializálás loop { react { case MessageType1 => {...} case MessageType2 => {...} ... } } } }

Bár az aktor modell a fentieknek megfelelően alapvetően aszinkron kommunikációra épül, a scala aktorjaival van lehetőség szinkron kommunikációra is. Ehhez a ! helyett a !? operátort kell használni. Ekkor az aktor várakozik, amíg nem kap egy a mintaillesztésben illeszkedő üzenetet. Ha egy üzenet nem illeszkedik egyik mintára sem, a pufferben ekkor is eltárolódik, és egy esetleges következő receive blokkban újra megpróbálja illeszteni.

myService !? Msg(value) match { case Response(r) => // ... }

Ahogy az eddigi példákon látható, az üzenetek típusára nem volt különösebb megkötés, bármilyen üzenet – bármilyen objektum – küldhető egy aktornak. Azok az üzenetek, amelyek a case egyetlen ágára sem illeszkednek az aktor fiókjában el lesznek tárolva, nem okoznak hibát, kivételt.
Mindemellett a típusbeli „szabadosság” mellett a scala komplex típusrendszerének köszönhetően van lehetőség típusbiztos kommunikációra is a csatornák segítségével. Ekkor generic-ek segítségével korlátozhatjuk az elfogadni kívánt üzenetek típusát. Egy aktor postaládája egy olyan csatorna, ami tetszőleges típusú üzenetet elfogad.

A Scala különbséget tesz szál alapú és esemény alapú aktorok között. Szál alapú aktorok saját, külön JVM szálat indítanak. Ezek a Java szálütemezésével vannak ütemezve, ami preemptív prioritás alapú ütemezést használ. Szál alapú aktorok lehetővé teszik hosszabb ideig tartó számítások, illetve blokkoló I/O műveletek végrehajtását anélkül, hogy korlátoznák a többi aktor működését. Ennek a módszernek azonban van egy hátülütője: bármelyik szál tekinthető nehezebb szálnak. Ha sok ilyen aktort indítunk el, a virtuális gépnek elfogyhat a memóriája, illetve csökkenhet a teljesítménye az ütemezésben létrejövő overhead miatt. Olyan helyzetekben, amikor ez nem elfogadható, használhatunk esemény alapú aktorokat. Ezek nem szálanként vannak implementálva, hanem egy szálon futnak. Egy ilyen aktor üzenetre való várakozás esetén nem egy blokkolt szálként van reprezentálva, hanem egy kisebb zárolt egységként. Egy ilyen egység rögzíti az aktor aktuális állapotát, majd a félbehagyott számítás az üzenet fogadásakor folytatódhat. Ennek végrehajtása a küldő szálán történik.
Az ilyen típusú aktorok egy könnyebb súlyú alternatívát nyújtanak, ezzel lehetővé téve nagyobb számú aktorok konkurrens futását. Meg kell jegyezni azonban, hogy ez nem használható igazi párhuzamosságra, hiszen ezek mind egy szálon futnak, az ütemezésben nincs egyenrangúság.
Ez utóbbi típusú aktorok használatához a progamozónak receive helyett react blokkot kell használni. Ebben az esetben van egy nagy korlátozás, ugyanis ilyen blokknak nincs „normális” visszatérési értéke.
A nyelv megengedi a programozónak, hogy a kétféle típust vegyesen használja az adott helyzetnek, igényeknek megfelelően.

A nyelv egyik veszélye – ami több másik ponton előnyt jelent – az, hogy az aktorokat objektumorientált programozással vegyíti. Lehtőség van arra, hogy az aktorok belső állapota publikus metódusokon keresztül elérhető, módosítható legyen, azaz direkt módon módosíthatunk egy objektumot, üzenetküldés nélkül. Ebben az esetben le kell mondani az aktor modell által nyújtott biztonságról, tisztaságról.

A Scala elosztott, hálózaton kommunikáló programok készítésére is magasszintű eszközöket nyújt. A RemoteActor segítségével egyszerűen hozhatunk létre, kérdezhetünk le hálózaton elérhető aktorokat. Ahhoz, hogy egy aktor elérhető legyen egy porton láthatóvá kell tenni az alive metódus segítségével.

actor { alive(9010) register('myName, self) // behavior }
Egy hálózaton futó aktort a select segítségével kérhetünk le, aminek ezután az eddig megszokott módon küldhetünk üzetenet.
actor { // ... val c = select(Node("127.0.0.1", 9010), 'myName) c ! msg // ... }