routerrpc: add quit channel to signal shutdown

This commit is contained in:
carla 2020-03-13 09:30:16 +02:00
parent 67d4bad73f
commit aa70d5d02a
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91

@ -9,6 +9,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sync/atomic"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
@ -32,6 +33,8 @@ const (
) )
var ( var (
errServerShuttingDown = errors.New("routerrpc server shutting down")
// macaroonOps are the set of capabilities that our minted macaroon (if // macaroonOps are the set of capabilities that our minted macaroon (if
// it doesn't already exist) will have. // it doesn't already exist) will have.
macaroonOps = []bakery.Op{ macaroonOps = []bakery.Op{
@ -90,7 +93,12 @@ var (
// Server is a stand alone sub RPC server which exposes functionality that // Server is a stand alone sub RPC server which exposes functionality that
// allows clients to route arbitrary payment through the Lightning Network. // allows clients to route arbitrary payment through the Lightning Network.
type Server struct { type Server struct {
started int32 // To be used atomically.
shutdown int32 // To be used atomically.
cfg *Config cfg *Config
quit chan struct{}
} }
// A compile time check to ensure that Server fully implements the RouterServer // A compile time check to ensure that Server fully implements the RouterServer
@ -150,7 +158,8 @@ func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) {
} }
routerServer := &Server{ routerServer := &Server{
cfg: cfg, cfg: cfg,
quit: make(chan struct{}),
} }
return routerServer, macPermissions, nil return routerServer, macPermissions, nil
@ -160,6 +169,10 @@ func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) {
// //
// NOTE: This is part of the lnrpc.SubServer interface. // NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) Start() error { func (s *Server) Start() error {
if atomic.AddInt32(&s.started, 1) != 1 {
return nil
}
return nil return nil
} }
@ -167,6 +180,11 @@ func (s *Server) Start() error {
// //
// NOTE: This is part of the lnrpc.SubServer interface. // NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) Stop() error { func (s *Server) Stop() error {
if atomic.AddInt32(&s.shutdown, 1) != 1 {
return nil
}
close(s.quit)
return nil return nil
} }
@ -513,6 +531,9 @@ func (s *Server) trackPayment(paymentHash lntypes.Hash,
return err return err
} }
case <-s.quit:
return errServerShuttingDown
case <-stream.Context().Done(): case <-stream.Context().Done():
log.Debugf("Payment status stream %v canceled", paymentHash) log.Debugf("Payment status stream %v canceled", paymentHash)
return stream.Context().Err() return stream.Context().Err()