在這個版本中,需要提供如下功能:
- 使用 otp 的 supervisor 監控樹,保證服務可靠性。
- 添加日志功能,通過定制 sasl alarm_handler 來記錄警告事件。
- 將名稱服務打包為 application,暫且叫 vsns 吧,very stabilization name server 呵呵。
- 開放 socket 服務 (使用半阻塞的混合模式),使用 vsns://verb/param 自定義協議對外提供訪問支持。
最終驗證性的功能測試用例如下,主要的測試代碼位于 test/0 方法中,其上的幾個方法都用于 socket 通信:
- -module(vsns_tcp_client).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.06").
- -vsn(0.11).
- -compile(export_all).
- %% Open a client connection to the vsns TCP server on localhost:8304.
- %% The socket delivers binaries framed with a 2-byte length prefix, one
- %% message at a time ({active, once}). Crashes with badmatch when the
- %% connection cannot be established.
- conn() ->
-     Host = "localhost",
-     Port = 8304,
-     SocketOpts = [binary, {packet, 2}, {reuseaddr, true}, {active, once}],
-     {ok, Socket} = gen_tcp:connect(Host, Port, SocketOpts),
-     Socket.
- %% Send one request (Args, a binary) over Socket and compare the reply with
- %% AssertVal, printing the outcome.
- %%   * a reply equal to AssertVal prints "Ok";
- %%   * a closed connection counts as success only for the kernel_oops request;
- %%   * any other message prints an assertion failure;
- %%   * no message within 5 seconds prints a timeout instead of blocking forever.
- %% Finally re-arms {active, once} so the next reply can be delivered, and
- %% returns the result of inet:setopts/2.
- eval(Socket, Args, AssertVal) ->
-     ok = gen_tcp:send(Socket, Args),
-     receive
-         %% AssertVal is already bound, so this clause only matches the expected reply.
-         {tcp, _, AssertVal} ->
-             io:format("Ok. ~p = ~p.~n", [Args, AssertVal]);
-         {tcp_closed, _} ->
-             case Args of
-                 <<"vsns://kernel_oops">> ->
-                     io:format("Ok. kernel_oops = tcp_closed.~n");
-                 _Other ->
-                     io:format("Connection abort by server.~n")
-             end;
-         Other ->
-             io:format("Assert faild. ~p != ~p.~n", [Other, AssertVal])
-     after 5000 ->
-         %% Without this bound a silent server would hang the whole test run.
-         io:format("Timeout. No reply for ~p.~n", [Args])
-     end,
-     inet:setopts(Socket, [{active, once}]).
- %% Close the client socket and return whatever gen_tcp:close/1 returns.
- close(Socket) ->
-     CloseResult = gen_tcp:close(Socket),
-     CloseResult.
- %% Functional smoke test: runs a fixed sequence of requests against a live
- %% vsns server and prints the result of each one via eval/3.
- %% Each entry is {Request, ExpectedReply}; the order matters because every
- %% save/remove reply carries the value stored by the previous step.
- %% Returns the atom pass once the socket has been closed.
- test() ->
-     S = conn(),
-     Cases = [
-         {<<"vsns://remove_all">>, <<"ack">>},
-         {<<"vsns://save/abc/123">>, <<"">>},
-         {<<"vsns://save/abc/456">>, <<"123">>},
-         {<<"vsns://save/abc/789">>, <<"456">>},
-         {<<"vsns://load_all">>, <<"ack">>},
-         {<<"vsns://remove/abc">>, <<"789">>},
-         {<<"vsns://remove/not_value">>, <<"">>},
-         {<<"foo">>, <<"unknow">>},
-         {<<"vsns://kernel_oops">>, <<"">>}
-     ],
-     lists:foreach(fun({Request, Expected}) -> eval(S, Request, Expected) end, Cases),
-     ok = close(S),
-     pass.
- %% File end.
實際實現 supervisor 監控樹、日志和警告事件功能的過程,也是學習《Erlang 程序設計》的過程。
首先,為名稱服務添加監控進程。erlang otp 監控樹很簡單,只需要實現一個 supervisor behaviour module 提供給 otp supervisor 模塊就可以,前面版本的名稱服務是通過 erlang shell 啟動的,在以后將由這個監控進程來啟動她,主要的啟動代碼在 init/1 方法中,監控模塊代碼如下:
- -module(name_server_sup).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.04").
- -vsn(0.1).
- -behaviour(supervisor).
- %% gen_supervisor behaviour callback functions.
- -export([init/1]).
- %% Interface functions.
- -export([start/0, start_in_shell/0, start_link/1]).
- %% Start the supervisor from a throw-away process and return that process's pid.
- %% The helper must unlink from the supervisor before it finishes: the
- %% supervisor traps exits and shuts down when the process that called
- %% start_link/3 (its parent) exits, even with reason normal. Without the
- %% unlink the supervision tree would die as soon as the fun returns.
- %% A failed start (for example already_started) is left as the fun's result
- %% and does not crash the helper.
- start() ->
-     spawn(fun() ->
-         case supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []) of
-             {ok, Pid} -> unlink(Pid);
-             Other -> Other
-         end
-     end).
- %% Start the supervisor from an interactive shell and detach it from the
- %% calling process, so that an exit of the caller is not propagated to the
- %% supervisor through the link. Returns true (the result of unlink/1);
- %% crashes with badmatch if the supervisor cannot be started.
- start_in_shell() ->
-     SupName = {local, ?MODULE},
-     {ok, SupPid} = supervisor:start_link(SupName, ?MODULE, []),
-     unlink(SupPid).
- %% Start the supervisor linked to the caller and registered locally under
- %% the module name. Args is handed unchanged to init/1.
- start_link(Args) ->
-     Registration = {local, ?MODULE},
-     supervisor:start_link(Registration, ?MODULE, Args).
- %% supervisor callback.
- %% First replaces the alarm_handler event handler with vsns_alarm_handler
- %% (passing foo as its argument), then describes the supervision tree:
- %% a single permanent worker, name_server, started via name_server:start_link/0.
- init([]) ->
-     gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {vsns_alarm_handler, foo}),
-     %% one_for_one strategy, at most 3 restarts within 10 seconds.
-     RestartStrategy = {one_for_one, 3, 10},
-     %% {Id, StartMFA, Restart, Shutdown, Type, Modules}
-     NameServerSpec = {vsns_name_server,
-                       {name_server, start_link, []},
-                       permanent,
-                       1,
-                       worker,
-                       [name_server]},
-     {ok, {RestartStrategy, [NameServerSpec]}}.
- %% File end.
有了這個 name_server_sup 就不怕 name_server 崩潰了,supervisor 進程會負責重新啟動,對于描述監控策略的數據結構可參考 erlang doc。其中的 vsns_alarm_handler 是定制的警告事件處理模塊,負責將服務中的報警記錄到 erlang sasl 日志中,后期可以使用 rb 工具來查看處理。接下來就是警告日志處理模塊代碼:
- -module(vsns_alarm_handler).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.04").
- -vsn(0.11).
- -behaviour(gen_event).
- %% gen_event behaviour callback functions.
- -export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
- %% gen_event callback: print the argument the handler was installed with
- %% and keep it as the handler state.
- init(InitArgs) ->
-     io:format("vsns_alarm_handler init : ~p.~n", [InitArgs]),
-     {ok, InitArgs}.
- %% gen_event callback: write alarm events to the error log.
- %%   {set_alarm,   {remove_all, From}} - a remove_all requested by From has started
- %%   {clear_alarm, {remove_all, From}} - that remove_all has finished
- %% Any other event is logged as unmatched. The handler state is never changed.
- handle_event({set_alarm, {remove_all, From}}, State) ->
-     error_logger:error_msg("vsns depot clear by ~p started.~n.", [From]),
-     {ok, State};
- handle_event({clear_alarm, {remove_all, From}}, State) ->
-     error_logger:error_msg("vsns depot clear by ~p done.~n.", [From]),
-     {ok, State};
- handle_event(Event, State) ->
-     %% The format string has a single ~p, so exactly one argument is passed;
-     %% a mismatching argument list would be logged as a format error instead.
-     error_logger:error_msg("unmatched event: ~p.~n", [Event]),
-     {ok, State}.
- %% The remaining gen_event callbacks are inert.
- %% handle_call/2 answers every request with the current handler state.
- handle_call(_Request, HandlerState) ->
-     {ok, HandlerState, HandlerState}.
- %% handle_info/2 ignores every out-of-band message.
- handle_info(_Message, HandlerState) ->
-     {ok, HandlerState}.
- %% terminate/2 has nothing to clean up.
- terminate(_Why, _HandlerState) ->
-     ok.
- %% code_change/3 keeps the state unchanged across code upgrades.
- code_change(_FromVsn, HandlerState, _Extra) ->
-     {ok, HandlerState}.
- %% File end.
歸根到底,就是通過 error_logger:error_msg 調用來記錄日志。當然還涉及到 erlang sasl 的配置:
- %% file name: sasl_log.config
- %% auther: lzy
- %% email: lzy.dev@gmail.com
- %% date: 2009.02.04
- %% version: 0.1
- %% SASL logging setup: no tty/file sasl_error_logger, only error reports
- %% are written, into a rotating set of binary log files under ./logs.
- [{sasl, [
-     {sasl_error_logger, false},
-     {errlog_type, error},
-     {error_logger_mf_dir, "./logs"},
-     %% 10M per log file (10 * 1024 * 1024 bytes).
-     {error_logger_mf_maxbytes, 10485760},
-     {error_logger_mf_maxfiles, 5}
- ]}].
- %% File end.
該配置文件可以通過 erlang shell 的啟動參數指定:-boot start_sasl -config .\sasl_log。再接下來就是打包 vsns application,這需要一個 application 描述文件和一個 application behavior 模塊,很簡單,具體配置參數語意可參考 erlang doc。
- %% file name: vsns.app
- %% auther: lzy
- %% email: lzy.dev@gmail.com
- %% date: 2009.02.05
- %% version: 0.1
- %% Application resource file for vsns.
- {
-     application, vsns,
-     [
-         {description, "very stabilization name service."},
-         {vsn, "1.0a"},
-         %% The supervisor module is name_server_sup; it also registers itself
-         %% under that name, so both lists must use it.
-         {modules, [vsns_app, name_server_sup, name_server, vsns_alarm_handler]},
-         {registered, [name_server_sup, name_server]},
-         %% sasl must be running first: name_server_sup:init/1 swaps the
-         %% alarm_handler event handler.
-         {applications, [kernel, stdlib, sasl]},
-         {mod, {vsns_app, []}},
-         {start_phases, []}
-     ]
- }.
- %% File end.
- -module(vsns_app).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.05").
- -vsn(0.1).
- -behavior(application).
- -export([start/2, stop/1]).
- %% application callback: boot the vsns supervision tree. StartArgs comes
- %% from the mod entry of the application resource file and is forwarded
- %% unchanged to the supervisor.
- start(_StartType, StartArgs) ->
-     name_server_sup:start_link(StartArgs).
- %% application callback: nothing to clean up after the application stopped.
- stop(_AppState) ->
-     void.
- %% File end.
經過這樣的包裝,就可以通過 application:start(vsns) 調用來啟動 vsns 服務。通過 appmon 工具可以看到如下進程樹:
到這里,我們就可以通過 erlang 來使用 vsns 了。
- C:\Program Files\erl5.6.4\usr\lzy_app\vsns>..\..\..\bin\erl.exe -sname vsns +P 102400 -smp enable +S 1 -boot start_sasl -config sasl_log
- Eshell V5.6.4 (abort with ^G)
- (vsns@srclzy)1> application:start(vsns).
- vsns_alarm_handler init : {foo,{alarm_handler,[]}}.
- name_server starting.
- ok
- (vsns@srclzy)2> name_server:save(abc, 123).
- undefined
- (vsns@srclzy)3> name_server:load_all().
- [{abc,123}]
最后還需要一個 socket tcp 服務器,來將 vsns 暴露出來,允許其它 client 來使用服務。otp 中沒有類似的 socket server behavior,但可以通過 gen_server 來實現,當然甚至可以實現一個非 otp 相關的 socket 服務器。這里 Serge Aleynikov 實現了一個很好的 tcp 服務器,基于有限狀態機模式來處理請求,在此做了很好的闡述:Building a Non-blocking TCP server using OTP principles,不過恐怕需要代理來打開連接。在他給出的代碼中,我添加了幾行代碼,將 socket server 提供的服務做為可配置的,通過 application 環境來配置 socket server 使用的 gen_fsm behaviour module,大約位于 tcp_server_app 模塊的 15 和 27 行。
- -module(tcp_server_app).
- ... ...
- -define(DEF_SERVICE, tcp_echo_fsm).
- ... ...
- %% application callback: read the listen port and the gen_fsm service
- %% module from the application environment (falling back to ?DEF_PORT and
- %% ?DEF_SERVICE) and start the top-level supervisor with both values.
- start(_Type, _Args) ->
-     Port = get_app_env(listen_port, ?DEF_PORT),
-     FsmModule = get_app_env(service_mod, ?DEF_SERVICE),
-     SupArgs = [Port, FsmModule],
-     supervisor:start_link({local, ?MODULE}, ?MODULE, SupArgs).
- ... ...
在 saleyn_tcp_server 中提供的是 echo 服務。為了將 saleyn_tcp_server 服務指定成 vsns,除了上面的修改外,剩下就只需要實現一個調用 vsns 的 gen_fsm behaviour module 了,代碼很簡單,是基于 tcp_echo_fsm 修改得來的,呵呵。
- -module(vsns_tcp_fsm).
- -author(lzy).
- -email(lzy.dev@gmail.com).
- -date("2009.02.06").
- -vsn(0.1).
- -remark("vsns_tcp_fsm used by saleyn_tcp_server appliction to support vsns socket server.").
- -remark("It referenced from saleyn_tcp_server/tcp_echo_fsm module.").
- -behaviour(gen_fsm).
- -export([start_link/0, set_socket/2]).
- %% gen_fsm callbacks
- -export([init/1, handle_event/3,
- handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
- %% FSM States
- -export([
- 'WAIT_FOR_SOCKET'/2,
- 'WAIT_FOR_DATA'/2
- ]).
- -record(state, {
- socket, % client socket
- addr % client address
- }).
- -define(TIMEOUT, 120000).
- %%%------------------------------------------------------------------------
- %%% API
- %%%------------------------------------------------------------------------
- %%-------------------------------------------------------------------------
- %% @spec () -> {ok,Pid} | ignore | {error,Error}
- %% @doc To be called by the supervisor in order to start one connection
- %%      handler. The FSM is started unregistered, with an empty argument
- %%      list and default options; the client socket is handed over later
- %%      through set_socket/2.
- %% @end
- %%-------------------------------------------------------------------------
- start_link() ->
-     InitArgs = [],
-     Options = [],
-     gen_fsm:start_link(?MODULE, InitArgs, Options).
- %% Hand an accepted client socket to the FSM process Pid by sending it an
- %% asynchronous {socket_ready, Socket} event. Only accepts a pid and a port.
- set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->
-     Event = {socket_ready, Socket},
-     gen_fsm:send_event(Pid, Event).
- %%%------------------------------------------------------------------------
- %%% Callback functions from gen_fsm
- %%%------------------------------------------------------------------------
- %%-------------------------------------------------------------------------
- %% Func: init/1
- %% Purpose: Make the process trap exits and start in 'WAIT_FOR_SOCKET'
- %%          with an empty #state{} (no socket, no peer address yet).
- %% Returns: {ok, StateName, StateData}
- %% @private
- %%-------------------------------------------------------------------------
- init([]) ->
-     process_flag(trap_exit, true),
-     InitialData = #state{},
-     {ok, 'WAIT_FOR_SOCKET', InitialData}.
- %%-------------------------------------------------------------------------
- %% Func: 'WAIT_FOR_SOCKET'/2
- %% Purpose: Initial state. On {socket_ready, Socket} configure the socket
- %%          (binary, 2-byte length framing, one message at a time), record
- %%          the peer IP and move to 'WAIT_FOR_DATA' with the idle timeout
- %%          armed. Anything else is logged and the state is kept.
- %% Returns: {next_state, NextStateName, NextStateData} |
- %%          {next_state, NextStateName, NextStateData, Timeout}
- %% @private
- %%-------------------------------------------------------------------------
- 'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) ->
-     % From here on this process owns the socket
-     SocketOpts = [binary, {packet, 2}, {reuseaddr, true}, {active, once}],
-     inet:setopts(Socket, SocketOpts),
-     {ok, {PeerIP, _PeerPort}} = inet:peername(Socket),
-     NewState = State#state{socket=Socket, addr=PeerIP},
-     {next_state, 'WAIT_FOR_DATA', NewState, ?TIMEOUT};
- 'WAIT_FOR_SOCKET'(Unexpected, State) ->
-     error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Unexpected]),
-     %% Stay in this state so that the socket can still arrive later
-     {next_state, 'WAIT_FOR_SOCKET', State}.
- %% State 'WAIT_FOR_DATA': serve requests coming from the client.
- %%   {data, Bin} - split the request on "/" and let handle_data/2 answer it;
- %%                 a reply that cannot be sent crashes the process (ok = ...).
- %%   timeout     - no traffic for ?TIMEOUT ms: log and stop normally.
- %%   other       - printed and ignored.
- %% Every clause that stays in this state re-arms the idle timeout.
- 'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) ->
-     Tokens = string:tokens(binary_to_list(Data), "/"),
-     ok = handle_data(S, Tokens),
-     inet:setopts(S, [{active, once}]),
-     {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};
- 'WAIT_FOR_DATA'(timeout, State) ->
-     error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]),
-     {stop, normal, State};
- 'WAIT_FOR_DATA'(Unexpected, State) ->
-     io:format("~p Ignoring data: ~p\n", [self(), Unexpected]),
-     {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}.
- %%-------------------------------------------------------------------------
- %% Func: handle_event/3
- %% Purpose: No all-state events are defined; receiving one stops the FSM
- %%          with a reason that names the state and the offending event.
- %% Returns: {stop, Reason, NewStateData}
- %% @private
- %%-------------------------------------------------------------------------
- handle_event(Event, StateName, StateData) ->
-     StopReason = {StateName, undefined_event, Event},
-     {stop, StopReason, StateData}.
- %%-------------------------------------------------------------------------
- %% Func: handle_sync_event/4
- %% Purpose: No synchronous all-state events are defined; receiving one
- %%          stops the FSM without replying, with a reason that names the
- %%          state and the offending event.
- %% Returns: {stop, Reason, NewStateData}
- %% @private
- %%-------------------------------------------------------------------------
- handle_sync_event(Event, _From, StateName, StateData) ->
-     StopReason = {StateName, undefined_event, Event},
-     {stop, StopReason, StateData}.
- %%-------------------------------------------------------------------------
- %% Func: handle_info/3
- %% Purpose: Turn raw process messages into FSM activity.
- %%   {tcp, Socket, Bin}   - data from our own socket: re-arm {active, once}
- %%                          and dispatch {data, Bin} to the current state.
- %%   {tcp_closed, Socket} - the client went away: log and stop normally.
- %%   anything else        - ignored (this includes 'EXIT' messages, which
- %%                          arrive as messages because init/1 traps exits).
- %% Returns: {next_state, NextStateName, NextStateData}          |
- %%          {next_state, NextStateName, NextStateData, Timeout} |
- %%          {stop, Reason, NewStateData}
- %% @private
- %%-------------------------------------------------------------------------
- handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->
-     % Flow control: enable forwarding of next TCP message
-     inet:setopts(Socket, [{active, once}]),
-     ?MODULE:StateName({data, Bin}, StateData);
- handle_info({tcp_closed, Socket}, _StateName,
-             #state{socket=Socket, addr=Addr} = StateData) ->
-     error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
-     {stop, normal, StateData};
- handle_info(_Info, 'WAIT_FOR_DATA' = StateName, StateData) ->
-     % gen_fsm accepts only next_state/stop tuples here; a noreply tuple would
-     % terminate the process with bad_return_value. Any received message
-     % cancels the pending timeout, so the idle timeout is armed again.
-     {next_state, StateName, StateData, ?TIMEOUT};
- handle_info(_Info, StateName, StateData) ->
-     {next_state, StateName, StateData}.
- %%-------------------------------------------------------------------------
- %% Func: terminate/3
- %% Purpose: Shut the FSM down, closing the client socket on a best-effort
- %%          basis: the socket field may still be unset, so any failure of
- %%          the close call is deliberately discarded.
- %% Returns: ok
- %% @private
- %%-------------------------------------------------------------------------
- terminate(_Reason, _StateName, #state{socket=Socket}) ->
-     try
-         gen_tcp:close(Socket)
-     catch
-         _:_ -> ok
-     end,
-     ok.
- %%-------------------------------------------------------------------------
- %% Func: code_change/4
- %% Purpose: Code upgrade hook; state name and state data are carried over
- %%          unchanged.
- %% Returns: {ok, NewState, NewStateData}
- %% @private
- %%-------------------------------------------------------------------------
- code_change(_FromVsn, CurrentState, CurrentData, _Extra) ->
-     {ok, CurrentState, CurrentData}.
- %% Execute one tokenised request ("vsns://verb/param" split on "/") against
- %% name_server and send the answer back over socket S.
- %%   save / load / remove - reply with the value name_server returns,
- %%                          or an empty payload when it returns undefined
- %%   load_all / remove_all - run the call and reply "ack" (the result itself
- %%                          is not sent back)
- %%   kernel_oops          - forwards name_server:kernel_oops/0's result
- %%   anything else        - reply "unknow"
- %% Returns the result of gen_tcp:send/2.
- handle_data(S, ["vsns:", "save", Key, Value]) ->
-     send_value(S, name_server:save(Key, Value));
- handle_data(S, ["vsns:", "load", Key]) ->
-     send_value(S, name_server:load(Key));
- handle_data(S, ["vsns:", "load_all"]) ->
-     name_server:load_all(),
-     gen_tcp:send(S, <<"ack">>);
- handle_data(S, ["vsns:", "remove", Key]) ->
-     send_value(S, name_server:remove(Key));
- handle_data(S, ["vsns:", "remove_all"]) ->
-     name_server:remove_all(),
-     gen_tcp:send(S, <<"ack">>);
- handle_data(S, ["vsns:", "kernel_oops"]) ->
-     gen_tcp:send(S, list_to_binary(name_server:kernel_oops()));
- handle_data(S, _Unknown) ->
-     gen_tcp:send(S, <<"unknow">>).
- %% Send a name_server result to the client, mapping undefined to "".
- send_value(S, Result) ->
-     gen_tcp:send(S, list_to_binary(swap_undefined(Result))).
- %% Map the atom undefined to the empty string so that it can be sent as an
- %% empty payload; every other term is returned untouched.
- swap_undefined(Value) ->
-     case Value of
-         undefined -> "";
-         _ -> Value
-     end.
- % File end.