ソケットのバケツ

A 'hello' enters a tin can and exits from a bucket

これまで、Erlang 自体を扱うことである程度の楽しみがありました。外部との通信は、あちこちで読み込むテキストファイルのみでした。自分自身との関係も楽しいかもしれませんが、巣穴から出て、世界の残りの部分と話し始める時が来ました。

この章では、ソケットの使用に関する 3 つの構成要素、IO リスト、UDP ソケット、TCP ソケットについて説明します。IO リストは、トピックとして非常に複雑というわけではありません。ソケットや他の Erlang ドライバーを介して送信される文字列を効率的に構築するための賢い方法です。

IO リスト

このガイドの前の方で、テキストには、文字列 (整数のリスト) またはバイナリ (データを保持するバイナリ データ構造) のいずれかを使用できると述べました。「Hello World」などのものをワイヤ経由で送信する場合は、文字列として "Hello World"、バイナリとして <<"Hello World">> で送信できます。表記は似ていますが、結果も似ています。

違いは、物をどのように組み立てるかです。文字列は、整数のリンクされたリストに少し似ています。各文字について、文字自体とリストの残りの部分へのリンクを保存する必要があります。さらに、リストの中央または末尾に要素を追加する場合は、変更するポイントまでリスト全体を走査してから、要素を追加する必要があります。ただし、先頭に追加する場合はそうではありません。

A = [a]
B = [b|A] = [b,a]
C = [c|B] = [c,b,a]

上記のように先頭に追加する場合、ABC に保持されているものは書き換える必要はありません。C の表現は、[c,b,a][c|B]、または [c,|[b|[a]]] などと見なすことができます。最後の場合、A の形状は、宣言時と同じようにリストの末尾にあります。B についても同様です。以下に、追加した場合の様子を示します。

A = [a]
B = A ++ [b] = [a] ++ [b] = [a|[b]]
C = B ++ [c] = [a|[b]] ++ [c] = [a|[b|[c]]]

この書き換えがすべてわかりますか?B を作成するとき、A を書き換える必要があります。C を書き込むとき、B (含まれている [a|...] の部分を含む) を書き換える必要があります。同様の方法で D を追加する場合、C を書き換える必要があります。長い文字列の場合、これは非常に非効率になり、Erlang VM によってクリーンアップされるべき多くのガベージが作成されます。

バイナリの場合、状況はそれほど悪くありません。

A = <<"a">>
B = <<A/binary, "b">> = <<"ab">>
C = <<B/binary, "c">> = <<"abc">>

この場合、バイナリは自身の長さを認識しており、データは一定時間で結合できます。それは良いことです。リストよりもはるかに優れています。また、よりコンパクトです。これらの理由から、今後はテキストを使用する際は、バイナリに固執しようとすることがよくあります。

ただし、いくつかの欠点があります。バイナリは特定のやり方で処理するように設計されており、バイナリの変更、分割などにはまだコストがかかります。さらに、文字列、バイナリ、個々の文字を相互に使用するコードを使用することがあります。常に型を変換するのは面倒です。

このような場合、IO リストが救世主となります。IO リストは、奇妙な型のデータ構造です。バイト (0 から 255 の整数)、バイナリ、またはその他の IO リストのリストです。つまり、IO リストを受け入れる関数は、[$H, $e, [$l, <<"lo">>, " "], [[["W","o"], <<"rl">>]] | [<<"d">>]] のような項目を受け入れることができます。このような場合、Erlang VM は、Hello World という文字シーケンスを取得するために必要なときにリストをフラット化するだけです。

このような IO リストを受け入れる関数は何ですか?データを出力する必要のあるほとんどの関数は受け入れます。io モジュールの関数、file モジュールの関数、TCP および UDP ソケットの関数は、それらを処理できます。unicode モジュールの一部の関数や、re (regular expressions の略) モジュールのすべての関数など、一部のライブラリ関数も処理できます。

シェルで前の Hello World IO リストを io:format("~s~n", [IoList]) で試してみてください。問題なく動作するはずです。

A guido with an RJ-45 connection head

結局のところ、出力されるコンテンツを動的に構築する場合に、不変のデータ構造の問題を回避するために、文字列を構築するための非常に賢い方法です。

TCP と UDP: Bro-tocols

Erlang で使用できる最初の種類のソケットは、UDP プロトコルに基づいています。UDP は、ポート番号など、いくつかの抽象化を提供する IP レイヤーの上に構築されたプロトコルです。UDP はステートレス プロトコルであると言われています。UDP ポートから受信したデータは、小さな部分に分割され、タグ付けされておらず、セッションもなく、受信したフラグメントが送信された順序と同じ順序で受信される保証はありません。実際、誰かがパケットを送信した場合、まったく受信される保証はありません。これらの理由から、パケットが小さく、ほとんど影響なく失われる可能性があり、複雑な交換があまり行われず、低レイテンシーが絶対に必要な場合に、人々は UDP を使用する傾向があります。

これは、失われたパケットの処理、順序の変更、複数の送信者と受信者間の隔離されたセッションの維持などをプロトコルが処理する TCP のようなステートフル プロトコルとは対照的です。TCP は、情報の信頼できる交換を可能にしますが、設定に時間がかかり、重くなるリスクがあります。UDP は高速ですが、信頼性が低くなります。必要なものに応じて慎重に選択してください。

いずれにしても、Erlang で UDP を使用するのは比較的簡単です。特定のポートでソケットを設定すると、そのソケットはデータの送受信の両方を行うことができます。

Diagram showing a Host A that has ports A, B and C, which can all send and receive packets to other hosts

