A Haskell programozási nyelv

Párhuzamosság

Párhuzamosság és a konkurencia

Concurrent Haskell

A párhuzamos funkcionális program több processzort használ a hatékonyság növelésére. Például ha az q+r kifejezést akarjuk kiértékelni, gyorsabb ha párhuzamosan kiértékeljük a q-t és r-et párhuzamosan és összeadjuk az eredményeket. Valójában ha egy programot szekvenciálisan vagy párhuzamosan hajtjuk végre, szemantikailag ugyanazt jelentik. Ráadásul az eredmények is determinisztikusak, minden futás során ugyanazt az eredményt kapjuk. Ez elsőre meglepő lehet az imperatív nyelvekhez képest, ám itt ellenben utóbbiakkal él a hivatkozási hely függetlenség (azaz egy azonosító, bárhol van ugyanazt jelenti, nincsenek "igazi változók").

Ellenben a konkurens programban a konkurencia része a specifikációnak. A programnak több különböző feladatot kell megoldania azonos időben, melyek bonyolult interakcióban lehetnek környezetükkel. Futhat több vagy egy processzoron, ez implementációs döntés.

A Concurrent Haskell a Haskell 98 konkurens kiterjesztése, és a GHC implementálja is a Control.Concurrency modulban.

Szálak és forkIO

Konkurens programok írásához a legtöbb programozási nyelv lehetőséget ad vezérlési szálak létrehozásához. Ebben a Haskell sem más, habár a szálak kezelése másképp néz ki.A Haskellben a szál egy olyan IO akció amely függetlenül fut a többi száltól. Létrehozásához a forkIO függvény kell használni:

forkIO :: IO () -> IO ThreadId

Példaképp nézzünk egy webszervert, amely egy végtelen ciklusban kéréseket fogad az accept segítségével (amely Haskell függvény meghívja az Unix accept eljárását)

acceptConnections :: Config -> Socket -> IO () acceptConnections config socket = forever (do { conn <- accept socket ; forkIO (serviceConn config conn) }) accept :: Socket -> IO Connection type Connection = (Handle, SockAddr)

Az accept eredményében a Handle használható majd a klienssel való kommunikációra. Az acceptConnections a forkIO-t használja egy új szál létrehozásához. A forkIO egy IO akciót kap paraméterként, elkezdi konkurensen egy új szálon futtatni, a hívó szál számára pedig azonnal visszatér a létrehozott szál azonosítójával. A C nyelv szálaival szemben itt nem kell az újonnan létrehozott szálnak paramétereket átadni, mert az adott akció már ismeri az összes szükséges változót (például itt a serviceConn cofig conn az akció, amely már birtokolja a conn és config szabad változókat).

Egy szál várakozhat megadott ezredmásodpercig a threadDelay segítségével

threadDelay :: Int -> IO ()

Kommunikáció és MVars

A legegyszerűbb mód információk megosztására szálak között, hogy közös változót használnak. A Concurrent Haskell erre a célra biztosít egy szinkronizált változó típust, az IORef egy szinkronizált változatát, az MVart:

data MVar a -- Abstract newEmptyMVar :: IO (MVar a) takeMVar :: MVar a -> IO a putMVar :: MVar a -> a -> IO ()

Az IORef-hez hasonlóan egy MVar egy hivatkozás egy változtatható területre, amely vagy egy a típusú értéket tartalmaz, vagy üres. A newEmptyMVar létrehoz egy MVart, de ellenben az IORef newIORef-jével az új MVar üres. A putMVar egy üres MVar-ba tesz egy értéket, ha nem üres MVar-ra történt a hívás, a putMVar blokkolja a szálat, amíg egy másik szál ki nem üríti az adott MVar-t. A takeMVar kiveszi az MVar tartalmát, üresen hagyva azt

A webszerveres példát ki lehet egészíteni egyMVar-ral, ami az aktív szálak számát tartalmazza (Ez hasznos lehet például ha célunk az, hogy ha az aktív kapcsolatok száma túl lép egy határt a program másként viselkedjen, például ne fogadjon új kapcsolatokat).

acceptConnections :: Config -> Socket -> IO () acceptConnections config socket = do { count <- newEmptyMVar ; putMVar count 0 ; forever (do { conn <- accept socket ; forkIO (do { inc count ; serviceConn config conn ; dec count}) }) } inc,dec :: MVar Int -> IO () inc count = do { v <- takeMVar count; putMVar count (v+1) } dec count = do { v <- takeMVar count; putMVar count (v-1) }

