iopsys-feed/quickjs-websocket/src/websocket.js
Vivek Dutta 3b53cd0088
quickjs-websocket: critical performance optimizations
- Check OPTIMIZATIONS.md for details
2026-01-09 10:53:51 +05:30

528 lines
15 KiB
JavaScript

/*
* Copyright (c) 2020 Genexis B.V.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import * as os from 'os'
import * as lws from './lws-client.so'
// readyState values reported through the `readyState` property of a WebSocket
const CONNECTING = 0
const OPEN = 1
const CLOSING = 2
const CLOSED = 3
// Internal sub-states of CLOSING. The low four bits equal CLOSING and the
// `readyState` getter masks the stored value with 0xf, so callers only ever
// observe CLOSING for either of them.
// CLOSING1 is entered when close() is called on an OPEN socket; the writeable
// callback still drains the outgoing queue in this state.
const CLOSING1 = 0x10 | CLOSING
// CLOSING2 is entered when the peer initiates the close, when the queued close
// marker is reached while writing, or when close() is called while CONNECTING;
// nothing is written any more in this state.
const CLOSING2 = 0x20 | CLOSING
/**
 * Creates the timer-driven scheduler that keeps an lws context serviced.
 *
 * While the scheduler is running exactly one os timer is kept armed. When it
 * fires, `context.service_periodic()` is called and its return value is used
 * as the delay of the next timer.
 * NOTE(review): the returned value is handed straight to os.setTimeout(), so
 * it is presumably a delay in milliseconds - confirm against the binding.
 *
 * @param {object} context - object providing `service_periodic()`
 * @returns {{start: function, stop: function, reschedule: function}}
 *   `start()` begins servicing (no-op when already running), `stop()` cancels
 *   the pending timer, `reschedule(time)` pulls the next run forward.
 */
function serviceScheduler (context) {
  let running = false
  // Handle of the armed timer, null when no timer is pending
  let timeout = null
  // Delay the currently armed timer was created with
  let nextTime = 0
  // True only while `timeout` refers to a timer that has not fired yet
  let scheduled = false

  // Arms a new timer; the caller is responsible for clearing any old one
  function arm (delay) {
    nextTime = delay
    timeout = os.setTimeout(callback, delay)
    scheduled = true
  }

  function callback () {
    // The timer that invoked this callback is spent, whatever happens next.
    // Recording that first also lets reschedule() recover if
    // service_periodic() throws.
    timeout = null
    scheduled = false
    if (!running) return
    // Always re-arm. A fired timer never repeats on its own, so skipping the
    // re-arm when the delay is unchanged would silently stop the periodic
    // servicing.
    arm(context.service_periodic())
  }

  return {
    start: function () {
      if (running) return
      running = true
      arm(0)
    },
    stop: function () {
      running = false
      if (timeout !== null) {
        os.clearTimeout(timeout)
        timeout = null
      }
      scheduled = false
    },
    reschedule: function (time) {
      if (!running) return
      // Keep the armed timer unless the requested delay is shorter; running
      // the service early is harmless, running it late is not
      if (scheduled && !(time < nextTime)) return
      if (timeout !== null) os.clearTimeout(timeout)
      arm(time)
    }
  }
}
// Batch event processing: descriptor events reported by the os read/write
// handlers are queued here and serviced together by processBatch()
const pendingEvents = []
// True from the moment a processBatch() run has been requested until that run
// starts; prevents requesting more than one run at a time
let batchScheduled = false
/**
 * Services every queued descriptor event and then asks the service scheduler
 * to run again after the smallest delay that any of the service_fd() calls
 * returned. Does nothing besides clearing the "batch requested" flag when the
 * queue is empty.
 */
function processBatch () {
  // Allow the fd handlers to request the next batch from here on
  batchScheduled = false

  let earliest = Infinity
  while (pendingEvents.length !== 0) {
    const { fd, events, revents } = pendingEvents.shift()
    const delay = context.service_fd(fd, events, revents)
    if (delay < earliest) {
      earliest = delay
    }
  }

  // Infinity means no event produced a usable delay
  if (earliest === Infinity) return
  service.reschedule(earliest)
}
/**
 * Builds the handler to install for one descriptor and one direction.
 *
 * Each invocation of the returned handler queues an `{ fd, events, revents }`
 * record and, unless a batch run is already pending, requests one through a
 * zero-delay timer.
 *
 * @param {number} fd - descriptor the handler is installed for
 * @param {number} events - event mask recorded with every queued event
 * @param {number} revents - event reported when the handler fires
 * @returns {function} handler taking no arguments
 */
