Refactor the stream sender

This commit is contained in:
NI
2019-09-19 15:04:20 +08:00
parent 2736a3db9d
commit 2d47cd004c
6 changed files with 309 additions and 123 deletions

View File

@@ -17,26 +17,20 @@
import Exception from "./exception.js";
import * as subscribe from "./subscribe.js";
import * as common from "./common.js";
export class Sender {
/**
* constructor
*
* @param {function} sender Underlaying sender
* @param {number} bufferDelay in ms
*
*/
constructor(sender, bufferDelay, maxSegSize) {
constructor(sender, maxSegSize) {
this.sender = sender;
this.delay = bufferDelay;
this.maxSegSize = maxSegSize;
this.timeout = null;
this.buffered = new Uint8Array(this.maxSegSize);
this.bufferedSize = 0;
this.subscribe = new subscribe.Subscribe();
this.sendingPoc = this.sending();
this.resolves = [];
this.rejects = [];
}
/**
@@ -45,9 +39,19 @@ export class Sender {
*/
async sending() {
for (;;) {
let fetched = await this.subscribe.subscribe();
const fetched = await this.subscribe.subscribe();
await this.sender(fetched);
try {
const dataSegs = common.separateBuffer(fetched.data, this.maxSegSize);
for (let i in dataSegs) {
await this.sender(dataSegs[i]);
}
fetched.resolve();
} catch (e) {
fetched.reject(e);
}
}
}
@@ -55,7 +59,7 @@ export class Sender {
* Clear everything
*
*/
async clear() {
close() {
if (this.timeout !== null) {
clearTimeout(this.timeout);
this.timeout = null;
@@ -64,104 +68,10 @@ export class Sender {
this.buffered = null;
this.bufferedSize = 0;
this.subscribe.reject(new Exception("Sender has been closed", false));
this.subscribe.reject(new Exception("Sender has been cleared", false));
this.subscribe.disable();
this.sendingPoc.catch(() => {});
this.reject(new Exception("Sending has been cancelled", true));
}
/**
* Call resolves
*
* @param {any} d Data
*/
resolve(d) {
for (let i in this.resolves) {
this.resolves[i](d);
}
this.resolves = [];
this.rejects = [];
}
/**
* Call rejects
*
* @param {any} d Data
*/
reject(d) {
for (let i in this.rejects) {
this.rejects[i](d);
}
this.resolves = [];
this.rejects = [];
}
/**
* Send buffer to the sender
*
*/
flushBuffer() {
if (this.bufferedSize <= 0) {
return;
}
if (this.timeout !== null) {
clearTimeout(this.timeout);
this.timeout = null;
}
this.resolve(true);
let d = this.buffered.slice(0, this.bufferedSize);
this.subscribe.resolve(d);
if (d.length >= this.buffered.length) {
this.buffered = new Uint8Array(this.maxSegSize);
this.bufferedSize = 0;
} else {
this.buffered = this.buffered.slice(d.length, this.buffered.length);
this.bufferedSize = 0;
}
}
/**
* Append buffer to internal data storage
*
* @param {Uint8Array} buf Buffer data
*/
appendBuffer(buf) {
let remain = this.buffered.length - this.bufferedSize;
if (remain <= 0) {
this.flushBuffer();
remain = this.buffered.length - this.bufferedSize;
}
let start = 0,
end = remain;
while (start < buf.length) {
if (end > buf.length) {
end = buf.length;
}
let d = buf.slice(start, end);
this.buffered.set(d, this.bufferedSize);
this.bufferedSize += d.length;
if (this.buffered.length >= this.bufferedSize) {
this.flushBuffer();
}
start += d.length;
end = start + (this.buffered.length - this.bufferedSize);
}
}
/**
@@ -176,21 +86,12 @@ export class Sender {
*
*/
send(data) {
let self = this;
return new Promise((resolve, reject) => {
self.resolves.push(resolve);
self.rejects.push(reject);
this.appendBuffer(data);
if (this.bufferedSize <= 0) {
return;
}
self.timeout = setTimeout(() => {
self.flushBuffer();
}, self.delay);
this.subscribe.resolve({
data: data,
resolve: resolve,
reject: reject
});
});
}
}