diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 764617fb..a7b70ba5 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -107,7 +107,9 @@ type Config struct { // NotifyWhenOnline is a function that allows the gossiper to be // notified when a certain peer comes online, allowing it to // retry sending a peer message. - NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{}) + // + // NOTE: The peerChan channel must be buffered. + NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) // ProofMatureDelta the number of confirmations which is needed before // exchange the channel announcement proofs. @@ -2399,13 +2401,13 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably( "to peer(%x): %v. Will retry when online.", remotePeer.SerializeCompressed(), err) - connected := make(chan struct{}) - d.cfg.NotifyWhenOnline(remotePeer, connected) + peerChan := make(chan lnpeer.Peer, 1) + d.cfg.NotifyWhenOnline(remotePeer, peerChan) select { - case <-connected: + case <-peerChan: // Retry sending. - log.Infof("peer %x reconnected. Retry sending"+ + log.Infof("Peer %x reconnected. Retry sending"+ " AnnounceSignatures.", remotePeer.SerializeCompressed()) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index cd10ca90..9ecf69f5 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -23,6 +23,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" "github.com/btcsuite/btcd/btcec" @@ -1186,9 +1187,9 @@ func TestSignatureAnnouncementRetry(t *testing.T) { // We expect the gossiper to register for a notification when the peer // comes back online, so keep track of the channel it wants to get // notified on. - notifyPeers := make(chan chan<- struct{}, 1) + notifyPeers := make(chan chan<- lnpeer.Peer, 1) ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { + connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan } @@ -1207,7 +1208,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) { // Since sending this local announcement proof to the remote will fail, // the gossiper should register for a notification when the remote is // online again. - var conChan chan<- struct{} + var conChan chan<- lnpeer.Peer select { case conChan = <-notifyPeers: case <-time.After(2 * time.Second): @@ -1371,9 +1372,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { msg ...lnwire.Message) error { return fmt.Errorf("intentional error in SendToPeer") } - notifyPeers := make(chan chan<- struct{}, 1) + notifyPeers := make(chan chan<- lnpeer.Peer, 1) ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { + connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan } @@ -1391,7 +1392,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // Since sending to the remote peer will fail, the gossiper should // register for a notification when it comes back online. - var conChan chan<- struct{} + var conChan chan<- lnpeer.Peer select { case conChan = <-notifyPeers: case <-time.After(2 * time.Second): @@ -1430,7 +1431,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { return fmt.Errorf("intentional error in SendToPeer") }, NotifyWhenOnline: func(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { + connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan }, Router: ctx.gossiper.cfg.Router, @@ -1606,9 +1607,9 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { return nil } - notifyPeers := make(chan chan<- struct{}, 1) + notifyPeers := make(chan chan<- lnpeer.Peer, 1) ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { + connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan } @@ -2145,6 +2146,8 @@ type mockPeer struct { quit chan struct{} } +var _ lnpeer.Peer = (*mockPeer)(nil) + func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { if p.sentMsgs == nil && p.quit == nil { return nil @@ -2159,6 +2162,9 @@ func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { return nil } +func (p *mockPeer) AddNewChannel(_ *lnwallet.LightningChannel, _ <-chan struct{}) error { + return nil +} func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk } func (p *mockPeer) PubKey() [33]byte { @@ -2166,3 +2172,4 @@ func (p *mockPeer) PubKey() [33]byte { copy(pubkey[:], p.pk.SerializeCompressed()) return pubkey } +func (p *mockPeer) Address() net.Addr { return nil } diff --git a/fundingmanager.go b/fundingmanager.go index 636ace73..7753204d 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -12,6 +12,10 @@ import ( "golang.org/x/crypto/salsa20" + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" @@ -19,14 +23,11 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" - "github.com/btcsuite/btcd/btcec" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" ) const ( @@ -85,6 +86,12 @@ var ( // // TODO(roasbeef): add command line param to modify maxFundingAmount = maxBtcFundingAmount + + // ErrFundingManagerShuttingDown is an error returned when attempting to + // process a funding request/message but the funding manager has already + // been signaled to shut down. + ErrFundingManagerShuttingDown = errors.New("funding manager shutting " + + "down") ) // reservationWithCtx encapsulates a pending channel reservation. This wrapper @@ -98,7 +105,7 @@ var ( // * deadlines, etc. type reservationWithCtx struct { reservation *lnwallet.ChannelReservation - peerAddress *lnwire.NetAddress + peer lnpeer.Peer chanAmt btcutil.Amount @@ -145,7 +152,7 @@ func (r *reservationWithCtx) updateTimestamp() { // embedded within this message giving the funding manager full context w.r.t // the workflow. type initFundingMsg struct { - peerAddress *lnwire.NetAddress + peer lnpeer.Peer *openChanReq } @@ -153,47 +160,47 @@ type initFundingMsg struct { // the message. This allows the funding manager to queue a response directly to // the peer, progressing the funding workflow. type fundingOpenMsg struct { - msg *lnwire.OpenChannel - peerAddress *lnwire.NetAddress + msg *lnwire.OpenChannel + peer lnpeer.Peer } // fundingAcceptMsg couples an lnwire.AcceptChannel message with the peer who // sent the message. This allows the funding manager to queue a response // directly to the peer, progressing the funding workflow. type fundingAcceptMsg struct { - msg *lnwire.AcceptChannel - peerAddress *lnwire.NetAddress + msg *lnwire.AcceptChannel + peer lnpeer.Peer } // fundingCreatedMsg couples an lnwire.FundingCreated message with the peer who // sent the message. This allows the funding manager to queue a response // directly to the peer, progressing the funding workflow. type fundingCreatedMsg struct { - msg *lnwire.FundingCreated - peerAddress *lnwire.NetAddress + msg *lnwire.FundingCreated + peer lnpeer.Peer } // fundingSignedMsg couples an lnwire.FundingSigned message with the peer who // sent the message. This allows the funding manager to queue a response // directly to the peer, progressing the funding workflow. type fundingSignedMsg struct { - msg *lnwire.FundingSigned - peerAddress *lnwire.NetAddress + msg *lnwire.FundingSigned + peer lnpeer.Peer } // fundingLockedMsg couples an lnwire.FundingLocked message with the peer who // sent the message. This allows the funding manager to finalize the funding // process and announce the existence of the new channel. type fundingLockedMsg struct { - msg *lnwire.FundingLocked - peerAddress *lnwire.NetAddress + msg *lnwire.FundingLocked + peer lnpeer.Peer } // fundingErrorMsg couples an lnwire.Error message with the peer who sent the // message. This allows the funding manager to properly process the error. type fundingErrorMsg struct { - err *lnwire.Error - peerAddress *lnwire.NetAddress + err *lnwire.Error + peerKey *btcec.PublicKey } // pendingChannels is a map instantiated per-peer which tracks all active @@ -257,21 +264,13 @@ type fundingConfig struct { // to the greater network. SendAnnouncement func(msg lnwire.Message) error - // SendToPeer allows the FundingManager to send messages to the peer - // node during the multiple steps involved in the creation of the - // channel's funding transaction and initial commitment transaction. - SendToPeer func(target *btcec.PublicKey, msgs ...lnwire.Message) error - // NotifyWhenOnline allows the FundingManager to register with a - // subsystem that will notify it when the peer comes online. - // This is used when sending the fundingLocked message, since it MUST be + // subsystem that will notify it when the peer comes online. This is + // used when sending the fundingLocked message, since it MUST be // delivered after the funding transaction is confirmed. - NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{}) - - // FindPeer searches the list of peers connected to the node so that - // the FundingManager can notify other daemon subsystems as necessary - // during the funding process. - FindPeer func(peerKey *btcec.PublicKey) (*peer, error) + // + // NOTE: The peerChan channel must be buffered. + NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) // FindChannel queries the database for the channel with the given // channel ID. @@ -318,9 +317,9 @@ type fundingConfig struct { // WatchNewChannel is to be called once a new channel enters the final // funding stage: waiting for on-chain confirmation. This method sends // the channel to the ChainArbitrator so it can watch for any on-chain - // events related to the channel. We also provide the address of the + // events related to the channel. We also provide the public key of the // node we're establishing a channel with for reconnection purposes. - WatchNewChannel func(*channeldb.OpenChannel, *lnwire.NetAddress) error + WatchNewChannel func(*channeldb.OpenChannel, *btcec.PublicKey) error // ReportShortChanID allows the funding manager to report the newly // discovered short channel ID of a formerly pending channel to outside @@ -547,16 +546,29 @@ func (f *fundingManager) Start() error { // resume wait on startup. case shortChanID, ok := <-confChan: if !ok { - fndgLog.Errorf("waiting for funding" + + fndgLog.Errorf("Waiting for funding" + "confirmation failed") return } - // Success, funding transaction was confirmed. - err := f.handleFundingConfirmation(ch, shortChanID) + // The funding transaction has confirmed, so + // we'll attempt to retrieve the remote peer + // to complete the rest of the funding flow. + peerChan := make(chan lnpeer.Peer, 1) + f.cfg.NotifyWhenOnline(ch.IdentityPub, peerChan) + + var peer lnpeer.Peer + select { + case peer = <-peerChan: + case <-f.quit: + return + } + err := f.handleFundingConfirmation( + peer, ch, shortChanID, + ) if err != nil { - fndgLog.Errorf("failed to handle funding"+ - "confirmation: %v", err) + fndgLog.Errorf("Failed to handle "+ + "funding confirmation: %v", err) return } } @@ -614,10 +626,21 @@ func (f *fundingManager) Start() error { go func(dbChan *channeldb.OpenChannel) { defer f.wg.Done() - err := f.handleFundingConfirmation(dbChan, shortChanID) + peerChan := make(chan lnpeer.Peer, 1) + f.cfg.NotifyWhenOnline(dbChan.IdentityPub, peerChan) + + var peer lnpeer.Peer + select { + case peer = <-peerChan: + case <-f.quit: + return + } + err := f.handleFundingConfirmation( + peer, dbChan, shortChanID, + ) if err != nil { - fndgLog.Errorf("failed to handle funding"+ - "confirmation: %v", err) + fndgLog.Errorf("Failed to handle "+ + "funding confirmation: %v", err) return } }(channel) @@ -740,7 +763,7 @@ func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) { select { case f.queries <- req: case <-f.quit: - return nil, fmt.Errorf("fundingmanager shutting down") + return nil, ErrFundingManagerShuttingDown } select { @@ -749,7 +772,7 @@ func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) { case err := <-errChan: return nil, err case <-f.quit: - return nil, fmt.Errorf("fundingmanager shutting down") + return nil, ErrFundingManagerShuttingDown } } @@ -796,13 +819,13 @@ func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) { // // TODO(roasbeef): if peer disconnects, and haven't yet broadcast funding // transaction, then all reservations should be cleared. -func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, - tempChanID [32]byte, fundingErr error) { +func (f *fundingManager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte, + fundingErr error) { fndgLog.Debugf("Failing funding flow for pendingID=%x: %v", tempChanID, fundingErr) - ctx, err := f.cancelReservationCtx(peer, tempChanID) + ctx, err := f.cancelReservationCtx(peer.IdentityKey(), tempChanID) if err != nil { fndgLog.Errorf("unable to cancel reservation: %v", err) } @@ -837,8 +860,8 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, } fndgLog.Debugf("Sending funding error to peer (%x): %v", - peer.SerializeCompressed(), spew.Sdump(errMsg)) - if err := f.cfg.SendToPeer(peer, errMsg); err != nil { + peer.IdentityKey().SerializeCompressed(), spew.Sdump(errMsg)) + if err := peer.SendMessage(false, errMsg); err != nil { fndgLog.Errorf("unable to send error message to peer %v", err) } } @@ -919,10 +942,10 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { // processFundingOpen sends a message to the fundingManager allowing it to // initiate the new funding workflow with the source peer. func (f *fundingManager) processFundingOpen(msg *lnwire.OpenChannel, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingOpenMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingOpenMsg{msg, peer}: case <-f.quit: return } @@ -938,7 +961,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // Check number of pending channels to be smaller than maximum allowed // number and send ErrorGeneric to remote peer if condition is // violated. - peerIDKey := newSerializedKey(fmsg.peerAddress.IdentityKey) + peerIDKey := newSerializedKey(fmsg.peer.IdentityKey()) msg := fmsg.msg amt := msg.FundingAmount @@ -949,8 +972,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { if len(f.activeReservations[peerIDKey]) >= cfg.MaxPendingChannels { f.resMtx.RUnlock() f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, - lnwire.ErrMaxPendingChannels) + fmsg.peer, fmsg.msg.PendingChannelID, + lnwire.ErrMaxPendingChannels, + ) return } f.resMtx.RUnlock() @@ -964,8 +988,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { fndgLog.Errorf("unable to query wallet: %v", err) } f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, - lnwire.ErrSynchronizingChain) + fmsg.peer, fmsg.msg.PendingChannelID, + lnwire.ErrSynchronizingChain, + ) return } @@ -973,7 +998,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // current soft-limit for channel size. if msg.FundingAmount > maxFundingAmount { f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, + fmsg.peer, fmsg.msg.PendingChannelID, lnwire.ErrChanTooLarge, ) return @@ -983,7 +1008,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // a channel that's below our current min channel size. if amt < f.cfg.MinChanSize { f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, + fmsg.peer, fmsg.msg.PendingChannelID, lnwallet.ErrChanTooSmall(amt, btcutil.Amount(f.cfg.MinChanSize)), ) return @@ -992,7 +1017,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+ "pendingId=%x) from peer(%x)", amt, msg.PushAmount, msg.CsvDelay, msg.PendingChannelID, - fmsg.peerAddress.IdentityKey.SerializeCompressed()) + fmsg.peer.IdentityKey().SerializeCompressed()) // Attempt to initialize a reservation within the wallet. If the wallet // has insufficient resources to create the channel, then the @@ -1003,13 +1028,12 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { reservation, err := f.cfg.Wallet.InitChannelReservation( amt, 0, msg.PushAmount, lnwallet.SatPerKWeight(msg.FeePerKiloWeight), 0, - fmsg.peerAddress.IdentityKey, fmsg.peerAddress.Address, + fmsg.peer.IdentityKey(), fmsg.peer.Address(), &chainHash, msg.ChannelFlags, ) if err != nil { fndgLog.Errorf("Unable to initialize reservation: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } @@ -1029,9 +1053,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { ) if err != nil { fndgLog.Errorf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - fmsg.msg.PendingChannelID, err, - ) + f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) return } @@ -1059,7 +1081,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { remoteCsvDelay: remoteCsvDelay, remoteMinHtlc: minHtlc, err: make(chan error, 1), - peerAddress: fmsg.peerAddress, + peer: fmsg.peer, } f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx f.resMtx.Unlock() @@ -1101,8 +1123,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { err = reservation.ProcessSingleContribution(remoteContribution) if err != nil { fndgLog.Errorf("unable to add contribution reservation: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } @@ -1130,11 +1151,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { HtlcPoint: ourContribution.HtlcBasePoint.PubKey, FirstCommitmentPoint: ourContribution.FirstCommitmentPoint, } - err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, &fundingAccept) - if err != nil { + if err := fmsg.peer.SendMessage(false, &fundingAccept); err != nil { fndgLog.Errorf("unable to send funding response to peer: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } } @@ -1142,10 +1161,10 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // processFundingAccept sends a message to the fundingManager allowing it to // continue the second phase of a funding workflow with the target peer. func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingAcceptMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingAcceptMsg{msg, peer}: case <-f.quit: return } @@ -1157,7 +1176,7 @@ func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel, func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { msg := fmsg.msg pendingChanID := fmsg.msg.PendingChannelID - peerKey := fmsg.peerAddress.IdentityKey + peerKey := fmsg.peer.IdentityKey() resCtx, err := f.getReservationCtx(peerKey, pendingChanID) if err != nil { @@ -1181,8 +1200,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { ) if err != nil { fndgLog.Warnf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - fmsg.msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) return } @@ -1228,9 +1246,8 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { err = resCtx.reservation.ProcessContribution(remoteContribution) if err != nil { fndgLog.Errorf("Unable to process contribution from %v: %v", - fmsg.peerAddress.IdentityKey, err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + peerKey, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } @@ -1273,15 +1290,12 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { fundingCreated.CommitSig, err = lnwire.NewSigFromRawSignature(sig) if err != nil { fndgLog.Errorf("Unable to parse signature: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } - err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingCreated) - if err != nil { + if err := fmsg.peer.SendMessage(false, fundingCreated); err != nil { fndgLog.Errorf("Unable to send funding complete message: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } } @@ -1289,10 +1303,10 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { // processFundingCreated queues a funding complete message coupled with the // source peer to the fundingManager. func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingCreatedMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingCreatedMsg{msg, peer}: case <-f.quit: return } @@ -1303,7 +1317,7 @@ func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated, // processed, a signature is sent to the remote peer allowing it to broadcast // the funding transaction, progressing the workflow into the final stage. func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { - peerKey := fmsg.peerAddress.IdentityKey + peerKey := fmsg.peer.IdentityKey() pendingChanID := fmsg.msg.PendingChannelID resCtx, err := f.getReservationCtx(peerKey, pendingChanID) @@ -1333,8 +1347,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { if err != nil { // TODO(roasbeef): better error logging: peerID, channelID, etc. fndgLog.Errorf("unable to complete single reservation: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) return } @@ -1374,8 +1387,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { ourCommitSig, err := lnwire.NewSigFromRawSignature(sig) if err != nil { fndgLog.Errorf("unable to parse signature: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) deleteFromDatabase() return } @@ -1384,10 +1396,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { ChanID: channelID, CommitSig: ourCommitSig, } - if err := f.cfg.SendToPeer(peerKey, fundingSigned); err != nil { + if err := fmsg.peer.SendMessage(false, fundingSigned); err != nil { fndgLog.Errorf("unable to send FundingSigned message: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) deleteFromDatabase() return } @@ -1395,8 +1406,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // Now that we've sent over our final signature for this channel, we'll // send it to the ChainArbitrator so it can watch for any on-chain // actions during this final confirmation stage. - peerAddr := resCtx.peerAddress - if err := f.cfg.WatchNewChannel(completeChan, peerAddr); err != nil { + if err := f.cfg.WatchNewChannel(completeChan, peerKey); err != nil { fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+ "arbitration: %v", fundingOut, err) } @@ -1446,8 +1456,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { err := fmt.Errorf("timeout waiting for funding tx "+ "(%v) to confirm", completeChan.FundingOutpoint) fndgLog.Warnf(err.Error()) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) deleteFromDatabase() return case <-f.quit: @@ -1466,8 +1475,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // Success, funding transaction was confirmed. f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID) - err := f.handleFundingConfirmation(completeChan, - shortChanID) + err := f.handleFundingConfirmation( + fmsg.peer, completeChan, shortChanID, + ) if err != nil { fndgLog.Errorf("failed to handle funding"+ "confirmation: %v", err) @@ -1479,10 +1489,10 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // processFundingSigned sends a single funding sign complete message along with // the source peer to the funding manager. func (f *fundingManager) processFundingSigned(msg *lnwire.FundingSigned, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingSignedMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingSignedMsg{msg, peer}: case <-f.quit: return } @@ -1505,20 +1515,17 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { err := fmt.Errorf("Unable to find signed reservation for "+ "chan_id=%x", fmsg.msg.ChanID) fndgLog.Warnf(err.Error()) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - fmsg.msg.ChanID, err) + f.failFundingFlow(fmsg.peer, fmsg.msg.ChanID, err) return } - peerKey := fmsg.peerAddress.IdentityKey - resCtx, err := f.getReservationCtx(fmsg.peerAddress.IdentityKey, - pendingChanID) + peerKey := fmsg.peer.IdentityKey() + resCtx, err := f.getReservationCtx(peerKey, pendingChanID) if err != nil { fndgLog.Warnf("Unable to find reservation (peerID:%v, chanID:%x)", peerKey, pendingChanID[:]) // TODO: add ErrChanNotFound? - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) return } @@ -1538,8 +1545,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { completeChan, err := resCtx.reservation.CompleteReservation(nil, commitSig) if err != nil { fndgLog.Errorf("Unable to complete reservation sign complete: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) return } @@ -1547,14 +1553,14 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { // we'll send the to be active channel to the ChainArbitrator so it can // watch for any on-chin actions before the channel has fully // confirmed. - peerAddr := resCtx.peerAddress - if err := f.cfg.WatchNewChannel(completeChan, peerAddr); err != nil { + if err := f.cfg.WatchNewChannel(completeChan, peerKey); err != nil { fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+ "arbitration: %v", fundingPoint, err) } fndgLog.Infof("Finalizing pendingID(%x) over ChannelPoint(%v), "+ - "waiting for channel open on-chain", pendingChanID[:], fundingPoint) + "waiting for channel open on-chain", pendingChanID[:], + fundingPoint) // Send an update to the upstream client that the negotiation process // is over. @@ -1619,7 +1625,9 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { } defer lnChannel.Stop() - err = f.sendFundingLocked(completeChan, lnChannel, shortChanID) + err = f.sendFundingLocked( + fmsg.peer, completeChan, lnChannel, shortChanID, + ) if err != nil { fndgLog.Errorf("failed sending fundingLocked: %v", err) return @@ -1865,10 +1873,11 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open } // handleFundingConfirmation is a wrapper method for creating a new -// lnwallet.LightningChannel object, calling sendFundingLocked, addToRouterGraph, -// and annAfterSixConfs. This is called after the funding transaction is -// confirmed. -func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenChannel, +// lnwallet.LightningChannel object, calling sendFundingLocked, +// addToRouterGraph, and annAfterSixConfs. This is called after the funding +// transaction is confirmed. +func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer, + completeChan *channeldb.OpenChannel, shortChanID *lnwire.ShortChannelID) error { // We create the state-machine object which wraps the database state. @@ -1884,7 +1893,7 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC fndgLog.Debugf("ChannelID(%v) is now fully confirmed!", chanID) - err = f.sendFundingLocked(completeChan, lnChannel, shortChanID) + err = f.sendFundingLocked(peer, completeChan, lnChannel, shortChanID) if err != nil { return fmt.Errorf("failed sending fundingLocked: %v", err) } @@ -1904,11 +1913,12 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC // sendFundingLocked creates and sends the fundingLocked message. // This should be called after the funding transaction has been confirmed, // and the channelState is 'markedOpen'. -func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel, - channel *lnwallet.LightningChannel, +func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, + completeChan *channeldb.OpenChannel, channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) error { chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) + peerKey := completeChan.IdentityPub // Next, we'll send over the funding locked message which marks that we // consider the channel open by presenting the remote party with our @@ -1933,33 +1943,30 @@ func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel, // down. for { fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+ - "peer %x", chanID, - completeChan.IdentityPub.SerializeCompressed()) + "peer %x", chanID, peerKey.SerializeCompressed()) - err = f.cfg.SendToPeer(completeChan.IdentityPub, - fundingLockedMsg) - if err == nil { - // Sending succeeded, we can break out and continue - // the funding flow. + if err := peer.SendMessage(false, fundingLockedMsg); err == nil { + // Sending succeeded, we can break out and continue the + // funding flow. break } - fndgLog.Warnf("unable to send fundingLocked to peer %x: "+ - "%v. Will retry when online", - completeChan.IdentityPub.SerializeCompressed(), err) + fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+ + "Will retry when online", peerKey.SerializeCompressed(), + err) - connected := make(chan struct{}) + connected := make(chan lnpeer.Peer, 1) f.cfg.NotifyWhenOnline(completeChan.IdentityPub, connected) + select { case <-connected: fndgLog.Infof("Peer(%x) came back online, will retry "+ "sending FundingLocked for ChannelID(%v)", - completeChan.IdentityPub.SerializeCompressed(), - chanID) + peerKey.SerializeCompressed(), chanID) // Retry sending. case <-f.quit: - return fmt.Errorf("shutting down unable to send") + return ErrFundingManagerShuttingDown } } @@ -2069,24 +2076,28 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, confNtfn, err := f.cfg.Notifier.RegisterConfirmationsNtfn(&txid, numConfs, completeChan.FundingBroadcastHeight) if err != nil { - return fmt.Errorf("Unable to register for confirmation of "+ - "ChannelPoint(%v): %v", completeChan.FundingOutpoint, err) + return fmt.Errorf("Unable to register for "+ + "confirmation of ChannelPoint(%v): %v", + completeChan.FundingOutpoint, err) } - // Wait until 6 confirmations has been reached or the wallet signals - // a shutdown. + // Wait until 6 confirmations has been reached or the wallet + // signals a shutdown. select { case _, ok := <-confNtfn.Confirmed: if !ok { - return fmt.Errorf("ChainNotifier shutting down, cannot "+ - "complete funding flow for ChannelPoint(%v)", + return fmt.Errorf("ChainNotifier shutting "+ + "down, cannot complete funding flow "+ + "for ChannelPoint(%v)", completeChan.FundingOutpoint) } // Fallthrough. case <-f.quit: - return fmt.Errorf("fundingManager shutting down, stopping funding "+ - "flow for ChannelPoint(%v)", completeChan.FundingOutpoint) + return fmt.Errorf("%v, stopping funding flow for "+ + "ChannelPoint(%v)", + ErrFundingManagerShuttingDown, + completeChan.FundingOutpoint) } fundingPoint := completeChan.FundingOutpoint @@ -2095,9 +2106,10 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v", &fundingPoint, spew.Sdump(shortChanID)) - // We'll obtain their min HTLC as we'll use this value within our - // ChannelUpdate. We use this value isn't of ours, as the remote party - // will be the one that's carrying the HTLC towards us. + // We'll obtain their min HTLC as we'll use this value within + // our ChannelUpdate. We use this value isn't of ours, as the + // remote party will be the one that's carrying the HTLC towards + // us. remoteMinHTLC := completeChan.RemoteChanCfg.MinHTLC // Create and broadcast the proofs required to make this channel @@ -2131,10 +2143,10 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, // processFundingLocked sends a message to the fundingManager allowing it to // finish the funding workflow. func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingLockedMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingLockedMsg{msg, peer}: case <-f.quit: return } @@ -2146,7 +2158,7 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { defer f.wg.Done() fndgLog.Debugf("Received FundingLocked for ChannelID(%v) from "+ "peer %x", fmsg.msg.ChanID, - fmsg.peerAddress.IdentityKey.SerializeCompressed()) + fmsg.peer.IdentityKey().SerializeCompressed()) // If we are currently in the process of handling a funding locked // message for this channel, ignore. @@ -2242,32 +2254,11 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { f.barrierMtx.Unlock() }() - // Finally, we'll find the peer that sent us this message so we can - // provide it with the fully initialized channel state. - peer, err := f.cfg.FindPeer(fmsg.peerAddress.IdentityKey) - if err != nil { - fndgLog.Errorf("Unable to find peer: %v", err) + if err := fmsg.peer.AddNewChannel(channel, f.quit); err != nil { + fndgLog.Errorf("Unable to add new channel %v with peer %x: %v", + fmsg.peer.IdentityKey().SerializeCompressed(), + *channel.ChanPoint, err) channel.Stop() - return - } - newChanDone := make(chan struct{}) - newChanMsg := &newChannelMsg{ - channel: channel, - done: newChanDone, - } - - select { - case peer.newChannels <- newChanMsg: - case <-f.quit: - return - } - - // We pause here to wait for the peer to recognize the new channel - // before we close the channel barrier corresponding to the channel. - select { - case <-f.quit: - return - case <-newChanDone: // Fallthrough if we're not quitting. } } @@ -2486,11 +2477,9 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe // initFundingWorkflow sends a message to the funding manager instructing it // to initiate a single funder workflow with the source peer. // TODO(roasbeef): re-visit blocking nature.. -func (f *fundingManager) initFundingWorkflow(peerAddress *lnwire.NetAddress, - req *openChanReq) { - +func (f *fundingManager) initFundingWorkflow(peer lnpeer.Peer, req *openChanReq) { f.fundingRequests <- &initFundingMsg{ - peerAddress: peerAddress, + peer: peer, openChanReq: req, } } @@ -2500,7 +2489,7 @@ func (f *fundingManager) initFundingWorkflow(peerAddress *lnwire.NetAddress, // funding workflow. func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { var ( - peerKey = msg.peerAddress.IdentityKey + peerKey = msg.peer.IdentityKey() localAmt = msg.localFundingAmt remoteAmt = msg.remoteFundingAmt capacity = localAmt + remoteAmt @@ -2519,8 +2508,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { fndgLog.Infof("Initiating fundingRequest(localAmt=%v, remoteAmt=%v, "+ "capacity=%v, chainhash=%v, addr=%v, dustLimit=%v)", localAmt, - msg.pushAmt, capacity, msg.chainHash, msg.peerAddress.Address, - ourDustLimit) + msg.pushAmt, capacity, msg.chainHash, peerKey, ourDustLimit) // First, we'll query the fee estimator for a fee that should get the // commitment transaction confirmed by the next few blocks (conf target @@ -2556,7 +2544,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { // request will fail, and be aborted. reservation, err := f.cfg.Wallet.InitChannelReservation( capacity, localAmt, msg.pushAmt, commitFeePerKw, - msg.fundingFeePerVSize, peerKey, msg.peerAddress.Address, + msg.fundingFeePerVSize, peerKey, msg.peer.Address(), &msg.chainHash, channelFlags, ) if err != nil { @@ -2597,7 +2585,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { remoteCsvDelay: remoteCsvDelay, remoteMinHtlc: minHtlc, reservation: reservation, - peerAddress: msg.peerAddress, + peer: msg.peer, updates: msg.updates, err: msg.err, } @@ -2619,7 +2607,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { maxHtlcs := f.cfg.RequiredRemoteMaxHTLCs(capacity) fndgLog.Infof("Starting funding workflow with %v for pendingID(%x)", - msg.peerAddress.Address, chanID) + msg.peer.Address(), chanID) fundingOpen := lnwire.OpenChannel{ ChainHash: *f.cfg.Wallet.Cfg.NetParams.GenesisHash, @@ -2641,7 +2629,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { FirstCommitmentPoint: ourContribution.FirstCommitmentPoint, ChannelFlags: channelFlags, } - if err := f.cfg.SendToPeer(peerKey, &fundingOpen); err != nil { + if err := msg.peer.SendMessage(false, &fundingOpen); err != nil { e := fmt.Errorf("Unable to send funding request message: %v", err) fndgLog.Errorf(e.Error()) @@ -2681,10 +2669,10 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { // processFundingError sends a message to the fundingManager allowing it to // process the occurred generic error. func (f *fundingManager) processFundingError(err *lnwire.Error, - peerAddress *lnwire.NetAddress) { + peerKey *btcec.PublicKey) { select { - case f.fundingMsgs <- &fundingErrorMsg{err, peerAddress}: + case f.fundingMsgs <- &fundingErrorMsg{err, peerKey}: case <-f.quit: return } @@ -2696,13 +2684,12 @@ func (f *fundingManager) processFundingError(err *lnwire.Error, func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { protocolErr := fmsg.err - peerKey := fmsg.peerAddress.IdentityKey chanID := fmsg.err.ChanID // First, we'll attempt to retrieve and cancel the funding workflow // that this error was tied to. If we're unable to do so, then we'll // exit early as this was an unwarranted error. - resCtx, err := f.cancelReservationCtx(peerKey, chanID) + resCtx, err := f.cancelReservationCtx(fmsg.peerKey, chanID) if err != nil { fndgLog.Warnf("Received error for non-existent funding "+ "flow: %v (%v)", err, spew.Sdump(protocolErr)) @@ -2713,7 +2700,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { // error back to the caller (if any), and cancel the workflow itself. lnErr := lnwire.ErrorCode(protocolErr.Data[0]) fndgLog.Errorf("Received funding error from %x: %v", - peerKey.SerializeCompressed(), string(protocolErr.Data), + fmsg.peerKey.SerializeCompressed(), string(protocolErr.Data), ) // If this isn't a simple error code, then we'll display the entire @@ -2753,10 +2740,11 @@ func (f *fundingManager) pruneZombieReservations() { f.resMtx.RUnlock() for pendingChanID, resCtx := range zombieReservations { - err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+ - "chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:]) + err := fmt.Errorf("reservation timed out waiting for peer "+ + "(peerID:%v, chanID:%x)", resCtx.peer.IdentityKey(), + pendingChanID[:]) fndgLog.Warnf(err.Error()) - f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err) + f.failFundingFlow(resCtx.peer, pendingChanID, err) } } @@ -2849,9 +2837,9 @@ func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey, // channel will receive a new, permanent channel ID, and will no longer be // considered pending. func (f *fundingManager) IsPendingChannel(pendingChanID [32]byte, - peerAddress *lnwire.NetAddress) bool { + peerKey *btcec.PublicKey) bool { - peerIDKey := newSerializedKey(peerAddress.IdentityKey) + peerIDKey := newSerializedKey(peerKey) f.resMtx.RLock() _, ok := f.activeReservations[peerIDKey][pendingChanID] f.resMtx.RUnlock() diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 6baea62c..163b64aa 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -3,6 +3,7 @@ package main import ( + "errors" "fmt" "io/ioutil" "math/big" @@ -12,22 +13,22 @@ import ( "testing" "time" + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog" + "github.com/btcsuite/btcutil" + _ "github.com/btcsuite/btcwallet/walletdb/bdb" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" - "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/chaincfg/chainhash" - _ "github.com/btcsuite/btcwallet/walletdb/bdb" - - "github.com/btcsuite/btcd/btcec" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" ) const ( @@ -134,14 +135,64 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, type testNode struct { privKey *btcec.PrivateKey + addr *lnwire.NetAddress msgChan chan lnwire.Message announceChan chan lnwire.Message publTxChan chan *wire.MsgTx fundingMgr *fundingManager - peer *peer + newChannels chan *newChannelMsg mockNotifier *mockNotifier testDir string shutdownChannel chan struct{} + + remotePeer *testNode + sendMessage func(lnwire.Message) error +} + +var _ lnpeer.Peer = (*testNode)(nil) + +func (n *testNode) IdentityKey() *btcec.PublicKey { + return n.addr.IdentityKey +} + +func (n *testNode) Address() net.Addr { + return n.addr.Address +} + +func (n *testNode) PubKey() [33]byte { + return newSerializedKey(n.addr.IdentityKey) +} + +func (n *testNode) SendMessage(_ bool, msg ...lnwire.Message) error { + return n.sendMessage(msg[0]) +} + +func (n *testNode) WipeChannel(_ *wire.OutPoint) error { + return nil +} + +func (n *testNode) AddNewChannel(channel *lnwallet.LightningChannel, + quit <-chan struct{}) error { + + done := make(chan struct{}) + msg := &newChannelMsg{ + channel: channel, + done: done, + } + + select { + case n.newChannels <- msg: + case <-quit: + return ErrFundingManagerShuttingDown + } + + select { + case <-done: + case <-quit: + return ErrFundingManagerShuttingDown + } + + return nil } func init() { @@ -180,7 +231,7 @@ func createTestWallet(cdb *channeldb.DB, netParams *chaincfg.Params, } func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, - tempTestDir string) (*testNode, error) { + addr *lnwire.NetAddress, tempTestDir string) (*testNode, error) { netParams := activeNetParams.Params estimator := lnwallet.StaticFeeEstimator{FeeRate: 250} @@ -191,11 +242,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, epochChan: make(chan *chainntnfs.BlockEpoch, 1), } - newChannelsChan := make(chan *newChannelMsg) - p := &peer{ - newChannels: newChannelsChan, - } - sentMessages := make(chan lnwire.Message) sentAnnouncements := make(chan lnwire.Message) publTxChan := make(chan *wire.MsgTx, 1) @@ -249,20 +295,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { return lnwire.NodeAnnouncement{}, nil }, - SendToPeer: func(target *btcec.PublicKey, msgs ...lnwire.Message) error { - select { - case sentMessages <- msgs[0]: - case <-shutdownChan: - return fmt.Errorf("shutting down") - } - return nil - }, - NotifyWhenOnline: func(peer *btcec.PublicKey, connectedChan chan<- struct{}) { - t.Fatalf("did not expect fundingManager to call NotifyWhenOnline") - }, - FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) { - return p, nil - }, TempChanIDSeed: chanIDSeed, FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { dbChannels, err := cdb.FetchAllChannels() @@ -311,7 +343,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 { return uint16(lnwallet.MaxHTLCNumber / 2) }, - WatchNewChannel: func(*channeldb.OpenChannel, *lnwire.NetAddress) error { + WatchNewChannel: func(*channeldb.OpenChannel, *btcec.PublicKey) error { return nil }, ReportShortChanID: func(wire.OutPoint) error { @@ -323,22 +355,30 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, if err != nil { t.Fatalf("failed creating fundingManager: %v", err) } - if err = f.Start(); err != nil { t.Fatalf("failed starting fundingManager: %v", err) } - return &testNode{ + testNode := &testNode{ privKey: privKey, msgChan: sentMessages, + newChannels: make(chan *newChannelMsg), announceChan: sentAnnouncements, publTxChan: publTxChan, fundingMgr: f, - peer: p, mockNotifier: chainNotifier, testDir: tempTestDir, shutdownChannel: shutdownChan, - }, nil + addr: addr, + } + + f.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + connectedChan chan<- lnpeer.Peer) { + + connectedChan <- testNode + } + + return testNode, nil } func recreateAliceFundingManager(t *testing.T, alice *testNode) { @@ -375,19 +415,11 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { return lnwire.NodeAnnouncement{}, nil }, - SendToPeer: func(target *btcec.PublicKey, - msgs ...lnwire.Message) error { - select { - case aliceMsgChan <- msgs[0]: - case <-shutdownChan: - return fmt.Errorf("shutting down") - } - return nil + NotifyWhenOnline: func(peer *btcec.PublicKey, + connectedChan chan<- lnpeer.Peer) { + + connectedChan <- alice.remotePeer }, - NotifyWhenOnline: func(peer *btcec.PublicKey, connectedChan chan<- struct{}) { - t.Fatalf("did not expect fundingManager to call NotifyWhenOnline") - }, - FindPeer: oldCfg.FindPeer, TempChanIDSeed: oldCfg.TempChanIDSeed, FindChannel: oldCfg.FindChannel, PublishTransaction: func(txn *wire.MsgTx) error { @@ -424,7 +456,9 @@ func setupFundingManagers(t *testing.T) (*testNode, *testNode) { t.Fatalf("unable to create temp directory: %v", err) } - alice, err := createTestFundingManager(t, alicePrivKey, aliceTestDir) + alice, err := createTestFundingManager( + t, alicePrivKey, aliceAddr, aliceTestDir, + ) if err != nil { t.Fatalf("failed creating fundingManager: %v", err) } @@ -434,11 +468,37 @@ func setupFundingManagers(t *testing.T) (*testNode, *testNode) { t.Fatalf("unable to create temp directory: %v", err) } - bob, err := createTestFundingManager(t, bobPrivKey, bobTestDir) + bob, err := createTestFundingManager(t, bobPrivKey, bobAddr, bobTestDir) if err != nil { t.Fatalf("failed creating fundingManager: %v", err) } + // With the funding manager's created, we'll now attempt to mimic a + // connection pipe between them. In order to intercept the messages + // within it, we'll redirect all messages back to the msgChan of the + // sender. Since the fundingManager now has a reference to peers itself, + // alice.sendMessage will be triggered when Bob's funding manager + // attempts to send a message to Alice and vice versa. + alice.remotePeer = bob + alice.sendMessage = func(msg lnwire.Message) error { + select { + case alice.remotePeer.msgChan <- msg: + case <-alice.shutdownChannel: + return errors.New("shutting down") + } + return nil + } + + bob.remotePeer = alice + bob.sendMessage = func(msg lnwire.Message) error { + select { + case bob.remotePeer.msgChan <- msg: + case <-bob.shutdownChannel: + return errors.New("shutting down") + } + return nil + } + return alice, bob } @@ -473,7 +533,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, err: errChan, } - alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + alice.fundingMgr.initFundingWorkflow(bob, initReq) // Alice should have sent the OpenChannel message to Bob. var aliceMsg lnwire.Message @@ -498,7 +558,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, } // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + bob.fundingMgr.processFundingOpen(openChannelReq, alice) // Bob should answer with an AcceptChannel message. acceptChannelResponse := assertFundingMsgSent( @@ -506,7 +566,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, ).(*lnwire.AcceptChannel) // Forward the response to Alice. - alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) + alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) // Alice responds with a FundingCreated message. fundingCreated := assertFundingMsgSent( @@ -514,7 +574,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, ).(*lnwire.FundingCreated) // Give the message to Bob. - bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr) + bob.fundingMgr.processFundingCreated(fundingCreated, alice) // Finally, Bob should send the FundingSigned message. fundingSigned := assertFundingMsgSent( @@ -522,7 +582,7 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, ).(*lnwire.FundingSigned) // Forward the signature to Alice. - alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr) + alice.fundingMgr.processFundingSigned(fundingSigned, bob) // After Alice processes the singleFundingSignComplete message, she will // broadcast the funding transaction to the network. We expect to get a @@ -861,14 +921,14 @@ func assertErrChannelNotFound(t *testing.T, node *testNode, func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) { // They should both send the new channel state to their peer. select { - case c := <-alice.peer.newChannels: + case c := <-alice.newChannels: close(c.done) case <-time.After(time.Second * 15): t.Fatalf("alice did not send new channel to peer") } select { - case c := <-bob.peer.newChannels: + case c := <-bob.newChannels: close(c.done) case <-time.After(time.Second * 15): t.Fatalf("bob did not send new channel to peer") @@ -935,8 +995,8 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -970,12 +1030,15 @@ func TestFundingManagerRestartBehavior(t *testing.T) { // sending it on restart. We mimic this behavior by letting the // SendToPeer method return an error, as if the message was not // successfully sent. We then recreate the fundingManager and make sure - // it continues the process as expected. - alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, - msgs ...lnwire.Message) error { + // it continues the process as expected. We'll save the current + // implementation of sendMessage to restore the original behavior later + // on. + workingSendMessage := bob.sendMessage + bob.sendMessage = func(msg lnwire.Message) error { return fmt.Errorf("intentional error in SendToPeer") } - alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, con chan<- struct{}) { + alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + con chan<- lnpeer.Peer) { // Intentionally empty. } @@ -1010,8 +1073,14 @@ func TestFundingManagerRestartBehavior(t *testing.T) { // While Bob successfully sent fundingLocked. assertDatabaseState(t, bob, fundingOutPoint, fundingLockedSent) - // We now recreate Alice's fundingManager, and expect it to retry - // sending the fundingLocked message. + // We now recreate Alice's fundingManager with the correct sendMessage + // implementation, and expect it to retry sending the fundingLocked + // message. We'll explicitly shut down Alice's funding manager to + // prevent a race when overriding the sendMessage implementation. + if err := alice.fundingMgr.Stop(); err != nil { + t.Fatalf("unable to stop alice's funding manager: %v", err) + } + bob.sendMessage = workingSendMessage recreateAliceFundingManager(t, alice) // Intentionally make the channel announcements fail @@ -1036,8 +1105,8 @@ func TestFundingManagerRestartBehavior(t *testing.T) { } // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1090,13 +1159,17 @@ func TestFundingManagerOfflinePeer(t *testing.T) { // fundingLocked message to the other peer. If the funding node fails // to send the fundingLocked message to the peer, it should wait for // the server to notify it that the peer is back online, and try again. - alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, - msgs ...lnwire.Message) error { + // We'll save the current implementation of sendMessage to restore the + // original behavior later on. + workingSendMessage := bob.sendMessage + bob.sendMessage = func(msg lnwire.Message) error { return fmt.Errorf("intentional error in SendToPeer") } peerChan := make(chan *btcec.PublicKey, 1) - conChan := make(chan chan<- struct{}, 1) - alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, connected chan<- struct{}) { + conChan := make(chan chan<- lnpeer.Peer, 1) + alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + connected chan<- lnpeer.Peer) { + peerChan <- peer conChan <- connected } @@ -1132,9 +1205,10 @@ func TestFundingManagerOfflinePeer(t *testing.T) { // While Bob successfully sent fundingLocked. assertDatabaseState(t, bob, fundingOutPoint, fundingLockedSent) - // Alice should be waiting for the server to notify when Bob comes back online. + // Alice should be waiting for the server to notify when Bob comes back + // online. var peer *btcec.PublicKey - var con chan<- struct{} + var con chan<- lnpeer.Peer select { case peer = <-peerChan: // Expected @@ -1154,17 +1228,10 @@ func TestFundingManagerOfflinePeer(t *testing.T) { bobPubKey, peer) } - // Fix Alice's SendToPeer, and notify that Bob is back online. - alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, - msgs ...lnwire.Message) error { - select { - case alice.msgChan <- msgs[0]: - case <-alice.shutdownChannel: - return fmt.Errorf("shutting down") - } - return nil - } - close(con) + // Restore the correct sendMessage implementation, and notify that Bob + // is back online. + bob.sendMessage = workingSendMessage + con <- bob // This should make Alice send the fundingLocked. fundingLockedAlice := assertFundingMsgSent( @@ -1186,8 +1253,8 @@ func TestFundingManagerOfflinePeer(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1228,7 +1295,7 @@ func TestFundingManagerPeerTimeoutAfterInitFunding(t *testing.T) { err: errChan, } - alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + alice.fundingMgr.initFundingWorkflow(bob, initReq) // Alice should have sent the OpenChannel message to Bob. var aliceMsg lnwire.Message @@ -1288,7 +1355,7 @@ func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) { err: errChan, } - alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + alice.fundingMgr.initFundingWorkflow(bob, initReq) // Alice should have sent the OpenChannel message to Bob. var aliceMsg lnwire.Message @@ -1316,7 +1383,7 @@ func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) { assertNumPendingReservations(t, alice, bobPubKey, 1) // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + bob.fundingMgr.processFundingOpen(openChannelReq, alice) // Bob should answer with an AcceptChannel. assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") @@ -1357,7 +1424,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) { err: errChan, } - alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + alice.fundingMgr.initFundingWorkflow(bob, initReq) // Alice should have sent the OpenChannel message to Bob. var aliceMsg lnwire.Message @@ -1385,7 +1452,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) { assertNumPendingReservations(t, alice, bobPubKey, 1) // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + bob.fundingMgr.processFundingOpen(openChannelReq, alice) // Bob should answer with an AcceptChannel. acceptChannelResponse := assertFundingMsgSent( @@ -1396,7 +1463,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) { assertNumPendingReservations(t, bob, alicePubKey, 1) // Forward the response to Alice. - alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) + alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) // Alice responds with a FundingCreated messages. assertFundingMsgSent(t, alice.msgChan, "FundingCreated") @@ -1571,9 +1638,9 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { waitForOpenUpdate(t, updateChan) // Send the fundingLocked message twice to Alice, and once to Bob. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1582,7 +1649,7 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // Alice should not send the channel state the second time, as the // second funding locked should just be ignored. select { - case <-alice.peer.newChannels: + case <-alice.newChannels: t.Fatalf("alice sent new channel to peer a second time") case <-time.After(time.Millisecond * 300): // Expected @@ -1590,9 +1657,9 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // Another fundingLocked should also be ignored, since Alice should // have updated her database at this point. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) select { - case <-alice.peer.newChannels: + case <-alice.newChannels: t.Fatalf("alice sent new channel to peer a second time") case <-time.After(time.Millisecond * 300): // Expected @@ -1665,8 +1732,8 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) { recreateAliceFundingManager(t, alice) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1723,10 +1790,10 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { assertFundingLockedSent(t, alice, bob, fundingOutPoint) // Let Alice immediately get the fundingLocked message. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) // Also let Bob get the fundingLocked message. - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1801,8 +1868,8 @@ func TestFundingManagerPrivateChannel(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1886,8 +1953,8 @@ func TestFundingManagerPrivateRestart(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1954,7 +2021,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { err: errChan, } - alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + alice.fundingMgr.initFundingWorkflow(bob, initReq) // Alice should have sent the OpenChannel message to Bob. var aliceMsg lnwire.Message @@ -1993,7 +2060,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { chanID := openChannelReq.PendingChannelID // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + bob.fundingMgr.processFundingOpen(openChannelReq, alice) // Bob should answer with an AcceptChannel message. acceptChannelResponse := assertFundingMsgSent( @@ -2013,7 +2080,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { } // Forward the response to Alice. - alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) + alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) // Alice responds with a FundingCreated message. fundingCreated := assertFundingMsgSent( @@ -2021,7 +2088,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { ).(*lnwire.FundingCreated) // Give the message to Bob. - bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr) + bob.fundingMgr.processFundingCreated(fundingCreated, alice) // Finally, Bob should send the FundingSigned message. fundingSigned := assertFundingMsgSent( @@ -2029,7 +2096,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { ).(*lnwire.FundingSigned) // Forward the signature to Alice. - alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr) + alice.fundingMgr.processFundingSigned(fundingSigned, bob) // After Alice processes the singleFundingSignComplete message, she will // broadcast the funding transaction to the network. We expect to get a diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 74366850..c7f1d3ae 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "math" + "net" "reflect" "runtime" "strings" @@ -1411,6 +1412,9 @@ func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { } return nil } +func (m *mockPeer) AddNewChannel(_ *lnwallet.LightningChannel, _ <-chan struct{}) error { + return nil +} func (m *mockPeer) WipeChannel(*wire.OutPoint) error { return nil } @@ -1420,8 +1424,9 @@ func (m *mockPeer) PubKey() [33]byte { func (m *mockPeer) IdentityKey() *btcec.PublicKey { return nil } - -var _ lnpeer.Peer = (*mockPeer)(nil) +func (m *mockPeer) Address() net.Addr { + return nil +} func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error, diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 487b1601..fb648ac2 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "sync" "sync/atomic" "testing" @@ -517,6 +518,16 @@ func (s *mockServer) IdentityKey() *btcec.PublicKey { return pubkey } +func (s *mockServer) Address() net.Addr { + return nil +} + +func (s *mockServer) AddNewChannel(channel *lnwallet.LightningChannel, + cancel <-chan struct{}) error { + + return nil +} + func (s *mockServer) WipeChannel(*wire.OutPoint) error { return nil } diff --git a/lnd.go b/lnd.go index 33dcc2a4..237e2807 100644 --- a/lnd.go +++ b/lnd.go @@ -360,9 +360,7 @@ func lndMain() error { idPrivKey.PubKey()) return <-errChan }, - SendToPeer: server.SendToPeer, NotifyWhenOnline: server.NotifyWhenOnline, - FindPeer: server.FindPeer, TempChanIDSeed: chanIDSeed, FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { dbChannels, err := chanDB.FetchAllChannels() @@ -450,12 +448,12 @@ func lndMain() error { return delay }, WatchNewChannel: func(channel *channeldb.OpenChannel, - addr *lnwire.NetAddress) error { + peerKey *btcec.PublicKey) error { // First, we'll mark this new peer as a persistent peer // for re-connection purposes. server.mu.Lock() - pubStr := string(addr.IdentityKey.SerializeCompressed()) + pubStr := string(peerKey.SerializeCompressed()) server.persistentPeers[pubStr] = struct{}{} server.mu.Unlock() diff --git a/lnpeer/peer.go b/lnpeer/peer.go index 5664bab9..99efe845 100644 --- a/lnpeer/peer.go +++ b/lnpeer/peer.go @@ -1,8 +1,11 @@ package lnpeer import ( + "net" + "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" ) @@ -14,6 +17,10 @@ type Peer interface { // has been sent to the remote peer. SendMessage(sync bool, msg ...lnwire.Message) error + // AddNewChannel adds a new channel to the peer. The channel should fail + // to be added if the cancel channel is closed. + AddNewChannel(channel *lnwallet.LightningChannel, cancel <-chan struct{}) error + // WipeChannel removes the channel uniquely identified by its channel // point from all indexes associated with the peer. WipeChannel(*wire.OutPoint) error @@ -23,4 +30,7 @@ type Peer interface { // IdentityKey returns the public key of the remote peer. IdentityKey() *btcec.PublicKey + + // Address returns the network address of the remote peer. + Address() net.Addr } diff --git a/peer.go b/peer.go index 1d01e7bf..e1113c24 100644 --- a/peer.go +++ b/peer.go @@ -21,6 +21,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -185,6 +186,9 @@ type peer struct { wg sync.WaitGroup } +// A compile-time check to ensure that peer satisfies the lnpeer.Peer interface. +var _ lnpeer.Peer = (*peer)(nil) + // newPeer creates a new peer from an establish connection object, and a // pointer to the main server. func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, @@ -907,15 +911,15 @@ out: p.queueMsg(lnwire.NewPong(pongBytes), nil) case *lnwire.OpenChannel: - p.server.fundingMgr.processFundingOpen(msg, p.addr) + p.server.fundingMgr.processFundingOpen(msg, p) case *lnwire.AcceptChannel: - p.server.fundingMgr.processFundingAccept(msg, p.addr) + p.server.fundingMgr.processFundingAccept(msg, p) case *lnwire.FundingCreated: - p.server.fundingMgr.processFundingCreated(msg, p.addr) + p.server.fundingMgr.processFundingCreated(msg, p) case *lnwire.FundingSigned: - p.server.fundingMgr.processFundingSigned(msg, p.addr) + p.server.fundingMgr.processFundingSigned(msg, p) case *lnwire.FundingLocked: - p.server.fundingMgr.processFundingLocked(msg, p.addr) + p.server.fundingMgr.processFundingLocked(msg, p) case *lnwire.Shutdown: select { @@ -931,8 +935,9 @@ out: } case *lnwire.Error: - switch { + key := p.addr.IdentityKey + switch { // In the case of an all-zero channel ID we want to // forward the error to all channels with this peer. case msg.ChanID == lnwire.ConnectionWideID: @@ -948,8 +953,8 @@ out: // If the channel ID for the error message corresponds // to a pending channel, then the funding manager will // handle the error. - case p.server.fundingMgr.IsPendingChannel(msg.ChanID, p.addr): - p.server.fundingMgr.processFundingError(msg, p.addr) + case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key): + p.server.fundingMgr.processFundingError(msg, key) // If not we hand the error to the channel link for // this channel. @@ -1999,6 +2004,41 @@ func (p *peer) IdentityKey() *btcec.PublicKey { return p.addr.IdentityKey } +// Address returns the network address of the remote peer. +func (p *peer) Address() net.Addr { + return p.addr.Address +} + +// AddNewChannel adds a new channel to the peer. The channel should fail to be +// added if the cancel channel is closed. +func (p *peer) AddNewChannel(channel *lnwallet.LightningChannel, + cancel <-chan struct{}) error { + + newChanDone := make(chan struct{}) + newChanMsg := &newChannelMsg{ + channel: channel, + done: newChanDone, + } + + select { + case p.newChannels <- newChanMsg: + case <-cancel: + return errors.New("canceled adding new channel") + case <-p.quit: + return ErrPeerExiting + } + + // We pause here to wait for the peer to recognize the new channel + // before we close the channel barrier corresponding to the channel. + select { + case <-newChanDone: + case <-p.quit: + return ErrPeerExiting + } + + return nil +} + // TODO(roasbeef): make all start/stop mutexes a CAS // fetchLastChanUpdate returns a function which is able to retrieve the last diff --git a/server.go b/server.go index d8ed46ca..cd979aa5 100644 --- a/server.go +++ b/server.go @@ -110,7 +110,7 @@ type server struct { inboundPeers map[string]*peer outboundPeers map[string]*peer - peerConnectedListeners map[string][]chan<- struct{} + peerConnectedListeners map[string][]chan<- lnpeer.Peer persistentPeers map[string]struct{} persistentPeersBackoff map[string]time.Duration @@ -264,7 +264,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, peersByPub: make(map[string]*peer), inboundPeers: make(map[string]*peer), outboundPeers: make(map[string]*peer), - peerConnectedListeners: make(map[string][]chan<- struct{}), + peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), globalFeatures: lnwire.NewFeatureVector(globalFeatures, lnwire.GlobalFeatures), @@ -1568,31 +1568,38 @@ func (s *server) SendToPeer(target *btcec.PublicKey, } // NotifyWhenOnline can be called by other subsystems to get notified when a -// particular peer comes online. +// particular peer comes online. The peer itself is sent across the peerChan. // // NOTE: This function is safe for concurrent access. -func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { +func (s *server) NotifyWhenOnline(peerKey *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + s.mu.Lock() defer s.mu.Unlock() // Compute the target peer's identifier. - pubStr := string(peer.SerializeCompressed()) + pubStr := string(peerKey.SerializeCompressed()) // Check if peer is connected. - _, ok := s.peersByPub[pubStr] + peer, ok := s.peersByPub[pubStr] if ok { // Connected, can return early. srvrLog.Debugf("Notifying that peer %x is online", - peer.SerializeCompressed()) - close(connectedChan) + peerKey.SerializeCompressed()) + + select { + case peerChan <- peer: + case <-s.quit: + } + return } // Not connected, store this listener such that it can be notified when // the peer comes online. s.peerConnectedListeners[pubStr] = append( - s.peerConnectedListeners[pubStr], connectedChan) + s.peerConnectedListeners[pubStr], peerChan, + ) } // sendPeerMessages enqueues a list of messages into the outgoingQueue of the @@ -2240,8 +2247,12 @@ func (s *server) addPeer(p *peer) { } // Check if there are listeners waiting for this peer to come online. - for _, con := range s.peerConnectedListeners[pubStr] { - close(con) + for _, peerChan := range s.peerConnectedListeners[pubStr] { + select { + case peerChan <- p: + case <-s.quit: + return + } } delete(s.peerConnectedListeners, pubStr) } @@ -2509,7 +2520,7 @@ func (s *server) OpenChannel(nodeKey *btcec.PublicKey, // TODO(roasbeef): pass in chan that's closed if/when funding succeeds // so can track as persistent peer? - go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req) + go s.fundingMgr.initFundingWorkflow(targetPeer, req) return updateChan, errChan }