fundingManager: handle duplicate fundingLocked
This commit adds a channel barrier on fundingManager startup for channels where the opening process is not finished. This fixes a bug where we after restarting the fundingManager would receive the fundingLocked message, and crash when trying to close the non-existing barrier. In case we received a fundingLocked message after our own opening process was finished, we would crash with the same error. We therefore check if the channel barrier exists before we try to close it. It also adds functionality to fundingManager that makes it ignore a fundingLocked message it receives for a channel where this is already received. This is necessary when we in case of a reconnection resend the fundingLocked since we cannot be sure the remote has received it. The fundingmanager tests are also updated to check that the fundingLocked messages are sent and handled correcly, and also exercise the scanarios described above.
This commit is contained in:
parent
6b3844ea66
commit
8244b7a78c
@ -286,6 +286,9 @@ type fundingManager struct {
|
||||
localDiscoveryMtx sync.Mutex
|
||||
localDiscoverySignals map[lnwire.ChannelID]chan struct{}
|
||||
|
||||
handleFundingLockedMtx sync.RWMutex
|
||||
handleFundingLockedBarriers map[lnwire.ChannelID]struct{}
|
||||
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
@ -331,6 +334,7 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
|
||||
fundingMsgs: make(chan interface{}, msgBufferSize),
|
||||
fundingRequests: make(chan *initFundingMsg, msgBufferSize),
|
||||
localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}),
|
||||
handleFundingLockedBarriers: make(map[lnwire.ChannelID]struct{}),
|
||||
queries: make(chan interface{}, 1),
|
||||
quit: make(chan struct{}),
|
||||
}, nil
|
||||
@ -420,10 +424,22 @@ func (f *fundingManager) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
fndgLog.Debugf("channel with opening state %v found",
|
||||
channelState)
|
||||
|
||||
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
|
||||
fndgLog.Debugf("channel (%v) with opening state %v found",
|
||||
chanID, channelState)
|
||||
|
||||
// Set up the channel barriers again, to make sure
|
||||
// waitUntilChannelOpen correctly waits until the opening
|
||||
// process is completely over.
|
||||
f.barrierMtx.Lock()
|
||||
fndgLog.Tracef("Loading pending ChannelPoint(%v), "+
|
||||
"creating chan barrier", channel.FundingOutpoint)
|
||||
f.newChanBarriers[chanID] = make(chan struct{})
|
||||
f.barrierMtx.Unlock()
|
||||
|
||||
// Set up a localDiscoverySignals to make sure we finish sending
|
||||
// our own fundingLocked and channel announcements before
|
||||
// processing a received fundingLocked.
|
||||
f.localDiscoverySignals[chanID] = make(chan struct{})
|
||||
|
||||
// If we did find the channel in the opening state database, we
|
||||
@ -587,6 +603,7 @@ func (f *fundingManager) reservationCoordinator() {
|
||||
case *fundingSignedMsg:
|
||||
f.handleFundingSigned(fmsg)
|
||||
case *fundingLockedMsg:
|
||||
f.wg.Add(1)
|
||||
go f.handleFundingLocked(fmsg)
|
||||
case *fundingErrorMsg:
|
||||
f.handleErrorMsg(fmsg)
|
||||
@ -1520,17 +1537,46 @@ func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked,
|
||||
// handleFundingLocked finalizes the channel funding process and enables the
|
||||
// channel to enter normal operating mode.
|
||||
func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
|
||||
defer f.wg.Done()
|
||||
|
||||
// If we are currently in the process of handling a funding locked
|
||||
// message for this channel, ignore.
|
||||
f.handleFundingLockedMtx.Lock()
|
||||
_, ok := f.handleFundingLockedBarriers[fmsg.msg.ChanID]
|
||||
if ok {
|
||||
fndgLog.Infof("Already handling fundingLocked for "+
|
||||
"ChannelID(%v), ignoring.", fmsg.msg.ChanID)
|
||||
f.handleFundingLockedMtx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// If not already handling fundingLocked for this channel, set up
|
||||
// barrier, and move on.
|
||||
f.handleFundingLockedBarriers[fmsg.msg.ChanID] = struct{}{}
|
||||
f.handleFundingLockedMtx.Unlock()
|
||||
|
||||
defer func() {
|
||||
f.handleFundingLockedMtx.Lock()
|
||||
delete(f.handleFundingLockedBarriers, fmsg.msg.ChanID)
|
||||
f.handleFundingLockedMtx.Unlock()
|
||||
}()
|
||||
|
||||
f.localDiscoveryMtx.Lock()
|
||||
localDiscoverySignal, ok := f.localDiscoverySignals[fmsg.msg.ChanID]
|
||||
f.localDiscoveryMtx.Unlock()
|
||||
|
||||
if ok {
|
||||
// Before we proceed with processing the funding locked
|
||||
// message, we'll wait for the lcoal waitForFundingConfirmation
|
||||
// message, we'll wait for the local waitForFundingConfirmation
|
||||
// goroutine to signal that it has the necessary state in
|
||||
// place. Otherwise, we may be missing critical information
|
||||
// required to handle forwarded HTLC's.
|
||||
<-localDiscoverySignal
|
||||
select {
|
||||
case <-localDiscoverySignal:
|
||||
// Fallthrough
|
||||
case <-f.quit:
|
||||
return
|
||||
}
|
||||
|
||||
// With the signal received, we can now safely delete the entry
|
||||
// from the map.
|
||||
@ -1550,7 +1596,14 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(roasbeef): done nothing if repeat message sent
|
||||
// If the RemoteNextRevocation is non-nil, it means that we have
|
||||
// already processed fundingLocked for this channel, so ignore.
|
||||
if channel.RemoteNextRevocation() != nil {
|
||||
fndgLog.Infof("Received duplicate fundingLocked for "+
|
||||
"ChannelID(%v), ignoring.", chanID)
|
||||
channel.Stop()
|
||||
return
|
||||
}
|
||||
|
||||
// The funding locked message contains the next commitment point we'll
|
||||
// need to create the next commitment state for the remote party. So
|
||||
@ -1574,9 +1627,13 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
|
||||
// that commitment related modifications to this channel can
|
||||
// now proceed.
|
||||
f.barrierMtx.Lock()
|
||||
fndgLog.Tracef("Closing chan barrier for ChanID(%v)", chanID)
|
||||
close(f.newChanBarriers[chanID])
|
||||
chanBarrier, ok := f.newChanBarriers[chanID]
|
||||
if ok {
|
||||
fndgLog.Tracef("Closing chan barrier for ChanID(%v)",
|
||||
chanID)
|
||||
close(chanBarrier)
|
||||
delete(f.newChanBarriers, chanID)
|
||||
}
|
||||
f.barrierMtx.Unlock()
|
||||
}()
|
||||
|
||||
|
@ -103,10 +103,13 @@ type testNode struct {
|
||||
privKey *btcec.PrivateKey
|
||||
msgChan chan lnwire.Message
|
||||
announceChan chan lnwire.Message
|
||||
arbiterChan chan *lnwallet.LightningChannel
|
||||
publTxChan chan *wire.MsgTx
|
||||
fundingMgr *fundingManager
|
||||
peer *peer
|
||||
mockNotifier *mockNotifier
|
||||
testDir string
|
||||
shutdownChannel chan struct{}
|
||||
}
|
||||
|
||||
func disableFndgLogger(t *testing.T) {
|
||||
@ -115,17 +118,11 @@ func disableFndgLogger(t *testing.T) {
|
||||
fndgLog = btclog.Disabled
|
||||
}
|
||||
|
||||
func createTestWallet(tempTestDir string, netParams *chaincfg.Params,
|
||||
func createTestWallet(cdb *channeldb.DB, netParams *chaincfg.Params,
|
||||
notifier chainntnfs.ChainNotifier, wc lnwallet.WalletController,
|
||||
signer lnwallet.Signer, bio lnwallet.BlockChainIO,
|
||||
estimator lnwallet.FeeEstimator) (*lnwallet.LightningWallet, error) {
|
||||
|
||||
dbDir := filepath.Join(tempTestDir, "cdb")
|
||||
cdb, err := channeldb.Open(dbDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wallet, err := lnwallet.NewLightningWallet(lnwallet.Config{
|
||||
Database: cdb,
|
||||
Notifier: notifier,
|
||||
@ -150,7 +147,8 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey,
|
||||
tempTestDir string, hdSeed []byte, netParams *chaincfg.Params,
|
||||
chainNotifier chainntnfs.ChainNotifier, estimator lnwallet.FeeEstimator,
|
||||
sentMessages chan lnwire.Message, sentAnnouncements chan lnwire.Message,
|
||||
publTxChan chan *wire.MsgTx, shutdownChan chan struct{}) (*fundingManager, error) {
|
||||
publTxChan chan *wire.MsgTx, shutdownChan chan struct{},
|
||||
arbiterChan chan *lnwallet.LightningChannel, fundingPeer *peer) (*fundingManager, error) {
|
||||
|
||||
wc := &mockWalletController{
|
||||
rootKey: alicePrivKey,
|
||||
@ -161,13 +159,18 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey,
|
||||
}
|
||||
bio := &mockChainIO{}
|
||||
|
||||
lnw, err := createTestWallet(tempTestDir, netParams,
|
||||
dbDir := filepath.Join(tempTestDir, "cdb")
|
||||
cdb, err := channeldb.Open(dbDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lnw, err := createTestWallet(cdb, netParams,
|
||||
chainNotifier, wc, signer, bio, estimator)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create test ln wallet: %v", err)
|
||||
}
|
||||
|
||||
arbiterChan := make(chan *lnwallet.LightningChannel)
|
||||
var chanIDSeed [32]byte
|
||||
|
||||
f, err := newFundingManager(fundingConfig{
|
||||
@ -199,14 +202,26 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey,
|
||||
return nil
|
||||
},
|
||||
FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) {
|
||||
return nil, nil
|
||||
return fundingPeer, nil
|
||||
},
|
||||
TempChanIDSeed: chanIDSeed,
|
||||
FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) {
|
||||
// This is not expected to be used in the current tests.
|
||||
// Add an implementation if that changes.
|
||||
t.Fatal("did not expect FindChannel to be called")
|
||||
return nil, nil
|
||||
dbChannels, err := cdb.FetchAllChannels()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, channel := range dbChannels {
|
||||
if chanID.IsChanPoint(&channel.FundingOutpoint) {
|
||||
return lnwallet.NewLightningChannel(
|
||||
signer,
|
||||
nil,
|
||||
estimator,
|
||||
channel)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("unable to find channel")
|
||||
},
|
||||
NumRequiredConfs: func(chanAmt btcutil.Amount,
|
||||
pushAmt lnwire.MilliSatoshi) uint16 {
|
||||
@ -226,12 +241,14 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey,
|
||||
|
||||
func recreateAliceFundingManager(t *testing.T, alice *testNode) {
|
||||
// Stop the old fundingManager before creating a new one.
|
||||
close(alice.shutdownChannel)
|
||||
if err := alice.fundingMgr.Stop(); err != nil {
|
||||
t.Fatalf("unable to stop old fundingManager: %v", err)
|
||||
}
|
||||
|
||||
aliceMsgChan := make(chan lnwire.Message)
|
||||
aliceAnnounceChan := make(chan lnwire.Message)
|
||||
shutdownChan := make(chan struct{})
|
||||
|
||||
oldCfg := alice.fundingMgr.cfg
|
||||
|
||||
@ -245,7 +262,11 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
|
||||
return nil, nil
|
||||
},
|
||||
SendAnnouncement: func(msg lnwire.Message) error {
|
||||
aliceAnnounceChan <- msg
|
||||
select {
|
||||
case aliceAnnounceChan <- msg:
|
||||
case <-shutdownChan:
|
||||
return fmt.Errorf("shutting down")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
|
||||
@ -254,12 +275,14 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
|
||||
ArbiterChan: oldCfg.ArbiterChan,
|
||||
SendToPeer: func(target *btcec.PublicKey,
|
||||
msgs ...lnwire.Message) error {
|
||||
aliceMsgChan <- msgs[0]
|
||||
select {
|
||||
case aliceMsgChan <- msgs[0]:
|
||||
case <-shutdownChan:
|
||||
return fmt.Errorf("shutting down")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) {
|
||||
return nil, nil
|
||||
},
|
||||
FindPeer: oldCfg.FindPeer,
|
||||
TempChanIDSeed: oldCfg.TempChanIDSeed,
|
||||
FindChannel: oldCfg.FindChannel,
|
||||
})
|
||||
@ -270,13 +293,14 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
|
||||
alice.fundingMgr = f
|
||||
alice.msgChan = aliceMsgChan
|
||||
alice.announceChan = aliceAnnounceChan
|
||||
alice.shutdownChannel = shutdownChan
|
||||
|
||||
if err = f.Start(); err != nil {
|
||||
t.Fatalf("failed starting fundingManager: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNode, *testNode) {
|
||||
func setupFundingManagers(t *testing.T) (*testNode, *testNode) {
|
||||
// We need to set the global config, as fundingManager uses
|
||||
// MaxPendingChannels, and it is usually set in lndMain().
|
||||
cfg = &config{
|
||||
@ -286,6 +310,16 @@ func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNod
|
||||
netParams := activeNetParams.Params
|
||||
estimator := lnwallet.StaticFeeEstimator{FeeRate: 250}
|
||||
|
||||
aliceNewChannelsChan := make(chan *newChannelMsg)
|
||||
alicePeer := &peer{
|
||||
newChannels: aliceNewChannelsChan,
|
||||
}
|
||||
|
||||
bobNewChannelsChan := make(chan *newChannelMsg)
|
||||
bobPeer := &peer{
|
||||
newChannels: bobNewChannelsChan,
|
||||
}
|
||||
|
||||
aliceMockNotifier := &mockNotifier{
|
||||
confChannel: make(chan *chainntnfs.TxConfirmation, 1),
|
||||
epochChan: make(chan *chainntnfs.BlockEpoch, 1),
|
||||
@ -299,11 +333,13 @@ func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNod
|
||||
aliceMsgChan := make(chan lnwire.Message)
|
||||
aliceAnnounceChan := make(chan lnwire.Message)
|
||||
alicePublTxChan := make(chan *wire.MsgTx, 1)
|
||||
aliceArbiterChan := make(chan *lnwallet.LightningChannel)
|
||||
aliceShutdownChannel := make(chan struct{})
|
||||
|
||||
aliceFundingMgr, err := createTestFundingManager(t, alicePubKey,
|
||||
aliceTestDir, alicePrivKeyBytes[:], netParams, aliceMockNotifier,
|
||||
estimator, aliceMsgChan, aliceAnnounceChan, alicePublTxChan,
|
||||
shutdownChannel)
|
||||
aliceShutdownChannel, aliceArbiterChan, alicePeer)
|
||||
if err != nil {
|
||||
t.Fatalf("failed creating fundingManager: %v", err)
|
||||
}
|
||||
@ -316,10 +352,13 @@ func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNod
|
||||
privKey: alicePrivKey,
|
||||
msgChan: aliceMsgChan,
|
||||
announceChan: aliceAnnounceChan,
|
||||
arbiterChan: aliceArbiterChan,
|
||||
publTxChan: alicePublTxChan,
|
||||
fundingMgr: aliceFundingMgr,
|
||||
peer: alicePeer,
|
||||
mockNotifier: aliceMockNotifier,
|
||||
testDir: aliceTestDir,
|
||||
shutdownChannel: aliceShutdownChannel,
|
||||
}
|
||||
|
||||
bobMockNotifier := &mockNotifier{
|
||||
@ -335,9 +374,13 @@ func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNod
|
||||
bobMsgChan := make(chan lnwire.Message)
|
||||
bobAnnounceChan := make(chan lnwire.Message)
|
||||
bobPublTxChan := make(chan *wire.MsgTx, 1)
|
||||
bobArbiterChan := make(chan *lnwallet.LightningChannel)
|
||||
bobShutdownChannel := make(chan struct{})
|
||||
|
||||
bobFundingMgr, err := createTestFundingManager(t, bobPubKey, bobTestDir,
|
||||
bobPrivKeyBytes[:], netParams, bobMockNotifier, estimator,
|
||||
bobMsgChan, bobAnnounceChan, bobPublTxChan, shutdownChannel)
|
||||
bobMsgChan, bobAnnounceChan, bobPublTxChan, shutdownChannel,
|
||||
bobArbiterChan, bobPeer)
|
||||
if err != nil {
|
||||
t.Fatalf("failed creating fundingManager: %v", err)
|
||||
}
|
||||
@ -350,17 +393,21 @@ func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNod
|
||||
privKey: bobPrivKey,
|
||||
msgChan: bobMsgChan,
|
||||
announceChan: bobAnnounceChan,
|
||||
arbiterChan: bobArbiterChan,
|
||||
publTxChan: bobPublTxChan,
|
||||
fundingMgr: bobFundingMgr,
|
||||
peer: bobPeer,
|
||||
mockNotifier: bobMockNotifier,
|
||||
testDir: bobTestDir,
|
||||
shutdownChannel: bobShutdownChannel,
|
||||
}
|
||||
|
||||
return alice, bob
|
||||
}
|
||||
|
||||
func tearDownFundingManagers(t *testing.T, a, b *testNode, shutdownChannel chan struct{}) {
|
||||
close(shutdownChannel)
|
||||
func tearDownFundingManagers(t *testing.T, a, b *testNode) {
|
||||
close(a.shutdownChannel)
|
||||
close(b.shutdownChannel)
|
||||
|
||||
if err := a.fundingMgr.Stop(); err != nil {
|
||||
t.Fatalf("unable to stop fundingManager: %v", err)
|
||||
@ -515,10 +562,8 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
|
||||
func TestFundingManagerNormalWorkflow(t *testing.T) {
|
||||
disableFndgLogger(t)
|
||||
|
||||
shutdownChannel := make(chan struct{})
|
||||
|
||||
alice, bob := setupFundingManagers(t, shutdownChannel)
|
||||
defer tearDownFundingManagers(t, alice, bob, shutdownChannel)
|
||||
alice, bob := setupFundingManagers(t)
|
||||
defer tearDownFundingManagers(t, alice, bob)
|
||||
|
||||
// We will consume the channel updates as we go, so no buffering is needed.
|
||||
updateChan := make(chan *lnrpc.OpenStatusUpdate)
|
||||
@ -557,28 +602,43 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
|
||||
|
||||
// After the funding transaction is mined, Alice will send
|
||||
// fundingLocked to Bob.
|
||||
var fundingLockedAlice lnwire.Message
|
||||
var aliceMsg lnwire.Message
|
||||
select {
|
||||
case fundingLockedAlice = <-alice.msgChan:
|
||||
case aliceMsg = <-alice.msgChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send fundingLocked")
|
||||
}
|
||||
if fundingLockedAlice.MsgType() != lnwire.MsgFundingLocked {
|
||||
t.Fatalf("expected fundingLocked sent from Alice, "+
|
||||
"instead got %T", fundingLockedAlice)
|
||||
|
||||
fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked)
|
||||
if !ok {
|
||||
errorMsg, gotError := aliceMsg.(*lnwire.Error)
|
||||
if gotError {
|
||||
t.Fatalf("expected FundingLocked to be sent "+
|
||||
"from alice, instead got error: %v",
|
||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||
}
|
||||
t.Fatalf("expected FundingLocked to be sent from alice, "+
|
||||
"instead got %T", aliceMsg)
|
||||
}
|
||||
|
||||
// And similarly Bob will send funding locked to Alice.
|
||||
var fundingLockedBob lnwire.Message
|
||||
var bobMsg lnwire.Message
|
||||
select {
|
||||
case fundingLockedBob = <-bob.msgChan:
|
||||
case bobMsg = <-bob.msgChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send fundingLocked")
|
||||
}
|
||||
|
||||
if fundingLockedBob.MsgType() != lnwire.MsgFundingLocked {
|
||||
t.Fatalf("expected fundingLocked sent from Bob, "+
|
||||
"instead got %T", fundingLockedBob)
|
||||
fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked)
|
||||
if !ok {
|
||||
errorMsg, gotError := bobMsg.(*lnwire.Error)
|
||||
if gotError {
|
||||
t.Fatalf("expected FundingLocked to be sent "+
|
||||
"from bob, instead got error: %v",
|
||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||
}
|
||||
t.Fatalf("expected FundingLocked to be sent from bob, "+
|
||||
"instead got %T", bobMsg)
|
||||
}
|
||||
|
||||
// Sleep to make sure database write is finished.
|
||||
@ -698,7 +758,7 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
|
||||
t.Fatalf("alice did not send OpenStatusUpdate")
|
||||
}
|
||||
|
||||
_, ok := openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen)
|
||||
_, ok = openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen)
|
||||
if !ok {
|
||||
t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen")
|
||||
}
|
||||
@ -718,15 +778,45 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
|
||||
t.Fatalf("expected to not find channel state, but got: %v", state)
|
||||
}
|
||||
|
||||
// Exchange the fundingLocked messages.
|
||||
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
|
||||
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
|
||||
|
||||
// They should both send the new channel to the breach arbiter.
|
||||
select {
|
||||
case <-alice.arbiterChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send channel to breach arbiter")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-bob.arbiterChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send channel to breach arbiter")
|
||||
}
|
||||
|
||||
// And send the new channel state to their peer.
|
||||
select {
|
||||
case c := <-alice.peer.newChannels:
|
||||
close(c.done)
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send new channel to peer")
|
||||
}
|
||||
|
||||
select {
|
||||
case c := <-bob.peer.newChannels:
|
||||
close(c.done)
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send new channel to peer")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestFundingManagerRestartBehavior(t *testing.T) {
|
||||
disableFndgLogger(t)
|
||||
|
||||
shutdownChannel := make(chan struct{})
|
||||
|
||||
alice, bob := setupFundingManagers(t, shutdownChannel)
|
||||
defer tearDownFundingManagers(t, alice, bob, shutdownChannel)
|
||||
alice, bob := setupFundingManagers(t)
|
||||
defer tearDownFundingManagers(t, alice, bob)
|
||||
|
||||
// Run through the process of opening the channel, up until the funding
|
||||
// transaction is broadcasted.
|
||||
@ -777,7 +867,6 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
|
||||
// sent the fundingLocked message, while Alice failed sending it. In
|
||||
// Alice's case this means that there should be no messages for Bob, and
|
||||
// the channel should still be in state 'markedOpen'
|
||||
|
||||
select {
|
||||
case msg := <-alice.msgChan:
|
||||
t.Fatalf("did not expect any message from Alice: %v", msg)
|
||||
@ -786,10 +875,23 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
|
||||
}
|
||||
|
||||
// Bob will send funding locked to Alice
|
||||
fundingLockedBob := <-bob.msgChan
|
||||
if fundingLockedBob.MsgType() != lnwire.MsgFundingLocked {
|
||||
t.Fatalf("expected fundingLocked sent from Bob, "+
|
||||
"instead got %T", fundingLockedBob)
|
||||
var bobMsg lnwire.Message
|
||||
select {
|
||||
case bobMsg = <-bob.msgChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send fundingLocked")
|
||||
}
|
||||
|
||||
fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked)
|
||||
if !ok {
|
||||
errorMsg, gotError := bobMsg.(*lnwire.Error)
|
||||
if gotError {
|
||||
t.Fatalf("expected FundingLocked to be sent "+
|
||||
"from bob, instead got error: %v",
|
||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||
}
|
||||
t.Fatalf("expected FundingLocked to be sent from bob, "+
|
||||
"instead got %T", bobMsg)
|
||||
}
|
||||
|
||||
// Sleep to make sure database write is finished.
|
||||
@ -825,10 +927,23 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
|
||||
return fmt.Errorf("intentional error in SendAnnouncement")
|
||||
}
|
||||
|
||||
fundingLockedAlice := <-alice.msgChan
|
||||
if fundingLockedAlice.MsgType() != lnwire.MsgFundingLocked {
|
||||
t.Fatalf("expected fundingLocked sent from Alice, "+
|
||||
"instead got %T", fundingLockedAlice)
|
||||
var aliceMsg lnwire.Message
|
||||
select {
|
||||
case aliceMsg = <-alice.msgChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send fundingLocked")
|
||||
}
|
||||
|
||||
fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked)
|
||||
if !ok {
|
||||
errorMsg, gotError := aliceMsg.(*lnwire.Error)
|
||||
if gotError {
|
||||
t.Fatalf("expected FundingLocked to be sent "+
|
||||
"from alice, instead got error: %v",
|
||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||
}
|
||||
t.Fatalf("expected FundingLocked to be sent from alice, "+
|
||||
"instead got %T", aliceMsg)
|
||||
}
|
||||
|
||||
// Sleep to make sure database write is finished.
|
||||
@ -953,15 +1068,45 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
|
||||
t.Fatalf("expected to not find channel state, but got: %v", state)
|
||||
}
|
||||
|
||||
// Exchange the fundingLocked messages.
|
||||
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
|
||||
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
|
||||
|
||||
// They should both send the new channel to the breach arbiter.
|
||||
select {
|
||||
case <-alice.arbiterChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send channel to breach arbiter")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-bob.arbiterChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send channel to breach arbiter")
|
||||
}
|
||||
|
||||
// And send the new channel state to their peer.
|
||||
select {
|
||||
case c := <-alice.peer.newChannels:
|
||||
close(c.done)
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send new channel to peer")
|
||||
}
|
||||
|
||||
select {
|
||||
case c := <-bob.peer.newChannels:
|
||||
close(c.done)
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send new channel to peer")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestFundingManagerFundingTimeout(t *testing.T) {
|
||||
disableFndgLogger(t)
|
||||
|
||||
shutdownChannel := make(chan struct{})
|
||||
|
||||
alice, bob := setupFundingManagers(t, shutdownChannel)
|
||||
defer tearDownFundingManagers(t, alice, bob, shutdownChannel)
|
||||
alice, bob := setupFundingManagers(t)
|
||||
defer tearDownFundingManagers(t, alice, bob)
|
||||
|
||||
// We will consume the channel updates as we go, so no buffering is needed.
|
||||
updateChan := make(chan *lnrpc.OpenStatusUpdate)
|
||||
@ -1016,3 +1161,805 @@ func TestFundingManagerFundingTimeout(t *testing.T) {
|
||||
len(pendingChannels))
|
||||
}
|
||||
}
|
||||
|
||||
// TestFundingManagerReceiveFundingLockedTwice checks that the fundingManager
|
||||
// continues to operate as expected in case we receive a duplicate fundingLocked
|
||||
// message.
|
||||
func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
|
||||
disableFndgLogger(t)
|
||||
|
||||
alice, bob := setupFundingManagers(t)
|
||||
defer tearDownFundingManagers(t, alice, bob)
|
||||
|
||||
// We will consume the channel updates as we go, so no buffering is needed.
|
||||
updateChan := make(chan *lnrpc.OpenStatusUpdate)
|
||||
|
||||
// Run through the process of opening the channel, up until the funding
|
||||
// transaction is broadcasted.
|
||||
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan)
|
||||
|
||||
// Notify that transaction was mined
|
||||
alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
|
||||
bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
|
||||
|
||||
// Give fundingManager time to process the newly mined tx and write
|
||||
//state to database.
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// The funding transaction was mined, so assert that both funding
|
||||
// managers now have the state of this channel 'markedOpen' in their
|
||||
// internal state machine.
|
||||
state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != markedOpen {
|
||||
t.Fatalf("expected state to be markedOpen, was %v", state)
|
||||
}
|
||||
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != markedOpen {
|
||||
t.Fatalf("expected state to be markedOpen, was %v", state)
|
||||
}
|
||||
|
||||
// After the funding transaction is mined, Alice will send
|
||||
// fundingLocked to Bob.
|
||||
var aliceMsg lnwire.Message
|
||||
select {
|
||||
case aliceMsg = <-alice.msgChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send fundingLocked")
|
||||
}
|
||||
|
||||
_, ok := aliceMsg.(*lnwire.FundingLocked)
|
||||
if !ok {
|
||||
errorMsg, gotError := aliceMsg.(*lnwire.Error)
|
||||
if gotError {
|
||||
t.Fatalf("expected FundingLocked to be sent "+
|
||||
"from alice, instead got error: %v",
|
||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||
}
|
||||
t.Fatalf("expected FundingLocked to be sent from alice, "+
|
||||
"instead got %T", aliceMsg)
|
||||
}
|
||||
|
||||
// And similarly Bob will send funding locked to Alice.
|
||||
var bobMsg lnwire.Message
|
||||
select {
|
||||
case bobMsg = <-bob.msgChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send fundingLocked")
|
||||
}
|
||||
|
||||
fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked)
|
||||
if !ok {
|
||||
errorMsg, gotError := bobMsg.(*lnwire.Error)
|
||||
if gotError {
|
||||
t.Fatalf("expected FundingLocked to be sent "+
|
||||
"from bob, instead got error: %v",
|
||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||
}
|
||||
t.Fatalf("expected FundingLocked to be sent from bob, "+
|
||||
"instead got %T", bobMsg)
|
||||
}
|
||||
|
||||
// Sleep to make sure database write is finished.
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// Check that the state machine is updated accordingly
|
||||
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != fundingLockedSent {
|
||||
t.Fatalf("expected state to be fundingLockedSent, was %v", state)
|
||||
}
|
||||
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != fundingLockedSent {
|
||||
t.Fatalf("expected state to be fundingLockedSent, was %v", state)
|
||||
}
|
||||
|
||||
// After the FundingLocked message is sent, the channel will be announced.
|
||||
// A chanAnnouncement consists of three distinct messages:
|
||||
// 1) ChannelAnnouncement
|
||||
// 2) ChannelUpdate
|
||||
// 3) AnnounceSignatures
|
||||
// that will be announced in no particular order.
|
||||
// A node announcement will also be sent.
|
||||
announcements := make([]lnwire.Message, 4)
|
||||
for i := 0; i < len(announcements); i++ {
|
||||
select {
|
||||
case announcements[i] = <-alice.announceChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send announcement %v", i)
|
||||
}
|
||||
}
|
||||
|
||||
gotChannelAnnouncement := false
|
||||
gotChannelUpdate := false
|
||||
gotAnnounceSignatures := false
|
||||
gotNodeAnnouncement := false
|
||||
|
||||
for _, msg := range announcements {
|
||||
switch msg.(type) {
|
||||
case *lnwire.ChannelAnnouncement:
|
||||
gotChannelAnnouncement = true
|
||||
case *lnwire.ChannelUpdate:
|
||||
gotChannelUpdate = true
|
||||
case *lnwire.AnnounceSignatures:
|
||||
gotAnnounceSignatures = true
|
||||
case *lnwire.NodeAnnouncement:
|
||||
gotNodeAnnouncement = true
|
||||
}
|
||||
}
|
||||
|
||||
if !gotChannelAnnouncement {
|
||||
t.Fatalf("did not get ChannelAnnouncement from Alice")
|
||||
}
|
||||
if !gotChannelUpdate {
|
||||
t.Fatalf("did not get ChannelUpdate from Alice")
|
||||
}
|
||||
if !gotAnnounceSignatures {
|
||||
t.Fatalf("did not get AnnounceSignatures from Alice")
|
||||
}
|
||||
if !gotNodeAnnouncement {
|
||||
t.Fatalf("did not get NodeAnnouncement from Alice")
|
||||
}
|
||||
|
||||
// Do the check for Bob as well.
|
||||
for i := 0; i < len(announcements); i++ {
|
||||
select {
|
||||
case announcements[i] = <-bob.announceChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send announcement %v", i)
|
||||
}
|
||||
}
|
||||
|
||||
gotChannelAnnouncement = false
|
||||
gotChannelUpdate = false
|
||||
gotAnnounceSignatures = false
|
||||
gotNodeAnnouncement = false
|
||||
|
||||
for _, msg := range announcements {
|
||||
switch msg.(type) {
|
||||
case *lnwire.ChannelAnnouncement:
|
||||
gotChannelAnnouncement = true
|
||||
case *lnwire.ChannelUpdate:
|
||||
gotChannelUpdate = true
|
||||
case *lnwire.AnnounceSignatures:
|
||||
gotAnnounceSignatures = true
|
||||
case *lnwire.NodeAnnouncement:
|
||||
gotNodeAnnouncement = true
|
||||
}
|
||||
}
|
||||
|
||||
if !gotChannelAnnouncement {
|
||||
t.Fatalf("did not get ChannelAnnouncement from Bob")
|
||||
}
|
||||
if !gotChannelUpdate {
|
||||
t.Fatalf("did not get ChannelUpdate from Bob")
|
||||
}
|
||||
if !gotAnnounceSignatures {
|
||||
t.Fatalf("did not get AnnounceSignatures from Bob")
|
||||
}
|
||||
if !gotNodeAnnouncement {
|
||||
t.Fatalf("did not get NodeAnnouncement from Bob")
|
||||
}
|
||||
|
||||
// The funding process is now finished, wait for the
|
||||
// OpenStatusUpdate_ChanOpen update
|
||||
var openUpdate *lnrpc.OpenStatusUpdate
|
||||
select {
|
||||
case openUpdate = <-updateChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send OpenStatusUpdate")
|
||||
}
|
||||
|
||||
_, ok = openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen)
|
||||
if !ok {
|
||||
t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen")
|
||||
}
|
||||
|
||||
// The internal state-machine should now have deleted the channelStates
|
||||
// from the database, as the channel is announced.
|
||||
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != ErrChannelNotFound {
|
||||
t.Fatalf("expected to not find channel state, but got: %v", state)
|
||||
}
|
||||
|
||||
// Need to give bob time to update database.
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != ErrChannelNotFound {
|
||||
t.Fatalf("expected to not find channel state, but got: %v", state)
|
||||
}
|
||||
|
||||
// Send the fundingLocked message twice to Alice.
|
||||
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
|
||||
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
|
||||
|
||||
// Alice should send the new channel to the breach arbiter.
|
||||
select {
|
||||
case <-alice.arbiterChan:
|
||||
case <-time.After(time.Millisecond * 300):
|
||||
t.Fatalf("alice did not send channel to breach arbiter")
|
||||
}
|
||||
|
||||
// And send the new channel state to their peer.
|
||||
select {
|
||||
case c := <-alice.peer.newChannels:
|
||||
close(c.done)
|
||||
case <-time.After(time.Millisecond * 300):
|
||||
t.Fatalf("alice did not send new channel to peer")
|
||||
}
|
||||
|
||||
// Alice should not send the channel state the second time, as the
|
||||
// second funding locked should just be ignored.
|
||||
select {
|
||||
case <-alice.arbiterChan:
|
||||
t.Fatalf("alice sent channel to breach arbiter a second time")
|
||||
case <-time.After(time.Millisecond * 300):
|
||||
// Expected
|
||||
}
|
||||
select {
|
||||
case <-alice.peer.newChannels:
|
||||
t.Fatalf("alice sent new channel to peer a second time")
|
||||
case <-time.After(time.Millisecond * 300):
|
||||
// Expected
|
||||
}
|
||||
|
||||
// Another fundingLocked should also be ignored, since Alice should
|
||||
// have updated her database at this point.
|
||||
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
|
||||
select {
|
||||
case <-alice.arbiterChan:
|
||||
t.Fatalf("alice sent channel to breach arbiter a second time")
|
||||
case <-time.After(time.Millisecond * 300):
|
||||
// Expected
|
||||
}
|
||||
select {
|
||||
case <-alice.peer.newChannels:
|
||||
t.Fatalf("alice sent new channel to peer a second time")
|
||||
case <-time.After(time.Millisecond * 300):
|
||||
// Expected
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TestFundingManagerRestartAfterChanAnn checks that the fundingManager properly
|
||||
// handles receiving a fundingLocked after the its own fundingLocked and channel
|
||||
// announcement is sent and gets restarted.
|
||||
func TestFundingManagerRestartAfterChanAnn(t *testing.T) {
|
||||
disableFndgLogger(t)
|
||||
|
||||
alice, bob := setupFundingManagers(t)
|
||||
defer tearDownFundingManagers(t, alice, bob)
|
||||
|
||||
// We will consume the channel updates as we go, so no buffering is needed.
|
||||
updateChan := make(chan *lnrpc.OpenStatusUpdate)
|
||||
|
||||
// Run through the process of opening the channel, up until the funding
|
||||
// transaction is broadcasted.
|
||||
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan)
|
||||
|
||||
// Notify that transaction was mined
|
||||
alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
|
||||
bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
|
||||
|
||||
// Give fundingManager time to process the newly mined tx and write
|
||||
//state to database.
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// The funding transaction was mined, so assert that both funding
|
||||
// managers now have the state of this channel 'markedOpen' in their
|
||||
// internal state machine.
|
||||
state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != markedOpen {
|
||||
t.Fatalf("expected state to be markedOpen, was %v", state)
|
||||
}
|
||||
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != markedOpen {
|
||||
t.Fatalf("expected state to be markedOpen, was %v", state)
|
||||
}
|
||||
|
||||
// After the funding transaction is mined, Alice will send
|
||||
// fundingLocked to Bob.
|
||||
var aliceMsg lnwire.Message
|
||||
select {
|
||||
case aliceMsg = <-alice.msgChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send fundingLocked")
|
||||
}
|
||||
|
||||
fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked)
|
||||
if !ok {
|
||||
errorMsg, gotError := aliceMsg.(*lnwire.Error)
|
||||
if gotError {
|
||||
t.Fatalf("expected FundingLocked to be sent "+
|
||||
"from alice, instead got error: %v",
|
||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||
}
|
||||
t.Fatalf("expected FundingLocked to be sent from alice, "+
|
||||
"instead got %T", aliceMsg)
|
||||
}
|
||||
|
||||
// And similarly Bob will send funding locked to Alice.
|
||||
var bobMsg lnwire.Message
|
||||
select {
|
||||
case bobMsg = <-bob.msgChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send fundingLocked")
|
||||
}
|
||||
|
||||
fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked)
|
||||
if !ok {
|
||||
errorMsg, gotError := bobMsg.(*lnwire.Error)
|
||||
if gotError {
|
||||
t.Fatalf("expected FundingLocked to be sent "+
|
||||
"from bob, instead got error: %v",
|
||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||
}
|
||||
t.Fatalf("expected FundingLocked to be sent from bob, "+
|
||||
"instead got %T", bobMsg)
|
||||
}
|
||||
|
||||
// Sleep to make sure database write is finished.
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// Check that the state machine is updated accordingly
|
||||
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != fundingLockedSent {
|
||||
t.Fatalf("expected state to be fundingLockedSent, was %v", state)
|
||||
}
|
||||
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != fundingLockedSent {
|
||||
t.Fatalf("expected state to be fundingLockedSent, was %v", state)
|
||||
}
|
||||
|
||||
// After the FundingLocked message is sent, the channel will be announced.
|
||||
// A chanAnnouncement consists of three distinct messages:
|
||||
// 1) ChannelAnnouncement
|
||||
// 2) ChannelUpdate
|
||||
// 3) AnnounceSignatures
|
||||
// that will be announced in no particular order.
|
||||
// A node announcement will also be sent.
|
||||
announcements := make([]lnwire.Message, 4)
|
||||
for i := 0; i < len(announcements); i++ {
|
||||
select {
|
||||
case announcements[i] = <-alice.announceChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send announcement %v", i)
|
||||
}
|
||||
}
|
||||
|
||||
gotChannelAnnouncement := false
|
||||
gotChannelUpdate := false
|
||||
gotAnnounceSignatures := false
|
||||
gotNodeAnnouncement := false
|
||||
|
||||
for _, msg := range announcements {
|
||||
switch msg.(type) {
|
||||
case *lnwire.ChannelAnnouncement:
|
||||
gotChannelAnnouncement = true
|
||||
case *lnwire.ChannelUpdate:
|
||||
gotChannelUpdate = true
|
||||
case *lnwire.AnnounceSignatures:
|
||||
gotAnnounceSignatures = true
|
||||
case *lnwire.NodeAnnouncement:
|
||||
gotNodeAnnouncement = true
|
||||
}
|
||||
}
|
||||
|
||||
if !gotChannelAnnouncement {
|
||||
t.Fatalf("did not get ChannelAnnouncement from Alice")
|
||||
}
|
||||
if !gotChannelUpdate {
|
||||
t.Fatalf("did not get ChannelUpdate from Alice")
|
||||
}
|
||||
if !gotAnnounceSignatures {
|
||||
t.Fatalf("did not get AnnounceSignatures from Alice")
|
||||
}
|
||||
if !gotNodeAnnouncement {
|
||||
t.Fatalf("did not get NodeAnnouncement from Alice")
|
||||
}
|
||||
|
||||
// Do the check for Bob as well.
|
||||
for i := 0; i < len(announcements); i++ {
|
||||
select {
|
||||
case announcements[i] = <-bob.announceChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send announcement %v", i)
|
||||
}
|
||||
}
|
||||
|
||||
gotChannelAnnouncement = false
|
||||
gotChannelUpdate = false
|
||||
gotAnnounceSignatures = false
|
||||
gotNodeAnnouncement = false
|
||||
|
||||
for _, msg := range announcements {
|
||||
switch msg.(type) {
|
||||
case *lnwire.ChannelAnnouncement:
|
||||
gotChannelAnnouncement = true
|
||||
case *lnwire.ChannelUpdate:
|
||||
gotChannelUpdate = true
|
||||
case *lnwire.AnnounceSignatures:
|
||||
gotAnnounceSignatures = true
|
||||
case *lnwire.NodeAnnouncement:
|
||||
gotNodeAnnouncement = true
|
||||
}
|
||||
}
|
||||
|
||||
if !gotChannelAnnouncement {
|
||||
t.Fatalf("did not get ChannelAnnouncement from Bob")
|
||||
}
|
||||
if !gotChannelUpdate {
|
||||
t.Fatalf("did not get ChannelUpdate from Bob")
|
||||
}
|
||||
if !gotAnnounceSignatures {
|
||||
t.Fatalf("did not get AnnounceSignatures from Bob")
|
||||
}
|
||||
if !gotNodeAnnouncement {
|
||||
t.Fatalf("did not get NodeAnnouncement from Bob")
|
||||
}
|
||||
|
||||
// The funding process is now finished, wait for the
|
||||
// OpenStatusUpdate_ChanOpen update
|
||||
var openUpdate *lnrpc.OpenStatusUpdate
|
||||
select {
|
||||
case openUpdate = <-updateChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send OpenStatusUpdate")
|
||||
}
|
||||
|
||||
_, ok = openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen)
|
||||
if !ok {
|
||||
t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen")
|
||||
}
|
||||
|
||||
// The internal state-machine should now have deleted the channelStates
|
||||
// from the database, as the channel is announced.
|
||||
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != ErrChannelNotFound {
|
||||
t.Fatalf("expected to not find channel state, but got: %v", state)
|
||||
}
|
||||
|
||||
// Need to give bob time to update database.
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != ErrChannelNotFound {
|
||||
t.Fatalf("expected to not find channel state, but got: %v", state)
|
||||
}
|
||||
|
||||
// At this point we restart Alice's fundingManager, before she receives
|
||||
// the fundingLocked message. After restart, she will receive it, and
|
||||
// we expect her to be able to handle it correctly.
|
||||
recreateAliceFundingManager(t, alice)
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// Exchange the fundingLocked messages.
|
||||
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
|
||||
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
|
||||
|
||||
// They should both send the new channel to the breach arbiter.
|
||||
select {
|
||||
case <-alice.arbiterChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send channel to breach arbiter")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-bob.arbiterChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send channel to breach arbiter")
|
||||
}
|
||||
|
||||
// And send the new channel state to their peer.
|
||||
select {
|
||||
case c := <-alice.peer.newChannels:
|
||||
close(c.done)
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send new channel to peer")
|
||||
}
|
||||
|
||||
select {
|
||||
case c := <-bob.peer.newChannels:
|
||||
close(c.done)
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send new channel to peer")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TestFundingManagerRestartAfterReceivingFundingLocked checks that the
|
||||
// fundingManager continues to operate as expected after it has received
|
||||
// fundingLocked and then gets restarted.
|
||||
func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) {
|
||||
disableFndgLogger(t)
|
||||
|
||||
alice, bob := setupFundingManagers(t)
|
||||
defer tearDownFundingManagers(t, alice, bob)
|
||||
|
||||
// We will consume the channel updates as we go, so no buffering is needed.
|
||||
updateChan := make(chan *lnrpc.OpenStatusUpdate)
|
||||
|
||||
// Run through the process of opening the channel, up until the funding
|
||||
// transaction is broadcasted.
|
||||
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan)
|
||||
|
||||
// Notify that transaction was mined
|
||||
alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
|
||||
bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
|
||||
|
||||
// Give fundingManager time to process the newly mined tx and write
|
||||
//state to database.
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// The funding transaction was mined, so assert that both funding
|
||||
// managers now have the state of this channel 'markedOpen' in their
|
||||
// internal state machine.
|
||||
state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != markedOpen {
|
||||
t.Fatalf("expected state to be markedOpen, was %v", state)
|
||||
}
|
||||
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != markedOpen {
|
||||
t.Fatalf("expected state to be markedOpen, was %v", state)
|
||||
}
|
||||
|
||||
// After the funding transaction is mined, Alice will send
|
||||
// fundingLocked to Bob.
|
||||
var aliceMsg lnwire.Message
|
||||
select {
|
||||
case aliceMsg = <-alice.msgChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send fundingLocked")
|
||||
}
|
||||
|
||||
fundingLockedAlice, ok := aliceMsg.(*lnwire.FundingLocked)
|
||||
if !ok {
|
||||
errorMsg, gotError := aliceMsg.(*lnwire.Error)
|
||||
if gotError {
|
||||
t.Fatalf("expected FundingLocked to be sent "+
|
||||
"from alice, instead got error: %v",
|
||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||
}
|
||||
t.Fatalf("expected FundingLocked to be sent from alice, "+
|
||||
"instead got %T", aliceMsg)
|
||||
}
|
||||
|
||||
// And similarly Bob will send funding locked to Alice.
|
||||
var bobMsg lnwire.Message
|
||||
select {
|
||||
case bobMsg = <-bob.msgChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send fundingLocked")
|
||||
}
|
||||
|
||||
fundingLockedBob, ok := bobMsg.(*lnwire.FundingLocked)
|
||||
if !ok {
|
||||
errorMsg, gotError := bobMsg.(*lnwire.Error)
|
||||
if gotError {
|
||||
t.Fatalf("expected FundingLocked to be sent "+
|
||||
"from bob, instead got error: %v",
|
||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||
}
|
||||
t.Fatalf("expected FundingLocked to be sent from bob, "+
|
||||
"instead got %T", bobMsg)
|
||||
}
|
||||
|
||||
// Sleep to make sure database write is finished.
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// Check that the state machine is updated accordingly
|
||||
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != fundingLockedSent {
|
||||
t.Fatalf("expected state to be fundingLockedSent, was %v", state)
|
||||
}
|
||||
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get channel state: %v", err)
|
||||
}
|
||||
|
||||
if state != fundingLockedSent {
|
||||
t.Fatalf("expected state to be fundingLockedSent, was %v", state)
|
||||
}
|
||||
|
||||
// Let Alice immediately get the fundingLocked message.
|
||||
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// She will block waiting for local channel announcements to finish
|
||||
// before sending the new channel state to the peer.
|
||||
select {
|
||||
case <-alice.peer.newChannels:
|
||||
t.Fatalf("did not expect alice to handle the fundinglocked")
|
||||
case <-time.After(time.Millisecond * 300):
|
||||
}
|
||||
|
||||
// At this point we restart Alice's fundingManager. Bob will resend
|
||||
// the fundingLocked after the connection is re-established.
|
||||
recreateAliceFundingManager(t, alice)
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// Simulate Bob resending the message when Alice is back up.
|
||||
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
|
||||
|
||||
// After the FundingLocked message is sent, the channel will be announced.
|
||||
// A chanAnnouncement consists of three distinct messages:
|
||||
// 1) ChannelAnnouncement
|
||||
// 2) ChannelUpdate
|
||||
// 3) AnnounceSignatures
|
||||
// that will be announced in no particular order.
|
||||
// A node announcement will also be sent.
|
||||
announcements := make([]lnwire.Message, 4)
|
||||
for i := 0; i < len(announcements); i++ {
|
||||
select {
|
||||
case announcements[i] = <-alice.announceChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send announcement %v", i)
|
||||
}
|
||||
}
|
||||
|
||||
gotChannelAnnouncement := false
|
||||
gotChannelUpdate := false
|
||||
gotAnnounceSignatures := false
|
||||
gotNodeAnnouncement := false
|
||||
|
||||
for _, msg := range announcements {
|
||||
switch msg.(type) {
|
||||
case *lnwire.ChannelAnnouncement:
|
||||
gotChannelAnnouncement = true
|
||||
case *lnwire.ChannelUpdate:
|
||||
gotChannelUpdate = true
|
||||
case *lnwire.AnnounceSignatures:
|
||||
gotAnnounceSignatures = true
|
||||
case *lnwire.NodeAnnouncement:
|
||||
gotNodeAnnouncement = true
|
||||
}
|
||||
}
|
||||
|
||||
if !gotChannelAnnouncement {
|
||||
t.Fatalf("did not get ChannelAnnouncement from Alice")
|
||||
}
|
||||
if !gotChannelUpdate {
|
||||
t.Fatalf("did not get ChannelUpdate from Alice")
|
||||
}
|
||||
if !gotAnnounceSignatures {
|
||||
t.Fatalf("did not get AnnounceSignatures from Alice")
|
||||
}
|
||||
if !gotNodeAnnouncement {
|
||||
t.Fatalf("did not get NodeAnnouncement from Alice")
|
||||
}
|
||||
|
||||
// Do the check for Bob as well.
|
||||
for i := 0; i < len(announcements); i++ {
|
||||
select {
|
||||
case announcements[i] = <-bob.announceChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send announcement %v", i)
|
||||
}
|
||||
}
|
||||
|
||||
gotChannelAnnouncement = false
|
||||
gotChannelUpdate = false
|
||||
gotAnnounceSignatures = false
|
||||
gotNodeAnnouncement = false
|
||||
|
||||
for _, msg := range announcements {
|
||||
switch msg.(type) {
|
||||
case *lnwire.ChannelAnnouncement:
|
||||
gotChannelAnnouncement = true
|
||||
case *lnwire.ChannelUpdate:
|
||||
gotChannelUpdate = true
|
||||
case *lnwire.AnnounceSignatures:
|
||||
gotAnnounceSignatures = true
|
||||
case *lnwire.NodeAnnouncement:
|
||||
gotNodeAnnouncement = true
|
||||
}
|
||||
}
|
||||
|
||||
if !gotChannelAnnouncement {
|
||||
t.Fatalf("did not get ChannelAnnouncement from Bob")
|
||||
}
|
||||
if !gotChannelUpdate {
|
||||
t.Fatalf("did not get ChannelUpdate from Bob")
|
||||
}
|
||||
if !gotAnnounceSignatures {
|
||||
t.Fatalf("did not get AnnounceSignatures from Bob")
|
||||
}
|
||||
if !gotNodeAnnouncement {
|
||||
t.Fatalf("did not get NodeAnnouncement from Bob")
|
||||
}
|
||||
|
||||
// The funding process is now finished. Since we recreated the
|
||||
// fundingManager, we don't have an update channel to synchronize on,
|
||||
// so a small sleep makes sure the database writing is finished.
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// The internal state-machine should now have deleted the channelStates
|
||||
// from the database, as the channel is announced.
|
||||
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != ErrChannelNotFound {
|
||||
t.Fatalf("expected to not find channel state, but got: %v", state)
|
||||
}
|
||||
|
||||
// Need to give bob time to update database.
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
|
||||
if err != ErrChannelNotFound {
|
||||
t.Fatalf("expected to not find channel state, but got: %v", state)
|
||||
}
|
||||
|
||||
// Exchange the fundingLocked messages.
|
||||
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
|
||||
|
||||
// They should both send the new channel to the breach arbiter.
|
||||
select {
|
||||
case <-alice.arbiterChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send channel to breach arbiter")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-bob.arbiterChan:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send channel to breach arbiter")
|
||||
}
|
||||
|
||||
// And send the new channel state to their peer.
|
||||
select {
|
||||
case c := <-alice.peer.newChannels:
|
||||
close(c.done)
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("alice did not send new channel to peer")
|
||||
}
|
||||
|
||||
select {
|
||||
case c := <-bob.peer.newChannels:
|
||||
close(c.done)
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("bob did not send new channel to peer")
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user