iopsys-feed/quickjs-websocket/src/websocket.js
2021-12-01 16:22:58 +01:00

415 lines
11 KiB
JavaScript

/*
* Copyright (c) 2020 Genexis B.V.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import * as os from 'os'
import * as lws from './lws-client.so'
const CONNECTING = 0
const OPEN = 1
const CLOSING = 2
const CLOSED = 3
const CLOSING1 = 0x10 | CLOSING
const CLOSING2 = 0x20 | CLOSING
function serviceScheduler (context) {
let running = false
let timeout = null
function schedule (time) {
if (timeout) os.clearTimeout(timeout)
timeout = running ? os.setTimeout(callback, time) : null
}
function callback () {
schedule(context.service_periodic())
}
return {
start: function () {
running = true
schedule(0)
},
stop: function () {
running = false
schedule(0)
},
reschedule: schedule
}
}
function fdHandler (fd, events, revents) {
return function () {
service.reschedule(context.service_fd(fd, events, revents))
}
}
function contextCallback (wsi, reason, arg) {
switch (reason) {
case lws.LWS_CALLBACK_ADD_POLL_FD:
service.start()
// fallthrough
case lws.LWS_CALLBACK_CHANGE_MODE_POLL_FD:
os.setReadHandler(
arg.fd,
(arg.events & lws.LWS_POLLIN)
? fdHandler(arg.fd, arg.events, lws.LWS_POLLIN)
: null
)
os.setWriteHandler(
arg.fd,
(arg.events & lws.LWS_POLLOUT)
? fdHandler(arg.fd, arg.events, lws.LWS_POLLOUT)
: null
)
break
case lws.LWS_CALLBACK_DEL_POLL_FD:
os.setReadHandler(arg.fd, null)
os.setWriteHandler(arg.fd, null)
break
case lws.LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
wsi.user.error(typeof arg === 'string' ? arg : '')
break
case lws.LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
if (wsi.user.readyState !== CONNECTING) {
return -1
}
wsi.user.protocol = wsi.WSI_TOKEN_PROTOCOL
wsi.user.extensions = wsi.WSI_TOKEN_EXTENSIONS
break
case lws.LWS_CALLBACK_CLIENT_ESTABLISHED:
if (wsi.user.readyState !== CONNECTING) {
return -1
}
wsi.user.wsi = wsi
wsi.user.open()
break
case lws.LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
if (wsi.user.readyState === CLOSED) {
return -1
}
if (arg instanceof Array) {
wsi.user.closeEvent.code = arg[0]
wsi.user.closeEvent.reason = arg[1]
} else {
wsi.user.closeEvent.code = 1005
wsi.user.closeEvent.reason = ''
}
wsi.user.readyState = CLOSING2
break
case lws.LWS_CALLBACK_CLIENT_CLOSED:
wsi.user.close()
break
case lws.LWS_CALLBACK_CLIENT_RECEIVE:
if (!(arg instanceof ArrayBuffer) ||
wsi.user.readyState === CONNECTING ||
wsi.user.readyState === CLOSED) {
return -1
}
if (wsi.is_first_fragment()) {
wsi.user.inbuf = []
}
wsi.user.inbuf.push(arg)
if (wsi.is_final_fragment()) {
wsi.user.message(wsi.frame_is_binary())
}
break
case lws.LWS_CALLBACK_CLIENT_WRITEABLE:
if ((wsi.user.readyState === OPEN || wsi.user.readyState === CLOSING1) &&
wsi.user.outbuf.length > 0) {
const msg = wsi.user.outbuf.shift()
if (msg === null) {
wsi.user.readyState = CLOSING2
return -1
}
wsi.write(msg)
if (wsi.user.outbuf.length > 0) {
wsi.callback_on_writable()
}
}
break
case lws.LWS_CALLBACK_WSI_DESTROY:
if (wsi.context.connections === 0) service.stop()
break
}
return 0
}
lws.set_log_level(lws.LLL_ERR | lws.LLL_WARN)
const context = lws.create_context(contextCallback, true)
const service = serviceScheduler(context)
function arrayBufferJoin (bufs) {
if (!(bufs instanceof Array)) {
throw new TypeError('Array expected')
}
if (!bufs.every(function (val) { return val instanceof ArrayBuffer })) {
throw new TypeError('ArrayBuffer expected')
}
const len = bufs.reduce(function (acc, val) {
return acc + val.byteLength
}, 0)
const array = new Uint8Array(len)
let offset = 0
for (const b of bufs) {
array.set(new Uint8Array(b), offset)
offset += b.byteLength
}
return array.buffer
}
export function WebSocket (url, protocols) {
const pattern = /^(ws|wss):\/\/([^/?#]*)([^#]*)$/i
const match = pattern.exec(url)
if (match === null) {
throw new TypeError('invalid WebSocket URL')
}
const secure = match[1].toLowerCase() === 'wss'
const host = match[2]
const path = match[3].startsWith('/') ? match[3] : '/' + match[3]
const hostPattern = /^(?:([a-z\d.-]+)|\[([\da-f:]+:[\da-f.]*)\])(?::(\d*))?$/i
const hostMatch = hostPattern.exec(host)
if (hostMatch === null) {
throw new TypeError('invalid WebSocket URL')
}
const address = hostMatch[1] || hostMatch[2]
const port = hostMatch[3] ? parseInt(hostMatch[3]) : (secure ? 443 : 80)
const validPath = /^\/[A-Za-z0-9_.!~*'()%:@&=+$,;/?-]*$/
if (!validPath.test(path)) {
throw new TypeError('invalid WebSocket URL')
}
if (!(port >= 1 && port <= 65535)) {
throw new RangeError('port must be between 1 and 65535')
}
if (protocols === undefined) {
protocols = []
} else if (!(protocols instanceof Array)) {
protocols = [protocols]
}
const validProto = /^[A-Za-z0-9!#$%&'*+.^_|~-]+$/
if (!protocols.every(function (val) { return validProto.test(val) })) {
throw new TypeError('invalid WebSocket subprotocol name')
}
const proto = protocols.length > 0 ? protocols.join(', ') : null
const self = this
const state = {
url: url,
readyState: CONNECTING,
extensions: '',
protocol: '',
onopen: null,
onerror: null,
onclose: null,
onmessage: null,
wsi: null,
inbuf: [],
outbuf: [],
closeEvent: {
type: 'close',
code: 1005,
reason: '',
wasClean: false
},
open: function () {
if (state.readyState === CONNECTING) {
state.readyState = OPEN
if (state.onopen) {
state.onopen.call(self, { type: 'open' })
}
}
},
error: function (e) {
if (state.readyState !== CLOSED) {
state.closeEvent.code = 1006
state.closeEvent.reason = String(e)
state.readyState = CLOSED
try {
if (state.onerror) {
state.onerror.call(self, { type: 'error' })
}
} finally {
if (state.onclose) {
state.onclose.call(self, Object.assign({}, state.closeEvent))
}
}
}
},
close: function () {
if (state.readyState !== CLOSED) {
state.closeEvent.wasClean = true
state.readyState = CLOSED
if (state.onclose) {
state.onclose.call(self, Object.assign({}, state.closeEvent))
}
}
},
message: function (binary) {
if (state.inbuf.length > 0) {
const msg = state.inbuf.length === 1
? state.inbuf[0]
: arrayBufferJoin(state.inbuf)
state.inbuf = []
if (state.readyState === OPEN && state.onmessage) {
state.onmessage.call(self, {
type: 'messasge',
data: binary ? msg : lws.decode_utf8(msg)
})
}
}
}
}
this._wsState = state
os.setTimeout(function () {
try {
context.client_connect(
address, port, secure, path, host, null, proto, state
)
} catch (e) {
state.error(e)
}
}, 0)
}
const readyStateConstants = {
CONNECTING: { value: CONNECTING },
OPEN: { value: OPEN },
CLOSING: { value: CLOSING },
CLOSED: { value: CLOSED }
}
Object.defineProperties(WebSocket, readyStateConstants)
Object.defineProperties(WebSocket.prototype, readyStateConstants)
function checkNullOrFunction (val) {
if (val !== null && typeof val !== 'function') {
throw new TypeError('null or Function expected')
}
}
Object.defineProperties(WebSocket.prototype, {
url: { get: function () { return this._wsState.url } },
readyState: { get: function () { return this._wsState.readyState & 0xf } },
extensions: { get: function () { return this._wsState.extensions } },
protocol: { get: function () { return this._wsState.protocol } },
bufferedAmount: {
get: function () {
return this._wsState.outbuf.reduce(function (acc, val) {
if (val instanceof ArrayBuffer) {
acc += val.byteLength
} else if (typeof val === 'string') {
acc += val.length
}
return acc
}, 0)
}
},
binaryType: {
get: function () { return 'arraybuffer' },
set: function (val) {
if (val !== 'arraybuffer') {
throw new TypeError('only "arraybuffer" allowed for "binaryType"')
}
}
},
onopen: {
get: function () { return this._wsState.onopen },
set: function (val) {
checkNullOrFunction(val)
this._wsState.onopen = val
}
},
onerror: {
get: function () { return this._wsState.onerror },
set: function (val) {
checkNullOrFunction(val)
this._wsState.onerror = val
}
},
onclose: {
get: function () { return this._wsState.onclose },
set: function (val) {
checkNullOrFunction(val)
this._wsState.onclose = val
}
},
onmessage: {
get: function () { return this._wsState.onmessage },
set: function (val) {
checkNullOrFunction(val)
this._wsState.onmessage = val
}
}
})
WebSocket.prototype.close = function (code, reason) {
if (code !== undefined) {
code = Math.trunc(code)
reason = reason === undefined ? '' : String(reason)
if (code !== 1000 && !(code >= 3000 && code <= 4999)) {
throw new RangeError('code must be 1000 or between 3000 and 4999')
}
}
const state = this._wsState
if (state.readyState === OPEN) {
if (code !== undefined) {
state.wsi.close_reason(code, reason)
state.closeEvent.code = code
state.closeEvent.reason = reason
}
state.readyState = CLOSING1
state.outbuf.push(null)
state.wsi.callback_on_writable()
} else if (state.readyState === CONNECTING) {
state.readyState = CLOSING2
}
}
WebSocket.prototype.send = function (msg) {
const state = this._wsState
if (state.readyState === CONNECTING) {
throw new TypeError('send() not allowed in CONNECTING state')
}
if (msg instanceof ArrayBuffer) {
state.outbuf.push(msg.slice(0))
} else if (ArrayBuffer.isView(msg)) {
state.outbuf.push(
msg.buffer.slice(msg.byteOffset, msg.byteOffset + msg.byteLength)
)
} else {
state.outbuf.push(String(msg))
}
if (state.readyState === OPEN) {
state.wsi.callback_on_writable()
}
}