function fdHandler (fd, events, revents) {
  return () => {
    pendingEvents.push({ fd, events, revents })
    // One pending batch run is enough, it drains the whole queue
    if (batchScheduled) return
    batchScheduled = true
    os.setTimeout(processBatch, 0)
  }
}
/**
 * Installs or clears the os read and write handlers of a descriptor so that
 * they match the event mask in `pollfd.events`.
 */
function syncPollHandlers (pollfd) {
  const { fd, events } = pollfd
  const wantsRead = (events & lws.LWS_POLLIN) !== 0
  os.setReadHandler(
    fd,
    wantsRead ? fdHandler(fd, events, lws.LWS_POLLIN) : null
  )
  const wantsWrite = (events & lws.LWS_POLLOUT) !== 0
  os.setWriteHandler(
    fd,
    wantsWrite ? fdHandler(fd, events, lws.LWS_POLLOUT) : null
  )
}

/**
 * Stores one received fragment in the socket state and delivers the message
 * once the final fragment has arrived. Returns -1 when the fragment cannot be
 * accepted, 0 otherwise.
 */
function appendFragment (wsi, fragment) {
  if (!(fragment instanceof ArrayBuffer)) return -1
  const user = wsi.user
  if (user.readyState === CONNECTING || user.readyState === CLOSED) return -1

  if (wsi.is_first_fragment()) {
    // Start a new list with room for a few fragments: two slots for a small
    // first fragment, four otherwise
    const slots = fragment.byteLength < 1024 ? 2 : 4
    const fragments = new Array(slots)
    fragments[0] = fragment
    user.inbuf = fragments
    user.inbufCapacity = 1
  } else {
    const fragments = user.inbuf
    // inbufCapacity counts the slots in use; double the list when it is full
    if (user.inbufCapacity >= fragments.length) {
      fragments.length *= 2
    }
    fragments[user.inbufCapacity] = fragment
    user.inbufCapacity += 1
  }

  if (wsi.is_final_fragment()) {
    // Drop the unused slots before handing the list to message()
    user.inbuf.length = user.inbufCapacity
    user.message(wsi.frame_is_binary())
  }
  return 0
}

/**
 * Writes the next queued outgoing message, if the socket state allows it.
 * Returns -1 when the queued close marker (null) is reached, 0 otherwise.
 */
function writeQueued (wsi) {
  const user = wsi.user
  const mayWrite = user.readyState === OPEN || user.readyState === CLOSING1
  if (!(mayWrite && user.outbuf.length > 0)) return 0

  const msg = user.outbuf.shift()
  if (msg === null) {
    user.readyState = CLOSING2
    return -1
  }
  // Keep the buffered-byte counter in step with the queue
  user.bufferedBytes -= msg instanceof ArrayBuffer ? msg.byteLength : msg.length
  wsi.write(msg)
  if (user.outbuf.length > 0) {
    wsi.callback_on_writable()
  }
  return 0
}

/**
 * Callback handed to lws.create_context(); dispatches on `reason`.
 *
 * @param {object} wsi - connection object; `wsi.user` is the per-socket state
 *   record that was passed to client_connect()
 * @param {number} reason - one of the lws.LWS_CALLBACK_* values
 * @param {*} arg - reason-specific payload (poll descriptor, error text,
 *   close code/reason pair, received data)
 * @returns {number} -1 when the event is rejected, 0 otherwise
 */
function contextCallback (wsi, reason, arg) {
  switch (reason) {
    case lws.LWS_CALLBACK_ADD_POLL_FD:
      service.start()
      // fallthrough
    case lws.LWS_CALLBACK_CHANGE_MODE_POLL_FD:
      syncPollHandlers(arg)
      return 0

    case lws.LWS_CALLBACK_DEL_POLL_FD:
      os.setReadHandler(arg.fd, null)
      os.setWriteHandler(arg.fd, null)
      return 0

    case lws.LWS_CALLBACK_CLIENT_CONNECTION_ERROR: {
      const text = typeof arg === 'string' ? arg : ''
      wsi.user.error(text)
      return 0
    }

    case lws.LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: {
      const user = wsi.user
      if (user.readyState !== CONNECTING) return -1
      user.protocol = wsi.WSI_TOKEN_PROTOCOL
      user.extensions = wsi.WSI_TOKEN_EXTENSIONS
      return 0
    }

    case lws.LWS_CALLBACK_CLIENT_ESTABLISHED: {
      const user = wsi.user
      if (user.readyState !== CONNECTING) return -1
      user.wsi = wsi
      user.open()
      return 0
    }

    case lws.LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
      const user = wsi.user
      if (user.readyState === CLOSED) return -1
      // Without a code/reason pair from the peer, fall back to 1005 and ''
      const hasStatus = arg instanceof Array
      user.closeEvent.code = hasStatus ? arg[0] : 1005
      user.closeEvent.reason = hasStatus ? arg[1] : ''
      user.readyState = CLOSING2
      return 0
    }

    case lws.LWS_CALLBACK_CLIENT_CLOSED:
      wsi.user.close()
      return 0

    case lws.LWS_CALLBACK_CLIENT_RECEIVE:
      return appendFragment(wsi, arg)

    case lws.LWS_CALLBACK_CLIENT_WRITEABLE:
      return writeQueued(wsi)

    case lws.LWS_CALLBACK_WSI_DESTROY:
      // Stop the periodic servicing once the last connection is gone
      if (wsi.context.connections === 0) service.stop()
      return 0

    default:
      return 0
  }
}
// Select the lws log levels to enable (the error and warning flags)
lws.set_log_level(lws.LLL_ERR | lws.LLL_WARN)
// Module-wide lws context; lws reports its events through contextCallback.
// NOTE(review): the meaning of the second argument (true) is defined by the
// lws-client binding and is not visible here - confirm before changing it.
const context = lws.create_context(contextCallback, true)
// Timer-based scheduler that keeps the context serviced between fd events
const service = serviceScheduler(context)
// Event object pool to reduce allocations: one pre-built object per event
// type, meant to be reused for every dispatch of that type.
// The objects are module-level and mutable, i.e. shared by all sockets; any
// code that hands them to a handler must expect that a handler which keeps a
// reference will observe later changes to the same object.
const eventPool = {
  open: { type: 'open' },
  error: { type: 'error' },
  message: { type: 'message', data: null },
  close: { type: 'close', code: 1005, reason: '', wasClean: false }
}
/**
 * Concatenates a list of ArrayBuffers.
 *
 * A list holding exactly one buffer yields that very buffer (no copy); any
 * other list yields a newly allocated buffer with the contents in order, which
 * is empty for an empty list.
 *
 * @param {ArrayBuffer[]} bufs - buffers to join
 * @returns {ArrayBuffer} the joined data
 * @throws {TypeError} when `bufs` is not an Array or an element is not an
 *   ArrayBuffer
 */
function arrayBufferJoin (bufs) {
  if (!(bufs instanceof Array)) {
    throw new TypeError('Array expected')
  }

  // Validate every element and size the result before copying anything
  let total = 0
  for (const chunk of bufs) {
    if (!(chunk instanceof ArrayBuffer)) {
      throw new TypeError('ArrayBuffer expected')
    }
    total += chunk.byteLength
  }

  // Nothing to join: hand the single buffer back as it is
  if (bufs.length === 1) {
    return bufs[0]
  }

  const joined = new Uint8Array(total)
  let position = 0
  for (const chunk of bufs) {
    joined.set(new Uint8Array(chunk), position)
    position += chunk.byteLength
  }
  return joined.buffer
}
/**
 * WebSocket client constructor backed by the lws-client binding.
 *
 * The URL is parsed with lws.parse_url(), the sub-protocol names are
 * validated, a per-instance state record is stored in `this._wsState`, and
 * the connection attempt itself is started from a zero-delay timer, so the
 * caller can install its handlers first.
 *
 * Every handler invocation receives its own, freshly created event object.
 * Handlers may therefore keep the event or read it later (for example after
 * an await) without the data being cleared or overwritten by a later event
 * of this or any other socket.
 *
 * @param {string} url - URL of the server
 * @param {string|string[]} [protocols] - requested sub-protocol name(s)
 * @throws {TypeError} when a sub-protocol name contains invalid characters
 */
