lnd.xprv/discovery/reliable_sender.go
Wilmer Paulino 2f679f6015
discovery/reliable_sender: implement message-agnostic reliable sender
In this commit, we implement a new subsystem for the gossiper that
uses some of the existing logic for resending channel announcement
signatures and implements it in a way to make it message-agnostic,
meaning that any type of message can be resent. Along the way we also
modify the way this works to prevent multiple goroutines per peer _and_
message.

A peerHandler will be spawned for each peer to which we attempt to
reliably send a message. This handler is responsible for managing requests
to reliably send messages to a peer while also taking the peer's
connection lifecycle into account by requesting notifications for when
the peer connects/disconnects. A peer connection notification is first
requested to determine when we should attempt to send any pending
messages. After the messages are sent, a peer disconnection notification
is requested to ensure we don't continue to request connection
notifications while the peer remains connected. Once there are no more
pending messages left to be sent for a given peer, the peerHandler can
be torn down.
2019-02-14 18:33:27 -08:00

317 lines
9.8 KiB
Go

package discovery
import (
"sync"
"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// reliableSenderCfg contains all of the necessary items for the reliableSender
// to carry out its duties: peer connection notifications, the persistent
// message store, and the staleness check applied to stored messages.
type reliableSenderCfg struct {
// NotifyWhenOnline is a function that allows the gossiper to be
// notified when a certain peer comes online, allowing it to
// retry sending a peer message. The peerHandler waits to receive the
// connected peer on peerChan before sending any pending messages.
//
// NOTE: The peerChan channel must be buffered.
//
// TODO(wilmer): use [33]byte to avoid unnecessary serializations.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// NotifyWhenOffline is a function that allows the gossiper to be
// notified when a certain peer disconnects, allowing it to request a
// notification for when it reconnects. The peerHandler selects on the
// returned channel while streaming messages to a connected peer.
NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
// MessageStore is a persistent storage of gossip messages which we will
// use to determine which messages need to be resent for a given peer.
// Messages are added to it before any send is attempted and are removed
// from it once they have been sent and are found to be stale.
MessageStore GossipMessageStore
// IsMsgStale determines whether a message retrieved from the backing
// MessageStore is seen as stale by the current graph. It is only
// consulted after the message has been sent to the peer at least once.
IsMsgStale func(lnwire.Message) bool
}
// peerManager contains the set of channels required for the peerHandler to
// properly carry out its duties. One peerManager is created per peer by
// spawnPeerHandler and is shared between the handler goroutine and callers of
// sendMessage.
type peerManager struct {
// msgs is the channel through which messages will be streamed to the
// handler in order to send the message to the peer while they're
// online. It is created unbuffered, so a send only completes once the
// handler receives from it.
msgs chan lnwire.Message
// done is a channel that will be closed by the peerHandler when it
// tears itself down, which lets a caller blocked on msgs know that it
// should request a new handler instead.
done chan struct{}
}
// reliableSender is a small subsystem of the gossiper used to reliably send
// gossip messages to peers. Each message is first persisted through the
// configured MessageStore, and a single peerHandler goroutine per peer takes
// care of delivering that peer's pending messages.
type reliableSender struct {
// start and stop ensure that the bodies of Start and Stop each run at
// most once.
start sync.Once
stop sync.Once
// cfg is a copy of the configuration the sender was created with.
cfg reliableSenderCfg
// activePeers keeps track of whether a peerHandler exists for a given
// peer. A peerHandler is tasked with handling requests for messages
// that should be reliably sent to peers while also taking into account
// the peer's connection lifecycle.
activePeers map[[33]byte]peerManager
// activePeersMtx guards access to activePeers.
activePeersMtx sync.Mutex
// wg tracks the running peerHandler goroutines so that Stop can wait
// for all of them to exit.
wg sync.WaitGroup
// quit is closed by Stop to signal every peerHandler and any blocked
// sendMessage call to exit.
quit chan struct{}
}
// newReliableSender creates a reliableSender that operates on a copy of the
// given config. The returned sender has no active peer handlers until Start or
// sendMessage is called.
func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
	sender := new(reliableSender)
	sender.cfg = *cfg
	sender.activePeers = make(map[[33]byte]peerManager)
	sender.quit = make(chan struct{})

	return sender
}
// Start launches a peerHandler for every peer that still has pending messages
// in the message store. Only the first call does any work and reports the
// outcome of that work; subsequent calls return nil.
func (s *reliableSender) Start() error {
	var startErr error

	launch := func() {
		startErr = s.resendPendingMsgs()
	}
	s.start.Do(launch)

	return startErr
}
// Stop signals every peerHandler to exit and blocks until all of them have
// returned. Only the first call has any effect.
func (s *reliableSender) Stop() {
	shutdown := func() {
		// Closing quit wakes up all handlers and any blocked
		// sendMessage calls; we then wait for the handlers to finish.
		close(s.quit)
		s.wg.Wait()
	}

	s.stop.Do(shutdown)
}
// sendMessage reliably sends msg to the peer identified by peerPubKey. The
// message is always persisted to the message store first. If the peer is
// offline, nothing else happens until it reconnects, at which point this
// message and any other pending ones are sent by the peer's handler.
//
// An error is returned if the message could not be persisted, or
// ErrGossiperShuttingDown if the sender is stopped while the message is being
// handed over to the peer's handler.
func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error {
	// Persist the message before anything else so that it survives
	// restarts and peer reconnections.
	err := s.cfg.MessageStore.AddMessage(msg, peerPubKey)
	if err != nil {
		return err
	}

	for {
		// Make sure a peerHandler exists for this peer; it takes care
		// of the peer's connection lifecycle on our behalf.
		handler, alreadyActive := s.spawnPeerHandler(peerPubKey)

		// A freshly spawned handler will pick the message up from the
		// store once the peer is online, so streaming it as well could
		// result in it being sent twice.
		if !alreadyActive {
			return nil
		}

		// The handler was already running, so hand the message over to
		// it directly.
		select {
		case handler.msgs <- msg:
			return nil

		case <-s.quit:
			return ErrGossiperShuttingDown

		// The handler may have been torn down in the meantime because
		// all of the messages it sent were stale. Rather than blocking
		// forever, go around again and spawn a replacement.
		case <-handler.done:
		}
	}
}
// spawnPeerHandler makes sure a peerHandler goroutine is running for the given
// peer. It returns the peerManager used to communicate with that handler, and a
// boolean that is true if the handler was already active before this call and
// false if it was just spawned.
func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) {
	s.activePeersMtx.Lock()
	defer s.activePeersMtx.Unlock()

	// Reuse the existing handler if there is one.
	if existing, active := s.activePeers[peerPubKey]; active {
		return existing, true
	}

	// Otherwise register a new manager and launch its handler, tracking
	// the goroutine so that Stop can wait for it.
	fresh := peerManager{
		msgs: make(chan lnwire.Message),
		done: make(chan struct{}),
	}
	s.activePeers[peerPubKey] = fresh

	s.wg.Add(1)
	go s.peerHandler(fresh, peerPubKey)

	return fresh, false
}
// peerHandler is responsible for handling our reliable message send requests
// for a given peer while also taking into account the peer's connection
// lifecycle. While the peer is offline, messages streamed to the handler are
// drained and dropped, since they have already been persisted to the message
// store; once the peer comes online, every message in the store for that peer
// is sent. While the peer remains online, newly streamed messages are sent
// immediately.
//
// The handler exits when the sender is stopped, when the peer has no pending
// messages left, when the peer's public key cannot be parsed, or when the
// message store returns an error. In every case other than shutdown, the
// handler deregisters itself and closes peerMgr.done so that callers blocked in
// sendMessage can spawn a replacement instead of waiting on a dead handler.
//
// NOTE: This must be run as a goroutine.
func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) {
	defer s.wg.Done()

	// tearDown removes this handler from the set of active peers and
	// signals through the done channel that it is no longer servicing
	// requests. It must be called at most once, right before returning.
	tearDown := func() {
		s.activePeersMtx.Lock()
		delete(s.activePeers, peerPubKey)
		s.activePeersMtx.Unlock()

		close(peerMgr.done)
	}

	// We'll start by requesting a notification for when the peer
	// reconnects. Without a valid public key we cannot request it, so
	// there's nothing this handler can do.
	pubKey, err := btcec.ParsePubKey(peerPubKey[:], btcec.S256())
	if err != nil {
		log.Errorf("Unable to parse public key for peer %x: %v",
			peerPubKey, err)
		tearDown()
		return
	}
	peerChan := make(chan lnpeer.Peer, 1)

waitUntilOnline:
	log.Debugf("Requesting online notification for peer=%x", peerPubKey)

	s.cfg.NotifyWhenOnline(pubKey, peerChan)

	var peer lnpeer.Peer
out:
	for {
		select {
		// While we're waiting, we'll also consume any messages that
		// must be sent to prevent blocking the caller. These can be
		// ignored for now since the peer is currently offline. Once
		// they reconnect, the messages will be sent since they should
		// have been persisted to disk.
		case <-peerMgr.msgs:

		case peer = <-peerChan:
			break out

		case <-s.quit:
			return
		}
	}

	log.Debugf("Peer=%x is now online, proceeding to send pending messages",
		peerPubKey)

	// Once we detect the peer has reconnected, we'll also request a
	// notification for when they disconnect. We'll use this to make sure
	// they haven't disconnected (in the case of a flappy peer, etc.) by the
	// time we attempt to send them the pending messages.
	log.Debugf("Requesting offline notification for peer=%x", peerPubKey)

	offlineChan := s.cfg.NotifyWhenOffline(peerPubKey)

	pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey)
	if err != nil {
		log.Errorf("Unable to retrieve pending messages for peer %x: %v",
			peerPubKey, err)
		tearDown()
		return
	}

	// With the peer online, we can now proceed to send our pending messages
	// for them.
	for _, msg := range pendingMsgs {
		// Retrieve the short channel ID for which this message applies
		// for logging purposes. The error can be ignored as the store
		// can only contain messages which have a ShortChannelID field.
		shortChanID, _ := msgShortChanID(msg)

		if err := peer.SendMessage(false, msg); err != nil {
			log.Errorf("Unable to send %v message for channel=%v "+
				"to %x: %v", msg.MsgType(), shortChanID,
				peerPubKey, err)
			goto waitUntilOnline
		}

		log.Debugf("Successfully sent %v message for channel=%v with "+
			"peer=%x upon reconnection", msg.MsgType(), shortChanID,
			peerPubKey)

		// Now that the message has at least been sent once, we can
		// check whether it's stale. This guarantees that
		// AnnounceSignatures are sent at least once if we happen to
		// already have signatures for both parties.
		if s.cfg.IsMsgStale(msg) {
			err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey)
			if err != nil {
				log.Errorf("Unable to remove stale %v message "+
					"for channel=%v with peer %x: %v",
					msg.MsgType(), shortChanID, peerPubKey,
					err)
				continue
			}

			log.Debugf("Removed stale %v message for channel=%v "+
				"with peer=%x", msg.MsgType(), shortChanID,
				peerPubKey)
		}
	}

	// If all of our messages were stale, then there's no need for this
	// handler to continue running, so we can exit now.
	pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey)
	if err != nil {
		log.Errorf("Unable to retrieve pending messages for peer %x: %v",
			peerPubKey, err)
		tearDown()
		return
	}

	if len(pendingMsgs) == 0 {
		log.Debugf("No pending messages left for peer=%x", peerPubKey)
		tearDown()
		return
	}

	// Once the pending messages are sent, we can continue to send any
	// future messages while the peer remains connected.
	for {
		select {
		case msg := <-peerMgr.msgs:
			// Retrieve the short channel ID for which this message
			// applies for logging purposes. The error can be
			// ignored as the store can only contain messages which
			// have a ShortChannelID field.
			shortChanID, _ := msgShortChanID(msg)

			// A failed send is only logged: the message is still
			// in the store, so it will be retried once the peer
			// reconnects. Skip the success log in that case.
			if err := peer.SendMessage(false, msg); err != nil {
				log.Errorf("Unable to send %v message for "+
					"channel=%v to %x: %v", msg.MsgType(),
					shortChanID, peerPubKey, err)
				continue
			}

			log.Debugf("Successfully sent %v message for "+
				"channel=%v with peer=%x", msg.MsgType(),
				shortChanID, peerPubKey)

		case <-offlineChan:
			goto waitUntilOnline

		case <-s.quit:
			return
		}
	}
}
// resendPendingMsgs makes sure that a peerHandler is running for every peer the
// message store reports as having pending messages. The handlers take care of
// sending those messages once their respective peers are seen as online. An
// error is returned only if the set of peers cannot be retrieved from the
// store.
func (s *reliableSender) resendPendingMsgs() error {
	pendingPeers, err := s.cfg.MessageStore.Peers()
	if err != nil {
		return err
	}

	// The handler's manager and prior-activity flag are of no interest
	// here; we only need the handler to exist.
	for peerPubKey := range pendingPeers {
		_, _ = s.spawnPeerHandler(peerPubKey)
	}

	return nil
}