From cbb51614dc17681aa165c3e82f275a08ec69dd49 Mon Sep 17 00:00:00 2001 From: NI Date: Wed, 18 Dec 2019 21:36:55 +0800 Subject: [PATCH] Allow sender to buffer requests --- ui/socket.js | 4 +- ui/stream/sender.js | 144 +++++++++++++++++++++++++++++++++++---- ui/stream/sender_test.js | 50 ++++++++------ 3 files changed, 165 insertions(+), 33 deletions(-) diff --git a/ui/socket.js b/ui/socket.js index a41980a..cad8b6a 100644 --- a/ui/socket.js +++ b/ui/socket.js @@ -189,7 +189,9 @@ class Dial { throw e; } }, - 4096 - 64 // Server has a 4096 bytes receive buffer, can be no greater + 4096 - 64, // Server has a 4096 bytes receive buffer, can be no greater, + 30, // 30ms input delay + 10 // max 10 buffered requests ); let senderNonce = crypt.generateNonce(); diff --git a/ui/stream/sender.js b/ui/stream/sender.js index bc144be..af2da04 100644 --- a/ui/stream/sender.js +++ b/ui/stream/sender.js @@ -17,20 +17,82 @@ import Exception from "./exception.js"; import * as subscribe from "./subscribe.js"; -import * as common from "./common.js"; export class Sender { /** * constructor * * @param {function} sender Underlaying sender + * @param {integer} maxSegSize The size of max data segment + * @param {integer} bufferFlushDelay Buffer flush delay + * @param {integer} maxBufferedRequests Buffer flush delay * */ - constructor(sender, maxSegSize) { + constructor(sender, maxSegSize, bufferFlushDelay, maxBufferedRequests) { this.sender = sender; this.maxSegSize = maxSegSize; this.subscribe = new subscribe.Subscribe(); this.sendingPoc = this.sending(); + this.sendDelay = null; + this.bufferFlushDelay = bufferFlushDelay; + this.maxBufferedRequests = maxBufferedRequests; + this.buffer = new Uint8Array(maxSegSize); + this.bufferUsed = 0; + this.bufferReq = 0; + } + + /** + * Sends data to the this.sender + * + * @param {Uint8Array} data to send + * @param {Array} callbacks to call to return send result + * + */ + async sendData(data, callbacks) { + try { + await this.sender(data); + + for (let i in callbacks) { + callbacks[i].resolve(); + } + } catch (e) { + for (let i in callbacks) { + callbacks[i].reject(e); + } + } + } + + /** + * Append data to the end of internal buffer + * + * @param {Uint8Array} data data to add + * + * @returns {integer} How many bytes of data is added + * + */ + appendBuffer(data) { + const remainSize = this.buffer.length - this.bufferUsed, + appendLength = data.length > remainSize ? remainSize : data.length; + + this.buffer.set(data.slice(0, appendLength), this.bufferUsed); + this.bufferUsed += appendLength; + + return appendLength; + } + + /** + * Export current buffer and reset it to empty + * + * @returns {Uint8Array} Exported buffer + * + */ + exportBuffer() { + const buffer = this.buffer.slice(0, this.bufferUsed); + + this.bufferUsed = 0; + this.bufferedRequests = 0; + + return buffer; } /** @@ -38,19 +100,45 @@ export class Sender { * */ async sending() { + let callbacks = []; + for (;;) { const fetched = await this.subscribe.subscribe(); - try { - const dataSegs = common.separateBuffer(fetched.data, this.maxSegSize); - - for (let i in dataSegs) { - await this.sender(dataSegs[i]); + // Force flush? + if (fetched === true) { + if (this.bufferUsed <= 0) { + continue; } - fetched.resolve(); - } catch (e) { - fetched.reject(e); + await this.sendData(this.exportBuffer(), callbacks); + callbacks = []; + + continue; + } + + callbacks.push({ + resolve: fetched.resolve, + reject: fetched.reject + }); + + // Add data to buffer and maybe flush when the buffer is full + let currentSendDataLen = 0; + + while (fetched.data.length > currentSendDataLen) { + const sentLen = this.appendBuffer( + fetched.data.slice(currentSendDataLen, fetched.data.length) + ); + + // Buffer not full, wait for the force flush + if (this.buffer.length > this.bufferUsed) { + break; + } + + currentSendDataLen += sentLen; + + await this.sendData(this.exportBuffer(), callbacks); + callbacks = []; } } } @@ -60,8 +148,14 @@ export class Sender { * */ close() { + if (this.sendDelay !== null) { + clearTimeout(this.sendDelay); + this.sendDelay = null; + } + this.buffered = null; - this.bufferedSize = 0; + this.bufferUsed = 0; + this.bufferedRequests = 0; this.subscribe.reject(new Exception("Sender has been cleared", false)); this.subscribe.disable(); @@ -81,12 +175,38 @@ export class Sender { * */ send(data) { + let delayCleared = false; + + if (this.sendDelay !== null) { + clearTimeout(this.sendDelay); + this.sendDelay = null; + delayCleared = true; + } + + const self = this; + return new Promise((resolve, reject) => { - this.subscribe.resolve({ + self.subscribe.resolve({ data: data, resolve: resolve, reject: reject }); + + if (self.bufferedRequests >= self.maxBufferedRequests) { + self.bufferedRequests = 0; + + self.subscribe.resolve(true); + + return; + } + + if (delayCleared) { + self.bufferedRequests++; + } + + self.sendDelay = setTimeout(() => { + self.subscribe.resolve(true); + }, self.bufferFlushDelay); }); } } diff --git a/ui/stream/sender_test.js b/ui/stream/sender_test.js index 464b352..bab59b1 100644 --- a/ui/stream/sender_test.js +++ b/ui/stream/sender_test.js @@ -32,17 +32,22 @@ describe("Sender", () => { it("Send", async () => { const maxSegSize = 64; let result = []; - let sd = new sender.Sender(rawData => { - return new Promise(resolve => { - setTimeout(() => { - for (let i in rawData) { - result.push(rawData[i]); - } + let sd = new sender.Sender( + rawData => { + return new Promise(resolve => { + setTimeout(() => { + for (let i in rawData) { + result.push(rawData[i]); + } - resolve(); - }, 5); - }); - }, maxSegSize); + resolve(); + }, 5); + }); + }, + maxSegSize, + 300, + 3 + ); let expected = generateTestData(maxSegSize * 16); sd.send(expected); @@ -67,17 +72,22 @@ describe("Sender", () => { it("Send (Multiple calls)", async () => { const maxSegSize = 64; let result = []; - let sd = new sender.Sender(rawData => { - return new Promise(resolve => { - setTimeout(() => { - for (let i in rawData) { - result.push(rawData[i]); - } + let sd = new sender.Sender( + rawData => { + return new Promise(resolve => { + setTimeout(() => { + for (let i in rawData) { + result.push(rawData[i]); + } - resolve(); - }, 10); - }); - }, maxSegSize); + resolve(); + }, 10); + }); + }, + maxSegSize, + 300, + 100 + ); let expectedSingle = generateTestData(maxSegSize * 2), expectedLen = expectedSingle.length * 16, expected = new Uint8Array(expectedLen);