|
|
|
@ -113,22 +113,27 @@ func (sm *StreamManager) Stop() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (sm *StreamManager) connect() error {
|
|
|
|
|
if sm.client != nil {
|
|
|
|
|
if c, ok := sm.client.(*Client); ok {
|
|
|
|
|
if c.CurrentState.getState() == StateDisconnected {
|
|
|
|
|
sm.Metrics = initMetrics()
|
|
|
|
|
err := c.Connect()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if sm.PostConnect != nil {
|
|
|
|
|
sm.PostConnect(sm.client)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if sm.client == nil {
|
|
|
|
|
return errors.New("client is not set")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if c, ok := sm.client.(*Client); ok {
|
|
|
|
|
if c.CurrentState.getState() != StateDisconnected {
|
|
|
|
|
return errors.New("client is not disconnected")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return errors.New("client is not disconnected")
|
|
|
|
|
|
|
|
|
|
sm.Metrics = initMetrics()
|
|
|
|
|
|
|
|
|
|
if err := sm.client.Connect(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if sm.PostConnect != nil {
|
|
|
|
|
sm.PostConnect(sm.client)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// resume manages the reconnection loop and apply the define backoff to avoid overloading the server.
|
|
|
|
|