export function WebSocket (url, protocols) {
  // Use C-based URL parser for better performance
  const { secure, address, port, path } = lws.parse_url(url)
  // The port is only appended to the host value when it is not the default
  // port of the scheme
  const host = address + (port === (secure ? 443 : 80) ? '' : ':' + port)
  if (protocols === undefined) {
    protocols = []
  } else if (!(protocols instanceof Array)) {
    protocols = [protocols]
  }
  const validProto = /^[A-Za-z0-9!#$%&'*+.^_|~-]+$/
  if (!protocols.every(function (val) { return validProto.test(val) })) {
    throw new TypeError('invalid WebSocket subprotocol name')
  }
  const proto = protocols.length > 0 ? protocols.join(', ') : null
  const self = this

  // Calls onclose, if set, with a snapshot of the recorded close details
  function dispatchClose () {
    if (state.onclose) {
      state.onclose.call(self, {
        type: 'close',
        code: state.closeEvent.code,
        reason: state.closeEvent.reason,
        wasClean: state.closeEvent.wasClean
      })
    }
  }

  const state = {
    url: url,
    readyState: CONNECTING,
    extensions: '',
    protocol: '',
    onopen: null,
    onerror: null,
    onclose: null,
    onmessage: null,
    wsi: null,
    // Fragments of the message being received and the number of slots used
    inbuf: [],
    inbufCapacity: 0,
    // Outgoing messages; a null entry marks the point where close() was called
    outbuf: [],
    bufferedBytes: 0,
    closeEvent: {
      type: 'close',
      code: 1005,
      reason: '',
      wasClean: false
    },
    // Connection established: CONNECTING -> OPEN
    open: function () {
      if (state.readyState === CONNECTING) {
        state.readyState = OPEN
        if (state.onopen) {
          state.onopen.call(self, { type: 'open' })
        }
      }
    },
    // Connection failed: reports 'error' followed by an unclean 'close' (1006)
    error: function (e) {
      if (state.readyState !== CLOSED) {
        state.closeEvent.code = 1006
        state.closeEvent.reason = String(e)
        state.readyState = CLOSED
        try {
          if (state.onerror) {
            state.onerror.call(self, { type: 'error' })
          }
        } finally {
          // onclose must run even when the onerror handler throws
          dispatchClose()
        }
      }
    },
    // Connection closed: reports a clean 'close' with the recorded code/reason
    close: function () {
      if (state.readyState !== CLOSED) {
        state.closeEvent.wasClean = true
        state.readyState = CLOSED
        dispatchClose()
      }
    },
    // A complete message is available in inbuf; `binary` selects whether the
    // payload is delivered as ArrayBuffer or decoded as UTF-8 text
    message: function (binary) {
      if (state.inbuf.length > 0) {
        const msg = state.inbuf.length === 1
          ? state.inbuf[0]
          : arrayBufferJoin(state.inbuf)
        // Reset the fragment list together with its usage counter so the two
        // never disagree
        state.inbuf = []
        state.inbufCapacity = 0
        if (state.readyState === OPEN && state.onmessage) {
          state.onmessage.call(self, {
            type: 'message',
            data: binary ? msg : lws.decode_utf8(msg)
          })
        }
      }
    }
  }
  this._wsState = state
  os.setTimeout(function () {
    try {
      context.client_connect(
        address, port, secure, path, host, null, proto, state
      )
    } catch (e) {
      state.error(e)
    }
  }, 0)
}
// Property descriptors for the four public readyState values. They carry only
// a `value`, so the resulting properties are read-only and non-enumerable.
const readyStateConstants = Object.fromEntries(
  Object.entries({ CONNECTING, OPEN, CLOSING, CLOSED })
    .map(([name, value]) => [name, { value }])
)
// Available both on the constructor and on every instance
for (const target of [WebSocket, WebSocket.prototype]) {
  Object.defineProperties(target, readyStateConstants)
}
/**
 * Validates a value that is about to be stored as an event handler.
 *
 * @param {*} val - candidate handler
 * @throws {TypeError} unless `val` is null or a function
 */
function checkNullOrFunction (val) {
  const acceptable = val === null || typeof val === 'function'
  if (!acceptable) {
    throw new TypeError('null or Function expected')
  }
}
// Public accessors of WebSocket instances. All values live in the
// per-instance `_wsState` record that the constructor creates.
Object.defineProperties(WebSocket.prototype, {
  url: {
    get () { return this._wsState.url }
  },
  readyState: {
    // Mask out the internal sub-state bits; only the low four bits are public
    get () { return this._wsState.readyState & 0xf }
  },
  extensions: {
    get () { return this._wsState.extensions }
  },
  protocol: {
    get () { return this._wsState.protocol }
  },
  bufferedAmount: {
    get () { return this._wsState.bufferedBytes }
  },
  // Binary messages are always delivered as ArrayBuffer; no other type can be
  // selected
  binaryType: {
    get () { return 'arraybuffer' },
    set (val) {
      if (val !== 'arraybuffer') {
        throw new TypeError('only "arraybuffer" allowed for "binaryType"')
      }
    }
  },
  // Event handler slots: each accepts null or a function
  onopen: {
    get () { return this._wsState.onopen },
    set (val) {
      checkNullOrFunction(val)
      this._wsState.onopen = val
    }
  },
  onerror: {
    get () { return this._wsState.onerror },
    set (val) {
      checkNullOrFunction(val)
      this._wsState.onerror = val
    }
  },
  onclose: {
    get () { return this._wsState.onclose },
    set (val) {
      checkNullOrFunction(val)
      this._wsState.onclose = val
    }
  },
  onmessage: {
    get () { return this._wsState.onmessage },
    set (val) {
      checkNullOrFunction(val)
      this._wsState.onmessage = val
    }
  }
})
/**
 * Starts closing the connection.
 *
 * On an OPEN socket the optional code/reason are handed to the connection and
 * recorded for the close event, a null marker is appended to the outgoing
 * queue, and a writeable callback is requested so the messages queued before
 * the marker can still be written. On a CONNECTING socket only the state is
 * changed. In any other state the call has no effect.
 *
 * @param {number} [code] - 1000, or a value from 3000 to 4999
 * @param {string} [reason] - close reason, only used when `code` is given
 * @throws {RangeError} when `code` is given and outside the allowed values
 */
WebSocket.prototype.close = function (code, reason) {
  const hasCode = code !== undefined
  if (hasCode) {
    code = Math.trunc(code)
    reason = reason === undefined ? '' : String(reason)
    const allowed = code === 1000 || (code >= 3000 && code <= 4999)
    if (!allowed) {
      throw new RangeError('code must be 1000 or between 3000 and 4999')
    }
  }

  const state = this._wsState
  switch (state.readyState) {
    case OPEN:
      if (hasCode) {
        state.wsi.close_reason(code, reason)
        state.closeEvent.code = code
        state.closeEvent.reason = reason
      }
      state.readyState = CLOSING1
      // null marks the end of the data that still has to be written
      state.outbuf.push(null)
      state.wsi.callback_on_writable()
      break
    case CONNECTING:
      state.readyState = CLOSING2
      break
  }
}
/**
 * Queues a message for transmission.
 *
 * ArrayBuffers and ArrayBuffer views are sent as binary data, anything else
 * is converted with String() and sent as text. The size of the message is
 * always added to `bufferedAmount`.
 *
 * When the socket is no longer OPEN (closing or closed) the message is only
 * accounted for and then dropped: nothing queued after the close marker can
 * ever be written, so copying and storing it would only grow the outgoing
 * queue without bound.
 *
 * @param {ArrayBuffer|ArrayBufferView|*} msg - data to send
 * @param {object} [options]
 * @param {boolean} [options.transfer] - when true, binary data is queued
 *   without copying where possible; the caller must not modify the buffer
 *   after the call
 * @throws {TypeError} when called while the socket is still CONNECTING
 */
WebSocket.prototype.send = function (msg, options) {
  const state = this._wsState
  if (state.readyState === CONNECTING) {
    throw new TypeError('send() not allowed in CONNECTING state')
  }
  const transfer = options && options.transfer === true
  const isOpen = state.readyState === OPEN
  let msgSize
  let queued
  if (msg instanceof ArrayBuffer) {
    msgSize = msg.byteLength
    if (isOpen) {
      // Zero-copy mode: use buffer directly without copying
      // WARNING: caller must not modify buffer after send
      queued = transfer ? msg : msg.slice(0)
    }
  } else if (ArrayBuffer.isView(msg)) {
    msgSize = msg.byteLength
    if (isOpen) {
      // Zero-copy is only possible when the view spans its whole buffer;
      // otherwise just the viewed range is copied
      const spansBuffer =
        msg.byteOffset === 0 && msg.byteLength === msg.buffer.byteLength
      queued = transfer && spansBuffer
        ? msg.buffer
        : msg.buffer.slice(msg.byteOffset, msg.byteOffset + msg.byteLength)
    }
  } else {
    const strMsg = String(msg)
    // Counted in string length units, matching what the writeable callback
    // subtracts once the message has been written
    msgSize = strMsg.length
    queued = strMsg
  }
  state.bufferedBytes += msgSize
  if (isOpen) {
    state.outbuf.push(queued)
    state.wsi.callback_on_writable()
  }
}