WebRTC Connection Failure between Next.js and QtPython Application

60 views
Skip to first unread message

Medhansh Garg

unread,
Jun 27, 2025, 2:00:49 AM6/27/25
to discuss-webrtc
I am developing two applications: a Next.js app and a QtPython (PySide6) app. The Next.js app generates a WebRTC offer, posts it to a Firebase document, and then polls for an answer. The QtPython app polls the same document for the offer, generates an answer, and posts that answer back to the same document. The Next.js app then receives the answer and completes the WebRTC connection. Both sides gather ICE candidates using Twilio STUN and TURN servers, whose configuration and credentials are fetched from a Firebase function.

The parts that work:
- The answer and offer creation
- The Firebase signaling
- ICE Candidate gathering (for the most part)

The parts that fail:
- Some of the STUN and TURN servers intermittently fail and report ICE candidate error 701.
- After the answer is set as the remote description on the PWA (Next.js) side, the ICE connection state changes to disconnected and the peer connection state changes to failed.

Code:
The WebRTC hook on the Next.js side:

import { useState, useRef, useCallback, useEffect } from 'react';

/** Lifecycle of the outgoing WebRTC session as exposed to the UI. */
export type ConnectionState =
  | 'connecting'
  | 'connected'
  | 'disconnected'
  | 'error';

/** State of a local capture device (microphone or camera). */
export type MediaState = 'on' | 'off' | 'error';

/** Inputs for `useWebRTCStream`; the hook re-reads them on every render. */
export interface UseWebRTCStreamProps {
  /** `<video>` element whose `srcObject` is detached when the session is cleaned up. */
  videoRef: React.RefObject<HTMLVideoElement | null>;
  /** Local stream whose tracks are published; streaming starts once this is non-null. */
  media: MediaStream | null;
  /** Signalling session identifier, sent as `code` to the signalling endpoints. */
  sessionCode: string;
  /** Microphone state; reported as `metadata.mic` (true only when `'on'`). */
  isMicOn: MediaState;
  /** Camera state; reported as `metadata.webcam` (true only when `'on'`). */
  isVidOn: MediaState;
  /** Selects `facingMode` `'user'` (true) or `'environment'` (false) in the offer metadata. */
  isFrontCamera: boolean;
  /** Forwarded verbatim in the offer metadata. */
  resolution: string;
  /** Forwarded verbatim in the offer metadata. */
  fps: number;
  /** Forwarded as `metadata.exposureLevel`. */
  exposure: number;
  /** Called by `toggleStream` to (re)acquire local media. */
  startMedia: () => void;
  /** Called by `toggleStream` before the stream is stopped. */
  stopMedia: () => void;
}

/** Upper bound on waiting for non-trickle ICE gathering before the offer is sent anyway. */
const ICE_GATHERING_TIMEOUT_MS = 10000;

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, ms));
}

/**
 * Resolves once `pc.iceGatheringState` is `'complete'`, or after `timeoutMs`,
 * whichever comes first.
 *
 * The timeout keeps an unreachable STUN/TURN server from stalling the offer
 * forever; the SDP then carries whatever candidates were gathered so far.
 */
function waitForIceGathering(pc: RTCPeerConnection, timeoutMs: number): Promise<void> {
  return new Promise<void>(resolve => {
    if (pc.iceGatheringState === 'complete') {
      resolve();
      return;
    }
    let timer: ReturnType<typeof setTimeout> | undefined;
    const onChange = () => {
      if (pc.iceGatheringState === 'complete') finish();
    };
    const finish = () => {
      clearTimeout(timer);
      pc.removeEventListener('icegatheringstatechange', onChange);
      resolve();
    };
    pc.addEventListener('icegatheringstatechange', onChange);
    timer = setTimeout(finish, timeoutMs);
  });
}

