答复: 使用openresty的一个问题, 求教!

623 views
Skip to first unread message

林谡

unread,
Apr 28, 2015, 6:26:24 AM4/28/15
to open...@googlegroups.com, age...@gmail.com, 吴炳锡, 刘峰, 刘九星, 吕颖, 武志国

接上回的问题, 尝试用tcp发送请求和应答,   试了多次都是 只能发和收, 同时做不行

原因应是:  Starting from the 0.9.9 release, the cosocket object here is full-duplex, that is, a reader "light thread" and a writer "light thread" can operate on a single cosocket object simultaneously (both "light threads" must belong to the same Lua handler though, see reasons above). 

 

报错信息:

lua entry thread aborted: runtime error: init_worker_by_lua:31: bad request

stack traceback:

coroutine 0:

        [C]: in function 'send'

 

 

我怎么才能做到同时收发呢求教目前的代码如下

 

    lua_shared_dict my_locks 100k;

 

    init_worker_by_lua '

   

            transaction = {};

           local tcpsock;

          

           local create_socket = function()

                  if not tcpsock then

                      tcpsock = ngx.socket.tcp();

                      ngx.log(ngx.WARN,"try connect...");

                      local ok, err = tcpsock:connect("10.10.50.115",9013);

                      if not ok then

                           ngx.log(ngx.ERR,"fail to connect ",err);

                           tcpsock = nil;

                      end;

                      tcpsock:settimeout(10000);

                   end;

                   

            end;

          

          

           local lock = require "resty.lock";     

            local function save(key,value)

                  local trans_lock = lock:new("my_locks");

                  trans_lock:lock(key);

                  transaction[key]= {trans_lock};

                  return;

            end;

           

            function send_timer(premature,key,value)

                  create_socket();

                  local bytes, err = tcpsock:send(key.."="..value);

                  ngx.log(ngx.WARN, bytes," bytes are sent");         

            end;

           

            function  transaction.send(key,value)

                 ngx.log(ngx.WARN,"in transaction.send");

                  save(key,value);

                  ngx.timer.at(0,send_timer,key,value);

                 

                  local lock = require "resty.lock"

                  local trans_lock = lock:new("my_locks");

                 local elapsed, err = trans_lock:lock(key);

                   ngx.say(elapsed, ", ", err);

                 trans_lock:unlock(key);

                 local res = transaction[key][2];

                 transaction[key] = nil;

                 return res;                

            end;

           

                      

          local timer = function(premature)

             while true do

                       if not premature then

                          create_socket();

                         -- ngx.log(ngx.WARN,"in timer");

                           local line, err, partial = tcpsock:receive()

                            if not line then

                                  ngx.log(ngx.ERR, "failed to read a line: ", err);

                             --     res = string.find(err,".*timed out");

                               --   if not res then

                               --          tcpsock = nil;

                                 -- end;

                            else               

                             local _,_,k,v = string.find(line, "(%a+)%s*=%s*(%a+)")

                             ngx.log(ngx.WARN, "KEY IS "..k," VALUE IS "..v);

                             transaction[k][2]=v.." is set ok";

                             local ok, err = transaction[k][1]:unlock(k);

                             if not ok then

                                  ngx.say("failed to unlock: ", err)

                              end;                  

                             end;            

                         end;

                     end;

             end;

 

           ngx.timer.at(0,timer);

           

    ';

    server {

        listen       80;

        server_name  localhost;

 

        #charset koi8-r;

 

        #access_log  logs/host.access.log  main;

 

         location /linsu {      

         content_by_lua '

                  if not transaction then

                      ngx.say "transaction is nil";

                      return;

                  end;

                  local rsp = transaction.send(ngx.var.arg_key, ngx.var.arg_value);

                  ngx.say(rsp);

         ';

         }

 

发件人: 林谡
发送时间: 2015427 16:20
收件人: 'open...@googlegroups.com'; 'age...@gmail.com'
抄送: 吴炳锡; 刘峰; 刘九星; 吕颖; 段牧; 高磊; 武志国
主题: 使用openresty的一个问题, 求教!

 

前段时间给agentzh 发了一个邮件描述了我们的一个需求如下:

“我们使用nginx做客户端web访问的接入, 后端各种service提供服务,service对外的接口是公司内部实现的异步rpc协议(所有请求可以共用一个tcp连接请求,每个请求有唯一的sequence标记, 应答异步, 访问者通过sequence来匹配 requestresponse; 希望的流程是Nginx收到http request,处理模块 把请求转成内部rpc格式 通过一个tcp连接(所有处理共用的)向后端异步发,同时保持http request上下文,当后端应答回来后, 要匹配到请求的上下文,然后给http response

 需求特别说明的是这个过程与redis mysql访问不同,它们还是一种同步的机制,一个连接上只能并发一个请求。请问,我们需要的这个机制ngx_lua能提供吗?

 

Agentzh:回复如下:

可以。你可以在 init_worker_by_lua 中通过 ngx.timer.at()

函数创建一个背景工作线程(还是“轻线程),然后所有的后端请求都委托给这个轻线程来分发。这个背景轻线程保持一个到后端的唯一连接。各个下游请求的处理程序(比如

content_by_lua)可以通过 lua-resty-lock [1] 来和工作轻线程同步。你可以使用每 nginx worker

进程一个后端连接的模式,也可以使用整个 nginx 实例一个后端连接的模式。对于后者,你需要通过共享内存字典来跨 worker

进程分发到后端的请求和应答数据,同时你需要确保只有一个 worker 能创建 timer(即背景工作线程)。

 

建议加入 openresty 中文邮件列表讨论这样的问题,谢谢合作!请见 http://openresty.org/#Community

 

按照这个思路我写的一个demo

http{

  lua_shared_dict my_locks 100k;

    init_worker_by_lua '

            transation = {};

            local lock = require "resty.lock";     

             local check_transation = function(premature,key)

            if not premature then

                    ngx.log(ngx.WARN, "key is "..key);

                    ngx.log(ngx.WARN, "transation[key][2] is "..transation[key][2]);

                                    local ok, err = transation[key][1]:unlock(key);

                                   if not ok then

                                       ngx.say("failed to unlock: ", err)

                                   end

                          ngx.log(ngx.WARN,"in timer");

                                    transation[key][3]=transation[key][2].." is set ok";

                  end

        end

          

            local function save(key,value)

                local trans_lock = lock:new("my_locks");

                trans_lock:lock(key);

                transation[key]= {trans_lock,value};

                ngx.timer.at(3,check_transation,key);

                      return;

           end;

          

           function  transation.send(key,value)

                      save(key,value);

                      local lock = require "resty.lock"

                      local trans_lock = lock:new("my_locks");

                 local elapsed, err = trans_lock:lock(key);

                   ngx.say(elapsed, ", ", err);

                 trans_lock:unlock(key);

                 local res = transation[key][3];

                 transation[key] = nil;

                 return res;               

           end;

    ';

    server {

        listen       80;

         location /linsu {      

         content_by_lua '

                  local rsp = transation.send(ngx.var.arg_key, ngx.var.arg_value);

                  ngx.say(rsp);

         ';

         }

}

}

 

测试结果如下:

[root@Master-01 ~]# curl '127.0.0.1/linsu?key=color&value=red'

3.011, nil

red is set ok

 

思路总结一下: 就是client要保存一对key&value nginx异步处理告诉客户端处理成功了。

为了这个需求,我觉得需要解决2个方面的问题

1.      把所有请求从一个socket发出去和从同一个socket等应答回来;这个我在例子里还没有写,只是简单用一个timer来做异步回调处理,但我通过一些小实验觉得在init_worker_by_lua send 方法创建一个socket, 只创建一次, send方法把请求发出去,然后一个ngx.timer.at 创建一个死循环方法,不停地socket.receive,接收rsp,这些应该没有问题,我会继续验证。

2.      content_by_lua所在的entrance thread怎么和异步应答同步的问题。

例子中我采用了agentzh的建议,使用“resty.lock”, 这个的确是成功的,如果socket的问题也解决了,那么我觉得最初的需求被满足了。

但是看resty.lock介绍,似乎里面用的是ngx.sleep, 通过step wise的思路来等锁被unlock 这个感觉效率上有些低, 不是很满意。

 

另外我觉得entrance thread 也是一个coroutine,  但是通过实验我发现 entrance threadngx.thread.spawn出来的轻线程中如果yield 其实只是停一下,很快被ngx_lua scheduler重新调起了, 无法停下来等什么事件发生。

我的需求是:entrance thread 把自己 (self= corouting.running())保存在一个全局table里,比如说替代我例子中的那个lock 下一步yield,然后就真的停下来,ngx_lua scheduler不会重新调起这个thread,然后异步应答回来后,会找到这个selfself.resume(),entrance thread继续运行把结果给client. 这个办法是我在java中最常用的办法,java中虽然没有coroutine这个东西,但是道理是相通的。

这个需求能有办法做到吗?

 

Thx all.

 

 

 

 

 

林谡

unread,
May 4, 2015, 10:22:51 PM5/4/15
to open...@googlegroups.com, age...@gmail.com, 吴炳锡, 刘峰, 刘九星, 吕颖, 武志国, 段牧, 高磊

 需求: 我们使用nginx做客户端web访问的接入, 后端各种service提供服务,service对外的接口是公司内部实现的异步rpc协议(所有请求可以共用一个tcp连接请求,每个请求有唯一的sequence标记, 应答异步, 访问者通过sequence来匹配 requestresponse; 希望的流程是Nginx收到http request,处理模块 把请求转成内部rpc格式 通过一个tcp连接(所有处理共用的)向后端异步发,同时保持http request上下文,当后端应答回来后, 要匹配到请求的上下文,然后给http response

 需求特别说明的是这个过程与redis mysql访问不同,它们还是一种同步的机制,一个连接上只能并发一个请求。

 

终于写完了一个完整的demo, 完成了上面的需求,

但是又2点不满意的地方

1.      content_by_lua所在的entrance thread怎么和异步应答同步的问题。

例子中我采用了agentzh的建议,使用“resty.lock”, 这个的确是成功的,如果socket的问题也解决了,那么我觉得最初的需求被满足了。

但是看resty.lock介绍,似乎里面用的是ngx.sleep, 通过step wise的思路来等锁被unlock 这个感觉效率上有些低, 不是很满意。

 

另外我觉得entrance thread 也是一个coroutine,  但是通过实验我发现 entrance threadngx.thread.spawn出来的轻线程中如果yield 其实只是停一下,很快被ngx_lua scheduler重新调起了, 无法停下来等什么事件发生。

我的需求是:entrance thread 把自己 (self= corouting.running())保存在一个全局table里,比如说替代我例子中的那个lock 下一步yield,然后就真的停下来,ngx_lua scheduler不会重新调起这个thread,然后异步应答回来后,会找到这个selfself.resume(),entrance thread继续运行把结果给client. 这个办法是我在java中最常用的办法,java中虽然没有coroutine这个东西,但是道理是相通的。

2.      entrance thread 通过 init_worker_by_luatimersendloop向外发请求, 怎么同步的问题,目前只能使用sleep方法扫描能有一个worker内的blocking queque东西吗? Blocking

不阻塞,但是也不用sleep的手段实现,而是用nginx的异步事件的机制,就和socket.receive一样.

 

 init_worker_by_lua '

            transaction = {};

           --local tcpsock;

          

           local create_socket = function()

                  if not tcpsock then

                      tcpsock = ngx.socket.tcp();

                      ngx.log(ngx.WARN,"try connect...");

                      local ok, err = tcpsock:connect("10.10.50.115",9013);

                      if not ok then

                          ngx.log(ngx.ERR,"fail to connect ",err);

                           tcpsock = nil;

                      end;

                   end;          

            end;

          

          

           local lock = require "resty.lock";     

            local function save(key,value)

                  local trans_lock = lock:new("my_locks");

                  trans_lock:lock(key);

                  transaction[key]= {trans_lock};

                  return;

            end;

           

         local timer = function(premature)

              ngx.log(ngx.WARN,"recv_loop");

             while true do

                 create_socket();

                -- ngx.log(ngx.WARN,"in timer");

                  local line, err, partial = tcpsock:receive()

                  if not line then

                          ngx.log(ngx.ERR, "failed to read a line: ", err);

                     --     res = string.find(err,".*timed out");

                       --   if not res then

                       --          tcpsock = nil;

                         -- end;

                    else               

                          local _,_,k,v = string.find(line, "(%a+)%s*=%s*(%a+)")

                          ngx.log(ngx.WARN, "KEY IS "..k," VALUE IS "..v);

                          transaction[k][2]=v.." is set ok";

                          local ok, err = transaction[k][1]:unlock(k);

                          if not ok then

                                   ngx.say("failed to unlock: ", err)

                          end;

                                               

                    end;           

                end;

             end;

            

            local sendvalue;     

            local sendloop;

            

            function send_loop()

                 while true do

                    if not sendvalue then

                            ngx.sleep(0.01);

                    else

                              local bytes, err = tcpsock:send(sendvalue); --key.."="..value

                              if not bytes then

                                    ngx.log(ngx.ERR,"send failed reason is ",err);

                              else

                                    ngx.log(ngx.WARN, bytes," bytes are sent");

                                    sendvalue = nil;

                             end;

                      end;                  

                   end;

            end;

           

        

           

            local sendtimer = function(premature)

                  create_socket();

                  ngx.thread.spawn(timer);

                  send_loop();

            end;

        

         function  transaction.send(key,value)

                  ngx.log(ngx.WARN,"in transaction.send");

                  save(key,value);

                  sendvalue = key.."="..value;

                  local lock = require "resty.lock"

                  local trans_lock = lock:new("my_locks");

                  local elapsed, err = trans_lock:lock(key);

                  ngx.say(elapsed, ", ", err);

                  trans_lock:unlock(key);

                  local res = transaction[key][2];

                  transaction[key] = nil;

                  return res;               

         end;

         ngx.timer.at(0,sendtimer);   

 

           

    ';

    server {

        listen       80;

        server_name  localhost;

 

        #charset koi8-r;

 

        #access_log  logs/host.access.log  main;

 

         location /linsu {      

         content_by_lua '

                  if not transaction then

                      ngx.say "transaction is nil";

                      return;

                  end;

                  local rsp = transaction.send(ngx.var.arg_key, ngx.var.arg_value);

                  ngx.say(rsp);

         ';

         }

 

发件人: 林谡
发送时间: 2015428 18:26
收件人: 'open...@googlegroups.com'; 'age...@gmail.com'
抄送: 吴炳锡; 刘峰; 刘九星; 吕颖; 武志国
主题: 答复: 使用openresty的一个问题, 求教!

3.      把所有请求从一个socket发出去和从同一个socket等应答回来;这个我在例子里还没有写,只是简单用一个timer来做异步回调处理,但我通过一些小实验觉得在init_worker_by_lua send 方法创建一个socket, 只创建一次, send方法把请求发出去,然后一个ngx.timer.at 创建一个死循环方法,不停地socket.receive,接收rsp,这些应该没有问题,我会继续验证。

4.      content_by_lua所在的entrance thread怎么和异步应答同步的问题。

Yichun Zhang (agentzh)

unread,
May 7, 2015, 1:55:45 AM5/7/15
to openresty
Hello!

抱歉,回复得有些晚了。因为最近我一直都在中国国内休假 :)

请见我下面的回复。

2015-04-28 18:25 GMT+08:00 林谡:
> 接上回的问题, 尝试用tcp发送请求和应答, 试了多次都是 只能发和收, 同时做不行.
>

你不应把 cosocket 对象缓存在全局共享的变量中,因为 cosocket 对象的生命期是和创建它的 Lua 轻线程绑定在一起的。

比如你是在 ngx.timer.at 的回调里创建的 cosocket,就只应该在那个回调的上下文中使用该 cosocket.

> 原因应是: Starting from the 0.9.9 release, the cosocket object here is
> full-duplex, that is, a reader "light thread" and a writer "light thread"
> can operate on a single cosocket object simultaneously (both "light threads"
> must belong to the same Lua handler though, see reasons above).
>

并不相干,见上。

> 报错信息:
> lua entry thread aborted: runtime error: init_worker_by_lua:31: bad request
>

你这里得到的 bad request 的错误是因为你在错误的上下文中使用了 cosocket 而不是因为同时收发。

另外值得一提的是,由于你让每个 worker 都创建一个 timer,所以不应使用 lua-resty-lock,因为
lua-resty-lock 是跨越所有 nginx worker 进程共享的。你可以仿照 lua-resty-lock 的实现使用
ngx.sleep() 实现一个每 worker 的同步机制。

在实现上,你的 ngx.timer.at 回调函数里应该有两个轻线程,一个从 cosocket 读取,一个向 cosocket 写入。

你的请求处理程序(即 content_by_lua 代码)通过 Lua 模块级别的 Lua table,和 timer 线程共享任务队列,见

https://github.com/openresty/lua-nginx-module#data-sharing-within-an-nginx-worker

这样,timer 线程全权负责 socket 通信,各个请求的 content_by_lua 只通过 Lua table 这样的任务队列向
timer 线程发布新请求和获取响应数据。请求线程和 timer 线程通过基于 ngx.sleep 的 lock 进行同步。

通过 ngx.sleep 确实会引入一点额外的延时,但方便跨 worker 进程工作。未来,ngx_lua 会提供每 worker
进程级别的 ngx.semophore API,届时用于这里的场景就更加理想了。

Regards,
-agentzh

林谡

unread,
May 7, 2015, 11:14:55 PM5/7/15
to open...@googlegroups.com
非常感谢您的回答
后来我自己也搞明白了socket为什么会是bad request问题,这个解决了。也给您发过第三个邮件。
通过您的回答也知道了目前只有ngx.sleep 解决同步问题。但的确感觉差点, 需要的ngx.wait(事件), 事件发生了,wait就返回这种。
感觉就是socke.receive 这种的。
这种能多久有支持?
还有对您 todo list中的tcp server很感兴趣,这个计划能什么时候出来, 有计划打算做一个非http server, 有nginx那么高效,但是又不用c去改nginx的代码。




-----邮件原件-----
发件人: open...@googlegroups.com [mailto:open...@googlegroups.com] 代表 Yichun Zhang (agentzh)
发送时间: 2015年5月7日 13:56
收件人: openresty
主题: Re: [openresty] 答复: 使用openresty的一个问题, 求教!
--
--
邮件来自列表“openresty”,专用于技术讨论!
订阅: 请发空白邮件到 openresty...@googlegroups.com
发言: 请发邮件到 open...@googlegroups.com
退订: 请发邮件至 openresty+...@googlegroups.com
归档: http://groups.google.com/group/openresty
官网: http://openresty.org/
仓库: https://github.com/agentzh/ngx_openresty
教程: http://openresty.org/download/agentzh-nginx-tutorials-zhcn.html

Yichun Zhang (agentzh)

unread,
May 7, 2015, 11:18:55 PM5/7/15
to openresty
Hello!

2015-05-08 11:14 GMT+08:00 林谡 <li...@feinno.com>:
> 这种能多久有支持?

ngx.semaphore 的支持应该会在今年实现吧!当然,如果你能贡献补丁,显然可以加快这个进程。

> 还有对您 todo list中的tcp server很感兴趣,这个计划能什么时候出来, 有计划打算做一个非http server, 有nginx那么高效,但是又不用c去改nginx的代码。
>

最新的 nginx 1.9.0 添加了 stream 子系统,所以我们可以基于它搞一个 ngx_stream_lua
模块。需要仔细考虑的是如何让它和现有的 ngx_http_lua 模块共享尽可能多的代码 :) 这个或许也可以在今年发生?

Regards,
-agentzh

zp J

unread,
Feb 4, 2020, 6:04:11 AM2/4/20
to openresty

我的需求是:entrance thread 把自己 (self= corouting.running())保存在一个全局table里,比如说替代我例子中的那个lock 下一步yield,然后就真的停下来,ngx_lua scheduler不会重新调起这个thread,然后异步应答回来后,会找到这个selfself.resume(),entrance thread继续运行把结果给client. 这个办法是我在java中最常用的办法,java中虽然没有coroutine这个东西,但是道理是相通的。

这个需求能有办法做到吗?


请问这个思路可以实现吗?

DeJiang Zhu

unread,
Feb 6, 2020, 8:22:01 AM2/6/20
to open...@googlegroups.com
现在已经有 ngx.sempahore 了,用这个应该可以很好的满足需求了

https://github.com/openresty/lua-resty-core/blob/master/lib/ngx/semaphore.md


zp J <jzp....@gmail.com> 于2020年2月4日周二 下午7:04写道:

我的需求是:entrance thread 把自己 (self= corouting.running())保存在一个全局table里,比如说替代我例子中的那个lock 下一步yield,然后就真的停下来,ngx_lua scheduler不会重新调起这个thread,然后异步应答回来后,会找到这个selfself.resume(),entrance thread继续运行把结果给client. 这个办法是我在java中最常用的办法,java中虽然没有coroutine这个东西,但是道理是相通的。

这个需求能有办法做到吗?


请问这个思路可以实现吗?

--
--
邮件来自列表“openresty”,专用于技术讨论!
订阅: 请发空白邮件到 openresty...@googlegroups.com
发言: 请发邮件到 open...@googlegroups.com
退订: 请发邮件至 openresty+...@googlegroups.com
归档: http://groups.google.com/group/openresty
官网: http://openresty.org/
仓库: https://github.com/agentzh/ngx_openresty
教程: http://openresty.org/download/agentzh-nginx-tutorials-zhcn.html
---
您收到此邮件是因为您订阅了Google网上论坛上的“openresty”群组。
要退订此群组并停止接收此群组的电子邮件,请发送电子邮件到openresty+...@googlegroups.com
要在网络上查看此讨论,请访问https://groups.google.com/d/msgid/openresty/82b08e71-4e5e-4a20-bbab-a263ae199198%40googlegroups.com
Reply all
Reply to author
Forward
0 new messages