悪い例えですが、これはあなたの家にある郵便受けの束(各郵便受けがポート)を持っていて、小さなメッセージが書かれた小さな紙切れをそれぞれに受け取るようなものです。「このパンツを履いたあなたが好き」から「紙切れは家の中から来ている!」まで、どんな内容でも構いません。メッセージが紙切れに対して大きすぎる場合、その多くは郵便受けに落とされます。意味のある方法でそれらを再組み立てするのはあなたの仕事であり、その後、いくつかの家に車で行き、返信として紙切れを落とします。メッセージが純粋に情報的(「こんにちは、ドアのロックが解除されています」)であるか、非常に小さい(「何を着ていますか?-ロン」)場合は、問題なく、すべてのクエリに 1 つの郵便受けを使用できます。ただし、それらが複雑になる場合は、セッションごとに 1 つのポートを使用したいかもしれませんね。ああ、違います!TCP を使用してください!

TCP の場合、プロトコルはステートフルで、接続ベースであると言われています。メッセージを送信できるようになる前に、ハンドシェイクを行う必要があります。これは、誰かが(UDP の例えにあるものと同様の)郵便受けを取り、'おい、これは IP 94.25.12.37 からの電話だ。チャットしたいか?' というメッセージを送信します。それに対して、'もちろんです。メッセージに番号 N をタグ付けし、それらに番号を増やして追加してください' というメッセージに少し似た内容で返信します。その時点から、あなたまたは IP 92.25.12.37 が互いに通信したい場合は、紙切れを注文したり、不足している紙切れを要求したり、意味のある方法で紙切れに返信したりすることができます。

そうすれば、1 つの郵便受け(またはポート)を使用して、すべての通信を正常に保つことができます。それが TCP の優れた点です。オーバーヘッドは少しありますが、すべてが注文され、適切に配信されるようにします。

これらの例えが好きでない場合は、今すぐ Erlang で TCP および UDP ソケットを使用する方法を見て、本題に入りますのでご安心ください。これはより簡単になるはずです。

UDP ソケット

UDP を使用する基本的な操作はいくつかしかありません。ソケットの設定、メッセージの送信、メッセージの受信、接続のクローズです。可能性は少しこのようなものです

A graph showing that Opening a socket can lead to 3 options: sending data, receiving data, or closing a socket. Sending can lead to receiving data or closing a socket, receiving data can lead to sending data or closing a socket. Finally, closing a socket does nothing

最初に行う操作は、ソケットを開くことです。これは、gen_udp:open/1-2 を呼び出すことによって行われます。最も単純な形式は、{ok, Socket} = gen_udp:open(PortNumber) を呼び出すことによって行われます。

ポート番号は、1 から 65535 までの任意の整数です。0 から 1023 までのポートは、システム ポートとして知られています。ほとんどの場合、管理権限がない限り、オペレーティング システムはシステム ポートをリッスンすることを不可能にします。1024 から 49151 までのポートは、登録されたポートです。通常、権限は必要なく自由に使用できますが、一部は既知のサービスに登録されています。残りのポートは、動的またはプライベートとして知られています。それらは、一時ポートによく使用されます。テストでは、8789 など、使用される可能性の低い、ある程度安全なポート番号を使用します。

しかし、その前に、gen_udp:open/2 はどうでしょうか?2 番目の引数は、データを受信するタイプ (list または binary)、受信方法 (メッセージ ({active, true}) または関数呼び出しの結果 ({active, false})) を指定するオプションのリストにできます。ソケットを IPv4 (inet4) または IPv6 (inet6) で設定するかどうか、UDP ソケットをブロードキャスト情報 ({broadcast, true | false}) に使用できるかどうか、バッファーのサイズなど、より多くのオプションがあります。他に利用可能なオプションはありますが、残りのオプションを理解するのは自分で学ぶことになるので、ここでは簡単なことだけを使用します。このトピックはすぐに複雑になる可能性があり、このガイドは残念ながら Erlang についてであり、TCP や UDP についてではありません。

それでは、ソケットを開きましょう。まず、特定の Erlang シェルを起動します。

