Correct initialization of signal socket thread for multithreading notification under linux.

117 views
Skip to first unread message

Peter Arnold

unread,
May 4, 2022, 3:47:14 AM5/4/22
to discuss-webrtc
Hi, I have asked my friend to post it in stack overflow, so I duplicate it in here.  
I need to solve it for my project.

I'm newbie to webrtc, and there is no good documentation to about webrtc multithreading model.

My application have to communicate through janus-client,
I have one but only under windows, here is the reference
https://github.com/ouxianghui/janus-client

I have changed some class for make it compilable and working under linux, it's work correct. In application part there is signaling thread initialization with socket, and registering couple of threads for correct work of janus library. See below

Here I show main_qt.cpp , it's usual main with windows intialization, it work correct. I removed headers, for decreasing size, main is what interesting.
This four lines was commented, to work with linux.

    rtc::WinsockInitializer winsockInit;
    rtc::Win32SocketServer w32ss;
    rtc::Win32Thread w32Thread(&w32ss);
    rtc::ThreadManager::Instance()->SetCurrentThread(&w32Thread);


   
    static void registerMetaTypes()
    {
        qRegisterMetaType<std::function<void()>>("std::function<void()>");
        qRegisterMetaType<std::string>("std::string");
        qRegisterMetaType<std::vector<std::string>>("std::vector<std::string>");
   
        qRegisterMetaType<uint64_t>("int64_t");
        qRegisterMetaType<uint64_t>("uint64_t");
        qRegisterMetaType<uint64_t>("int32_t");
        qRegisterMetaType<uint64_t>("uint32_t");
        qRegisterMetaType<std::shared_ptr<vi::Participant>>("std::shared_ptr<vi::Participant>");
        qRegisterMetaType<rtc::scoped_refptr<webrtc::MediaStreamInterface>>("rtc::scoped_refptr<webrtc::VideoTrackInterface>");
        qRegisterMetaType<std::shared_ptr<vi::CreateRoomResult>>("std::shared_ptr<vi::CreateRoomResult>");
    }
   
    static void initOpenGL() {
        QSurfaceFormat format;
        format.setDepthBufferSize(24);
        format.setStencilBufferSize(8);
        format.setVersion(3, 2);
        format.setProfile(QSurfaceFormat::CoreProfile);
        QSurfaceFormat::setDefaultFormat(format);
    }
   
    int main(int argc, char *argv[])
    {
        vi::Logger::init();
   
        rtc::WinsockInitializer winsockInit;
        rtc::Win32SocketServer w32ss;
        rtc::Win32Thread w32Thread(&w32ss);
        rtc::ThreadManager::Instance()->SetCurrentThread(&w32Thread);
   
   
        appDelegate->init();
   
        registerMetaTypes();
   
        rtc::InitializeSSL();
   
        QApplication a(argc, argv);
   
        initOpenGL();
   
        int ret = 0;
   
        auto jcDialog = std::make_shared<JanusConnectionDialog>(nullptr);
        jcDialog->init();
   
        if (QDialog::Accepted == jcDialog->exec()) {
            jcDialog->cleanup();
   
            std::shared_ptr<GUI> w = std::make_shared<GUI>();
            appDelegate->getRtcEngine()->registerEventHandler(w);
   
            w->show();
   
            w->init();
   
            ret = a.exec();
        }
   
        appDelegate->destroy();
   
        rtc::CleanupSSL();
   
        return ret;
   
    }

After win object initialization, it calls an initialization of appDelegate , function described bellow
appDelegate->init(); just singletone

    void AppDelegate::init()
    {
        if (!_rtcEngine) {
            _rtcEngine = vi::RTCEngineFactory::createEngine();
            _rtcEngine->init();
        }
    }

    namespace vi {
        std::shared_ptr<IRTCEngine> RTCEngineFactory::createEngine()
        {
            return RTCEngine::instance();
        }
    }

And here header for the class itself which wrapped by AppDelegate

    namespace vi {
        class RTCEngine
            : public IRTCEngine
            , public ISignalingClientObserver
            , public Observable
            , public std::enable_shared_from_this<RTCEngine>
        {
        public:
            static std::shared_ptr<IRTCEngine> instance()
            {
                static std::shared_ptr<IRTCEngine> _instance;
                static std::once_flag ocf;
                std::call_once(ocf, []() {
                    _instance.reset(new RTCEngine());
                });
                return _instance;
            }
   
            ~RTCEngine() override;
   
            void init() override;
   
            void destroy() override;
   
            void registerEventHandler(std::weak_ptr<IEngineEventHandler> handler) override;
   
            void unregisterEventHandler(std::weak_ptr<IEngineEventHandler> handler) override;
   
            void setOptions(const Options& opts) override;
   
            void startup() override;
   
            void shutdown() override;
   
            std::shared_ptr<VideoRoomClientInterface> createVideoRoomClient() override;
   
            std::shared_ptr<IUnifiedFactory> getUnifiedFactory();
   
        protected:
            void onSessionStatus(SessionStatus status) override;
   
        private:
            RTCEngine();
   
            RTCEngine(const RTCEngine&) = delete;
   
            RTCEngine& operator=(const RTCEngine&) = delete;
   
        private:
            friend class Singleton<RTCEngine>;
   
            std::shared_ptr<IUnifiedFactory> _unifiedFactory;
   
            std::vector<std::weak_ptr<IEngineEventHandler>> _observers;
   
            Options _options;
   
            rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> _pcf;
            std::unique_ptr<rtc::Thread> _signaling;
            std::unique_ptr<rtc::Thread> _worker;
            std::unique_ptr<rtc::Thread> _network;
        };
   
    }

Here is the init method body

    void RTCEngine::init()
        {
            if (!_pcf) {
                _signaling = rtc::Thread::Create();
                _signaling->SetName("pc_signaling_thread", nullptr);
                _signaling->Start();
                _worker = rtc::Thread::Create();
                _worker->SetName("pc_worker_thread", nullptr);
                _worker->Start();
                _network = rtc::Thread::CreateWithSocketServer();
                _network->SetName("pc_network_thread", nullptr);
                _network->Start();
                _pcf = webrtc::CreatePeerConnectionFactory(
                    _network.get() /* network_thread */,
                    _worker.get() /* worker_thread */,
                    _signaling.get() /* signaling_thread */,
                    nullptr /* default_adm */,
                    webrtc::CreateBuiltinAudioEncoderFactory(),
                    webrtc::CreateBuiltinAudioDecoderFactory(),
                    webrtc::CreateBuiltinVideoEncoderFactory(),
                    webrtc::CreateBuiltinVideoDecoderFactory(),
                    nullptr /* audio_mixer */,
                    nullptr /* audio_processing */);
            }
   
            _unifiedFactory = std::make_shared<UnifiedFactory>();
            _unifiedFactory->init();
   
            _unifiedFactory->getSignalingClient()->registerObserver(shared_from_this());
        }

    void UnifiedFactory::init()
        {
            if (!_threadProvider) {
                _threadProvider = std::make_unique<vi::ThreadProvider>();
                _threadProvider->init();
                _threadProvider->create({ "signaling-service", "plugin-client", "message-transport", "capture-session" });
            }
   
            if (!_serviceFactory) {
                _serviceFactory = std::make_shared<vi::ServiceFactory>();
                _serviceFactory->init();
            }
   
            if (!_signalingClient) {
                _signalingClient = vi::SignalingClientProxy::Create(_threadProvider->thread("signaling-service"), std::make_shared<vi::SignalingClient>());
                _signalingClient->init();
            }
        }

This part is for thread provider, thread provider is the set of treads associated with it's name for correct execution of functions for call back in observers, I don't know why it's wrap current thread, why need to do like that and not just add threads into container and just start them.

With code of
_signalingClient = vi::SignalingClientProxy::Create(_threadProvider->thread("signaling-service"), std::make_shared<vi::SignalingClient>());
signaling client has to be run with signaling-service thread.


