discovery,fundingmanager: avoid serialization in NotifyWhenOnline

This commit is contained in:
Federico Bond 2019-05-30 18:37:30 -03:00
parent f802ebddba
commit 9bd3055fb8
7 changed files with 64 additions and 47 deletions

@ -140,9 +140,7 @@ type Config struct {
// retry sending a peer message. // retry sending a peer message.
// //
// NOTE: The peerChan channel must be buffered. // NOTE: The peerChan channel must be buffered.
// NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
// TODO(wilmer): use [33]byte to avoid unnecessary serializations.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// NotifyWhenOffline is a function that allows the gossiper to be // NotifyWhenOffline is a function that allows the gossiper to be
// notified when a certain peer disconnects, allowing it to request a // notified when a certain peer disconnects, allowing it to request a

@ -734,9 +734,11 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
return nil return nil
}, },
NotifyWhenOnline: func(target *btcec.PublicKey, NotifyWhenOnline: func(target [33]byte,
peerChan chan<- lnpeer.Peer) { peerChan chan<- lnpeer.Peer) {
peerChan <- &mockPeer{target, nil, nil}
pk, _ := btcec.ParsePubKey(target[:], btcec.S256())
peerChan <- &mockPeer{pk, nil, nil}
}, },
NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { NotifyWhenOffline: func(_ [33]byte) <-chan struct{} {
c := make(chan struct{}) c := make(chan struct{})
@ -981,11 +983,13 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
// Set up a channel that we can use to inspect the messages sent // Set up a channel that we can use to inspect the messages sent
// directly from the gossiper. // directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10) sentMsgs := make(chan lnwire.Message, 10)
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target [33]byte,
peerChan chan<- lnpeer.Peer) { peerChan chan<- lnpeer.Peer) {
pk, _ := btcec.ParsePubKey(target[:], btcec.S256())
select { select {
case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}: case peerChan <- &mockPeer{pk, sentMsgs, ctx.gossiper.quit}:
case <-ctx.gossiper.quit: case <-ctx.gossiper.quit:
} }
} }
@ -1178,11 +1182,13 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
// Set up a channel that we can use to inspect the messages sent // Set up a channel that we can use to inspect the messages sent
// directly from the gossiper. // directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10) sentMsgs := make(chan lnwire.Message, 10)
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target [33]byte,
peerChan chan<- lnpeer.Peer) { peerChan chan<- lnpeer.Peer) {
pk, _ := btcec.ParsePubKey(target[:], btcec.S256())
select { select {
case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}: case peerChan <- &mockPeer{pk, sentMsgs, ctx.gossiper.quit}:
case <-ctx.gossiper.quit: case <-ctx.gossiper.quit:
} }
} }
@ -1403,7 +1409,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
// channel through which it gets sent to control exactly when to // channel through which it gets sent to control exactly when to
// dispatch it. // dispatch it.
notifyPeers := make(chan chan<- lnpeer.Peer, 1) notifyPeers := make(chan chan<- lnpeer.Peer, 1)
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
connectedChan chan<- lnpeer.Peer) { connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan notifyPeers <- connectedChan
} }
@ -1609,7 +1615,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
// Override NotifyWhenOnline to return the remote peer which we expect // Override NotifyWhenOnline to return the remote peer which we expect
// meesages to be sent to. // meesages to be sent to.
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
peerChan chan<- lnpeer.Peer) { peerChan chan<- lnpeer.Peer) {
peerChan <- remotePeer peerChan <- remotePeer
@ -2442,7 +2448,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
// Override NotifyWhenOnline to return the remote peer which we expect // Override NotifyWhenOnline to return the remote peer which we expect
// meesages to be sent to. // meesages to be sent to.
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
peerChan chan<- lnpeer.Peer) { peerChan chan<- lnpeer.Peer) {
peerChan <- remotePeer peerChan <- remotePeer
@ -2983,7 +2989,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
// NotifyWhenOffline to instead give us access to the channel that will // NotifyWhenOffline to instead give us access to the channel that will
// receive the notification. // receive the notification.
notifyOnline := make(chan chan<- lnpeer.Peer, 1) notifyOnline := make(chan chan<- lnpeer.Peer, 1)
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte,
peerChan chan<- lnpeer.Peer) { peerChan chan<- lnpeer.Peer) {
notifyOnline <- peerChan notifyOnline <- peerChan
@ -3360,13 +3366,13 @@ func TestPropagateChanPolicyUpdate(t *testing.T) {
// pubkey, and hand it our mock peer above. // pubkey, and hand it our mock peer above.
notifyErr := make(chan error, 1) notifyErr := make(chan error, 1)
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func( ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(
targetPub *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { targetPub [33]byte, peerChan chan<- lnpeer.Peer) {
if !targetPub.IsEqual(remoteKey) { if !bytes.Equal(targetPub[:], remoteKey.SerializeCompressed()) {
notifyErr <- fmt.Errorf("reliableSender attempted to send the "+ notifyErr <- fmt.Errorf("reliableSender attempted to send the "+
"message to the wrong peer: expected %x got %x", "message to the wrong peer: expected %x got %x",
remoteKey.SerializeCompressed(), remoteKey.SerializeCompressed(),
targetPub.SerializeCompressed()) targetPub)
} }
peerChan <- remotePeer peerChan <- remotePeer

@ -3,7 +3,6 @@ package discovery
import ( import (
"sync" "sync"
"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -16,9 +15,7 @@ type reliableSenderCfg struct {
// retry sending a peer message. // retry sending a peer message.
// //
// NOTE: The peerChan channel must be buffered. // NOTE: The peerChan channel must be buffered.
// NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
// TODO(wilmer): use [33]byte to avoid unnecessary serializations.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// NotifyWhenOffline is a function that allows the gossiper to be // NotifyWhenOffline is a function that allows the gossiper to be
// notified when a certain peer disconnects, allowing it to request a // notified when a certain peer disconnects, allowing it to request a
@ -164,13 +161,12 @@ func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) {
// We'll start by requesting a notification for when the peer // We'll start by requesting a notification for when the peer
// reconnects. // reconnects.
pubKey, _ := btcec.ParsePubKey(peerPubKey[:], btcec.S256())
peerChan := make(chan lnpeer.Peer, 1) peerChan := make(chan lnpeer.Peer, 1)
waitUntilOnline: waitUntilOnline:
log.Debugf("Requesting online notification for peer=%x", peerPubKey) log.Debugf("Requesting online notification for peer=%x", peerPubKey)
s.cfg.NotifyWhenOnline(pubKey, peerChan) s.cfg.NotifyWhenOnline(peerPubKey, peerChan)
var peer lnpeer.Peer var peer lnpeer.Peer
out: out:

@ -18,9 +18,13 @@ func newTestReliableSender(t *testing.T) *reliableSender {
t.Helper() t.Helper()
cfg := &reliableSenderCfg{ cfg := &reliableSenderCfg{
NotifyWhenOnline: func(pubKey *btcec.PublicKey, NotifyWhenOnline: func(pubKey [33]byte,
peerChan chan<- lnpeer.Peer) { peerChan chan<- lnpeer.Peer) {
peerChan <- &mockPeer{pk: pubKey} pk, err := btcec.ParsePubKey(pubKey[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
peerChan <- &mockPeer{pk: pk}
}, },
NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { NotifyWhenOffline: func(_ [33]byte) <-chan struct{} {
c := make(chan struct{}, 1) c := make(chan struct{}, 1)
@ -78,7 +82,7 @@ func TestReliableSenderFlow(t *testing.T) {
notifyOnline := make(chan chan<- lnpeer.Peer, 2) notifyOnline := make(chan chan<- lnpeer.Peer, 2)
notifyOffline := make(chan chan struct{}, 1) notifyOffline := make(chan chan struct{}, 1)
reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte,
peerChan chan<- lnpeer.Peer) { peerChan chan<- lnpeer.Peer) {
notifyOnline <- peerChan notifyOnline <- peerChan
} }
@ -194,7 +198,7 @@ func TestReliableSenderStaleMessages(t *testing.T) {
// Override NotifyWhenOnline to provide the notification channel so that // Override NotifyWhenOnline to provide the notification channel so that
// we can control when notifications get dispatched. // we can control when notifications get dispatched.
notifyOnline := make(chan chan<- lnpeer.Peer, 1) notifyOnline := make(chan chan<- lnpeer.Peer, 1)
reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte,
peerChan chan<- lnpeer.Peer) { peerChan chan<- lnpeer.Peer) {
notifyOnline <- peerChan notifyOnline <- peerChan
} }

@ -268,7 +268,7 @@ type fundingConfig struct {
// delivered after the funding transaction is confirmed. // delivered after the funding transaction is confirmed.
// //
// NOTE: The peerChan channel must be buffered. // NOTE: The peerChan channel must be buffered.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) NotifyWhenOnline func(peer [33]byte, peerChan chan<- lnpeer.Peer)
// FindChannel queries the database for the channel with the given // FindChannel queries the database for the channel with the given
// channel ID. // channel ID.
@ -563,7 +563,11 @@ func (f *fundingManager) Start() error {
// we'll attempt to retrieve the remote peer // we'll attempt to retrieve the remote peer
// to complete the rest of the funding flow. // to complete the rest of the funding flow.
peerChan := make(chan lnpeer.Peer, 1) peerChan := make(chan lnpeer.Peer, 1)
f.cfg.NotifyWhenOnline(ch.IdentityPub, peerChan)
var peerKey [33]byte
copy(peerKey[:], ch.IdentityPub.SerializeCompressed())
f.cfg.NotifyWhenOnline(peerKey, peerChan)
var peer lnpeer.Peer var peer lnpeer.Peer
select { select {
@ -635,7 +639,11 @@ func (f *fundingManager) Start() error {
defer f.wg.Done() defer f.wg.Done()
peerChan := make(chan lnpeer.Peer, 1) peerChan := make(chan lnpeer.Peer, 1)
f.cfg.NotifyWhenOnline(dbChan.IdentityPub, peerChan)
var peerKey [33]byte
copy(peerKey[:], dbChan.IdentityPub.SerializeCompressed())
f.cfg.NotifyWhenOnline(peerKey, peerChan)
var peer lnpeer.Peer var peer lnpeer.Peer
select { select {
@ -2040,7 +2048,9 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer,
shortChanID *lnwire.ShortChannelID) error { shortChanID *lnwire.ShortChannelID) error {
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
peerKey := completeChan.IdentityPub
var peerKey [33]byte
copy(peerKey[:], completeChan.IdentityPub.SerializeCompressed())
// Next, we'll send over the funding locked message which marks that we // Next, we'll send over the funding locked message which marks that we
// consider the channel open by presenting the remote party with our // consider the channel open by presenting the remote party with our
@ -2065,7 +2075,7 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer,
// down. // down.
for { for {
fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+ fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+
"peer %x", chanID, peerKey.SerializeCompressed()) "peer %x", chanID, peerKey)
if err := peer.SendMessage(false, fundingLockedMsg); err == nil { if err := peer.SendMessage(false, fundingLockedMsg); err == nil {
// Sending succeeded, we can break out and continue the // Sending succeeded, we can break out and continue the
@ -2074,17 +2084,16 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer,
} }
fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+ fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+
"Will retry when online", peerKey.SerializeCompressed(), "Will retry when online", peerKey, err)
err)
connected := make(chan lnpeer.Peer, 1) connected := make(chan lnpeer.Peer, 1)
f.cfg.NotifyWhenOnline(completeChan.IdentityPub, connected) f.cfg.NotifyWhenOnline(peerKey, connected)
select { select {
case <-connected: case <-connected:
fndgLog.Infof("Peer(%x) came back online, will retry "+ fndgLog.Infof("Peer(%x) came back online, will retry "+
"sending FundingLocked for ChannelID(%v)", "sending FundingLocked for ChannelID(%v)",
peerKey.SerializeCompressed(), chanID) peerKey, chanID)
// Retry sending. // Retry sending.
case <-f.quit: case <-f.quit:
@ -2213,7 +2222,11 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
shortChanID.ToUint64()) shortChanID.ToUint64())
peerChan := make(chan lnpeer.Peer, 1) peerChan := make(chan lnpeer.Peer, 1)
f.cfg.NotifyWhenOnline(completeChan.IdentityPub, peerChan)
var peerKey [33]byte
copy(peerKey[:], completeChan.IdentityPub.SerializeCompressed())
f.cfg.NotifyWhenOnline(peerKey, peerChan)
var peer lnpeer.Peer var peer lnpeer.Peer
select { select {

@ -3,6 +3,7 @@
package lnd package lnd
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -384,7 +385,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
addr: addr, addr: addr,
} }
f.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, f.cfg.NotifyWhenOnline = func(peer [33]byte,
connectedChan chan<- lnpeer.Peer) { connectedChan chan<- lnpeer.Peer) {
connectedChan <- testNode connectedChan <- testNode
@ -431,7 +432,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
return lnwire.NodeAnnouncement{}, nil return lnwire.NodeAnnouncement{}, nil
}, },
NotifyWhenOnline: func(peer *btcec.PublicKey, NotifyWhenOnline: func(peer [33]byte,
connectedChan chan<- lnpeer.Peer) { connectedChan chan<- lnpeer.Peer) {
connectedChan <- alice.remotePeer connectedChan <- alice.remotePeer
@ -1126,7 +1127,7 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
bob.sendMessage = func(msg lnwire.Message) error { bob.sendMessage = func(msg lnwire.Message) error {
return fmt.Errorf("intentional error in SendToPeer") return fmt.Errorf("intentional error in SendToPeer")
} }
alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, alice.fundingMgr.cfg.NotifyWhenOnline = func(peer [33]byte,
con chan<- lnpeer.Peer) { con chan<- lnpeer.Peer) {
// Intentionally empty. // Intentionally empty.
} }
@ -1261,9 +1262,9 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
bob.sendMessage = func(msg lnwire.Message) error { bob.sendMessage = func(msg lnwire.Message) error {
return fmt.Errorf("intentional error in SendToPeer") return fmt.Errorf("intentional error in SendToPeer")
} }
peerChan := make(chan *btcec.PublicKey, 1) peerChan := make(chan [33]byte, 1)
conChan := make(chan chan<- lnpeer.Peer, 1) conChan := make(chan chan<- lnpeer.Peer, 1)
alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, alice.fundingMgr.cfg.NotifyWhenOnline = func(peer [33]byte,
connected chan<- lnpeer.Peer) { connected chan<- lnpeer.Peer) {
peerChan <- peer peerChan <- peer
@ -1303,7 +1304,7 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
// Alice should be waiting for the server to notify when Bob comes back // Alice should be waiting for the server to notify when Bob comes back
// online. // online.
var peer *btcec.PublicKey var peer [33]byte
var con chan<- lnpeer.Peer var con chan<- lnpeer.Peer
select { select {
case peer = <-peerChan: case peer = <-peerChan:
@ -1319,7 +1320,7 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
t.Fatalf("alice did not register connectedChan with server") t.Fatalf("alice did not register connectedChan with server")
} }
if !peer.IsEqual(bobPubKey) { if !bytes.Equal(peer[:], bobPubKey.SerializeCompressed()) {
t.Fatalf("expected to receive Bob's pubkey (%v), instead got %v", t.Fatalf("expected to receive Bob's pubkey (%v), instead got %v",
bobPubKey, peer) bobPubKey, peer)
} }

@ -2120,21 +2120,20 @@ func (s *server) BroadcastMessage(skips map[route.Vertex]struct{},
// particular peer comes online. The peer itself is sent across the peerChan. // particular peer comes online. The peer itself is sent across the peerChan.
// //
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) NotifyWhenOnline(peerKey *btcec.PublicKey, func (s *server) NotifyWhenOnline(peerKey [33]byte,
peerChan chan<- lnpeer.Peer) { peerChan chan<- lnpeer.Peer) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
// Compute the target peer's identifier. // Compute the target peer's identifier.
pubStr := string(peerKey.SerializeCompressed()) pubStr := string(peerKey[:])
// Check if peer is connected. // Check if peer is connected.
peer, ok := s.peersByPub[pubStr] peer, ok := s.peersByPub[pubStr]
if ok { if ok {
// Connected, can return early. // Connected, can return early.
srvrLog.Debugf("Notifying that peer %x is online", srvrLog.Debugf("Notifying that peer %x is online", peerKey)
peerKey.SerializeCompressed())
select { select {
case peerChan <- peer: case peerChan <- peer: