Merge pull request #328 from halseth/funding-double-fundinglocked

FundingLocked improvements
This commit is contained in:
Olaoluwa Osuntokun 2017-10-02 16:20:33 -07:00 committed by GitHub
commit 4153712ba7
8 changed files with 854 additions and 314 deletions

@ -190,6 +190,12 @@ type fundingConfig struct {
// channel's funding transaction and initial commitment transaction. // channel's funding transaction and initial commitment transaction.
SendToPeer func(target *btcec.PublicKey, msgs ...lnwire.Message) error SendToPeer func(target *btcec.PublicKey, msgs ...lnwire.Message) error
// NotifyWhenOnline allows the FundingManager to register with a
// subsystem that will notify it when the peer comes online.
// This is used when sending the fundingLocked message, since it MUST be
// delivered after the funding transaction is confirmed.
NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{})
// FindPeer searches the list of peers connected to the node so that // FindPeer searches the list of peers connected to the node so that
// the FundingManager can notify other daemon subsystems as necessary // the FundingManager can notify other daemon subsystems as necessary
// during the funding process. // during the funding process.
@ -286,6 +292,9 @@ type fundingManager struct {
localDiscoveryMtx sync.Mutex localDiscoveryMtx sync.Mutex
localDiscoverySignals map[lnwire.ChannelID]chan struct{} localDiscoverySignals map[lnwire.ChannelID]chan struct{}
handleFundingLockedMtx sync.RWMutex
handleFundingLockedBarriers map[lnwire.ChannelID]struct{}
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
} }
@ -323,16 +332,17 @@ var (
// fundingManager. // fundingManager.
func newFundingManager(cfg fundingConfig) (*fundingManager, error) { func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
return &fundingManager{ return &fundingManager{
cfg: &cfg, cfg: &cfg,
chanIDKey: cfg.TempChanIDSeed, chanIDKey: cfg.TempChanIDSeed,
activeReservations: make(map[serializedPubKey]pendingChannels), activeReservations: make(map[serializedPubKey]pendingChannels),
signedReservations: make(map[lnwire.ChannelID][32]byte), signedReservations: make(map[lnwire.ChannelID][32]byte),
newChanBarriers: make(map[lnwire.ChannelID]chan struct{}), newChanBarriers: make(map[lnwire.ChannelID]chan struct{}),
fundingMsgs: make(chan interface{}, msgBufferSize), fundingMsgs: make(chan interface{}, msgBufferSize),
fundingRequests: make(chan *initFundingMsg, msgBufferSize), fundingRequests: make(chan *initFundingMsg, msgBufferSize),
localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}), localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}),
queries: make(chan interface{}, 1), handleFundingLockedBarriers: make(map[lnwire.ChannelID]struct{}),
quit: make(chan struct{}), queries: make(chan interface{}, 1),
quit: make(chan struct{}),
}, nil }, nil
} }
@ -420,10 +430,22 @@ func (f *fundingManager) Start() error {
return err return err
} }
fndgLog.Debugf("channel with opening state %v found",
channelState)
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
fndgLog.Debugf("channel (%v) with opening state %v found",
chanID, channelState)
// Set up the channel barriers again, to make sure
// waitUntilChannelOpen correctly waits until the opening
// process is completely over.
f.barrierMtx.Lock()
fndgLog.Tracef("Loading pending ChannelPoint(%v), "+
"creating chan barrier", channel.FundingOutpoint)
f.newChanBarriers[chanID] = make(chan struct{})
f.barrierMtx.Unlock()
// Set up a localDiscoverySignals to make sure we finish sending
// our own fundingLocked and channel announcements before
// processing a received fundingLocked.
f.localDiscoverySignals[chanID] = make(chan struct{}) f.localDiscoverySignals[chanID] = make(chan struct{})
// If we did find the channel in the opening state database, we // If we did find the channel in the opening state database, we
@ -587,6 +609,7 @@ func (f *fundingManager) reservationCoordinator() {
case *fundingSignedMsg: case *fundingSignedMsg:
f.handleFundingSigned(fmsg) f.handleFundingSigned(fmsg)
case *fundingLockedMsg: case *fundingLockedMsg:
f.wg.Add(1)
go f.handleFundingLocked(fmsg) go f.handleFundingLocked(fmsg)
case *fundingErrorMsg: case *fundingErrorMsg:
f.handleErrorMsg(fmsg) f.handleErrorMsg(fmsg)
@ -1436,18 +1459,46 @@ func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
} }
fundingLockedMsg := lnwire.NewFundingLocked(chanID, nextRevocation) fundingLockedMsg := lnwire.NewFundingLocked(chanID, nextRevocation)
err = f.cfg.SendToPeer(completeChan.IdentityPub, fundingLockedMsg) // If the peer has disconnected before we reach this point, we will need
if err != nil { // to wait for him to come back online before sending the fundingLocked
fndgLog.Errorf("unable to send fundingLocked to peer: %v", err) // message. This is special for fundingLocked, since failing to send any
return // of the previous messages in the funding flow just cancels the flow.
// But now the funding transaction is confirmed, the channel is open
// and we have to make sure the peer gets the fundingLocked message when
// it comes back online. This is also crucial during restart of lnd,
// where we might try to resend the fundingLocked message before the
// server has had the time to connect to the peer. We keep trying to
// send fundingLocked until we succeed, or the fundingManager is shut
// down.
for {
err = f.cfg.SendToPeer(completeChan.IdentityPub,
fundingLockedMsg)
if err == nil {
// Sending succeeded, we can break out and continue
// the funding flow.
break
}
fndgLog.Warnf("unable to send fundingLocked to peer %x: "+
"%v. Will retry when online",
completeChan.IdentityPub.SerializeCompressed(), err)
connected := make(chan struct{})
f.cfg.NotifyWhenOnline(completeChan.IdentityPub, connected)
select {
case <-connected:
// Retry sending.
case <-f.quit:
return
}
} }
// As the fundingLocked message is now sent to the peer, the channel is // As the fundingLocked message is now sent to the peer, the channel is
// moved to the next state of the state machine. It will be moved to the // moved to the next state of the state machine. It will be moved to the
// last state (actually deleted from the database) after the channel is // last state (actually deleted from the database) after the channel is
// finally announced. // finally announced.
err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, fundingLockedSent, err = f.saveChannelOpeningState(&completeChan.FundingOutpoint,
shortChanID) fundingLockedSent, shortChanID)
if err != nil { if err != nil {
fndgLog.Errorf("error setting channel state to "+ fndgLog.Errorf("error setting channel state to "+
"fundingLockedSent: %v", err) "fundingLockedSent: %v", err)
@ -1520,17 +1571,46 @@ func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked,
// handleFundingLocked finalizes the channel funding process and enables the // handleFundingLocked finalizes the channel funding process and enables the
// channel to enter normal operating mode. // channel to enter normal operating mode.
func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
defer f.wg.Done()
// If we are currently in the process of handling a funding locked
// message for this channel, ignore.
f.handleFundingLockedMtx.Lock()
_, ok := f.handleFundingLockedBarriers[fmsg.msg.ChanID]
if ok {
fndgLog.Infof("Already handling fundingLocked for "+
"ChannelID(%v), ignoring.", fmsg.msg.ChanID)
f.handleFundingLockedMtx.Unlock()
return
}
// If not already handling fundingLocked for this channel, set up
// barrier, and move on.
f.handleFundingLockedBarriers[fmsg.msg.ChanID] = struct{}{}
f.handleFundingLockedMtx.Unlock()
defer func() {
f.handleFundingLockedMtx.Lock()
delete(f.handleFundingLockedBarriers, fmsg.msg.ChanID)
f.handleFundingLockedMtx.Unlock()
}()
f.localDiscoveryMtx.Lock() f.localDiscoveryMtx.Lock()
localDiscoverySignal, ok := f.localDiscoverySignals[fmsg.msg.ChanID] localDiscoverySignal, ok := f.localDiscoverySignals[fmsg.msg.ChanID]
f.localDiscoveryMtx.Unlock() f.localDiscoveryMtx.Unlock()
if ok { if ok {
// Before we proceed with processing the funding locked // Before we proceed with processing the funding locked
// message, we'll wait for the lcoal waitForFundingConfirmation // message, we'll wait for the local waitForFundingConfirmation
// goroutine to signal that it has the necessary state in // goroutine to signal that it has the necessary state in
// place. Otherwise, we may be missing critical information // place. Otherwise, we may be missing critical information
// required to handle forwarded HTLC's. // required to handle forwarded HTLC's.
<-localDiscoverySignal select {
case <-localDiscoverySignal:
// Fallthrough
case <-f.quit:
return
}
// With the signal received, we can now safely delete the entry // With the signal received, we can now safely delete the entry
// from the map. // from the map.
@ -1550,7 +1630,14 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
return return
} }
// TODO(roasbeef): done nothing if repeat message sent // If the RemoteNextRevocation is non-nil, it means that we have
// already processed fundingLocked for this channel, so ignore.
if channel.RemoteNextRevocation() != nil {
fndgLog.Infof("Received duplicate fundingLocked for "+
"ChannelID(%v), ignoring.", chanID)
channel.Stop()
return
}
// The funding locked message contains the next commitment point we'll // The funding locked message contains the next commitment point we'll
// need to create the next commitment state for the remote party. So // need to create the next commitment state for the remote party. So
@ -1565,7 +1652,11 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
// With the channel retrieved, we'll send the breach arbiter the new // With the channel retrieved, we'll send the breach arbiter the new
// channel so it can watch for attempts to breach the channel's // channel so it can watch for attempts to breach the channel's
// contract by the remote party. // contract by the remote party.
f.cfg.ArbiterChan <- channel select {
case f.cfg.ArbiterChan <- channel:
case <-f.quit:
return
}
// Launch a defer so we _ensure_ that the channel barrier is properly // Launch a defer so we _ensure_ that the channel barrier is properly
// closed even if the target peer is not longer online at this point. // closed even if the target peer is not longer online at this point.
@ -1574,9 +1665,13 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
// that commitment related modifications to this channel can // that commitment related modifications to this channel can
// now proceed. // now proceed.
f.barrierMtx.Lock() f.barrierMtx.Lock()
fndgLog.Tracef("Closing chan barrier for ChanID(%v)", chanID) chanBarrier, ok := f.newChanBarriers[chanID]
close(f.newChanBarriers[chanID]) if ok {
delete(f.newChanBarriers, chanID) fndgLog.Tracef("Closing chan barrier for ChanID(%v)",
chanID)
close(chanBarrier)
delete(f.newChanBarriers, chanID)
}
f.barrierMtx.Unlock() f.barrierMtx.Unlock()
}() }()
@ -1592,7 +1687,12 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
channel: channel, channel: channel,
done: newChanDone, done: newChanDone,
} }
peer.newChannels <- newChanMsg
select {
case peer.newChannels <- newChanMsg:
case <-f.quit:
return
}
// We pause here to wait for the peer to recognize the new channel // We pause here to wait for the peer to recognize the new channel
// before we close the channel barrier corresponding to the channel. // before we close the channel barrier corresponding to the channel.

