More robust way to handle write pause and resume

This commit is contained in:
NI
2019-09-16 23:21:02 +08:00
parent db692d5eb4
commit 90db2755cd

View File

@@ -51,12 +51,31 @@ type handlerBuf [handlerReadBufLen]byte
// handlerSender writes handler signal // handlerSender writes handler signal
type handlerSender struct { type handlerSender struct {
writer io.Writer writer io.Writer
lock *sync.Mutex lock *sync.Mutex
needWait bool
sign *sync.Cond
}
// pause pauses sending
func (h *handlerSender) pause() {
h.lock.Lock()
defer h.lock.Unlock()
h.needWait = true
}
// resume resumes sending
func (h *handlerSender) resume() {
h.lock.Lock()
defer h.lock.Unlock()
h.needWait = false
h.sign.Broadcast()
} }
// signal sends handler signal // signal sends handler signal
func (h handlerSender) signal(hd Header, d []byte, buf []byte) error { func (h *handlerSender) signal(hd Header, d []byte, buf []byte) error {
bufLen := len(buf) bufLen := len(buf)
dLen := len(d) dLen := len(d)
@@ -75,10 +94,14 @@ func (h handlerSender) signal(hd Header, d []byte, buf []byte) error {
} }
// Write sends data // Write sends data
func (h handlerSender) Write(b []byte) (int, error) { func (h *handlerSender) Write(b []byte) (int, error) {
h.lock.Lock() h.lock.Lock()
defer h.lock.Unlock() defer h.lock.Unlock()
for h.needWait {
h.sign.Wait()
}
return h.writer.Write(b) return h.writer.Write(b)
} }
@@ -127,10 +150,15 @@ func newHandler(
l log.Logger, l log.Logger,
) Handler { ) Handler {
return Handler{ return Handler{
cfg: cfg, cfg: cfg,
commands: commands, commands: commands,
receiver: receiver, receiver: receiver,
sender: handlerSender{writer: sender, lock: senderLock}, sender: handlerSender{
writer: sender,
lock: senderLock,
needWait: false,
sign: sync.NewCond(senderLock),
},
senderPaused: false, senderPaused: false,
receiveDelay: receiveDelay, receiveDelay: receiveDelay,
sendDelay: sendDelay, sendDelay: sendDelay,
@@ -177,6 +205,9 @@ func (e *Handler) handleControl(d byte, l log.Logger) error {
if !e.senderPaused { if !e.senderPaused {
_, wErr = e.sender.Write(e.rBuf[:rLen+1]) _, wErr = e.sender.Write(e.rBuf[:rLen+1])
} else { } else {
e.sender.lock.Lock()
defer e.sender.lock.Unlock()
_, wErr = e.sender.writer.Write(e.rBuf[:rLen+1]) _, wErr = e.sender.writer.Write(e.rBuf[:rLen+1])
} }
@@ -184,13 +215,13 @@ func (e *Handler) handleControl(d byte, l log.Logger) error {
case HeaderControlPauseStream: case HeaderControlPauseStream:
if !e.senderPaused { if !e.senderPaused {
e.sender.lock.Lock() e.sender.pause()
e.senderPaused = true e.senderPaused = true
} }
case HeaderControlResumeStream: case HeaderControlResumeStream:
if e.senderPaused { if e.senderPaused {
e.sender.lock.Unlock() e.sender.resume()
e.senderPaused = false e.senderPaused = false
} }
} }
@@ -227,8 +258,8 @@ func (e *Handler) handleStream(h Header, d byte, l log.Logger) error {
l.Debug("Start stream %d", h.Data()) l.Debug("Start stream %d", h.Data())
if e.senderPaused { if e.senderPaused {
e.sender.lock.Unlock() e.sender.resume()
defer e.sender.lock.Lock() defer e.sender.pause()
} }
return st.reinit(h, &e.receiver, streamHandlerSender{ return st.reinit(h, &e.receiver, streamHandlerSender{
@@ -245,8 +276,8 @@ func (e *Handler) handleClose(h Header, d byte, l log.Logger) error {
} }
if e.senderPaused { if e.senderPaused {
e.sender.lock.Unlock() e.sender.resume()
defer e.sender.lock.Lock() defer e.sender.pause()
} }
cErr := st.close() cErr := st.close()
@@ -269,8 +300,8 @@ func (e *Handler) handleCompleted(d byte, l log.Logger) error {
} }
if e.senderPaused { if e.senderPaused {
e.sender.lock.Unlock() e.sender.resume()
defer e.sender.lock.Lock() defer e.sender.pause()
} }
return st.release() return st.release()
@@ -280,7 +311,7 @@ func (e *Handler) handleCompleted(d byte, l log.Logger) error {
func (e *Handler) Handle() error { func (e *Handler) Handle() error {
defer func() { defer func() {
if e.senderPaused { if e.senderPaused {
e.sender.lock.Unlock() e.sender.resume()
e.senderPaused = false e.senderPaused = false
} }