Re: openresty-resty-redis subscribe失败报异常

86 views
Skip to first unread message
Message has been deleted

Zexuan Luo

unread,
Nov 7, 2016, 8:46:34 PM11/7/16
to openresty
应该是 subscribe 而不是 subcribe 吧

在 2016年11月7日星期一 UTC+8下午5:45:27,stone写道:
hi 春哥&各位,在用openresty做redis pub/sub时部是报 :
app.lua:37: attempt to call method 'subcribe' (a nil value)
不知道啥原因

麻烦大神帮看看,thanks ,整体代码如下:

--
-- Created by IntelliJ IDEA.
-- User: Administrator
-- Date: 2016/11/3
-- Time: 16:06
-- To change this template use File | Settings | File Templates.
--
local semaphore = require('ngx.semaphore')
local redis = require('resty.redis')
local server = require('resty.websocket.server')
local strings = require('stringutil')
return function()

   
local M = {
       
client_id = nil,
       
closed = false
    }
--[[
   开启一个协程subscribe
 ]]
    local function listen()
       
local red, err = redis:new()
       
if not red then
            ngx.log(ngx.ERR, 'failed to new sub redis: ', err)
           
return
        end
        red:set_timeout(1000)
       
local ok, err = red:connect("127.0.0.1", 6379)
       
if not ok then
            ngx.log(ngx.ERR, 'failed to connect to sub redis: ', err)
           
return
        end
        ngx.log(ngx.ERR, " success to connect to sub redis 1:")
       
M.sub_redis = red
       
--red:subcribe("lua_test_channel");
        --开新线程监听订阅,主循环结束时要等待其结束
        return ngx.thread.spawn(function()
           
ngx.log(ngx.ERR, " success to connect to sub redis 2:"..type(M.sub_redis.subscribe))

       
    M.sub_redis:subcribe("lua_test_channel");--这里会报异常


      M.sema:post(1)
           
ngx.log(ngx.ERR, " success to connect to sub redis 3:")
           
while not M.closed do
                ngx.log(ngx.ERR, " success to connect to sub redis 4:")
               
local ret, err = M.sub_redis:read_reply()
               
if not ret and err ~= 'timeout' then
                    ngx.log(ngx.ERR, 'failed to read reply: ', err)
                   
-- M.close()
                end
                if ret and ret[1] == 'message' then
                  --  local msg = ret[3]
                    local bs, err = M.sock:send_text(ret[3])
                   
if not bs then
                        ngx.log(ngx.ERR, 'failed to send text: ', err)
                       
--M.close()
                    end
                end
            end
        end)
   
end


    M.start = function()

       
-- ngx.say("hello lua")
       local sema, err = semaphore.new()
       
if not sema then
           ngx.log(ngx.ERR, 'failed to create semaphore: ', err)
           
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
       
end
       M.sema = sema

        --[[

           1. 主协程监听websocket,收到消息后publishredis

           2. 另开一个协程subscribe redis channel

         ]]

        local sock, err = server:new{
           
timeout = 3000, -- in milliseconds
            max_payload_len = 8192,
       
}

       
--当客户端意外关闭连接时的回调
       --[[ local ok, err = ngx.on_abort(function()
            ngx.log(ngx.ERR, "closed", err)
        end)
        if not ok then
            ngx.log(ngx.ERR, 'failed to register the on_abort callback: ', err)
            ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
        end]]

        --创建 socket对象
        if not sock then
            ngx.log(ngx.ERR, "create websocket server error", err)
           
ngx.exit(ngx.HTTP_CLOSE) --异常退出。这里只是退出当前请求,不是nginx整体退出
        end
        --赋值到M对象
        M.sock = sock
        --创建用来puhblisredis连接
        local pub_redis, err = redis:new()
       
if not pub_redis then
            ngx.log(ngx.ERR, 'failed to new redis: ', err)
           
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
       
end

        --redis连接
        pub_redis:set_timeout(1000)
       
local ok, err = pub_redis:connect("127.0.0.1", 6379)
       
if not ok then
            ngx.log(ngx.ERR, 'failed to connect to redis: ', err)
           
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
       
end
       M.pub_redis = pub_redis



        --主协程
        while not M.closed do
            if M.sub_redis == nil then
                listen() --订阅
                M.sema:wait(3)
           
end
            local data, typ, err = sock:recv_frame()
           
if sock.fatal then
                ngx.log(ngx.ERR, "failed to receive frame: ", err)

               
return ngx.exit(444)
            end
            ngx.log(ngx.ERR, " do type:")
           
--没有data 发送ping
            if not data then
                local bytes, err = sock:send_ping()
               
if not bytes then
                    ngx.log(ngx.ERR, "failed to send ping:", err)
                   
return ngx.exit(444)
                end
                -- close
            elseif typ == "close" then break
            --pong命令

            elseif typ == "ping" then
                local bytes, err = sock:send_pong()
               
if not bytes then
                    ngx.log(ngx.ERR, "failed to send pong:", err)
                   
return ngx.exit(444)
                end
                --client pong
            elseif typ == "pong" then
                ngx.log(ngx.INFO, "client ponged")
               
--握手完成,接收到业务数据
            elseif typ == "text" then

                local result = strings().split(data, "#")
               
M.client_id = result[1]
               
local bytes, err = sock:send_text(data)
               
if not bytes then
                    ngx.log(ngx.ERR, "failed to send text:", err)
                   
return ngx.exit(444)
                end
                pub_redis:publish("lua_test_channel", data)
           
end
        end


    end
    return M
end



Reply all
Reply to author
Forward
Message has been deleted
0 new messages