@ -100,13 +100,16 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
} }
type testNode struct { type testNode struct {
privKey *btcec.PrivateKey privKey *btcec.PrivateKey
msgChan chan lnwire.Message msgChan chan lnwire.Message
announceChan chan lnwire.Message announceChan chan lnwire.Message
publTxChan chan *wire.MsgTx arbiterChan chan *lnwallet.LightningChannel
fundingMgr *fundingManager publTxChan chan *wire.MsgTx
mockNotifier *mockNotifier fundingMgr *fundingManager
testDir string peer *peer
mockNotifier *mockNotifier
testDir string
shutdownChannel chan struct{}
} }
func disableFndgLogger(t *testing.T) { func disableFndgLogger(t *testing.T) {
@ -115,17 +118,11 @@ func disableFndgLogger(t *testing.T) {
fndgLog = btclog.Disabled fndgLog = btclog.Disabled
} }
func createTestWallet(tempTestDir string, netParams *chaincfg.Params, func createTestWallet(cdb *channeldb.DB, netParams *chaincfg.Params,
notifier chainntnfs.ChainNotifier, wc lnwallet.WalletController, notifier chainntnfs.ChainNotifier, wc lnwallet.WalletController,
signer lnwallet.Signer, bio lnwallet.BlockChainIO, signer lnwallet.Signer, bio lnwallet.BlockChainIO,
estimator lnwallet.FeeEstimator) (*lnwallet.LightningWallet, error) { estimator lnwallet.FeeEstimator) (*lnwallet.LightningWallet, error) {
dbDir := filepath.Join(tempTestDir, "cdb")
cdb, err := channeldb.Open(dbDir)
if err != nil {
return nil, err
}
wallet, err := lnwallet.NewLightningWallet(lnwallet.Config{ wallet, err := lnwallet.NewLightningWallet(lnwallet.Config{
Database: cdb, Database: cdb,
Notifier: notifier, Notifier: notifier,
@ -146,11 +143,27 @@ func createTestWallet(tempTestDir string, netParams *chaincfg.Params,
return wallet, nil return wallet, nil
} }
func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey, func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
tempTestDir string, hdSeed []byte, netParams *chaincfg.Params, tempTestDir string) (*testNode, error) {
chainNotifier chainntnfs.ChainNotifier, estimator lnwallet.FeeEstimator,
sentMessages chan lnwire.Message, sentAnnouncements chan lnwire.Message, netParams := activeNetParams.Params
publTxChan chan *wire.MsgTx, shutdownChan chan struct{}) (*fundingManager, error) { estimator := lnwallet.StaticFeeEstimator{FeeRate: 250}
chainNotifier := &mockNotifier{
confChannel: make(chan *chainntnfs.TxConfirmation, 1),
epochChan: make(chan *chainntnfs.BlockEpoch, 1),
}
newChannelsChan := make(chan *newChannelMsg)
p := &peer{
newChannels: newChannelsChan,
}
sentMessages := make(chan lnwire.Message)
sentAnnouncements := make(chan lnwire.Message)
publTxChan := make(chan *wire.MsgTx, 1)
arbiterChan := make(chan *lnwallet.LightningChannel)
shutdownChan := make(chan struct{})
wc := &mockWalletController{ wc := &mockWalletController{
rootKey: alicePrivKey, rootKey: alicePrivKey,
@ -161,17 +174,22 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey,
} }
bio := &mockChainIO{} bio := &mockChainIO{}
lnw, err := createTestWallet(tempTestDir, netParams, dbDir := filepath.Join(tempTestDir, "cdb")
cdb, err := channeldb.Open(dbDir)
if err != nil {
return nil, err
}
lnw, err := createTestWallet(cdb, netParams,
chainNotifier, wc, signer, bio, estimator) chainNotifier, wc, signer, bio, estimator)
if err != nil { if err != nil {
t.Fatalf("unable to create test ln wallet: %v", err) t.Fatalf("unable to create test ln wallet: %v", err)
} }
arbiterChan := make(chan *lnwallet.LightningChannel)
var chanIDSeed [32]byte var chanIDSeed [32]byte
f, err := newFundingManager(fundingConfig{ f, err := newFundingManager(fundingConfig{
IDKey: pubKey, IDKey: privKey.PubKey(),
Wallet: lnw, Wallet: lnw,
Notifier: chainNotifier, Notifier: chainNotifier,
FeeEstimator: estimator, FeeEstimator: estimator,
@ -198,15 +216,30 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey,
} }
return nil return nil
}, },
NotifyWhenOnline: func(peer *btcec.PublicKey, connectedChan chan<- struct{}) {
t.Fatalf("did not expect fundingManager to call NotifyWhenOnline")
},
FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) { FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) {
return nil, nil return p, nil
}, },
TempChanIDSeed: chanIDSeed, TempChanIDSeed: chanIDSeed,
FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) {
// This is not expected to be used in the current tests. dbChannels, err := cdb.FetchAllChannels()
// Add an implementation if that changes. if err != nil {
t.Fatal("did not expect FindChannel to be called") return nil, err
return nil, nil }
for _, channel := range dbChannels {
if chanID.IsChanPoint(&channel.FundingOutpoint) {
return lnwallet.NewLightningChannel(
signer,
nil,
estimator,
channel)
}
}
return nil, fmt.Errorf("unable to find channel")
}, },
NumRequiredConfs: func(chanAmt btcutil.Amount, NumRequiredConfs: func(chanAmt btcutil.Amount,
pushAmt lnwire.MilliSatoshi) uint16 { pushAmt lnwire.MilliSatoshi) uint16 {
@ -221,17 +254,34 @@ func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey,
t.Fatalf("failed creating fundingManager: %v", err) t.Fatalf("failed creating fundingManager: %v", err)
} }
return f, nil if err = f.Start(); err != nil {
t.Fatalf("failed starting fundingManager: %v", err)
}
return &testNode{
privKey: privKey,
msgChan: sentMessages,
announceChan: sentAnnouncements,
arbiterChan: arbiterChan,
publTxChan: publTxChan,
fundingMgr: f,
peer: p,
mockNotifier: chainNotifier,
testDir: tempTestDir,
shutdownChannel: shutdownChan,
}, nil
} }
func recreateAliceFundingManager(t *testing.T, alice *testNode) { func recreateAliceFundingManager(t *testing.T, alice *testNode) {
// Stop the old fundingManager before creating a new one. // Stop the old fundingManager before creating a new one.
close(alice.shutdownChannel)
if err := alice.fundingMgr.Stop(); err != nil { if err := alice.fundingMgr.Stop(); err != nil {
t.Fatalf("unable to stop old fundingManager: %v", err) t.Fatalf("unable to stop old fundingManager: %v", err)
} }
aliceMsgChan := make(chan lnwire.Message) aliceMsgChan := make(chan lnwire.Message)
aliceAnnounceChan := make(chan lnwire.Message) aliceAnnounceChan := make(chan lnwire.Message)
shutdownChan := make(chan struct{})
oldCfg := alice.fundingMgr.cfg oldCfg := alice.fundingMgr.cfg
@ -245,7 +295,11 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
return nil, nil return nil, nil
}, },
SendAnnouncement: func(msg lnwire.Message) error { SendAnnouncement: func(msg lnwire.Message) error {
aliceAnnounceChan <- msg select {
case aliceAnnounceChan <- msg:
case <-shutdownChan:
return fmt.Errorf("shutting down")
}
return nil return nil
}, },
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) {
@ -254,12 +308,17 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
ArbiterChan: oldCfg.ArbiterChan, ArbiterChan: oldCfg.ArbiterChan,
SendToPeer: func(target *btcec.PublicKey, SendToPeer: func(target *btcec.PublicKey,
msgs ...lnwire.Message) error { msgs ...lnwire.Message) error {
aliceMsgChan <- msgs[0] select {
case aliceMsgChan <- msgs[0]:
case <-shutdownChan:
return fmt.Errorf("shutting down")
}
return nil return nil
}, },
FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) { NotifyWhenOnline: func(peer *btcec.PublicKey, connectedChan chan<- struct{}) {
return nil, nil t.Fatalf("did not expect fundingManager to call NotifyWhenOnline")
}, },
FindPeer: oldCfg.FindPeer,
TempChanIDSeed: oldCfg.TempChanIDSeed, TempChanIDSeed: oldCfg.TempChanIDSeed,
FindChannel: oldCfg.FindChannel, FindChannel: oldCfg.FindChannel,
}) })
@ -270,97 +329,46 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
alice.fundingMgr = f alice.fundingMgr = f
alice.msgChan = aliceMsgChan alice.msgChan = aliceMsgChan
alice.announceChan = aliceAnnounceChan alice.announceChan = aliceAnnounceChan
alice.shutdownChannel = shutdownChan
if err = f.Start(); err != nil { if err = f.Start(); err != nil {
t.Fatalf("failed starting fundingManager: %v", err) t.Fatalf("failed starting fundingManager: %v", err)
} }
} }
func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNode, *testNode) { func setupFundingManagers(t *testing.T) (*testNode, *testNode) {
// We need to set the global config, as fundingManager uses // We need to set the global config, as fundingManager uses
// MaxPendingChannels, and it is usually set in lndMain(). // MaxPendingChannels, and it is usually set in lndMain().
cfg = &config{ cfg = &config{
MaxPendingChannels: defaultMaxPendingChannels, MaxPendingChannels: defaultMaxPendingChannels,
} }
netParams := activeNetParams.Params
estimator := lnwallet.StaticFeeEstimator{FeeRate: 250}
aliceMockNotifier := &mockNotifier{
confChannel: make(chan *chainntnfs.TxConfirmation, 1),
epochChan: make(chan *chainntnfs.BlockEpoch, 1),
}
aliceTestDir, err := ioutil.TempDir("", "alicelnwallet") aliceTestDir, err := ioutil.TempDir("", "alicelnwallet")
if err != nil { if err != nil {
t.Fatalf("unable to create temp directory: %v", err) t.Fatalf("unable to create temp directory: %v", err)
} }
aliceMsgChan := make(chan lnwire.Message) alice, err := createTestFundingManager(t, alicePrivKey, aliceTestDir)
aliceAnnounceChan := make(chan lnwire.Message)
alicePublTxChan := make(chan *wire.MsgTx, 1)
aliceFundingMgr, err := createTestFundingManager(t, alicePubKey,
aliceTestDir, alicePrivKeyBytes[:], netParams, aliceMockNotifier,
estimator, aliceMsgChan, aliceAnnounceChan, alicePublTxChan,
shutdownChannel)
if err != nil { if err != nil {
t.Fatalf("failed creating fundingManager: %v", err) t.Fatalf("failed creating fundingManager: %v", err)
} }
if err = aliceFundingMgr.Start(); err != nil {
t.Fatalf("failed starting fundingManager: %v", err)
}
alice := &testNode{
privKey: alicePrivKey,
msgChan: aliceMsgChan,
announceChan: aliceAnnounceChan,
publTxChan: alicePublTxChan,
fundingMgr: aliceFundingMgr,
mockNotifier: aliceMockNotifier,
testDir: aliceTestDir,
}
bobMockNotifier := &mockNotifier{
confChannel: make(chan *chainntnfs.TxConfirmation, 1),
epochChan: make(chan *chainntnfs.BlockEpoch, 1),
}
bobTestDir, err := ioutil.TempDir("", "boblnwallet") bobTestDir, err := ioutil.TempDir("", "boblnwallet")
if err != nil { if err != nil {
t.Fatalf("unable to create temp directory: %v", err) t.Fatalf("unable to create temp directory: %v", err)
} }
bobMsgChan := make(chan lnwire.Message) bob, err := createTestFundingManager(t, bobPrivKey, bobTestDir)
bobAnnounceChan := make(chan lnwire.Message)
bobPublTxChan := make(chan *wire.MsgTx, 1)
bobFundingMgr, err := createTestFundingManager(t, bobPubKey, bobTestDir,
bobPrivKeyBytes[:], netParams, bobMockNotifier, estimator,
bobMsgChan, bobAnnounceChan, bobPublTxChan, shutdownChannel)
if err != nil { if err != nil {
t.Fatalf("failed creating fundingManager: %v", err) t.Fatalf("failed creating fundingManager: %v", err)
} }
if err = bobFundingMgr.Start(); err != nil {
t.Fatalf("failed starting fundingManager: %v", err)
}
bob := &testNode{
privKey: bobPrivKey,
msgChan: bobMsgChan,
announceChan: bobAnnounceChan,
publTxChan: bobPublTxChan,
fundingMgr: bobFundingMgr,
mockNotifier: bobMockNotifier,
testDir: bobTestDir,
}
return alice, bob return alice, bob
} }
func tearDownFundingManagers(t *testing.T, a, b *testNode, shutdownChannel chan struct{}) { func tearDownFundingManagers(t *testing.T, a, b *testNode) {
close(shutdownChannel) close(a.shutdownChannel)
close(b.shutdownChannel)
if err := a.fundingMgr.Stop(); err != nil { if err := a.fundingMgr.Stop(); err != nil {
t.Fatalf("unable to stop fundingManager: %v", err) t.Fatalf("unable to stop fundingManager: %v", err)
@ -512,32 +520,8 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
return fundingOutPoint return fundingOutPoint
} }
func TestFundingManagerNormalWorkflow(t *testing.T) { func assertMarkedOpen(t *testing.T, alice, bob *testNode,
disableFndgLogger(t) fundingOutPoint *wire.OutPoint) {
shutdownChannel := make(chan struct{})
alice, bob := setupFundingManagers(t, shutdownChannel)
defer tearDownFundingManagers(t, alice, bob, shutdownChannel)
// We will consume the channel updates as we go, so no buffering is needed.
updateChan := make(chan *lnrpc.OpenStatusUpdate)
// Run through the process of opening the channel, up until the funding
// transaction is broadcasted.
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan)
// Notify that transaction was mined
alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
// Give fundingManager time to process the newly mined tx and write
//state to database.
time.Sleep(300 * time.Millisecond)
// The funding transaction was mined, so assert that both funding
// managers now have the state of this channel 'markedOpen' in their
// internal state machine.
state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
if err != nil { if err != nil {
t.Fatalf("unable to get channel state: %v", err) t.Fatalf("unable to get channel state: %v", err)
@ -554,38 +538,33 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
if state != markedOpen { if state != markedOpen {
t.Fatalf("expected state to be markedOpen, was %v", state) t.Fatalf("expected state to be markedOpen, was %v", state)
} }
}
// After the funding transaction is mined, Alice will send func checkNodeSendingFundingLocked(t *testing.T, node *testNode) *lnwire.FundingLocked {
// fundingLocked to Bob. var msg lnwire.Message
var fundingLockedAlice lnwire.Message
select { select {
case fundingLockedAlice = <-alice.msgChan: case msg = <-node.msgChan:
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
t.Fatalf("alice did not send fundingLocked") t.Fatalf("node did not send fundingLocked")
}
if fundingLockedAlice.MsgType() != lnwire.MsgFundingLocked {
t.Fatalf("expected fundingLocked sent from Alice, "+
"instead got %T", fundingLockedAlice)
} }
// And similarly Bob will send funding locked to Alice. fundingLocked, ok := msg.(*lnwire.FundingLocked)
var fundingLockedBob lnwire.Message if !ok {
select { errorMsg, gotError := msg.(*lnwire.Error)
case fundingLockedBob = <-bob.msgChan: if gotError {
case <-time.After(time.Second * 5): t.Fatalf("expected FundingLocked to be sent "+
t.Fatalf("bob did not send fundingLocked") "from node, instead got error: %v",
lnwire.ErrorCode(errorMsg.Data[0]))
}
t.Fatalf("expected FundingLocked to be sent from node, "+
"instead got %T", msg)
} }
return fundingLocked
}
if fundingLockedBob.MsgType() != lnwire.MsgFundingLocked { func assertFundingLockedSent(t *testing.T, alice, bob *testNode,
t.Fatalf("expected fundingLocked sent from Bob, "+ fundingOutPoint *wire.OutPoint) {
"instead got %T", fundingLockedBob) state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
}
// Sleep to make sure database write is finished.
time.Sleep(300 * time.Millisecond)
// Check that the state machine is updated accordingly
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
if err != nil { if err != nil {
t.Fatalf("unable to get channel state: %v", err) t.Fatalf("unable to get channel state: %v", err)
} }
@ -601,7 +580,9 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
if state != fundingLockedSent { if state != fundingLockedSent {
t.Fatalf("expected state to be fundingLockedSent, was %v", state) t.Fatalf("expected state to be fundingLockedSent, was %v", state)
} }
}
func assertChannelAnnouncements(t *testing.T, alice, bob *testNode) {
// After the FundingLocked message is sent, the channel will be announced. // After the FundingLocked message is sent, the channel will be announced.
// A chanAnnouncement consists of three distinct messages: // A chanAnnouncement consists of three distinct messages:
// 1) ChannelAnnouncement // 1) ChannelAnnouncement
@ -688,9 +669,9 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
if !gotNodeAnnouncement { if !gotNodeAnnouncement {
t.Fatalf("did not get NodeAnnouncement from Bob") t.Fatalf("did not get NodeAnnouncement from Bob")
} }
}
// The funding process is now finished, wait for the func waitForOpenUpdate(t *testing.T, updateChan chan *lnrpc.OpenStatusUpdate) {
// OpenStatusUpdate_ChanOpen update
var openUpdate *lnrpc.OpenStatusUpdate var openUpdate *lnrpc.OpenStatusUpdate
select { select {
case openUpdate = <-updateChan: case openUpdate = <-updateChan:
@ -702,17 +683,15 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
if !ok { if !ok {
t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen") t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen")
} }
}
// The internal state-machine should now have deleted the channelStates func assertNoChannelState(t *testing.T, alice, bob *testNode,
// from the database, as the channel is announced. fundingOutPoint *wire.OutPoint) {
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
if err != ErrChannelNotFound { if err != ErrChannelNotFound {
t.Fatalf("expected to not find channel state, but got: %v", state) t.Fatalf("expected to not find channel state, but got: %v", state)
} }
// Need to give bob time to update database.
time.Sleep(300 * time.Millisecond)
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
if err != ErrChannelNotFound { if err != ErrChannelNotFound {
t.Fatalf("expected to not find channel state, but got: %v", state) t.Fatalf("expected to not find channel state, but got: %v", state)
@ -720,13 +699,101 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
} }
func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) {
// They should both send the new channel to the breach arbiter.
select {
case <-alice.arbiterChan:
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send channel to breach arbiter")
}
select {
case <-bob.arbiterChan:
case <-time.After(time.Second * 5):
t.Fatalf("bob did not send channel to breach arbiter")
}
// And send the new channel state to their peer.
select {
case c := <-alice.peer.newChannels:
close(c.done)
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send new channel to peer")
}
select {
case c := <-bob.peer.newChannels:
close(c.done)
case <-time.After(time.Second * 5):
t.Fatalf("bob did not send new channel to peer")
}
}
func TestFundingManagerNormalWorkflow(t *testing.T) {
disableFndgLogger(t)
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
// We will consume the channel updates as we go, so no buffering is needed.
updateChan := make(chan *lnrpc.OpenStatusUpdate)
// Run through the process of opening the channel, up until the funding
// transaction is broadcasted.
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan)
// Notify that transaction was mined
alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
// Give fundingManager time to process the newly mined tx and write
//state to database.
time.Sleep(300 * time.Millisecond)
// The funding transaction was mined, so assert that both funding
// managers now have the state of this channel 'markedOpen' in their
// internal state machine.
assertMarkedOpen(t, alice, bob, fundingOutPoint)
// After the funding transaction is mined, Alice will send
// fundingLocked to Bob.
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
// And similarly Bob will send funding locked to Alice.
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
// Sleep to make sure database write is finished.
time.Sleep(300 * time.Millisecond)
// Check that the state machine is updated accordingly
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
// Make sure both fundingManagers send the expected channel announcements.
assertChannelAnnouncements(t, alice, bob)
// The funding process is now finished, wait for the
// OpenStatusUpdate_ChanOpen update
waitForOpenUpdate(t, updateChan)
// The internal state-machine should now have deleted the channelStates
// from the database, as the channel is announced.
time.Sleep(300 * time.Millisecond)
assertNoChannelState(t, alice, bob, fundingOutPoint)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
// Check that they notify the breach arbiter and peer about the new
// channel.
assertHandleFundingLocked(t, alice, bob)
}
func TestFundingManagerRestartBehavior(t *testing.T) { func TestFundingManagerRestartBehavior(t *testing.T) {
disableFndgLogger(t) disableFndgLogger(t)
shutdownChannel := make(chan struct{}) alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
alice, bob := setupFundingManagers(t, shutdownChannel)
defer tearDownFundingManagers(t, alice, bob, shutdownChannel)
// Run through the process of opening the channel, up until the funding // Run through the process of opening the channel, up until the funding
// transaction is broadcasted. // transaction is broadcasted.
@ -738,12 +805,15 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
// before this message has been successfully sent, it should retry // before this message has been successfully sent, it should retry
// sending it on restart. We mimic this behavior by letting the // sending it on restart. We mimic this behavior by letting the
// SendToPeer method return an error, as if the message was not // SendToPeer method return an error, as if the message was not
// successfully sent. We then the fundingManager and make sure // successfully sent. We then recreate the fundingManager and make sure
// it continues the process as expected. // it continues the process as expected.
alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey,
msgs ...lnwire.Message) error { msgs ...lnwire.Message) error {
return fmt.Errorf("intentional error in SendToPeer") return fmt.Errorf("intentional error in SendToPeer")
} }
alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, con chan<- struct{}) {
// Intetionally empty.
}
// Notify that transaction was mined // Notify that transaction was mined
alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
@ -756,28 +826,12 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
// The funding transaction was mined, so assert that both funding // The funding transaction was mined, so assert that both funding
// managers now have the state of this channel 'markedOpen' in their // managers now have the state of this channel 'markedOpen' in their
// internal state machine. // internal state machine.
state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) assertMarkedOpen(t, alice, bob, fundingOutPoint)
if err != nil {
t.Fatalf("unable to get channel state: %v", err)
}
if state != markedOpen {
t.Fatalf("expected state to be markedOpen, was %v", state)
}
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
if err != nil {
t.Fatalf("unable to get channel state: %v", err)
}
if state != markedOpen {
t.Fatalf("expected state to be markedOpen, was %v", state)
}
// After the funding transaction was mined, Bob should have successfully // After the funding transaction was mined, Bob should have successfully
// sent the fundingLocked message, while Alice failed sending it. In // sent the fundingLocked message, while Alice failed sending it. In
// Alice's case this means that there should be no messages for Bob, and // Alice's case this means that there should be no messages for Bob, and
// the channel should still be in state 'markedOpen' // the channel should still be in state 'markedOpen'
select { select {
case msg := <-alice.msgChan: case msg := <-alice.msgChan:
t.Fatalf("did not expect any message from Alice: %v", msg) t.Fatalf("did not expect any message from Alice: %v", msg)
@ -785,18 +839,14 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
// Expected. // Expected.
} }
// Bob will send funding locked to Alice // Bob will send funding locked to Alice.
fundingLockedBob := <-bob.msgChan fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
if fundingLockedBob.MsgType() != lnwire.MsgFundingLocked {
t.Fatalf("expected fundingLocked sent from Bob, "+
"instead got %T", fundingLockedBob)
}
// Sleep to make sure database write is finished. // Sleep to make sure database write is finished.
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
// Alice should still be markedOpen // Alice should still be markedOpen
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
if err != nil { if err != nil {
t.Fatalf("unable to get channel state: %v", err) t.Fatalf("unable to get channel state: %v", err)
} }
@ -825,11 +875,7 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
return fmt.Errorf("intentional error in SendAnnouncement") return fmt.Errorf("intentional error in SendAnnouncement")
} }
fundingLockedAlice := <-alice.msgChan fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
if fundingLockedAlice.MsgType() != lnwire.MsgFundingLocked {
t.Fatalf("expected fundingLocked sent from Alice, "+
"instead got %T", fundingLockedAlice)
}
// Sleep to make sure database write is finished. // Sleep to make sure database write is finished.
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
@ -852,116 +898,187 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
// Expected // Expected
} }
// Bob, however, should send the announcements
announcements := make([]lnwire.Message, 4)
for i := 0; i < len(announcements); i++ {
select {
case announcements[i] = <-bob.announceChan:
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send announcement %v", i)
}
}
gotChannelAnnouncement := false
gotChannelUpdate := false
gotAnnounceSignatures := false
gotNodeAnnouncement := false
for _, msg := range announcements {
switch msg.(type) {
case *lnwire.ChannelAnnouncement:
gotChannelAnnouncement = true
case *lnwire.ChannelUpdate:
gotChannelUpdate = true
case *lnwire.AnnounceSignatures:
gotAnnounceSignatures = true
case *lnwire.NodeAnnouncement:
gotNodeAnnouncement = true
}
}
if !gotChannelAnnouncement {
t.Fatalf("did not get ChannelAnnouncement from Bob")
}
if !gotChannelUpdate {
t.Fatalf("did not get ChannelUpdate from Bob")
}
if !gotAnnounceSignatures {
t.Fatalf("did not get AnnounceSignatures from Bob")
}
if !gotNodeAnnouncement {
t.Fatalf("did not get NodeAnnouncement from Bob")
}
// Next up, we check that the Alice rebroadcasts the announcement // Next up, we check that the Alice rebroadcasts the announcement
// messages on restart. // messages on restart. Bob should as expected send announcements.
recreateAliceFundingManager(t, alice) recreateAliceFundingManager(t, alice)
time.Sleep(300 * time.Millisecond) time.Sleep(300 * time.Millisecond)
for i := 0; i < len(announcements); i++ { assertChannelAnnouncements(t, alice, bob)
select {
case announcements[i] = <-alice.announceChan:
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send announcement %v", i)
}
}
gotChannelAnnouncement = false
gotChannelUpdate = false
gotAnnounceSignatures = false
gotNodeAnnouncement = false
for _, msg := range announcements {
switch msg.(type) {
case *lnwire.ChannelAnnouncement:
gotChannelAnnouncement = true
case *lnwire.ChannelUpdate:
gotChannelUpdate = true
case *lnwire.AnnounceSignatures:
gotAnnounceSignatures = true
case *lnwire.NodeAnnouncement:
gotNodeAnnouncement = true
}
}
if !gotChannelAnnouncement {
t.Fatalf("did not get ChannelAnnouncement from Alice after restart")
}
if !gotChannelUpdate {
t.Fatalf("did not get ChannelUpdate from Alice after restart")
}
if !gotAnnounceSignatures {
t.Fatalf("did not get AnnounceSignatures from Alice after restart")
}
if !gotNodeAnnouncement {
t.Fatalf("did not get NodeAnnouncement from Alice after restart")
}
// The funding process is now finished. Since we recreated the // The funding process is now finished. Since we recreated the
// fundingManager, we don't have an update channel to synchronize on, // fundingManager, we don't have an update channel to synchronize on,
// so a small sleep makes sure the database writing is finished. // so a small sleep makes sure the database writing is finished.
time.Sleep(300 * time.Millisecond) time.Sleep(300 * time.Millisecond)
// The internal state-machine should now have deleted them from the // The internal state-machine should now have deleted the channelStates
// internal database, as the channel is announced. // from the database, as the channel is announced.
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) assertNoChannelState(t, alice, bob, fundingOutPoint)
if err != ErrChannelNotFound {
t.Fatalf("expected to not find channel state, but got: %v", state) // Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
// Check that they notify the breach arbiter and peer about the new
// channel.
assertHandleFundingLocked(t, alice, bob)
}
// TestFundingManagerOfflinePeer checks that the fundingManager waits for the
// server to notify when the peer comes online, in case sending the
// fundingLocked message fails the first time.
func TestFundingManagerOfflinePeer(t *testing.T) {
disableFndgLogger(t)
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
// Run through the process of opening the channel, up until the funding
// transaction is broadcasted.
updateChan := make(chan *lnrpc.OpenStatusUpdate)
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan)
// After the funding transaction gets mined, both nodes will send the
// fundingLocked message to the other peer. If the funding node fails
// to send the fundingLocked message to the peer, it should wait for
// the server to notify it that the peer is back online, and try again.
alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey,
msgs ...lnwire.Message) error {
return fmt.Errorf("intentional error in SendToPeer")
}
peerChan := make(chan *btcec.PublicKey, 1)
conChan := make(chan chan<- struct{}, 1)
alice.fundingMgr.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, connected chan<- struct{}) {
peerChan <- peer
conChan <- connected
} }
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) // Notify that transaction was mined
if err != ErrChannelNotFound { alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
t.Fatalf("expected to not find channel state, but got: %v", state) bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
// Give fundingManager time to process the newly mined tx and write to
// the database.
time.Sleep(500 * time.Millisecond)
// The funding transaction was mined, so assert that both funding
// managers now have the state of this channel 'markedOpen' in their
// internal state machine.
assertMarkedOpen(t, alice, bob, fundingOutPoint)
// After the funding transaction was mined, Bob should have successfully
// sent the fundingLocked message, while Alice failed sending it. In
// Alice's case this means that there should be no messages for Bob, and
// the channel should still be in state 'markedOpen'
select {
case msg := <-alice.msgChan:
t.Fatalf("did not expect any message from Alice: %v", msg)
default:
// Expected.
} }
// Bob will send funding locked to Alice
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
// Sleep to make sure database write is finished.
time.Sleep(1 * time.Second)
// Alice should still be markedOpen
state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
if err != nil {
t.Fatalf("unable to get channel state: %v", err)
}
if state != markedOpen {
t.Fatalf("expected state to be markedOpen, was %v", state)
}
// While Bob successfully sent fundingLocked.
state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint)
if err != nil {
t.Fatalf("unable to get channel state: %v", err)
}
if state != fundingLockedSent {
t.Fatalf("expected state to be fundingLockedSent, was %v", state)
}
// Alice should be waiting for the server to notify when Bob somes back online.
var peer *btcec.PublicKey
var con chan<- struct{}
select {
case peer = <-peerChan:
// Expected
case <-time.After(time.Second * 3):
t.Fatalf("alice did not register peer with server")
}
select {
case con = <-conChan:
// Expected
case <-time.After(time.Second * 3):
t.Fatalf("alice did not register connectedChan with server")
}
if !peer.IsEqual(bobPubKey) {
t.Fatalf("expected to receive Bob's pubkey (%v), instead got %v",
bobPubKey, peer)
}
// Fix Alice's SendToPeer, and notify that Bob is back online.
alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey,
msgs ...lnwire.Message) error {
select {
case alice.msgChan <- msgs[0]:
case <-alice.shutdownChannel:
return fmt.Errorf("shutting down")
}
return nil
}
close(con)
// This should make Alice send the fundingLocked.
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
// Sleep to make sure database write is finished.
time.Sleep(500 * time.Millisecond)
// The state should now be fundingLockedSent
state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint)
if err != nil {
t.Fatalf("unable to get channel state: %v", err)
}
if state != fundingLockedSent {
t.Fatalf("expected state to be fundingLockedSent, was %v", state)
}
// Make sure both fundingManagers send the expected channel announcements.
assertChannelAnnouncements(t, alice, bob)
// The funding process is now finished, wait for the
// OpenStatusUpdate_ChanOpen update
waitForOpenUpdate(t, updateChan)
// The internal state-machine should now have deleted the channelStates
// from the database, as the channel is announced.
time.Sleep(300 * time.Millisecond)
assertNoChannelState(t, alice, bob, fundingOutPoint)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
// Check that they notify the breach arbiter and peer about the new
// channel.
assertHandleFundingLocked(t, alice, bob)
} }
func TestFundingManagerFundingTimeout(t *testing.T) { func TestFundingManagerFundingTimeout(t *testing.T) {
disableFndgLogger(t) disableFndgLogger(t)
shutdownChannel := make(chan struct{}) alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
alice, bob := setupFundingManagers(t, shutdownChannel)
defer tearDownFundingManagers(t, alice, bob, shutdownChannel)
// We will consume the channel updates as we go, so no buffering is needed. // We will consume the channel updates as we go, so no buffering is needed.
updateChan := make(chan *lnrpc.OpenStatusUpdate) updateChan := make(chan *lnrpc.OpenStatusUpdate)
@ -1016,3 +1133,250 @@ func TestFundingManagerFundingTimeout(t *testing.T) {
len(pendingChannels)) len(pendingChannels))
} }
} }
// TestFundingManagerReceiveFundingLockedTwice checks that the fundingManager
// continues to operate as expected in case we receive a duplicate fundingLocked
// message.
func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
disableFndgLogger(t)
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
// We will consume the channel updates as we go, so no buffering is needed.
updateChan := make(chan *lnrpc.OpenStatusUpdate)
// Run through the process of opening the channel, up until the funding
// transaction is broadcasted.
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan)
// Notify that transaction was mined
alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
// Give fundingManager time to process the newly mined tx and write
//state to database.
time.Sleep(300 * time.Millisecond)
// The funding transaction was mined, so assert that both funding
// managers now have the state of this channel 'markedOpen' in their
// internal state machine.
assertMarkedOpen(t, alice, bob, fundingOutPoint)
// After the funding transaction is mined, Alice will send
// fundingLocked to Bob.
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
// And similarly Bob will send funding locked to Alice.
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
// Sleep to make sure database write is finished.
time.Sleep(300 * time.Millisecond)
// Check that the state machine is updated accordingly
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
// Make sure both fundingManagers send the expected channel announcements.
assertChannelAnnouncements(t, alice, bob)
// The funding process is now finished, wait for the
// OpenStatusUpdate_ChanOpen update
waitForOpenUpdate(t, updateChan)
// The internal state-machine should now have deleted the channelStates
// from the database, as the channel is announced.
time.Sleep(300 * time.Millisecond)
assertNoChannelState(t, alice, bob, fundingOutPoint)
// Send the fundingLocked message twice to Alice, and once to Bob.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
// Check that they notify the breach arbiter and peer about the new
// channel.
assertHandleFundingLocked(t, alice, bob)
// Alice should not send the channel state the second time, as the
// second funding locked should just be ignored.
select {
case <-alice.arbiterChan:
t.Fatalf("alice sent channel to breach arbiter a second time")
case <-time.After(time.Millisecond * 300):
// Expected
}
select {
case <-alice.peer.newChannels:
t.Fatalf("alice sent new channel to peer a second time")
case <-time.After(time.Millisecond * 300):
// Expected
}
// Another fundingLocked should also be ignored, since Alice should
// have updated her database at this point.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
select {
case <-alice.arbiterChan:
t.Fatalf("alice sent channel to breach arbiter a second time")
case <-time.After(time.Millisecond * 300):
// Expected
}
select {
case <-alice.peer.newChannels:
t.Fatalf("alice sent new channel to peer a second time")
case <-time.After(time.Millisecond * 300):
// Expected
}
}
// TestFundingManagerRestartAfterChanAnn checks that the fundingManager properly
// handles receiving a fundingLocked after the its own fundingLocked and channel
// announcement is sent and gets restarted.
func TestFundingManagerRestartAfterChanAnn(t *testing.T) {
disableFndgLogger(t)
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
// We will consume the channel updates as we go, so no buffering is needed.
updateChan := make(chan *lnrpc.OpenStatusUpdate)
// Run through the process of opening the channel, up until the funding
// transaction is broadcasted.
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan)
// Notify that transaction was mined
alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
// Give fundingManager time to process the newly mined tx and write
//state to database.
time.Sleep(300 * time.Millisecond)
// The funding transaction was mined, so assert that both funding
// managers now have the state of this channel 'markedOpen' in their
// internal state machine.
assertMarkedOpen(t, alice, bob, fundingOutPoint)
// After the funding transaction is mined, Alice will send
// fundingLocked to Bob.
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
// And similarly Bob will send funding locked to Alice.
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
// Sleep to make sure database write is finished.
time.Sleep(300 * time.Millisecond)
// Check that the state machine is updated accordingly
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
// Make sure both fundingManagers send the expected channel announcements.
assertChannelAnnouncements(t, alice, bob)
// The funding process is now finished, wait for the
// OpenStatusUpdate_ChanOpen update
waitForOpenUpdate(t, updateChan)
// The internal state-machine should now have deleted the channelStates
// from the database, as the channel is announced.
time.Sleep(300 * time.Millisecond)
assertNoChannelState(t, alice, bob, fundingOutPoint)
// At this point we restart Alice's fundingManager, before she receives
// the fundingLocked message. After restart, she will receive it, and
// we expect her to be able to handle it correctly.
recreateAliceFundingManager(t, alice)
time.Sleep(300 * time.Millisecond)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
// Check that they notify the breach arbiter and peer about the new
// channel.
assertHandleFundingLocked(t, alice, bob)
}
// TestFundingManagerRestartAfterReceivingFundingLocked checks that the
// fundingManager continues to operate as expected after it has received
// fundingLocked and then gets restarted.
func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) {
disableFndgLogger(t)
alice, bob := setupFundingManagers(t)
defer tearDownFundingManagers(t, alice, bob)
// We will consume the channel updates as we go, so no buffering is needed.
updateChan := make(chan *lnrpc.OpenStatusUpdate)
// Run through the process of opening the channel, up until the funding
// transaction is broadcasted.
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan)
// Notify that transaction was mined
alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{}
// Give fundingManager time to process the newly mined tx and write
//state to database.
time.Sleep(300 * time.Millisecond)
// The funding transaction was mined, so assert that both funding
// managers now have the state of this channel 'markedOpen' in their
// internal state machine.
assertMarkedOpen(t, alice, bob, fundingOutPoint)
// After the funding transaction is mined, Alice will send
// fundingLocked to Bob.
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
// And similarly Bob will send funding locked to Alice.
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
// Sleep to make sure database write is finished.
time.Sleep(300 * time.Millisecond)
// Check that the state machine is updated accordingly
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
// Let Alice immediately get the fundingLocked message.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
time.Sleep(300 * time.Millisecond)
// She will block waiting for local channel announcements to finish
// before sending the new channel state to the peer.
select {
case <-alice.peer.newChannels:
t.Fatalf("did not expect alice to handle the fundinglocked")
case <-time.After(time.Millisecond * 300):
}
// At this point we restart Alice's fundingManager. Bob will resend
// the fundingLocked after the connection is re-established.
recreateAliceFundingManager(t, alice)
time.Sleep(300 * time.Millisecond)
// Simulate Bob resending the message when Alice is back up.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bobAddr)
// Make sure both fundingManagers send the expected channel announcements.
assertChannelAnnouncements(t, alice, bob)
// The funding process is now finished. Since we recreated the
// fundingManager, we don't have an update channel to synchronize on,
// so a small sleep makes sure the database writing is finished.
time.Sleep(300 * time.Millisecond)
// The internal state-machine should now have deleted the channelStates
// from the database, as the channel is announced.
assertNoChannelState(t, alice, bob, fundingOutPoint)
// Also let Bob get the fundingLocked message.
bob.fundingMgr.processFundingLocked(fundingLockedAlice, aliceAddr)
// Check that they notify the breach arbiter and peer about the new
// channel.
assertHandleFundingLocked(t, alice, bob)
}