/**
 * React hook that publishes the local `media` stream to a remote peer over WebRTC.
 *
 * Signalling is plain HTTP. The hook:
 * 1. fetches ICE servers;
 * 2. creates a send-only offer;
 * 3. waits (bounded) for ICE gathering, so the offer carries its candidates;
 * 4. submits the offer, then polls for the answer and applies it.
 *
 * Streaming auto-starts whenever `media` is present and the stream is not on.
 *
 * @param initialProps - hook inputs; the latest value is read through a ref so
 *   the returned callbacks stay referentially stable.
 * @returns connection status, last error, and stream controls.
 */
export default function useWebRTCStream(initialProps: UseWebRTCStreamProps) {
  const propsRef = useRef(initialProps);
  useEffect(() => { propsRef.current = initialProps; });

  const peerRef = useRef<RTCPeerConnection | null>(null);
  const statsRef = useRef<ReturnType<typeof setInterval> | null>(null);
  // Bumped by cleanup() and by every startStream() run. An async step whose
  // captured session number no longer matches has been superseded and must stop.
  const sessionRef = useRef(0);
  // Mirrors `status` so stable callbacks and in-flight async code see the
  // current value instead of the one captured when the callback was created.
  const statusRef = useRef<ConnectionState>('disconnected');

  const [status, setStatusState] = useState<ConnectionState>('disconnected');
  const [error, setError] = useState<string | null>(null);
  const [on, setOn] = useState(false);

  const log = (...msg: unknown[]) => console.log('[useWebRTCStream]', ...msg);

  const setStatus = useCallback((next: ConnectionState) => {
    statusRef.current = next;
    setStatusState(next);
  }, []);

  const cleanup = useCallback(() => {
    log('cleanup() called');
    // Invalidate any in-flight startStream() (polling loop, pending fetches).
    sessionRef.current += 1;

    if (statsRef.current) {
      log('clearing stats interval');
      clearInterval(statsRef.current);
      statsRef.current = null;
    }

    if (peerRef.current) {
      log('closing RTCPeerConnection');
      peerRef.current.close();
      peerRef.current = null;
    }

    const video = propsRef.current.videoRef.current;
    if (video) {
      log('detaching stream from <video>');
      video.srcObject = null;
    }

    setStatus('disconnected');
    setOn(false);
  }, [setStatus]);

  useEffect(() => cleanup, [cleanup]);

  // Deliberately independent of `status`. Depending on it re-created this
  // callback on every status change. That re-fired the auto-start effect below
  // and, after a failure, immediately restarted the session in a loop.
  const startStream = useCallback(async () => {
    log('startStream() invoked');

    const current = statusRef.current;
    if (current === 'connecting' || current === 'connected') {
      log('already', current, ' – aborting duplicate call');
      return;
    }

    const {
      media, sessionCode, isMicOn, isVidOn,
      resolution, fps, isFrontCamera, exposure,
    } = propsRef.current;

    if (!media) {
      log('⚠️  No media present – setError and bail');
      setError('No media');
      return;
    }

    const session = ++sessionRef.current;
    const isCurrent = () => sessionRef.current === session;

    try {
      setStatus('connecting');
      log('fetching TURN credentials…');
      const iceResp = await fetch('<stun and turn cred API url>', { method: 'POST' });
      if (!iceResp.ok) throw new Error(`TURN creds fetch failed: ${iceResp.status}`);
      const iceServers: RTCIceServer[] = await iceResp.json();
      if (!isCurrent()) return;
      log('TURN credentials received', iceServers);

      const pc = new RTCPeerConnection({ iceServers, bundlePolicy: 'max-bundle' });
      peerRef.current = pc;
      log('RTCPeerConnection created');

      pc.onicegatheringstatechange = () => log('ICE gathering state →', pc.iceGatheringState);
      pc.oniceconnectionstatechange = () => log('ICE connection state →', pc.iceConnectionState);
      // errorCode 701 means a STUN/TURN server could not be reached from some
      // local address. That is not fatal by itself if another server/candidate works.
      pc.onicecandidateerror = e => log('ICE candidate error', e);

      pc.onconnectionstatechange = () => {
        if (!isCurrent()) return;
        log('peer connectionState →', pc.connectionState);
        switch (pc.connectionState) {
          case 'connected':
            setStatus('connected');
            setOn(true);
            break;
          case 'disconnected':
            // Transient: ICE may recover by itself. If it does not, the state
            // moves on to 'failed', which tears the session down below.
            setStatus('connecting');
            break;
          case 'failed':
            setError('PeerConnection failed');
            cleanup();
            break;
          case 'closed':
            cleanup();
            break;
          default:
            setStatus('connecting');
        }
      };

      for (const track of media.getTracks()) {
        pc.addTransceiver(track, { direction: 'sendonly', streams: [media] });
        log(`added track (${track.kind}) direction=sendonly`);
      }

      const offer = await pc.createOffer();
      log('SDP offer created');
      await pc.setLocalDescription(offer);
      log('local description set');

      // Non-trickle ICE: the offer is sent once, so let candidates accumulate
      // in localDescription first (bounded so a dead TURN server can't stall us).
      await waitForIceGathering(pc, ICE_GATHERING_TIMEOUT_MS);
      if (!isCurrent()) return;
      log('ICE gathering finished, state =', pc.iceGatheringState);

      const body = {
        code: sessionCode,
        offer: pc.localDescription,
        metadata: {
          mic: isMicOn === 'on',
          webcam: isVidOn === 'on',
          resolution, fps,
          platform: 'mobile',
          facingMode: isFrontCamera ? 'user' : 'environment',
          exposureLevel: exposure,
          ts: Date.now(),
        },
      };
      log('submitting offer', body);
      const submitResp = await fetch('<submit offer API url>', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(body),
      });
      if (!submitResp.ok) throw new Error(`submitOffer failed: ${submitResp.status}`);
      if (!isCurrent()) return;
      log('offer submitted OK');

      let delay = 2000;
      while (isCurrent()) {
        log(`polling for answer (delay ${delay} ms)`);
        const ansResp = await fetch(' answer polling  API url', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ code: sessionCode }),
        });
        if (!isCurrent()) return;

        if (ansResp.status === 204) {
          await sleep(delay);
          delay = Math.min(delay * 2, 30000);
          continue;
        }

        if (!ansResp.ok) throw new Error(`checkAnswer failed: ${ansResp.status}`);
        const payload: { answer?: RTCSessionDescriptionInit | null } = await ansResp.json();
        if (!isCurrent()) return;

        const answer = payload.answer;
        if (answer) {
          log('answer received', answer);
          await pc.setRemoteDescription(answer);
          if (!isCurrent()) return;
          log('remote description set – streaming should begin');

          if (!statsRef.current) {
            statsRef.current = setInterval(() => {
              if (pc.connectionState !== 'connected') return;
              pc.getStats()
                .then(stats => {
                  stats.forEach(r => {
                    if (r.type === 'candidate-pair' && r.state === 'succeeded')
                      log('ICE ✔ succeeded via', r.localCandidateId, '→', r.remoteCandidateId);
                    if (r.type === 'outbound-rtp' && r.kind === 'video')
                      log('Video outbound – packets', r.packetsSent, 'bytes', r.bytesSent);
                  });
                })
                .catch((err: unknown) => log('getStats failed', err));
            }, 3000);
            log('stats interval started');
          }
          return;
        }
        await sleep(delay);
      }
    } catch (e: unknown) {
      // A superseded session's failure (e.g. setRemoteDescription on a pc that
      // cleanup() already closed) must not tear down the session that replaced it.
      if (!isCurrent()) return;
      const message = e instanceof Error ? e.message : String(e);
      log('Error during startStream –', message);
      setError(message || 'unknown WebRTC error');
      cleanup();
    }
  }, [cleanup, setStatus]);

  const stopStream = useCallback(() => {
    log('stopStream called');
    cleanup();
  }, [cleanup]);

  const toggleStream = useCallback(() => {
    log('toggleStream – on?', on);
    if (on) {
      propsRef.current.stopMedia();
      stopStream();
    } else {
      propsRef.current.startMedia();
    }
  }, [on, stopStream]);

  useEffect(() => {
    if (initialProps.media && !on) {
      log('media arrived – auto startStream');
      void startStream();
    }
  }, [initialProps.media, on, startStream]);

  const replaceTrack = useCallback(async (kind: 'video' | 'audio', track: MediaStreamTrack | null) => {
    const pc = peerRef.current;
    if (!pc) { log('replaceTrack called but no pc'); return; }

    // Find the sender through its transceiver. receiver.track always exists and
    // has the transceiver's kind, whereas sender.track becomes null after
    // replaceTrack(null). Searching by sender.track would then add a brand-new,
    // never-negotiated sender instead of reusing the existing one.
    const sender = pc.getTransceivers().find(tr => tr.receiver.track.kind === kind)?.sender;
    if (sender) {
      log(`replacing existing ${kind} track`);
      await sender.replaceTrack(track);
    } else if (track) {
      log(`adding new ${kind} track (no sender)`);
      // NOTE: a newly added sender needs renegotiation, and this hook has no
      // negotiationneeded handler, so the track will not flow until that exists.
      const stream = propsRef.current.media;
      if (stream) pc.addTrack(track, stream);
      else pc.addTrack(track);
    } else {
      log(`no ${kind} sender and no new track – nothing to do`);
    }
  }, []);

  return {
    isStreamOn:        on,
    connectionStatus:  status,
    error,
    replaceTrack,
    startStream,
    stopStream,
    toggleStream,
  };
}

