マルチプロセッシングについてもっと

状態を述べよう

a roasted turkey leg

前の章で示した例は、デモンストレーション目的にはすべて適切でしたが、ツールキットにそれだけでは先に進めません。例が悪かったのではなく、プロセスとアクターが単なるメッセージ付きの関数である場合、それほど大きな利点がないことが主な理由です。これを修正するには、プロセスで状態を保持できるようにする必要があります。

まず、新しいkitchen.erlモジュールで、プロセスが冷蔵庫のように動作できるようにする関数を作成しましょう。プロセスは、冷蔵庫に食品を保管することと、冷蔵庫から食品を取り出すことの2つの操作を許可します。事前に保管された食品のみを取り出すことができるようにする必要があります。次の関数は、プロセスのベースとして機能します。

-module(kitchen).
-compile(export_all).

fridge1() ->
    receive
        {From, {store, _Food}} ->
            From ! {self(), ok},
            fridge1();
        {From, {take, _Food}} ->
            %% uh....
            From ! {self(), not_found},
            fridge1();
        terminate ->
            ok
    end.

何かがおかしいです。食品を保管するように要求すると、プロセスはokで応答する必要がありますが、実際には食品を保管する場所がありません。fridge1()が呼び出された後、状態なしで最初から関数が開始されます。また、プロセスに冷蔵庫から食品を取り出すように呼び出すと、取り出す状態がないため、not_foundを返すしかないことがわかります。食品アイテムを保管および取り出すためには、関数に状態を追加する必要があります。

再帰の助けを借りて、プロセスの状態は関数のパラメータに完全に保持できます。冷蔵庫プロセスの場合は、すべての食品をリストとして保存し、誰かが何か食べる必要があるときにそのリストを検索することが考えられます。

fridge2(FoodList) ->
    receive
        {From, {store, Food}} ->
            From ! {self(), ok},
            fridge2([Food|FoodList]);
        {From, {take, Food}} ->
            case lists:member(Food, FoodList) of
                true ->
                    From ! {self(), {ok, Food}},
                    fridge2(lists:delete(Food, FoodList));
                false ->
                    From ! {self(), not_found},
                    fridge2(FoodList)
            end;
        terminate ->
            ok
    end.

最初に気づくべきことは、fridge2/1が1つの引数FoodListを取ることです。{From, {store, Food}}に一致するメッセージを送信すると、関数はFoodFoodListに追加してから移動することがわかります。その再帰呼び出しが行われると、同じアイテムを後で取得できるようになります。実際、私はそれをそこに実装しました。関数は、FoodFoodListの一部であるかどうかを確認するためにlists:member/2を使用しています。結果に応じて、アイテムは呼び出しプロセスに送り返され(FoodListから削除され)、それ以外の場合はnot_foundが送り返されます。

1> c(kitchen).
{ok,kitchen}
2> Pid = spawn(kitchen, fridge2, [[baking_soda]]).
<0.51.0>
3> Pid ! {self(), {store, milk}}.
{<0.33.0>,{store,milk}}
4> flush().
Shell got {<0.51.0>,ok}
ok

冷蔵庫へのアイテムの保管はうまくいっているようです。もう少し試してみて、冷蔵庫から取り出してみましょう。

5> Pid ! {self(), {store, bacon}}.
{<0.33.0>,{store,bacon}}
6> Pid ! {self(), {take, bacon}}.
{<0.33.0>,{take,bacon}}
7> Pid ! {self(), {take, turkey}}.
{<0.33.0>,{take,turkey}}
8> flush().
Shell got {<0.51.0>,ok}
Shell got {<0.51.0>,{ok,bacon}}
Shell got {<0.51.0>,not_found}
ok

予想どおり、最初に(牛乳と重曹と一緒に)冷蔵庫に入れたベーコンを取り出すことができますが、冷蔵庫プロセスには、リクエスト時に見つけるための七面鳥がありません。これが、最後の{<0.51.0>,not_found}メッセージが表示される理由です。

私たちはメッセージが大好きですが、秘密にしています

前の例で面倒なのは、冷蔵庫を使用するプログラマーが、そのプロセス用に発明されたプロトコルについて知っておく必要があることです。これは無意味な負担です。これを解決する良い方法は、メッセージの送受信を処理する関数の助けを借りてメッセージを抽象化することです。

