diff --git a/application/command/handler.go b/application/command/handler.go index 95a7049..403d25c 100644 --- a/application/command/handler.go +++ b/application/command/handler.go @@ -51,12 +51,31 @@ type handlerBuf [handlerReadBufLen]byte // handlerSender writes handler signal type handlerSender struct { - writer io.Writer - lock *sync.Mutex + writer io.Writer + lock *sync.Mutex + needWait bool + sign *sync.Cond +} + +// pause pauses sending +func (h *handlerSender) pause() { + h.lock.Lock() + defer h.lock.Unlock() + + h.needWait = true +} + +// resume resumes sending +func (h *handlerSender) resume() { + h.lock.Lock() + defer h.lock.Unlock() + + h.needWait = false + h.sign.Broadcast() } // signal sends handler signal -func (h handlerSender) signal(hd Header, d []byte, buf []byte) error { +func (h *handlerSender) signal(hd Header, d []byte, buf []byte) error { bufLen := len(buf) dLen := len(d) @@ -75,10 +94,14 @@ func (h handlerSender) signal(hd Header, d []byte, buf []byte) error { } // Write sends data -func (h handlerSender) Write(b []byte) (int, error) { +func (h *handlerSender) Write(b []byte) (int, error) { h.lock.Lock() defer h.lock.Unlock() + for h.needWait { + h.sign.Wait() + } + return h.writer.Write(b) } @@ -127,10 +150,15 @@ func newHandler( l log.Logger, ) Handler { return Handler{ - cfg: cfg, - commands: commands, - receiver: receiver, - sender: handlerSender{writer: sender, lock: senderLock}, + cfg: cfg, + commands: commands, + receiver: receiver, + sender: handlerSender{ + writer: sender, + lock: senderLock, + needWait: false, + sign: sync.NewCond(senderLock), + }, senderPaused: false, receiveDelay: receiveDelay, sendDelay: sendDelay, @@ -177,6 +205,9 @@ func (e *Handler) handleControl(d byte, l log.Logger) error { if !e.senderPaused { _, wErr = e.sender.Write(e.rBuf[:rLen+1]) } else { + e.sender.lock.Lock() + defer e.sender.lock.Unlock() + _, wErr = e.sender.writer.Write(e.rBuf[:rLen+1]) } @@ -184,13 +215,13 @@ func (e *Handler) handleControl(d byte, l log.Logger) error { case HeaderControlPauseStream: if !e.senderPaused { - e.sender.lock.Lock() + e.sender.pause() e.senderPaused = true } case HeaderControlResumeStream: if e.senderPaused { - e.sender.lock.Unlock() + e.sender.resume() e.senderPaused = false } } @@ -227,8 +258,8 @@ func (e *Handler) handleStream(h Header, d byte, l log.Logger) error { l.Debug("Start stream %d", h.Data()) if e.senderPaused { - e.sender.lock.Unlock() - defer e.sender.lock.Lock() + e.sender.resume() + defer e.sender.pause() } return st.reinit(h, &e.receiver, streamHandlerSender{ @@ -245,8 +276,8 @@ func (e *Handler) handleClose(h Header, d byte, l log.Logger) error { } if e.senderPaused { - e.sender.lock.Unlock() - defer e.sender.lock.Lock() + e.sender.resume() + defer e.sender.pause() } cErr := st.close() @@ -269,8 +300,8 @@ func (e *Handler) handleCompleted(d byte, l log.Logger) error { } if e.senderPaused { - e.sender.lock.Unlock() - defer e.sender.lock.Lock() + e.sender.resume() + defer e.sender.pause() } return st.release() @@ -280,7 +311,7 @@ func (e *Handler) handleCompleted(d byte, l log.Logger) error { func (e *Handler) Handle() error { defer func() { if e.senderPaused { - e.sender.lock.Unlock() + e.sender.resume() e.senderPaused = false }