Logs on the Next.js Side:
useWebRTCStream.tsx:50 [useWebRTCStream] ICE connection state → checking
useWebRTCStream.tsx:50 [useWebRTCStream] remote description set – streaming should begin
useWebRTCStream.tsx:50 [useWebRTCStream] stats interval started
useWebRTCStream.tsx:50 [useWebRTCStream] peer connectionState → connecting
useWebRTCStream.tsx:50 [useWebRTCStream] ICE connection state → disconnected
useWebRTCStream.tsx:50 [useWebRTCStream] peer connectionState → failed

The WebRTC Worker on the QtPython side:
import asyncio
import json
import threading
import requests
from aiortc import RTCConfiguration, RTCIceServer, RTCPeerConnection, RTCSessionDescription, MediaStreamTrack
from PySide6.QtCore import QObject, Signal
from av import VideoFrame
import cv2
import numpy as np
from datetime import datetime, timedelta
from enum import Enum
import random

# Lifecycle states reported by WebRTCWorker through connection_state_changed.
# Built with the functional Enum API; member order and values are significant.
ConnectionState = Enum(
    "ConnectionState",
    [
        ("CONNECTING", "connecting"),
        ("CONNECTED", "connected"),
        ("DISCONNECTED", "disconnected"),
        ("FAILED", "failed"),
    ],
    module=__name__,
)

