fix pubsub: stop

pull/8/head
Edouard Paris 5 years ago
parent fa4de73813
commit 8c77b09737

@ -2,6 +2,8 @@ package cli
import (
"context"
"os"
"os/signal"
cli "gopkg.in/urfave/cli.v2"
@ -53,16 +55,20 @@ func run(c *cli.Context) error {
return err
}
ctx := context.Background()
events := make(chan *events.Event)
ps := pubsub.New(app.Logger, app.Network)
go func() {
err := ui.Run(context.Background(), app, events)
err := ui.Run(ctx, app, events)
if err != nil {
app.Logger.Debug("ui", logging.String("error", err.Error()))
}
ps.Stop()
}()
pubsub.Run(context.Background(), app, events)
ps.Run(ctx, events)
return nil
}
@ -78,9 +84,15 @@ func pubsubRun(c *cli.Context) error {
}
events := make(chan *events.Event)
pubsub.Run(context.Background(), app, events)
//ev := <-events
//app.Logger.Info("events quit ", logging.String("type", ev.Type))
ps := pubsub.New(app.Logger, app.Network)
ps.Run(context.Background(), events)
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
go func() {
<-sig
ps.Stop()
}()
return nil
}

@ -2,27 +2,24 @@ package pubsub
import (
"context"
"os"
"os/signal"
"sync"
"time"
"github.com/edouardparis/lntop/app"
"github.com/edouardparis/lntop/events"
"github.com/edouardparis/lntop/logging"
"github.com/edouardparis/lntop/network"
"github.com/edouardparis/lntop/network/models"
)
type pubSub struct {
type PubSub struct {
stop chan bool
logger logging.Logger
network *network.Network
wg *sync.WaitGroup
}
func newPubSub(logger logging.Logger, network *network.Network) *pubSub {
return &pubSub{
func New(logger logging.Logger, network *network.Network) *PubSub {
return &PubSub{
logger: logger.With(logging.String("logger", "pubsub")),
network: network,
wg: &sync.WaitGroup{},
@ -30,7 +27,7 @@ func newPubSub(logger logging.Logger, network *network.Network) *pubSub {
}
}
func (p *pubSub) ticker(ctx context.Context, sub chan *events.Event) {
func (p *PubSub) ticker(ctx context.Context, sub chan *events.Event) {
p.wg.Add(1)
ticker := time.NewTicker(3 * time.Second)
go func() {
@ -73,7 +70,7 @@ func (p *pubSub) ticker(ctx context.Context, sub chan *events.Event) {
}()
}
func (p *pubSub) invoices(ctx context.Context, sub chan *events.Event) {
func (p *PubSub) invoices(ctx context.Context, sub chan *events.Event) {
p.wg.Add(3)
invoices := make(chan *models.Invoice)
ctx, cancel := context.WithCancel(ctx)
@ -106,25 +103,18 @@ func (p *pubSub) invoices(ctx context.Context, sub chan *events.Event) {
}()
}
func (p *pubSub) wait() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
p.wg.Add(1)
go func() {
sig := <-c
p.logger.Debug("Received signal, gracefully stopping", logging.String("sig", sig.String()))
p.stop <- true
close(p.stop)
p.wg.Done()
}()
p.wg.Wait()
func (p *PubSub) Stop() {
p.stop <- true
close(p.stop)
p.logger.Debug("Received signal, gracefully stopping")
}
func Run(ctx context.Context, app *app.App, sub chan *events.Event) {
pubSub := newPubSub(app.Logger, app.Network)
pubSub.logger.Debug("Starting...")
func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) {
p.logger.Debug("Starting...")
p.invoices(ctx, sub)
p.ticker(ctx, sub)
pubSub.invoices(ctx, sub)
pubSub.ticker(ctx, sub)
pubSub.wait()
<-p.stop
p.wg.Wait()
}

Loading…
Cancel
Save