bitterharvest’s diary

A Bitter Harvestは小説の題名。作者は豪州のPeter Yeldham。苦闘の末に勝ちえた偏見からの解放は命との引換になったという悲しい物語

乱舞するメッセージ(1)

1.概略

複数のエージェントがトランザクション処理を行う場合は並行処理となる。並行処理においては、複数のエージェントが一つの資源を争う場合がある。会計システムであれば帳簿であるし、航空券のチケット予約システムでは座席ということになる。
並行処理では、システムのけっけいが不備であると様々な不都合が生じる。ダブルブッキングや、予約できなかった人に請求書が配達されるなどが典型的な例だが、このような不都合が生じないようにするために、これまで、様々な方法が提案されてきた。既に述べた、ソフトウェア・トランザクション・メモリもその一つである。
今回は、CSP(Communicating Sequential Processes)について説明する。CSPは1970年代にトニー・ホア教授により提案された。ホア教授が書いたCSPの本は自由にダウンロードできる。
CSPの機能は、Haskellでも実現されている。実現したのは、英国ケント大学の博士課程の大学生であったブラウンで、彼の博士論文(2011年)としてまとめられている。論文名はCommunicating Haskell Processes (CHP)である。
ここでは、CHPを用いて、小さな会社の会計システムを実現することを最終的な目的とするが、まず、ブラウンが書いた入門書を簡単に紹介する。ここで、説明するプログラムは入門書のものをそのまま引用している。なお、以下のプログラムを実行する場合には、cabalでCHP-plusをインストールしておく。

2.メッセージ・パッシング

CSPの最も基本的な機能はプロセス間でのメッセージの授受である。
今、二つのプロセス考える。一つはコンソールと呼ばれるプロセス。このプロセスは、キーボードの入力をメッセージとして送受でき、ディスプレイへの表示をメッセージとして受信できる。残りのプロセスはエコーである。これは、受信したメッセージをそのまま送信する。CHPを使って表すと次のようになる

import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as CHP
import Control.Concurrent.CHP.Console

main :: IO ()
main = runCHP_(consoleProcess echo)
echo :: ConsoleChans -> CHP ()
echo chans = CHP.id (cStdin chans) (cStdout chans)

このプログラムでは、consoleProcessとechoの二つのプロセスを用いる。consoleProcessは、importによりモジュールがロードされる。Echoプロセスは、CHP.idを実行するが、これは、単に入力で受け取ったメッセージを何もせずにそのまま出力する。

二番目の例は、前述のプルグラムに似ているが、入力のメッセージをそのまま出力するのではなく、xという文字を取り除いて出力する。CHP.idに代わって、not_xというプロセスになっている。プログラムは次のようになる。

import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as CHP
import Control.Concurrent.CHP.Console

main :: IO ()
main = runCHP_(consoleProcess not_x)

not_x :: ConsoleChans -> CHP ()
not_x chans = CHP.filter (/= 'x') (cStdin chans) (cStdout chans)

三番目の例は、再び似たような例であるが、入力した文字を十進数表記に変える。そのプロセスはprintOrdである。プログラムは以下の通り。

import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as CHP
import Control.Concurrent.CHP.Console
import Control.Monad

main :: IO ()
main = runCHP_(consoleProcess printOrd)

printOrd :: ConsoleChans -> CHP ()
printOrd chans = forever (do
  x <- readChannel (cStdin chans)
  let ordXStr = show (fromEnum x) ++ "\n"
  mapM_(writeChannel (cStdout chans)) ordXStr
  )

3.選択

次の例は、二つのプロセスがあった時に選択的に実行する。これは次のようになっている。どちらのプロセスも実行可能でないときは、少なくともどちらが実行可能になるまで待機する。いずれか一つが実行可能であるときは、それを実行する。両方とも実行可能であるときは、どちらかを実行する。
次の例で示すプロセス(printOrdWhileWaiting)は、1秒間待つプロセス(waitFor 1000000 )とメッセージの入力を待っているプロセス(readChanel) cin)から成り立っている。printOrdWhileWaitingは、受信している文字を10進数表示で出力し、waitFor 1000000かreadChanelのいずれかを待つ。もし、1秒たったのであれば、文字をそのままにし、printOrdWhileWaitingに再帰的に戻る。もし、メッセージの入力があったのであれば、文字を新たに受け取ったメッセージで変え、printOrdWhileWaitingに再帰的に戻る。
プログラムは次のようになっている。
なお、選択するプロセスは<->の両側で示す。

import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as CHP
import Control.Monad
import Control.Concurrent.CHP.Console

main :: IO (Maybe ())
main = runCHP (consoleProcess printOrdWhileWaiting)

