From 2d47cd004c69690ba120fbc238875ab2cdd69091 Mon Sep 17 00:00:00 2001 From: NI Date: Thu, 19 Sep 2019 15:04:20 +0800 Subject: [PATCH] Refactor the stream sender --- ui/socket.js | 1 - ui/stream/common.js | 29 ++++++++ ui/stream/common_test.js | 146 +++++++++++++++++++++++++++++++++++++++ ui/stream/sender.js | 143 ++++++-------------------------------- ui/stream/sender_test.js | 111 +++++++++++++++++++++++++++++ ui/stream/streams.js | 2 +- 6 files changed, 309 insertions(+), 123 deletions(-) create mode 100644 ui/stream/common_test.js create mode 100644 ui/stream/sender_test.js diff --git a/ui/socket.js b/ui/socket.js index 96faaa3..ed6286e 100644 --- a/ui/socket.js +++ b/ui/socket.js @@ -187,7 +187,6 @@ class Dial { throw e; } }, - 15, 4096 - 64 // Server has a 4096 bytes receive buffer, can be no greater ); diff --git a/ui/stream/common.js b/ui/stream/common.js index 5f2a25c..1998a76 100644 --- a/ui/stream/common.js +++ b/ui/stream/common.js @@ -46,3 +46,32 @@ export function getRands(n, min, max) { return r; } + +/** + * Separate given buffer to multiple ones based on input max length + * + * @param {Uint8Array} buf Buffer to separate + * @param {number} max Max length of each buffer + * + * @returns {Array} Separated buffers + * + */ +export function separateBuffer(buf, max) { + let start = 0, + result = []; + + while (start < buf.length) { + let remain = buf.length - start; + + if (remain <= max) { + result.push(buf.slice(start, start + remain)); + + return result; + } + + remain = max; + + result.push(buf.slice(start, start + remain)); + start += remain; + } +} diff --git a/ui/stream/common_test.js b/ui/stream/common_test.js new file mode 100644 index 0000000..6505d3e --- /dev/null +++ b/ui/stream/common_test.js @@ -0,0 +1,146 @@ +// Sshwifty - A Web SSH client +// +// Copyright (C) 2019 Rui NI +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +import assert from "assert"; +import * as common from "./common.js"; + +describe("Common", () => { + it("separateBuffer", async () => { + let resultArr = []; + const expected = new Uint8Array([ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9 + ]), + sepSeg = common.separateBuffer(expected, 16); + + sepSeg.forEach(d => { + resultArr.push(...d); + }); + + const result = new Uint8Array(resultArr); + + assert.deepEqual(result, expected); + }); +}); diff --git a/ui/stream/sender.js b/ui/stream/sender.js index 644a5ff..f5dd93e 100644 --- a/ui/stream/sender.js +++ b/ui/stream/sender.js @@ -17,26 +17,20 @@ import Exception from "./exception.js"; import * as subscribe from "./subscribe.js"; +import * as common from "./common.js"; export class Sender { /** * constructor * * @param {function} sender Underlaying sender - * @param {number} bufferDelay in ms * */ - constructor(sender, bufferDelay, maxSegSize) { + constructor(sender, maxSegSize) { this.sender = sender; - this.delay = bufferDelay; this.maxSegSize = maxSegSize; - this.timeout = null; - this.buffered = new Uint8Array(this.maxSegSize); - this.bufferedSize = 0; this.subscribe = new subscribe.Subscribe(); this.sendingPoc = this.sending(); - this.resolves = []; - this.rejects = []; } /** @@ -45,9 +39,19 @@ export class Sender { */ async sending() { for (;;) { - let fetched = await this.subscribe.subscribe(); + const fetched = await this.subscribe.subscribe(); - await this.sender(fetched); + try { + const dataSegs = common.separateBuffer(fetched.data, this.maxSegSize); + + for (let i in dataSegs) { + await this.sender(dataSegs[i]); + } + + fetched.resolve(); + } catch (e) { + fetched.reject(e); + } } } @@ -55,7 +59,7 @@ export class Sender { * Clear everything * */ - async clear() { + close() { if (this.timeout !== null) { clearTimeout(this.timeout); this.timeout = null; @@ -64,104 +68,10 @@ export class Sender { this.buffered = null; this.bufferedSize = 0; - this.subscribe.reject(new Exception("Sender has been closed", false)); + this.subscribe.reject(new Exception("Sender has been cleared", false)); + this.subscribe.disable(); this.sendingPoc.catch(() => {}); - - this.reject(new Exception("Sending has been cancelled", true)); - } - - /** - * Call resolves - * - * @param {any} d Data - */ - resolve(d) { - for (let i in this.resolves) { - this.resolves[i](d); - } - - this.resolves = []; - this.rejects = []; - } - - /** - * Call rejects - * - * @param {any} d Data - */ - reject(d) { - for (let i in this.rejects) { - this.rejects[i](d); - } - - this.resolves = []; - this.rejects = []; - } - - /** - * Send buffer to the sender - * - */ - flushBuffer() { - if (this.bufferedSize <= 0) { - return; - } - - if (this.timeout !== null) { - clearTimeout(this.timeout); - this.timeout = null; - } - - this.resolve(true); - - let d = this.buffered.slice(0, this.bufferedSize); - - this.subscribe.resolve(d); - - if (d.length >= this.buffered.length) { - this.buffered = new Uint8Array(this.maxSegSize); - this.bufferedSize = 0; - } else { - this.buffered = this.buffered.slice(d.length, this.buffered.length); - this.bufferedSize = 0; - } - } - - /** - * Append buffer to internal data storage - * - * @param {Uint8Array} buf Buffer data - */ - appendBuffer(buf) { - let remain = this.buffered.length - this.bufferedSize; - - if (remain <= 0) { - this.flushBuffer(); - - remain = this.buffered.length - this.bufferedSize; - } - - let start = 0, - end = remain; - - while (start < buf.length) { - if (end > buf.length) { - end = buf.length; - } - - let d = buf.slice(start, end); - - this.buffered.set(d, this.bufferedSize); - this.bufferedSize += d.length; - - if (this.buffered.length >= this.bufferedSize) { - this.flushBuffer(); - } - - start += d.length; - end = start + (this.buffered.length - this.bufferedSize); - } } /** @@ -176,21 +86,12 @@ export class Sender { * */ send(data) { - let self = this; - return new Promise((resolve, reject) => { - self.resolves.push(resolve); - self.rejects.push(reject); - - this.appendBuffer(data); - - if (this.bufferedSize <= 0) { - return; - } - - self.timeout = setTimeout(() => { - self.flushBuffer(); - }, self.delay); + this.subscribe.resolve({ + data: data, + resolve: resolve, + reject: reject + }); }); } } diff --git a/ui/stream/sender_test.js b/ui/stream/sender_test.js new file mode 100644 index 0000000..464b352 --- /dev/null +++ b/ui/stream/sender_test.js @@ -0,0 +1,111 @@ +// Sshwifty - A Web SSH client +// +// Copyright (C) 2019 Rui NI +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +import assert from "assert"; +import * as sender from "./sender.js"; + +describe("Sender", () => { + function generateTestData(size) { + let d = new Uint8Array(size); + + for (let i = 0; i < d.length; i++) { + d[i] = i % 256; + } + + return d; + } + + it("Send", async () => { + const maxSegSize = 64; + let result = []; + let sd = new sender.Sender(rawData => { + return new Promise(resolve => { + setTimeout(() => { + for (let i in rawData) { + result.push(rawData[i]); + } + + resolve(); + }, 5); + }); + }, maxSegSize); + let expected = generateTestData(maxSegSize * 16); + + sd.send(expected); + + let sendCompleted = new Promise(resolve => { + let timer = setInterval(() => { + if (result.length < expected.length) { + return; + } + + clearInterval(timer); + timer = null; + resolve(); + }, 100); + }); + + await sendCompleted; + + assert.deepEqual(new Uint8Array(result), expected); + }); + + it("Send (Multiple calls)", async () => { + const maxSegSize = 64; + let result = []; + let sd = new sender.Sender(rawData => { + return new Promise(resolve => { + setTimeout(() => { + for (let i in rawData) { + result.push(rawData[i]); + } + + resolve(); + }, 10); + }); + }, maxSegSize); + let expectedSingle = generateTestData(maxSegSize * 2), + expectedLen = expectedSingle.length * 16, + expected = new Uint8Array(expectedLen); + + for (let i = 0; i < expectedLen; i += expectedSingle.length) { + expected.set(expectedSingle, i); + } + + for (let i = 0; i < expectedLen; i += expectedSingle.length) { + setTimeout(() => { + sd.send(expectedSingle); + }, 100); + } + + let sendCompleted = new Promise(resolve => { + let timer = setInterval(() => { + if (result.length < expectedLen) { + return; + } + + clearInterval(timer); + timer = null; + resolve(); + }, 100); + }); + + await sendCompleted; + + assert.deepEqual(new Uint8Array(result), expected); + }); +}); diff --git a/ui/stream/streams.js b/ui/stream/streams.js index d1f59dd..e93c494 100644 --- a/ui/stream/streams.js +++ b/ui/stream/streams.js @@ -139,7 +139,7 @@ export class Streams { } try { - this.sender.clear(); + this.sender.close(); } catch (e) { process.env.NODE_ENV === "development" && console.trace(e); }