Header of class thread manager,and add and find methods ,see below. It's just manage created threads

    class ThreadProvider
        {
        public:
            ThreadProvider();
   
            ~ThreadProvider();
   
            void init();
   
            void destroy();
   
            void create(const std::list<std::string>& threadNames);
   
            rtc::Thread* thread(const std::string& name);
   
        private:
            ThreadProvider(const ThreadProvider&) = delete;
   
            ThreadProvider(ThreadProvider&&) = delete;
   
            ThreadProvider& operator=(const ThreadProvider&) = delete;
   
        public:
            void stopAll();
   
        private:
            std::unordered_map<std::string, std::shared_ptr<rtc::Thread>> _threadsMap;
           
            std::mutex _mutex;
   
            rtc::Thread* _mainThread = nullptr;
   
            std::atomic_bool _inited;
   
            std::atomic_bool _destroy;
        };





    void ThreadProvider::init()
        {
            std::lock_guard<std::mutex> lock(_mutex);
   
            _mainThread = rtc::ThreadManager::Instance()->CurrentThread();
   
            _inited = true;
        }
   
        void ThreadProvider::destroy()
        {
            _destroy = true;
        }
   
        void ThreadProvider::create(const std::list<std::string>& threadNames)
        {
            std::lock_guard<std::mutex> lock(_mutex);
   
            if (!_inited) {
                DLOG("_inited == false");
                return;
            }
   
            for (const auto& name : threadNames) {
                _threadsMap[name] = rtc::Thread::Create();
                _threadsMap[name]->SetName(name, nullptr);
                _threadsMap[name]->Start();
            }
        }


