discovery/reliable_sender: implement message-agnostic reliable sender

In this commit, we implement a new subsystem for the gossiper that
uses some of the existing logic for resending channel announcement
signatures and implements it in a way to make it message-agnostic,
meaning that any type of message can be resent. Along the way we also
modify the way this works to prevent multiple goroutines per peer _and_
message.

A peerHandler will be spawned for each peer for which we attempt to send
a message reliably to. This handler is responsible for managing requests
to reliably send messages to a peer while also taking the peer's
connection lifecycle into account by requesting notifications for when
the peer connects/disconnects. A peer connection notification is first
requested to determine when we should attempt to send any pending
messages. After the messages are sent, a peer disconnection notification
is requested to ensure we don't continue to request connection
notifications while the peer remains connected. Once there are no more
pending messages left to be sent for a given peer, the peerHandler can
be torn down.
This commit is contained in:
Wilmer Paulino 2019-02-05 17:18:49 -08:00
parent 6e556aa897
commit 2f679f6015
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 628 additions and 0 deletions

@ -0,0 +1,316 @@
package discovery
import (
"sync"
"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// reliableSenderCfg contains all of necessary items for the reliableSender to
// carry out its duties.
type reliableSenderCfg struct {
// NotifyWhenOnline is a function that allows the gossiper to be
// notified when a certain peer comes online, allowing it to
// retry sending a peer message.
//
// NOTE: The peerChan channel must be buffered.
//
// TODO(wilmer): use [33]byte to avoid unnecessary serializations.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// NotifyWhenOffline is a function that allows the gossiper to be
// notified when a certain peer disconnects, allowing it to request a
// notification for when it reconnects.
NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
// MessageStore is a persistent storage of gossip messages which we will
// use to determine which messages need to be resent for a given peer.
MessageStore GossipMessageStore
// IsMsgStale determines whether a message retrieved from the backing
// MessageStore is seen as stale by the current graph.
IsMsgStale func(lnwire.Message) bool
}
// peerManager contains the set of channels required for the peerHandler to
// properly carry out its duties.
type peerManager struct {
// msgs is the channel through which messages will be streamed to the
// handler in order to send the message to the peer while they're
// online.
msgs chan lnwire.Message
// done is a channel that will be closed to signal that the handler for
// the given peer has been torn down for whatever reason.
done chan struct{}
}
// reliableSender is a small subsystem of the gossiper used to reliably send
// gossip messages to peers.
type reliableSender struct {
start sync.Once
stop sync.Once
cfg reliableSenderCfg
// activePeers keeps track of whether a peerHandler exists for a given
// peer. A peerHandler is tasked with handling requests for messages
// that should be reliably sent to peers while also taking into account
// the peer's connection lifecycle.
activePeers map[[33]byte]peerManager
activePeersMtx sync.Mutex
wg sync.WaitGroup
quit chan struct{}
}
// newReliableSender returns a new reliableSender backed by the given config.
func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
return &reliableSender{
cfg: *cfg,
activePeers: make(map[[33]byte]peerManager),
quit: make(chan struct{}),
}
}
// Start spawns message handlers for any peers with pending messages.
func (s *reliableSender) Start() error {
var err error
s.start.Do(func() {
err = s.resendPendingMsgs()
})
return err
}
// Stop halts the reliable sender from sending messages to peers.
func (s *reliableSender) Stop() {
s.stop.Do(func() {
close(s.quit)
s.wg.Wait()
})
}
// sendMessage constructs a request to send a message reliably to a peer. In the
// event that the peer is currently offline, this will only write the message to
// disk. Once the peer reconnects, this message, along with any others pending,
// will be sent to the peer.
func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error {
// We'll start by persisting the message to disk. This allows us to
// resend the message upon restarts and peer reconnections.
if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil {
return err
}
// Then, we'll spawn a peerHandler for this peer to handle resending its
// pending messages while taking into account its connection lifecycle.
spawnHandler:
msgHandler, ok := s.spawnPeerHandler(peerPubKey)
// If the handler wasn't previously active, we can exit now as we know
// that the message will be sent once the peer online notification is
// received. This prevents us from potentially sending the message
// twice.
if !ok {
return nil
}
// Otherwise, we'll attempt to stream the message to the handler.
// There's a subtle race condition where the handler can be torn down
// due to all of the messages sent being stale, so we'll handle this
// gracefully by spawning another one to prevent blocking.
select {
case msgHandler.msgs <- msg:
case <-msgHandler.done:
goto spawnHandler
case <-s.quit:
return ErrGossiperShuttingDown
}
return nil
}
// spawnPeerMsgHandler spawns a peerHandler for the given peer if there isn't
// one already active. The boolean returned signals whether there was already
// one active or not.
func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) {
s.activePeersMtx.Lock()
defer s.activePeersMtx.Unlock()
msgHandler, ok := s.activePeers[peerPubKey]
if !ok {
msgHandler = peerManager{
msgs: make(chan lnwire.Message),
done: make(chan struct{}),
}
s.activePeers[peerPubKey] = msgHandler
s.wg.Add(1)
go s.peerHandler(msgHandler, peerPubKey)
}
return msgHandler, ok
}
// peerHandler is responsible for handling our reliable message send requests
// for a given peer while also taking into account the peer's connection
// lifecycle. Any messages that are attempted to be sent while the peer is
// offline will be queued and sent once the peer reconnects.
//
// NOTE: This must be run as a goroutine.
func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) {
defer s.wg.Done()
// We'll start by requesting a notification for when the peer
// reconnects.
pubKey, _ := btcec.ParsePubKey(peerPubKey[:], btcec.S256())
peerChan := make(chan lnpeer.Peer, 1)
waitUntilOnline:
log.Debugf("Requesting online notification for peer=%x", peerPubKey)
s.cfg.NotifyWhenOnline(pubKey, peerChan)
var peer lnpeer.Peer
out:
for {
select {
// While we're waiting, we'll also consume any messages that
// must be sent to prevent blocking the caller. These can be
// ignored for now since the peer is currently offline. Once
// they reconnect, the messages will be sent since they should
// have been persisted to disk.
case <-peerMgr.msgs:
case peer = <-peerChan:
break out
case <-s.quit:
return
}
}
log.Debugf("Peer=%x is now online, proceeding to send pending messages",
peerPubKey)
// Once we detect the peer has reconnected, we'll also request a
// notification for when they disconnect. We'll use this to make sure
// they haven't disconnected (in the case of a flappy peer, etc.) by the
// time we attempt to send them the pending messages.
log.Debugf("Requesting offline notification for peer=%x", peerPubKey)
offlineChan := s.cfg.NotifyWhenOffline(peerPubKey)
pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey)
if err != nil {
log.Errorf("Unable to retrieve pending messages for peer %x: %v",
peerPubKey, err)
return
}
// With the peer online, we can now proceed to send our pending messages
// for them.
for _, msg := range pendingMsgs {
// Retrieve the short channel ID for which this message applies
// for logging purposes. The error can be ignored as the store
// can only contain messages which have a ShortChannelID field.
shortChanID, _ := msgShortChanID(msg)
if err := peer.SendMessage(false, msg); err != nil {
log.Errorf("Unable to send %v message for channel=%v "+
"to %x: %v", msg.MsgType(), shortChanID,
peerPubKey, err)
goto waitUntilOnline
}
log.Debugf("Successfully sent %v message for channel=%v with "+
"peer=%x upon reconnection", msg.MsgType(), shortChanID,
peerPubKey)
// Now that the message has at least been sent once, we can
// check whether it's stale. This guarantees that
// AnnounceSignatures are sent at least once if we happen to
// already have signatures for both parties.
if s.cfg.IsMsgStale(msg) {
err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey)
if err != nil {
log.Errorf("Unable to remove stale %v message "+
"for channel=%v with peer %x: %v",
msg.MsgType(), shortChanID, peerPubKey,
err)
continue
}
log.Debugf("Removed stale %v message for channel=%v "+
"with peer=%x", msg.MsgType(), shortChanID,
peerPubKey)
}
}
// If all of our messages were stale, then there's no need for this
// handler to continue running, so we can exit now.
pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey)
if err != nil {
log.Errorf("Unable to retrieve pending messages for peer %x: %v",
peerPubKey, err)
return
}
if len(pendingMsgs) == 0 {
log.Debugf("No pending messages left for peer=%x", peerPubKey)
s.activePeersMtx.Lock()
delete(s.activePeers, peerPubKey)
s.activePeersMtx.Unlock()
close(peerMgr.done)
return
}
// Once the pending messages are sent, we can continue to send any
// future messages while the peer remains connected.
for {
select {
case msg := <-peerMgr.msgs:
// Retrieve the short channel ID for which this message
// applies for logging purposes. The error can be
// ignored as the store can only contain messages which
// have a ShortChannelID field.
shortChanID, _ := msgShortChanID(msg)
if err := peer.SendMessage(false, msg); err != nil {
log.Errorf("Unable to send %v message for "+
"channel=%v to %x: %v", msg.MsgType(),
shortChanID, peerPubKey, err)
}
log.Debugf("Successfully sent %v message for "+
"channel=%v with peer=%x", msg.MsgType(),
shortChanID, peerPubKey)
case <-offlineChan:
goto waitUntilOnline
case <-s.quit:
return
}
}
}
// resendPendingMsgs retrieves and sends all of the messages within the message
// store that should be reliably sent to their respective peers.
func (s *reliableSender) resendPendingMsgs() error {
// Fetch all of the peers for which we have pending messages for and
// spawn a peerMsgHandler for each. Once the peer is seen as online, all
// of the pending messages will be sent.
peers, err := s.cfg.MessageStore.Peers()
if err != nil {
return err
}
for peer := range peers {
s.spawnPeerHandler(peer)
}
return nil
}

