博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Cowboy 源码分析(八)
阅读量:5132 次
发布时间:2019-06-13

本文共 6163 字,大约阅读时间需要 20 分钟。

  大家好,这篇文章又晚了几天,这几天公司的事情比较多,每天晚上都3点多才到家,今天终于稍微不忙,回到家吃个饭,继续为大家带来这个系列的第八篇。

  上一篇中,我们讲到了 cowboy_acceptor:acceptor/7 方法里的这一行代码:

  cowboy_listener:add_connection(ListenerPid, default, Pid, OptsVsn),

  我们来看下这个方法的这个参数分别是什么:

  ListenerPid 是cowboy_listener工作进程的进程标识;

  Pid 则是cowboy_requests_sup督程的子进程标识符;

  OptsVsn = 1;

  我们看下 cowboy_listener:add_connection/4 方法:

-spec add_connection(pid(), atom(), pid(), non_neg_integer())    -> ok | {upgrade, any(), non_neg_integer()}.add_connection(ServerPid, Pool, ConnPid, OptsVsn) ->    gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn},        infinity).

  ServerPid = ListenerPid;这里就是 cowboy_listener工作进程的进程标识,也就是当前模块。

  Pool = default;

  ConnPid = Pid;

  OptsVsn = 1;

  我们看下 cowboy_listener:handle_call/3 对 上面发出的消息的处理:

%% @private-spec handle_call(_, _, State)    -> {reply, ignored, State} | {stop, normal, stopped, State}.handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{        req_pools=Pools, reqs_table=ReqsTable,        queue=Queue, max_conns=MaxConns,        proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->    {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable),    State2 = State#state{req_pools=Pools2},    if    AccOptsVsn =/= LisOptsVsn ->            {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2};        NbConns > MaxConns ->            Queue2 = queue:in(From, Queue),            {noreply, State2#state{queue=Queue2}};        true ->            {reply, ok, State2}    end;

  不知道大家还记得 这个 gen_server 工作进程,是在什么时候启动的?并且 init/1 又做了些什么,我们在之前的 已经很详细的介绍了,大家可以看下,回忆回忆,这里我就不重复再说了,我把初始化的方法再贴下,方便看:

%% @private-spec init(list()) -> {ok, #state{}}.init([MaxConns, ProtoOpts]) ->    ReqsTable = ets:new(requests_table, [set, private]),    Queue = queue:new(),    {ok, #state{reqs_table=ReqsTable, max_conns=MaxConns,        proto_opts=ProtoOpts, queue=Queue}}.

  我们回到 cowboy_listener:handle_call/3 来分析下具体的逻辑:这个方法接受三个参数:

  如果你了解 gen_server OTP设计原则,你应该能知道这三个参数,分别是什么意思,{add_connection, Pool, ConnPid, AccOptsVsn} 这个参数是实际的请求发送过来的,也就是 gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, infinity). 的第二个参数;From 表示调用 gen_server:call/3 的进程标识符;State是gen_server的内部状态,我们在init/1方法已经初始化了这个状态。

  我们来重点看下这行代码:

  {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable),

  我们看下 add_pid/4 方法:

%% @private-spec add_pid(pid(), atom(), pools(), ets:tid())    -> {non_neg_integer(), pools()}.add_pid(ConnPid, Pool, Pools, ReqsTable) ->    MonitorRef = erlang:monitor(process, ConnPid),    ConnPid ! {shoot, self()},    {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of        false ->            {
1, [{Pool, 1}|Pools]}; {Pool, NbConns} -> NbConns2 = NbConns + 1, {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} end, ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), {NbConnsRet, Pools2}.

  首先是MonitorRef = erlang:monitor(process, ConnPid),

  erlang doc:

  这边,我还查看了下 《erlang 编程指南》 148页,关于这个函数的用法如下:

    为了单向监控进程,可以把内置函数 erlang:monitor/2 添加到 Erlang中,如下调用:

      erlang:monitor(process, Proc) 

    这样就生成了一个调用进程到另一个标记为 Proc 进程的监控进程,Proc 可以是进程标识符也可以是注册的名字。当带有进程标识符的进程终止的时候,消息:

      {'DOWN', MonitorRef, Type, Object, Info} 会发送到监控进程。这个消息包含一个对监控进程的引用。

  这个方法就到这,大家具体看看书,我之前看了一遍,没有理解,抽空再好好看看。

  我们接着看ConnPid ! {shoot, self()},还记得我们上一篇文章中,调用:cowboy_http_protocol:start_link/4 会新生成一个进程,并链接到调用进程,这个进程会 调用 init/4 方法,然后这个方法会停在 ok = cowboy:accept_ack(ListenerPid) 方法里,等待 {shoot, ListenerPid} 消息,同时 cowboy_http_protocol:start_link/4 这个方法会返回 {ok, Pid}.  这个 Pid 也就是后来传递到 cowboy_listener:add_pid/4 方法中的参数:ConnPid。当我们往这个进程标识发送一条消息:{shoot, self()}

  Pid 停在ok = cowboy:accept_ack(ListenerPid) 的方法,就会接收到消息 {shoot, ListenerPid},那么它就会继续执行,也就是执行:cowboy_http_protocol:init/4 方法的这一行代码:

wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport,        dispatch=Dispatch, max_empty_lines=MaxEmptyLines,        max_keepalive=MaxKeepalive, max_line_length=MaxLineLength,        timeout=Timeout, onrequest=OnRequest, onresponse=OnResponse,        urldecode=URLDec}).

  其他参照代码如下:

  cowboy_http_protocol

%% @doc Start an HTTP protocol process.-spec start_link(pid(), inet:socket(), module(), any()) -> {ok, pid()}.start_link(ListenerPid, Socket, Transport, Opts) ->    Pid = spawn_link(?MODULE, init, [ListenerPid, Socket, Transport, Opts]),    {ok, Pid}.%% FSM.%% @private-spec init(pid(), inet:socket(), module(), any()) -> ok.init(ListenerPid, Socket, Transport, Opts) ->    Dispatch = proplists:get_value(dispatch, Opts, []),    MaxEmptyLines = proplists:get_value(max_empty_lines, Opts, 5),    MaxKeepalive = proplists:get_value(max_keepalive, Opts, infinity),    MaxLineLength = proplists:get_value(max_line_length, Opts, 4096),    OnRequest = proplists:get_value(onrequest, Opts),    OnResponse = proplists:get_value(onresponse, Opts),    Timeout = proplists:get_value(timeout, Opts, 5000),    URLDecDefault = {
fun cowboy_http:urldecode/2, crash}, URLDec = proplists:get_value(urldecode, Opts, URLDecDefault), ok = cowboy:accept_ack(ListenerPid), wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport, dispatch=Dispatch, max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive, max_line_length=MaxLineLength, timeout=Timeout, onrequest=OnRequest, onresponse=OnResponse, urldecode=URLDec}).
-spec wait_request(#state{}) -> ok.wait_request(State=#state{socket=Socket, transport=Transport,        timeout=T, buffer=Buffer}) ->    case Transport:recv(Socket, 0, T) of        {ok, Data} -> parse_request(State#state{            buffer= << Buffer/binary, Data/binary >>});        {error, _Reason} -> terminate(State)    end.

  cowboy

%% @doc Acknowledge the accepted connection.%%%% Effectively used to make sure the socket control has been given to%% the protocol process before starting to use it.-spec accept_ack(pid()) -> ok.accept_ack(ListenerPid) ->    receive {shoot, ListenerPid} -> ok end.

  好了,这篇文章,比较乱,没关系,大家多看几遍源码,我会在以后的文章,尝试用图来描述,应该会更好一些,很抱歉,能力有限。

  最后,我将在下一篇继续从 cowboy_http_protocol:wait_request/1 和 cowboy_listener:add_pid/4 来分解Cowboy代码,敬请期待,谢谢。

  

 

转载于:https://www.cnblogs.com/yourihua/archive/2012/05/24/2515380.html

你可能感兴趣的文章
Centos修改默认网卡名
查看>>
sql server2012中使用convert来取得datetime数据类型样式(全)
查看>>
noip2012借教室
查看>>
实现类似add(1)(2)(3)的效果
查看>>
期中总结
查看>>
冲刺四
查看>>
谷歌技术"三宝"之BigTable(转)
查看>>
第五次作业
查看>>
[leetcode-117]填充每个节点的下一个右侧节点指针 II
查看>>
洛谷 P2622 关灯问题II (状态压缩+BFS)
查看>>
LeetCode Jump Game
查看>>
centos6下安装git
查看>>
#C++初学记录(算法测试2019/5/5)(深度搜索)
查看>>
数据从mysql迁移至oracle时知识点记录(一)
查看>>
[No0000120]Python教程3/9-第一个Python程序
查看>>
java-web乱码问题解决
查看>>
222 Count Complete Tree Nodes
查看>>
SpringMVC拦截器
查看>>
Java Database Derby Instance 2
查看>>
[Django]我的第一个网页,报错啦~(自己实现过程中遇到问题以及解决办法)
查看>>