From 9bd3055fb8538d7ba5921d95ccf25b549568e051 Mon Sep 17 00:00:00 2001 From: Federico Bond Date: Thu, 30 May 2019 18:37:30 -0300 Subject: [PATCH] discovery,fundingmanager: avoid serialization in NotifyWhenOnline --- discovery/gossiper.go | 4 +--- discovery/gossiper_test.go | 32 ++++++++++++++++++------------ discovery/reliable_sender.go | 8 ++------ discovery/reliable_sender_test.go | 12 +++++++---- fundingmanager.go | 33 +++++++++++++++++++++---------- fundingmanager_test.go | 15 +++++++------- server.go | 7 +++---- 7 files changed, 64 insertions(+), 47 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index cd2d5573..288cf1a3 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -140,9 +140,7 @@ type Config struct { // retry sending a peer message. // // NOTE: The peerChan channel must be buffered. - // - // TODO(wilmer): use [33]byte to avoid unnecessary serializations. - NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) + NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer) // NotifyWhenOffline is a function that allows the gossiper to be // notified when a certain peer disconnects, allowing it to request a diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index c8bf9b06..a727870e 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -734,9 +734,11 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil }, - NotifyWhenOnline: func(target *btcec.PublicKey, + NotifyWhenOnline: func(target [33]byte, peerChan chan<- lnpeer.Peer) { - peerChan <- &mockPeer{target, nil, nil} + + pk, _ := btcec.ParsePubKey(target[:], btcec.S256()) + peerChan <- &mockPeer{pk, nil, nil} }, NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { c := make(chan struct{}) @@ -981,11 +983,13 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Set up a channel that we can use to inspect the messages sent // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target [33]byte, peerChan chan<- lnpeer.Peer) { + pk, _ := btcec.ParsePubKey(target[:], btcec.S256()) + select { - case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}: + case peerChan <- &mockPeer{pk, sentMsgs, ctx.gossiper.quit}: case <-ctx.gossiper.quit: } } @@ -1178,11 +1182,13 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // Set up a channel that we can use to inspect the messages sent // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target [33]byte, peerChan chan<- lnpeer.Peer) { + pk, _ := btcec.ParsePubKey(target[:], btcec.S256()) + select { - case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}: + case peerChan <- &mockPeer{pk, sentMsgs, ctx.gossiper.quit}: case <-ctx.gossiper.quit: } } @@ -1403,7 +1409,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // channel through which it gets sent to control exactly when to // dispatch it. notifyPeers := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte, connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan } @@ -1609,7 +1615,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { // Override NotifyWhenOnline to return the remote peer which we expect // meesages to be sent to. - ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte, peerChan chan<- lnpeer.Peer) { peerChan <- remotePeer @@ -2442,7 +2448,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // Override NotifyWhenOnline to return the remote peer which we expect // meesages to be sent to. - ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte, peerChan chan<- lnpeer.Peer) { peerChan <- remotePeer @@ -2983,7 +2989,7 @@ func TestSendChannelUpdateReliably(t *testing.T) { // NotifyWhenOffline to instead give us access to the channel that will // receive the notification. notifyOnline := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte, peerChan chan<- lnpeer.Peer) { notifyOnline <- peerChan @@ -3360,13 +3366,13 @@ func TestPropagateChanPolicyUpdate(t *testing.T) { // pubkey, and hand it our mock peer above. notifyErr := make(chan error, 1) ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func( - targetPub *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { + targetPub [33]byte, peerChan chan<- lnpeer.Peer) { - if !targetPub.IsEqual(remoteKey) { + if !bytes.Equal(targetPub[:], remoteKey.SerializeCompressed()) { notifyErr <- fmt.Errorf("reliableSender attempted to send the "+ "message to the wrong peer: expected %x got %x", remoteKey.SerializeCompressed(), - targetPub.SerializeCompressed()) + targetPub) } peerChan <- remotePeer diff --git a/discovery/reliable_sender.go b/discovery/reliable_sender.go index 6b8dc6d0..882eeebe 100644 --- a/discovery/reliable_sender.go +++ b/discovery/reliable_sender.go @@ -3,7 +3,6 @@ package discovery import ( "sync" - "github.com/btcsuite/btcd/btcec" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwire" ) @@ -16,9 +15,7 @@ type reliableSenderCfg struct { // retry sending a peer message. // // NOTE: The peerChan channel must be buffered. - // - // TODO(wilmer): use [33]byte to avoid unnecessary serializations. - NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) + NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer) // NotifyWhenOffline is a function that allows the gossiper to be // notified when a certain peer disconnects, allowing it to request a @@ -164,13 +161,12 @@ func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) { // We'll start by requesting a notification for when the peer // reconnects. - pubKey, _ := btcec.ParsePubKey(peerPubKey[:], btcec.S256()) peerChan := make(chan lnpeer.Peer, 1) waitUntilOnline: log.Debugf("Requesting online notification for peer=%x", peerPubKey) - s.cfg.NotifyWhenOnline(pubKey, peerChan) + s.cfg.NotifyWhenOnline(peerPubKey, peerChan) var peer lnpeer.Peer out: diff --git a/discovery/reliable_sender_test.go b/discovery/reliable_sender_test.go index 5890dfa3..1e503a0b 100644 --- a/discovery/reliable_sender_test.go +++ b/discovery/reliable_sender_test.go @@ -18,9 +18,13 @@ func newTestReliableSender(t *testing.T) *reliableSender { t.Helper() cfg := &reliableSenderCfg{ - NotifyWhenOnline: func(pubKey *btcec.PublicKey, + NotifyWhenOnline: func(pubKey [33]byte, peerChan chan<- lnpeer.Peer) { - peerChan <- &mockPeer{pk: pubKey} + pk, err := btcec.ParsePubKey(pubKey[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + peerChan <- &mockPeer{pk: pk} }, NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { c := make(chan struct{}, 1) @@ -78,7 +82,7 @@ func TestReliableSenderFlow(t *testing.T) { notifyOnline := make(chan chan<- lnpeer.Peer, 2) notifyOffline := make(chan chan struct{}, 1) - reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte, peerChan chan<- lnpeer.Peer) { notifyOnline <- peerChan } @@ -194,7 +198,7 @@ func TestReliableSenderStaleMessages(t *testing.T) { // Override NotifyWhenOnline to provide the notification channel so that // we can control when notifications get dispatched. notifyOnline := make(chan chan<- lnpeer.Peer, 1) - reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte, peerChan chan<- lnpeer.Peer) { notifyOnline <- peerChan } diff --git a/fundingmanager.go b/fundingmanager.go index 2216e1ff..22a37a5a 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -268,7 +268,7 @@ type fundingConfig struct { // delivered after the funding transaction is confirmed. // // NOTE: The peerChan channel must be buffered. - NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) + NotifyWhenOnline func(peer [33]byte, peerChan chan<- lnpeer.Peer) // FindChannel queries the database for the channel with the given // channel ID. @@ -563,7 +563,11 @@ func (f *fundingManager) Start() error { // we'll attempt to retrieve the remote peer // to complete the rest of the funding flow. peerChan := make(chan lnpeer.Peer, 1) - f.cfg.NotifyWhenOnline(ch.IdentityPub, peerChan) + + var peerKey [33]byte + copy(peerKey[:], ch.IdentityPub.SerializeCompressed()) + + f.cfg.NotifyWhenOnline(peerKey, peerChan) var peer lnpeer.Peer select { @@ -635,7 +639,11 @@ func (f *fundingManager) Start() error { defer f.wg.Done() peerChan := make(chan lnpeer.Peer, 1) - f.cfg.NotifyWhenOnline(dbChan.IdentityPub, peerChan) + + var peerKey [33]byte + copy(peerKey[:], dbChan.IdentityPub.SerializeCompressed()) + + f.cfg.NotifyWhenOnline(peerKey, peerChan) var peer lnpeer.Peer select { @@ -2040,7 +2048,9 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, shortChanID *lnwire.ShortChannelID) error { chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) - peerKey := completeChan.IdentityPub + + var peerKey [33]byte + copy(peerKey[:], completeChan.IdentityPub.SerializeCompressed()) // Next, we'll send over the funding locked message which marks that we // consider the channel open by presenting the remote party with our @@ -2065,7 +2075,7 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, // down. for { fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+ - "peer %x", chanID, peerKey.SerializeCompressed()) + "peer %x", chanID, peerKey) if err := peer.SendMessage(false, fundingLockedMsg); err == nil { // Sending succeeded, we can break out and continue the @@ -2074,17 +2084,16 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, } fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+ - "Will retry when online", peerKey.SerializeCompressed(), - err) + "Will retry when online", peerKey, err) connected := make(chan lnpeer.Peer, 1) - f.cfg.NotifyWhenOnline(completeChan.IdentityPub, connected) + f.cfg.NotifyWhenOnline(peerKey, connected) select { case <-connected: fndgLog.Infof("Peer(%x) came back online, will retry "+ "sending FundingLocked for ChannelID(%v)", - peerKey.SerializeCompressed(), chanID) + peerKey, chanID) // Retry sending. case <-f.quit: @@ -2213,7 +2222,11 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, shortChanID.ToUint64()) peerChan := make(chan lnpeer.Peer, 1) - f.cfg.NotifyWhenOnline(completeChan.IdentityPub, peerChan) + + var peerKey [33]byte + copy(peerKey[:], completeChan.IdentityPub.SerializeCompressed()) + + f.cfg.NotifyWhenOnline(peerKey, peerChan) var peer lnpeer.Peer select { diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 30f4f96d..891f5eab 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -3,6 +3,7 @@ package lnd import ( + "bytes" "errors" "fmt" "io/ioutil" @@ -384,7 +385,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, addr: addr, } - f.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + f.cfg.NotifyWhenOnline = func(peer [33]byte, connectedChan chan<- lnpeer.Peer) { connectedChan <- testNode @@ -431,7 +432,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { return lnwire.NodeAnnouncement{}, nil }, - NotifyWhenOnline: func(peer *btcec.PublicKey, + NotifyWhenOnline: func(peer [33]byte, connectedChan chan<- lnpeer.Peer) { connectedChan <- alice.remotePeer @@ -1126,7 +1127,7 @@ func TestFundingManagerRestartBehavior(t *testing.T) { bob.sendMessage = func(msg lnwire.Message) error { return fmt.Errorf("intentional error in SendToPeer") } - alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + alice.fundingMgr.cfg.NotifyWhenOnline = func(peer [33]byte, con chan<- lnpeer.Peer) { // Intentionally empty. } @@ -1261,9 +1262,9 @@ func TestFundingManagerOfflinePeer(t *testing.T) { bob.sendMessage = func(msg lnwire.Message) error { return fmt.Errorf("intentional error in SendToPeer") } - peerChan := make(chan *btcec.PublicKey, 1) + peerChan := make(chan [33]byte, 1) conChan := make(chan chan<- lnpeer.Peer, 1) - alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + alice.fundingMgr.cfg.NotifyWhenOnline = func(peer [33]byte, connected chan<- lnpeer.Peer) { peerChan <- peer @@ -1303,7 +1304,7 @@ func TestFundingManagerOfflinePeer(t *testing.T) { // Alice should be waiting for the server to notify when Bob comes back // online. - var peer *btcec.PublicKey + var peer [33]byte var con chan<- lnpeer.Peer select { case peer = <-peerChan: @@ -1319,7 +1320,7 @@ func TestFundingManagerOfflinePeer(t *testing.T) { t.Fatalf("alice did not register connectedChan with server") } - if !peer.IsEqual(bobPubKey) { + if !bytes.Equal(peer[:], bobPubKey.SerializeCompressed()) { t.Fatalf("expected to receive Bob's pubkey (%v), instead got %v", bobPubKey, peer) } diff --git a/server.go b/server.go index 8c5fafe0..bab0acc8 100644 --- a/server.go +++ b/server.go @@ -2120,21 +2120,20 @@ func (s *server) BroadcastMessage(skips map[route.Vertex]struct{}, // particular peer comes online. The peer itself is sent across the peerChan. // // NOTE: This function is safe for concurrent access. -func (s *server) NotifyWhenOnline(peerKey *btcec.PublicKey, +func (s *server) NotifyWhenOnline(peerKey [33]byte, peerChan chan<- lnpeer.Peer) { s.mu.Lock() defer s.mu.Unlock() // Compute the target peer's identifier. - pubStr := string(peerKey.SerializeCompressed()) + pubStr := string(peerKey[:]) // Check if peer is connected. peer, ok := s.peersByPub[pubStr] if ok { // Connected, can return early. - srvrLog.Debugf("Notifying that peer %x is online", - peerKey.SerializeCompressed()) + srvrLog.Debugf("Notifying that peer %x is online", peerKey) select { case peerChan <- peer: