You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
330 lines
10 KiB
330 lines
10 KiB
package discovery |
|
|
|
import ( |
|
"sync" |
|
|
|
"github.com/lightningnetwork/lnd/lnpeer" |
|
"github.com/lightningnetwork/lnd/lnwire" |
|
) |
|
|
|
// reliableSenderCfg contains all of necessary items for the reliableSender to |
|
// carry out its duties. |
|
type reliableSenderCfg struct { |
|
// NotifyWhenOnline is a function that allows the gossiper to be |
|
// notified when a certain peer comes online, allowing it to |
|
// retry sending a peer message. |
|
// |
|
// NOTE: The peerChan channel must be buffered. |
|
NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer) |
|
|
|
// NotifyWhenOffline is a function that allows the gossiper to be |
|
// notified when a certain peer disconnects, allowing it to request a |
|
// notification for when it reconnects. |
|
NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{} |
|
|
|
// MessageStore is a persistent storage of gossip messages which we will |
|
// use to determine which messages need to be resent for a given peer. |
|
MessageStore GossipMessageStore |
|
|
|
// IsMsgStale determines whether a message retrieved from the backing |
|
// MessageStore is seen as stale by the current graph. |
|
IsMsgStale func(lnwire.Message) bool |
|
} |
|
|
|
// peerManager contains the set of channels required for the peerHandler to |
|
// properly carry out its duties. |
|
type peerManager struct { |
|
// msgs is the channel through which messages will be streamed to the |
|
// handler in order to send the message to the peer while they're |
|
// online. |
|
msgs chan lnwire.Message |
|
|
|
// done is a channel that will be closed to signal that the handler for |
|
// the given peer has been torn down for whatever reason. |
|
done chan struct{} |
|
} |
|
|
|
// reliableSender is a small subsystem of the gossiper used to reliably send |
|
// gossip messages to peers. |
|
type reliableSender struct { |
|
start sync.Once |
|
stop sync.Once |
|
|
|
cfg reliableSenderCfg |
|
|
|
// activePeers keeps track of whether a peerHandler exists for a given |
|
// peer. A peerHandler is tasked with handling requests for messages |
|
// that should be reliably sent to peers while also taking into account |
|
// the peer's connection lifecycle. |
|
activePeers map[[33]byte]peerManager |
|
activePeersMtx sync.Mutex |
|
|
|
wg sync.WaitGroup |
|
quit chan struct{} |
|
} |
|
|
|
// newReliableSender returns a new reliableSender backed by the given config. |
|
func newReliableSender(cfg *reliableSenderCfg) *reliableSender { |
|
return &reliableSender{ |
|
cfg: *cfg, |
|
activePeers: make(map[[33]byte]peerManager), |
|
quit: make(chan struct{}), |
|
} |
|
} |
|
|
|
// Start spawns message handlers for any peers with pending messages. |
|
func (s *reliableSender) Start() error { |
|
var err error |
|
s.start.Do(func() { |
|
err = s.resendPendingMsgs() |
|
}) |
|
return err |
|
} |
|
|
|
// Stop halts the reliable sender from sending messages to peers. |
|
func (s *reliableSender) Stop() { |
|
s.stop.Do(func() { |
|
close(s.quit) |
|
s.wg.Wait() |
|
}) |
|
} |
|
|
|
// sendMessage constructs a request to send a message reliably to a peer. In the |
|
// event that the peer is currently offline, this will only write the message to |
|
// disk. Once the peer reconnects, this message, along with any others pending, |
|
// will be sent to the peer. |
|
func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error { |
|
// We'll start by persisting the message to disk. This allows us to |
|
// resend the message upon restarts and peer reconnections. |
|
if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil { |
|
return err |
|
} |
|
|
|
// Then, we'll spawn a peerHandler for this peer to handle resending its |
|
// pending messages while taking into account its connection lifecycle. |
|
spawnHandler: |
|
msgHandler, ok := s.spawnPeerHandler(peerPubKey) |
|
|
|
// If the handler wasn't previously active, we can exit now as we know |
|
// that the message will be sent once the peer online notification is |
|
// received. This prevents us from potentially sending the message |
|
// twice. |
|
if !ok { |
|
return nil |
|
} |
|
|
|
// Otherwise, we'll attempt to stream the message to the handler. |
|
// There's a subtle race condition where the handler can be torn down |
|
// due to all of the messages sent being stale, so we'll handle this |
|
// gracefully by spawning another one to prevent blocking. |
|
select { |
|
case msgHandler.msgs <- msg: |
|
case <-msgHandler.done: |
|
goto spawnHandler |
|
case <-s.quit: |
|
return ErrGossiperShuttingDown |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// spawnPeerMsgHandler spawns a peerHandler for the given peer if there isn't |
|
// one already active. The boolean returned signals whether there was already |
|
// one active or not. |
|
func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) { |
|
s.activePeersMtx.Lock() |
|
defer s.activePeersMtx.Unlock() |
|
|
|
msgHandler, ok := s.activePeers[peerPubKey] |
|
if !ok { |
|
msgHandler = peerManager{ |
|
msgs: make(chan lnwire.Message), |
|
done: make(chan struct{}), |
|
} |
|
s.activePeers[peerPubKey] = msgHandler |
|
|
|
s.wg.Add(1) |
|
go s.peerHandler(msgHandler, peerPubKey) |
|
} |
|
|
|
return msgHandler, ok |
|
} |
|
|
|
// peerHandler is responsible for handling our reliable message send requests |
|
// for a given peer while also taking into account the peer's connection |
|
// lifecycle. Any messages that are attempted to be sent while the peer is |
|
// offline will be queued and sent once the peer reconnects. |
|
// |
|
// NOTE: This must be run as a goroutine. |
|
func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) { |
|
defer s.wg.Done() |
|
|
|
// We'll start by requesting a notification for when the peer |
|
// reconnects. |
|
peerChan := make(chan lnpeer.Peer, 1) |
|
|
|
waitUntilOnline: |
|
log.Debugf("Requesting online notification for peer=%x", peerPubKey) |
|
|
|
s.cfg.NotifyWhenOnline(peerPubKey, peerChan) |
|
|
|
var peer lnpeer.Peer |
|
out: |
|
for { |
|
select { |
|
// While we're waiting, we'll also consume any messages that |
|
// must be sent to prevent blocking the caller. These can be |
|
// ignored for now since the peer is currently offline. Once |
|
// they reconnect, the messages will be sent since they should |
|
// have been persisted to disk. |
|
case msg := <-peerMgr.msgs: |
|
// Retrieve the short channel ID for which this message |
|
// applies for logging purposes. The error can be |
|
// ignored as the store can only contain messages which |
|
// have a ShortChannelID field. |
|
shortChanID, _ := msgShortChanID(msg) |
|
log.Debugf("Received request to send %v message for "+ |
|
"channel=%v while peer=%x is offline", |
|
msg.MsgType(), shortChanID, peerPubKey) |
|
|
|
case peer = <-peerChan: |
|
break out |
|
|
|
case <-s.quit: |
|
return |
|
} |
|
} |
|
|
|
log.Debugf("Peer=%x is now online, proceeding to send pending messages", |
|
peerPubKey) |
|
|
|
// Once we detect the peer has reconnected, we'll also request a |
|
// notification for when they disconnect. We'll use this to make sure |
|
// they haven't disconnected (in the case of a flappy peer, etc.) by the |
|
// time we attempt to send them the pending messages. |
|
log.Debugf("Requesting offline notification for peer=%x", peerPubKey) |
|
|
|
offlineChan := s.cfg.NotifyWhenOffline(peerPubKey) |
|
|
|
pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey) |
|
if err != nil { |
|
log.Errorf("Unable to retrieve pending messages for peer %x: %v", |
|
peerPubKey, err) |
|
return |
|
} |
|
|
|
// With the peer online, we can now proceed to send our pending messages |
|
// for them. |
|
for _, msg := range pendingMsgs { |
|
// Retrieve the short channel ID for which this message applies |
|
// for logging purposes. The error can be ignored as the store |
|
// can only contain messages which have a ShortChannelID field. |
|
shortChanID, _ := msgShortChanID(msg) |
|
|
|
// Ensure the peer is still online right before sending the |
|
// message. |
|
select { |
|
case <-offlineChan: |
|
goto waitUntilOnline |
|
default: |
|
} |
|
|
|
if err := peer.SendMessage(false, msg); err != nil { |
|
log.Errorf("Unable to send %v message for channel=%v "+ |
|
"to %x: %v", msg.MsgType(), shortChanID, |
|
peerPubKey, err) |
|
goto waitUntilOnline |
|
} |
|
|
|
log.Debugf("Successfully sent %v message for channel=%v with "+ |
|
"peer=%x upon reconnection", msg.MsgType(), shortChanID, |
|
peerPubKey) |
|
|
|
// Now that the message has at least been sent once, we can |
|
// check whether it's stale. This guarantees that |
|
// AnnounceSignatures are sent at least once if we happen to |
|
// already have signatures for both parties. |
|
if s.cfg.IsMsgStale(msg) { |
|
err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey) |
|
if err != nil { |
|
log.Errorf("Unable to remove stale %v message "+ |
|
"for channel=%v with peer %x: %v", |
|
msg.MsgType(), shortChanID, peerPubKey, |
|
err) |
|
continue |
|
} |
|
|
|
log.Debugf("Removed stale %v message for channel=%v "+ |
|
"with peer=%x", msg.MsgType(), shortChanID, |
|
peerPubKey) |
|
} |
|
} |
|
|
|
// If all of our messages were stale, then there's no need for this |
|
// handler to continue running, so we can exit now. |
|
pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey) |
|
if err != nil { |
|
log.Errorf("Unable to retrieve pending messages for peer %x: %v", |
|
peerPubKey, err) |
|
return |
|
} |
|
|
|
if len(pendingMsgs) == 0 { |
|
log.Debugf("No pending messages left for peer=%x", peerPubKey) |
|
|
|
s.activePeersMtx.Lock() |
|
delete(s.activePeers, peerPubKey) |
|
s.activePeersMtx.Unlock() |
|
|
|
close(peerMgr.done) |
|
|
|
return |
|
} |
|
|
|
// Once the pending messages are sent, we can continue to send any |
|
// future messages while the peer remains connected. |
|
for { |
|
select { |
|
case msg := <-peerMgr.msgs: |
|
// Retrieve the short channel ID for which this message |
|
// applies for logging purposes. The error can be |
|
// ignored as the store can only contain messages which |
|
// have a ShortChannelID field. |
|
shortChanID, _ := msgShortChanID(msg) |
|
|
|
if err := peer.SendMessage(false, msg); err != nil { |
|
log.Errorf("Unable to send %v message for "+ |
|
"channel=%v to %x: %v", msg.MsgType(), |
|
shortChanID, peerPubKey, err) |
|
} |
|
|
|
log.Debugf("Successfully sent %v message for "+ |
|
"channel=%v with peer=%x", msg.MsgType(), |
|
shortChanID, peerPubKey) |
|
|
|
case <-offlineChan: |
|
goto waitUntilOnline |
|
|
|
case <-s.quit: |
|
return |
|
} |
|
} |
|
} |
|
|
|
// resendPendingMsgs retrieves and sends all of the messages within the message |
|
// store that should be reliably sent to their respective peers. |
|
func (s *reliableSender) resendPendingMsgs() error { |
|
// Fetch all of the peers for which we have pending messages for and |
|
// spawn a peerMsgHandler for each. Once the peer is seen as online, all |
|
// of the pending messages will be sent. |
|
peers, err := s.cfg.MessageStore.Peers() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
for peer := range peers { |
|
s.spawnPeerHandler(peer) |
|
} |
|
|
|
return nil |
|
}
|
|
|