Allow sender to buffer requests

This commit is contained in:
NI
2019-12-18 21:36:55 +08:00
parent 67060b6b70
commit cbb51614dc
3 changed files with 165 additions and 33 deletions

View File

@@ -189,7 +189,9 @@ class Dial {
throw e; throw e;
} }
}, },
4096 - 64 // Server has a 4096 bytes receive buffer, can be no greater 4096 - 64, // Server has a 4096 bytes receive buffer, can be no greater,
30, // 30ms input delay
10 // max 10 buffered requests
); );
let senderNonce = crypt.generateNonce(); let senderNonce = crypt.generateNonce();

View File

@@ -17,20 +17,82 @@
import Exception from "./exception.js"; import Exception from "./exception.js";
import * as subscribe from "./subscribe.js"; import * as subscribe from "./subscribe.js";
import * as common from "./common.js";
export class Sender { export class Sender {
/** /**
* constructor * constructor
* *
* @param {function} sender Underlaying sender * @param {function} sender Underlaying sender
* @param {integer} maxSegSize The size of max data segment
* @param {integer} bufferFlushDelay Buffer flush delay
* @param {integer} maxBufferedRequests Buffer flush delay
* *
*/ */
constructor(sender, maxSegSize) { constructor(sender, maxSegSize, bufferFlushDelay, maxBufferedRequests) {
this.sender = sender; this.sender = sender;
this.maxSegSize = maxSegSize; this.maxSegSize = maxSegSize;
this.subscribe = new subscribe.Subscribe(); this.subscribe = new subscribe.Subscribe();
this.sendingPoc = this.sending(); this.sendingPoc = this.sending();
this.sendDelay = null;
this.bufferFlushDelay = bufferFlushDelay;
this.maxBufferedRequests = maxBufferedRequests;
this.buffer = new Uint8Array(maxSegSize);
this.bufferUsed = 0;
this.bufferReq = 0;
}
/**
* Sends data to the this.sender
*
* @param {Uint8Array} data to send
* @param {Array<function>} callbacks to call to return send result
*
*/
async sendData(data, callbacks) {
try {
await this.sender(data);
for (let i in callbacks) {
callbacks[i].resolve();
}
} catch (e) {
for (let i in callbacks) {
callbacks[i].reject(e);
}
}
}
/**
* Append data to the end of internal buffer
*
* @param {Uint8Array} data data to add
*
* @returns {integer} How many bytes of data is added
*
*/
appendBuffer(data) {
const remainSize = this.buffer.length - this.bufferUsed,
appendLength = data.length > remainSize ? remainSize : data.length;
this.buffer.set(data.slice(0, appendLength), this.bufferUsed);
this.bufferUsed += appendLength;
return appendLength;
}
/**
* Export current buffer and reset it to empty
*
* @returns {Uint8Array} Exported buffer
*
*/
exportBuffer() {
const buffer = this.buffer.slice(0, this.bufferUsed);
this.bufferUsed = 0;
this.bufferedRequests = 0;
return buffer;
} }
/** /**
@@ -38,19 +100,45 @@ export class Sender {
* *
*/ */
async sending() { async sending() {
let callbacks = [];
for (;;) { for (;;) {
const fetched = await this.subscribe.subscribe(); const fetched = await this.subscribe.subscribe();
try { // Force flush?
const dataSegs = common.separateBuffer(fetched.data, this.maxSegSize); if (fetched === true) {
if (this.bufferUsed <= 0) {
for (let i in dataSegs) { continue;
await this.sender(dataSegs[i]);
} }
fetched.resolve(); await this.sendData(this.exportBuffer(), callbacks);
} catch (e) { callbacks = [];
fetched.reject(e);
continue;
}
callbacks.push({
resolve: fetched.resolve,
reject: fetched.reject
});
// Add data to buffer and maybe flush when the buffer is full
let currentSendDataLen = 0;
while (fetched.data.length > currentSendDataLen) {
const sentLen = this.appendBuffer(
fetched.data.slice(currentSendDataLen, fetched.data.length)
);
// Buffer not full, wait for the force flush
if (this.buffer.length > this.bufferUsed) {
break;
}
currentSendDataLen += sentLen;
await this.sendData(this.exportBuffer(), callbacks);
callbacks = [];
} }
} }
} }
@@ -60,8 +148,14 @@ export class Sender {
* *
*/ */
close() { close() {
if (this.sendDelay !== null) {
clearTimeout(this.sendDelay);
this.sendDelay = null;
}
this.buffered = null; this.buffered = null;
this.bufferedSize = 0; this.bufferUsed = 0;
this.bufferedRequests = 0;
this.subscribe.reject(new Exception("Sender has been cleared", false)); this.subscribe.reject(new Exception("Sender has been cleared", false));
this.subscribe.disable(); this.subscribe.disable();
@@ -81,12 +175,38 @@ export class Sender {
* *
*/ */
send(data) { send(data) {
let delayCleared = false;
if (this.sendDelay !== null) {
clearTimeout(this.sendDelay);
this.sendDelay = null;
delayCleared = true;
}
const self = this;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this.subscribe.resolve({ self.subscribe.resolve({
data: data, data: data,
resolve: resolve, resolve: resolve,
reject: reject reject: reject
}); });
if (self.bufferedRequests >= self.maxBufferedRequests) {
self.bufferedRequests = 0;
self.subscribe.resolve(true);
return;
}
if (delayCleared) {
self.bufferedRequests++;
}
self.sendDelay = setTimeout(() => {
self.subscribe.resolve(true);
}, self.bufferFlushDelay);
}); });
} }
} }

View File

@@ -32,7 +32,8 @@ describe("Sender", () => {
it("Send", async () => { it("Send", async () => {
const maxSegSize = 64; const maxSegSize = 64;
let result = []; let result = [];
let sd = new sender.Sender(rawData => { let sd = new sender.Sender(
rawData => {
return new Promise(resolve => { return new Promise(resolve => {
setTimeout(() => { setTimeout(() => {
for (let i in rawData) { for (let i in rawData) {
@@ -42,7 +43,11 @@ describe("Sender", () => {
resolve(); resolve();
}, 5); }, 5);
}); });
}, maxSegSize); },
maxSegSize,
300,
3
);
let expected = generateTestData(maxSegSize * 16); let expected = generateTestData(maxSegSize * 16);
sd.send(expected); sd.send(expected);
@@ -67,7 +72,8 @@ describe("Sender", () => {
it("Send (Multiple calls)", async () => { it("Send (Multiple calls)", async () => {
const maxSegSize = 64; const maxSegSize = 64;
let result = []; let result = [];
let sd = new sender.Sender(rawData => { let sd = new sender.Sender(
rawData => {
return new Promise(resolve => { return new Promise(resolve => {
setTimeout(() => { setTimeout(() => {
for (let i in rawData) { for (let i in rawData) {
@@ -77,7 +83,11 @@ describe("Sender", () => {
resolve(); resolve();
}, 10); }, 10);
}); });
}, maxSegSize); },
maxSegSize,
300,
100
);
let expectedSingle = generateTestData(maxSegSize * 2), let expectedSingle = generateTestData(maxSegSize * 2),
expectedLen = expectedSingle.length * 16, expectedLen = expectedSingle.length * 16,
expected = new Uint8Array(expectedLen); expected = new Uint8Array(expectedLen);