store(Pid, Food) ->
    Pid ! {self(), {store, Food}},
    receive
        {Pid, Msg} -> Msg
    end.

take(Pid, Food) ->
    Pid ! {self(), {take, Food}},
    receive
        {Pid, Msg} -> Msg
    end.

これで、プロセスとの対話がはるかにクリーンになりました。

9> c(kitchen).
{ok,kitchen}
10> f().
ok
11> Pid = spawn(kitchen, fridge2, [[baking_soda]]).
<0.73.0>
12> kitchen:store(Pid, water).
ok
13> kitchen:take(Pid, water).
{ok,water}
14> kitchen:take(Pid, juice).
not_found

self()を送信する必要があるのか、takestoreのような正確なアトムが必要なのかなど、メッセージの仕組みを気にする必要はありません。必要なのは、pidと、どの関数を呼び出すかを知ることだけです。これにより、すべての面倒な作業が隠され、冷蔵庫プロセスを簡単に構築できます。

残りの1つは、プロセスを生成する必要があるという部分全体を隠すことです。メッセージを隠すことは処理しましたが、それでもユーザーがプロセスの作成を処理することを期待しています。次のstart/1関数を追加します。

start(FoodList) ->
    spawn(?MODULE, fridge2, [FoodList]).
Two tin cans with a string, where the tin cans somehow represent the abstraction layer between the vibrating string and the voice

ここで、?MODULEは現在のモジュールの名前を返すマクロです。このような関数を書くことには利点がないように見えますが、実際にはいくつかの利点があります。その本質的な部分は、take/2store/2の呼び出しとの一貫性です。冷蔵庫プロセスに関するすべてのことが、kitchenモジュールによって処理されるようになりました。冷蔵庫プロセスの開始時にログを追加したり、2番目のプロセス(たとえば、冷凍庫)を開始したりする場合は、start/1関数内で非常に簡単に行うことができます。ただし、生成がspawn/3を介してユーザーに委ねられている場合は、冷蔵庫を開始するすべての場所で新しい呼び出しを追加する必要があります。これはエラーが発生しやすく、エラーは最悪です。

この関数が実際に使用されるのを見てみましょう

15> f().
ok
16> c(kitchen).
{ok,kitchen}
17> Pid = kitchen:start([rhubarb, dog, hotdog]).
<0.84.0>
18> kitchen:take(Pid, dog).
{ok,dog}
19> kitchen:take(Pid, dog).
not_found

やった!犬は冷蔵庫から出て、抽象化が完了しました!

タイムアウト

3つの整数AB、およびCをpidに変更できるコマンドpid(A,B,C)の助けを借りて、少し試してみましょう。ここで、意図的にkitchen:take/2に偽物を渡します。

20> kitchen:take(pid(0,250,0), dog).

おっと。シェルがフリーズしました。これは、take/2の実装方法が原因で発生しました。何が起こるかを理解するために、まず通常のケースで何が起こるかを確認しましょう

  1. 食品を取り出すメッセージが、あなた(シェル)から冷蔵庫プロセスに送信されます。
  2. あなたのプロセスは受信モードに切り替わり、新しいメッセージを待ちます。
  3. 冷蔵庫はアイテムを削除し、あなたのプロセスに送信します。
  4. あなたのプロセスはそれを受信し、通常どおり処理を続行します。
Hourglass

そして、シェルがフリーズするときの様子は次のとおりです

  1. 食品を取り出すメッセージが、あなた(シェル)から不明なプロセスに送信されます。
  2. あなたのプロセスは受信モードに切り替わり、新しいメッセージを待ちます。
  3. 不明なプロセスは存在しないか、そのようなメッセージを予期しておらず、何も処理しません。
  4. あなたのシェルプロセスは受信モードで動けなくなります。

特にここではエラー処理が不可能なため、これは厄介です。違法なことは何も起こっていません。プログラムが待機しているだけです。一般的に、非同期操作(Erlangでのメッセージパッシングの仕組み)を処理するものは、データを受信する兆候がない場合に、一定期間後に諦める方法が必要です。Webブラウザーは、ページや画像の読み込みに時間がかかりすぎる場合にそれを行い、電話に出るまでに時間がかかりすぎたり、会議に遅刻したりした場合に行います。Erlangには確かに適切なメカニズムがあり、それはreceive構成の一部です

