From 8c92df0f477fb0582c4e9eeec47e570d3b771d32 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 5 Jul 2018 18:33:28 -0700 Subject: [PATCH 01/11] fundingmanager: return ErrFundingManagerShuttingDown on shutdown --- fundingmanager.go | 47 +++++++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 636ace73..2d16ee64 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -12,6 +12,10 @@ import ( "golang.org/x/crypto/salsa20" + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" @@ -23,10 +27,6 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" - "github.com/btcsuite/btcd/btcec" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" ) const ( @@ -85,6 +85,12 @@ var ( // // TODO(roasbeef): add command line param to modify maxFundingAmount = maxBtcFundingAmount + + // ErrFundingManagerShuttingDown is an error returned when attempting to + // process a funding request/message but the funding manager has already + // been signaled to shut down. + ErrFundingManagerShuttingDown = errors.New("funding manager shutting " + + "down") ) // reservationWithCtx encapsulates a pending channel reservation. This wrapper @@ -740,7 +746,7 @@ func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) { select { case f.queries <- req: case <-f.quit: - return nil, fmt.Errorf("fundingmanager shutting down") + return nil, ErrFundingManagerShuttingDown } select { @@ -749,7 +755,7 @@ func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) { case err := <-errChan: return nil, err case <-f.quit: - return nil, fmt.Errorf("fundingmanager shutting down") + return nil, ErrFundingManagerShuttingDown } } @@ -1959,7 +1965,7 @@ func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel, // Retry sending. case <-f.quit: - return fmt.Errorf("shutting down unable to send") + return ErrFundingManagerShuttingDown } } @@ -2069,24 +2075,28 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, confNtfn, err := f.cfg.Notifier.RegisterConfirmationsNtfn(&txid, numConfs, completeChan.FundingBroadcastHeight) if err != nil { - return fmt.Errorf("Unable to register for confirmation of "+ - "ChannelPoint(%v): %v", completeChan.FundingOutpoint, err) + return fmt.Errorf("Unable to register for "+ + "confirmation of ChannelPoint(%v): %v", + completeChan.FundingOutpoint, err) } - // Wait until 6 confirmations has been reached or the wallet signals - // a shutdown. + // Wait until 6 confirmations has been reached or the wallet + // signals a shutdown. select { case _, ok := <-confNtfn.Confirmed: if !ok { - return fmt.Errorf("ChainNotifier shutting down, cannot "+ - "complete funding flow for ChannelPoint(%v)", + return fmt.Errorf("ChainNotifier shutting "+ + "down, cannot complete funding flow "+ + "for ChannelPoint(%v)", completeChan.FundingOutpoint) } // Fallthrough. case <-f.quit: - return fmt.Errorf("fundingManager shutting down, stopping funding "+ - "flow for ChannelPoint(%v)", completeChan.FundingOutpoint) + return fmt.Errorf("%v, stopping funding flow for "+ + "ChannelPoint(%v)", + ErrFundingManagerShuttingDown, + completeChan.FundingOutpoint) } fundingPoint := completeChan.FundingOutpoint @@ -2095,9 +2105,10 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v", &fundingPoint, spew.Sdump(shortChanID)) - // We'll obtain their min HTLC as we'll use this value within our - // ChannelUpdate. We use this value isn't of ours, as the remote party - // will be the one that's carrying the HTLC towards us. + // We'll obtain their min HTLC as we'll use this value within + // our ChannelUpdate. We use this value isn't of ours, as the + // remote party will be the one that's carrying the HTLC towards + // us. remoteMinHTLC := completeChan.RemoteChanCfg.MinHTLC // Create and broadcast the proofs required to make this channel From 77d2853d76018de76eb7c008c9bdf8e029e580f5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 5 Jul 2018 13:26:55 -0700 Subject: [PATCH 02/11] lnpeer: extend interface Address and AddNewChannel methods --- lnpeer/peer.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lnpeer/peer.go b/lnpeer/peer.go index 5664bab9..99efe845 100644 --- a/lnpeer/peer.go +++ b/lnpeer/peer.go @@ -1,8 +1,11 @@ package lnpeer import ( + "net" + "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" ) @@ -14,6 +17,10 @@ type Peer interface { // has been sent to the remote peer. SendMessage(sync bool, msg ...lnwire.Message) error + // AddNewChannel adds a new channel to the peer. The channel should fail + // to be added if the cancel channel is closed. + AddNewChannel(channel *lnwallet.LightningChannel, cancel <-chan struct{}) error + // WipeChannel removes the channel uniquely identified by its channel // point from all indexes associated with the peer. WipeChannel(*wire.OutPoint) error @@ -23,4 +30,7 @@ type Peer interface { // IdentityKey returns the public key of the remote peer. IdentityKey() *btcec.PublicKey + + // Address returns the network address of the remote peer. + Address() net.Addr } From 3ab17063ff9afed478820755e97d4bb1e16c7abc Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 5 Jul 2018 13:27:35 -0700 Subject: [PATCH 03/11] multi: satisfy new lnpeer interface --- discovery/gossiper_test.go | 5 +++++ htlcswitch/link_test.go | 9 +++++++-- htlcswitch/mock.go | 11 +++++++++++ peer.go | 39 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 2 deletions(-) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index cd10ca90..0dd3c604 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -23,6 +23,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" "github.com/btcsuite/btcd/btcec" @@ -2159,6 +2160,9 @@ func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { return nil } +func (p *mockPeer) AddNewChannel(_ *lnwallet.LightningChannel, _ <-chan struct{}) error { + return nil +} func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk } func (p *mockPeer) PubKey() [33]byte { @@ -2166,3 +2170,4 @@ func (p *mockPeer) PubKey() [33]byte { copy(pubkey[:], p.pk.SerializeCompressed()) return pubkey } +func (p *mockPeer) Address() net.Addr { return nil } diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 74366850..c7f1d3ae 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "math" + "net" "reflect" "runtime" "strings" @@ -1411,6 +1412,9 @@ func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { } return nil } +func (m *mockPeer) AddNewChannel(_ *lnwallet.LightningChannel, _ <-chan struct{}) error { + return nil +} func (m *mockPeer) WipeChannel(*wire.OutPoint) error { return nil } @@ -1420,8 +1424,9 @@ func (m *mockPeer) PubKey() [33]byte { func (m *mockPeer) IdentityKey() *btcec.PublicKey { return nil } - -var _ lnpeer.Peer = (*mockPeer)(nil) +func (m *mockPeer) Address() net.Addr { + return nil +} func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error, diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 487b1601..fb648ac2 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "sync" "sync/atomic" "testing" @@ -517,6 +518,16 @@ func (s *mockServer) IdentityKey() *btcec.PublicKey { return pubkey } +func (s *mockServer) Address() net.Addr { + return nil +} + +func (s *mockServer) AddNewChannel(channel *lnwallet.LightningChannel, + cancel <-chan struct{}) error { + + return nil +} + func (s *mockServer) WipeChannel(*wire.OutPoint) error { return nil } diff --git a/peer.go b/peer.go index 1d01e7bf..76dd055c 100644 --- a/peer.go +++ b/peer.go @@ -21,6 +21,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -185,6 +186,9 @@ type peer struct { wg sync.WaitGroup } +// A compile-time check to ensure that peer satisfies the lnpeer.Peer interface. +var _ lnpeer.Peer = (*peer)(nil) + // newPeer creates a new peer from an establish connection object, and a // pointer to the main server. func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, @@ -1999,6 +2003,41 @@ func (p *peer) IdentityKey() *btcec.PublicKey { return p.addr.IdentityKey } +// Address returns the network address of the remote peer. +func (p *peer) Address() net.Addr { + return p.addr.Address +} + +// AddNewChannel adds a new channel to the peer. The channel should fail to be +// added if the cancel channel is closed. +func (p *peer) AddNewChannel(channel *lnwallet.LightningChannel, + cancel <-chan struct{}) error { + + newChanDone := make(chan struct{}) + newChanMsg := &newChannelMsg{ + channel: channel, + done: newChanDone, + } + + select { + case p.newChannels <- newChanMsg: + case <-cancel: + return errors.New("canceled adding new channel") + case <-p.quit: + return ErrPeerExiting + } + + // We pause here to wait for the peer to recognize the new channel + // before we close the channel barrier corresponding to the channel. + select { + case <-newChanDone: + case <-p.quit: + return ErrPeerExiting + } + + return nil +} + // TODO(roasbeef): make all start/stop mutexes a CAS // fetchLastChanUpdate returns a function which is able to retrieve the last From 6b1982f50fa7949bc3261a332a808d29d33c8e40 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 5 Jul 2018 13:28:57 -0700 Subject: [PATCH 04/11] fundingmanager+lnd: modify WatchNewChannel callback to take in peer key --- fundingmanager.go | 10 ++++------ lnd.go | 4 ++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 2d16ee64..940c7929 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -324,9 +324,9 @@ type fundingConfig struct { // WatchNewChannel is to be called once a new channel enters the final // funding stage: waiting for on-chain confirmation. This method sends // the channel to the ChainArbitrator so it can watch for any on-chain - // events related to the channel. We also provide the address of the + // events related to the channel. We also provide the public key of the // node we're establishing a channel with for reconnection purposes. - WatchNewChannel func(*channeldb.OpenChannel, *lnwire.NetAddress) error + WatchNewChannel func(*channeldb.OpenChannel, *btcec.PublicKey) error // ReportShortChanID allows the funding manager to report the newly // discovered short channel ID of a formerly pending channel to outside @@ -1401,8 +1401,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // Now that we've sent over our final signature for this channel, we'll // send it to the ChainArbitrator so it can watch for any on-chain // actions during this final confirmation stage. - peerAddr := resCtx.peerAddress - if err := f.cfg.WatchNewChannel(completeChan, peerAddr); err != nil { + if err := f.cfg.WatchNewChannel(completeChan, peerKey); err != nil { fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+ "arbitration: %v", fundingOut, err) } @@ -1553,8 +1552,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { // we'll send the to be active channel to the ChainArbitrator so it can // watch for any on-chin actions before the channel has fully // confirmed. - peerAddr := resCtx.peerAddress - if err := f.cfg.WatchNewChannel(completeChan, peerAddr); err != nil { + if err := f.cfg.WatchNewChannel(completeChan, peerKey); err != nil { fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+ "arbitration: %v", fundingPoint, err) } diff --git a/lnd.go b/lnd.go index 33dcc2a4..142e9f1e 100644 --- a/lnd.go +++ b/lnd.go @@ -450,12 +450,12 @@ func lndMain() error { return delay }, WatchNewChannel: func(channel *channeldb.OpenChannel, - addr *lnwire.NetAddress) error { + peerKey *btcec.PublicKey) error { // First, we'll mark this new peer as a persistent peer // for re-connection purposes. server.mu.Lock() - pubStr := string(addr.IdentityKey.SerializeCompressed()) + pubStr := string(peerKey.SerializeCompressed()) server.persistentPeers[pubStr] = struct{}{} server.mu.Unlock() From 6504a9cfa831db558f625fb11e92f0fd19372977 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 5 Jul 2018 13:38:11 -0700 Subject: [PATCH 05/11] fundingmanager+peer: modify fundingErrorMsg to use peer key --- fundingmanager.go | 17 ++++++++--------- peer.go | 7 ++++--- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 940c7929..ea0653c5 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -198,8 +198,8 @@ type fundingLockedMsg struct { // fundingErrorMsg couples an lnwire.Error message with the peer who sent the // message. This allows the funding manager to properly process the error. type fundingErrorMsg struct { - err *lnwire.Error - peerAddress *lnwire.NetAddress + err *lnwire.Error + peerKey *btcec.PublicKey } // pendingChannels is a map instantiated per-peer which tracks all active @@ -2690,10 +2690,10 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { // processFundingError sends a message to the fundingManager allowing it to // process the occurred generic error. func (f *fundingManager) processFundingError(err *lnwire.Error, - peerAddress *lnwire.NetAddress) { + peerKey *btcec.PublicKey) { select { - case f.fundingMsgs <- &fundingErrorMsg{err, peerAddress}: + case f.fundingMsgs <- &fundingErrorMsg{err, peerKey}: case <-f.quit: return } @@ -2705,13 +2705,12 @@ func (f *fundingManager) processFundingError(err *lnwire.Error, func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { protocolErr := fmsg.err - peerKey := fmsg.peerAddress.IdentityKey chanID := fmsg.err.ChanID // First, we'll attempt to retrieve and cancel the funding workflow // that this error was tied to. If we're unable to do so, then we'll // exit early as this was an unwarranted error. - resCtx, err := f.cancelReservationCtx(peerKey, chanID) + resCtx, err := f.cancelReservationCtx(fmsg.peerKey, chanID) if err != nil { fndgLog.Warnf("Received error for non-existent funding "+ "flow: %v (%v)", err, spew.Sdump(protocolErr)) @@ -2722,7 +2721,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { // error back to the caller (if any), and cancel the workflow itself. lnErr := lnwire.ErrorCode(protocolErr.Data[0]) fndgLog.Errorf("Received funding error from %x: %v", - peerKey.SerializeCompressed(), string(protocolErr.Data), + fmsg.peerKey.SerializeCompressed(), string(protocolErr.Data), ) // If this isn't a simple error code, then we'll display the entire @@ -2858,9 +2857,9 @@ func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey, // channel will receive a new, permanent channel ID, and will no longer be // considered pending. func (f *fundingManager) IsPendingChannel(pendingChanID [32]byte, - peerAddress *lnwire.NetAddress) bool { + peerKey *btcec.PublicKey) bool { - peerIDKey := newSerializedKey(peerAddress.IdentityKey) + peerIDKey := newSerializedKey(peerKey) f.resMtx.RLock() _, ok := f.activeReservations[peerIDKey][pendingChanID] f.resMtx.RUnlock() diff --git a/peer.go b/peer.go index 76dd055c..4d720fe8 100644 --- a/peer.go +++ b/peer.go @@ -935,8 +935,9 @@ out: } case *lnwire.Error: - switch { + key := p.addr.IdentityKey + switch { // In the case of an all-zero channel ID we want to // forward the error to all channels with this peer. case msg.ChanID == lnwire.ConnectionWideID: @@ -952,8 +953,8 @@ out: // If the channel ID for the error message corresponds // to a pending channel, then the funding manager will // handle the error. - case p.server.fundingMgr.IsPendingChannel(msg.ChanID, p.addr): - p.server.fundingMgr.processFundingError(msg, p.addr) + case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key): + p.server.fundingMgr.processFundingError(msg, key) // If not we hand the error to the channel link for // this channel. From 04c5eba1942e15044b10ebcd56b64ef852e5cb80 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 5 Jul 2018 18:03:04 -0700 Subject: [PATCH 06/11] server: modify NotifyWhenOnline to return the peer once connected --- server.go | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/server.go b/server.go index d8ed46ca..84cb4237 100644 --- a/server.go +++ b/server.go @@ -110,7 +110,7 @@ type server struct { inboundPeers map[string]*peer outboundPeers map[string]*peer - peerConnectedListeners map[string][]chan<- struct{} + peerConnectedListeners map[string][]chan<- lnpeer.Peer persistentPeers map[string]struct{} persistentPeersBackoff map[string]time.Duration @@ -264,7 +264,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, peersByPub: make(map[string]*peer), inboundPeers: make(map[string]*peer), outboundPeers: make(map[string]*peer), - peerConnectedListeners: make(map[string][]chan<- struct{}), + peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), globalFeatures: lnwire.NewFeatureVector(globalFeatures, lnwire.GlobalFeatures), @@ -1568,31 +1568,38 @@ func (s *server) SendToPeer(target *btcec.PublicKey, } // NotifyWhenOnline can be called by other subsystems to get notified when a -// particular peer comes online. +// particular peer comes online. The peer itself is sent across the peerChan. // // NOTE: This function is safe for concurrent access. -func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { +func (s *server) NotifyWhenOnline(peerKey *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + s.mu.Lock() defer s.mu.Unlock() // Compute the target peer's identifier. - pubStr := string(peer.SerializeCompressed()) + pubStr := string(peerKey.SerializeCompressed()) // Check if peer is connected. - _, ok := s.peersByPub[pubStr] + peer, ok := s.peersByPub[pubStr] if ok { // Connected, can return early. srvrLog.Debugf("Notifying that peer %x is online", - peer.SerializeCompressed()) - close(connectedChan) + peerKey.SerializeCompressed()) + + select { + case peerChan <- peer: + case <-s.quit: + } + return } // Not connected, store this listener such that it can be notified when // the peer comes online. s.peerConnectedListeners[pubStr] = append( - s.peerConnectedListeners[pubStr], connectedChan) + s.peerConnectedListeners[pubStr], peerChan, + ) } // sendPeerMessages enqueues a list of messages into the outgoingQueue of the @@ -2240,8 +2247,12 @@ func (s *server) addPeer(p *peer) { } // Check if there are listeners waiting for this peer to come online. - for _, con := range s.peerConnectedListeners[pubStr] { - close(con) + for _, peerChan := range s.peerConnectedListeners[pubStr] { + select { + case peerChan <- p: + case <-s.quit: + return + } } delete(s.peerConnectedListeners, pubStr) } From 6d4da721569119f65a530a265754ab277186905e Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 5 Jul 2018 18:19:35 -0700 Subject: [PATCH 07/11] discovery/gossiper: update to latest NotifyWhenOnline changes --- discovery/gossiper.go | 12 +++++++----- discovery/gossiper_test.go | 20 +++++++++++--------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 764617fb..a7b70ba5 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -107,7 +107,9 @@ type Config struct { // NotifyWhenOnline is a function that allows the gossiper to be // notified when a certain peer comes online, allowing it to // retry sending a peer message. - NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{}) + // + // NOTE: The peerChan channel must be buffered. + NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) // ProofMatureDelta the number of confirmations which is needed before // exchange the channel announcement proofs. @@ -2399,13 +2401,13 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably( "to peer(%x): %v. Will retry when online.", remotePeer.SerializeCompressed(), err) - connected := make(chan struct{}) - d.cfg.NotifyWhenOnline(remotePeer, connected) + peerChan := make(chan lnpeer.Peer, 1) + d.cfg.NotifyWhenOnline(remotePeer, peerChan) select { - case <-connected: + case <-peerChan: // Retry sending. - log.Infof("peer %x reconnected. Retry sending"+ + log.Infof("Peer %x reconnected. Retry sending"+ " AnnounceSignatures.", remotePeer.SerializeCompressed()) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 0dd3c604..9ecf69f5 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -1187,9 +1187,9 @@ func TestSignatureAnnouncementRetry(t *testing.T) { // We expect the gossiper to register for a notification when the peer // comes back online, so keep track of the channel it wants to get // notified on. - notifyPeers := make(chan chan<- struct{}, 1) + notifyPeers := make(chan chan<- lnpeer.Peer, 1) ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { + connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan } @@ -1208,7 +1208,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) { // Since sending this local announcement proof to the remote will fail, // the gossiper should register for a notification when the remote is // online again. - var conChan chan<- struct{} + var conChan chan<- lnpeer.Peer select { case conChan = <-notifyPeers: case <-time.After(2 * time.Second): @@ -1372,9 +1372,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { msg ...lnwire.Message) error { return fmt.Errorf("intentional error in SendToPeer") } - notifyPeers := make(chan chan<- struct{}, 1) + notifyPeers := make(chan chan<- lnpeer.Peer, 1) ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { + connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan } @@ -1392,7 +1392,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // Since sending to the remote peer will fail, the gossiper should // register for a notification when it comes back online. - var conChan chan<- struct{} + var conChan chan<- lnpeer.Peer select { case conChan = <-notifyPeers: case <-time.After(2 * time.Second): @@ -1431,7 +1431,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { return fmt.Errorf("intentional error in SendToPeer") }, NotifyWhenOnline: func(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { + connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan }, Router: ctx.gossiper.cfg.Router, @@ -1607,9 +1607,9 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { return nil } - notifyPeers := make(chan chan<- struct{}, 1) + notifyPeers := make(chan chan<- lnpeer.Peer, 1) ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { + connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan } @@ -2146,6 +2146,8 @@ type mockPeer struct { quit chan struct{} } +var _ lnpeer.Peer = (*mockPeer)(nil) + func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { if p.sentMsgs == nil && p.quit == nil { return nil From e669e900177eb594f019582ba2365a5815dc09e7 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 11 Jul 2018 17:37:54 -0700 Subject: [PATCH 08/11] fundingmanager: retrieve peer before resuming funding flow --- fundingmanager.go | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index ea0653c5..1aaf8c66 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -23,6 +23,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -553,16 +554,29 @@ func (f *fundingManager) Start() error { // resume wait on startup. case shortChanID, ok := <-confChan: if !ok { - fndgLog.Errorf("waiting for funding" + + fndgLog.Errorf("Waiting for funding" + "confirmation failed") return } - // Success, funding transaction was confirmed. - err := f.handleFundingConfirmation(ch, shortChanID) + // The funding transaction has confirmed, so + // we'll attempt to retrieve the remote peer + // to complete the rest of the funding flow. + peerChan := make(chan lnpeer.Peer, 1) + f.cfg.NotifyWhenOnline(ch.IdentityPub, peerChan) + + var peer lnpeer.Peer + select { + case peer = <-peerChan: + case <-f.quit: + return + } + err := f.handleFundingConfirmation( + peer, ch, shortChanID, + ) if err != nil { - fndgLog.Errorf("failed to handle funding"+ - "confirmation: %v", err) + fndgLog.Errorf("Failed to handle "+ + "funding confirmation: %v", err) return } } @@ -620,10 +634,21 @@ func (f *fundingManager) Start() error { go func(dbChan *channeldb.OpenChannel) { defer f.wg.Done() - err := f.handleFundingConfirmation(dbChan, shortChanID) + peerChan := make(chan lnpeer.Peer, 1) + f.cfg.NotifyWhenOnline(dbChan.IdentityPub, peerChan) + + var peer lnpeer.Peer + select { + case peer = <-peerChan: + case <-f.quit: + return + } + err := f.handleFundingConfirmation( + peer, dbChan, shortChanID, + ) if err != nil { - fndgLog.Errorf("failed to handle funding"+ - "confirmation: %v", err) + fndgLog.Errorf("Failed to handle "+ + "funding confirmation: %v", err) return } }(channel) From 9cedef92450445bc1656ae3e746e986eebef2b69 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 5 Jul 2018 13:41:51 -0700 Subject: [PATCH 09/11] fundingmanager: send messages to peers directly In this commit, we modify the existing message sending functionality within the fundingmanager. Due to each mesage send requiring to hold the server's lock to retrieve the peer, we might run into a case where the lock is held for a larger than usual amount of time and would therefore block on sending the message within the fundingmanager. We remedy this by taking a similar approach to some recent changes within the gossiper. We now keep track of each peer within the internal fundingmanager messages and send messages directly to them. --- fundingmanager.go | 257 ++++++++++++++++++++-------------------------- peer.go | 10 +- server.go | 2 +- 3 files changed, 117 insertions(+), 152 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 1aaf8c66..d09d2b12 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -105,7 +105,7 @@ var ( // * deadlines, etc. type reservationWithCtx struct { reservation *lnwallet.ChannelReservation - peerAddress *lnwire.NetAddress + peer lnpeer.Peer chanAmt btcutil.Amount @@ -152,7 +152,7 @@ func (r *reservationWithCtx) updateTimestamp() { // embedded within this message giving the funding manager full context w.r.t // the workflow. type initFundingMsg struct { - peerAddress *lnwire.NetAddress + peer lnpeer.Peer *openChanReq } @@ -160,40 +160,40 @@ type initFundingMsg struct { // the message. This allows the funding manager to queue a response directly to // the peer, progressing the funding workflow. type fundingOpenMsg struct { - msg *lnwire.OpenChannel - peerAddress *lnwire.NetAddress + msg *lnwire.OpenChannel + peer lnpeer.Peer } // fundingAcceptMsg couples an lnwire.AcceptChannel message with the peer who // sent the message. This allows the funding manager to queue a response // directly to the peer, progressing the funding workflow. type fundingAcceptMsg struct { - msg *lnwire.AcceptChannel - peerAddress *lnwire.NetAddress + msg *lnwire.AcceptChannel + peer lnpeer.Peer } // fundingCreatedMsg couples an lnwire.FundingCreated message with the peer who // sent the message. This allows the funding manager to queue a response // directly to the peer, progressing the funding workflow. type fundingCreatedMsg struct { - msg *lnwire.FundingCreated - peerAddress *lnwire.NetAddress + msg *lnwire.FundingCreated + peer lnpeer.Peer } // fundingSignedMsg couples an lnwire.FundingSigned message with the peer who // sent the message. This allows the funding manager to queue a response // directly to the peer, progressing the funding workflow. type fundingSignedMsg struct { - msg *lnwire.FundingSigned - peerAddress *lnwire.NetAddress + msg *lnwire.FundingSigned + peer lnpeer.Peer } // fundingLockedMsg couples an lnwire.FundingLocked message with the peer who // sent the message. This allows the funding manager to finalize the funding // process and announce the existence of the new channel. type fundingLockedMsg struct { - msg *lnwire.FundingLocked - peerAddress *lnwire.NetAddress + msg *lnwire.FundingLocked + peer lnpeer.Peer } // fundingErrorMsg couples an lnwire.Error message with the peer who sent the @@ -270,10 +270,12 @@ type fundingConfig struct { SendToPeer func(target *btcec.PublicKey, msgs ...lnwire.Message) error // NotifyWhenOnline allows the FundingManager to register with a - // subsystem that will notify it when the peer comes online. - // This is used when sending the fundingLocked message, since it MUST be + // subsystem that will notify it when the peer comes online. This is + // used when sending the fundingLocked message, since it MUST be // delivered after the funding transaction is confirmed. - NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{}) + // + // NOTE: The peerChan channel must be buffered. + NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) // FindPeer searches the list of peers connected to the node so that // the FundingManager can notify other daemon subsystems as necessary @@ -827,13 +829,13 @@ func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) { // // TODO(roasbeef): if peer disconnects, and haven't yet broadcast funding // transaction, then all reservations should be cleared. -func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, - tempChanID [32]byte, fundingErr error) { +func (f *fundingManager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte, + fundingErr error) { fndgLog.Debugf("Failing funding flow for pendingID=%x: %v", tempChanID, fundingErr) - ctx, err := f.cancelReservationCtx(peer, tempChanID) + ctx, err := f.cancelReservationCtx(peer.IdentityKey(), tempChanID) if err != nil { fndgLog.Errorf("unable to cancel reservation: %v", err) } @@ -868,8 +870,8 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, } fndgLog.Debugf("Sending funding error to peer (%x): %v", - peer.SerializeCompressed(), spew.Sdump(errMsg)) - if err := f.cfg.SendToPeer(peer, errMsg); err != nil { + peer.IdentityKey().SerializeCompressed(), spew.Sdump(errMsg)) + if err := peer.SendMessage(false, errMsg); err != nil { fndgLog.Errorf("unable to send error message to peer %v", err) } } @@ -950,10 +952,10 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { // processFundingOpen sends a message to the fundingManager allowing it to // initiate the new funding workflow with the source peer. func (f *fundingManager) processFundingOpen(msg *lnwire.OpenChannel, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingOpenMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingOpenMsg{msg, peer}: case <-f.quit: return } @@ -969,7 +971,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // Check number of pending channels to be smaller than maximum allowed // number and send ErrorGeneric to remote peer if condition is // violated. - peerIDKey := newSerializedKey(fmsg.peerAddress.IdentityKey) + peerIDKey := newSerializedKey(fmsg.peer.IdentityKey()) msg := fmsg.msg amt := msg.FundingAmount @@ -980,8 +982,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { if len(f.activeReservations[peerIDKey]) >= cfg.MaxPendingChannels { f.resMtx.RUnlock() f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, - lnwire.ErrMaxPendingChannels) + fmsg.peer, fmsg.msg.PendingChannelID, + lnwire.ErrMaxPendingChannels, + ) return } f.resMtx.RUnlock() @@ -995,8 +998,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { fndgLog.Errorf("unable to query wallet: %v", err) } f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, - lnwire.ErrSynchronizingChain) + fmsg.peer, fmsg.msg.PendingChannelID, + lnwire.ErrSynchronizingChain, + ) return } @@ -1004,7 +1008,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // current soft-limit for channel size. if msg.FundingAmount > maxFundingAmount { f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, + fmsg.peer, fmsg.msg.PendingChannelID, lnwire.ErrChanTooLarge, ) return @@ -1014,7 +1018,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // a channel that's below our current min channel size. if amt < f.cfg.MinChanSize { f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, + fmsg.peer, fmsg.msg.PendingChannelID, lnwallet.ErrChanTooSmall(amt, btcutil.Amount(f.cfg.MinChanSize)), ) return @@ -1023,7 +1027,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+ "pendingId=%x) from peer(%x)", amt, msg.PushAmount, msg.CsvDelay, msg.PendingChannelID, - fmsg.peerAddress.IdentityKey.SerializeCompressed()) + fmsg.peer.IdentityKey().SerializeCompressed()) // Attempt to initialize a reservation within the wallet. If the wallet // has insufficient resources to create the channel, then the @@ -1034,13 +1038,12 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { reservation, err := f.cfg.Wallet.InitChannelReservation( amt, 0, msg.PushAmount, lnwallet.SatPerKWeight(msg.FeePerKiloWeight), 0, - fmsg.peerAddress.IdentityKey, fmsg.peerAddress.Address, + fmsg.peer.IdentityKey(), fmsg.peer.Address(), &chainHash, msg.ChannelFlags, ) if err != nil { fndgLog.Errorf("Unable to initialize reservation: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } @@ -1060,9 +1063,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { ) if err != nil { fndgLog.Errorf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - fmsg.msg.PendingChannelID, err, - ) + f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) return } @@ -1090,7 +1091,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { remoteCsvDelay: remoteCsvDelay, remoteMinHtlc: minHtlc, err: make(chan error, 1), - peerAddress: fmsg.peerAddress, + peer: fmsg.peer, } f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx f.resMtx.Unlock() @@ -1132,8 +1133,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { err = reservation.ProcessSingleContribution(remoteContribution) if err != nil { fndgLog.Errorf("unable to add contribution reservation: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } @@ -1161,11 +1161,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { HtlcPoint: ourContribution.HtlcBasePoint.PubKey, FirstCommitmentPoint: ourContribution.FirstCommitmentPoint, } - err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, &fundingAccept) - if err != nil { + if err := fmsg.peer.SendMessage(false, &fundingAccept); err != nil { fndgLog.Errorf("unable to send funding response to peer: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } } @@ -1173,10 +1171,10 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // processFundingAccept sends a message to the fundingManager allowing it to // continue the second phase of a funding workflow with the target peer. func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingAcceptMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingAcceptMsg{msg, peer}: case <-f.quit: return } @@ -1188,7 +1186,7 @@ func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel, func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { msg := fmsg.msg pendingChanID := fmsg.msg.PendingChannelID - peerKey := fmsg.peerAddress.IdentityKey + peerKey := fmsg.peer.IdentityKey() resCtx, err := f.getReservationCtx(peerKey, pendingChanID) if err != nil { @@ -1212,8 +1210,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { ) if err != nil { fndgLog.Warnf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - fmsg.msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) return } @@ -1259,9 +1256,8 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { err = resCtx.reservation.ProcessContribution(remoteContribution) if err != nil { fndgLog.Errorf("Unable to process contribution from %v: %v", - fmsg.peerAddress.IdentityKey, err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + peerKey, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } @@ -1304,15 +1300,12 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { fundingCreated.CommitSig, err = lnwire.NewSigFromRawSignature(sig) if err != nil { fndgLog.Errorf("Unable to parse signature: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } - err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingCreated) - if err != nil { + if err := fmsg.peer.SendMessage(false, fundingCreated); err != nil { fndgLog.Errorf("Unable to send funding complete message: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } } @@ -1320,10 +1313,10 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { // processFundingCreated queues a funding complete message coupled with the // source peer to the fundingManager. func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingCreatedMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingCreatedMsg{msg, peer}: case <-f.quit: return } @@ -1334,7 +1327,7 @@ func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated, // processed, a signature is sent to the remote peer allowing it to broadcast // the funding transaction, progressing the workflow into the final stage. func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { - peerKey := fmsg.peerAddress.IdentityKey + peerKey := fmsg.peer.IdentityKey() pendingChanID := fmsg.msg.PendingChannelID resCtx, err := f.getReservationCtx(peerKey, pendingChanID) @@ -1364,8 +1357,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { if err != nil { // TODO(roasbeef): better error logging: peerID, channelID, etc. fndgLog.Errorf("unable to complete single reservation: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) return } @@ -1405,8 +1397,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { ourCommitSig, err := lnwire.NewSigFromRawSignature(sig) if err != nil { fndgLog.Errorf("unable to parse signature: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) deleteFromDatabase() return } @@ -1415,10 +1406,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { ChanID: channelID, CommitSig: ourCommitSig, } - if err := f.cfg.SendToPeer(peerKey, fundingSigned); err != nil { + if err := fmsg.peer.SendMessage(false, fundingSigned); err != nil { fndgLog.Errorf("unable to send FundingSigned message: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) deleteFromDatabase() return } @@ -1476,8 +1466,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { err := fmt.Errorf("timeout waiting for funding tx "+ "(%v) to confirm", completeChan.FundingOutpoint) fndgLog.Warnf(err.Error()) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) deleteFromDatabase() return case <-f.quit: @@ -1496,8 +1485,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // Success, funding transaction was confirmed. f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID) - err := f.handleFundingConfirmation(completeChan, - shortChanID) + err := f.handleFundingConfirmation( + fmsg.peer, completeChan, shortChanID, + ) if err != nil { fndgLog.Errorf("failed to handle funding"+ "confirmation: %v", err) @@ -1509,10 +1499,10 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // processFundingSigned sends a single funding sign complete message along with // the source peer to the funding manager. func (f *fundingManager) processFundingSigned(msg *lnwire.FundingSigned, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingSignedMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingSignedMsg{msg, peer}: case <-f.quit: return } @@ -1535,20 +1525,17 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { err := fmt.Errorf("Unable to find signed reservation for "+ "chan_id=%x", fmsg.msg.ChanID) fndgLog.Warnf(err.Error()) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - fmsg.msg.ChanID, err) + f.failFundingFlow(fmsg.peer, fmsg.msg.ChanID, err) return } - peerKey := fmsg.peerAddress.IdentityKey - resCtx, err := f.getReservationCtx(fmsg.peerAddress.IdentityKey, - pendingChanID) + peerKey := fmsg.peer.IdentityKey() + resCtx, err := f.getReservationCtx(peerKey, pendingChanID) if err != nil { fndgLog.Warnf("Unable to find reservation (peerID:%v, chanID:%x)", peerKey, pendingChanID[:]) // TODO: add ErrChanNotFound? - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) return } @@ -1568,8 +1555,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { completeChan, err := resCtx.reservation.CompleteReservation(nil, commitSig) if err != nil { fndgLog.Errorf("Unable to complete reservation sign complete: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) return } @@ -1583,7 +1569,8 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { } fndgLog.Infof("Finalizing pendingID(%x) over ChannelPoint(%v), "+ - "waiting for channel open on-chain", pendingChanID[:], fundingPoint) + "waiting for channel open on-chain", pendingChanID[:], + fundingPoint) // Send an update to the upstream client that the negotiation process // is over. @@ -1648,7 +1635,9 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { } defer lnChannel.Stop() - err = f.sendFundingLocked(completeChan, lnChannel, shortChanID) + err = f.sendFundingLocked( + fmsg.peer, completeChan, lnChannel, shortChanID, + ) if err != nil { fndgLog.Errorf("failed sending fundingLocked: %v", err) return @@ -1894,10 +1883,11 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open } // handleFundingConfirmation is a wrapper method for creating a new -// lnwallet.LightningChannel object, calling sendFundingLocked, addToRouterGraph, -// and annAfterSixConfs. This is called after the funding transaction is -// confirmed. -func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenChannel, +// lnwallet.LightningChannel object, calling sendFundingLocked, +// addToRouterGraph, and annAfterSixConfs. This is called after the funding +// transaction is confirmed. +func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer, + completeChan *channeldb.OpenChannel, shortChanID *lnwire.ShortChannelID) error { // We create the state-machine object which wraps the database state. @@ -1913,7 +1903,7 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC fndgLog.Debugf("ChannelID(%v) is now fully confirmed!", chanID) - err = f.sendFundingLocked(completeChan, lnChannel, shortChanID) + err = f.sendFundingLocked(peer, completeChan, lnChannel, shortChanID) if err != nil { return fmt.Errorf("failed sending fundingLocked: %v", err) } @@ -1933,11 +1923,12 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC // sendFundingLocked creates and sends the fundingLocked message. // This should be called after the funding transaction has been confirmed, // and the channelState is 'markedOpen'. -func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel, - channel *lnwallet.LightningChannel, +func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, + completeChan *channeldb.OpenChannel, channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) error { chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) + peerKey := completeChan.IdentityPub // Next, we'll send over the funding locked message which marks that we // consider the channel open by presenting the remote party with our @@ -1962,29 +1953,26 @@ func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel, // down. for { fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+ - "peer %x", chanID, - completeChan.IdentityPub.SerializeCompressed()) + "peer %x", chanID, peerKey.SerializeCompressed()) - err = f.cfg.SendToPeer(completeChan.IdentityPub, - fundingLockedMsg) - if err == nil { - // Sending succeeded, we can break out and continue - // the funding flow. + if err := peer.SendMessage(false, fundingLockedMsg); err == nil { + // Sending succeeded, we can break out and continue the + // funding flow. break } - fndgLog.Warnf("unable to send fundingLocked to peer %x: "+ - "%v. Will retry when online", - completeChan.IdentityPub.SerializeCompressed(), err) + fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+ + "Will retry when online", peerKey.SerializeCompressed(), + err) - connected := make(chan struct{}) + connected := make(chan lnpeer.Peer, 1) f.cfg.NotifyWhenOnline(completeChan.IdentityPub, connected) + select { case <-connected: fndgLog.Infof("Peer(%x) came back online, will retry "+ "sending FundingLocked for ChannelID(%v)", - completeChan.IdentityPub.SerializeCompressed(), - chanID) + peerKey.SerializeCompressed(), chanID) // Retry sending. case <-f.quit: @@ -2165,10 +2153,10 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, // processFundingLocked sends a message to the fundingManager allowing it to // finish the funding workflow. func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingLockedMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingLockedMsg{msg, peer}: case <-f.quit: return } @@ -2180,7 +2168,7 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { defer f.wg.Done() fndgLog.Debugf("Received FundingLocked for ChannelID(%v) from "+ "peer %x", fmsg.msg.ChanID, - fmsg.peerAddress.IdentityKey.SerializeCompressed()) + fmsg.peer.IdentityKey().SerializeCompressed()) // If we are currently in the process of handling a funding locked // message for this channel, ignore. @@ -2276,32 +2264,11 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { f.barrierMtx.Unlock() }() - // Finally, we'll find the peer that sent us this message so we can - // provide it with the fully initialized channel state. - peer, err := f.cfg.FindPeer(fmsg.peerAddress.IdentityKey) - if err != nil { - fndgLog.Errorf("Unable to find peer: %v", err) + if err := fmsg.peer.AddNewChannel(channel, f.quit); err != nil { + fndgLog.Errorf("Unable to add new channel %v with peer %x: %v", + fmsg.peer.IdentityKey().SerializeCompressed(), + *channel.ChanPoint, err) channel.Stop() - return - } - newChanDone := make(chan struct{}) - newChanMsg := &newChannelMsg{ - channel: channel, - done: newChanDone, - } - - select { - case peer.newChannels <- newChanMsg: - case <-f.quit: - return - } - - // We pause here to wait for the peer to recognize the new channel - // before we close the channel barrier corresponding to the channel. - select { - case <-f.quit: - return - case <-newChanDone: // Fallthrough if we're not quitting. } } @@ -2520,11 +2487,9 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe // initFundingWorkflow sends a message to the funding manager instructing it // to initiate a single funder workflow with the source peer. // TODO(roasbeef): re-visit blocking nature.. -func (f *fundingManager) initFundingWorkflow(peerAddress *lnwire.NetAddress, - req *openChanReq) { - +func (f *fundingManager) initFundingWorkflow(peer lnpeer.Peer, req *openChanReq) { f.fundingRequests <- &initFundingMsg{ - peerAddress: peerAddress, + peer: peer, openChanReq: req, } } @@ -2534,7 +2499,7 @@ func (f *fundingManager) initFundingWorkflow(peerAddress *lnwire.NetAddress, // funding workflow. func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { var ( - peerKey = msg.peerAddress.IdentityKey + peerKey = msg.peer.IdentityKey() localAmt = msg.localFundingAmt remoteAmt = msg.remoteFundingAmt capacity = localAmt + remoteAmt @@ -2553,8 +2518,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { fndgLog.Infof("Initiating fundingRequest(localAmt=%v, remoteAmt=%v, "+ "capacity=%v, chainhash=%v, addr=%v, dustLimit=%v)", localAmt, - msg.pushAmt, capacity, msg.chainHash, msg.peerAddress.Address, - ourDustLimit) + msg.pushAmt, capacity, msg.chainHash, peerKey, ourDustLimit) // First, we'll query the fee estimator for a fee that should get the // commitment transaction confirmed by the next few blocks (conf target @@ -2590,7 +2554,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { // request will fail, and be aborted. reservation, err := f.cfg.Wallet.InitChannelReservation( capacity, localAmt, msg.pushAmt, commitFeePerKw, - msg.fundingFeePerVSize, peerKey, msg.peerAddress.Address, + msg.fundingFeePerVSize, peerKey, msg.peer.Address(), &msg.chainHash, channelFlags, ) if err != nil { @@ -2631,7 +2595,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { remoteCsvDelay: remoteCsvDelay, remoteMinHtlc: minHtlc, reservation: reservation, - peerAddress: msg.peerAddress, + peer: msg.peer, updates: msg.updates, err: msg.err, } @@ -2653,7 +2617,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { maxHtlcs := f.cfg.RequiredRemoteMaxHTLCs(capacity) fndgLog.Infof("Starting funding workflow with %v for pendingID(%x)", - msg.peerAddress.Address, chanID) + msg.peer.Address(), chanID) fundingOpen := lnwire.OpenChannel{ ChainHash: *f.cfg.Wallet.Cfg.NetParams.GenesisHash, @@ -2675,7 +2639,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { FirstCommitmentPoint: ourContribution.FirstCommitmentPoint, ChannelFlags: channelFlags, } - if err := f.cfg.SendToPeer(peerKey, &fundingOpen); err != nil { + if err := msg.peer.SendMessage(false, &fundingOpen); err != nil { e := fmt.Errorf("Unable to send funding request message: %v", err) fndgLog.Errorf(e.Error()) @@ -2786,10 +2750,11 @@ func (f *fundingManager) pruneZombieReservations() { f.resMtx.RUnlock() for pendingChanID, resCtx := range zombieReservations { - err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+ - "chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:]) + err := fmt.Errorf("reservation timed out waiting for peer "+ + "(peerID:%v, chanID:%x)", resCtx.peer.IdentityKey(), + pendingChanID[:]) fndgLog.Warnf(err.Error()) - f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err) + f.failFundingFlow(resCtx.peer, pendingChanID, err) } } diff --git a/peer.go b/peer.go index 4d720fe8..e1113c24 100644 --- a/peer.go +++ b/peer.go @@ -911,15 +911,15 @@ out: p.queueMsg(lnwire.NewPong(pongBytes), nil) case *lnwire.OpenChannel: - p.server.fundingMgr.processFundingOpen(msg, p.addr) + p.server.fundingMgr.processFundingOpen(msg, p) case *lnwire.AcceptChannel: - p.server.fundingMgr.processFundingAccept(msg, p.addr) + p.server.fundingMgr.processFundingAccept(msg, p) case *lnwire.FundingCreated: - p.server.fundingMgr.processFundingCreated(msg, p.addr) + p.server.fundingMgr.processFundingCreated(msg, p) case *lnwire.FundingSigned: - p.server.fundingMgr.processFundingSigned(msg, p.addr) + p.server.fundingMgr.processFundingSigned(msg, p) case *lnwire.FundingLocked: - p.server.fundingMgr.processFundingLocked(msg, p.addr) + p.server.fundingMgr.processFundingLocked(msg, p) case *lnwire.Shutdown: select { diff --git a/server.go b/server.go index 84cb4237..cd979aa5 100644 --- a/server.go +++ b/server.go @@ -2520,7 +2520,7 @@ func (s *server) OpenChannel(nodeKey *btcec.PublicKey, // TODO(roasbeef): pass in chan that's closed if/when funding succeeds // so can track as persistent peer? - go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req) + go s.fundingMgr.initFundingWorkflow(targetPeer, req) return updateChan, errChan } From 38e01b259f10ca75ca2cff633dbe31aea75c0226 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 5 Jul 2018 13:29:55 -0700 Subject: [PATCH 10/11] fundingmanager+lnd: remove no longer needed FindPeer and SendToPeer callbacks The FindPeer and SendToPeer callbacks are no longer needed within the fundingManager due to the previous commit allowing us to send messages to peers directly. --- fundingmanager.go | 10 ---------- lnd.go | 2 -- 2 files changed, 12 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index d09d2b12..7753204d 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -264,11 +264,6 @@ type fundingConfig struct { // to the greater network. SendAnnouncement func(msg lnwire.Message) error - // SendToPeer allows the FundingManager to send messages to the peer - // node during the multiple steps involved in the creation of the - // channel's funding transaction and initial commitment transaction. - SendToPeer func(target *btcec.PublicKey, msgs ...lnwire.Message) error - // NotifyWhenOnline allows the FundingManager to register with a // subsystem that will notify it when the peer comes online. This is // used when sending the fundingLocked message, since it MUST be @@ -277,11 +272,6 @@ type fundingConfig struct { // NOTE: The peerChan channel must be buffered. NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) - // FindPeer searches the list of peers connected to the node so that - // the FundingManager can notify other daemon subsystems as necessary - // during the funding process. - FindPeer func(peerKey *btcec.PublicKey) (*peer, error) - // FindChannel queries the database for the channel with the given // channel ID. FindChannel func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) diff --git a/lnd.go b/lnd.go index 142e9f1e..237e2807 100644 --- a/lnd.go +++ b/lnd.go @@ -360,9 +360,7 @@ func lndMain() error { idPrivKey.PubKey()) return <-errChan }, - SendToPeer: server.SendToPeer, NotifyWhenOnline: server.NotifyWhenOnline, - FindPeer: server.FindPeer, TempChanIDSeed: chanIDSeed, FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { dbChannels, err := chanDB.FetchAllChannels() From d54d41eed784ed47d1851b2e92dad47e64da9da4 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 5 Jul 2018 19:32:22 -0700 Subject: [PATCH 11/11] fundingmanager: update tests to latest changes --- fundingmanager_test.go | 283 +++++++++++++++++++++++++---------------- 1 file changed, 175 insertions(+), 108 deletions(-) diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 6baea62c..163b64aa 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -3,6 +3,7 @@ package main import ( + "errors" "fmt" "io/ioutil" "math/big" @@ -12,22 +13,22 @@ import ( "testing" "time" + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog" + "github.com/btcsuite/btcutil" + _ "github.com/btcsuite/btcwallet/walletdb/bdb" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" - "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/chaincfg/chainhash" - _ "github.com/btcsuite/btcwallet/walletdb/bdb" - - "github.com/btcsuite/btcd/btcec" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" ) const ( @@ -134,14 +135,64 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, type testNode struct { privKey *btcec.PrivateKey + addr *lnwire.NetAddress msgChan chan lnwire.Message announceChan chan lnwire.Message publTxChan chan *wire.MsgTx fundingMgr *fundingManager - peer *peer + newChannels chan *newChannelMsg mockNotifier *mockNotifier testDir string shutdownChannel chan struct{} + + remotePeer *testNode + sendMessage func(lnwire.Message) error +} + +var _ lnpeer.Peer = (*testNode)(nil) + +func (n *testNode) IdentityKey() *btcec.PublicKey { + return n.addr.IdentityKey +} + +func (n *testNode) Address() net.Addr { + return n.addr.Address +} + +func (n *testNode) PubKey() [33]byte { + return newSerializedKey(n.addr.IdentityKey) +} + +func (n *testNode) SendMessage(_ bool, msg ...lnwire.Message) error { + return n.sendMessage(msg[0]) +} + +func (n *testNode) WipeChannel(_ *wire.OutPoint) error { + return nil +} + +func (n *testNode) AddNewChannel(channel *lnwallet.LightningChannel, + quit <-chan struct{}) error { + + done := make(chan struct{}) + msg := &newChannelMsg{ + channel: channel, + done: done, + } + + select { + case n.newChannels <- msg: + case <-quit: + return ErrFundingManagerShuttingDown + } + + select { + case <-done: + case <-quit: + return ErrFundingManagerShuttingDown + } + + return nil } func init() { @@ -180,7 +231,7 @@ func createTestWallet(cdb *channeldb.DB, netParams *chaincfg.Params, } func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, - tempTestDir string) (*testNode, error) { + addr *lnwire.NetAddress, tempTestDir string) (*testNode, error) { netParams := activeNetParams.Params estimator := lnwallet.StaticFeeEstimator{FeeRate: 250} @@ -191,11 +242,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, epochChan: make(chan *chainntnfs.BlockEpoch, 1), } - newChannelsChan := make(chan *newChannelMsg) - p := &peer{ - newChannels: newChannelsChan, - } - sentMessages := make(chan lnwire.Message) sentAnnouncements := make(chan lnwire.Message) publTxChan := make(chan *wire.MsgTx, 1) @@ -249,20 +295,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { return lnwire.NodeAnnouncement{}, nil }, - SendToPeer: func(target *btcec.PublicKey, msgs ...lnwire.Message) error { - select { - case sentMessages <- msgs[0]: - case <-shutdownChan: - return fmt.Errorf("shutting down") - } - return nil - }, - NotifyWhenOnline: func(peer *btcec.PublicKey, connectedChan chan<- struct{}) { - t.Fatalf("did not expect fundingManager to call NotifyWhenOnline") - }, - FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) { - return p, nil - }, TempChanIDSeed: chanIDSeed, FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { dbChannels, err := cdb.FetchAllChannels() @@ -311,7 +343,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 { return uint16(lnwallet.MaxHTLCNumber / 2) }, - WatchNewChannel: func(*channeldb.OpenChannel, *lnwire.NetAddress) error { + WatchNewChannel: func(*channeldb.OpenChannel, *btcec.PublicKey) error { return nil }, ReportShortChanID: func(wire.OutPoint) error { @@ -323,22 +355,30 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, if err != nil { t.Fatalf("failed creating fundingManager: %v", err) } - if err = f.Start(); err != nil { t.Fatalf("failed starting fundingManager: %v", err) } - return &testNode{ + testNode := &testNode{ privKey: privKey, msgChan: sentMessages, + newChannels: make(chan *newChannelMsg), announceChan: sentAnnouncements, publTxChan: publTxChan, fundingMgr: f, - peer: p, mockNotifier: chainNotifier, testDir: tempTestDir, shutdownChannel: shutdownChan, - }, nil + addr: addr, + } + + f.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + connectedChan chan<- lnpeer.Peer) { + + connectedChan <- testNode + } + + return testNode, nil } func recreateAliceFundingManager(t *testing.T, alice *testNode) { @@ -375,19 +415,11 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { return lnwire.NodeAnnouncement{}, nil }, - SendToPeer: func(target *btcec.PublicKey, - msgs ...lnwire.Message) error { - select { - case aliceMsgChan <- msgs[0]: - case <-shutdownChan: - return fmt.Errorf("shutting down") - } - return nil + NotifyWhenOnline: func(peer *btcec.PublicKey, + connectedChan chan<- lnpeer.Peer) { + + connectedChan <- alice.remotePeer }, - NotifyWhenOnline: func(peer *btcec.PublicKey, connectedChan chan<- struct{}) { - t.Fatalf("did not expect fundingManager to call NotifyWhenOnline") - }, - FindPeer: oldCfg.FindPeer, TempChanIDSeed: oldCfg.TempChanIDSeed, FindChannel: oldCfg.FindChannel, PublishTransaction: func(txn *wire.MsgTx) error { @@ -424,7 +456,9 @@ func setupFundingManagers(t *testing.T) (*testNode, *testNode) { t.Fatalf("unable to create temp directory: %v", err) } - alice, err := createTestFundingManager(t, alicePrivKey, aliceTestDir) + alice, err := createTestFundingManager( + t, alicePrivKey, aliceAddr, aliceTestDir, + ) if err != nil { t.Fatalf("failed creating fundingManager: %v", err) } @@ -434,11 +468,37 @@ func setupFundingManagers(t *testing.T) (*testNode, *testNode) { t.Fatalf("unable to create temp directory: %v", err) } - bob, err := createTestFundingManager(t, bobPrivKey, bobTestDir) + bob, err := createTestFundingManager(t, bobPrivKey, bobAddr, bobTestDir) if err != nil { t.Fatalf("failed creating fundingManager: %v", err) } + // With the funding manager's created, we'll now attempt to mimic a + // connection pipe between them. In order to intercept the messages + // within it, we'll redirect all messages back to the msgChan of the + // sender. Since the fundingManager now has a reference to peers itself, + // alice.sendMessage will be triggered when Bob's funding manager + // attempts to send a message to Alice and vice versa. + alice.remotePeer = bob + alice.sendMessage = func(msg lnwire.Message) error { + select { + case alice.remotePeer.msgChan <- msg: + case <-alice.shutdownChannel: + return errors.New("shutting down") + } + return nil + } + + bob.remotePeer = alice + bob.sendMessage = func(msg lnwire.Message) error { + select { + case bob.remotePeer.msgChan <- msg: + case <-bob.shutdownChannel: + return errors.New("shutting down") + } + return nil + } + return alice, bob } @@ -473,7 +533,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, err: errChan, } - alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + alice.fundingMgr.initFundingWorkflow(bob, initReq) // Alice should have sent the OpenChannel message to Bob. var aliceMsg lnwire.Message @@ -498,7 +558,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, } // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + bob.fundingMgr.processFundingOpen(openChannelReq, alice) // Bob should answer with an AcceptChannel message. acceptChannelResponse := assertFundingMsgSent( @@ -506,7 +566,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, ).(*lnwire.AcceptChannel) // Forward the response to Alice. - alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) + alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) // Alice responds with a FundingCreated message. fundingCreated := assertFundingMsgSent( @@ -514,7 +574,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, ).(*lnwire.FundingCreated) // Give the message to Bob. - bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr) + bob.fundingMgr.processFundingCreated(fundingCreated, alice) // Finally, Bob should send the FundingSigned message. fundingSigned := assertFundingMsgSent( @@ -522,7 +582,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, ).(*lnwire.FundingSigned) // Forward the signature to Alice. - alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr) + alice.fundingMgr.processFundingSigned(fundingSigned, bob) // After Alice processes the singleFundingSignComplete message, she will // broadcast the funding transaction to the network. We expect to get a @@ -861,14 +921,14 @@ func assertErrChannelNotFound(t *testing.T, node *testNode, func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) { // They should both send the new channel state to their peer. select { - case c := <-alice.peer.newChannels: + case c := <-alice.newChannels: close(c.done) case <-time.After(time.Second * 15): t.Fatalf("alice did not send new channel to peer") } select { - case c := <-bob.peer.newChannels: + case c := <-bob.newChannels: close(c.done) case <-time.After(time.Second * 15): t.Fatalf("bob did not send new channel to peer") @@ -935,8 +995,8 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -970,12 +1030,15 @@ func TestFundingManagerRestartBehavior(t *testing.T) { // sending it on restart. We mimic this behavior by letting the // SendToPeer method return an error, as if the message was not // successfully sent. We then recreate the fundingManager and make sure - // it continues the process as expected. - alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, - msgs ...lnwire.Message) error { + // it continues the process as expected. We'll save the current + // implementation of sendMessage to restore the original behavior later + // on. + workingSendMessage := bob.sendMessage + bob.sendMessage = func(msg lnwire.Message) error { return fmt.Errorf("intentional error in SendToPeer") } - alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, con chan<- struct{}) { + alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + con chan<- lnpeer.Peer) { // Intentionally empty. } @@ -1010,8 +1073,14 @@ func TestFundingManagerRestartBehavior(t *testing.T) { // While Bob successfully sent fundingLocked. assertDatabaseState(t, bob, fundingOutPoint, fundingLockedSent) - // We now recreate Alice's fundingManager, and expect it to retry - // sending the fundingLocked message. + // We now recreate Alice's fundingManager with the correct sendMessage + // implementation, and expect it to retry sending the fundingLocked + // message. We'll explicitly shut down Alice's funding manager to + // prevent a race when overriding the sendMessage implementation. + if err := alice.fundingMgr.Stop(); err != nil { + t.Fatalf("unable to stop alice's funding manager: %v", err) + } + bob.sendMessage = workingSendMessage recreateAliceFundingManager(t, alice) // Intentionally make the channel announcements fail @@ -1036,8 +1105,8 @@ func TestFundingManagerRestartBehavior(t *testing.T) { } // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1090,13 +1159,17 @@ func TestFundingManagerOfflinePeer(t *testing.T) { // fundingLocked message to the other peer. If the funding node fails // to send the fundingLocked message to the peer, it should wait for // the server to notify it that the peer is back online, and try again. - alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, - msgs ...lnwire.Message) error { + // We'll save the current implementation of sendMessage to restore the + // original behavior later on. + workingSendMessage := bob.sendMessage + bob.sendMessage = func(msg lnwire.Message) error { return fmt.Errorf("intentional error in SendToPeer") } peerChan := make(chan *btcec.PublicKey, 1) - conChan := make(chan chan<- struct{}, 1) - alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, connected chan<- struct{}) { + conChan := make(chan chan<- lnpeer.Peer, 1) + alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + connected chan<- lnpeer.Peer) { + peerChan <- peer conChan <- connected } @@ -1132,9 +1205,10 @@ func TestFundingManagerOfflinePeer(t *testing.T) { // While Bob successfully sent fundingLocked. assertDatabaseState(t, bob, fundingOutPoint, fundingLockedSent) - // Alice should be waiting for the server to notify when Bob comes back online. + // Alice should be waiting for the server to notify when Bob comes back + // online. var peer *btcec.PublicKey - var con chan<- struct{} + var con chan<- lnpeer.Peer select { case peer = <-peerChan: // Expected @@ -1154,17 +1228,10 @@ func TestFundingManagerOfflinePeer(t *testing.T) { bobPubKey, peer) } - // Fix Alice's SendToPeer, and notify that Bob is back online. - alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, - msgs ...lnwire.Message) error { - select { - case alice.msgChan <- msgs[0]: - case <-alice.shutdownChannel: - return fmt.Errorf("shutting down") - } - return nil - } - close(con) + // Restore the correct sendMessage implementation, and notify that Bob + // is back online. + bob.sendMessage = workingSendMessage + con <- bob // This should make Alice send the fundingLocked. fundingLockedAlice := assertFundingMsgSent( @@ -1186,8 +1253,8 @@ func TestFundingManagerOfflinePeer(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1228,7 +1295,7 @@ func TestFundingManagerPeerTimeoutAfterInitFunding(t *testing.T) { err: errChan, } - alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + alice.fundingMgr.initFundingWorkflow(bob, initReq) // Alice should have sent the OpenChannel message to Bob. var aliceMsg lnwire.Message @@ -1288,7 +1355,7 @@ func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) { err: errChan, } - alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + alice.fundingMgr.initFundingWorkflow(bob, initReq) // Alice should have sent the OpenChannel message to Bob. var aliceMsg lnwire.Message @@ -1316,7 +1383,7 @@ func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) { assertNumPendingReservations(t, alice, bobPubKey, 1) // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + bob.fundingMgr.processFundingOpen(openChannelReq, alice) // Bob should answer with an AcceptChannel. assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") @@ -1357,7 +1424,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) { err: errChan, } - alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + alice.fundingMgr.initFundingWorkflow(bob, initReq) // Alice should have sent the OpenChannel message to Bob. var aliceMsg lnwire.Message @@ -1385,7 +1452,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) { assertNumPendingReservations(t, alice, bobPubKey, 1) // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + bob.fundingMgr.processFundingOpen(openChannelReq, alice) // Bob should answer with an AcceptChannel. acceptChannelResponse := assertFundingMsgSent( @@ -1396,7 +1463,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) { assertNumPendingReservations(t, bob, alicePubKey, 1) // Forward the response to Alice. - alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) + alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) // Alice responds with a FundingCreated messages. assertFundingMsgSent(t, alice.msgChan, "FundingCreated") @@ -1571,9 +1638,9 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { waitForOpenUpdate(t, updateChan) // Send the fundingLocked message twice to Alice, and once to Bob. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1582,7 +1649,7 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // Alice should not send the channel state the second time, as the // second funding locked should just be ignored. select { - case <-alice.peer.newChannels: + case <-alice.newChannels: t.Fatalf("alice sent new channel to peer a second time") case <-time.After(time.Millisecond * 300): // Expected @@ -1590,9 +1657,9 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // Another fundingLocked should also be ignored, since Alice should // have updated her database at this point. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) select { - case <-alice.peer.newChannels: + case <-alice.newChannels: t.Fatalf("alice sent new channel to peer a second time") case <-time.After(time.Millisecond * 300): // Expected @@ -1665,8 +1732,8 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) { recreateAliceFundingManager(t, alice) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1723,10 +1790,10 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { assertFundingLockedSent(t, alice, bob, fundingOutPoint) // Let Alice immediately get the fundingLocked message. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) // Also let Bob get the fundingLocked message. - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1801,8 +1868,8 @@ func TestFundingManagerPrivateChannel(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1886,8 +1953,8 @@ func TestFundingManagerPrivateRestart(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1954,7 +2021,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { err: errChan, } - alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + alice.fundingMgr.initFundingWorkflow(bob, initReq) // Alice should have sent the OpenChannel message to Bob. var aliceMsg lnwire.Message @@ -1993,7 +2060,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { chanID := openChannelReq.PendingChannelID // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + bob.fundingMgr.processFundingOpen(openChannelReq, alice) // Bob should answer with an AcceptChannel message. acceptChannelResponse := assertFundingMsgSent( @@ -2013,7 +2080,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { } // Forward the response to Alice. - alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) + alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) // Alice responds with a FundingCreated message. fundingCreated := assertFundingMsgSent( @@ -2021,7 +2088,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { ).(*lnwire.FundingCreated) // Give the message to Bob. - bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr) + bob.fundingMgr.processFundingCreated(fundingCreated, alice) // Finally, Bob should send the FundingSigned message. fundingSigned := assertFundingMsgSent( @@ -2029,7 +2096,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { ).(*lnwire.FundingSigned) // Forward the signature to Alice. - alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr) + alice.fundingMgr.processFundingSigned(fundingSigned, bob) // After Alice processes the singleFundingSignComplete message, she will // broadcast the funding transaction to the network. We expect to get a