Ezáltal van egy számlálónk (count). A számláló frissítései, az inc és a dec megszakíthatatlanok, ugyanis a két utasítás között a count üres, így egy ha egy másik szál is meg akarja változtatni a cout-ot, akkor blokkolódik.

Kommunikáció csatornákon

Egy másik típus, a chan egy egyirányú kommunikációs csatornát nyújt két szál között. A típus a Control.Concurrent.Chan

chanExample = do ch <- newChan forkIO $ do writeChan ch "hello world" writeChan ch "now i quit" readChan ch >>= print readChan ch >>= print

A példában az egyik szál két üzenetet ír a csatornába, amit a másik kiolvas, majd kiír. Ha a csatorna üres, a readChan blokkolja az olvasást, amíg nem lesz olvasható érték. A writeChan sohase blokkol, azonnal beírja az üzenetet a csatornába.

Megjegyzések

A legtöbb Haskell tároló osztályhoz hasonlóan az MVar és a Chan laza kiértékelésűek, egyik sem értékeli ki a tartalmát. Ez nem gond, ám sok programozó szigorúnak hiheti ezeket au IO monádból következtetve. Eme tudás hiánya pedig könnyen vezet hely vagy futási-idő pazarláshoz. Ugyanis ha a célunk egy bonyolult számítás másik szálon való futtatása, akkor a forkIO.t használva eme számítást elrakva egy MVar változóba, majd visszaolvasva ugyanazt a kiszámítandó kifejezést olvassuk vissza. Ezért biztosítani kell az indított szálban a szigorúságot. Illetve a Hackage nevű internetes szabad könyvtár gyűjteményben megtalálható ezen típusok szigorú változata.

A Chan mérete nem korlátozott. Mivel a writeChan mindig sikeresen és azonnal visszatér, ezért a Chan használata veszélyt rejt magában. Ha az egyik szál jóval gyakrabban ír mint a másik olvas, akkor a csatorna a memória túlcsordulásáig tud nőni.

Habár a Haskell másmilyen műveleteket használ a szálak közötti információ cserére, a konkurens programok szokásos problémái megmaradnak. Továbbra is lehetséges holtpont, mivel az üres MVar olvasása blokkolja a futást. Más nyelvekhez hasonlóan itt is használható ez ellen például az erőforrások bizonyos előre adott sorrend szerinti igénylése. Ám ez egy nehezen detektálható nemdeterminisztikus hiba. Ugyanúgy fennáll a kiéheztetés veszélye.

Szerencsére a konkurencia API-ja egyre inkább fejlődik. Például egy egész új nyelvi kiegészítés az STM (Software Transactional Memory) egyszerre nyújt egyszerűbb és biztonságosabb felületet.

Software Transactional Memory (STM)

A konkurens programok hagyományos szál alapú modelljében adatokat osztunk meg a szálak között, zárakkal oldjuk meg a konzisztenciát, és feltételes változókkal értesítjük a szálakat a változásokról. A Haskell MVar módszere javítja valamelyest ezeket az eszközöket, de továbbra is fennállnak ugyanazok a problémák.

Ezen problémák a legkisebb konkurens programokban is gyakran felmerülnek, és ezek a problémák csak gyakoriabbak nagyobb kódok, vagy nagyobb terhelés miatt.

Alapok

Az STM által nyújtott egyszerű, de erős eszközökkel a legtöbb felmerült problémára megoldható. Az atomically kombinátor segítségével egy akciósorozatot tranzakcióként tudunk futtatni. Amint egy ilyen blokkba lépünk a többi szál nem látja az általunk végzett módosításokat, és a mi szálunk sem látja a többiek által végzett módosításokat. Ez a két tulajdonság azt jelenti, hogy a szálunk izolált.

A tranzakcióból való kilépés után

Azaz ez a "mindent vagy semmit" természete az atomically blokkoknak biztosítja az atomiságot (innen a kombinátor neve). Aki dolgozott tranzakciót támogató adatbázisokkal, annak számára az STM-mel való munka ismerősnek tűnhet.

Néhány egyszerű példa

A multi-player RPG játékokban a karakterek rendelkeznek életerővel, tárgyakkal, pénzzel. Az STM megismerését a játék karaktereinek állapotával dogozó függvények és típusok létrehozásával kezdjük, majd az API mélyebb megismeréséhez finomítjuk a kódot

Az STM API elérhető az stm csomag segítségével, melynek moduljai a Control.Concurrent.STM-ben találhatóak

{-# LANGUAGE GeneralizedNewtypeDeriving #-} import Control.Concurrent.STM import Control.Monad data Item = Scroll | Wand | Banjo deriving (Eq, Ord, Show) newtype Gold = Gold Int deriving (Eq, Ord, Show, Num) newtype HitPoint = HitPoint Int deriving (Eq, Ord, Show, Num) type Inventory = TVar [Item] type Health = TVar HitPoint type Balance = TVar Gold data Player = Player { balance :: Balance, health :: Health, inventory :: Inventory }

A TVar paraméteres típus egy olyan mutable változó, amelyet írni és olvasni lehet atomically blokkban. Az egyszerűség kedvéért a játékos készlete tárgyak listája. A newtype deklarációk célja a típusok véletlen összekeverésének elkerülése (mindkettő Int lenne).

Két Balance közötti pénzmozgáshoz csupán az összes TVar értékét kell átállítani.

basicTransfer qty fromBal toBal = do fromQty <- readTVar fromBal toQty <- readTVar toBal writeTVar fromBal (fromQty - qty) writeTVar toBal (toQty + qty)

A fenti kód kipróbálásához egy kis függvény:

transferTest = atomically $ do alice <- newTVar (12 :: Gold) bob <- newTVar 4 basicTransfer 3 alice bob liftM2 (,) (readTVar alice) (readTVar bob)

ghci-ben futtatva a várt eredményt kapjuk:

ghci> :load GameInventory [1 of 1] Compiling Main ( GameInventory.hs, interpreted ) Ok, modules loaded: Main. ghci> atomically transferTest Loading package array-0.1.0.0 ... linking ... done. Loading package stm-2.1.1.0 ... linking ... done. (Gold 9,Gold 7)

Az atomicitás és az izoláció garantálja, hogy ha egy szál látja bob egyenleg változását, akkor látnia kell alice egyenlegén is a módosítást.

Még egy konkurens program esetén is törekszünk, hogy a kód lehető legnagyobb része tisztán funkcionális legyen. Mivel így a kód egyszerűbben érthető, és egyszerűbben tesztelhető. Ráadásul ezáltal az STM motornak kevesebb munkát kell végezni, hiszen az érintett adatok kezelése nem tranzakciós. Az alábbi tisztán funkcionális függvény, amely eltávolít egy elemet a játékos készletéből.

removeInv :: Eq a => a -> [a] -> Maybe [a] removeInv x xs = case find (== x) xs of Nothing -> Nothing Just _ -> Just (delete x xs)

Az eredmény használja a Maybe-t, azaz meg tudjuk mondani, hogy a tárgy a játékos készletében volt-e.

Alább egy tranzakciós függvény található, mely az egyik játékostól elvett tárgyat átadja egy másik játékosnak. Ezt bonyolítja, hogy meg kell állapítani, hogy az átadandó tárgy az átadónál van-e.

maybeGiveItem item fromInv toInv = do fromList <- readTVar fromInv case removeInv item fromList of Nothing -> return False Just newList -> do writeTVar fromInv newList destItems <- readTVar toInv writeTVar toInv (item : destItems) return True

STM és biztonság

Ha már adottak az atomi és izolált tranzakciók, fontos hogy se szándékosan, se véletlenül ne tudjunk kilépni egy atomically blokkból. A Haskell típusrendszere biztosítja helyettünk (az STM monádon belül).

atomically: STM a -> IO a atomically :: STM a -> IO a

Az atomically blokk fogad egy akciót az STM monádban, futtatja, és az eredményét elérhetővé teszi a számunkra az IO monádban. Ez az a monád, amely az összes tranzakciós kódot futtatja. Például a függvények melyek a TVar értékekkel dolgoznak mind az STM monádban fut.

newTVar: a -> STM (tvár a) newTVar :: a -> STM (TVar a) readTVar: tvár egy -> STM a readTVar :: TVar a -> STM a writeTVar: tvár A -> a -> STM () writeTVar :: TVar a -> a -> STM ()

Ez szintén igaz az általunk definiált tranzakciós függvényekre:

basicTransfer :: Gold -> Balance -> Balance -> STM () maybeGiveItem :: Item -> Inventory -> Inventory -> STM Bool

Tranzakciók ismétlése

Az általunk definiált maybeGiveItem függvény API-ja kényelmetlen. Akkor ad át tárgyat ha a karakternél van, eddig rendben. Ám logikai értékkel tér vissza, ami bonyolítja a hívói kódját. Alább egy tárgy eladó függvény, ami a maybeGiveItem eredménye alapján dönti el mit csinál.

maybeSellItem :: Item -> Gold -> Player -> Player -> STM Bool maybeSellItem item price buyer seller = do given <- maybeGiveItem item (inventory seller) (inventory buyer) if given then do basicTransfer price (balance buyer) (balance seller) return True else return False

Nemcsak ellenőrizni kell hogy a tárgy át lett-e adva, tovább kell terjeszteni a sikerességét a hívóba. A komplexitás így kifelé áramlik.

Van elegánsabb módszer arra, hogy kezeljük a végrehathatatlan tranzakciókat. Az STM API biztosít számunkra egy retry akciót, amely azonnal véget vett a végrehajthatatlan atomically blokknak. Ahogy a neve is sugallja, ezután a blokk futtatása újrakezdődik elölről. Írjuk át a maybeGiveItem függvényt, hogy használja a retry-t.

giveItem :: Item -> Inventory -> Inventory -> STM () giveItem item fromInv toInv = do fromList <- readTVar fromInv case removeInv item fromList of Nothing -> retry Just newList -> do writeTVar fromInv newList readTVar toInv >>= writeTVar toInv . (item :)

A korábbi basicTransfer függvényünknek másmilyen típusú hibája van, nem ellenőrzi hogy a küldő egyenlege tartalmaz-e elegendő pénzt. Itt is használható retry-t a javításhoz, miközben a függvény típusa ugyanaz marad.

transfer :: Gold -> Balance -> Balance -> STM () transfer qty fromBal toBal = do fromQty <- readTVar fromBal when (qty > fromQty) $ retry writeTVar fromBal (fromQty - qty) readTVar toBal >>= writeTVar toBal . (qty +)

Most a retry használatával a tárgy eladó függvény jóval egyszerűbb.

sellItem :: Item -> Gold -> Player -> Player -> STM () sellItem item price buyer seller = do giveItem item (inventory seller) (inventory buyer) transfer price (balance buyer) (balance seller)

A viselkedése kicsit eltér a korábbi függvénytől. Ahelyett hogy azonnal hamis értékkel tér vissza, ha az eladó nem rendelkezik a tárggyal, blokkol addig amíg az eladónál van a tárgy, a vevőnél meg elég pénz.

Az STM szépsége abban rejlik, hogy a tiszta kód írását teszi lehetővé. Tudunk két rendesen működő függvény által létrehozni egy jól működő harmadikat minimális erőfeszítés árán.

Mi történik a retry használata során

A retry függvény nem csak a tisztább kódolást segíti, működése mögött kisebb varázslat folyik. Hívásakor nem indítja azonnal újra a tranzakciót. Ehelyett blokkolja a szálunkat egészen addig míg az általunk - a retry hívás elött - érintett változók egyike egy másik szál által megváltozik.

Például, ha az általunk fent megírt transfer függvényt hívjuk meg elégtelen egyenleggel, akkor a retry automatikusan vár addig, amíg az egyenlegünk meg nem változik, és csak ekkor indítja újra az "atomically" blokkot. Az új giveItem függvényünk esetén is hasonló történik: ha a küldő készletében nincs benne az átadandó tárgy, akkor a szála blokkolódik amíg nem lesz benne.

Lehetőségek közötti választás

Nem mindig szeretnénk újraindítani egy atomically akciót, csak mert meghívja a retry-t, vagy mert más szállal való konkurens módosítás miatt a hibázik.

Például az új sellItem függvény meghatározhatatlan ideig próbálkozik amíg hiányzik vagy a tárgy, vagy nincs elég pénz. Jobb lenne csak egyszer megpróbálni az eladást.

Az orElse kombinátor megenged egy "backup" akciót, ha a fő akció nem sikerült.

orElse: STM a -> STM a -> STM a

Így el lehet érni hogy ha a sellItem nem sikerül, akkor az orElse a return False akciót hajtsa végre azonnal visszadva a végrehajtást.

Magasabbrendű kód használata tranzakciókkal

Egy kicsit nagyobb ambícióval, írjunk egy függvényt, amely egy lista első olyan elemét veszi meg, ami az eladónál van, és a vevőnek van rá pénze, (ha nincs ilyen nem vesz semmit). Ezt a kódot meg lehet írni közvetlen

crummyList :: [(Item, Gold)] -> Player -> Player -> STM (Maybe (Item, Gold)) crummyList list buyer seller = go list where go [] = return Nothing go (this@(item,price) : rest) = do sellItem item price buyer seller return (Just this) `orElse` go rest

A kód egy ismerős problémától szenved, össze van keveredve az hogy mit akarunk csinálni és az hogy hogyan. Egy kis vizsgálat után rájöhettünk, hogy a kódba két újrahasználható minta van elrejtve.

Az első annak elérése, hogy egy tranzakció újrakezdés helyett azonnal térjen vissza hibával

maybeSTM :: STM a -> STM (Maybe a) maybeSTM m = (Just `liftM` m) `orElse` return Nothing

A második egy akció próbálgatása lista elemein, úgy hogy az első sikeres végrehajtásnál leálljon, ha nincs ilyen kezdje újra. Ehhez érdemen az STM-et a MonadPlus típusosztály példányaként kezelni.

instance MonadPlus STM where mzero = retry mplus = orElse

A Control.Monad modul definiál egy msum nevű függvényt, ami épp azt csinálja amire szükségünk volt

msum :: MonadPlus m => [m a] -> m a msum = foldr mplus mzero

Most a működés kulcs elemeivel a kezünkben jóval egyszerűbb kódot tudunk írni.

shoppingList :: [(Item, Gold)] -> Player -> Player -> STM (Maybe (Item, Gold)) shoppingList list buyer seller = maybeSTM . msum $ map sellOne list where sellOne this@(item,price) = do sellItem item price buyer seller return this

Mivel így már az STM a MonadPlus példánya, általánosítani tudjuk a maybeSTM-et hogy a MonadPlus felett is működjön.

maybeM :: MonadPlus m => m a -> m (Maybe a) maybeM m = (Just `liftM` m) `mplus` return Nothing

Ezáltal egy rengeteg különböző helyen hasznosítható függvényt kapunk.

I/O és STM

Az STM monád megtiltja nekünk önkényes I/O műveleteket, mert általuk elveszhet az atomicitás vagy az izoláció, ami a monád fő szolgáltatása. Természetesen szükséges lehet I/O műveleteket végezni, azonban rendkívül óvatosan.

A leggyakrabban egy atomically blokkon belüli döntés eredményeképpen kell I/O akciókat végeznünk. Ezekben az esetekben a leghelyesebb visszatérni néhány adattal a blokkból, amely megmondja a hívó számára hogy mi a következő teendő. Visszatérhetünk akcióval is (melyet majd végre kell hajtania).

someAction :: IO a stmTransaction :: STM (IO a) stmTransaction = return someAction doSomething :: IO a doSomething = join (atomically stmTransaction)

Néha Az STM-en belül is szükségel lehet I/O műveletet végezni. Például ha megváltoztathatatlan adatokat olvasásunk egy fájlból, nem sértjük meg az STM garanciáit. Ezekben az esetekben IO akció futtatásához az unsafeIOToSTM-et használhatjuk. Ez a függvény az alacsonyszintű GHC.Conc modulban található.

unsafeIOToSTM :: IO a -> STM a

Az általunk végrehajtott IO akciónak tilos bármilyen atomically tranzakciót kezdeményezni. Ugyanis ha egy szál tranzakciókat próbál meg beágyazni, a futtató környezet kivételt fog dobni.

Mivel a típusrendszer nem tud segíteni annak biztosításában, hogy egy IO kód biztonságos műveleteket végez-e, így az a legbiztonságosabb, ha a lehető legkevesebbszer használjuk a unsafeIOToSTM-t. Íme egy tipikus hiba az atomically blokkban végrehajtott IO-val kapcsolatban:

launchTorpedoes :: IO () notActuallyAtomic = do doStuff unsafeIOToSTM launchTorpedoes mightRetry

Ha a mightRetry blokk miatt a tranzakció újraindul, akkor a launchTorpedos függvény egynél többször hívódik meg. Valójában azt sem tudjuk megmondani előre hogy pontosan hányszor, mivel az újrakezdéseket futási időben kezeljük. A megoldás az, hogy tranzakción belül csak idempotens műveleteket végzünk (akkor idempotens egy művelet, ha többszöri végrehajtása ekvivalens az egyszeri végrehajtásával).

Szálak közötti kommunikáció

Az alapvető TVar-on kívül az stm még két hasznosabb típust is biztosít a szálak közötti kommunikációhoz. A TMVar az MVar STM-en belüli megfelelője: vagy egy Just a értéket, vagy Nothing-ot tartalmazhat. A TChan pedig a Chan STM-beli hasonmása, és egy típusos fifo csatornát implementál.

Több mag használata GCH-ban

TODO: Kidolgozásra vár