printOrdWhileWaiting :: ConsoleChans -> CHP ()
printOrdWhileWaiting chans = readChannel cin >>= inner
  where
    inner :: Char -> CHP ()
    inner x = do
      printString (show (fromEnum x) ++ "\n")
      (waitFor 1000000 >> inner x) <-> (readChannel cin >>= inner)

    cin = cStdin chans
    cout = cStdout chans

    printString = mapM (writeChannel cout)

4.パイプライン

パイプラインはいくつかのプロセスを順番にパイプラインのように実行するものである。以下のプログラムは、これまでと同じ二つのプロセスから成り立っていて、キーボードの入力を送信し、ディスプレイに出力するプロセスconsoleProcessと、このプロセスからメッセージを受け取り、処理を行った後で、このプロセスに出力するcrazyPipelineから成り立っている。crazyPipelineはさらに三つのプロセスから成り立っていて、それらは、文字だけを選び出すプロセスCHP.filter isLetterと大文字に変換するプロセスCHP.map toUpperと文字Xを取り除くプロセスCHP.filter (/=’X’)かなりたっている、これらのプロセスが順番に適応された後、メッセージとして出力する。
なお、パイプラインは、pipelineConnectにより順次実行するプロセスを示す。,

import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as CHP
import Control.Concurrent.CHP.Console
import Control.Concurrent.CHP.Connect
import Control.Monad
import Data.Char

main :: IO ()
main = runCHP_(consoleProcess crazyPipeline)

crazyPipeline :: ConsoleChans -> CHP ()
crazyPipeline chans = do
  pipelineConnect [CHP.filter isLetter, CHP.map toUpper, CHP.filter (/= 'X')]
    (cStdin chans) (cStdout chans)
  return ()

5.並行処理

並行処理はいくつかのプロセスを同時に実行する。以下の例は、選択のところで出てきたものであるが、コンソールに出力するときに、標準出力ばかりではなくエラー出力にも同時に出力している。
並行処理は<||>で示す。

import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as CHP
import Control.Concurrent.CHP.Console

main :: IO ()
main = runCHP_(consoleProcess echo)
echo :: ConsoleChans -> CHP ()
echo chans = CHP.id (cStdin chans) (cStdout chans)

6.プロセスの並行処理(食事する哲学者)

プロセスの並行処理を示したのが以下の例である。
以下の例は食事する哲学者の例である。もともとの問題は5人いるが、ここでは問題を簡略化するために、二人の哲学者である。哲学者は、考えている場面と食事をする場面がある。食事をする場面では、両方のフォークを取ろうとする。これは並行処理で行う。同時に二つのフォークを取ろうとする。無事両方取れれば食事をすることになる。そうでなければ、機会が来るまで待つことになる。
以下のプログラムでは、哲学者とフォークの数に対応して、それぞれプロセスが二つ用意されている。哲学者が食事をするときは、両方のフォークを取ろうとする。これはプログラムでは、両方のフォークに並行でメッセージを送っている。もし成功したとすると、食事をした後で、フォークを順次置きに行く。これもフォークにメッセージを送っている。一方、フォークの方は、両方とも、一人の哲学者が食事を始めたとき、フォークを取ったことを確認する。これはメッセージを受け取ることで実現している。さらに食事が終わった時に、フォークが置かれることを確認するために、やはりメッセージを受け取りに行く。
この方法は、哲学者の食事する問題を完全に解いているわけではなく、ある程度の時間が経過するとデッドロックが発生する。
なお、runParallel_がプロセスを並行に実行するための関数である。

import Control.Concurrent.CHP
import Control.Concurrent.CHP.Traces
import Control.Monad
import Control.Monad.Trans
import System.Random

spaghettiFork :: Chanin () -> Chanin () -> CHP ()
spaghettiFork claimA claimB = forever
  ( (readChannel claimA >> readChannel claimA)
    <-> (readChannel claimB >> readChannel claimB) )

philosopher :: Chanout () -> Chanout () -> CHP ()
philosopher left right = forever (do
  randomDelay -- Thinking
  writeChannel left () >> writeChannel right ()
  randomDelay -- Eating
  writeChannel left () <||> writeChannel right ()
  )
  where
    randomDelay = liftIO_CHP (getStdRandom (randomR (0, 50000))) >>= waitFor

college :: CHP ()
college = do
  phil0Left <- newChannel' $ chanLabel "phil0Left"
  phil0Right <- newChannel' $ chanLabel "phil0Right"
  phil1Left <- newChannel' $ chanLabel "phil1Left"
  phil1Right <- newChannel' $ chanLabel "phil1Right"
  runParallel_
    [ philosopher ( writer phil0Left ) ( writer phil0Right )
    , philosopher ( writer phil1Left ) ( writer phil1Right )
    , spaghettiFork (reader phil0Left ) (reader phil1Right )
    , spaghettiFork (reader phil1Left ) (reader phil0Right ) ]

main :: IO ()
main = runCHP_CSPTraceAndPrint college