From 6b3844ea664735be40e0370418279723392630c7 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 13 Sep 2017 14:07:51 +0200 Subject: [PATCH 1/8] lnwallet: add getter for channelState.RemoteNextCommitment --- lnwallet/channel.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 45db6426..d86ec4a5 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -4000,3 +4000,11 @@ func CreateCooperativeCloseTx(fundingTxIn *wire.TxIn, func (lc *LightningChannel) CalcFee(feeRate uint64) uint64 { return (feeRate * uint64(commitWeight)) / 1000 } + +// RemoteNextRevocation returns the channelState's RemoteNextRevocation. +func (lc *LightningChannel) RemoteNextRevocation() *btcec.PublicKey { + lc.Lock() + defer lc.Unlock() + + return lc.channelState.RemoteNextRevocation +} From 8244b7a78cfb304d4b841ed732587a5ec9844073 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 1 Sep 2017 11:42:40 +0200 Subject: [PATCH 2/8] fundingManager: handle duplicate fundingLocked This commit adds a channel barrier on fundingManager startup for channels where the opening process is not finished. This fixes a bug where we after restarting the fundingManager would receive the fundingLocked message, and crash when trying to close the non-existing barrier. In case we received a fundingLocked message after our own opening process was finished, we would crash with the same error. We therefore check if the channel barrier exists before we try to close it. It also adds functionality to fundingManager that makes it ignore a fundingLocked message it receives for a channel where this is already received. This is necessary when we in case of a reconnection resend the fundingLocked since we cannot be sure the remote has received it. The fundingmanager tests are also updated to check that the fundingLocked messages are sent and handled correcly, and also exercise the scanarios described above. --- fundingmanager.go | 95 +++- fundingmanager_test.go | 1105 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 1102 insertions(+), 98 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 551c4f45..0e395e7a 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -286,6 +286,9 @@ type fundingManager struct { localDiscoveryMtx sync.Mutex localDiscoverySignals map[lnwire.ChannelID]chan struct{} + handleFundingLockedMtx sync.RWMutex + handleFundingLockedBarriers map[lnwire.ChannelID]struct{} + quit chan struct{} wg sync.WaitGroup } @@ -323,16 +326,17 @@ var ( // fundingManager. func newFundingManager(cfg fundingConfig) (*fundingManager, error) { return &fundingManager{ - cfg: &cfg, - chanIDKey: cfg.TempChanIDSeed, - activeReservations: make(map[serializedPubKey]pendingChannels), - signedReservations: make(map[lnwire.ChannelID][32]byte), - newChanBarriers: make(map[lnwire.ChannelID]chan struct{}), - fundingMsgs: make(chan interface{}, msgBufferSize), - fundingRequests: make(chan *initFundingMsg, msgBufferSize), - localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}), - queries: make(chan interface{}, 1), - quit: make(chan struct{}), + cfg: &cfg, + chanIDKey: cfg.TempChanIDSeed, + activeReservations: make(map[serializedPubKey]pendingChannels), + signedReservations: make(map[lnwire.ChannelID][32]byte), + newChanBarriers: make(map[lnwire.ChannelID]chan struct{}), + fundingMsgs: make(chan interface{}, msgBufferSize), + fundingRequests: make(chan *initFundingMsg, msgBufferSize), + localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}), + handleFundingLockedBarriers: make(map[lnwire.ChannelID]struct{}), + queries: make(chan interface{}, 1), + quit: make(chan struct{}), }, nil } @@ -420,10 +424,22 @@ func (f *fundingManager) Start() error { return err } - fndgLog.Debugf("channel with opening state %v found", - channelState) - chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) + fndgLog.Debugf("channel (%v) with opening state %v found", + chanID, channelState) + + // Set up the channel barriers again, to make sure + // waitUntilChannelOpen correctly waits until the opening + // process is completely over. + f.barrierMtx.Lock() + fndgLog.Tracef("Loading pending ChannelPoint(%v), "+ + "creating chan barrier", channel.FundingOutpoint) + f.newChanBarriers[chanID] = make(chan struct{}) + f.barrierMtx.Unlock() + + // Set up a localDiscoverySignals to make sure we finish sending + // our own fundingLocked and channel announcements before + // processing a received fundingLocked. f.localDiscoverySignals[chanID] = make(chan struct{}) // If we did find the channel in the opening state database, we @@ -587,6 +603,7 @@ func (f *fundingManager) reservationCoordinator() { case *fundingSignedMsg: f.handleFundingSigned(fmsg) case *fundingLockedMsg: + f.wg.Add(1) go f.handleFundingLocked(fmsg) case *fundingErrorMsg: f.handleErrorMsg(fmsg) @@ -1520,17 +1537,46 @@ func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked, // handleFundingLocked finalizes the channel funding process and enables the // channel to enter normal operating mode. func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { + defer f.wg.Done() + + // If we are currently in the process of handling a funding locked + // message for this channel, ignore. + f.handleFundingLockedMtx.Lock() + _, ok := f.handleFundingLockedBarriers[fmsg.msg.ChanID] + if ok { + fndgLog.Infof("Already handling fundingLocked for "+ + "ChannelID(%v), ignoring.", fmsg.msg.ChanID) + f.handleFundingLockedMtx.Unlock() + return + } + + // If not already handling fundingLocked for this channel, set up + // barrier, and move on. + f.handleFundingLockedBarriers[fmsg.msg.ChanID] = struct{}{} + f.handleFundingLockedMtx.Unlock() + + defer func() { + f.handleFundingLockedMtx.Lock() + delete(f.handleFundingLockedBarriers, fmsg.msg.ChanID) + f.handleFundingLockedMtx.Unlock() + }() + f.localDiscoveryMtx.Lock() localDiscoverySignal, ok := f.localDiscoverySignals[fmsg.msg.ChanID] f.localDiscoveryMtx.Unlock() if ok { // Before we proceed with processing the funding locked - // message, we'll wait for the lcoal waitForFundingConfirmation + // message, we'll wait for the local waitForFundingConfirmation // goroutine to signal that it has the necessary state in // place. Otherwise, we may be missing critical information // required to handle forwarded HTLC's. - <-localDiscoverySignal + select { + case <-localDiscoverySignal: + // Fallthrough + case <-f.quit: + return + } // With the signal received, we can now safely delete the entry // from the map. @@ -1550,7 +1596,14 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { return } - // TODO(roasbeef): done nothing if repeat message sent + // If the RemoteNextRevocation is non-nil, it means that we have + // already processed fundingLocked for this channel, so ignore. + if channel.RemoteNextRevocation() != nil { + fndgLog.Infof("Received duplicate fundingLocked for "+ + "ChannelID(%v), ignoring.", chanID) + channel.Stop() + return + } // The funding locked message contains the next commitment point we'll // need to create the next commitment state for the remote party. So @@ -1574,9 +1627,13 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { // that commitment related modifications to this channel can // now proceed. f.barrierMtx.Lock() - fndgLog.Tracef("Closing chan barrier for ChanID(%v)", chanID) - close(f.newChanBarriers[chanID]) - delete(f.newChanBarriers, chanID) + chanBarrier, ok := f.newChanBarriers[chanID] + if ok { + fndgLog.Tracef("Closing chan barrier for ChanID(%v)", + chanID) + close(chanBarrier) + delete(f.newChanBarriers, chanID) + } f.barrierMtx.Unlock() }() diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 7259ebb5..66bc6c24 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -100,13 +100,16 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, } type testNode struct { - privKey *btcec.PrivateKey - msgChan chan lnwire.Message - announceChan chan lnwire.Message - publTxChan chan *wire.MsgTx - fundingMgr *fundingManager - mockNotifier *mockNotifier - testDir string + privKey *btcec.PrivateKey + msgChan chan lnwire.Message + announceChan chan lnwire.Message + arbiterChan chan *lnwallet.LightningChannel + publTxChan chan *wire.MsgTx + fundingMgr *fundingManager + peer *peer + mockNotifier *mockNotifier + testDir string + shutdownChannel chan struct{} } func disableFndgLogger(t *testing.T) { @@ -115,17 +118,11 @@ func disableFndgLogger(t *testing.T) { fndgLog = btclog.Disabled } -func createTestWallet(tempTestDir string, netParams *chaincfg.Params, +func createTestWallet(cdb *channeldb.DB, netParams *chaincfg.Params, notifier chainntnfs.ChainNotifier, wc lnwallet.WalletController, signer lnwallet.Signer, bio lnwallet.BlockChainIO, estimator lnwallet.FeeEstimator) (*lnwallet.LightningWallet, error) { - dbDir := filepath.Join(tempTestDir, "cdb") - cdb, err := channeldb.Open(dbDir) - if err != nil { - return nil, err - } - wallet, err := lnwallet.NewLightningWallet(lnwallet.Config{ Database: cdb, Notifier: notifier, @@ -150,7 +147,8 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey, tempTestDir string, hdSeed []byte, netParams *chaincfg.Params, chainNotifier chainntnfs.ChainNotifier, estimator lnwallet.FeeEstimator, sentMessages chan lnwire.Message, sentAnnouncements chan lnwire.Message, - publTxChan chan *wire.MsgTx, shutdownChan chan struct{}) (*fundingManager, error) { + publTxChan chan *wire.MsgTx, shutdownChan chan struct{}, + arbiterChan chan *lnwallet.LightningChannel, fundingPeer *peer) (*fundingManager, error) { wc := &mockWalletController{ rootKey: alicePrivKey, @@ -161,13 +159,18 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey, } bio := &mockChainIO{} - lnw, err := createTestWallet(tempTestDir, netParams, + dbDir := filepath.Join(tempTestDir, "cdb") + cdb, err := channeldb.Open(dbDir) + if err != nil { + return nil, err + } + + lnw, err := createTestWallet(cdb, netParams, chainNotifier, wc, signer, bio, estimator) if err != nil { t.Fatalf("unable to create test ln wallet: %v", err) } - arbiterChan := make(chan *lnwallet.LightningChannel) var chanIDSeed [32]byte f, err := newFundingManager(fundingConfig{ @@ -199,14 +202,26 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey, return nil }, FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) { - return nil, nil + return fundingPeer, nil }, TempChanIDSeed: chanIDSeed, FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { - // This is not expected to be used in the current tests. - // Add an implementation if that changes. - t.Fatal("did not expect FindChannel to be called") - return nil, nil + dbChannels, err := cdb.FetchAllChannels() + if err != nil { + return nil, err + } + + for _, channel := range dbChannels { + if chanID.IsChanPoint(&channel.FundingOutpoint) { + return lnwallet.NewLightningChannel( + signer, + nil, + estimator, + channel) + } + } + + return nil, fmt.Errorf("unable to find channel") }, NumRequiredConfs: func(chanAmt btcutil.Amount, pushAmt lnwire.MilliSatoshi) uint16 { @@ -226,12 +241,14 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey, func recreateAliceFundingManager(t *testing.T, alice *testNode) { // Stop the old fundingManager before creating a new one. + close(alice.shutdownChannel) if err := alice.fundingMgr.Stop(); err != nil { t.Fatalf("unable to stop old fundingManager: %v", err) } aliceMsgChan := make(chan lnwire.Message) aliceAnnounceChan := make(chan lnwire.Message) + shutdownChan := make(chan struct{}) oldCfg := alice.fundingMgr.cfg @@ -245,7 +262,11 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { return nil, nil }, SendAnnouncement: func(msg lnwire.Message) error { - aliceAnnounceChan <- msg + select { + case aliceAnnounceChan <- msg: + case <-shutdownChan: + return fmt.Errorf("shutting down") + } return nil }, CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { @@ -254,12 +275,14 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { ArbiterChan: oldCfg.ArbiterChan, SendToPeer: func(target *btcec.PublicKey, msgs ...lnwire.Message) error { - aliceMsgChan <- msgs[0] + select { + case aliceMsgChan <- msgs[0]: + case <-shutdownChan: + return fmt.Errorf("shutting down") + } return nil }, - FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) { - return nil, nil - }, + FindPeer: oldCfg.FindPeer, TempChanIDSeed: oldCfg.TempChanIDSeed, FindChannel: oldCfg.FindChannel, }) @@ -270,13 +293,14 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { alice.fundingMgr = f alice.msgChan = aliceMsgChan alice.announceChan = aliceAnnounceChan + alice.shutdownChannel = shutdownChan if err = f.Start(); err != nil { t.Fatalf("failed starting fundingManager: %v", err) } } -func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNode, *testNode) { +func setupFundingManagers(t *testing.T) (*testNode, *testNode) { // We need to set the global config, as fundingManager uses // MaxPendingChannels, and it is usually set in lndMain(). cfg = &config{ @@ -286,6 +310,16 @@ func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNod netParams := activeNetParams.Params estimator := lnwallet.StaticFeeEstimator{FeeRate: 250} + aliceNewChannelsChan := make(chan *newChannelMsg) + alicePeer := &peer{ + newChannels: aliceNewChannelsChan, + } + + bobNewChannelsChan := make(chan *newChannelMsg) + bobPeer := &peer{ + newChannels: bobNewChannelsChan, + } + aliceMockNotifier := &mockNotifier{ confChannel: make(chan *chainntnfs.TxConfirmation, 1), epochChan: make(chan *chainntnfs.BlockEpoch, 1), @@ -299,11 +333,13 @@ func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNod aliceMsgChan := make(chan lnwire.Message) aliceAnnounceChan := make(chan lnwire.Message) alicePublTxChan := make(chan *wire.MsgTx, 1) + aliceArbiterChan := make(chan *lnwallet.LightningChannel) + aliceShutdownChannel := make(chan struct{}) aliceFundingMgr, err := createTestFundingManager(t, alicePubKey, aliceTestDir, alicePrivKeyBytes[:], netParams, aliceMockNotifier, estimator, aliceMsgChan, aliceAnnounceChan, alicePublTxChan, - shutdownChannel) + aliceShutdownChannel, aliceArbiterChan, alicePeer) if err != nil { t.Fatalf("failed creating fundingManager: %v", err) } @@ -313,13 +349,16 @@ func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNod } alice := &testNode{ - privKey: alicePrivKey, - msgChan: aliceMsgChan, - announceChan: aliceAnnounceChan, - publTxChan: alicePublTxChan, - fundingMgr: aliceFundingMgr, - mockNotifier: aliceMockNotifier, - testDir: aliceTestDir, + privKey: alicePrivKey, + msgChan: aliceMsgChan, + announceChan: aliceAnnounceChan, + arbiterChan: aliceArbiterChan, + publTxChan: alicePublTxChan, + fundingMgr: aliceFundingMgr, + peer: alicePeer, + mockNotifier: aliceMockNotifier, + testDir: aliceTestDir, + shutdownChannel: aliceShutdownChannel, } bobMockNotifier := &mockNotifier{ @@ -335,9 +374,13 @@ func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNod bobMsgChan := make(chan lnwire.Message) bobAnnounceChan := make(chan lnwire.Message) bobPublTxChan := make(chan *wire.MsgTx, 1) + bobArbiterChan := make(chan *lnwallet.LightningChannel) + bobShutdownChannel := make(chan struct{}) + bobFundingMgr, err := createTestFundingManager(t, bobPubKey, bobTestDir, bobPrivKeyBytes[:], netParams, bobMockNotifier, estimator, - bobMsgChan, bobAnnounceChan, bobPublTxChan, shutdownChannel) + bobMsgChan, bobAnnounceChan, bobPublTxChan, shutdownChannel, + bobArbiterChan, bobPeer) if err != nil { t.Fatalf("failed creating fundingManager: %v", err) } @@ -347,20 +390,24 @@ func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNod } bob := &testNode{ - privKey: bobPrivKey, - msgChan: bobMsgChan, - announceChan: bobAnnounceChan, - publTxChan: bobPublTxChan, - fundingMgr: bobFundingMgr, - mockNotifier: bobMockNotifier, - testDir: bobTestDir, + privKey: bobPrivKey, + msgChan: bobMsgChan, + announceChan: bobAnnounceChan, + arbiterChan: bobArbiterChan, + publTxChan: bobPublTxChan, + fundingMgr: bobFundingMgr, + peer: bobPeer, + mockNotifier: bobMockNotifier, + testDir: bobTestDir, + shutdownChannel: bobShutdownChannel, } return alice, bob } -func tearDownFundingManagers(t *testing.T, a, b *testNode, shutdownChannel chan struct{}) { - close(shutdownChannel) +func tearDownFundingManagers(t *testing.T, a, b *testNode) { + close(a.shutdownChannel) + close(b.shutdownChannel) if err := a.fundingMgr.Stop(); err != nil { t.Fatalf("unable to stop fundingManager: %v", err) @@ -515,10 +562,8 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, func TestFundingManagerNormalWorkflow(t *testing.T) { disableFndgLogger(t) - shutdownChannel := make(chan struct{}) - - alice, bob := setupFundingManagers(t, shutdownChannel) - defer tearDownFundingManagers(t, alice, bob, shutdownChannel) + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) // We will consume the channel updates as we go, so no buffering is needed. updateChan := make(chan *lnrpc.OpenStatusUpdate) @@ -557,28 +602,43 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - var fundingLockedAlice lnwire.Message + var aliceMsg lnwire.Message select { - case fundingLockedAlice = <-alice.msgChan: + case aliceMsg = <-alice.msgChan: case <-time.After(time.Second * 5): t.Fatalf("alice did not send fundingLocked") } - if fundingLockedAlice.MsgType() != lnwire.MsgFundingLocked { - t.Fatalf("expected fundingLocked sent from Alice, "+ - "instead got %T", fundingLockedAlice) + + fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingLocked to be sent "+ + "from alice, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected FundingLocked to be sent from alice, "+ + "instead got %T", aliceMsg) } // And similarly Bob will send funding locked to Alice. - var fundingLockedBob lnwire.Message + var bobMsg lnwire.Message select { - case fundingLockedBob = <-bob.msgChan: + case bobMsg = <-bob.msgChan: case <-time.After(time.Second * 5): t.Fatalf("bob did not send fundingLocked") } - if fundingLockedBob.MsgType() != lnwire.MsgFundingLocked { - t.Fatalf("expected fundingLocked sent from Bob, "+ - "instead got %T", fundingLockedBob) + fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked) + if !ok { + errorMsg, gotError := bobMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingLocked to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected FundingLocked to be sent from bob, "+ + "instead got %T", bobMsg) } // Sleep to make sure database write is finished. @@ -698,7 +758,7 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { t.Fatalf("alice did not send OpenStatusUpdate") } - _, ok := openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) + _, ok = openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) if !ok { t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen") } @@ -718,15 +778,45 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { t.Fatalf("expected to not find channel state, but got: %v", state) } + // Exchange the fundingLocked messages. + alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + + // They should both send the new channel to the breach arbiter. + select { + case <-alice.arbiterChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send channel to breach arbiter") + } + + select { + case <-bob.arbiterChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send channel to breach arbiter") + } + + // And send the new channel state to their peer. + select { + case c := <-alice.peer.newChannels: + close(c.done) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send new channel to peer") + } + + select { + case c := <-bob.peer.newChannels: + close(c.done) + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send new channel to peer") + } + } func TestFundingManagerRestartBehavior(t *testing.T) { disableFndgLogger(t) - shutdownChannel := make(chan struct{}) - - alice, bob := setupFundingManagers(t, shutdownChannel) - defer tearDownFundingManagers(t, alice, bob, shutdownChannel) + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) // Run through the process of opening the channel, up until the funding // transaction is broadcasted. @@ -776,8 +866,7 @@ func TestFundingManagerRestartBehavior(t *testing.T) { // After the funding transaction was mined, Bob should have successfully // sent the fundingLocked message, while Alice failed sending it. In // Alice's case this means that there should be no messages for Bob, and - // the channel should still be in state 'markedOpen' - + // the channel should still be in state 'markedOpen' select { case msg := <-alice.msgChan: t.Fatalf("did not expect any message from Alice: %v", msg) @@ -786,10 +875,23 @@ func TestFundingManagerRestartBehavior(t *testing.T) { } // Bob will send funding locked to Alice - fundingLockedBob := <-bob.msgChan - if fundingLockedBob.MsgType() != lnwire.MsgFundingLocked { - t.Fatalf("expected fundingLocked sent from Bob, "+ - "instead got %T", fundingLockedBob) + var bobMsg lnwire.Message + select { + case bobMsg = <-bob.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send fundingLocked") + } + + fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked) + if !ok { + errorMsg, gotError := bobMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingLocked to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected FundingLocked to be sent from bob, "+ + "instead got %T", bobMsg) } // Sleep to make sure database write is finished. @@ -825,10 +927,23 @@ func TestFundingManagerRestartBehavior(t *testing.T) { return fmt.Errorf("intentional error in SendAnnouncement") } - fundingLockedAlice := <-alice.msgChan - if fundingLockedAlice.MsgType() != lnwire.MsgFundingLocked { - t.Fatalf("expected fundingLocked sent from Alice, "+ - "instead got %T", fundingLockedAlice) + var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send fundingLocked") + } + + fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingLocked to be sent "+ + "from alice, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected FundingLocked to be sent from alice, "+ + "instead got %T", aliceMsg) } // Sleep to make sure database write is finished. @@ -953,15 +1068,45 @@ func TestFundingManagerRestartBehavior(t *testing.T) { t.Fatalf("expected to not find channel state, but got: %v", state) } + // Exchange the fundingLocked messages. + alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + + // They should both send the new channel to the breach arbiter. + select { + case <-alice.arbiterChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send channel to breach arbiter") + } + + select { + case <-bob.arbiterChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send channel to breach arbiter") + } + + // And send the new channel state to their peer. + select { + case c := <-alice.peer.newChannels: + close(c.done) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send new channel to peer") + } + + select { + case c := <-bob.peer.newChannels: + close(c.done) + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send new channel to peer") + } + } func TestFundingManagerFundingTimeout(t *testing.T) { disableFndgLogger(t) - shutdownChannel := make(chan struct{}) - - alice, bob := setupFundingManagers(t, shutdownChannel) - defer tearDownFundingManagers(t, alice, bob, shutdownChannel) + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) // We will consume the channel updates as we go, so no buffering is needed. updateChan := make(chan *lnrpc.OpenStatusUpdate) @@ -1016,3 +1161,805 @@ func TestFundingManagerFundingTimeout(t *testing.T) { len(pendingChannels)) } } + +// TestFundingManagerReceiveFundingLockedTwice checks that the fundingManager +// continues to operate as expected in case we receive a duplicate fundingLocked +// message. +func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { + disableFndgLogger(t) + + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Run through the process of opening the channel, up until the funding + // transaction is broadcasted. + fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan) + + // Notify that transaction was mined + alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + + // Give fundingManager time to process the newly mined tx and write + //state to database. + time.Sleep(300 * time.Millisecond) + + // The funding transaction was mined, so assert that both funding + // managers now have the state of this channel 'markedOpen' in their + // internal state machine. + state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + + // After the funding transaction is mined, Alice will send + // fundingLocked to Bob. + var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send fundingLocked") + } + + _, ok := aliceMsg.(*lnwire.FundingLocked) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingLocked to be sent "+ + "from alice, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected FundingLocked to be sent from alice, "+ + "instead got %T", aliceMsg) + } + + // And similarly Bob will send funding locked to Alice. + var bobMsg lnwire.Message + select { + case bobMsg = <-bob.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send fundingLocked") + } + + fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked) + if !ok { + errorMsg, gotError := bobMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingLocked to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected FundingLocked to be sent from bob, "+ + "instead got %T", bobMsg) + } + + // Sleep to make sure database write is finished. + time.Sleep(300 * time.Millisecond) + + // Check that the state machine is updated accordingly + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + + // After the FundingLocked message is sent, the channel will be announced. + // A chanAnnouncement consists of three distinct messages: + // 1) ChannelAnnouncement + // 2) ChannelUpdate + // 3) AnnounceSignatures + // that will be announced in no particular order. + // A node announcement will also be sent. + announcements := make([]lnwire.Message, 4) + for i := 0; i < len(announcements); i++ { + select { + case announcements[i] = <-alice.announceChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send announcement %v", i) + } + } + + gotChannelAnnouncement := false + gotChannelUpdate := false + gotAnnounceSignatures := false + gotNodeAnnouncement := false + + for _, msg := range announcements { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + gotChannelAnnouncement = true + case *lnwire.ChannelUpdate: + gotChannelUpdate = true + case *lnwire.AnnounceSignatures: + gotAnnounceSignatures = true + case *lnwire.NodeAnnouncement: + gotNodeAnnouncement = true + } + } + + if !gotChannelAnnouncement { + t.Fatalf("did not get ChannelAnnouncement from Alice") + } + if !gotChannelUpdate { + t.Fatalf("did not get ChannelUpdate from Alice") + } + if !gotAnnounceSignatures { + t.Fatalf("did not get AnnounceSignatures from Alice") + } + if !gotNodeAnnouncement { + t.Fatalf("did not get NodeAnnouncement from Alice") + } + + // Do the check for Bob as well. + for i := 0; i < len(announcements); i++ { + select { + case announcements[i] = <-bob.announceChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send announcement %v", i) + } + } + + gotChannelAnnouncement = false + gotChannelUpdate = false + gotAnnounceSignatures = false + gotNodeAnnouncement = false + + for _, msg := range announcements { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + gotChannelAnnouncement = true + case *lnwire.ChannelUpdate: + gotChannelUpdate = true + case *lnwire.AnnounceSignatures: + gotAnnounceSignatures = true + case *lnwire.NodeAnnouncement: + gotNodeAnnouncement = true + } + } + + if !gotChannelAnnouncement { + t.Fatalf("did not get ChannelAnnouncement from Bob") + } + if !gotChannelUpdate { + t.Fatalf("did not get ChannelUpdate from Bob") + } + if !gotAnnounceSignatures { + t.Fatalf("did not get AnnounceSignatures from Bob") + } + if !gotNodeAnnouncement { + t.Fatalf("did not get NodeAnnouncement from Bob") + } + + // The funding process is now finished, wait for the + // OpenStatusUpdate_ChanOpen update + var openUpdate *lnrpc.OpenStatusUpdate + select { + case openUpdate = <-updateChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenStatusUpdate") + } + + _, ok = openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) + if !ok { + t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen") + } + + // The internal state-machine should now have deleted the channelStates + // from the database, as the channel is announced. + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != ErrChannelNotFound { + t.Fatalf("expected to not find channel state, but got: %v", state) + } + + // Need to give bob time to update database. + time.Sleep(300 * time.Millisecond) + + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != ErrChannelNotFound { + t.Fatalf("expected to not find channel state, but got: %v", state) + } + + // Send the fundingLocked message twice to Alice. + alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + + // Alice should send the new channel to the breach arbiter. + select { + case <-alice.arbiterChan: + case <-time.After(time.Millisecond * 300): + t.Fatalf("alice did not send channel to breach arbiter") + } + + // And send the new channel state to their peer. + select { + case c := <-alice.peer.newChannels: + close(c.done) + case <-time.After(time.Millisecond * 300): + t.Fatalf("alice did not send new channel to peer") + } + + // Alice should not send the channel state the second time, as the + // second funding locked should just be ignored. + select { + case <-alice.arbiterChan: + t.Fatalf("alice sent channel to breach arbiter a second time") + case <-time.After(time.Millisecond * 300): + // Expected + } + select { + case <-alice.peer.newChannels: + t.Fatalf("alice sent new channel to peer a second time") + case <-time.After(time.Millisecond * 300): + // Expected + } + + // Another fundingLocked should also be ignored, since Alice should + // have updated her database at this point. + alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + select { + case <-alice.arbiterChan: + t.Fatalf("alice sent channel to breach arbiter a second time") + case <-time.After(time.Millisecond * 300): + // Expected + } + select { + case <-alice.peer.newChannels: + t.Fatalf("alice sent new channel to peer a second time") + case <-time.After(time.Millisecond * 300): + // Expected + } + +} + +// TestFundingManagerRestartAfterChanAnn checks that the fundingManager properly +// handles receiving a fundingLocked after the its own fundingLocked and channel +// announcement is sent and gets restarted. +func TestFundingManagerRestartAfterChanAnn(t *testing.T) { + disableFndgLogger(t) + + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Run through the process of opening the channel, up until the funding + // transaction is broadcasted. + fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan) + + // Notify that transaction was mined + alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + + // Give fundingManager time to process the newly mined tx and write + //state to database. + time.Sleep(300 * time.Millisecond) + + // The funding transaction was mined, so assert that both funding + // managers now have the state of this channel 'markedOpen' in their + // internal state machine. + state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + + // After the funding transaction is mined, Alice will send + // fundingLocked to Bob. + var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send fundingLocked") + } + + fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingLocked to be sent "+ + "from alice, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected FundingLocked to be sent from alice, "+ + "instead got %T", aliceMsg) + } + + // And similarly Bob will send funding locked to Alice. + var bobMsg lnwire.Message + select { + case bobMsg = <-bob.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send fundingLocked") + } + + fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked) + if !ok { + errorMsg, gotError := bobMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingLocked to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected FundingLocked to be sent from bob, "+ + "instead got %T", bobMsg) + } + + // Sleep to make sure database write is finished. + time.Sleep(300 * time.Millisecond) + + // Check that the state machine is updated accordingly + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + + // After the FundingLocked message is sent, the channel will be announced. + // A chanAnnouncement consists of three distinct messages: + // 1) ChannelAnnouncement + // 2) ChannelUpdate + // 3) AnnounceSignatures + // that will be announced in no particular order. + // A node announcement will also be sent. + announcements := make([]lnwire.Message, 4) + for i := 0; i < len(announcements); i++ { + select { + case announcements[i] = <-alice.announceChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send announcement %v", i) + } + } + + gotChannelAnnouncement := false + gotChannelUpdate := false + gotAnnounceSignatures := false + gotNodeAnnouncement := false + + for _, msg := range announcements { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + gotChannelAnnouncement = true + case *lnwire.ChannelUpdate: + gotChannelUpdate = true + case *lnwire.AnnounceSignatures: + gotAnnounceSignatures = true + case *lnwire.NodeAnnouncement: + gotNodeAnnouncement = true + } + } + + if !gotChannelAnnouncement { + t.Fatalf("did not get ChannelAnnouncement from Alice") + } + if !gotChannelUpdate { + t.Fatalf("did not get ChannelUpdate from Alice") + } + if !gotAnnounceSignatures { + t.Fatalf("did not get AnnounceSignatures from Alice") + } + if !gotNodeAnnouncement { + t.Fatalf("did not get NodeAnnouncement from Alice") + } + + // Do the check for Bob as well. + for i := 0; i < len(announcements); i++ { + select { + case announcements[i] = <-bob.announceChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send announcement %v", i) + } + } + + gotChannelAnnouncement = false + gotChannelUpdate = false + gotAnnounceSignatures = false + gotNodeAnnouncement = false + + for _, msg := range announcements { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + gotChannelAnnouncement = true + case *lnwire.ChannelUpdate: + gotChannelUpdate = true + case *lnwire.AnnounceSignatures: + gotAnnounceSignatures = true + case *lnwire.NodeAnnouncement: + gotNodeAnnouncement = true + } + } + + if !gotChannelAnnouncement { + t.Fatalf("did not get ChannelAnnouncement from Bob") + } + if !gotChannelUpdate { + t.Fatalf("did not get ChannelUpdate from Bob") + } + if !gotAnnounceSignatures { + t.Fatalf("did not get AnnounceSignatures from Bob") + } + if !gotNodeAnnouncement { + t.Fatalf("did not get NodeAnnouncement from Bob") + } + + // The funding process is now finished, wait for the + // OpenStatusUpdate_ChanOpen update + var openUpdate *lnrpc.OpenStatusUpdate + select { + case openUpdate = <-updateChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenStatusUpdate") + } + + _, ok = openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) + if !ok { + t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen") + } + + // The internal state-machine should now have deleted the channelStates + // from the database, as the channel is announced. + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != ErrChannelNotFound { + t.Fatalf("expected to not find channel state, but got: %v", state) + } + + // Need to give bob time to update database. + time.Sleep(300 * time.Millisecond) + + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != ErrChannelNotFound { + t.Fatalf("expected to not find channel state, but got: %v", state) + } + + // At this point we restart Alice's fundingManager, before she receives + // the fundingLocked message. After restart, she will receive it, and + // we expect her to be able to handle it correctly. + recreateAliceFundingManager(t, alice) + time.Sleep(300 * time.Millisecond) + + // Exchange the fundingLocked messages. + alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + + // They should both send the new channel to the breach arbiter. + select { + case <-alice.arbiterChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send channel to breach arbiter") + } + + select { + case <-bob.arbiterChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send channel to breach arbiter") + } + + // And send the new channel state to their peer. + select { + case c := <-alice.peer.newChannels: + close(c.done) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send new channel to peer") + } + + select { + case c := <-bob.peer.newChannels: + close(c.done) + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send new channel to peer") + } + +} + +// TestFundingManagerRestartAfterReceivingFundingLocked checks that the +// fundingManager continues to operate as expected after it has received +// fundingLocked and then gets restarted. +func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { + disableFndgLogger(t) + + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Run through the process of opening the channel, up until the funding + // transaction is broadcasted. + fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan) + + // Notify that transaction was mined + alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + + // Give fundingManager time to process the newly mined tx and write + //state to database. + time.Sleep(300 * time.Millisecond) + + // The funding transaction was mined, so assert that both funding + // managers now have the state of this channel 'markedOpen' in their + // internal state machine. + state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + + // After the funding transaction is mined, Alice will send + // fundingLocked to Bob. + var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send fundingLocked") + } + + fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingLocked to be sent "+ + "from alice, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected FundingLocked to be sent from alice, "+ + "instead got %T", aliceMsg) + } + + // And similarly Bob will send funding locked to Alice. + var bobMsg lnwire.Message + select { + case bobMsg = <-bob.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send fundingLocked") + } + + fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked) + if !ok { + errorMsg, gotError := bobMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingLocked to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected FundingLocked to be sent from bob, "+ + "instead got %T", bobMsg) + } + + // Sleep to make sure database write is finished. + time.Sleep(300 * time.Millisecond) + + // Check that the state machine is updated accordingly + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + + // Let Alice immediately get the fundingLocked message. + alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + time.Sleep(300 * time.Millisecond) + + // She will block waiting for local channel announcements to finish + // before sending the new channel state to the peer. + select { + case <-alice.peer.newChannels: + t.Fatalf("did not expect alice to handle the fundinglocked") + case <-time.After(time.Millisecond * 300): + } + + // At this point we restart Alice's fundingManager. Bob will resend + // the fundingLocked after the connection is re-established. + recreateAliceFundingManager(t, alice) + time.Sleep(300 * time.Millisecond) + + // Simulate Bob resending the message when Alice is back up. + alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + + // After the FundingLocked message is sent, the channel will be announced. + // A chanAnnouncement consists of three distinct messages: + // 1) ChannelAnnouncement + // 2) ChannelUpdate + // 3) AnnounceSignatures + // that will be announced in no particular order. + // A node announcement will also be sent. + announcements := make([]lnwire.Message, 4) + for i := 0; i < len(announcements); i++ { + select { + case announcements[i] = <-alice.announceChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send announcement %v", i) + } + } + + gotChannelAnnouncement := false + gotChannelUpdate := false + gotAnnounceSignatures := false + gotNodeAnnouncement := false + + for _, msg := range announcements { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + gotChannelAnnouncement = true + case *lnwire.ChannelUpdate: + gotChannelUpdate = true + case *lnwire.AnnounceSignatures: + gotAnnounceSignatures = true + case *lnwire.NodeAnnouncement: + gotNodeAnnouncement = true + } + } + + if !gotChannelAnnouncement { + t.Fatalf("did not get ChannelAnnouncement from Alice") + } + if !gotChannelUpdate { + t.Fatalf("did not get ChannelUpdate from Alice") + } + if !gotAnnounceSignatures { + t.Fatalf("did not get AnnounceSignatures from Alice") + } + if !gotNodeAnnouncement { + t.Fatalf("did not get NodeAnnouncement from Alice") + } + + // Do the check for Bob as well. + for i := 0; i < len(announcements); i++ { + select { + case announcements[i] = <-bob.announceChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send announcement %v", i) + } + } + + gotChannelAnnouncement = false + gotChannelUpdate = false + gotAnnounceSignatures = false + gotNodeAnnouncement = false + + for _, msg := range announcements { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + gotChannelAnnouncement = true + case *lnwire.ChannelUpdate: + gotChannelUpdate = true + case *lnwire.AnnounceSignatures: + gotAnnounceSignatures = true + case *lnwire.NodeAnnouncement: + gotNodeAnnouncement = true + } + } + + if !gotChannelAnnouncement { + t.Fatalf("did not get ChannelAnnouncement from Bob") + } + if !gotChannelUpdate { + t.Fatalf("did not get ChannelUpdate from Bob") + } + if !gotAnnounceSignatures { + t.Fatalf("did not get AnnounceSignatures from Bob") + } + if !gotNodeAnnouncement { + t.Fatalf("did not get NodeAnnouncement from Bob") + } + + // The funding process is now finished. Since we recreated the + // fundingManager, we don't have an update channel to synchronize on, + // so a small sleep makes sure the database writing is finished. + time.Sleep(300 * time.Millisecond) + + // The internal state-machine should now have deleted the channelStates + // from the database, as the channel is announced. + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != ErrChannelNotFound { + t.Fatalf("expected to not find channel state, but got: %v", state) + } + + // Need to give bob time to update database. + time.Sleep(300 * time.Millisecond) + + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != ErrChannelNotFound { + t.Fatalf("expected to not find channel state, but got: %v", state) + } + + // Exchange the fundingLocked messages. + bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + + // They should both send the new channel to the breach arbiter. + select { + case <-alice.arbiterChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send channel to breach arbiter") + } + + select { + case <-bob.arbiterChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send channel to breach arbiter") + } + + // And send the new channel state to their peer. + select { + case c := <-alice.peer.newChannels: + close(c.done) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send new channel to peer") + } + + select { + case c := <-bob.peer.newChannels: + close(c.done) + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send new channel to peer") + } + +} From 5d03256baf72ed5c0e4f0ebdf19e75ef403be19a Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 7 Sep 2017 15:04:07 +0200 Subject: [PATCH 3/8] htlcswith/link: resend fundingLocked from channelLink when numUpdates == 0. In the case where the channelLink get started and the number of updates on this channel is zero, this means no paymenys has been done using this channel. This might mean that the fundingLocked never was sent successfully, so we resend to make sure this channel gets opened correctly. --- htlcswitch/link.go | 21 ++++++++++++++++++++- htlcswitch/mock.go | 3 +++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index bd3bdf69..516b8243 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -322,7 +322,26 @@ func (l *channelLink) htlcManager() { // TODO(roasbeef): fail chan in case of protocol violation - // TODO(roasbeef): resend funding locked if state zero + // If the number of updates on this channel has been zero, we should + // resend the fundingLocked message. This is because in this case we + // cannot be sure if the peer really received the last fundingLocked we + // sent, so resend now. + if l.channel.StateSnapshot().NumUpdates == 0 { + log.Debugf("Resending fundingLocked message to peer.") + + nextRevocation, err := l.channel.NextRevocationKey() + if err != nil { + log.Errorf("unable to create next revocation: %v", err) + } + + fundingLockedMsg := lnwire.NewFundingLocked(l.ChanID(), + nextRevocation) + err = l.cfg.Peer.SendMessage(fundingLockedMsg) + if err != nil { + log.Errorf("failed resending fundingLocked to peer: %v", + err) + } + } out: for { diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 47b3e24a..c437cc57 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -280,6 +280,9 @@ func (s *mockServer) readHandler(message lnwire.Message) error { targetChan = msg.ChanID case *lnwire.CommitSig: targetChan = msg.ChanID + case *lnwire.FundingLocked: + // Ignore + return nil default: return errors.New("unknown message type") } From 4b4c431d67fffd188443e08ef70e5ee98f55a42d Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 13 Sep 2017 14:38:06 +0200 Subject: [PATCH 4/8] server: add NotifyWhenOnline method. This commit adds a listener queue for each peer, that can be used to queue listeners that will be notified when the targetted peer eventually comes online. --- server.go | 44 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/server.go b/server.go index e1e60035..2fd67771 100644 --- a/server.go +++ b/server.go @@ -57,6 +57,8 @@ type server struct { inboundPeers map[string]*peer outboundPeers map[string]*peer + peerConnectedListeners map[string][]chan<- struct{} + persistentPeers map[string]struct{} persistentConnReqs map[string][]*connmgr.ConnReq @@ -134,10 +136,11 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, persistentPeers: make(map[string]struct{}), persistentConnReqs: make(map[string][]*connmgr.ConnReq), - peersByID: make(map[int32]*peer), - peersByPub: make(map[string]*peer), - inboundPeers: make(map[string]*peer), - outboundPeers: make(map[string]*peer), + peersByID: make(map[int32]*peer), + peersByPub: make(map[string]*peer), + inboundPeers: make(map[string]*peer), + outboundPeers: make(map[string]*peer), + peerConnectedListeners: make(map[string][]chan<- struct{}), globalFeatures: globalFeatures, localFeatures: localFeatures, @@ -860,6 +863,33 @@ func (s *server) SendToPeer(target *btcec.PublicKey, return s.sendToPeer(target, msgs) } +// NotifyWhenOnline can be called by other subsystems to get notified when a +// particular peer comes online. +// +// NOTE: This function is safe for concurrent access. +func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { + s.mu.Lock() + defer s.mu.Unlock() + + // Compute the target peer's identifier. + pubStr := string(peer.SerializeCompressed()) + + // Check if peer is connected. + _, ok := s.peersByPub[pubStr] + if ok { + // Connected, can return early. + srvrLog.Debugf("Notifying that peer %v is online", pubStr) + close(connectedChan) + return + } + + // Not connected, store this listener such that it can be notified when + // the peer comes online. + s.peerConnectedListeners[pubStr] = append( + s.peerConnectedListeners[pubStr], connectedChan) +} + // sendToPeer is an internal method that delivers messages to the specified // `target` peer. func (s *server) sendToPeer(target *btcec.PublicKey, @@ -1272,6 +1302,12 @@ func (s *server) addPeer(p *peer) { // channel router so we can synchronize our view of the channel graph // with this new peer. go s.authGossiper.SynchronizeNode(p.addr.IdentityKey) + + // Check if there are listeners waiting for this peer to come online. + for _, con := range s.peerConnectedListeners[pubStr] { + close(con) + } + delete(s.peerConnectedListeners, pubStr) } // removePeer removes the passed peer from the server's state of all active From b8cadf881cf720e268bafd43f5f8afc952973a72 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 25 Sep 2017 00:01:05 +0200 Subject: [PATCH 5/8] fundingManager: use NotifyWhenOnline when sending fundingLocked. The fundingManager will register with the server to get notified when the targetted peer comes online, in case of a failed send of the fundingLocked message. This is necessary because if the peer is not connected yet (or was disconnected while we were waiting for the funding tx to confirm), we cannot continue the the opening process before the peer successfully has received the fundingLocked. --- fundingmanager.go | 46 ++++++++++++++++++++++++++++++++++++++++------ lnd.go | 9 +++++---- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 0e395e7a..365387db 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -190,6 +190,12 @@ type fundingConfig struct { // channel's funding transaction and initial commitment transaction. SendToPeer func(target *btcec.PublicKey, msgs ...lnwire.Message) error + // NotifyWhenOnline allows the FundingManager to register with a + // subsystem that will notify it when the peer comes online. + // This is used when sending the fundingLocked message, since it MUST be + // delivered after the funding transaction is confirmed. + NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{}) + // FindPeer searches the list of peers connected to the node so that // the FundingManager can notify other daemon subsystems as necessary // during the funding process. @@ -1453,18 +1459,46 @@ func (f *fundingManager) sendFundingLockedAndAnnounceChannel( } fundingLockedMsg := lnwire.NewFundingLocked(chanID, nextRevocation) - err = f.cfg.SendToPeer(completeChan.IdentityPub, fundingLockedMsg) - if err != nil { - fndgLog.Errorf("unable to send fundingLocked to peer: %v", err) - return + // If the peer has disconnected before we reach this point, we will need + // to wait for him to come back online before sending the fundingLocked + // message. This is special for fundingLocked, since failing to send any + // of the previous messages in the funding flow just cancels the flow. + // But now the funding transaction is confirmed, the channel is open + // and we have to make sure the peer gets the fundingLocked message when + // it comes back online. This is also crucial during restart of lnd, + // where we might try to resend the fundingLocked message before the + // server has had the time to connect to the peer. We keep trying to + // send fundingLocked until we succeed, or the fundingManager is shut + // down. + for { + err = f.cfg.SendToPeer(completeChan.IdentityPub, + fundingLockedMsg) + if err == nil { + // Sending succeeded, we can break out and continue + // the funding flow. + break + } + + fndgLog.Warnf("unable to send fundingLocked to peer %x: "+ + "%v. Will retry when online", + completeChan.IdentityPub.SerializeCompressed(), err) + + connected := make(chan struct{}) + f.cfg.NotifyWhenOnline(completeChan.IdentityPub, connected) + select { + case <-connected: + // Retry sending. + case <-f.quit: + return + } } // As the fundingLocked message is now sent to the peer, the channel is // moved to the next state of the state machine. It will be moved to the // last state (actually deleted from the database) after the channel is // finally announced. - err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, fundingLockedSent, - shortChanID) + err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, + fundingLockedSent, shortChanID) if err != nil { fndgLog.Errorf("error setting channel state to "+ "fundingLockedSent: %v", err) diff --git a/lnd.go b/lnd.go index 2db3b76f..5e2f3e78 100644 --- a/lnd.go +++ b/lnd.go @@ -186,10 +186,11 @@ func lndMain() error { idPrivKey.PubKey()) return <-errChan }, - ArbiterChan: server.breachArbiter.newContracts, - SendToPeer: server.SendToPeer, - FindPeer: server.FindPeer, - TempChanIDSeed: chanIDSeed, + ArbiterChan: server.breachArbiter.newContracts, + SendToPeer: server.SendToPeer, + NotifyWhenOnline: server.NotifyWhenOnline, + FindPeer: server.FindPeer, + TempChanIDSeed: chanIDSeed, FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { dbChannels, err := chanDB.FetchAllChannels() if err != nil { From d981e12a3a013a54019384f6c473f6256394b7a8 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 25 Sep 2017 00:03:07 +0200 Subject: [PATCH 6/8] tests: cleanup of fundingmanager_test.go This commit cleans the fundingManager tests by extracting most of the common code from the different test cases into assert methods, making the test cases easier to follow and distinguish. It also adds a new test for the case where the peer goes offline, and the fundingManager must wait for it to come online before it can send the fundingLocked message and continue the funding flow. --- fundingmanager_test.go | 1239 +++++++++++----------------------------- 1 file changed, 328 insertions(+), 911 deletions(-) diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 66bc6c24..bf4fe291 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -143,12 +143,27 @@ func createTestWallet(cdb *channeldb.DB, netParams *chaincfg.Params, return wallet, nil } -func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey, - tempTestDir string, hdSeed []byte, netParams *chaincfg.Params, - chainNotifier chainntnfs.ChainNotifier, estimator lnwallet.FeeEstimator, - sentMessages chan lnwire.Message, sentAnnouncements chan lnwire.Message, - publTxChan chan *wire.MsgTx, shutdownChan chan struct{}, - arbiterChan chan *lnwallet.LightningChannel, fundingPeer *peer) (*fundingManager, error) { +func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, + tempTestDir string) (*testNode, error) { + + netParams := activeNetParams.Params + estimator := lnwallet.StaticFeeEstimator{FeeRate: 250} + + chainNotifier := &mockNotifier{ + confChannel: make(chan *chainntnfs.TxConfirmation, 1), + epochChan: make(chan *chainntnfs.BlockEpoch, 1), + } + + newChannelsChan := make(chan *newChannelMsg) + p := &peer{ + newChannels: newChannelsChan, + } + + sentMessages := make(chan lnwire.Message) + sentAnnouncements := make(chan lnwire.Message) + publTxChan := make(chan *wire.MsgTx, 1) + arbiterChan := make(chan *lnwallet.LightningChannel) + shutdownChan := make(chan struct{}) wc := &mockWalletController{ rootKey: alicePrivKey, @@ -174,7 +189,7 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey, var chanIDSeed [32]byte f, err := newFundingManager(fundingConfig{ - IDKey: pubKey, + IDKey: privKey.PubKey(), Wallet: lnw, Notifier: chainNotifier, FeeEstimator: estimator, @@ -201,8 +216,11 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey, } return nil }, + NotifyWhenOnline: func(peer *btcec.PublicKey, connectedChan chan<- struct{}) { + t.Fatalf("did not expect fundingManager to call NotifyWhenOnline") + }, FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) { - return fundingPeer, nil + return p, nil }, TempChanIDSeed: chanIDSeed, FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { @@ -236,7 +254,22 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey, t.Fatalf("failed creating fundingManager: %v", err) } - return f, nil + if err = f.Start(); err != nil { + t.Fatalf("failed starting fundingManager: %v", err) + } + + return &testNode{ + privKey: privKey, + msgChan: sentMessages, + announceChan: sentAnnouncements, + arbiterChan: arbiterChan, + publTxChan: publTxChan, + fundingMgr: f, + peer: p, + mockNotifier: chainNotifier, + testDir: tempTestDir, + shutdownChannel: shutdownChan, + }, nil } func recreateAliceFundingManager(t *testing.T, alice *testNode) { @@ -282,6 +315,9 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { } return nil }, + NotifyWhenOnline: func(peer *btcec.PublicKey, connectedChan chan<- struct{}) { + t.Fatalf("did not expect fundingManager to call NotifyWhenOnline") + }, FindPeer: oldCfg.FindPeer, TempChanIDSeed: oldCfg.TempChanIDSeed, FindChannel: oldCfg.FindChannel, @@ -307,101 +343,26 @@ func setupFundingManagers(t *testing.T) (*testNode, *testNode) { MaxPendingChannels: defaultMaxPendingChannels, } - netParams := activeNetParams.Params - estimator := lnwallet.StaticFeeEstimator{FeeRate: 250} - - aliceNewChannelsChan := make(chan *newChannelMsg) - alicePeer := &peer{ - newChannels: aliceNewChannelsChan, - } - - bobNewChannelsChan := make(chan *newChannelMsg) - bobPeer := &peer{ - newChannels: bobNewChannelsChan, - } - - aliceMockNotifier := &mockNotifier{ - confChannel: make(chan *chainntnfs.TxConfirmation, 1), - epochChan: make(chan *chainntnfs.BlockEpoch, 1), - } - aliceTestDir, err := ioutil.TempDir("", "alicelnwallet") if err != nil { t.Fatalf("unable to create temp directory: %v", err) } - aliceMsgChan := make(chan lnwire.Message) - aliceAnnounceChan := make(chan lnwire.Message) - alicePublTxChan := make(chan *wire.MsgTx, 1) - aliceArbiterChan := make(chan *lnwallet.LightningChannel) - aliceShutdownChannel := make(chan struct{}) - - aliceFundingMgr, err := createTestFundingManager(t, alicePubKey, - aliceTestDir, alicePrivKeyBytes[:], netParams, aliceMockNotifier, - estimator, aliceMsgChan, aliceAnnounceChan, alicePublTxChan, - aliceShutdownChannel, aliceArbiterChan, alicePeer) + alice, err := createTestFundingManager(t, alicePrivKey, aliceTestDir) if err != nil { t.Fatalf("failed creating fundingManager: %v", err) } - if err = aliceFundingMgr.Start(); err != nil { - t.Fatalf("failed starting fundingManager: %v", err) - } - - alice := &testNode{ - privKey: alicePrivKey, - msgChan: aliceMsgChan, - announceChan: aliceAnnounceChan, - arbiterChan: aliceArbiterChan, - publTxChan: alicePublTxChan, - fundingMgr: aliceFundingMgr, - peer: alicePeer, - mockNotifier: aliceMockNotifier, - testDir: aliceTestDir, - shutdownChannel: aliceShutdownChannel, - } - - bobMockNotifier := &mockNotifier{ - confChannel: make(chan *chainntnfs.TxConfirmation, 1), - epochChan: make(chan *chainntnfs.BlockEpoch, 1), - } - bobTestDir, err := ioutil.TempDir("", "boblnwallet") if err != nil { t.Fatalf("unable to create temp directory: %v", err) } - bobMsgChan := make(chan lnwire.Message) - bobAnnounceChan := make(chan lnwire.Message) - bobPublTxChan := make(chan *wire.MsgTx, 1) - bobArbiterChan := make(chan *lnwallet.LightningChannel) - bobShutdownChannel := make(chan struct{}) - - bobFundingMgr, err := createTestFundingManager(t, bobPubKey, bobTestDir, - bobPrivKeyBytes[:], netParams, bobMockNotifier, estimator, - bobMsgChan, bobAnnounceChan, bobPublTxChan, shutdownChannel, - bobArbiterChan, bobPeer) + bob, err := createTestFundingManager(t, bobPrivKey, bobTestDir) if err != nil { t.Fatalf("failed creating fundingManager: %v", err) } - if err = bobFundingMgr.Start(); err != nil { - t.Fatalf("failed starting fundingManager: %v", err) - } - - bob := &testNode{ - privKey: bobPrivKey, - msgChan: bobMsgChan, - announceChan: bobAnnounceChan, - arbiterChan: bobArbiterChan, - publTxChan: bobPublTxChan, - fundingMgr: bobFundingMgr, - peer: bobPeer, - mockNotifier: bobMockNotifier, - testDir: bobTestDir, - shutdownChannel: bobShutdownChannel, - } - return alice, bob } @@ -559,30 +520,8 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, return fundingOutPoint } -func TestFundingManagerNormalWorkflow(t *testing.T) { - disableFndgLogger(t) - - alice, bob := setupFundingManagers(t) - defer tearDownFundingManagers(t, alice, bob) - - // We will consume the channel updates as we go, so no buffering is needed. - updateChan := make(chan *lnrpc.OpenStatusUpdate) - - // Run through the process of opening the channel, up until the funding - // transaction is broadcasted. - fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan) - - // Notify that transaction was mined - alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} - bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} - - // Give fundingManager time to process the newly mined tx and write - //state to database. - time.Sleep(300 * time.Millisecond) - - // The funding transaction was mined, so assert that both funding - // managers now have the state of this channel 'markedOpen' in their - // internal state machine. +func assertMarkedOpen(t *testing.T, alice, bob *testNode, + fundingOutPoint *wire.OutPoint) { state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) if err != nil { t.Fatalf("unable to get channel state: %v", err) @@ -599,53 +538,33 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { if state != markedOpen { t.Fatalf("expected state to be markedOpen, was %v", state) } +} - // After the funding transaction is mined, Alice will send - // fundingLocked to Bob. - var aliceMsg lnwire.Message +func checkNodeSendingFundingLocked(t *testing.T, node *testNode) *lnwire.FundingLocked { + var msg lnwire.Message select { - case aliceMsg = <-alice.msgChan: + case msg = <-node.msgChan: case <-time.After(time.Second * 5): - t.Fatalf("alice did not send fundingLocked") + t.Fatalf("node did not send fundingLocked") } - fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked) + fundingLocked, ok := msg.(*lnwire.FundingLocked) if !ok { - errorMsg, gotError := aliceMsg.(*lnwire.Error) + errorMsg, gotError := msg.(*lnwire.Error) if gotError { t.Fatalf("expected FundingLocked to be sent "+ - "from alice, instead got error: %v", + "from node, instead got error: %v", lnwire.ErrorCode(errorMsg.Data[0])) } - t.Fatalf("expected FundingLocked to be sent from alice, "+ - "instead got %T", aliceMsg) + t.Fatalf("expected FundingLocked to be sent from node, "+ + "instead got %T", msg) } + return fundingLocked +} - // And similarly Bob will send funding locked to Alice. - var bobMsg lnwire.Message - select { - case bobMsg = <-bob.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send fundingLocked") - } - - fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := bobMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from bob, "+ - "instead got %T", bobMsg) - } - - // Sleep to make sure database write is finished. - time.Sleep(300 * time.Millisecond) - - // Check that the state machine is updated accordingly - state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) +func assertFundingLockedSent(t *testing.T, alice, bob *testNode, + fundingOutPoint *wire.OutPoint) { + state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) if err != nil { t.Fatalf("unable to get channel state: %v", err) } @@ -661,7 +580,9 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { if state != fundingLockedSent { t.Fatalf("expected state to be fundingLockedSent, was %v", state) } +} +func assertChannelAnnouncements(t *testing.T, alice, bob *testNode) { // After the FundingLocked message is sent, the channel will be announced. // A chanAnnouncement consists of three distinct messages: // 1) ChannelAnnouncement @@ -748,9 +669,9 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { if !gotNodeAnnouncement { t.Fatalf("did not get NodeAnnouncement from Bob") } +} - // The funding process is now finished, wait for the - // OpenStatusUpdate_ChanOpen update +func waitForOpenUpdate(t *testing.T, updateChan chan *lnrpc.OpenStatusUpdate) { var openUpdate *lnrpc.OpenStatusUpdate select { case openUpdate = <-updateChan: @@ -758,30 +679,27 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { t.Fatalf("alice did not send OpenStatusUpdate") } - _, ok = openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) + _, ok := openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) if !ok { t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen") } +} - // The internal state-machine should now have deleted the channelStates - // from the database, as the channel is announced. - state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) +func assertNoChannelState(t *testing.T, alice, bob *testNode, + fundingOutPoint *wire.OutPoint) { + state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) if err != ErrChannelNotFound { t.Fatalf("expected to not find channel state, but got: %v", state) } - // Need to give bob time to update database. - time.Sleep(300 * time.Millisecond) - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) if err != ErrChannelNotFound { t.Fatalf("expected to not find channel state, but got: %v", state) } - // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) +} +func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) { // They should both send the new channel to the breach arbiter. select { case <-alice.arbiterChan: @@ -809,7 +727,66 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { case <-time.After(time.Second * 5): t.Fatalf("bob did not send new channel to peer") } +} +func TestFundingManagerNormalWorkflow(t *testing.T) { + disableFndgLogger(t) + + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Run through the process of opening the channel, up until the funding + // transaction is broadcasted. + fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan) + + // Notify that transaction was mined + alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + + // Give fundingManager time to process the newly mined tx and write + //state to database. + time.Sleep(300 * time.Millisecond) + + // The funding transaction was mined, so assert that both funding + // managers now have the state of this channel 'markedOpen' in their + // internal state machine. + assertMarkedOpen(t, alice, bob, fundingOutPoint) + + // After the funding transaction is mined, Alice will send + // fundingLocked to Bob. + fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + + // And similarly Bob will send funding locked to Alice. + fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + + // Sleep to make sure database write is finished. + time.Sleep(300 * time.Millisecond) + + // Check that the state machine is updated accordingly + assertFundingLockedSent(t, alice, bob, fundingOutPoint) + + // Make sure both fundingManagers send the expected channel announcements. + assertChannelAnnouncements(t, alice, bob) + + // The funding process is now finished, wait for the + // OpenStatusUpdate_ChanOpen update + waitForOpenUpdate(t, updateChan) + + // The internal state-machine should now have deleted the channelStates + // from the database, as the channel is announced. + time.Sleep(300 * time.Millisecond) + assertNoChannelState(t, alice, bob, fundingOutPoint) + + // Exchange the fundingLocked messages. + alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + + // Check that they notify the breach arbiter and peer about the new + // channel. + assertHandleFundingLocked(t, alice, bob) } func TestFundingManagerRestartBehavior(t *testing.T) { @@ -828,12 +805,15 @@ func TestFundingManagerRestartBehavior(t *testing.T) { // before this message has been successfully sent, it should retry // sending it on restart. We mimic this behavior by letting the // SendToPeer method return an error, as if the message was not - // successfully sent. We then the fundingManager and make sure + // successfully sent. We then recreate the fundingManager and make sure // it continues the process as expected. alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, msgs ...lnwire.Message) error { return fmt.Errorf("intentional error in SendToPeer") } + alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, con chan<- struct{}) { + // Intetionally empty. + } // Notify that transaction was mined alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} @@ -846,22 +826,7 @@ func TestFundingManagerRestartBehavior(t *testing.T) { // The funding transaction was mined, so assert that both funding // managers now have the state of this channel 'markedOpen' in their // internal state machine. - state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != markedOpen { - t.Fatalf("expected state to be markedOpen, was %v", state) - } - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != markedOpen { - t.Fatalf("expected state to be markedOpen, was %v", state) - } + assertMarkedOpen(t, alice, bob, fundingOutPoint) // After the funding transaction was mined, Bob should have successfully // sent the fundingLocked message, while Alice failed sending it. In @@ -874,31 +839,14 @@ func TestFundingManagerRestartBehavior(t *testing.T) { // Expected. } - // Bob will send funding locked to Alice - var bobMsg lnwire.Message - select { - case bobMsg = <-bob.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send fundingLocked") - } - - fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := bobMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from bob, "+ - "instead got %T", bobMsg) - } + // Bob will send funding locked to Alice. + fundingLockedBob := checkNodeSendingFundingLocked(t, bob) // Sleep to make sure database write is finished. time.Sleep(1 * time.Second) // Alice should still be markedOpen - state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) if err != nil { t.Fatalf("unable to get channel state: %v", err) } @@ -927,24 +875,7 @@ func TestFundingManagerRestartBehavior(t *testing.T) { return fmt.Errorf("intentional error in SendAnnouncement") } - var aliceMsg lnwire.Message - select { - case aliceMsg = <-alice.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send fundingLocked") - } - - fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := aliceMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from alice, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from alice, "+ - "instead got %T", aliceMsg) - } + fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) // Sleep to make sure database write is finished. time.Sleep(500 * time.Millisecond) @@ -967,138 +898,179 @@ func TestFundingManagerRestartBehavior(t *testing.T) { // Expected } - // Bob, however, should send the announcements - announcements := make([]lnwire.Message, 4) - for i := 0; i < len(announcements); i++ { - select { - case announcements[i] = <-bob.announceChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send announcement %v", i) - } - } - - gotChannelAnnouncement := false - gotChannelUpdate := false - gotAnnounceSignatures := false - gotNodeAnnouncement := false - - for _, msg := range announcements { - switch msg.(type) { - case *lnwire.ChannelAnnouncement: - gotChannelAnnouncement = true - case *lnwire.ChannelUpdate: - gotChannelUpdate = true - case *lnwire.AnnounceSignatures: - gotAnnounceSignatures = true - case *lnwire.NodeAnnouncement: - gotNodeAnnouncement = true - } - } - - if !gotChannelAnnouncement { - t.Fatalf("did not get ChannelAnnouncement from Bob") - } - if !gotChannelUpdate { - t.Fatalf("did not get ChannelUpdate from Bob") - } - if !gotAnnounceSignatures { - t.Fatalf("did not get AnnounceSignatures from Bob") - } - if !gotNodeAnnouncement { - t.Fatalf("did not get NodeAnnouncement from Bob") - } - // Next up, we check that the Alice rebroadcasts the announcement - // messages on restart. + // messages on restart. Bob should as expected send announcements. recreateAliceFundingManager(t, alice) time.Sleep(300 * time.Millisecond) - for i := 0; i < len(announcements); i++ { - select { - case announcements[i] = <-alice.announceChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send announcement %v", i) - } - } - - gotChannelAnnouncement = false - gotChannelUpdate = false - gotAnnounceSignatures = false - gotNodeAnnouncement = false - - for _, msg := range announcements { - switch msg.(type) { - case *lnwire.ChannelAnnouncement: - gotChannelAnnouncement = true - case *lnwire.ChannelUpdate: - gotChannelUpdate = true - case *lnwire.AnnounceSignatures: - gotAnnounceSignatures = true - case *lnwire.NodeAnnouncement: - gotNodeAnnouncement = true - } - } - - if !gotChannelAnnouncement { - t.Fatalf("did not get ChannelAnnouncement from Alice after restart") - } - if !gotChannelUpdate { - t.Fatalf("did not get ChannelUpdate from Alice after restart") - } - if !gotAnnounceSignatures { - t.Fatalf("did not get AnnounceSignatures from Alice after restart") - } - if !gotNodeAnnouncement { - t.Fatalf("did not get NodeAnnouncement from Alice after restart") - } + assertChannelAnnouncements(t, alice, bob) // The funding process is now finished. Since we recreated the // fundingManager, we don't have an update channel to synchronize on, // so a small sleep makes sure the database writing is finished. time.Sleep(300 * time.Millisecond) - // The internal state-machine should now have deleted them from the - // internal database, as the channel is announced. - state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != ErrChannelNotFound { - t.Fatalf("expected to not find channel state, but got: %v", state) - } - - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != ErrChannelNotFound { - t.Fatalf("expected to not find channel state, but got: %v", state) - } + // The internal state-machine should now have deleted the channelStates + // from the database, as the channel is announced. + assertNoChannelState(t, alice, bob, fundingOutPoint) // Exchange the fundingLocked messages. alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) - // They should both send the new channel to the breach arbiter. + // Check that they notify the breach arbiter and peer about the new + // channel. + assertHandleFundingLocked(t, alice, bob) + +} + +// TestFundingManagerOfflinePeer checks that the fundingManager waits for the +// server to notify when the peer comes online, in case sending the +// fundingLocked message fails the first time. +func TestFundingManagerOfflinePeer(t *testing.T) { + disableFndgLogger(t) + + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // Run through the process of opening the channel, up until the funding + // transaction is broadcasted. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan) + + // After the funding transaction gets mined, both nodes will send the + // fundingLocked message to the other peer. If the funding node fails + // to send the fundingLocked message to the peer, it should wait for + // the server to notify it that the peer is back online, and try again. + alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, + msgs ...lnwire.Message) error { + return fmt.Errorf("intentional error in SendToPeer") + } + peerChan := make(chan *btcec.PublicKey, 1) + conChan := make(chan chan<- struct{}, 1) + alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, connected chan<- struct{}) { + peerChan <- peer + conChan <- connected + } + + // Notify that transaction was mined + alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + + // Give fundingManager time to process the newly mined tx and write to + // the database. + time.Sleep(500 * time.Millisecond) + + // The funding transaction was mined, so assert that both funding + // managers now have the state of this channel 'markedOpen' in their + // internal state machine. + assertMarkedOpen(t, alice, bob, fundingOutPoint) + + // After the funding transaction was mined, Bob should have successfully + // sent the fundingLocked message, while Alice failed sending it. In + // Alice's case this means that there should be no messages for Bob, and + // the channel should still be in state 'markedOpen' select { - case <-alice.arbiterChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send channel to breach arbiter") + case msg := <-alice.msgChan: + t.Fatalf("did not expect any message from Alice: %v", msg) + default: + // Expected. + } + + // Bob will send funding locked to Alice + fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + + // Sleep to make sure database write is finished. + time.Sleep(1 * time.Second) + + // Alice should still be markedOpen + state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + + // While Bob successfully sent fundingLocked. + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + + // Alice should be waiting for the server to notify when Bob somes back online. + var peer *btcec.PublicKey + var con chan<- struct{} + select { + case peer = <-peerChan: + // Expected + case <-time.After(time.Second * 3): + t.Fatalf("alice did not register peer with server") } select { - case <-bob.arbiterChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send channel to breach arbiter") + case con = <-conChan: + // Expected + case <-time.After(time.Second * 3): + t.Fatalf("alice did not register connectedChan with server") } - // And send the new channel state to their peer. - select { - case c := <-alice.peer.newChannels: - close(c.done) - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send new channel to peer") + if !peer.IsEqual(bobPubKey) { + t.Fatalf("expected to receive Bob's pubkey (%v), instead got %v", + bobPubKey, peer) } - select { - case c := <-bob.peer.newChannels: - close(c.done) - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send new channel to peer") + // Fix Alice's SendToPeer, and notify that Bob is back online. + alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, + msgs ...lnwire.Message) error { + select { + case alice.msgChan <- msgs[0]: + case <-alice.shutdownChannel: + return fmt.Errorf("shutting down") + } + return nil } + close(con) + + // This should make Alice send the fundingLocked. + fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + + // Sleep to make sure database write is finished. + time.Sleep(500 * time.Millisecond) + + // The state should now be fundingLockedSent + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + + // Make sure both fundingManagers send the expected channel announcements. + assertChannelAnnouncements(t, alice, bob) + + // The funding process is now finished, wait for the + // OpenStatusUpdate_ChanOpen update + waitForOpenUpdate(t, updateChan) + + // The internal state-machine should now have deleted the channelStates + // from the database, as the channel is announced. + time.Sleep(300 * time.Millisecond) + assertNoChannelState(t, alice, bob, fundingOutPoint) + + // Exchange the fundingLocked messages. + alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) + + // Check that they notify the breach arbiter and peer about the new + // channel. + assertHandleFundingLocked(t, alice, bob) } @@ -1189,219 +1161,41 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // The funding transaction was mined, so assert that both funding // managers now have the state of this channel 'markedOpen' in their // internal state machine. - state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != markedOpen { - t.Fatalf("expected state to be markedOpen, was %v", state) - } - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != markedOpen { - t.Fatalf("expected state to be markedOpen, was %v", state) - } + assertMarkedOpen(t, alice, bob, fundingOutPoint) // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - var aliceMsg lnwire.Message - select { - case aliceMsg = <-alice.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send fundingLocked") - } - - _, ok := aliceMsg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := aliceMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from alice, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from alice, "+ - "instead got %T", aliceMsg) - } + fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) // And similarly Bob will send funding locked to Alice. - var bobMsg lnwire.Message - select { - case bobMsg = <-bob.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send fundingLocked") - } - - fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := bobMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from bob, "+ - "instead got %T", bobMsg) - } + fundingLockedBob := checkNodeSendingFundingLocked(t, bob) // Sleep to make sure database write is finished. time.Sleep(300 * time.Millisecond) // Check that the state machine is updated accordingly - state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } + assertFundingLockedSent(t, alice, bob, fundingOutPoint) - if state != fundingLockedSent { - t.Fatalf("expected state to be fundingLockedSent, was %v", state) - } - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != fundingLockedSent { - t.Fatalf("expected state to be fundingLockedSent, was %v", state) - } - - // After the FundingLocked message is sent, the channel will be announced. - // A chanAnnouncement consists of three distinct messages: - // 1) ChannelAnnouncement - // 2) ChannelUpdate - // 3) AnnounceSignatures - // that will be announced in no particular order. - // A node announcement will also be sent. - announcements := make([]lnwire.Message, 4) - for i := 0; i < len(announcements); i++ { - select { - case announcements[i] = <-alice.announceChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send announcement %v", i) - } - } - - gotChannelAnnouncement := false - gotChannelUpdate := false - gotAnnounceSignatures := false - gotNodeAnnouncement := false - - for _, msg := range announcements { - switch msg.(type) { - case *lnwire.ChannelAnnouncement: - gotChannelAnnouncement = true - case *lnwire.ChannelUpdate: - gotChannelUpdate = true - case *lnwire.AnnounceSignatures: - gotAnnounceSignatures = true - case *lnwire.NodeAnnouncement: - gotNodeAnnouncement = true - } - } - - if !gotChannelAnnouncement { - t.Fatalf("did not get ChannelAnnouncement from Alice") - } - if !gotChannelUpdate { - t.Fatalf("did not get ChannelUpdate from Alice") - } - if !gotAnnounceSignatures { - t.Fatalf("did not get AnnounceSignatures from Alice") - } - if !gotNodeAnnouncement { - t.Fatalf("did not get NodeAnnouncement from Alice") - } - - // Do the check for Bob as well. - for i := 0; i < len(announcements); i++ { - select { - case announcements[i] = <-bob.announceChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send announcement %v", i) - } - } - - gotChannelAnnouncement = false - gotChannelUpdate = false - gotAnnounceSignatures = false - gotNodeAnnouncement = false - - for _, msg := range announcements { - switch msg.(type) { - case *lnwire.ChannelAnnouncement: - gotChannelAnnouncement = true - case *lnwire.ChannelUpdate: - gotChannelUpdate = true - case *lnwire.AnnounceSignatures: - gotAnnounceSignatures = true - case *lnwire.NodeAnnouncement: - gotNodeAnnouncement = true - } - } - - if !gotChannelAnnouncement { - t.Fatalf("did not get ChannelAnnouncement from Bob") - } - if !gotChannelUpdate { - t.Fatalf("did not get ChannelUpdate from Bob") - } - if !gotAnnounceSignatures { - t.Fatalf("did not get AnnounceSignatures from Bob") - } - if !gotNodeAnnouncement { - t.Fatalf("did not get NodeAnnouncement from Bob") - } + // Make sure both fundingManagers send the expected channel announcements. + assertChannelAnnouncements(t, alice, bob) // The funding process is now finished, wait for the // OpenStatusUpdate_ChanOpen update - var openUpdate *lnrpc.OpenStatusUpdate - select { - case openUpdate = <-updateChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send OpenStatusUpdate") - } - - _, ok = openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) - if !ok { - t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen") - } + waitForOpenUpdate(t, updateChan) // The internal state-machine should now have deleted the channelStates // from the database, as the channel is announced. - state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != ErrChannelNotFound { - t.Fatalf("expected to not find channel state, but got: %v", state) - } - - // Need to give bob time to update database. time.Sleep(300 * time.Millisecond) + assertNoChannelState(t, alice, bob, fundingOutPoint) - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != ErrChannelNotFound { - t.Fatalf("expected to not find channel state, but got: %v", state) - } - - // Send the fundingLocked message twice to Alice. + // Send the fundingLocked message twice to Alice, and once to Bob. alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) + bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) - // Alice should send the new channel to the breach arbiter. - select { - case <-alice.arbiterChan: - case <-time.After(time.Millisecond * 300): - t.Fatalf("alice did not send channel to breach arbiter") - } - - // And send the new channel state to their peer. - select { - case c := <-alice.peer.newChannels: - close(c.done) - case <-time.After(time.Millisecond * 300): - t.Fatalf("alice did not send new channel to peer") - } + // Check that they notify the breach arbiter and peer about the new + // channel. + assertHandleFundingLocked(t, alice, bob) // Alice should not send the channel state the second time, as the // second funding locked should just be ignored. @@ -1463,200 +1257,32 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) { // The funding transaction was mined, so assert that both funding // managers now have the state of this channel 'markedOpen' in their // internal state machine. - state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != markedOpen { - t.Fatalf("expected state to be markedOpen, was %v", state) - } - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != markedOpen { - t.Fatalf("expected state to be markedOpen, was %v", state) - } + assertMarkedOpen(t, alice, bob, fundingOutPoint) // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - var aliceMsg lnwire.Message - select { - case aliceMsg = <-alice.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send fundingLocked") - } - - fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := aliceMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from alice, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from alice, "+ - "instead got %T", aliceMsg) - } + fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) // And similarly Bob will send funding locked to Alice. - var bobMsg lnwire.Message - select { - case bobMsg = <-bob.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send fundingLocked") - } - - fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := bobMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from bob, "+ - "instead got %T", bobMsg) - } + fundingLockedBob := checkNodeSendingFundingLocked(t, bob) // Sleep to make sure database write is finished. time.Sleep(300 * time.Millisecond) // Check that the state machine is updated accordingly - state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } + assertFundingLockedSent(t, alice, bob, fundingOutPoint) - if state != fundingLockedSent { - t.Fatalf("expected state to be fundingLockedSent, was %v", state) - } - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != fundingLockedSent { - t.Fatalf("expected state to be fundingLockedSent, was %v", state) - } - - // After the FundingLocked message is sent, the channel will be announced. - // A chanAnnouncement consists of three distinct messages: - // 1) ChannelAnnouncement - // 2) ChannelUpdate - // 3) AnnounceSignatures - // that will be announced in no particular order. - // A node announcement will also be sent. - announcements := make([]lnwire.Message, 4) - for i := 0; i < len(announcements); i++ { - select { - case announcements[i] = <-alice.announceChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send announcement %v", i) - } - } - - gotChannelAnnouncement := false - gotChannelUpdate := false - gotAnnounceSignatures := false - gotNodeAnnouncement := false - - for _, msg := range announcements { - switch msg.(type) { - case *lnwire.ChannelAnnouncement: - gotChannelAnnouncement = true - case *lnwire.ChannelUpdate: - gotChannelUpdate = true - case *lnwire.AnnounceSignatures: - gotAnnounceSignatures = true - case *lnwire.NodeAnnouncement: - gotNodeAnnouncement = true - } - } - - if !gotChannelAnnouncement { - t.Fatalf("did not get ChannelAnnouncement from Alice") - } - if !gotChannelUpdate { - t.Fatalf("did not get ChannelUpdate from Alice") - } - if !gotAnnounceSignatures { - t.Fatalf("did not get AnnounceSignatures from Alice") - } - if !gotNodeAnnouncement { - t.Fatalf("did not get NodeAnnouncement from Alice") - } - - // Do the check for Bob as well. - for i := 0; i < len(announcements); i++ { - select { - case announcements[i] = <-bob.announceChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send announcement %v", i) - } - } - - gotChannelAnnouncement = false - gotChannelUpdate = false - gotAnnounceSignatures = false - gotNodeAnnouncement = false - - for _, msg := range announcements { - switch msg.(type) { - case *lnwire.ChannelAnnouncement: - gotChannelAnnouncement = true - case *lnwire.ChannelUpdate: - gotChannelUpdate = true - case *lnwire.AnnounceSignatures: - gotAnnounceSignatures = true - case *lnwire.NodeAnnouncement: - gotNodeAnnouncement = true - } - } - - if !gotChannelAnnouncement { - t.Fatalf("did not get ChannelAnnouncement from Bob") - } - if !gotChannelUpdate { - t.Fatalf("did not get ChannelUpdate from Bob") - } - if !gotAnnounceSignatures { - t.Fatalf("did not get AnnounceSignatures from Bob") - } - if !gotNodeAnnouncement { - t.Fatalf("did not get NodeAnnouncement from Bob") - } + // Make sure both fundingManagers send the expected channel announcements. + assertChannelAnnouncements(t, alice, bob) // The funding process is now finished, wait for the // OpenStatusUpdate_ChanOpen update - var openUpdate *lnrpc.OpenStatusUpdate - select { - case openUpdate = <-updateChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send OpenStatusUpdate") - } - - _, ok = openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) - if !ok { - t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen") - } + waitForOpenUpdate(t, updateChan) // The internal state-machine should now have deleted the channelStates // from the database, as the channel is announced. - state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != ErrChannelNotFound { - t.Fatalf("expected to not find channel state, but got: %v", state) - } - - // Need to give bob time to update database. time.Sleep(300 * time.Millisecond) - - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != ErrChannelNotFound { - t.Fatalf("expected to not find channel state, but got: %v", state) - } + assertNoChannelState(t, alice, bob, fundingOutPoint) // At this point we restart Alice's fundingManager, before she receives // the fundingLocked message. After restart, she will receive it, and @@ -1668,34 +1294,9 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) { alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) - // They should both send the new channel to the breach arbiter. - select { - case <-alice.arbiterChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send channel to breach arbiter") - } - - select { - case <-bob.arbiterChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send channel to breach arbiter") - } - - // And send the new channel state to their peer. - select { - case c := <-alice.peer.newChannels: - close(c.done) - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send new channel to peer") - } - - select { - case c := <-bob.peer.newChannels: - close(c.done) - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send new channel to peer") - } - + // Check that they notify the breach arbiter and peer about the new + // channel. + assertHandleFundingLocked(t, alice, bob) } // TestFundingManagerRestartAfterReceivingFundingLocked checks that the @@ -1725,84 +1326,20 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { // The funding transaction was mined, so assert that both funding // managers now have the state of this channel 'markedOpen' in their // internal state machine. - state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != markedOpen { - t.Fatalf("expected state to be markedOpen, was %v", state) - } - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != markedOpen { - t.Fatalf("expected state to be markedOpen, was %v", state) - } + assertMarkedOpen(t, alice, bob, fundingOutPoint) // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - var aliceMsg lnwire.Message - select { - case aliceMsg = <-alice.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send fundingLocked") - } - - fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := aliceMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from alice, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from alice, "+ - "instead got %T", aliceMsg) - } + fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) // And similarly Bob will send funding locked to Alice. - var bobMsg lnwire.Message - select { - case bobMsg = <-bob.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send fundingLocked") - } - - fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := bobMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from bob, "+ - "instead got %T", bobMsg) - } + fundingLockedBob := checkNodeSendingFundingLocked(t, bob) // Sleep to make sure database write is finished. time.Sleep(300 * time.Millisecond) // Check that the state machine is updated accordingly - state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != fundingLockedSent { - t.Fatalf("expected state to be fundingLockedSent, was %v", state) - } - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != nil { - t.Fatalf("unable to get channel state: %v", err) - } - - if state != fundingLockedSent { - t.Fatalf("expected state to be fundingLockedSent, was %v", state) - } + assertFundingLockedSent(t, alice, bob, fundingOutPoint) // Let Alice immediately get the fundingLocked message. alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) @@ -1824,92 +1361,8 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { // Simulate Bob resending the message when Alice is back up. alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr) - // After the FundingLocked message is sent, the channel will be announced. - // A chanAnnouncement consists of three distinct messages: - // 1) ChannelAnnouncement - // 2) ChannelUpdate - // 3) AnnounceSignatures - // that will be announced in no particular order. - // A node announcement will also be sent. - announcements := make([]lnwire.Message, 4) - for i := 0; i < len(announcements); i++ { - select { - case announcements[i] = <-alice.announceChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send announcement %v", i) - } - } - - gotChannelAnnouncement := false - gotChannelUpdate := false - gotAnnounceSignatures := false - gotNodeAnnouncement := false - - for _, msg := range announcements { - switch msg.(type) { - case *lnwire.ChannelAnnouncement: - gotChannelAnnouncement = true - case *lnwire.ChannelUpdate: - gotChannelUpdate = true - case *lnwire.AnnounceSignatures: - gotAnnounceSignatures = true - case *lnwire.NodeAnnouncement: - gotNodeAnnouncement = true - } - } - - if !gotChannelAnnouncement { - t.Fatalf("did not get ChannelAnnouncement from Alice") - } - if !gotChannelUpdate { - t.Fatalf("did not get ChannelUpdate from Alice") - } - if !gotAnnounceSignatures { - t.Fatalf("did not get AnnounceSignatures from Alice") - } - if !gotNodeAnnouncement { - t.Fatalf("did not get NodeAnnouncement from Alice") - } - - // Do the check for Bob as well. - for i := 0; i < len(announcements); i++ { - select { - case announcements[i] = <-bob.announceChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send announcement %v", i) - } - } - - gotChannelAnnouncement = false - gotChannelUpdate = false - gotAnnounceSignatures = false - gotNodeAnnouncement = false - - for _, msg := range announcements { - switch msg.(type) { - case *lnwire.ChannelAnnouncement: - gotChannelAnnouncement = true - case *lnwire.ChannelUpdate: - gotChannelUpdate = true - case *lnwire.AnnounceSignatures: - gotAnnounceSignatures = true - case *lnwire.NodeAnnouncement: - gotNodeAnnouncement = true - } - } - - if !gotChannelAnnouncement { - t.Fatalf("did not get ChannelAnnouncement from Bob") - } - if !gotChannelUpdate { - t.Fatalf("did not get ChannelUpdate from Bob") - } - if !gotAnnounceSignatures { - t.Fatalf("did not get AnnounceSignatures from Bob") - } - if !gotNodeAnnouncement { - t.Fatalf("did not get NodeAnnouncement from Bob") - } + // Make sure both fundingManagers send the expected channel announcements. + assertChannelAnnouncements(t, alice, bob) // The funding process is now finished. Since we recreated the // fundingManager, we don't have an update channel to synchronize on, @@ -1918,48 +1371,12 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { // The internal state-machine should now have deleted the channelStates // from the database, as the channel is announced. - state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != ErrChannelNotFound { - t.Fatalf("expected to not find channel state, but got: %v", state) - } + assertNoChannelState(t, alice, bob, fundingOutPoint) - // Need to give bob time to update database. - time.Sleep(300 * time.Millisecond) - - state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) - if err != ErrChannelNotFound { - t.Fatalf("expected to not find channel state, but got: %v", state) - } - - // Exchange the fundingLocked messages. + // Also let Bob get the fundingLocked message. bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr) - // They should both send the new channel to the breach arbiter. - select { - case <-alice.arbiterChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send channel to breach arbiter") - } - - select { - case <-bob.arbiterChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send channel to breach arbiter") - } - - // And send the new channel state to their peer. - select { - case c := <-alice.peer.newChannels: - close(c.done) - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send new channel to peer") - } - - select { - case c := <-bob.peer.newChannels: - close(c.done) - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send new channel to peer") - } - + // Check that they notify the breach arbiter and peer about the new + // channel. + assertHandleFundingLocked(t, alice, bob) } From ee2eec61883415c7b1ff0fe8f96e4834235e9838 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 2 Oct 2017 13:11:26 +0200 Subject: [PATCH 7/8] peer: ignore new channel requests for already active channels. --- peer.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/peer.go b/peer.go index 73e62f55..c6dbc0c6 100644 --- a/peer.go +++ b/peer.go @@ -1003,10 +1003,19 @@ out: chanID := lnwire.NewChanIDFromOutPoint(chanPoint) newChan := newChanReq.channel - // First, we'll add this channel to the set of active + // Make sure this channel is not already active. + p.activeChanMtx.Lock() + if _, ok := p.activeChannels[chanID]; ok { + peerLog.Infof("Already have ChannelPoint(%v), ignoring.", chanPoint) + p.activeChanMtx.Unlock() + close(newChanReq.done) + newChanReq.channel.Stop() + continue + } + + // If not already active, we'll add this channel to the set of active // channels, so we can look it up later easily // according to its channel ID. - p.activeChanMtx.Lock() p.activeChannels[chanID] = newChan p.activeChanMtx.Unlock() From ff2adf96a9fb1acb14bfdd392aa5e647044ebcb1 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 2 Oct 2017 13:23:13 +0200 Subject: [PATCH 8/8] fundingManager: conditional select on arbiterChan and peer.newChannels This commit adds a select statement for sending on the fundingManager's arbiterChan and the peer's newChannels channel. This makes sure we won't be blocked sending on these channels in case of a shutdown. --- fundingmanager.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 365387db..310b6052 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -1652,7 +1652,11 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { // With the channel retrieved, we'll send the breach arbiter the new // channel so it can watch for attempts to breach the channel's // contract by the remote party. - f.cfg.ArbiterChan <- channel + select { + case f.cfg.ArbiterChan <- channel: + case <-f.quit: + return + } // Launch a defer so we _ensure_ that the channel barrier is properly // closed even if the target peer is not longer online at this point. @@ -1683,7 +1687,12 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { channel: channel, done: newChanDone, } - peer.newChannels <- newChanMsg + + select { + case peer.newChannels <- newChanMsg: + case <-f.quit: + return + } // We pause here to wait for the peer to recognize the new channel // before we close the channel barrier corresponding to the channel.