|
|
@ -190,10 +190,10 @@ func (s *swapClientServer) Monitor(in *looprpc.MonitorRequest,
|
|
|
|
s.swapsLock.Unlock()
|
|
|
|
s.swapsLock.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
defer func() {
|
|
|
|
queue.Stop()
|
|
|
|
|
|
|
|
s.swapsLock.Lock()
|
|
|
|
s.swapsLock.Lock()
|
|
|
|
delete(s.subscribers, id)
|
|
|
|
delete(s.subscribers, id)
|
|
|
|
s.swapsLock.Unlock()
|
|
|
|
s.swapsLock.Unlock()
|
|
|
|
|
|
|
|
queue.Stop()
|
|
|
|
}()
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
// Sort completed swaps new to old.
|
|
|
|
// Sort completed swaps new to old.
|
|
|
@ -459,6 +459,7 @@ func (s *swapClientServer) processStatusUpdates(mainCtx context.Context) {
|
|
|
|
select {
|
|
|
|
select {
|
|
|
|
case subscriber <- swp:
|
|
|
|
case subscriber <- swp:
|
|
|
|
case <-mainCtx.Done():
|
|
|
|
case <-mainCtx.Done():
|
|
|
|
|
|
|
|
s.swapsLock.Unlock()
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|