How to avoid 'TypeError: Cannot write to a closing writable stream'


guest271314

Jan 19, 2022, 10:12:59 AM
to blink-network-dev
How to avoid 'TypeError: Cannot write to a closing writable stream' when using a TransformStream()?

guest271314

Jan 19, 2022, 8:53:44 PM
to blink-network-dev, guest271314
Given write() called in an asynchronous function that does not have a definitive end, this 

while (writer.desiredSize < 0) {
  console.log('Should (eventually) be increasing to 1', writer.desiredSize);
  await scheduler.postTask(() => {});
}

is what I tested and am using to avoid 

'TypeError: Cannot write to a closing writable stream'

when write() can be called after close() in an event handler.

guest271314

Jan 19, 2022, 9:24:56 PM
to blink-network-dev, guest271314
Nonetheless, the error still occasionally occurs. I am trying to run error-free code. Any suggestion to avoid that TypeError will be helpful.

Adam Rice

Jan 20, 2022, 4:29:25 AM
to guest271314, blink-network-dev
Quickest solution is to not call write() after close(). If you cannot do that, you could catch the error from write().
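
A minimal sketch of the first suggestion, assuming the message handler and the stop logic share a single writer. A flag is set synchronously before close(), and the handler re-checks it after awaiting writer.ready, so write() is not reached once close() has been requested. The names guardWriter, safeWrite, safeClose and closing are hypothetical helpers, not part of the Streams API.

```javascript
// Hypothetical helper: wraps a WritableStreamDefaultWriter so that
// write() is never called after close() has been requested.
function guardWriter(writer) {
  let closing = false; // set synchronously by safeClose()

  async function safeWrite(chunk) {
    if (closing) return false; // stop already requested: drop the chunk
    await writer.ready; // wait for backpressure to clear
    // As I read the Streams Standard, close() also resolves a pending
    // `ready` promise, so the flag has to be checked again here.
    if (closing) return false;
    await writer.write(chunk);
    return true;
  }

  function safeClose() {
    closing = true; // no write() starts after this point
    // close() is queued behind writes that were already accepted.
    return writer.close();
  }

  return { safeWrite, safeClose };
}
```

In the code from this thread, the Native Messaging handler would call safeWrite() and the 'stop' branch would call safeClose(); chunks that arrive after stop are dropped rather than written.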


guest271314

Jan 20, 2022, 9:42:50 AM
to blink-network-dev, Adam Rice, blink-network-dev, guest271314
I am currently catching the error. Ideally the goal is to avoid the error altogether. I asked here to see if any stream experts are aware of such a pattern.

This is the code. The problem is that the Native Messaging event handler (handleMessage) can be, and is, dispatched even after disconnect(), when port is null. Basically, I am not sure how to determine when the handler will not be fired again, or whether the pattern suggested by MattiasBuelens in https://github.com/MattiasBuelens/web-streams-polyfill/issues/95#issuecomment-927354741 is effective here.

onload = () => {
  const { readable, writable } = new TransformStream(
/*
  {
    transform(value, c) {
      c.enqueue(value);
    },
    flush() {
      console.log('Flush.');
    }
  }, {
    highWaterMark: 1,
    size(chunk) {
      return chunk.length;
    }    
  }, {
    highWaterMark: 1,
    size(chunk) {
      return chunk.length;
    }    
  }
*/
 );
  const writer = writable.getWriter();
  const id = 'capture_system_audio';
  let port = chrome.runtime.connectNative(id);
  let handleMessage = async (value) => {  
    try {
      if (writable.locked) {
        await writer.ready;
        await writer.write(new Uint8Array(JSON.parse(value)));
      }
    } catch (e) {
      console.log(e.message);
    }
    return true;
  };
  port.onDisconnect.addListener(async (e) => {
    console.log(e.message);
  });
  port.onMessage.addListener(handleMessage);
  onmessage = async (e) => {
    const { type, message } = e.data;
    if (type === 'start') {
      port.postMessage({
        message,
      });
      parent.postMessage(readable, name, [readable]);
    }
    if (type === 'stop') {
      try {
        port.onMessage.removeListener(handleMessage);
        port.disconnect(id);
        port = null;
        /*
        while (writer.desiredSize < 0) {
          await scheduler.postTask(() => {});
        }
        */
        await writer.close();
        await writer.closed;
        writer.releaseLock();
        parent.postMessage(0, name);
        onmessage = null;
        await chrome.storage.local.clear();
      } catch (err) {
        console.log(err.message);
      }
    }
  };
  parent.postMessage(1, name);
};



guest271314

Jan 21, 2022, 2:37:28 AM
to blink-network-dev, guest271314, Adam Rice, blink-network-dev
I adjusted the while loop and scheduler.postTask() pattern to achieve no errors in either browsing context that uses the transferable streams.

The iframe posts the readable side to the parent window. When a message is posted to the iframe from the parent window to stop the stream, both contexts need to use similar code to avoid the 'TypeError: Cannot write to a closing writable stream' error.

  async stop() {
    console.log(this.inputController.desiredSize);
    try {
      this.source.postMessage({ type: 'stop', message: this.inputController.desiredSize }, '*');
    } catch (err) {
      console.error(err.message);
    }
  }

inputController is the controller of a ReadableStream; data is enqueued to it from within a WritableStream that the readable transferred from the iframe is piped to. Here we drain inputController, using desiredSize < 0 as the condition of the while loop, then call close():

  close: async () => {
    if (channelData.length) {
      this.inputController.enqueue(channelData);
    }
    while (this.inputController.desiredSize < 0) {
      await scheduler.postTask(() => {});
    }
    this.inputController.close();
    // ...
  },

In the iframe we do not need releaseLock() or a check of writable.locked. When the parent window posts the message to stop streaming (here also passing the desiredSize of the input stream controller), we wait while writer.desiredSize < 1. I am not sure why < 0 works for the readable stream (which enqueues the data from the transferred readable and is read by another writable stream in the parent window) while < 1 works for the writer of the independent stream in the iframe.

async function handleMessage(value, port) {
    try {

      await writer.ready;
      await writer.write(new Uint8Array(JSON.parse(value)));
    } catch (e) {
      console.error(e.message);
      ++errors;
    }
    return true;
  }
// ...
onmessage = async (e) => {
    const { type, message } = e.data;
    if (type === 'start') {
      port.postMessage({
        message,
      });
      parent.postMessage(readable, name, [readable]);
    }
    if (type === 'stop') {
      try {
        port.disconnect(id);
        console.log(writer.desiredSize, message);
        let tasks = 0;
        while (writer.desiredSize < 1) {
          await scheduler.postTask(() => {
            ++tasks;
          });
        }
        await writer.close();
        await writer.closed;
        console.log({ errors, tasks }, writer.desiredSize);

        parent.postMessage(0, name);
        onmessage = null;
        await chrome.storage.local.clear();
      } catch (err) {
        console.error(err.message);
      }
    }
  };

Sample output (audioStream.js is the code injected into the parent window, transferableStream.js is the iframe). In the first run, -323 is the desiredSize of the input ReadableStreamDefaultController, verified in the iframe message event handler, and -16 is the current desiredSize of the WritableStreamDefaultWriter in the iframe. Draining both the readable and writable sides can require thousands of calls to scheduler.postTask(), and over 100 even when the writer's current desiredSize is 0; the desiredSize is proportional to how long the stream has been propagating and reading data.

audioStream.js:357 -323
transferableStream.js:44 -16 -323
transferableStream.js:53 {errors: 0, tasks: 3564} 0

audioStream.js:357 -68
transferableStream.js:44 0 -68
transferableStream.js:53 {errors: 0, tasks: 149} 0

audioStream.js:357 -726
transferableStream.js:44 -13 -726
transferableStream.js:53 {errors: 0, tasks: 1666} 0

audioStream.js:357 -52
transferableStream.js:52 -8 -52
transferableStream.js:56 {errors: 0, tasks: 410} 0
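
The negative numbers in the output above match how desiredSize is defined: the high water mark minus the total size of the chunks still queued, so it goes negative when chunks are enqueued faster than they are read. A small sketch of that arithmetic, assuming the default queuing strategy (high water mark 1, every chunk counted as size 1); desiredSizeDemo is a hypothetical name used only for illustration.

```javascript
// Illustrative only: records controller.desiredSize as chunks are
// enqueued without being read, then after a single read.
async function desiredSizeDemo() {
  let controller;
  const stream = new ReadableStream({
    start(c) {
      controller = c; // start() runs synchronously in the constructor
    },
  });

  const sizes = [controller.desiredSize]; // nothing queued: 1 - 0
  for (let i = 0; i < 3; i++) {
    controller.enqueue(i); // each chunk lowers desiredSize by 1
    sizes.push(controller.desiredSize);
  }

  const reader = stream.getReader();
  await reader.read(); // removes one chunk from the queue
  sizes.push(controller.desiredSize);
  return sizes;
}

desiredSizeDemo().then((sizes) => console.log(sizes)); // [1, 0, -1, -2, -1]
```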

As a function (for the writable side controller)

  async function drain(controller, size, tasks = 0) {
    while (controller.desiredSize < size) {
      await scheduler.postTask(() => {
        ++tasks;
      });
    }
    return tasks;
  }
  //..
  const tasks = await drain(writer, 1);
  await writer.close();
  await writer.closed;

Screenshot_2022-01-20_23-36-01.png

Adam Rice

Jan 21, 2022, 4:01:23 AM
to guest271314, blink-network-dev
Having to poll desiredSize is a sign that you are doing something wrong. For a WritableStream it should be sufficient to await writer.ready. For a ReadableStream controller you can just wait until pull() or one of the other underlying source methods is called.

You should not worry too much about catching exceptions if you are expecting them.
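
A minimal sketch of both points, assuming the producer can hand out data on demand (which may not hold for a push source such as a Native Messaging port). readableFrom and writeAll are hypothetical helpers, not part of the Streams API.

```javascript
// Hypothetical source: hands out items only when the stream asks for them.
function readableFrom(items) {
  const queue = [...items];
  return new ReadableStream({
    // The stream calls pull() when its internal queue has room,
    // so the producer does not need to poll desiredSize.
    pull(controller) {
      if (queue.length === 0) {
        controller.close();
        return;
      }
      controller.enqueue(queue.shift());
    },
  });
}

// Writable side: awaiting writer.ready before each write respects backpressure.
async function writeAll(writable, chunks) {
  const writer = writable.getWriter();
  for (const chunk of chunks) {
    await writer.ready;
    await writer.write(chunk);
  }
  await writer.close(); // nothing calls write() after this point
}
```

Here close() is only reached after the loop has finished, so no write() can follow it; with an event-driven producer that ordering has to be enforced some other way.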

guest271314

Jan 21, 2022, 8:30:38 AM
to blink-network-dev, Adam Rice, blink-network-dev, guest271314
I am not sure what I am doing wrong then. 

You can kindly run the working code on a *nix machine https://github.com/guest271314/captureSystemAudio/tree/master/native_messaging/capture_system_audio (usage) https://github.com/guest271314/captureSystemAudio#web-accessible-resources-php-passthru-parec-fetch-transferable-streams-media-capture-transform-breakout-box for yourself then illuminate precisely what I am doing wrong so I will know.

I am conveying the errors I encountered in my use case. The full code is here; in pertinent part, the AudioStream class starts at line 237 in audioStream.js https://github.com/guest271314/captureSystemAudio/blob/fef52eb6868687612362f43854e96b32213a798e/native_messaging/capture_system_audio/audioStream.js#L237-L503 and transferableStream.js is the iframe that transfers the readable side to audioStream.js https://github.com/guest271314/captureSystemAudio/blob/master/native_messaging/capture_system_audio/transferableStream.js. I am not expecting exceptions. That is why I posted this question.

I already posted the transferableStream.js that was throwing. Both will throw without waiting for read/write. My solution might be crude, though it plugs the holes. In fact, the code is not that much:

class AudioStream {
  constructor(stdin) {
    this.stdin = stdin;
    this.readOffset = 0;
    this.duration = 0;
    this.src = new URL(
      'chrome-extension://<id>/transferableStream.html'
    );
    document
    .querySelectorAll(`[src="${this.src.href}"]`)
    .forEach((f) => {
      document.body.removeChild(f);
    });
    this.ac = new AudioContext({
      sampleRate: 44100,
      latencyHint: 0,
    });
    this.ac.suspend();
    this.msd = new MediaStreamAudioDestinationNode(this.ac, {
      channelCount: 2,
    });
    this.inputController = void 0;
    this.inputStream = new ReadableStream({
      start: (_) => {
        return (this.inputController = _);
      },
    });
    this.inputReader = this.inputStream.getReader();
    const { stream } = this.msd;
    this.stream = stream;
    const [track] = stream.getAudioTracks();
    this.track = track;
    this.osc = new OscillatorNode(this.ac, {
      frequency: 0,
    });
    this.processor = new MediaStreamTrackProcessor({
      track,
    });
    this.generator = new MediaStreamTrackGenerator({
      kind: 'audio',
    });
    const { writable: audioWritable } = this.generator;
    this.audioWritable = audioWritable;
    const { readable: audioReadable } = this.processor;
    this.audioReadable = audioReadable;
    this.audioWriter = this.audioWritable.getWriter();
    this.mediaStream = new MediaStream([this.generator]);
    this.resolve = void 0;
    this.promise = new Promise((_) => (this.resolve = _));
    this.osc.connect(this.msd);
    this.osc.start();
    this.track.onmute = this.track.onunmute = this.track.onended = (e) =>
      console.log(e);
    this.outputStreamController = void 0;
    this.outputStream = new ReadableStream(
      {
        start: async (_) => {
          return (this.outputStreamController = _);
        },
      },
      { highWaterMark: 1 }
    ).pipeThrough(
      new TransformStream({
        async transform(blob, c) {
          try {
            c.enqueue(new Uint8Array(await blob.arrayBuffer()));
          } catch (err) {
            console.warn(`Response.arrayBuffer(): ${err.message}`);

          }
        },
        flush() {
          console.log('Flush.');
        },
      })
    );
    this.recorder = new MediaRecorder(this.mediaStream, {
      audioBitrateMode: 'constant',
    });
    this.recorder.onstop = async (e) => {
      try {
        this.outputStreamController.close();
      } catch (err) {
        console.warn(err);
      }
      const { Decoder, Encoder, tools, Reader } = require('ts-ebml');
      const injectMetadata = async (blob) => {
        const decoder = new Decoder();
        const reader = new Reader();
        reader.logging = false;
        reader.drop_default_duration = false;
        const buffer = await blob.arrayBuffer();
        const elms = decoder.decode(buffer);
        elms.forEach((elm) => reader.read(elm));
        reader.stop();
        const refinedMetadataBuf = tools.makeMetadataSeekable(
          reader.metadatas,
          reader.duration,
          reader.cues
        );
        const body = buffer.slice(reader.metadataSize);
        const result = new Blob([refinedMetadataBuf, body], {
          type: blob.type,
        });
        return result;
      };
      this.resolve(
        injectMetadata(await new Response(this.outputStream).blob())
      );
    };
    this.recorder.ondataavailable = async ({ data }) => {
      if (data.size > 0) {
        this.outputStreamController.enqueue(data);
      }
    };
  }
  async start() {
    return this.nativeMessageStream();

  }
  async stop() {
    console.log(this.inputController.desiredSize);
    try {
      this.source.postMessage({ type: 'stop', message: this.inputController.desiredSize }, '*');
    } catch (err) {
      console.error(err.message);
    }
  }
  async nativeMessageStream() {
    return new Promise((resolve) => {
      onmessage = (e) => {
        if (e.origin === this.src.origin) {
          console.log(e.data);
          if (!this.source) {
            this.source = e.source;
          }
          if (e.data === 1) {
            this.source.postMessage(
              { type: 'start', message: this.stdin },
              '*'
            );
          }
          if (e.data === 0) {
            document
              .querySelectorAll(`[src="${this.src.href}"]`)
              .forEach((f) => {
                document.body.removeChild(f);
              });
            onmessage = null;
          }
          if (e.data instanceof ReadableStream) {
            this.stdout = e.data;
            resolve(this.captureSystemAudio());
          }
        }
      };
      this.transferableWindow = document.createElement('iframe');
      this.transferableWindow.style.display = 'none';
      this.transferableWindow.name = location.href;
      this.transferableWindow.src = this.src.href;
      document.body.appendChild(this.transferableWindow);
    }).catch((err) => {
      throw err;
    });
  }
  async captureSystemAudio() {
    this.recorder.start(1);
    let channelData = [];
    try {
      await Promise.allSettled([
        this.stdout
          .pipeTo(
            new WritableStream({
              write: async (value, c) => {
                let i = 0;
                for (; i < value.buffer.byteLength; i++, this.readOffset++) {
                  if (channelData.length === 441 * 4) {
                    this.inputController.enqueue([...channelData]);
                    channelData.length = 0;
                  }
                  channelData.push(value[i]);
                }
              },
              abort(e) {
                console.error(e.message);
              },

              close: async () => {
                if (channelData.length) {
                  this.inputController.enqueue(channelData);
                }
                while (this.inputController.desiredSize < 0) {
                  await scheduler.postTask(() => {});
                }
                this.inputController.close();
                console.log('Done writing input stream.');
                try {
                  this.recorder.requestData();
                  this.recorder.stop();
                  this.msd.disconnect();
                  this.osc.disconnect();
                  this.track.stop();
                  await this.audioWriter.close();
                  await this.audioWriter.closed;
                  this.generator.stop();
                  await this.ac.close();
                } catch (err) {
                  console.error(err);
                } finally {
                  console.log(
                    `readOffset:${this.readOffset}, duration:${this.duration}, ac.currentTime:${this.ac.currentTime}`,
                    `generator.readyState:${this.generator.readyState}, audioWriter.desiredSize:${this.audioWriter.desiredSize}`,
                    `inputController.desiredSize:${this.inputController.desiredSize}, ac.state:${this.ac.state}`
                  );
                }
              },
            })
          )
          .catch(console.warn),
        this.audioReadable
          .pipeTo(
            new WritableStream({
              abort(e) {
                console.error(e.message);
              },
              write: async ({ timestamp }) => {
                const uint8 = new Int8Array(441 * 4);
                const { value, done } = await this.inputReader.read();
                if (!done) uint8.set(new Int8Array(value));
                const uint16 = new Uint16Array(uint8.buffer);
                // https://stackoverflow.com/a/35248852
                const channels = [new Float32Array(441), new Float32Array(441)];
                for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
                  const int = uint16[i];
                  // If the high bit is on, then it is a negative number, and actually counts backwards.
                  const float =
                    int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
                  // deinterleave
                  channels[(n = ++n % 2)][!n ? j++ : j - 1] = float;
                }
                const data = new Float32Array(882);
                data.set(channels.shift(), 0);
                data.set(channels.shift(), 441);
                const frame = new AudioData({
                  timestamp,
                  data,
                  sampleRate: 44100,
                  format: 'f32-planar',
                  numberOfChannels: 2,
                  numberOfFrames: 441,
                });
                this.duration += frame.duration;
                await this.audioWriter.write(frame);
              },
              close: () => {
                console.log('Done reading input stream.');
              },
            })
          )
          .catch((e) => {
            console.error(e);
          }),
        this.ac.resume(),
      ]);
      return this.promise;
    } catch (err) {
      console.error(err);
    }
  }
}

audioStream = new AudioStream(
  `parec -d @DEFAULT_MONITOR@`
);
// audioStream.mediaStream: live MediaStream
audioStream
  .start()
  .then(async (ab) => {
    // ab: ArrayBuffer representation of WebM file from MediaRecorder
    console.log(audioStream);
    const blob = new Blob([ab], { type: 'audio/webm;codecs=opus' });
    console.log(URL.createObjectURL(blob));
    const {origin} = audioStream.src;
    globalThis.audioStream = AudioStream = null;
    let permission = await navigator.permissions.query({
      name: 'notifications',
    });
    if (permission.state !== 'granted') {
      permission = await Notification.requestPermission();
    }
    if (permission.state === 'granted' || permission === 'granted') {
      const saveFileNotification = new Notification('Save file?', {
        body: `Click "Activate" to download captured system audio recording.`,
        icon: `${origin}/download_music_icon.png`,
      });
      saveFileNotification.onclick = async (e) => {
        try {
          const handle = await showSaveFilePicker({
            startIn: 'music',
            suggestedName: 'recording' + new Date().getTime() + '.webm',
          });
          console.log(handle);
          const writable = await handle.createWritable();
          const writer = writable.getWriter();
          await writer.write(blob);
          await writer.close();
        } catch (err) {
          console.error(err);
        }
      };
      saveFileNotification.onshow = async (e) => {
        await new Promise((resolve) => setTimeout(resolve, 1000 * 30));
        e.target.close();
      };
    }
  })
  .catch(console.error);

Ideally we should be able to use transferable streams with Native Messaging directly. I filed a FUGU for that https://bugs.chromium.org/p/chromium/issues/detail?id=1214621 and for direct connection to Native Messaging host https://bugs.chromium.org/p/chromium/issues/detail?id=1250936 before Chromium authors decided to ban me. 


fetch(url, {method:'post', body:<ReadableStream|command as string>, signal:abortSignal})
  .then((r) => r.body.pipeTo(...))

in PHP

  stream_set_blocking($input = fopen("php://input", "r"), 0);
  passthru(stream_get_contents($input));

getting fetch() to work cross-origin is not. I would prefer to not use an iframe at all, but alas that is what works.

I look forward to you indicating what I am doing wrong in the code re Streams and Transferable Streams.

guest271314

Jan 21, 2022, 9:24:22 AM
to blink-network-dev, guest271314, Adam Rice, blink-network-dev
FWIW, as an aside, here is my latest attempt at implementing a direct connection to the Native Messaging host using "externally_connectable" from an arbitrary web page.

Two (2) issues exist with that approach:

1. We need to keep the background ServiceWorker active beyond the 5 minute specification "lifecycle", to persist the Native Messaging connection and handle hours of capturing system audio, e.g., a live jam session.
2. The browser freezes during reading.

For 1. I created a repository specifically to keep ServiceWorker active indefinitely https://github.com/guest271314/persistent-serviceworker. I have kept the same ServiceWorker active on both Chromium 99 and Firefox 96 for well over 24 hours, persisting the data https://guest271314.github.io/persistent-serviceworker/

For 2. I do not know why the browser freezes.

background.js in MV3 ServiceWorker (use caution when testing, freezes the browser on Chromium 99 dev channel)

const {readable, writable} = new TransformStream({
  transform(value, controller) {
    controller.enqueue(value);
  }
});

const reader = readable.getReader();
const writer = writable.getWriter();

async function handleNativeMessage (value) {  
    try {
      if (writable.locked) {
        await writer.ready;
        await writer.write(value);
      }
    } catch (e) {
      console.warn(e.message);
    }
    return true;
  };

if (!self.nativeMessagingPort) {
    self.nativeMessagingPort = chrome.runtime.connectNative('capture_system_audio');
    self.nativeMessagingPort.onDisconnect.addListener(async (e) => {
      console.warn(e);
    });
    self.nativeMessagingPort.onMessage.addListener(handleNativeMessage);
  }

var port;
self.connected = false;
async function handleConnectExternal(_) {
  port = _;
  if (!self.connected) {
    self.connected = true;
    self.nativeMessagingPort.postMessage({message:'parec -d @DEFAULT_MONITOR@'});
  }
  const keepAlive = new Date();

  function handleMessage(e) {
    console.log(e);
  }
  function handleDisconnect(e) {
    console.log(e);
    port.onMessage.removeListener(handleMessage);
    port.onDisconnect.removeListener(handleDisconnect);
    port = null;  
    self.nativeMessagingPort.disconnect();
    self.nativeMessagingPort = null;
    if (!self.nativeMessagingPort) {
      self.nativeMessagingPort = chrome.runtime.connectNative('capture_system_audio');
      self.nativeMessagingPort.onDisconnect.addListener(async (e) => {
        console.warn(e.message);
      });
      self.nativeMessagingPort.onMessage.addListener(handleNativeMessage);
      self.connected = false;
    }
  }

  port.onMessage.addListener(handleMessage);
  port.onDisconnect.addListener(handleDisconnect);
 
  while (port && ((new Date() - keepAlive) / 60000) < 1) {
   // if (port) {
      const now = new Date();
      const {value, done} = await reader.read();
      port.postMessage(JSON.parse(value));
      // {start, now, keepAlive, test: ((new Date() - keepAlive) / 60000) < 1,  minutes: (now - start) / 60000});
      // await new Promise((resolve) => setTimeout(resolve, 100));
   // } else {
   //   break;
   // }
  }
  // const now = new Date();
  // port.postMessage({start, now, keepAlive, test: ((new Date() - keepAlive) / 60000) < 1,  minutes: (now - start) / 60000});
  if (port) {
  port.disconnect();
  //port.onMessage.removeListener(handleMessage);
  //port.onDisconnect.removeListener(handleDisconnect);  
  port = null;
  }
}

chrome.runtime.onConnectExternal.addListener(handleConnectExternal);
// ...
var port;
async function* keepAlive() {
  while (true) {
  yield new Promise((resolve) => {
  function handleMessage(e) {
    console.log(e);
  }
  function handleDisconnect(e) {
    port.onMessage.removeListener(handleMessage);
    port.onDisconnect.removeListener(handleDisconnect);  
    port = null;
    resolve('loop');
  }
  port = chrome.runtime.connect('pbcacennomncannjbmdjogheacknncbf', {name:'keepalive'});
  port.onMessage.addListener(handleMessage);
  port.onDisconnect.addListener(handleDisconnect);
  });
  }
}
async function stream() {
  for await (const _ of keepAlive()) {
    console.log(_);    
  };
}
stream();

Though now I am curious about what you believe I am doing wrong in the former code.

Adam Rice

Jan 24, 2022, 1:25:17 AM
to guest271314, blink-network-dev
Wow, that's a lot of code. I think at this point it would be better to take your query to StackOverflow as this is not a support forum.

guest271314

Jan 24, 2022, 8:15:21 AM
to Adam Rice, blink-network-dev
I figured it out. I will use the approach of waiting in a while loop on desiredSize. You claimed I was doing something wrong. I vet spurious claims. I don't think you meant that with malice; however, a claim is a claim. If you can't back it up with evidence, it is spurious, or the individual is lazy or lying. I am banned from SO for another few years. W3C, WICG, and even bugs.chromium.org banned me. "A lot of code" is relative. WebRTC has a lot of code, yet webrtc-dev still refused to capture monitor devices on Linux. It is only a few dozen lines. Thus, I have no roadmap for what I am doing. Progressing nonetheless. I asked the experts, the originators of the code. I am a primary source researcher. Thanks for your help anyway. Perhaps in your spare time you can take a look at the bugs I filed re Native Messaging and Transferable Streams amalgamation.