如何实现并行发送的时候,其中只要有一个返回结果符合,就终止所有请求

70 views
Skip to first unread message

eric yuan

unread,
Jun 4, 2022, 7:22:24 AM6/4/22
to openresty
业务背景
项目是SSP广告业务系统
一个媒体终端会请求我们api(目前使用openresty搭建),我们根据规则,需要同时给多个上游广告主发起请求,因为业务的时间要求比较高,这个并发最长时间不超过300MS
如果超过500MS,全部中断,不管有没有返回
如果没有超过500MS,只要其中一个有返回内容,就终止整个并行


不知道该如何使用openresty进行完成此业务背景

请求有经验的大佬给予帮助

Junlong Li

unread,
Jun 4, 2022, 9:01:49 AM6/4/22
to openresty

eric yuan

unread,
Jun 12, 2022, 10:42:11 AM6/12/22
to openresty
今天查看了
使用ngx.thread.spawn 和 ngx.thread.wati 来实现


local function send_more_http(api_url)
            local httpc = http.new()
            local send_post_params ={}
            local res,err =httpc:request_uri(api_url,send_post_params)
            httpc:close()
            return res
        end

        local spawn = ngx.thread.spawn
        local wait = ngx.thread.wait
        local say = ngx.say

        local threads = {
            spawn(send_more_http, "http://httpbin.org/get?name=1"),
            spawn(send_more_http, "http://httpbin.org/get?name=2"),
            spawn(send_more_http, "http://httpbin.org/get?name=3")
        }
        for i = 1, #threads do
            local ok, res = wait(threads[i])
            if not ok then
                say(i, ": failed to run: ", res)
            else
                say(i, ": status: ", res.status)
                say(i, ": body: ", res.body)
            end
        end

因为我需要访问外部的api,我不知道我这样做是不是并发
其次我不知道如何在判断其中一个返回后,应该如何立刻中断,也不知道如何设置超时间
辛苦有经验的可以帮忙指点一下

Junlong li

unread,
Jul 7, 2022, 11:09:04 AM7/7/22
to openresty
```
       location /sema {
            content_by_lua_block {
                local ngx_now = ngx.now
                local ngx_say = ngx.say
                local ngx_sleep = ngx.sleep

                local semaphore = require "ngx.semaphore"
                local sema = semaphore.new()

                local function work(t)
                    ngx_sleep(t)
                    sema:post(1)
                end

                local spawn = ngx.thread.spawn
                local wait = ngx.thread.wait
                local ths = {}

                local start_time = ngx_now()
                ths[1] = spawn(work, 0.2) -- change to 0.05 will get 50ms timeout for the wait below
                ths[2] = spawn(work, 0.3)
                ths[3] = spawn(work, 0.4)

                local ok, err = sema:wait(0.1)
                if not ok then
                    if err ~= "timeout" then
                        ngx_say("failed to wait, err: ", err)
                        return
                    end
                end

                -- just for demo the status
                for i = 1,3 do
                    ngx_say("thread ", i, " status: ", coroutine.status(ths[i]))
                end

                ngx.update_time()
                local end_time = ngx_now()
                ngx_say("elapse: ", end_time - start_time, "ms")
            }
        }
 ```

Reply all
Reply to author
Forward
0 new messages