receive
    Match -> Expression1
after Delay ->
    Expression2
end.

receiveafterの間にある部分は、すでに知っていることとまったく同じです。afterの部分は、Matchパターンに一致するメッセージを受信せずにDelay(ミリ秒を表す整数)の時間だけ経過した場合にトリガーされます。これが起こると、Expression2が実行されます。

store2/2take2/2の2つの新しいインターフェイス関数を記述します。これらは、3秒後に待機を停止することを除いて、store/2take/2とまったく同じように動作します。

store2(Pid, Food) ->
    Pid ! {self(), {store, Food}},
    receive
        {Pid, Msg} -> Msg
    after 3000 ->
        timeout
    end.

take2(Pid, Food) ->
    Pid ! {self(), {take, Food}},
    receive
        {Pid, Msg} -> Msg
    after 3000 ->
        timeout
    end.

^Gでシェルをフリーズ解除し、新しいインターフェイス関数を試すことができます。

User switch command
 --> k 
 --> s
 --> c
Eshell V5.7.5  (abort with ^G)
1> c(kitchen).
{ok,kitchen}
2> kitchen:take2(pid(0,250,0), dog).
timeout

これでうまく動作します。

注: afterは値としてミリ秒しか受け取らないと言いましたが、実際にはアトムinfinityを使用することも可能です。これは多くの場合役に立ちませんが(after句を完全に削除することもできます)、プログラマーが結果の受信が期待される関数に待機時間を送信できる場合に使用されることがあります。そうすることで、プログラマーが本当に永久に待機したい場合、そうすることができます。

そのようなタイマーには、時間が長すぎる場合に諦める以外にも使用方法があります。非常に簡単な例の1つは、以前に使用したtimer:sleep/1関数の動作方法です。これがどのように実装されているかを見てみましょう(新しいmultiproc.erlモジュールに入れてみましょう)。

sleep(T) ->
    receive
    after T -> ok
    end.

この特定のケースでは、パターンがないため、構成のreceive部分でメッセージが一致することはありません。代わりに、遅延Tが経過すると、構成のafter部分が呼び出されます。

別の特別なケースは、タイムアウトが0の場合です。

flush() ->
    receive
        _ -> flush()
    after 0 ->
        ok
    end.

これが起こると、Erlang VMは利用可能なパターンのいずれかに適合するメッセージを見つけようとします。上記の場合、すべてが一致します。メッセージがある限り、flush/0関数は、メールボックスが空になるまで再帰的に自身を呼び出します。これが完了すると、コードのafter 0 -> ok部分が実行され、関数が返されます。

選択的受信

この「フラッシュ」の概念により、呼び出しをネストすることにより、受信するメッセージに優先順位を付けることができる選択的受信を実装できます。

important() ->
    receive
        {Priority, Message} when Priority > 10 ->
            [Message | important()]
    after 0 ->
        normal()
    end.

normal() ->
    receive
        {_, Message} ->
            [Message | normal()]
    after 0 ->
        []
    end.

この関数は、優先度が10を超えるものを最初に持つすべてのメッセージのリストを作成します。

1> c(multiproc).
{ok,multiproc}
2> self() ! {15, high}, self() ! {7, low}, self() ! {1, low}, self() ! {17, high}.       
{17,high}
3> multiproc:important().
[high,high,low,low]

after 0ビットを使用したため、すべてのメッセージがなくなるまで取得されますが、プロセスは、normal/0呼び出しで累積される他のメッセージを検討する前に、優先度が10を超えるすべてのメッセージを取得しようとします。

この慣習が興味深いと思われる場合は、Erlangでの選択的受信の仕組みにより、安全ではない場合があることに注意してください。

メッセージがプロセスに送信されると、プロセスが読み取ってそこでパターンに一致するまでメールボックスに保存されます。前の章で述べたように、メッセージは受信した順に保存されます。これは、メッセージを一致させるたびに、最も古いメッセージから開始されることを意味します。

次に、その最も古いメッセージが、一致するまでreceiveのすべてのパターンに対して試行されます。一致すると、メッセージはメールボックスから削除され、次のreceiveまでプロセスのコードが正常に実行されます。次のreceiveが評価されると、VMはメールボックスに現在ある最も古いメッセージ(削除したメッセージの後のメッセージ)を探します。