class WebRTCWorker(QObject):
    video_frame_received = Signal(object)
    connection_state_changed = Signal(ConnectionState)

    def __init__(self, code: str, widget_win_id: int):
        super().__init__()
        self.code = code
        self.offer = None
        self.pc = None
        self.running = False

    def start(self):
        self.running = True
        threading.Thread(target = self._run_async_thread, daemon = True).start()
        self.connection_state_changed.emit(ConnectionState.CONNECTING)

    def stop(self):
        self.running = False
        if self.pc:
            asyncio.run_coroutine_threadsafe(self.pc.close(), asyncio.get_event_loop())
        self.connection_state_changed.emit(ConnectionState.DISCONNECTED)

    def _run_async_thread(self):
        asyncio.run(self._run())

    async def _run(self):
        if await self.poll_for_offer() == 1:
            return
        if not self.offer:
            self.connection_state_changed.emit(ConnectionState.FAILED)
            return
       
        ice_servers = self.fetch_ice_servers()
        print("[TURN] Using ICE servers:", ice_servers)
        config = RTCConfiguration(iceServers = ice_servers)
        self.pc = RTCPeerConnection(configuration = config)

        @self.pc.on("connectionstatechange")
        async def on_connectionstatechange():
            state = self.pc.connectionState
            print(f"[WebRTC] State: {state}")
            match state:
                case "connected":
                    self.connection_state_changed.emit(ConnectionState.CONNECTED)
                case "closed":
                    self.connection_state_changed.emit(ConnectionState.DISCONNECTED)
                case "failed":
                    self.connection_state_changed.emit(ConnectionState.FAILED)
                case "connecting":
                    self.connection_state_changed.emit(ConnectionState.CONNECTING)

        @self.pc.on("track")
        def on_track(track):
            print(f"[WebRTC] Track received: {track.kind}")
            if track.kind == "video":
                asyncio.ensure_future(self.handle_track(track))
       
        @self.pc.on("datachannel")
        def on_datachannel(channel):
            print(f"Data channel established: {channel.label}")
           
        @self.pc.on("iceconnectionstatechange")
        async def on_iceconnchange():
            print("[WebRTC] ICE connection state:", self.pc.iceConnectionState)
       
        # Prepare a Future to be resolved when ICE gathering is done
        self.ice_complete = asyncio.get_event_loop().create_future()

        @self.pc.on("icegatheringstatechange")
        async def on_icegatheringstatechange():
            print("[WebRTC] ICE gathering state:", self.pc.iceGatheringState)
            if self.pc.iceGatheringState == "complete":
                if not self.ice_complete.done():
                    self.ice_complete.set_result(True)

        # Set the remote SDP
        await self.pc.setRemoteDescription(RTCSessionDescription(**self.offer))

        # Create the answer
        answer = await self.pc.createAnswer()
        print("[WebRTC] Created answer:", answer)

        # Start ICE gathering by setting the local description
        await self.pc.setLocalDescription(answer)

        # Now wait for ICE gathering to complete
        await self.ice_complete

        # Send the fully-formed answer SDP (includes ICE candidates)
        self.send_answer(self.pc.localDescription)

    async def poll_for_offer(self):
        self.poll_attempt = 0
        self.max_attempts = 30
        self.base_delay = 1.0
        self.max_delay = 30.0

        while self.poll_attempt < self.max_attempts:
            if not self.running or self.code is None:
                print("🛑 Polling stopped.")
                self.connection_state_changed.emit(ConnectionState.DISCONNECTED)
                return 1

            print(f"[Polling] Attempt {self.poll_attempt + 1}")
            try:
                response = requests.post(
                    "https://checkoffer-qaf2yvcrrq-uc.a.run.app",
                    json = {"code": self.code},
                    timeout=5
                )
                if response.status_code == 200:
                    print("✅ Offer received!")
                    self.offer = response.json().get("offer")
                    self.connection_state_changed.emit(ConnectionState.CONNECTING)
                    return 0
                elif response.status_code == 204:
                    print("🕐 Not ready yet...")
                else:
                    print(f"⚠️ Unexpected status: {response.status_code}")
            except Exception as e:
                print(f"❌ Poll error: {e}")

            self.poll_attempt += 1
            delay = random.uniform(0, min(self.max_delay, self.base_delay * (2 ** self.poll_attempt)))
            print(f"🔁 Retrying in {delay:.2f} seconds...")
            await asyncio.sleep(delay)

        print("⛔ Gave up waiting for offer.")
        self.connection_state_changed.emit(ConnectionState.FAILED)
   
    def fetch_ice_servers(self):
        try:
            response = requests.post("https://getturncredentials-qaf2yvcrrq-uc.a.run.app", timeout = 10)
            response.raise_for_status()
            data = response.json()
           
            print(f"[WebRTC] Fetched ICE servers: {data}")

            ice_servers = []
            for server in data:
                ice_servers.append(
                    RTCIceServer(
                        urls=server["urls"],
                        username=server.get("username"),
                        credential=server.get("credential")
                    )
                )
            return ice_servers
        except Exception as e:
            print(f"❌ Failed to fetch TURN credentials: {e}")
            return []
   
    def send_answer(self, sdp):
        print(sdp)
        try:
            res = requests.post(
                "https://submitanswer-qaf2yvcrrq-uc.a.run.app",
                json = {
                    "code": self.code,
                    "answer": {
                        "sdp": sdp.sdp,
                        "type": sdp.type
                    },
                },
                timeout = 10
            )
            if res.status_code == 200:
                print("[WebRTC] Answer submitted successfully")
            else:
                print(f"[WebRTC] Answer submission failed: {res.status_code}")
        except Exception as e:
            print(f"[WebRTC] Answer error: {e}")

   
    async def handle_track(self, track: MediaStreamTrack):
        print("Inside handle track")
        self.track = track
        frame_count = 0
        while True:
            try:
                print("Waiting for frame...")
                frame = await asyncio.wait_for(track.recv(), timeout = 5.0)
                frame_count += 1
                print(f"Received frame {frame_count}")
               
                if isinstance(frame, VideoFrame):
                    print(f"Frame type: VideoFrame, pts: {frame.pts}, time_base: {frame.time_base}")
                    frame = frame.to_ndarray(format = "bgr24")
                elif isinstance(frame, np.ndarray):
                    print(f"Frame type: numpy array")
                else:
                    print(f"Unexpected frame type: {type(frame)}")
                    continue
             
                 # Add timestamp to the frame
                current_time = datetime.now()
                new_time = current_time - timedelta(seconds = 55)
                timestamp = new_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                cv2.putText(frame, timestamp, (10, frame.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
                cv2.imwrite(f"imgs/received_frame_{frame_count}.jpg", frame)
                print(f"Saved frame {frame_count} to file")
                cv2.imshow("Frame", frame)
   
                # Exit on 'q' key press
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            except asyncio.TimeoutError:
                print("Timeout waiting for frame, continuing...")
            except Exception as e:
                print(f"Error in handle_track: {str(e)}")
                if "Connection" in str(e):
                    break
       
        print("Exiting handle_track")
        await self.pc.close()

My testing setup is as follows:
The Next.js project is running using npm run dev, and then I run ngrok http 3000 to expose the dev server on https, and I run the QtPython app locally.

Things I've tried:
- Initially, I wasn't receiving any ICE Candidates with "type = relay" when I was using public STUN servers and/or private Metered STUN and TURN servers. Upon further testing, I found that Metered's STUN server and several TURN servers were unreachable. So I switched to Twilio, where I am getting ICE Candidates with "type = relay," which, to my understanding, means that the TURN servers are being contacted to facilitate the connection
- Tried to find out why I'm getting error 701, but I have yet to figure it out.

I can confirm based on the console.log()s that SDP offers and answers are being generated, received, and set by both sides. However, the WebRTC connection still ultimately fails.

I'm using polling for now because I'm storing the WebRTC signaling data in the database unencrypted, so I have the database walled off except through Firebase Functions. I'm currently using polling until I implement ECDH encryption for the signaling data (which might be overkill, but I want to learn ECDH encryption) and open up the database for real-time use.

I'm very, very new to WebRTC, so I would appreciate any help and advice. Please feel free to let me know if the question requires any additional information or if any logs are needed (I didn't include them because I was concerned that they might contain sensitive data about my IP address and network setup).
Reply all
Reply to author
Forward
0 new messages