1> {ok, Socket} = gen_udp:open(8789, [binary, {active,true}]). 
{ok,#Port<0.676>}
2> gen_udp:open(8789, [binary, {active,true}]).
{error,eaddrinuse}

最初のコマンドでは、ソケットを開き、バイナリデータを返すように指示し、アクティブにしたいと考えています。新しいデータ構造 #Port<0.676> が返されるのがわかります。これは、開いたばかりのソケットの表現です。これらは Pid のように使用できます。クラッシュした場合に障害がソケットに伝播されるように、それらへのリンクを設定することもできます。2 番目の関数呼び出しは、同じソケットを再度開こうとしていますが、これは不可能です。そのため、{error, eaddrinuse} が返されます。幸い、最初の Socket ソケットはまだ開いています。

いずれにしても、2 番目の Erlang シェルを起動します。そこでは、異なるポート番号で 2 番目の UDP ソケットを開きます。

1> {ok, Socket} = gen_udp:open(8790).
{ok,#Port<0.587>}
2> gen_udp:send(Socket, {127,0,0,1}, 8789, "hey there!").
ok

おっと、新しい関数ですね! 2回目の呼び出しでは、gen_udp:send/4 がメッセージを送信するために使用されます(なんと素晴らしい説明的な名前でしょう)。引数は、順番に gen_udp:send(OwnSocket, RemoteAddress, RemotePort, Message) です。RemoteAddress は、ドメイン名 ("example.org") を含む文字列またはアトム、IPv4 アドレスを表す 4 タプル、または IPv6 アドレスを表す 8 タプルのいずれかになります。次に、受信者のポート番号(どのメールボックスに紙片をドロップするのか)を指定し、次にメッセージを指定します。メッセージは、文字列、バイナリ、または IO リストにすることができます。

メッセージは実際に送信されたのでしょうか?最初のシェルに戻ってデータをフラッシュしてみてください。

3> flush().
Shell got {udp,#Port<0.676>,{127,0,0,1},8790,<<"hey there!">>}
ok

素晴らしい。ソケットを開いたプロセスは、{udp, Socket, FromIp, FromPort, Message} という形式のメッセージを受信します。これらのフィールドを使用して、メッセージの送信元、どのソケットを通過したか、および内容を知ることができます。これで、ソケットを開き、データを送信し、アクティブモードで受信する方法を説明しました。では、パッシブモードはどうでしょうか?これには、最初のシェルからソケットを閉じ、新しいソケットを開く必要があります。

4> gen_udp:close(Socket).
ok
5> f(Socket).
ok
6> {ok, Socket} = gen_udp:open(8789, [binary, {active,false}]).
{ok,#Port<0.683>}

ここでは、ソケットを閉じ、Socket 変数のバインドを解除し、今回はパッシブモードでソケットを再度開くときにバインドします。メッセージを返す前に、次のことを試してください。

7> gen_udp:recv(Socket, 0).

そして、シェルがスタックするはずです。ここで使用される関数は recv/2 です。これは、パッシブソケットでメッセージをポーリングするために使用される関数です。ここの 0 は、必要なメッセージの長さです。面白いのは、gen_udp では長さが完全に無視されるということです。gen_tcp にも同様の関数があり、この場合は影響があります。とにかく、メッセージを送信しなければ、recv/2 は決して返りません。2番目のシェルに戻り、新しいメッセージを送信してください。

3> gen_udp:send(Socket, {127,0,0,1}, 8789, "hey there!").
ok

すると、最初のシェルは {ok,{{127,0,0,1},8790,<<"hey there!">>}} を戻り値として出力したはずです。永遠に待ちたくない場合はどうすればよいでしょうか?タイムアウト値を追加するだけです。

8> gen_udp:recv(Socket, 0, 2000).
{error,timeout}

そして、それがUDPの大部分です。本当に!

TCPソケット

TCPソケットは、インターフェースの大部分をUDPソケットと共有していますが、動作方法にはいくつかの重要な違いがあります。最大の違いは、クライアントとサーバーがまったく異なるものであるということです。クライアントは、次の操作で動作します。

A diagram similar to the UDP one: connection leads to send and receive, which both send to each other. More over, all states can then lead to the closed state

一方、サーバーはむしろこのスキームに従います。

Diagram similar to the UDP one, although a listen state is added before the whole thing. That state can either move on to the 'accept' state (similar to 'open socket' for the possible branches) or to a close state.

奇妙に見えますね?クライアントは、gen_udp で見たものと少し似ています。ポートに接続し、送受信して、その動作を停止します。ただし、サーバーとして機能する場合は、リスニングという新しいモードが1つあります。これは、TCPがセッションをセットアップする方法が原因です。

まず、新しいシェルを開き、gen_tcp:listen(Port, Options) を使用してリスンソケットと呼ばれるものを開始します。

1> {ok, ListenSocket} = gen_tcp:listen(8091, [{active,true}, binary]).
{ok,#Port<0.661>}

リスンソケットは、接続要求を待機する役割のみを担います。gen_udp と同様のオプションを使用していることがわかります。これは、ほとんどのオプションがすべてのIPソケットで同様になるためです。TCPには、接続バックログ({backlog, N})、キープアライブソケット({keepalive, true | false})、パケットパッキング({packet, N}。ここで N は、ストリップされて解析される各パケットのヘッダーの長さ)など、いくつかのより具体的なオプションがあります。

リスンソケットが開くと、任意のプロセス(複数でも可)がリスンソケットを取得して、「受け入れ」状態になり、クライアントが通信を要求するまでロックされます。

2> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket, 2000).
** exception error: no match of right hand side value {error,timeout}
3> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket).
** exception error: no match of right hand side value {error,closed}

なんてことでしょう。タイムアウトしてクラッシュしました。リスンソケットは、関連付けられていたシェルプロセスが消滅したときに閉じられました。今度は2秒(2000ミリ秒)のタイムアウトなしで、もう一度やり直しましょう。

4> f().
ok
5> {ok, ListenSocket} = gen_tcp:listen(8091, [{active, true}, binary]).
{ok,#Port<0.728>}
6> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket).

そして、プロセスはロックされます。素晴らしい! 2番目のシェルを開きましょう。

1> {ok, Socket} = gen_tcp:connect({127,0,0,1}, 8091, [binary, {active,true}]). 
{ok,#Port<0.596>}

これも通常どおり同じオプションを受け取り、永遠に待ちたくない場合は、最後の位置に Timeout 引数を追加できます。最初のシェルに戻ると、{ok, SocketNumber} が返されたはずです。その時点から、受け入れソケットとクライアントソケットは、gen_udp と同様に、1対1で通信できます。2番目のシェルを使用して、最初のシェルにメッセージを送信します。

3> gen_tcp:send(Socket, "Hey there first shell!").
ok

そして、最初のシェルから

7> flush().
Shell got {tcp,#Port<0.729>,<<"Hey there first shell!">>}
ok

両方のソケットが同じ方法でメッセージを送信でき、gen_tcp:close(Socket) で閉じることができます。受け入れソケットを閉じるとそのソケットのみが閉じられ、リスンソケットを閉じても、関連付けられ確立された受け入れソケットは閉じられませんが、現在実行中の accept 呼び出しは {error, closed} を返すことで中断されます。

ErlangでのTCPソケットのほとんどは以上です!でも、本当にそうでしょうか?

ああ、そうです。もちろん、もっとできることがあります。自分でソケットを少し実験したことがある場合は、ソケットには何らかの所有権があることに気付いたかもしれません。

つまり、UDPソケット、TCPクライアントソケット、TCP受け入れソケットはすべて、存在する任意のプロセスからメッセージを送信できますが、受信したメッセージはソケットを開始したプロセスでのみ読み取ることができます。

A diagram that shows that all processes can send to a socket, but only the owner can receive messages

これはあまり実用的ではありませんね?つまり、ニーズとは関係がない場合でも、メッセージを中継するために常に所有者プロセスをアクティブにしておく必要があるということです。このようにすることができたら素晴らしいと思いませんか?

    1.  Process A starts a socket
    2.  Process A sends a request
    3.  Process A spawns process B
        with a socket
    4a. Gives ownership of the      4b. Process B handles the request
        socket to Process B
    5a. Process A sends a request   5b. Process B Keeps handling
                                        the request
    6a. Process A spawns process C  6b. ...
        with a socket
        ...

ここで、A が多数のクエリを実行する担当になりますが、新しい各プロセスは、応答を待機し、処理などを担当します。このため、A がタスクを実行するために新しいプロセスを委任するのは賢明です。ここでの難しい点は、ソケットの所有権を譲ることです。

これがその秘訣です。gen_tcp と gen_udp の両方に、controlling_process(Socket, Pid) という関数が含まれています。この関数は、現在のソケット所有者が呼び出す必要があります。次に、プロセスは Erlang に「知っていますか?この Pid というやつに私のソケットを引き継がせてください。あきらめます。」と伝えます。今後は、関数の Pid がソケットからメッセージを読み取り、受信できる者になります。以上です。

Inetによるより詳細な制御

これで、ソケットを開き、ソケットを介してメッセージを送信し、所有権を変更する方法などを理解できました。また、パッシブモードとアクティブモードの両方でメッセージをリッスンする方法も理解できました。UDPの例に戻ると、アクティブモードからパッシブモードに切り替えたいときは、ソケットを再起動し、変数をフラッシュして続行しました。これはあまり実用的ではありません。特にTCPを使用しながら同じことをしたい場合は、アクティブなセッションを中断する必要があるためです。

幸いなことに、inet というモジュールがあり、gen_tcpソケットとgen_udpソケットの両方に共通する可能性のあるすべての操作の処理を担当します。現在抱えている問題、つまりアクティブモードとパッシブモードの切り替えには、inet:setopts(Socket, Options) という名前の関数があります。オプションリストには、ソケットの設定時に使用された任意の項を含めることができます。

注意:注意してください! inet という名前のモジュールと inets という名前のモジュールが存在します。ここで必要なのは inet モジュールです。inets は、事前に作成されたサービスとサーバー(FTP、Trivial FTP (TFTP)、HTTPなどを含む)を多数含むOTPアプリケーションです。

それらを区別する簡単な方法は、inetsinet の上に構築されたserviceに関するものであるか、あるいは、inet + s(ervices) と表現することもできます。

TCPサーバーになるためのシェルを開始します

1> {ok, Listen} = gen_tcp:listen(8088, [{active,false}]).
{ok,#Port<0.597>}
2> {ok, Accept} = gen_tcp:accept(Listen).

2番目のシェルで

1> {ok, Socket} = gen_tcp:connect({127,0,0,1}, 8088, []).
{ok,#Port<0.596>}
2> gen_tcp:send(Socket, "hey there").
ok

次に、最初のシェルに戻ると、ソケットが受け入れられたはずです。何か取得したかどうかを確認するためにフラッシュします。

3> flush().
ok

もちろん、パッシブモードなので、何もありません。これを修正しましょう。

4> inet:setopts(Accept, [{active, true}]).
ok
5> flush().
Shell got {tcp,#Port<0.598>,"hey there"}
ok

はい!アクティブソケットとパッシブソケットを完全に制御できるようになったので、権限は私たちにあります。アクティブモードとパッシブモードのどちらを選択すればよいでしょうか?

A stop sign

いくつかの点があります。一般的に、すぐにメッセージを待っている場合は、パッシブモードの方がはるかに高速になります。Erlangは、処理するためにプロセスのメールボックスをいじる必要がなく、そのメールボックスをスキャンしたり、メッセージをフェッチしたりする必要もありません。recv を使用する方が効率的です。ただし、recv は、イベント駆動型だったプロセスをアクティブポーリングに変えてしまいます。ソケットと他のErlangコードの間で仲介役をする必要がある場合は、少し複雑になる可能性があります。

その場合は、アクティブモードに切り替えるのが良いでしょう。パケットがメッセージとして送信される場合、受信(または gen_server の handle_info 関数)で待機してメッセージを処理するだけです。速度以外に、このことの欠点は、レート制限に関係しています。

考え方としては、外部からのすべてのパケットがErlangによって盲目的に受け入れられ、メッセージに変換される場合、VMの外部にいる誰かがVMをフラッドして停止させることが多少容易になります。パッシブモードには、Erlang VMにメッセージを配置できる方法とタイミングを制限し、メッセージのブロック、キューイング、破棄のタスクをより低いレベルの実装に委任できるという利点があります。

では、セマンティクスにはアクティブモードが必要で、安全性にはパッシブモードが必要な場合はどうすればよいでしょうか?inet:setopts/2 を使用して、パッシブとアクティブをすばやく切り替えることを試みることができますが、これは競合状態に対して非常に危険です。代わりに、{active, once} オプションを使用した *active once* と呼ばれるモードがあります。どのように機能するかを試してみましょう。

以前のサーバーからのシェルを維持します。

6> inet:setopts(Accept, [{active, once}]).
ok

クライアントシェルに移動して、さらに2つの send/2 呼び出しを実行します。

3> gen_tcp:send(Socket, "one").
ok
4> gen_tcp:send(Socket, "two").
ok

サーバーシェルに戻ります

7> flush().
Shell got {tcp,#Port<0.598>,"one"}
ok
8> flush().
ok
9> inet:setopts(Accept, [{active, once}]).
ok
10> flush().
Shell got {tcp,#Port<0.598>,"two"}
ok

わかりますか?2回目に {active, once} を要求するまで、メッセージ "two" はメッセージに変換されていません。これは、ソケットがパッシブモードに戻ったことを意味します。したがって、アクティブワンスモードを使用すると、アクティブとパッシブの間の切り替えを安全な方法で行うことができます。優れたセマンティクスと安全性。

inetの一部である他の優れた関数もあります。統計を読み取り、現在のホスト情報を取得し、ソケットを検査するものなどがあります。

さて、これでソケットのほとんどは完了しました。今こそ、これを実践に移すときです。

注: インターネットの荒野には、HTTP、0mq、raw unixソケットなど、大量のプロトコルに対応したライブラリがあります。それらはすべて利用可能です。ただし、標準のErlangディストリビューションには、TCPとUDPソケットの2つの主要なオプションが付属しています。また、HTTPサーバーと解析コードも付属していますが、最も効率的なものではありません。

アップデート
バージョン17.0以降、ポートに対してNパケットのアクティブ状態を設定できるようになりました。TCPおよびUDPポートに{active, N}オプションが追加されました。ここで、Nは0から32767までの任意の値を指定できます。残りのメッセージカウンターが0に達するか、inet:setopts/2を使用して明示的に0に設定されると、ソケットはパッシブ({active, false})モードに移行します。その時点で、ソケットの制御プロセスに移行を通知するメッセージが送信されます。メッセージはUDPの場合は{tcp_passive, Socket}{udp_passive, Socket}になります。

この関数を複数回呼び出すと、新しい値が合計カウンターに追加されます。{active, 3}を3回呼び出すと、最大9つのメッセージが制御プロセスに送信されます。N値は、カウンターを強制的に減らすために負の値にすることもできます。最終値が0を下回る場合、Erlangは黙って0に設定し、パッシブモードに移行します。

A cup of coffee with cookies and a spoon. Text says 'take a break'

Sockserv、再訪

この章では、新しいコードをそれほど多く紹介しません。代わりに、前の章のProcess Questのsockservサーバーを振り返ります。これは完全に実行可能なサーバーであり、OTPスーパービジョンツリー内のTCP接続をgen_serverで処理する方法を見ていきます。

TCPサーバーの単純な実装は、次のようになります。

-module(naive_tcp).
-compile(export_all).

start_server(Port) ->
    Pid = spawn_link(fun() ->
        {ok, Listen} = gen_tcp:listen(Port, [binary, {active, false}]),
        spawn(fun() -> acceptor(Listen) end),
        timer:sleep(infinity)
    end),
    {ok, Pid}.

acceptor(ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    spawn(fun() -> acceptor(ListenSocket) end),
    handle(Socket).

%% Echoing back whatever was obtained
handle(Socket) ->
    inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, <<"quit", _/binary>>} ->
            gen_tcp:close(Socket);
        {tcp, Socket, Msg} ->
            gen_tcp:send(Socket, Msg),
            handle(Socket)
    end.

これがどのように機能するかを理解するために、少し図解すると役立つかもしれません。

A diagram showing the first process (P1) spawning a listen socket and a first acceptor process (P2). The first acceptor can accept request, handle messages, and then spawn a new acceptor process (P3) that does the same as P2

したがって、start_server関数はlistenソケットを開き、アクセプターを生成し、その後は永久にアイドル状態になります。アイドル状態が必要なのは、listenソケットがそれを開いたプロセスにバインドされているため、接続を処理したい限り、そのプロセスが生きている必要があるからです。各アクセプタープロセスは、受け入れる接続を待機します。接続が1つ入ると、アクセプタープロセスは新しい同様のプロセスを開始し、listenソケットを共有します。その後、新しいプロセスが作業している間、処理を進めることができます。各ハンドラーは、メッセージが"quit"で始まるまで、受信したすべてのメッセージを繰り返します。その後、接続が閉じられます。

注: パターン<<"quit", _/binary>>は、最初に文字quitを含むバイナリ文字列と、気にしないバイナリデータ(_)を照合することを意味します。

Erlangシェルでnaive_tcp:start_server(8091).を実行してサーバーを起動します。次に、telnetクライアント(telnetクライアントは厳密にはraw TCP用ではありませんが、クライアントを作成せずにサーバーをテストするのに適しています)をlocalhostに開くと、次のことが起こっていることがわかります。

$ telnet localhost 8091
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hey there
hey there
that's what I asked
that's what I asked
stop repeating >:(
stop repeating >:(
quit doing that!
Connection closed by foreign host.

万歳。Poople Inc.という新しい会社を設立し、そのようなサーバーでいくつかのソーシャルネットワークを立ち上げる時が来ました。ただし、モジュールの名前が示すように、これは単純な実装です。コードは単純ですが、並列処理を念頭に置いていません。すべてのリクエストが1つずつ到着する場合、単純なサーバーは正常に機能します。ただし、一度に15人がサーバーに接続しようとするキューがある場合はどうなりますか?

すると、一度に1つのクエリしか応答できなくなり、これは各プロセスが最初に接続を待機し、設定し、次に新しいアクセプターを生成することに関係しています。キュー内の15番目のリクエストは、サーバーと議論する権利を要求する機会を得るために、14個の他の接続が設定されるのを待たなければなりません。本番サーバーを扱っている場合、1秒あたり500から1000個のクエリに近い場合があります。それは非現実的です。

必要なのは、現在のシーケンシャルなワークフローを変更することです。

A diagram showing in order, a listen operation, then a bunch of 'accepts' coming one after the other in a chain

より並列的なものに

A diagram showing in order, a listen operation, then a bunch of 'accepts' coming under the listen operation

スタンバイで準備ができているアクセプターを多数用意することで、新しいクエリに応答するための遅延を大幅に削減できます。今、別のデモ実装を行うのではなく、前の章のsockserv-1.0.1を調べます。実際のOTPコンポーネントと実世界のプラクティスに基づいたものを調査する方が良いでしょう。実際、sockservの一般的なパターンは、cowboy(ただし、cowboyはsockservよりも間違いなく信頼性が高い)やetorrent torrentクライアントのようなサーバーで使用されているものと同じです。

このProcess Questのsockservを構築するには、トップダウンで進みます。必要なスキームは、多くのワーカーを持つスーパーバイザーである必要があります。上記の並列図を見ると、スーパーバイザーはlistenソケットを保持し、それをすべてのアクティビティを受け持つワーカーに共有する必要があります。

すべてワーカー間で共有できるスーパーバイザーをどのように記述しますか?通常のスーパービジョンでは実行できません。one_for_oneone_for_allrest_for_oneのいずれのスーパービジョンを使用しても、すべての子は完全に独立しています。自然な反射神経は、グローバルな状態に頼ることです。つまり、listenソケットを保持し、ハンドラーにそれを渡す登録済みプロセスです。この反射神経と戦い、賢くなりましょう。フォース(およびスーパーバイザーの章を読み返す能力)を使用してください。解決策を考える時間は2分です(2分のタイミングは名誉システムに基づいています。自分で時間を計ってください)。

秘密は、simple_one_for_oneスーパーバイザーを使用することにあります。simple_one_for_oneスーパーバイザーは子仕様をすべての子と共有するため、listenソケットをすべての子がアクセスできるようにそこに押し込むだけでよいのです。

これがスーパーバイザーの全貌です。

%%% The supervisor in charge of all the socket acceptors.
-module(sockserv_sup).
-behaviour(supervisor).

-export([start_link/0, start_socket/0]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    {ok, Port} = application:get_env(port),
    %% Set the socket into {active_once} mode.
    %% See sockserv_serv comments for more details
    {ok, ListenSocket} = gen_tcp:listen(Port, [{active,once}, {packet,line}]),
    spawn_link(fun empty_listeners/0),
    {ok, {{simple_one_for_one, 60, 3600},
         [{socket,
          {sockserv_serv, start_link, [ListenSocket]}, % pass the socket!
          temporary, 1000, worker, [sockserv_serv]}
         ]}}.

start_socket() ->
    supervisor:start_child(?MODULE, []).

%% Start with 20 listeners so that many multiple connections can
%% be started at once, without serialization. In best circumstances,
%% a process would keep the count active at all times to insure nothing
%% bad happens over time when processes get killed too much.
empty_listeners() ->
    [start_socket() || _ <- lists:seq(1,20)],
    ok.

ここで何が起こっているのでしょうか。標準のstart_link/0およびinit/1関数があります。sockservがsimple_one_for_one再起動戦略を取得し、子仕様がListenSocketを渡されるのを確認できます。start_socket/0で開始されたすべての子は、デフォルトで引数としてそれを持つことになります。魔法です!

それだけでは十分ではありません。アプリケーションができるだけ早くクエリを処理できるようにする必要があります。そのため、spawn_link(fun empty_listeners/0)の呼び出しを追加しました。empty_listeners/0関数は、ロックされて着信接続を待機する20個のハンドラーを開始します。spawn_link/1呼び出しの中に入れたのは単純な理由があります。スーパーバイザープロセスはinit/1フェーズであり、メッセージに応答できません。init関数内から自分自身を呼び出すと、プロセスはデッドロックし、実行を完了できなくなります。まさにこの理由で、外部プロセスが必要です。

注: 上記のスニペットでは、gen_tcpにオプション{packet, line}を渡していることがわかります。このオプションにより、受信したすべてのパケットが別々の行に分割され、それに基づいてキューに入れられます(行末は受信した文字列の一部になります)。これにより、この場合、telnetクライアントでより適切に動作することが保証されます。ただし、受信バッファよりも長い行は多くのパケットに分割される可能性があるため、2つのパケットで1つの行を表すことが可能であることに注意してください。受信したコンテンツが改行で終わっているかどうかを確認することで、行が終わっているかどうかを知ることができます。

はい、それが難しい部分でした。これで、ワーカー自体の作成に集中できます。

前の章のProcess Questセッションを思い出してください。物事はこうなりました。

  1. ユーザーがサーバーに接続します。
  2. サーバーがキャラクターの名前を要求します。
  3. ユーザーがキャラクターの名前を送信します。
  4. サーバーがステータスを提案します。
    1. ユーザーが拒否し、4に戻ります。
    2. ユーザーが受け入れ、6に進みます。
  5. ゲームは、次のようになるまで、イベントをプレーヤーに送信します。
  6. ユーザーがサーバーにquitを送信するか、ソケットが強制的に閉じられます。

これは、サーバープロセスへの入力には2つの種類があることを意味します。Process Questアプリケーションからの入力と、ユーザーからの入力です。ユーザーからのデータはソケットから送信されるため、gen_serverのhandle_info/2関数で処理されます。Process Questからのデータは、私たちが制御する方法で送信できるため、handle_castで処理されるキャストが意味をなします。まず、サーバーを起動する必要があります。

-module(sockserv_serv).
-behaviour(gen_server).

-record(state, {name, % player's name
                next, % next step, used when initializing
                socket}). % the current socket

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         code_change/3, terminate/2]).

まず、非常に標準的なgen_serverコールバックモジュールです。ここでの唯一の特別な点は、キャラクターの名前、ソケット、およびnextというフィールドを含む状態です。next部分は、サーバーの状態に関する一時的な情報を格納するためのキャッチオールフィールドです。gen_fsmは、それほど苦労することなくここで使用できた可能性があります。

実際のサーバー起動については、

-define(TIME, 800).
-define(EXP, 50).

start_link(Socket) ->
    gen_server:start_link(?MODULE, Socket, []).

init(Socket) ->
    %% properly seeding the process
    <<A:32, B:32, C:32>> = crypto:rand_bytes(12),
    random:seed({A,B,C}),
    %% Because accepting a connection is a blocking function call,
    %% we can not do it in here. Forward to the server loop!
    gen_server:cast(self(), accept),
    {ok, #state{socket=Socket}}.

%% We never need you, handle_call!
handle_call(_E, _From, State) ->
    {noreply, State}.

上記で定義された2つのマクロ(?TIMEおよび?EXP)は、アクション間のベースライン遅延(800ミリ秒)と、2番目のレベルに到達するために必要な経験値(50、各レベル後に2倍)を設定できる特別なパラメーターです。

start_link/1関数がソケットを受け取ることに気づくでしょう。それがsockserv_supから渡されたlistenソケットです。

ランダムシードに関する最初のビットは、後でキャラクターの統計を生成するためにプロセスが適切にシードされていることを確認することです。そうしないと、多くのプロセスでいくつかのデフォルト値が使用され、それは望ましくありません。ランダム番号を使用するライブラリではなくinit/1関数で初期化する理由は、シードがプロセスレベル(くそっ!可変状態!)で保存されているため、ライブラリ呼び出しごとに新しいシードを設定したくないからです。

いずれにせよ、ここで本当に重要な部分は、自分自身にメッセージをキャストしているということです。その理由は、gen_tcp:accept/1-2がブロッキング操作であり、すべてのinit関数が同期であるという事実と組み合わさっているためです。接続を受け入れるのに30秒待機すると、プロセスを開始するスーパーバイザーも30秒ロックされます。そのため、自分自身にメッセージをキャストし、次にlistenソケットを状態のsocketフィールドに追加します。

あまり熱狂しないでください。
他人のコードを読むと、random:seed/1now() の結果で呼び出しているのをよく見かけます。now() は単調増加する時間(常に増加し、同じ値が2度返ることはない)を返すので便利な関数です。しかし、Erlang で使用される乱数アルゴリズムにとっては悪いシード値です。そのため、12個の暗号学的に安全な乱数バイトを生成するために crypto:rand_bytes(12) を使用する方が良いでしょう(R14B03+ の場合は crypto:strong_rand_bytes(12) を使用してください)。<<A:32, B:32, C:32>> を行うことで、12バイトを3つの整数にキャストして渡します。

アップデート
バージョン18.0から、randモジュールが導入されました。このモジュールは、randomモジュールよりも優れた擬似乱数アルゴリズムを含み、シードを設定する必要がありません。

接続を受け入れる必要があります。ふざけているのはもう十分です。

handle_cast(accept, S = #state{socket=ListenSocket}) ->
    {ok, AcceptSocket} = gen_tcp:accept(ListenSocket),
    %% Remember that thou art dust, and to dust thou shalt return.
    %% We want to always keep a given number of children in this app.
    sockserv_sup:start_socket(), % a new acceptor is born, praise the lord
    send(AcceptSocket, "What's your character's name?", []),
    {noreply, S#state{socket=AcceptSocket, next=name}};

接続を受け入れ、代替アクセプターを開始し(これにより、常に新しい接続を処理するための約20個のアクセプターを準備します)、次に、アクセプトソケットを ListenSocket の代替として保存し、ソケットを介して受信する次のメッセージが 'next' フィールドを持つ名前に関するものであることに注意します。

しかし、次に進む前に、以下のように定義された send 関数を通してクライアントに質問を送信します。

send(Socket, Str, Args) ->
    ok = gen_tcp:send(Socket, io_lib:format(Str++"~n", Args)),
    ok = inet:setopts(Socket, [{active, once}]),
    ok.

トリックです!メッセージを受信した後、ほぼ常に返信する必要があると考えているため、その関数内で *active once* ルーチンを実行し、さらに改行を追加しています。単なる怠惰を関数に閉じ込めただけです。

ステップ1と2を完了し、ソケットから来るユーザー入力を待つ必要があります。

handle_info({tcp, _Socket, Str}, S = #state{next=name}) ->
    Name = line(Str),
    gen_server:cast(self(), roll_stats),
    {noreply, S#state{name=Name, next=stats}};

Str 文字列に何が含まれているかはわかりませんが、状態の next フィールドによって、受信したものが名前であることがわかります。デモアプリケーションでユーザーがtelnetを使用することを想定していたため、受信するテキストにはすべて改行が含まれています。以下のように定義されている line/1 関数は、それらを取り除きます。

%% Let's get rid of the white space and ignore whatever's after.
%% makes it simpler to deal with telnet.
line(Str) ->
    hd(string:tokens(Str, "\r\n ")).

その名前を受け取ったら、それを保存し、次に(roll_stats)というメッセージを自分自身にキャストして、プレイヤーの統計を生成します。これが次のステップです。

注:ファイルを見ると、メッセージ全体をマッチングする代わりに、短い ?SOCK(Var) マクロを使用していることがわかります。マクロは -define(SOCK(Msg), {tcp, _Port, Msg}). として定義されており、私のような怠け者が少ないタイプ数で文字列をマッチングするための簡単な方法です。

統計のロール処理は、handle_cast 句に戻ってきます。

handle_cast(roll_stats, S = #state{socket=Socket}) ->
    Roll = pq_stats:initial_roll(),
    send(Socket,
         "Stats for your character:~n"
         "  Charisma: ~B~n"
         "  Constitution: ~B~n"
         "  Dexterity: ~B~n"
         "  Intelligence: ~B~n"
         "  Strength: ~B~n"
         "  Wisdom: ~B~n~n"
         "Do you agree to these? y/n~n",
         [Points || {_Name, Points} <- lists:sort(Roll)]),
    {noreply, S#state{next={stats, Roll}}};
two dice, with a 5 rolled on each

pq_stats モジュールには、統計をロールする関数が含まれており、この句全体は、そこに統計を出力するためにのみ使用されています。~B フォーマットパラメーターは、整数を出力することを意味します。状態の next 部分はここで少し過負荷になっています。ユーザーに同意するかどうかを尋ねるため、彼らが教えてくれるのを待たなければならず、統計をドロップして新しいものを生成するか、すぐに開始するであろうProcess Questキャラクターに渡す必要があります。

ユーザー入力を聞いてみましょう。今回は handle_info 関数で。

handle_info({tcp, Socket, Str}, S = #state{socket=Socket, next={stats, _}}) ->
    case line(Str) of
        "y" ->
            gen_server:cast(self(), stats_accepted);
        "n" ->
            gen_server:cast(self(), roll_stats);
        _ -> % ask again because we didn't get what we wanted
            send(Socket, "Answer with y (yes) or n (no)", [])
    end,
    {noreply, S};

この直接の関数句でキャラクターを始めることも可能でしたが、そうしないことにしました。handle_info はユーザー入力を処理するためのもので、handle_cast はProcess Questの処理です。関心の分離です!ユーザーが統計を拒否した場合、再度 roll_stats を呼び出すだけです。新しいことは何もありません。ユーザーが受け入れた場合、Process Questキャラクターを開始し、そこからのイベントを待つことができます。

%% The player has accepted the stats! Start the game!
handle_cast(stats_accepted, S = #state{name=Name, next={stats, Stats}}) ->
    processquest:start_player(Name, [{stats,Stats},{time,?TIME},
                                     {lvlexp, ?EXP}]),
    processquest:subscribe(Name, sockserv_pq_events, self()),
    {noreply, S#state{next=playing}};

これらは私がゲーム用に定義した通常の呼び出しです。プレイヤーを開始し、sockserv_pq_events イベントハンドラーでイベントをサブスクライブします。次の状態は playing であり、これは受信したすべてのメッセージがゲームからのものである可能性が高いことを意味します。

%% Events coming in from process quest
%% We know this because all these events' tuples start with the
%% name of the player as part of the internal protocol defined for us
handle_cast(Event, S = #state{name=N, socket=Sock}) when element(1, Event) =:= N ->
    [case E of
       {wait, Time} -> timer:sleep(Time);
       IoList -> send(Sock, IoList, [])
     end || E <- sockserv_trans:to_str(Event)], % translate to a string
    {noreply, S}.

これがどのように機能するかの詳細にはあまり触れません。sockserv_trans:to_str(Event) が、いくつかのゲームイベントをIOリストのリストまたはイベントの間に待機する遅延を表す {wait, Time} タプルに変換することを覚えておいてください(敵がドロップしたアイテムを表示する少し前に executing a ... メッセージを出力します)。

従うべき手順のリストを思い出していただければ、1つを除いてすべて完了しました。ユーザーが終了を望むときに終了します。handle_info の一番上に次の句を配置してください。

handle_info({tcp, _Socket, "quit"++_}, S) ->
    processquest:stop_player(S#state.name),
    gen_tcp:close(S#state.socket),
    {stop, normal, S};

キャラクターを停止し、ソケットを閉じ、プロセスを終了します。万歳。終了するその他の理由としては、クライアントによってTCPソケットが閉じられた場合などがあります。

handle_info({tcp_closed, _Socket}, S) ->
    {stop, normal, S};
handle_info({tcp_error, _Socket, _}, S) ->
    {stop, normal, S};
handle_info(E, S) ->
    io:format("unexpected: ~p~n", [E]),
    {noreply, S}.

不明なメッセージを処理するための追加の句も追加しました。ユーザーが予想外のものを入力しても、クラッシュさせたくありません。残っているのは terminate/2 関数と code_change/3 関数だけです。

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

terminate(normal, _State) ->
    ok;
terminate(_Reason, _State) ->
    io:format("terminate reason: ~p~n", [_Reason]).

すべてを理解したら、このファイルをコンパイルし、リリース版の対応する beam ファイルと置き換えて、正常に動作するかどうかを試すことができます。正しくコピーしていれば(私もそうしていれば)、うまくいくはずです。

これからどこへ行く?

もしあなたがそれを受け入れるのであれば、あなたの次の課題は、クライアントにあなたの選択したコマンドをいくつか追加することです。たとえば、アクションをしばらくキューに入れてから、サーバーを再開したときにすべてを出力する 'pause' のようなものを追加してみてはどうでしょうか?あるいは、あなたが十分にすごい人であれば、sockserv_serv モジュールでこれまでのレベルと統計を記録し、クライアント側からそれらを取得するコマンドを追加してみてはどうでしょうか。私は読者に任される練習問題が嫌いでしたが、時々、ここに1つや2つ落とすのがあまりにも魅力的です。楽しんでください!

それ以外の場合は、既存のサーバー実装のソースを読んだり、自分でプログラミングしたりすることが良い練習になります。Webサーバーの作成のようなことがアマチュア向けの練習となる言語は珍しいですが、Erlangはその1つです。少し練習すれば、第二の天性のように感じられるようになるでしょう。Erlangが外部世界と通信することは、有用なソフトウェアを作成するための多くのステップの1つに過ぎません。