Here is passing through observers with threads itself, it found interesting observer, but when it have execute thread and call callback it the thread remained in the queue and did not execute, as if it had not been notified for execution.  

    template<typename Observer>
        class UniversalObservable {
        public:
            using observer_ptr = std::shared_ptr<Observer>;
            void addWeakObserver(const observer_ptr &observer, absl::optional<std::string> threadName) {
                rtc::CritScope scope(&_criticalSection);
                if (!hasObserverInternal(observer)) {
                    WeakObject wobj(observer, threadName);
                    _observers.emplace_back(wobj);
                }
            }
           
            void addObserver(const observer_ptr &observer, absl::optional<std::string> threadName) {
                rtc::CritScope scope(&_criticalSection);
                if (!hasObserverInternal(observer)) {
                    Object obj(observer, threadName);
                    _observers.emplace_back(obj);
                }
            }
           

    virtual void notifyObservers(std::function<void(const observer_ptr &)> notifier) const {
            decltype(_observers) observers;
            {
                rtc::CritScope scope(&_criticalSection);
                observers = _observers;
            }
           
            for (const auto &observer: observers) {
                auto var = absl::any(observer);
                if (var.has_value()) {
                    std::shared_ptr<Observer> obs;
                    absl::optional<std::string> threadName;
                   
                    if (absl::any_cast<WeakObject>(&var)) {
                        WeakObject wobj = absl::any_cast<WeakObject>(var);
                        obs = wobj.observer.lock();
                        threadName = wobj.threadName;
                    } else if (absl::any_cast<Object>(&var)) {
                        Object obj = absl::any_cast<Object>(var);
                        obs = obj.observer;
                        threadName = obj.threadName;
                    }
                   
                    if (obs) {
                        rtc::Thread* thread = TMgr->thread(threadName.value_or(""));
                        assert(thread);
                        if (thread->IsCurrent()) {
                            notifier(obs);
                        }
                        else {
                            //notifier(obs);
                            thread->PostTask(RTC_FROM_HERE, [wobs = std::weak_ptr<Observer>(obs), notifier]() {
                                if (auto observer = wobs.lock()) {
                                    notifier(observer);
                                }
                            });
                        }
                    }
                }
            }
        }

after that initialization, called lines below, I'm not discussing about elements between and behind, it's logic is independent for that problem.

 auto jcDialog = std::make_shared<JanusConnectionDialog>(nullptr);
 jcDialog->init();

    JanusConnectionDialog header, it inherited from handler interface so it could work as observer.

    class JanusConnectionDialog : public QDialog, public vi::IEngineEventHandler, public std::enable_shared_from_this<JanusConnectionDialog>
    {
        Q_OBJECT
   
    public:
        JanusConnectionDialog(QWidget *parent = Q_NULLPTR);
   
        ~JanusConnectionDialog();
   
        void init();
   
        void cleanup();
   
    private slots:
        void on_connectJanusPushButton_clicked();
   
        void on_cancelConnectPushButton_clicked();
   
        void on_pushButton_clicked();
   
    private:
   
        // IEngineEventHandler
   
        void onStatus(vi::EngineStatus status) override;
   
        void onError(int32_t code) override;
   
    private:
        Ui::JanusConnectionDialog ui;
    };



    void JanusConnectionDialog::init()
    {
        appDelegate->getRtcEngine()->registerEventHandler(shared_from_this());
    }


And register Event handler itself, which add observer to list of observers

    void RTCEngine::registerEventHandler(std::weak_ptr<IEngineEventHandler> handler)
        {
            Observable::addBizObserver<IEngineEventHandler>(_observers, handler);
        }


After I push button to connect to server, meessage after succesed connection have to be returned to slot, it's not qt slot , just my function for get messages from registerEventHandler, it's not called if I comment first four lines

    rtc::WinsockInitializer winsockInit;
    rtc::Win32SocketServer w32ss;
    rtc::Win32Thread w32Thread(&w32ss);
    rtc::ThreadManager::Instance()->SetCurrentThread(&w32Thread);

 ,
but it work correct with windows
The observer call back is

    void RTCEngine::onSessionStatus(SessionStatus status)
        {
            Observable::notifyObserver4Change<IEngineEventHandler>(_observers, [status](const auto& observer) {
                EngineStatus es = status == SessionStatus::CONNECTED ? EngineStatus::CONNECTED : EngineStatus::DISCONNECTED;
                observer->onStatus(es);
            });
        }

Which call onStatus, for signaling, that , there is opened connection  

    void JanusConnectionDialog::onStatus(vi::EngineStatus status)
    {
        if (vi::EngineStatus::CONNECTED == status) {
            accept();
        }
    }


And here is the code for calling an observer call back, this callback is called always, because I have a notification about channel is open and json message is pass through that method, but there is no observer notification in lambda, after executing notifyobservers.
See upper, code with class UniversalObservable::notifyobservers

    void SignalingClient::createSession(std::shared_ptr<CreateSessionEvent> event)
        {
            auto lambda = [wself = weak_from_this(), event](const std::string& json) {
                std::string err;
                std::shared_ptr<CreateSessionResponse> model = fromJsonString<CreateSessionResponse>(json, err);
                if (!err.empty()) {
                    DLOG("parse JanusResponse failed");
                    return;
                }
   
                DLOG("model.janus = {}", model->janus.value_or(""));
                if (auto self = wself.lock()) {
                    self->_sessionId = model->session_id.value_or(0) > 0 ? model->session_id.value() : model->data->id.value();
                    self->startHeartbeat();
                    self->_sessionStatus =SessionStatus::CONNECTED;
   
                   
    self->UniversalObservable<ISignalingClientObserver>::notifyObservers([](const auto& observer) {
    //here is the part for calling an observer and return call back
                        observer->onSessionStatus(SessionStatus::CONNECTED);
                    });
   
                    if (event && event->callback) {
                        self->_eventHandlerThread->PostTask(RTC_FROM_HERE, [cb = event->callback]() {
                            (*cb)(true, "");
                        });
                    }
                }
            };
            std::shared_ptr<JCCallback> callback = std::make_shared<JCCallback>(lambda);
            if (event && event->reconnect) {
                _client->reconnectSession(_sessionId, callback);
            }
            else {
                _client->createSession(callback);
            }
        }


This method, is where this call back has to be executed, but it never called

    void RTCEngine::onSessionStatus(SessionStatus status)
        {
            Observable::notifyObserver4Change<IEngineEventHandler>(_observers, [status](const auto& observer) {
                EngineStatus es = status == SessionStatus::CONNECTED ? EngineStatus::CONNECTED : EngineStatus::DISCONNECTED;
                observer->onStatus(es);
            });
        }

Maybe there is some mechanisms to work with linux system as windows, or some example which do the same functionality as this four lines, it's work under windows, so it could be some hidden calls for passing a message up to client application from library.

    rtc::WinsockInitializer winsockInit;
    rtc::Win32SocketServer w32ss;
    rtc::Win32Thread w32Thread(&w32ss);
    rtc::ThreadManager::Instance()->SetCurrentThread(&w32Thread);

Thanks for the help!
   


Reply all
Reply to author
Forward
0 new messages