2f679f6015
In this commit, we implement a new subsystem for the gossiper that reuses some of the existing logic for resending channel announcement signatures, reworked to be message-agnostic, meaning that any type of message can be resent. Along the way we also modify the way this works to avoid spawning a goroutine per peer _and_ message. A peerHandler will be spawned for each peer to which we attempt to send a message reliably. This handler is responsible for managing requests to reliably send messages to a peer while also taking the peer's connection lifecycle into account by requesting notifications for when the peer connects/disconnects. A peer connection notification is first requested to determine when we should attempt to send any pending messages. After the messages are sent, a peer disconnection notification is requested to ensure we don't continue to request connection notifications while the peer remains connected. Once there are no more pending messages left to be sent for a given peer, the peerHandler can be torn down.
313 lines
9.3 KiB
Go
313 lines
9.3 KiB
Go
package discovery
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/btcsuite/btcd/btcec"
|
|
"github.com/davecgh/go-spew/spew"
|
|
"github.com/lightningnetwork/lnd/lnpeer"
|
|
"github.com/lightningnetwork/lnd/lnwire"
|
|
)
|
|
|
|
// newTestReliableSender creates a new reliable sender instance used for
|
|
// testing.
|
|
func newTestReliableSender(t *testing.T) *reliableSender {
|
|
t.Helper()
|
|
|
|
cfg := &reliableSenderCfg{
|
|
NotifyWhenOnline: func(pubKey *btcec.PublicKey,
|
|
peerChan chan<- lnpeer.Peer) {
|
|
peerChan <- &mockPeer{pk: pubKey}
|
|
},
|
|
NotifyWhenOffline: func(_ [33]byte) <-chan struct{} {
|
|
c := make(chan struct{}, 1)
|
|
return c
|
|
},
|
|
MessageStore: newMockMessageStore(),
|
|
IsMsgStale: func(lnwire.Message) bool {
|
|
return false
|
|
},
|
|
}
|
|
|
|
return newReliableSender(cfg)
|
|
}
|
|
|
|
// assertMsgsSent ensures that the given messages can be read from a mock peer's
|
|
// msgChan.
|
|
func assertMsgsSent(t *testing.T, msgChan chan lnwire.Message,
|
|
msgs ...lnwire.Message) {
|
|
|
|
t.Helper()
|
|
|
|
m := make(map[lnwire.Message]struct{}, len(msgs))
|
|
for _, msg := range msgs {
|
|
m[msg] = struct{}{}
|
|
}
|
|
|
|
for i := 0; i < len(msgs); i++ {
|
|
select {
|
|
case msg := <-msgChan:
|
|
if _, ok := m[msg]; !ok {
|
|
t.Fatalf("found unexpected message sent: %v",
|
|
spew.Sdump(msg))
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("reliable sender did not send message to peer")
|
|
}
|
|
}
|
|
}
|
|
|
|
// waitPredicate is a test helper that polls pred every 20ms and returns as
// soon as it reports true. Each round first sleeps for one poll interval, then
// fails the test if the timeout has already elapsed, and only then evaluates
// the predicate.
func waitPredicate(t *testing.T, timeout time.Duration, pred func() bool) {
	t.Helper()

	const pollInterval = 20 * time.Millisecond
	deadline := time.After(timeout)

	// The predicate runs as the loop's post statement, so it is evaluated
	// once per round after the sleep and the deadline check.
	for satisfied := false; !satisfied; satisfied = pred() {
		time.Sleep(pollInterval)

		// Non-blocking check of the overall deadline.
		select {
		case <-deadline:
			t.Fatalf("predicate not satisfied after timeout")
		default:
		}
	}
}
|
|
|
|
// TestReliableSenderFlow ensures that the flow for sending messages reliably to
|
|
// a peer while taking into account its connection lifecycle works as expected.
|
|
func TestReliableSenderFlow(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
reliableSender := newTestReliableSender(t)
|
|
|
|
// Create a mock peer to send the messages to.
|
|
pubKey := randPubKey(t)
|
|
msgsSent := make(chan lnwire.Message)
|
|
peer := &mockPeer{pubKey, msgsSent, reliableSender.quit}
|
|
|
|
// Override NotifyWhenOnline and NotifyWhenOffline to provide the
|
|
// notification channels so that we can control when notifications get
|
|
// dispatched.
|
|
notifyOnline := make(chan chan<- lnpeer.Peer, 2)
|
|
notifyOffline := make(chan chan struct{}, 1)
|
|
|
|
reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey,
|
|
peerChan chan<- lnpeer.Peer) {
|
|
notifyOnline <- peerChan
|
|
}
|
|
reliableSender.cfg.NotifyWhenOffline = func(_ [33]byte) <-chan struct{} {
|
|
c := make(chan struct{}, 1)
|
|
notifyOffline <- c
|
|
return c
|
|
}
|
|
|
|
// We'll start by creating our first message which we should reliably
|
|
// send to our peer.
|
|
msg1 := randChannelUpdate()
|
|
var peerPubKey [33]byte
|
|
copy(peerPubKey[:], pubKey.SerializeCompressed())
|
|
if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
|
|
t.Fatalf("unable to reliably send message: %v", err)
|
|
}
|
|
|
|
// Since there isn't a peerHandler for this peer currently active due to
|
|
// this being the first message being sent reliably, we should expect to
|
|
// see a notification request for when the peer is online.
|
|
var peerChan chan<- lnpeer.Peer
|
|
select {
|
|
case peerChan = <-notifyOnline:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("reliable sender did not request online notification")
|
|
}
|
|
|
|
// We'll then attempt to send another additional message reliably.
|
|
msg2 := randAnnounceSignatures()
|
|
if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
|
|
t.Fatalf("unable to reliably send message: %v", err)
|
|
}
|
|
|
|
// This should not however request another peer online notification as
|
|
// the peerHandler has already been started and is waiting for the
|
|
// notification to be dispatched.
|
|
select {
|
|
case <-notifyOnline:
|
|
t.Fatal("reliable sender should not request online notification")
|
|
case <-time.After(time.Second):
|
|
}
|
|
|
|
// We'll go ahead and notify the peer.
|
|
peerChan <- peer
|
|
|
|
// By doing so, we should expect to see a notification request for when
|
|
// the peer is offline.
|
|
var offlineChan chan struct{}
|
|
select {
|
|
case offlineChan = <-notifyOffline:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("reliable sender did not request offline notification")
|
|
}
|
|
|
|
// We should also see the messages arrive at the peer since they are now
|
|
// seen as online.
|
|
assertMsgsSent(t, peer.sentMsgs, msg1, msg2)
|
|
|
|
// Then, we'll send one more message reliably.
|
|
msg3 := randChannelUpdate()
|
|
if err := reliableSender.sendMessage(msg3, peerPubKey); err != nil {
|
|
t.Fatalf("unable to reliably send message: %v", err)
|
|
}
|
|
|
|
// Again, this should not request another peer online notification
|
|
// request since we are currently waiting for the peer to be offline.
|
|
select {
|
|
case <-notifyOnline:
|
|
t.Fatal("reliable sender should not request online notification")
|
|
case <-time.After(time.Second):
|
|
}
|
|
|
|
// The expected message should be sent to the peer.
|
|
assertMsgsSent(t, peer.sentMsgs, msg3)
|
|
|
|
// We'll then notify that the peer is offline.
|
|
close(offlineChan)
|
|
|
|
// This should cause an online notification to be requested.
|
|
select {
|
|
case peerChan = <-notifyOnline:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("reliable sender did not request online notification")
|
|
}
|
|
|
|
// Once we dispatch it, we should expect to see the messages be resent
|
|
// to the peer as they are not stale.
|
|
peerChan <- peer
|
|
|
|
select {
|
|
case <-notifyOffline:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("reliable sender did not request offline notification")
|
|
}
|
|
|
|
assertMsgsSent(t, peer.sentMsgs, msg1, msg2, msg3)
|
|
}
|
|
|
|
// TestReliableSenderStaleMessages ensures that the reliable sender is no longer
|
|
// active for a peer which has successfully sent all of its messages and deemed
|
|
// them as stale.
|
|
func TestReliableSenderStaleMessages(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
reliableSender := newTestReliableSender(t)
|
|
|
|
// Create a mock peer to send the messages to.
|
|
pubKey := randPubKey(t)
|
|
msgsSent := make(chan lnwire.Message)
|
|
peer := &mockPeer{pubKey, msgsSent, reliableSender.quit}
|
|
|
|
// Override NotifyWhenOnline to provide the notification channel so that
|
|
// we can control when notifications get dispatched.
|
|
notifyOnline := make(chan chan<- lnpeer.Peer, 1)
|
|
reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey,
|
|
peerChan chan<- lnpeer.Peer) {
|
|
notifyOnline <- peerChan
|
|
}
|
|
|
|
// We'll also override IsMsgStale to mark all messages as stale as we're
|
|
// interested in testing the stale message behavior.
|
|
reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
|
|
return true
|
|
}
|
|
|
|
// We'll start by creating our first message which we should reliably
|
|
// send to our peer, but will be seen as stale.
|
|
msg1 := randAnnounceSignatures()
|
|
var peerPubKey [33]byte
|
|
copy(peerPubKey[:], pubKey.SerializeCompressed())
|
|
if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
|
|
t.Fatalf("unable to reliably send message: %v", err)
|
|
}
|
|
|
|
// Since there isn't a peerHandler for this peer currently active due to
|
|
// this being the first message being sent reliably, we should expect to
|
|
// see a notification request for when the peer is online.
|
|
var peerChan chan<- lnpeer.Peer
|
|
select {
|
|
case peerChan = <-notifyOnline:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("reliable sender did not request online notification")
|
|
}
|
|
|
|
// We'll go ahead and notify the peer.
|
|
peerChan <- peer
|
|
|
|
// This should cause the message to be sent to the peer since they are
|
|
// now seen as online. The message will be sent at least once to ensure
|
|
// they can propagate before deciding whether they are stale or not.
|
|
assertMsgsSent(t, peer.sentMsgs, msg1)
|
|
|
|
// We'll create another message which we'll send reliably. This one
|
|
// won't be seen as stale.
|
|
msg2 := randChannelUpdate()
|
|
|
|
// We'll then wait for the message to be removed from the backing
|
|
// message store since it is seen as stale and has been sent at least
|
|
// once. Once the message is removed, the peerHandler should be torn
|
|
// down as there are no longer any pending messages within the store.
|
|
var predErr error
|
|
waitPredicate(t, time.Second, func() bool {
|
|
msgs, err := reliableSender.cfg.MessageStore.MessagesForPeer(
|
|
peerPubKey,
|
|
)
|
|
if err != nil {
|
|
predErr = fmt.Errorf("unable to retrieve messages for "+
|
|
"peer: %v", err)
|
|
return false
|
|
}
|
|
if len(msgs) != 0 {
|
|
predErr = fmt.Errorf("expected to not find any "+
|
|
"messages for peer, found %d", len(msgs))
|
|
return false
|
|
}
|
|
|
|
predErr = nil
|
|
return true
|
|
})
|
|
if predErr != nil {
|
|
t.Fatal(predErr)
|
|
}
|
|
|
|
// Override IsMsgStale to no longer mark messages as stale.
|
|
reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
|
|
return false
|
|
}
|
|
|
|
// We'll request the message to be sent reliably.
|
|
if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
|
|
t.Fatalf("unable to reliably send message: %v", err)
|
|
}
|
|
|
|
// We should see an online notification request indicating that a new
|
|
// peerHandler has been spawned since it was previously torn down.
|
|
select {
|
|
case peerChan = <-notifyOnline:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("reliable sender did not request online notification")
|
|
}
|
|
|
|
// Finally, notifying the peer is online should prompt the message to be
|
|
// sent. Only the ChannelUpdate will be sent in this case since the
|
|
// AnnounceSignatures message above was seen as stale.
|
|
peerChan <- peer
|
|
|
|
assertMsgsSent(t, peer.sentMsgs, msg2)
|
|
}
|