Collaborative WebRTC disconnects with big enough update #598

Closed
datakurre opened this issue Apr 12, 2022 · 12 comments · Fixed by #615
Labels
bug Something isn't working


@datakurre

datakurre commented Apr 12, 2022

Description

Collaborative WebRTC silently disconnects when display data that is too large is transferred.

Reproduce

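from IPython.display import display
# 256 KiB of filler, hidden inside an HTML comment
payload = "a" * 1024 * 256
# raw=True: the dict is sent as-is as a MIME bundle
display({"text/html": f"OK<!--{payload}-->"}, raw=True)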

y-webrtc logging was enabled with:

// enable logging for all modules
localStorage.log = 'true'

[Screenshot: webrtc-disconnects]
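As a rough sense of scale, here is a small Python sketch that measures the reproducer's `text/html` string in bytes and compares it with an assumed per-message ceiling of 262144 bytes (256 KiB). That ceiling is an assumption, chosen because it is the SCTP `max-message-size` that Chromium-based browsers commonly advertise; the real limit depends on the two browsers involved, and the message y-webrtc actually sends is larger than the raw string because it carries the Yjs update encoding. `html_payload_size` and `ASSUMED_MAX_MESSAGE_SIZE` are hypothetical names.

```python
# Hypothetical per-message ceiling: 256 KiB, the SCTP max-message-size that
# Chromium-based browsers commonly advertise. Other browsers may differ.
ASSUMED_MAX_MESSAGE_SIZE = 256 * 1024


def html_payload_size(n_bytes: int) -> int:
    """Return the UTF-8 size in bytes of the reproducer's text/html string."""
    payload = "a" * n_bytes
    html = f"OK<!--{payload}-->"
    return len(html.encode("utf-8"))


size = html_payload_size(1024 * 256)
print(size, size > ASSUMED_MAX_MESSAGE_SIZE)  # 262153 True
```

Even the raw string is 9 bytes over that assumed ceiling, before any Yjs or notebook-format overhead, which would be consistent with a single unchunked message being rejected. Treat this as an illustration, not a diagnosis.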

Expected behavior

WebRTC should not disconnect silently: it should either support large payloads or degrade gracefully.

Context

RobotKernel injects execution logs into the notebook by displaying HTML links with large inline data URIs. Not beautiful, but it has worked well enough so far:

https://robotkernel.readthedocs.io/en/latest/_/lab/index.html?path=Example.ipynb&room=demo

/cc @bollwyvl

@datakurre datakurre added the bug Something isn't working label Apr 12, 2022
@datakurre
Author

datakurre commented Apr 13, 2022

Ouch. This seems to be a known issue in y-webrtc (yjs/y-webrtc#20), and actually a known issue of its dependency simple-peer (feross/simple-peer#393).

Fixes have been proposed in both simple-peer and y-webrtc (the latter by patching simple-peer). I'll see whether I can apply some of them.

@datakurre
Author

I can confirm that buffering the WebRTC connection fixes this issue.

Because jupyterlite imports the source version of y-webrtc, I was able to apply the proposed patch to node_modules before the build.

Fixing this properly looks quite involved. y-webrtc would need a feature that allows SimplePeer to be customized, so that jupyterlite could pass in its own extended SimplePeer with buffering. All this assumes that the patch has no side effects...

[Screenshot: webrtc-disconnects-patch]
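The buffering idea can be sketched independently of WebRTC: queue outgoing messages, stop sending while the data channel's `bufferedAmount` is above a threshold, and resume when the channel fires `bufferedamountlow`. The Python below is a simulation under assumptions, not browser code: `FakeChannel` is a hypothetical stand-in for an `RTCDataChannel`, its `drain()` method imitates the network flushing bytes, `BufferedSender` is a hypothetical name, and the 64 KiB threshold mirrors the `MAX_BUFFERED_AMOUNT` constant in the patch quoted further down this thread.

```python
from collections import deque

# Mirrors MAX_BUFFERED_AMOUNT in the quoted patch (64 KiB).
MAX_BUFFERED_AMOUNT = 64 * 1024


class FakeChannel:
    """Hypothetical stand-in for a browser RTCDataChannel (simulation only)."""

    def __init__(self) -> None:
        self.buffered_amount = 0  # bytes accepted but not yet on the wire
        self.sent: list = []

    def send(self, message: bytes) -> None:
        self.sent.append(message)
        self.buffered_amount += len(message)

    def drain(self, n_bytes: int) -> None:
        """Pretend the network flushed n_bytes from the buffer."""
        self.buffered_amount = max(0, self.buffered_amount - n_bytes)


class BufferedSender:
    """Queue messages and pause while the channel buffer is over the threshold."""

    def __init__(self, channel: FakeChannel) -> None:
        self.channel = channel
        self.queue: deque = deque()
        self.paused = False

    def send(self, message: bytes) -> None:
        self.queue.append(message)
        if not self.paused:
            self.flush()

    def flush(self) -> None:
        self.paused = False
        while self.queue:
            if self.channel.buffered_amount > MAX_BUFFERED_AMOUNT:
                # In a browser, a 'bufferedamountlow' listener would call flush() again.
                self.paused = True
                return
            self.channel.send(self.queue.popleft())


channel = FakeChannel()
sender = BufferedSender(channel)
for _ in range(10):
    sender.send(b"x" * 16_000)
print(len(channel.sent), len(sender.queue), sender.paused)  # 5 5 True

channel.drain(channel.buffered_amount)  # simulate the buffer emptying
sender.flush()  # what the 'bufferedamountlow' handler would trigger
print(len(channel.sent), len(sender.queue), sender.paused)  # 10 0 False
```

Checking the threshold before each send, as the patch's `sendMessageQueued` does, means the buffer can overshoot the threshold by up to one message; with messages capped near 16 KB that overshoot stays small.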

@bollwyvl
Collaborator

bollwyvl commented Apr 13, 2022 via email

@datakurre
Author

Here's the literal patch I used on y-webrtc. It is nearly identical to the pull request, plus a one-line addition from the pull request author's fork (the author had not yet updated the pull request).

From fd6f29b6e692a23c7fe5034cc4eee55af98ea55c Mon Sep 17 00:00:00 2001
From: cy <[email protected]>
Date: Fri, 21 May 2021 07:26:03 -0500
Subject: [PATCH] extend simple peer to handle buffered/packet transmission;
 add dependency with license

---
 src/SimplePeerExtended.js | 131 ++++++++++++++++++++++++++++++++++++++
 src/int64-buffer.min.js   |  25 ++++++++
 src/y-webrtc.js           |   4 +-
 3 files changed, 159 insertions(+), 1 deletion(-)
 create mode 100644 src/SimplePeerExtended.js
 create mode 100644 src/int64-buffer.min.js

diff --git a/src/SimplePeerExtended.js b/src/SimplePeerExtended.js
new file mode 100644
index 0000000..125e599
--- /dev/null
+++ b/src/SimplePeerExtended.js
@@ -0,0 +1,132 @@
+import * as Y from 'yjs' // eslint-disable-line
+import Peer from 'simple-peer/simplepeer.min.js'
+const { Int64BE } = require('./int64-buffer.min.js')
+
+export const CHUNK_SIZE = (1024 * 16) - 512 // 16KB - data header
+export const TX_SEND_TTL = 1000 * 30 // 30 seconds
+export const MAX_BUFFERED_AMOUNT = 64 * 1024 // simple peer value
+
+function concatenate (Constructor, arrays) {
+  let totalLength = 0
+  for (const arr of arrays) totalLength += arr.length
+  const result = new Constructor(totalLength)
+  let offset = 0
+  for (const arr of arrays) {
+    result.set(arr, offset)
+    offset += arr.length
+  }
+  return result
+}
+
+class SimplePeerExtended extends Peer {
+  constructor (opts) {
+    super(opts)
+    this._opts = opts
+    this._txOrdinal = 0
+    this._rxPackets = []
+    this._txPause = false
+    this.webRTCMessageQueue = []
+    this.webRTCPaused = false
+  }
+
+  encodePacket ({ chunk, txOrd, index, length, totalSize, chunkSize }) {
+    const encoded = concatenate(Uint8Array, [
+      new Uint8Array(new Int64BE(txOrd).toArrayBuffer()), // 8 bytes
+      new Uint8Array(new Int64BE(index).toArrayBuffer()), // 8 bytes
+      new Uint8Array(new Int64BE(length).toArrayBuffer()), // 8 bytes
+      new Uint8Array(new Int64BE(totalSize).toArrayBuffer()), // 8 bytes
+      new Uint8Array(new Int64BE(chunkSize).toArrayBuffer()), // 8 bytes
+      chunk // CHUNK_SIZE
+    ])
+    return encoded
+  }
+
+  decodePacket (array) {
+    return {
+      txOrd: new Int64BE(array.slice(0, 8)).toNumber(),
+      index: new Int64BE(array.slice(8, 16)).toNumber(),
+      length: new Int64BE(array.slice(16, 24)).toNumber(),
+      totalSize: new Int64BE(array.slice(24, 32)).toNumber(),
+      chunkSize: new Int64BE(array.slice(32, 40)).toNumber(),
+      chunk: array.slice(40)
+    }
+  }
+
+  packetArray (array, size) {
+    const txOrd = this._txOrdinal
+    this._txOrdinal++
+    const chunkedArr = []
+    const totalSize = array.length || array.byteLength
+    let index = 0
+    while (index < totalSize) {
+      chunkedArr.push(array.slice(index, size + index))
+      index += size
+    }
+    return chunkedArr.map((chunk, index) => {
+      return this.encodePacket({
+        chunk,
+        txOrd,
+        index,
+        totalSize,
+        length: chunkedArr.length,
+        chunkSize: chunk.byteLength
+      })
+    })
+  }
+
+  _onChannelMessage (event) {
+    const { data } = event
+    const packet = this.decodePacket(data)
+    if (packet.chunk instanceof ArrayBuffer) packet.chunk = new Uint8Array(packet.chunk)
+    if (packet.chunkSize === packet.totalSize) {
+      this.push(packet.chunk)
+    } else {
+      const data = this._rxPackets.filter((p) => p.txOrd === packet.txOrd)
+      data.push(packet)
+      const indices = data.map(p => p.index)
+      if (new Set(indices).size === packet.length) {
+        data.sort(this.sortPacketArray)
+        const chunks = concatenate(Uint8Array, data.map(p => p.chunk))
+        this.push(chunks)
+        setTimeout(() => { this._rxPackets = this._rxPackets.filter((p) => p.txOrd !== packet.txOrd) }, TX_SEND_TTL)
+      } else {
+        this._rxPackets.push(packet)
+      }
+    }
+  }
+
+  sortPacketArray (a, b) { return a.index > b.index ? 1 : -1 }
+  send (chunk) {
+    if (chunk instanceof ArrayBuffer) chunk = new Uint8Array(chunk)
+    const chunks = this.packetArray(chunk, CHUNK_SIZE)
+    this.webRTCMessageQueue = this.webRTCMessageQueue.concat(chunks)
+    if (this.webRTCPaused) return
+    this.sendMessageQueued()
+  }
+
+  sendMessageQueued () {
+    this.webRTCPaused = false
+    let message = this.webRTCMessageQueue.shift()
+    while (message) {
+      if (this._channel.bufferedAmount && this._channel.bufferedAmount > MAX_BUFFERED_AMOUNT) {
+        this.webRTCPaused = true
+        this.webRTCMessageQueue.unshift(message)
+        const listener = () => {
+          this._channel.removeEventListener('bufferedamountlow', listener)
+          this.sendMessageQueued()
+        }
+        this._channel.addEventListener('bufferedamountlow', listener)
+        return
+      }
+      try {
+        super.send(message)
+        message = this.webRTCMessageQueue.shift()
+      } catch (error) {
+        super.destroy()
+        console.warn(error)
+      }
+    }
+  }
+}
+
+export default SimplePeerExtended
diff --git a/src/int64-buffer.min.js b/src/int64-buffer.min.js
new file mode 100644
index 0000000..81da6bf
--- /dev/null
+++ b/src/int64-buffer.min.js
@@ -0,0 +1,25 @@
+/*
+https://github.com/kawanet/int64-buffer
+The MIT License (MIT)
+
+Copyright (c) 2015-2020 Yusuke Kawasaki
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+*/
+var Uint64BE,Int64BE,Uint64LE,Int64LE;!function(t){var r,f="undefined",n=f!==typeof Buffer&&Buffer,e=f!==typeof Uint8Array&&Uint8Array,o=f!==typeof ArrayBuffer&&ArrayBuffer,i=[0,0,0,0,0,0,0,0],u=Array.isArray||function(t){return!!t&&"[object Array]"==Object.prototype.toString.call(t)},a=4294967296;function s(u,s,A){var U=s?0:4,I=s?4:0,L=s?0:3,d=s?1:2,w=s?2:1,m=s?3:0,S=s?b:E,j=s?B:g,x=k.prototype,M="is"+u,N="_"+M;return x.buffer=void 0,x.offset=0,x[N]=!0,x.toNumber=O,x.toString=function(t){var r=this.buffer,f=this.offset,n=_(r,f+U),e=_(r,f+I),o="",i=!A&&2147483648&n;i&&(n=~n,e=a-e);t=t||10;for(;;){var u=n%t*a+e;if(n=Math.floor(n/t),e=Math.floor(u/t),o=(u%t).toString(t)+o,!n&&!e)break}i&&(o="-"+o);return o},x.toJSON=O,x.toArray=c,n&&(x.toBuffer=y),e&&(x.toArrayBuffer=h),k[M]=function(t){return!(!t||!t[N])},t[u]=k,k;function k(t,u,s,c){return this instanceof k?function(t,u,s,c,y){e&&o&&(u instanceof o&&(u=new e(u)),c instanceof o&&(c=new e(c)));if(!(u||s||c||r))return void(t.buffer=p(i,0));if(!v(u,s)){var h=r||Array;y=s,c=u,s=0,u=r===n?n.alloc(8):new h(8)}if(t.buffer=u,t.offset=s|=0,f===typeof c)return;"string"==typeof c?function(t,r,f,n){var e=0,o=f.length,i=0,u=0;"-"===f[0]&&e++;var s=e;for(;e<o;){var c=parseInt(f[e++],n);if(!(c>=0))break;u=u*n+c,i=i*n+Math.floor(u/a),u%=a}s&&(i=~i,u?u=a-u:i++);J(t,r+U,i),J(t,r+I,u)}(u,s,c,y||10):v(c,y)?l(u,s,c,y):"number"==typeof y?(J(u,s+U,c),J(u,s+I,y)):c>0?S(u,s,c):c<0?j(u,s,c):l(u,s,i,0)}(this,t,u,s,c):new k(t,u,s,c)}function O(){var t=this.buffer,r=this.offset,f=_(t,r+U),n=_(t,r+I);return A||(f|=0),f?f*a+n:n}function J(t,r,f){t[r+m]=255&f,f>>=8,t[r+w]=255&f,f>>=8,t[r+d]=255&f,f>>=8,t[r+L]=255&f}function _(t,r){return 16777216*t[r+L]+(t[r+d]<<16)+(t[r+w]<<8)+t[r+m]}}function c(t){var f=this.buffer,n=this.offset;return r=null,!1!==t&&u(f)?8===f.length?f:f.slice(n,n+8):p(f,n)}function y(t){var f=this.buffer,e=this.offset;return r=n,!1!==t&&n.isBuffer(f)?8===f.length?f:f.slice(e,e+8):n.from(h.call(this,t))}function h(t){var 
f=this.buffer,n=this.offset,i=f.buffer;if(r=e,!1!==t&&!f.offset&&i instanceof o)return 8===i.byteLength?i:i.slice(n,n+8);var u=new e(8);return l(u,0,f,n),u.buffer}function v(t,r){var f=t&&t.length;return r|=0,f&&r+8<=f&&"string"!=typeof t[r]}function l(t,r,f,n){r|=0,n|=0;for(var e=0;e<8;e++)t[r++]=255&f[n++]}function p(t,r){return Array.prototype.slice.call(t,r,r+8)}function b(t,r,f){for(var n=r+8;n>r;)t[--n]=255&f,f/=256}function B(t,r,f){var n=r+8;for(f++;n>r;)t[--n]=255&-f^255,f/=256}function E(t,r,f){for(var n=r+8;r<n;)t[r++]=255&f,f/=256}function g(t,r,f){var n=r+8;for(f++;r<n;)t[r++]=255&-f^255,f/=256}Uint64BE=s("Uint64BE",!0,!0),Int64BE=s("Int64BE",!0,!1),Uint64LE=s("Uint64LE",!1,!0),Int64LE=s("Int64LE",!1,!1)}("object"==typeof exports&&"string"!=typeof exports.nodeName?exports:this||{});
\ No newline at end of file
diff --git a/src/y-webrtc.js b/src/y-webrtc.js
index 0522c71..81d3cfa 100644
--- a/src/y-webrtc.js
+++ b/src/y-webrtc.js
@@ -13,7 +13,9 @@ import * as math from 'lib0/math.js'
 import { createMutex } from 'lib0/mutex.js'
 
 import * as Y from 'yjs' // eslint-disable-line
-import Peer from 'simple-peer/simplepeer.min.js'
+
+// import Peer from 'simple-peer/simplepeer.min.js'
+import Peer from './SimplePeerExtended'
 
 import * as syncProtocol from 'y-protocols/sync.js'
 import * as awarenessProtocol from 'y-protocols/awareness.js'
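The framing in `SimplePeerExtended` is easier to follow outside the patch. The Python below mirrors it as a sketch, not a drop-in port: each chunk gets a 40-byte header of five big-endian 64-bit integers (`txOrd`, `index`, `length`, `totalSize`, `chunkSize`), and the receiver collects chunks per `txOrd` until it holds `length` distinct indices. `encode_packets`, `decode_packet` and `Reassembler` are hypothetical names. Two deliberate differences from the patch: finished transfers are dropped immediately instead of after `TX_SEND_TTL`, and an empty message yields one empty packet (the patch's `packetArray` produces no packets in that case).

```python
from __future__ import annotations

import struct

# Values mirror the constants in the quoted SimplePeerExtended patch.
CHUNK_SIZE = 1024 * 16 - 512  # 16 KB minus room for the header
# Five big-endian signed 64-bit ints: txOrd, index, length, totalSize, chunkSize.
HEADER = struct.Struct(">5q")  # 40 bytes


def encode_packets(data: bytes, tx_ord: int, chunk_size: int = CHUNK_SIZE) -> list[bytes]:
    """Split data into chunks, each prefixed with the 40-byte header."""
    # Unlike the patch, an empty message still yields one (empty) packet.
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] or [b""]
    return [
        HEADER.pack(tx_ord, index, len(chunks), len(data), len(chunk)) + chunk
        for index, chunk in enumerate(chunks)
    ]


def decode_packet(packet: bytes) -> dict:
    tx_ord, index, length, total_size, chunk_size = HEADER.unpack_from(packet)
    return {"tx_ord": tx_ord, "index": index, "length": length,
            "total_size": total_size, "chunk_size": chunk_size,
            "chunk": packet[HEADER.size:]}


class Reassembler:
    """Collect chunks per transfer (tx_ord) until all `length` indices arrived."""

    def __init__(self) -> None:
        self.pending: dict[int, dict[int, bytes]] = {}

    def receive(self, packet: bytes) -> bytes | None:
        p = decode_packet(packet)
        if p["chunk_size"] == p["total_size"]:  # single-packet message, as in the patch
            return p["chunk"]
        parts = self.pending.setdefault(p["tx_ord"], {})
        parts[p["index"]] = p["chunk"]  # keyed by index, so duplicates are ignored
        if len(parts) < p["length"]:
            return None
        # Complete: drop state right away (the patch waits TX_SEND_TTL instead).
        del self.pending[p["tx_ord"]]
        return b"".join(parts[i] for i in range(p["length"]))


message = bytes(range(256)) * 200  # 51,200 bytes
packets = encode_packets(message, tx_ord=0)
print(len(packets), len(packets[0]))  # 4 15912

rx = Reassembler()
results = [rx.receive(p) for p in reversed(packets)]  # deliver out of order
print(results[:3], results[3] == message)  # [None, None, None] True
```

With these constants, a 262153-byte message (the length of the reproducer's raw HTML string) would travel as 17 packets of at most 15912 bytes each.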

@bollwyvl
Collaborator

Good for reference, but we probably wouldn't want to carry it in this repo at this time...

@datakurre
Author

@bollwyvl As you suggested, I can open a pull request for jupyterlite with a webpack-based patch. Upstream looks like it could take some time.

@bollwyvl
Collaborator

To sum up the progress on demonstrating this over on #600:

  • this works,
  • Windows is weird,
  • we should try to hoist the whole thing to a separate package, e.g. jupyterlab-webrtc-docprovider.
    • stock lite would not provide any yjs-related features (even behind a flag)
    • we can start it in this org, but should then consider moving it to jupyterlab-contrib: it would presumably work with stock JupyterLab, and an off-the-shelf signaling server could be hosted as a JupyterHub service, which sounds more robust than sharing Jupyter servers.

@bollwyvl
Collaborator

@jtpio
Member

jtpio commented Apr 15, 2022

Sounds good, thanks @datakurre and @bollwyvl!

@bollwyvl
Collaborator

So over on jupyterlite/jupyterlab-webrtc-docprovider#3, I've hit many of the things from #241, so it should probably be ready to release fairly soon. As it imports only @jupyterlab (and not @jupyterlite) APIs, and works just fine with mainline JupyterLab and RetroLab, I think it wouldn't even have to be marked beta.

Once that happens, I feel like replacing the existing provider in lite core with the nullprovider and removing the y-webrtc dependency altogether is our best play: at worst, the few pages that reference it could say "please ensure jupyterlite_webrtc_docprovider is installed", but otherwise wouldn't have to change.

We could add a jupyterlite[webrtc] extra that also pulled it in, but it could just as easily end up skipped by ignore-sys-prefix, which might not be the intended behavior... I suppose we could add an entry_point that extended the jupyter lite build, but that seems unsatisfying.

Anyhow, what I haven't explored yet is a simple, self-hosted signaling server to recommend, e.g. as a JupyterHub service. Relying on unmirrorable public/free services at runtime is always shady, and I think a self-hosted signaling server would already be a better solution for many use cases than a user sharing their jupyter_server instance 😱.

@jtpio
Member

jtpio commented Apr 19, 2022

> Once that happens, I feel like replacing the existing provider in lite core with the nullprovider and removing the y-webrtc dependency altogether is our best play: at worst, the few pages that reference it could say "please ensure jupyterlite_webrtc_docprovider is installed", but otherwise wouldn't have to change.

This sounds perfectly fine 👍

Thanks a lot for this!

@bollwyvl
Collaborator

Once the conda-forge PR lands, I'll start the PR here.

3 participants