Visual explanation of how message matching is done when a message from the mailbox does match

特定のメッセージを一致させる方法がない場合、そのメッセージは保存キューに入れられ、次のメッセージが試行されます。2番目のメッセージが一致すると、最初のメッセージがメールボックスの先頭に戻され、後で再試行されます。

Visual explanation of how messages that won't match are moved back and forth from the mailbox to a save queue

これにより、有用なメッセージのみを気にするだけで済みます。一部のメッセージを無視して、後で上記の方法で処理することが、選択的受信の本質です。これらは便利ですが、問題は、プロセスに気にしないメッセージが多い場合、有用なメッセージの読み取りに実際に時間がかかるようになり(プロセスのサイズも大きくなります)、大きくなることです。

上の図で、367番目のメッセージが必要だが、最初の366個はコードで無視されるジャンクだと想像してください。367番目のメッセージを取得するには、プロセスは最初の366個と一致させようとする必要があります。それが完了し、すべてがキューに入れられると、367番目のメッセージが取り出され、最初の366個がメールボックスの先頭に戻されます。次の有用なメッセージはさらに深く埋もれており、見つけるのにさらに時間がかかる可能性があります。

この種の受信は、Erlangでパフォーマンス問題を引き起こす頻繁な原因です。アプリケーションの実行速度が遅く、多くのメッセージが飛び交っていることがわかっている場合は、これが原因である可能性があります。

このような選択的受信がコードの著しい減速を引き起こしている場合は、まず、なぜ不要なメッセージを受信しているのかを自問する必要があります。メッセージは適切なプロセスに送信されていますか?パターンは正しいですか?メッセージのフォーマットは間違っていませんか?本来複数のプロセスであるべき場所で1つのプロセスを使用していませんか?これらの質問の1つまたは複数に答えることで、問題が解決する可能性があります。

プロセスのメールボックスを無駄なメッセージで汚染するリスクがあるため、Erlangプログラマーは、そのようなイベントに対する防御策を講じることがあります。標準的な方法は次のようになるかもしれません。

receive
    Pattern1 -> Expression1;
    Pattern2 -> Expression2;
    Pattern3 -> Expression3;
    ...
    PatternN -> ExpressionN;
    Unexpected ->
        io:format("unexpected message ~p~n", [Unexpected])
end.

これは、すべてのメッセージが少なくとも1つの句に一致することを保証するものです。Unexpected変数はあらゆるものと一致し、予期しないメッセージをメールボックスから取り出して警告を表示します。アプリケーションによっては、メッセージをログ機能のような場所に保存して、後で情報を検索できるようにすることができます。メッセージが間違ったプロセスに送信されている場合は、それらを完全に失い、他のプロセスが本来受信すべきものを受信しない理由を見つけるのに苦労するのは残念です。

メッセージに優先度を設定する必要があり、このような包括的な句を使用できない場合は、よりスマートな方法は、最小ヒープを実装するか、gb_treesモジュールを使用して、受信したすべてのメッセージをダンプすることです(キーの最初に優先度番号を配置して、メッセージのソートに使用されるようにしてください)。その後、必要に応じてデータ構造内の最小または最大の要素を検索するだけです。

ほとんどの場合、この手法を使用すると、選択的受信よりも効率的に優先度付きでメッセージを受信できます。ただし、受信するほとんどのメッセージの優先度が可能な限り高い場合は、速度が低下する可能性があります。いつものように、最適化する前にプロファイルを作成して測定することがコツです。

注意: R14A以降、Erlangのコンパイラーに新しい最適化が追加されました。プロセス間の双方向通信の非常に特殊なケースにおける選択的受信を簡略化します。このような関数の例は、multiproc.erloptimized/1です。

これを機能させるには、関数内で参照(make_ref())を作成し、それをメッセージで送信する必要があります。同じ関数で、選択的受信が行われます。同じ参照を含まない限り一致するメッセージがない場合、コンパイラーは、VMがその参照の作成前に受信したメッセージを自動的にスキップするようにします。

このような最適化にコードを無理やり合わせようとしないでください。Erlangの開発者は、頻繁に使用されるパターンのみを探し、それらを高速化します。慣用的なコードを書けば、最適化は自然に訪れるはずです。逆ではありません。

これらの概念を理解したら、次のステップは複数のプロセスでエラー処理を行うことです。