@ -322,7 +322,26 @@ func (l *channelLink) htlcManager() {
// TODO(roasbeef): fail chan in case of protocol violation // TODO(roasbeef): fail chan in case of protocol violation
// TODO(roasbeef): resend funding locked if state zero // If the number of updates on this channel has been zero, we should
// resend the fundingLocked message. This is because in this case we
// cannot be sure if the peer really received the last fundingLocked we
// sent, so resend now.
if l.channel.StateSnapshot().NumUpdates == 0 {
log.Debugf("Resending fundingLocked message to peer.")
nextRevocation, err := l.channel.NextRevocationKey()
if err != nil {
log.Errorf("unable to create next revocation: %v", err)
}
fundingLockedMsg := lnwire.NewFundingLocked(l.ChanID(),
nextRevocation)
err = l.cfg.Peer.SendMessage(fundingLockedMsg)
if err != nil {
log.Errorf("failed resending fundingLocked to peer: %v",
err)
}
}
out: out:
for { for {

@ -280,6 +280,9 @@ func (s *mockServer) readHandler(message lnwire.Message) error {
targetChan = msg.ChanID targetChan = msg.ChanID
case *lnwire.CommitSig: case *lnwire.CommitSig:
targetChan = msg.ChanID targetChan = msg.ChanID
case *lnwire.FundingLocked:
// Ignore
return nil
default: default:
return errors.New("unknown message type") return errors.New("unknown message type")
} }

9
lnd.go

@ -186,10 +186,11 @@ func lndMain() error {
idPrivKey.PubKey()) idPrivKey.PubKey())
return <-errChan return <-errChan
}, },
ArbiterChan: server.breachArbiter.newContracts, ArbiterChan: server.breachArbiter.newContracts,
SendToPeer: server.SendToPeer, SendToPeer: server.SendToPeer,
FindPeer: server.FindPeer, NotifyWhenOnline: server.NotifyWhenOnline,
TempChanIDSeed: chanIDSeed, FindPeer: server.FindPeer,
TempChanIDSeed: chanIDSeed,
FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) {
dbChannels, err := chanDB.FetchAllChannels() dbChannels, err := chanDB.FetchAllChannels()
if err != nil { if err != nil {

@ -4000,3 +4000,11 @@ func CreateCooperativeCloseTx(fundingTxIn *wire.TxIn,
func (lc *LightningChannel) CalcFee(feeRate uint64) uint64 { func (lc *LightningChannel) CalcFee(feeRate uint64) uint64 {
return (feeRate * uint64(commitWeight)) / 1000 return (feeRate * uint64(commitWeight)) / 1000
} }
// RemoteNextRevocation returns the channelState's RemoteNextRevocation.
func (lc *LightningChannel) RemoteNextRevocation() *btcec.PublicKey {
lc.Lock()
defer lc.Unlock()
return lc.channelState.RemoteNextRevocation
}

13
peer.go

@ -1003,10 +1003,19 @@ out:
chanID := lnwire.NewChanIDFromOutPoint(chanPoint) chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
newChan := newChanReq.channel newChan := newChanReq.channel
// First, we'll add this channel to the set of active // Make sure this channel is not already active.
p.activeChanMtx.Lock()
if _, ok := p.activeChannels[chanID]; ok {
peerLog.Infof("Already have ChannelPoint(%v), ignoring.", chanPoint)
p.activeChanMtx.Unlock()
close(newChanReq.done)
newChanReq.channel.Stop()
continue
}
// If not already active, we'll add this channel to the set of active
// channels, so we can look it up later easily // channels, so we can look it up later easily
// according to its channel ID. // according to its channel ID.
p.activeChanMtx.Lock()
p.activeChannels[chanID] = newChan p.activeChannels[chanID] = newChan
p.activeChanMtx.Unlock() p.activeChanMtx.Unlock()

@ -57,6 +57,8 @@ type server struct {
inboundPeers map[string]*peer inboundPeers map[string]*peer
outboundPeers map[string]*peer outboundPeers map[string]*peer
peerConnectedListeners map[string][]chan<- struct{}
persistentPeers map[string]struct{} persistentPeers map[string]struct{}
persistentConnReqs map[string][]*connmgr.ConnReq persistentConnReqs map[string][]*connmgr.ConnReq
@ -134,10 +136,11 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
persistentPeers: make(map[string]struct{}), persistentPeers: make(map[string]struct{}),
persistentConnReqs: make(map[string][]*connmgr.ConnReq), persistentConnReqs: make(map[string][]*connmgr.ConnReq),
peersByID: make(map[int32]*peer), peersByID: make(map[int32]*peer),
peersByPub: make(map[string]*peer), peersByPub: make(map[string]*peer),
inboundPeers: make(map[string]*peer), inboundPeers: make(map[string]*peer),
outboundPeers: make(map[string]*peer), outboundPeers: make(map[string]*peer),
peerConnectedListeners: make(map[string][]chan<- struct{}),
globalFeatures: globalFeatures, globalFeatures: globalFeatures,
localFeatures: localFeatures, localFeatures: localFeatures,
@ -860,6 +863,33 @@ func (s *server) SendToPeer(target *btcec.PublicKey,
return s.sendToPeer(target, msgs) return s.sendToPeer(target, msgs)
} }
// NotifyWhenOnline can be called by other subsystems to get notified when a
// particular peer comes online.
//
// NOTE: This function is safe for concurrent access.
func (s *server) NotifyWhenOnline(peer *btcec.PublicKey,
connectedChan chan<- struct{}) {
s.mu.Lock()
defer s.mu.Unlock()
// Compute the target peer's identifier.
pubStr := string(peer.SerializeCompressed())
// Check if peer is connected.
_, ok := s.peersByPub[pubStr]
if ok {
// Connected, can return early.
srvrLog.Debugf("Notifying that peer %v is online", pubStr)
close(connectedChan)
return
}
// Not connected, store this listener such that it can be notified when
// the peer comes online.
s.peerConnectedListeners[pubStr] = append(
s.peerConnectedListeners[pubStr], connectedChan)
}
// sendToPeer is an internal method that delivers messages to the specified // sendToPeer is an internal method that delivers messages to the specified
// `target` peer. // `target` peer.
func (s *server) sendToPeer(target *btcec.PublicKey, func (s *server) sendToPeer(target *btcec.PublicKey,
@ -1272,6 +1302,12 @@ func (s *server) addPeer(p *peer) {
// channel router so we can synchronize our view of the channel graph // channel router so we can synchronize our view of the channel graph
// with this new peer. // with this new peer.
go s.authGossiper.SynchronizeNode(p.addr.IdentityKey) go s.authGossiper.SynchronizeNode(p.addr.IdentityKey)
// Check if there are listeners waiting for this peer to come online.
for _, con := range s.peerConnectedListeners[pubStr] {
close(con)
}
delete(s.peerConnectedListeners, pubStr)
} }
// removePeer removes the passed peer from the server's state of all active // removePeer removes the passed peer from the server's state of all active