@ -0,0 +1,312 @@
package discovery
import (
"fmt"
"testing"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// newTestReliableSender creates a new reliable sender instance used for
// testing.
func newTestReliableSender(t *testing.T) *reliableSender {
t.Helper()
cfg := &reliableSenderCfg{
NotifyWhenOnline: func(pubKey *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
peerChan <- &mockPeer{pk: pubKey}
},
NotifyWhenOffline: func(_ [33]byte) <-chan struct{} {
c := make(chan struct{}, 1)
return c
},
MessageStore: newMockMessageStore(),
IsMsgStale: func(lnwire.Message) bool {
return false
},
}
return newReliableSender(cfg)
}
// assertMsgsSent ensures that the given messages can be read from a mock peer's
// msgChan.
func assertMsgsSent(t *testing.T, msgChan chan lnwire.Message,
msgs ...lnwire.Message) {
t.Helper()
m := make(map[lnwire.Message]struct{}, len(msgs))
for _, msg := range msgs {
m[msg] = struct{}{}
}
for i := 0; i < len(msgs); i++ {
select {
case msg := <-msgChan:
if _, ok := m[msg]; !ok {
t.Fatalf("found unexpected message sent: %v",
spew.Sdump(msg))
}
case <-time.After(time.Second):
t.Fatal("reliable sender did not send message to peer")
}
}
}
// waitPredicate is a helper test function that will wait for a timeout period
// of time until the passed predicate returns true.
func waitPredicate(t *testing.T, timeout time.Duration, pred func() bool) {
t.Helper()
const pollInterval = 20 * time.Millisecond
exitTimer := time.After(timeout)
for {
<-time.After(pollInterval)
select {
case <-exitTimer:
t.Fatalf("predicate not satisfied after timeout")
default:
}
if pred() {
return
}
}
}
// TestReliableSenderFlow ensures that the flow for sending messages reliably to
// a peer while taking into account its connection lifecycle works as expected.
func TestReliableSenderFlow(t *testing.T) {
t.Parallel()
reliableSender := newTestReliableSender(t)
// Create a mock peer to send the messages to.
pubKey := randPubKey(t)
msgsSent := make(chan lnwire.Message)
peer := &mockPeer{pubKey, msgsSent, reliableSender.quit}
// Override NotifyWhenOnline and NotifyWhenOffline to provide the
// notification channels so that we can control when notifications get
// dispatched.
notifyOnline := make(chan chan<- lnpeer.Peer, 2)
notifyOffline := make(chan chan struct{}, 1)
reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
notifyOnline <- peerChan
}
reliableSender.cfg.NotifyWhenOffline = func(_ [33]byte) <-chan struct{} {
c := make(chan struct{}, 1)
notifyOffline <- c
return c
}
// We'll start by creating our first message which we should reliably
// send to our peer.
msg1 := randChannelUpdate()
var peerPubKey [33]byte
copy(peerPubKey[:], pubKey.SerializeCompressed())
if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
t.Fatalf("unable to reliably send message: %v", err)
}
// Since there isn't a peerHandler for this peer currently active due to
// this being the first message being sent reliably, we should expect to
// see a notification request for when the peer is online.
var peerChan chan<- lnpeer.Peer
select {
case peerChan = <-notifyOnline:
case <-time.After(time.Second):
t.Fatal("reliable sender did not request online notification")
}
// We'll then attempt to send another additional message reliably.
msg2 := randAnnounceSignatures()
if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
t.Fatalf("unable to reliably send message: %v", err)
}
// This should not however request another peer online notification as
// the peerHandler has already been started and is waiting for the
// notification to be dispatched.
select {
case <-notifyOnline:
t.Fatal("reliable sender should not request online notification")
case <-time.After(time.Second):
}
// We'll go ahead and notify the peer.
peerChan <- peer
// By doing so, we should expect to see a notification request for when
// the peer is offline.
var offlineChan chan struct{}
select {
case offlineChan = <-notifyOffline:
case <-time.After(time.Second):
t.Fatal("reliable sender did not request offline notification")
}
// We should also see the messages arrive at the peer since they are now
// seen as online.
assertMsgsSent(t, peer.sentMsgs, msg1, msg2)
// Then, we'll send one more message reliably.
msg3 := randChannelUpdate()
if err := reliableSender.sendMessage(msg3, peerPubKey); err != nil {
t.Fatalf("unable to reliably send message: %v", err)
}
// Again, this should not request another peer online notification
// request since we are currently waiting for the peer to be offline.
select {
case <-notifyOnline:
t.Fatal("reliable sender should not request online notification")
case <-time.After(time.Second):
}
// The expected message should be sent to the peer.
assertMsgsSent(t, peer.sentMsgs, msg3)
// We'll then notify that the peer is offline.
close(offlineChan)
// This should cause an online notification to be requested.
select {
case peerChan = <-notifyOnline:
case <-time.After(time.Second):
t.Fatal("reliable sender did not request online notification")
}
// Once we dispatch it, we should expect to see the messages be resent
// to the peer as they are not stale.
peerChan <- peer
select {
case <-notifyOffline:
case <-time.After(5 * time.Second):
t.Fatal("reliable sender did not request offline notification")
}
assertMsgsSent(t, peer.sentMsgs, msg1, msg2, msg3)
}
// TestReliableSenderStaleMessages ensures that the reliable sender is no longer
// active for a peer which has successfully sent all of its messages and deemed
// them as stale.
func TestReliableSenderStaleMessages(t *testing.T) {
t.Parallel()
reliableSender := newTestReliableSender(t)
// Create a mock peer to send the messages to.
pubKey := randPubKey(t)
msgsSent := make(chan lnwire.Message)
peer := &mockPeer{pubKey, msgsSent, reliableSender.quit}
// Override NotifyWhenOnline to provide the notification channel so that
// we can control when notifications get dispatched.
notifyOnline := make(chan chan<- lnpeer.Peer, 1)
reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
notifyOnline <- peerChan
}
// We'll also override IsMsgStale to mark all messages as stale as we're
// interested in testing the stale message behavior.
reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
return true
}
// We'll start by creating our first message which we should reliably
// send to our peer, but will be seen as stale.
msg1 := randAnnounceSignatures()
var peerPubKey [33]byte
copy(peerPubKey[:], pubKey.SerializeCompressed())
if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
t.Fatalf("unable to reliably send message: %v", err)
}
// Since there isn't a peerHandler for this peer currently active due to
// this being the first message being sent reliably, we should expect to
// see a notification request for when the peer is online.
var peerChan chan<- lnpeer.Peer
select {
case peerChan = <-notifyOnline:
case <-time.After(time.Second):
t.Fatal("reliable sender did not request online notification")
}
// We'll go ahead and notify the peer.
peerChan <- peer
// This should cause the message to be sent to the peer since they are
// now seen as online. The message will be sent at least once to ensure
// they can propagate before deciding whether they are stale or not.
assertMsgsSent(t, peer.sentMsgs, msg1)
// We'll create another message which we'll send reliably. This one
// won't be seen as stale.
msg2 := randChannelUpdate()
// We'll then wait for the message to be removed from the backing
// message store since it is seen as stale and has been sent at least
// once. Once the message is removed, the peerHandler should be torn
// down as there are no longer any pending messages within the store.
var predErr error
waitPredicate(t, time.Second, func() bool {
msgs, err := reliableSender.cfg.MessageStore.MessagesForPeer(
peerPubKey,
)
if err != nil {
predErr = fmt.Errorf("unable to retrieve messages for "+
"peer: %v", err)
return false
}
if len(msgs) != 0 {
predErr = fmt.Errorf("expected to not find any "+
"messages for peer, found %d", len(msgs))
return false
}
predErr = nil
return true
})
if predErr != nil {
t.Fatal(predErr)
}
// Override IsMsgStale to no longer mark messages as stale.
reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
return false
}
// We'll request the message to be sent reliably.
if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
t.Fatalf("unable to reliably send message: %v", err)
}
// We should see an online notification request indicating that a new
// peerHandler has been spawned since it was previously torn down.
select {
case peerChan = <-notifyOnline:
case <-time.After(time.Second):
t.Fatal("reliable sender did not request online notification")
}
// Finally, notifying the peer is online should prompt the message to be
// sent. Only the ChannelUpdate will be sent in this case since the
// AnnounceSignatures message above was seen as stale.
peerChan <- peer
assertMsgsSent(t, peer.sentMsgs, msg2)
}