大家好,这篇文章又晚了几天,这几天公司的事情比较多,每天晚上都3点多才到家,今天终于稍微不忙,回到家吃个饭,继续为大家带来这个系列的第八篇。
上一篇中,我们讲到了 cowboy_acceptor:acceptor/7 方法里的这一行代码:
cowboy_listener:add_connection(ListenerPid, default, Pid, OptsVsn),
我们来看下这个方法的这个参数分别是什么:
ListenerPid 是cowboy_listener工作进程的进程标识;
Pid 则是cowboy_requests_sup督程的子进程标识符;
OptsVsn = 1;
我们看下 cowboy_listener:add_connection/4 方法:
-spec add_connection(pid(), atom(), pid(), non_neg_integer()) -> ok | {upgrade, any(), non_neg_integer()}.add_connection(ServerPid, Pool, ConnPid, OptsVsn) -> gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, infinity).
ServerPid = ListenerPid;这里就是 cowboy_listener工作进程的进程标识,也就是当前模块。
Pool = default;
ConnPid = Pid;
OptsVsn = 1;
我们看下 cowboy_listener:handle_call/3 对 上面发出的消息的处理:
%% @private-spec handle_call(_, _, State) -> {reply, ignored, State} | {stop, normal, stopped, State}.handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{ req_pools=Pools, reqs_table=ReqsTable, queue=Queue, max_conns=MaxConns, proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable), State2 = State#state{req_pools=Pools2}, if AccOptsVsn =/= LisOptsVsn -> {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2}; NbConns > MaxConns -> Queue2 = queue:in(From, Queue), {noreply, State2#state{queue=Queue2}}; true -> {reply, ok, State2} end;
不知道大家还记得 这个 gen_server 工作进程,是在什么时候启动的?并且 init/1 又做了些什么,我们在之前的 已经很详细的介绍了,大家可以看下,回忆回忆,这里我就不重复再说了,我把初始化的方法再贴下,方便看:
%% @private-spec init(list()) -> {ok, #state{}}.init([MaxConns, ProtoOpts]) -> ReqsTable = ets:new(requests_table, [set, private]), Queue = queue:new(), {ok, #state{reqs_table=ReqsTable, max_conns=MaxConns, proto_opts=ProtoOpts, queue=Queue}}.
我们回到 cowboy_listener:handle_call/3 来分析下具体的逻辑:这个方法接受三个参数:
如果你了解 gen_server OTP设计原则,你应该能知道这三个参数,分别是什么意思,{add_connection, Pool, ConnPid, AccOptsVsn} 这个参数是实际的请求发送过来的,也就是 gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, infinity). 的第二个参数;From 表示调用 gen_server:call/3 的进程标识符;State 则是gen_server的内部状态,我们在init/1方法已经初始化了这个状态。
我们来重点看下这行代码:
{NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable),
我们看下 add_pid/4 方法:
%% @private-spec add_pid(pid(), atom(), pools(), ets:tid()) -> {non_neg_integer(), pools()}.add_pid(ConnPid, Pool, Pools, ReqsTable) -> MonitorRef = erlang:monitor(process, ConnPid), ConnPid ! {shoot, self()}, {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of false -> { 1, [{Pool, 1}|Pools]}; {Pool, NbConns} -> NbConns2 = NbConns + 1, {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} end, ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), {NbConnsRet, Pools2}.
首先是:MonitorRef = erlang:monitor(process, ConnPid),
erlang doc:
这边,我还查看了下 《erlang 编程指南》 148页,关于这个函数的用法如下:
为了单向监控进程,可以把内置函数 erlang:monitor/2 添加到 Erlang中,如下调用:
erlang:monitor(process, Proc)
这样就生成了一个调用进程到另一个标记为 Proc 进程的监控进程,Proc 可以是进程标识符也可以是注册的名字。当带有进程标识符的进程终止的时候,消息:
{'DOWN', MonitorRef, Type, Object, Info} 会发送到监控进程。这个消息包含一个对监控进程的引用。
这个方法就到这,大家具体看看书,我之前看了一遍,没有理解,抽空再好好看看。
我们接着看:ConnPid ! {shoot, self()},还记得我们上一篇文章中,调用:cowboy_http_protocol:start_link/4 会新生成一个进程,并链接到调用进程,这个进程会 调用 init/4 方法,然后这个方法会停在 ok = cowboy:accept_ack(ListenerPid) 方法里,等待 {shoot, ListenerPid} 消息,同时 cowboy_http_protocol:start_link/4 这个方法会返回 {ok, Pid}. 这个 Pid 也就是后来传递到 cowboy_listener:add_pid/4 方法中的参数:ConnPid。当我们往这个进程标识发送一条消息:{shoot, self()}。
Pid 停在ok = cowboy:accept_ack(ListenerPid) 的方法,就会接收到消息 {shoot, ListenerPid},那么它就会继续执行,也就是执行:cowboy_http_protocol:init/4 方法的这一行代码:
wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport, dispatch=Dispatch, max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive, max_line_length=MaxLineLength, timeout=Timeout, onrequest=OnRequest, onresponse=OnResponse, urldecode=URLDec}).
其他参照代码如下:
cowboy_http_protocol:
%% @doc Start an HTTP protocol process.-spec start_link(pid(), inet:socket(), module(), any()) -> {ok, pid()}.start_link(ListenerPid, Socket, Transport, Opts) -> Pid = spawn_link(?MODULE, init, [ListenerPid, Socket, Transport, Opts]), {ok, Pid}.%% FSM.%% @private-spec init(pid(), inet:socket(), module(), any()) -> ok.init(ListenerPid, Socket, Transport, Opts) -> Dispatch = proplists:get_value(dispatch, Opts, []), MaxEmptyLines = proplists:get_value(max_empty_lines, Opts, 5), MaxKeepalive = proplists:get_value(max_keepalive, Opts, infinity), MaxLineLength = proplists:get_value(max_line_length, Opts, 4096), OnRequest = proplists:get_value(onrequest, Opts), OnResponse = proplists:get_value(onresponse, Opts), Timeout = proplists:get_value(timeout, Opts, 5000), URLDecDefault = { fun cowboy_http:urldecode/2, crash}, URLDec = proplists:get_value(urldecode, Opts, URLDecDefault), ok = cowboy:accept_ack(ListenerPid), wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport, dispatch=Dispatch, max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive, max_line_length=MaxLineLength, timeout=Timeout, onrequest=OnRequest, onresponse=OnResponse, urldecode=URLDec}).
-spec wait_request(#state{}) -> ok.wait_request(State=#state{socket=Socket, transport=Transport, timeout=T, buffer=Buffer}) -> case Transport:recv(Socket, 0, T) of {ok, Data} -> parse_request(State#state{ buffer= << Buffer/binary, Data/binary >>}); {error, _Reason} -> terminate(State) end.
cowboy:
%% @doc Acknowledge the accepted connection.%%%% Effectively used to make sure the socket control has been given to%% the protocol process before starting to use it.-spec accept_ack(pid()) -> ok.accept_ack(ListenerPid) -> receive {shoot, ListenerPid} -> ok end.
好了,这篇文章,比较乱,没关系,大家多看几遍源码,我会在以后的文章,尝试用图来描述,应该会更好一些,很抱歉,能力有限。
最后,我将在下一篇继续从 cowboy_http_protocol:wait_request/1 和 cowboy_listener:add_pid/4 来分解Cowboy代码,敬请期待,谢谢。