lnd.xprv/discovery/reliable_sender.go

335 lines
10 KiB
Go

package discovery
import (
"sync"
"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// reliableSenderCfg contains all of necessary items for the reliableSender to
// carry out its duties.
type reliableSenderCfg struct {
// NotifyWhenOnline is a function that allows the gossiper to be
// notified when a certain peer comes online, allowing it to
// retry sending a peer message.
//
// NOTE: The peerChan channel must be buffered.
//
// TODO(wilmer): use [33]byte to avoid unnecessary serializations.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// NotifyWhenOffline is a function that allows the gossiper to be
// notified when a certain peer disconnects, allowing it to request a
// notification for when it reconnects.
NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
// MessageStore is a persistent storage of gossip messages which we will
// use to determine which messages need to be resent for a given peer.
MessageStore GossipMessageStore
// IsMsgStale determines whether a message retrieved from the backing
// MessageStore is seen as stale by the current graph.
IsMsgStale func(lnwire.Message) bool
}
// peerManager contains the set of channels required for the peerHandler to
// properly carry out its duties.
type peerManager struct {
// msgs is the channel through which messages will be streamed to the
// handler in order to send the message to the peer while they're
// online.
msgs chan lnwire.Message
// done is a channel that will be closed to signal that the handler for
// the given peer has been torn down for whatever reason.
done chan struct{}
}
// reliableSender is a small subsystem of the gossiper used to reliably send
// gossip messages to peers.
type reliableSender struct {
start sync.Once
stop sync.Once
cfg reliableSenderCfg
// activePeers keeps track of whether a peerHandler exists for a given
// peer. A peerHandler is tasked with handling requests for messages
// that should be reliably sent to peers while also taking into account
// the peer's connection lifecycle.
activePeers map[[33]byte]peerManager
activePeersMtx sync.Mutex
wg sync.WaitGroup
quit chan struct{}
}
// newReliableSender returns a new reliableSender backed by the given config.
func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
return &reliableSender{
cfg: *cfg,
activePeers: make(map[[33]byte]peerManager),
quit: make(chan struct{}),
}
}
// Start spawns message handlers for any peers with pending messages.
func (s *reliableSender) Start() error {
var err error
s.start.Do(func() {
err = s.resendPendingMsgs()
})
return err
}
// Stop halts the reliable sender from sending messages to peers.
func (s *reliableSender) Stop() {
s.stop.Do(func() {
close(s.quit)
s.wg.Wait()
})
}
// sendMessage constructs a request to send a message reliably to a peer. In the
// event that the peer is currently offline, this will only write the message to
// disk. Once the peer reconnects, this message, along with any others pending,
// will be sent to the peer.
func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error {
// We'll start by persisting the message to disk. This allows us to
// resend the message upon restarts and peer reconnections.
if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil {
return err
}
// Then, we'll spawn a peerHandler for this peer to handle resending its
// pending messages while taking into account its connection lifecycle.
spawnHandler:
msgHandler, ok := s.spawnPeerHandler(peerPubKey)
// If the handler wasn't previously active, we can exit now as we know
// that the message will be sent once the peer online notification is
// received. This prevents us from potentially sending the message
// twice.
if !ok {
return nil
}
// Otherwise, we'll attempt to stream the message to the handler.
// There's a subtle race condition where the handler can be torn down
// due to all of the messages sent being stale, so we'll handle this
// gracefully by spawning another one to prevent blocking.
select {
case msgHandler.msgs <- msg:
case <-msgHandler.done:
goto spawnHandler
case <-s.quit:
return ErrGossiperShuttingDown
}
return nil
}
// spawnPeerMsgHandler spawns a peerHandler for the given peer if there isn't
// one already active. The boolean returned signals whether there was already
// one active or not.
func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) {
s.activePeersMtx.Lock()
defer s.activePeersMtx.Unlock()
msgHandler, ok := s.activePeers[peerPubKey]
if !ok {
msgHandler = peerManager{
msgs: make(chan lnwire.Message),
done: make(chan struct{}),
}
s.activePeers[peerPubKey] = msgHandler
s.wg.Add(1)
go s.peerHandler(msgHandler, peerPubKey)
}
return msgHandler, ok
}
// peerHandler is responsible for handling our reliable message send requests
// for a given peer while also taking into account the peer's connection
// lifecycle. Any messages that are attempted to be sent while the peer is
// offline will be queued and sent once the peer reconnects.
//
// NOTE: This must be run as a goroutine.
func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) {
defer s.wg.Done()
// We'll start by requesting a notification for when the peer
// reconnects.
pubKey, _ := btcec.ParsePubKey(peerPubKey[:], btcec.S256())
peerChan := make(chan lnpeer.Peer, 1)
waitUntilOnline:
log.Debugf("Requesting online notification for peer=%x", peerPubKey)
s.cfg.NotifyWhenOnline(pubKey, peerChan)
var peer lnpeer.Peer
out:
for {
select {
// While we're waiting, we'll also consume any messages that
// must be sent to prevent blocking the caller. These can be
// ignored for now since the peer is currently offline. Once
// they reconnect, the messages will be sent since they should
// have been persisted to disk.
case msg := <-peerMgr.msgs:
// Retrieve the short channel ID for which this message
// applies for logging purposes. The error can be
// ignored as the store can only contain messages which
// have a ShortChannelID field.
shortChanID, _ := msgShortChanID(msg)
log.Debugf("Received request to send %v message for "+
"channel=%v while peer=%x is offline",
msg.MsgType(), shortChanID, peerPubKey)
case peer = <-peerChan:
break out
case <-s.quit:
return
}
}
log.Debugf("Peer=%x is now online, proceeding to send pending messages",
peerPubKey)
// Once we detect the peer has reconnected, we'll also request a
// notification for when they disconnect. We'll use this to make sure
// they haven't disconnected (in the case of a flappy peer, etc.) by the
// time we attempt to send them the pending messages.
log.Debugf("Requesting offline notification for peer=%x", peerPubKey)
offlineChan := s.cfg.NotifyWhenOffline(peerPubKey)
pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey)
if err != nil {
log.Errorf("Unable to retrieve pending messages for peer %x: %v",
peerPubKey, err)
return
}
// With the peer online, we can now proceed to send our pending messages
// for them.
for _, msg := range pendingMsgs {
// Retrieve the short channel ID for which this message applies
// for logging purposes. The error can be ignored as the store
// can only contain messages which have a ShortChannelID field.
shortChanID, _ := msgShortChanID(msg)
// Ensure the peer is still online right before sending the
// message.
select {
case <-offlineChan:
goto waitUntilOnline
default:
}
if err := peer.SendMessage(false, msg); err != nil {
log.Errorf("Unable to send %v message for channel=%v "+
"to %x: %v", msg.MsgType(), shortChanID,
peerPubKey, err)
goto waitUntilOnline
}
log.Debugf("Successfully sent %v message for channel=%v with "+
"peer=%x upon reconnection", msg.MsgType(), shortChanID,
peerPubKey)
// Now that the message has at least been sent once, we can
// check whether it's stale. This guarantees that
// AnnounceSignatures are sent at least once if we happen to
// already have signatures for both parties.
if s.cfg.IsMsgStale(msg) {
err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey)
if err != nil {
log.Errorf("Unable to remove stale %v message "+
"for channel=%v with peer %x: %v",
msg.MsgType(), shortChanID, peerPubKey,
err)
continue
}
log.Debugf("Removed stale %v message for channel=%v "+
"with peer=%x", msg.MsgType(), shortChanID,
peerPubKey)
}
}
// If all of our messages were stale, then there's no need for this
// handler to continue running, so we can exit now.
pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey)
if err != nil {
log.Errorf("Unable to retrieve pending messages for peer %x: %v",
peerPubKey, err)
return
}
if len(pendingMsgs) == 0 {
log.Debugf("No pending messages left for peer=%x", peerPubKey)
s.activePeersMtx.Lock()
delete(s.activePeers, peerPubKey)
s.activePeersMtx.Unlock()
close(peerMgr.done)
return
}
// Once the pending messages are sent, we can continue to send any
// future messages while the peer remains connected.
for {
select {
case msg := <-peerMgr.msgs:
// Retrieve the short channel ID for which this message
// applies for logging purposes. The error can be
// ignored as the store can only contain messages which
// have a ShortChannelID field.
shortChanID, _ := msgShortChanID(msg)
if err := peer.SendMessage(false, msg); err != nil {
log.Errorf("Unable to send %v message for "+
"channel=%v to %x: %v", msg.MsgType(),
shortChanID, peerPubKey, err)
}
log.Debugf("Successfully sent %v message for "+
"channel=%v with peer=%x", msg.MsgType(),
shortChanID, peerPubKey)
case <-offlineChan:
goto waitUntilOnline
case <-s.quit:
return
}
}
}
// resendPendingMsgs retrieves and sends all of the messages within the message
// store that should be reliably sent to their respective peers.
func (s *reliableSender) resendPendingMsgs() error {
// Fetch all of the peers for which we have pending messages for and
// spawn a peerMsgHandler for each. Once the peer is seen as online, all
// of the pending messages will be sent.
peers, err := s.cfg.MessageStore.Peers()
if err != nil {
return err
}
for peer := range peers {
s.spawnPeerHandler(peer)
}
return nil
}