You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
290 lines
8.9 KiB
290 lines
8.9 KiB
package discovery |
|
|
|
import ( |
|
"fmt" |
|
"testing" |
|
"time" |
|
|
|
"github.com/btcsuite/btcd/btcec" |
|
"github.com/davecgh/go-spew/spew" |
|
"github.com/lightningnetwork/lnd/lnpeer" |
|
"github.com/lightningnetwork/lnd/lntest/wait" |
|
"github.com/lightningnetwork/lnd/lnwire" |
|
) |
|
|
|
// newTestReliableSender creates a new reliable sender instance used for |
|
// testing. |
|
func newTestReliableSender(t *testing.T) *reliableSender { |
|
t.Helper() |
|
|
|
cfg := &reliableSenderCfg{ |
|
NotifyWhenOnline: func(pubKey [33]byte, |
|
peerChan chan<- lnpeer.Peer) { |
|
pk, err := btcec.ParsePubKey(pubKey[:], btcec.S256()) |
|
if err != nil { |
|
t.Fatalf("unable to parse pubkey: %v", err) |
|
} |
|
peerChan <- &mockPeer{pk: pk} |
|
}, |
|
NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { |
|
c := make(chan struct{}, 1) |
|
return c |
|
}, |
|
MessageStore: newMockMessageStore(), |
|
IsMsgStale: func(lnwire.Message) bool { |
|
return false |
|
}, |
|
} |
|
|
|
return newReliableSender(cfg) |
|
} |
|
|
|
// assertMsgsSent ensures that the given messages can be read from a mock peer's |
|
// msgChan. |
|
func assertMsgsSent(t *testing.T, msgChan chan lnwire.Message, |
|
msgs ...lnwire.Message) { |
|
|
|
t.Helper() |
|
|
|
m := make(map[lnwire.Message]struct{}, len(msgs)) |
|
for _, msg := range msgs { |
|
m[msg] = struct{}{} |
|
} |
|
|
|
for i := 0; i < len(msgs); i++ { |
|
select { |
|
case msg := <-msgChan: |
|
if _, ok := m[msg]; !ok { |
|
t.Fatalf("found unexpected message sent: %v", |
|
spew.Sdump(msg)) |
|
} |
|
case <-time.After(time.Second): |
|
t.Fatal("reliable sender did not send message to peer") |
|
} |
|
} |
|
} |
|
|
|
// TestReliableSenderFlow ensures that the flow for sending messages reliably to |
|
// a peer while taking into account its connection lifecycle works as expected. |
|
func TestReliableSenderFlow(t *testing.T) { |
|
t.Parallel() |
|
|
|
reliableSender := newTestReliableSender(t) |
|
|
|
// Create a mock peer to send the messages to. |
|
pubKey := randPubKey(t) |
|
msgsSent := make(chan lnwire.Message) |
|
peer := &mockPeer{pubKey, msgsSent, reliableSender.quit} |
|
|
|
// Override NotifyWhenOnline and NotifyWhenOffline to provide the |
|
// notification channels so that we can control when notifications get |
|
// dispatched. |
|
notifyOnline := make(chan chan<- lnpeer.Peer, 2) |
|
notifyOffline := make(chan chan struct{}, 1) |
|
|
|
reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte, |
|
peerChan chan<- lnpeer.Peer) { |
|
notifyOnline <- peerChan |
|
} |
|
reliableSender.cfg.NotifyWhenOffline = func(_ [33]byte) <-chan struct{} { |
|
c := make(chan struct{}, 1) |
|
notifyOffline <- c |
|
return c |
|
} |
|
|
|
// We'll start by creating our first message which we should reliably |
|
// send to our peer. |
|
msg1 := randChannelUpdate() |
|
var peerPubKey [33]byte |
|
copy(peerPubKey[:], pubKey.SerializeCompressed()) |
|
if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil { |
|
t.Fatalf("unable to reliably send message: %v", err) |
|
} |
|
|
|
// Since there isn't a peerHandler for this peer currently active due to |
|
// this being the first message being sent reliably, we should expect to |
|
// see a notification request for when the peer is online. |
|
var peerChan chan<- lnpeer.Peer |
|
select { |
|
case peerChan = <-notifyOnline: |
|
case <-time.After(time.Second): |
|
t.Fatal("reliable sender did not request online notification") |
|
} |
|
|
|
// We'll then attempt to send another additional message reliably. |
|
msg2 := randAnnounceSignatures() |
|
if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil { |
|
t.Fatalf("unable to reliably send message: %v", err) |
|
} |
|
|
|
// This should not however request another peer online notification as |
|
// the peerHandler has already been started and is waiting for the |
|
// notification to be dispatched. |
|
select { |
|
case <-notifyOnline: |
|
t.Fatal("reliable sender should not request online notification") |
|
case <-time.After(time.Second): |
|
} |
|
|
|
// We'll go ahead and notify the peer. |
|
peerChan <- peer |
|
|
|
// By doing so, we should expect to see a notification request for when |
|
// the peer is offline. |
|
var offlineChan chan struct{} |
|
select { |
|
case offlineChan = <-notifyOffline: |
|
case <-time.After(time.Second): |
|
t.Fatal("reliable sender did not request offline notification") |
|
} |
|
|
|
// We should also see the messages arrive at the peer since they are now |
|
// seen as online. |
|
assertMsgsSent(t, peer.sentMsgs, msg1, msg2) |
|
|
|
// Then, we'll send one more message reliably. |
|
msg3 := randChannelUpdate() |
|
if err := reliableSender.sendMessage(msg3, peerPubKey); err != nil { |
|
t.Fatalf("unable to reliably send message: %v", err) |
|
} |
|
|
|
// Again, this should not request another peer online notification |
|
// request since we are currently waiting for the peer to be offline. |
|
select { |
|
case <-notifyOnline: |
|
t.Fatal("reliable sender should not request online notification") |
|
case <-time.After(time.Second): |
|
} |
|
|
|
// The expected message should be sent to the peer. |
|
assertMsgsSent(t, peer.sentMsgs, msg3) |
|
|
|
// We'll then notify that the peer is offline. |
|
close(offlineChan) |
|
|
|
// This should cause an online notification to be requested. |
|
select { |
|
case peerChan = <-notifyOnline: |
|
case <-time.After(time.Second): |
|
t.Fatal("reliable sender did not request online notification") |
|
} |
|
|
|
// Once we dispatch it, we should expect to see the messages be resent |
|
// to the peer as they are not stale. |
|
peerChan <- peer |
|
|
|
select { |
|
case <-notifyOffline: |
|
case <-time.After(5 * time.Second): |
|
t.Fatal("reliable sender did not request offline notification") |
|
} |
|
|
|
assertMsgsSent(t, peer.sentMsgs, msg1, msg2, msg3) |
|
} |
|
|
|
// TestReliableSenderStaleMessages ensures that the reliable sender is no longer |
|
// active for a peer which has successfully sent all of its messages and deemed |
|
// them as stale. |
|
func TestReliableSenderStaleMessages(t *testing.T) { |
|
t.Parallel() |
|
|
|
reliableSender := newTestReliableSender(t) |
|
|
|
// Create a mock peer to send the messages to. |
|
pubKey := randPubKey(t) |
|
msgsSent := make(chan lnwire.Message) |
|
peer := &mockPeer{pubKey, msgsSent, reliableSender.quit} |
|
|
|
// Override NotifyWhenOnline to provide the notification channel so that |
|
// we can control when notifications get dispatched. |
|
notifyOnline := make(chan chan<- lnpeer.Peer, 1) |
|
reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte, |
|
peerChan chan<- lnpeer.Peer) { |
|
notifyOnline <- peerChan |
|
} |
|
|
|
// We'll also override IsMsgStale to mark all messages as stale as we're |
|
// interested in testing the stale message behavior. |
|
reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool { |
|
return true |
|
} |
|
|
|
// We'll start by creating our first message which we should reliably |
|
// send to our peer, but will be seen as stale. |
|
msg1 := randAnnounceSignatures() |
|
var peerPubKey [33]byte |
|
copy(peerPubKey[:], pubKey.SerializeCompressed()) |
|
if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil { |
|
t.Fatalf("unable to reliably send message: %v", err) |
|
} |
|
|
|
// Since there isn't a peerHandler for this peer currently active due to |
|
// this being the first message being sent reliably, we should expect to |
|
// see a notification request for when the peer is online. |
|
var peerChan chan<- lnpeer.Peer |
|
select { |
|
case peerChan = <-notifyOnline: |
|
case <-time.After(time.Second): |
|
t.Fatal("reliable sender did not request online notification") |
|
} |
|
|
|
// We'll go ahead and notify the peer. |
|
peerChan <- peer |
|
|
|
// This should cause the message to be sent to the peer since they are |
|
// now seen as online. The message will be sent at least once to ensure |
|
// they can propagate before deciding whether they are stale or not. |
|
assertMsgsSent(t, peer.sentMsgs, msg1) |
|
|
|
// We'll create another message which we'll send reliably. This one |
|
// won't be seen as stale. |
|
msg2 := randChannelUpdate() |
|
|
|
// We'll then wait for the message to be removed from the backing |
|
// message store since it is seen as stale and has been sent at least |
|
// once. Once the message is removed, the peerHandler should be torn |
|
// down as there are no longer any pending messages within the store. |
|
err := wait.NoError(func() error { |
|
msgs, err := reliableSender.cfg.MessageStore.MessagesForPeer( |
|
peerPubKey, |
|
) |
|
if err != nil { |
|
return fmt.Errorf("unable to retrieve messages for "+ |
|
"peer: %v", err) |
|
} |
|
if len(msgs) != 0 { |
|
return fmt.Errorf("expected to not find any "+ |
|
"messages for peer, found %d", len(msgs)) |
|
} |
|
|
|
return nil |
|
}, time.Second) |
|
if err != nil { |
|
t.Fatal(err) |
|
} |
|
|
|
// Override IsMsgStale to no longer mark messages as stale. |
|
reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool { |
|
return false |
|
} |
|
|
|
// We'll request the message to be sent reliably. |
|
if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil { |
|
t.Fatalf("unable to reliably send message: %v", err) |
|
} |
|
|
|
// We should see an online notification request indicating that a new |
|
// peerHandler has been spawned since it was previously torn down. |
|
select { |
|
case peerChan = <-notifyOnline: |
|
case <-time.After(time.Second): |
|
t.Fatal("reliable sender did not request online notification") |
|
} |
|
|
|
// Finally, notifying the peer is online should prompt the message to be |
|
// sent. Only the ChannelUpdate will be sent in this case since the |
|
// AnnounceSignatures message above was seen as stale. |
|
peerChan <- peer |
|
|
|
assertMsgsSent(t, peer.sentMsgs, msg2) |
|
}
|
|
|