diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index cd6fc2a9..83816f1b 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -85,8 +85,10 @@ type ChannelArbitratorConfig struct { // ForceCloseChan should force close the contract that this attendant // is watching over. We'll use this when we decide that we need to go - // to chain. The returned summary contains all items needed to - // eventually resolve all outputs on chain. + // to chain. It should in addition tell the switch to remove the + // corresponding link, such that we won't accept any new updates. The + // returned summary contains all items needed to eventually resolve all + // outputs on chain. ForceCloseChan func() (*lnwallet.LocalForceCloseSummary, error) // MarkCommitmentBroadcasted should mark the channel as the commitment @@ -434,9 +436,10 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32, // Now that we have all the actions decided for the set of // HTLC's, we'll broadcast the commitment transaction, and // signal the link to exit. - // - // TODO(roasbeef): need to report to switch that channel is - // inactive, should close link + + // We'll tell the switch that it should remove the link for + // this channel, in addition to fetching the force close + // summary needed to close this channel on chain. closeSummary, err := c.cfg.ForceCloseChan() if err != nil { log.Errorf("ChannelArbitrator(%v): unable to "+ diff --git a/htlcswitch/hodl/config.go b/htlcswitch/hodl/config.go index e14eb7ea..22a65b52 100644 --- a/htlcswitch/hodl/config.go +++ b/htlcswitch/hodl/config.go @@ -21,6 +21,8 @@ type Config struct { FailOutgoing bool `long:"fail-outgoing" description:"Instructs the node to drop outgoing FAILs before applying them to the channel state"` Commit bool `long:"commit" description:"Instructs the node to add HTLCs to its local commitment state and to open circuits for any ADDs, but abort before committing the changes"` + + BogusSettle bool `long:"bogus-settle" description:"Instructs the node to settle back any incoming HTLC with a bogus preimage"` } // Mask extracts the flags specified in the configuration, composing a Mask from @@ -52,6 +54,9 @@ func (c *Config) Mask() Mask { if c.Commit { flags = append(flags, Commit) } + if c.BogusSettle { + flags = append(flags, BogusSettle) + } // NOTE: The value returned here will only honor the configuration if // the debug build flag is present. In production, this method always diff --git a/htlcswitch/hodl/flags.go b/htlcswitch/hodl/flags.go index 688999f3..7fed7d09 100644 --- a/htlcswitch/hodl/flags.go +++ b/htlcswitch/hodl/flags.go @@ -51,6 +51,10 @@ const ( // Commit drops all HTLC after any outgoing circuits have been // opened, but before the in-memory commitment state is persisted. Commit + + // BogusSettle attempts to settle back any incoming HTLC for which we + // are the exit node with a bogus preimage. + BogusSettle ) // String returns a human-readable identifier for a given Flag. @@ -72,6 +76,8 @@ func (f Flag) String() string { return "FailOutgoing" case Commit: return "Commit" + case BogusSettle: + return "BogusSettle" default: return "UnknownHodlFlag" } @@ -98,6 +104,8 @@ func (f Flag) Warning() string { msg = "will not update channel state with downstream FAIL" case Commit: msg = "will not commit pending channel updates" + case BogusSettle: + msg = "will settle HTLC with bogus preimage" default: msg = "incorrect hodl flag usage" } diff --git a/htlcswitch/hodl/mask_test.go b/htlcswitch/hodl/mask_test.go index 7becd457..cf29d19b 100644 --- a/htlcswitch/hodl/mask_test.go +++ b/htlcswitch/hodl/mask_test.go @@ -67,6 +67,7 @@ var hodlMaskTests = []struct { hodl.SettleOutgoing, hodl.FailOutgoing, hodl.Commit, + hodl.BogusSettle, ), flags: map[hodl.Flag]struct{}{ hodl.ExitSettle: {}, @@ -77,6 +78,7 @@ var hodlMaskTests = []struct { hodl.SettleOutgoing: {}, hodl.FailOutgoing: {}, hodl.Commit: {}, + hodl.BogusSettle: {}, }, }, } diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 645b33dc..1cd24f52 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -130,10 +130,6 @@ type Peer interface { // PubKey returns the serialize public key of the source peer. PubKey() [33]byte - - // Disconnect disconnects with peer if we have error which we can't - // properly handle. - Disconnect(reason error) } // ForwardingLog is an interface that represents a time series database which diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 66c225da..156e2d93 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -32,15 +32,6 @@ const ( expiryGraceDelta = 2 ) -var ( - // ErrInternalLinkFailure is a generic error returned to the remote - // party so as to obfuscate the true failure. - ErrInternalLinkFailure = errors.New("internal link failure") - - // ErrLinkShuttingDown signals that the link is shutting down. - ErrLinkShuttingDown = errors.New("link shutting down") -) - // ForwardingPolicy describes the set of constraints that a given ChannelLink // is to adhere to when forwarding HTLC's. For each incoming HTLC, this set of // constraints will be consulted in order to ensure that adequate fees are @@ -179,6 +170,16 @@ type ChannelLinkConfig struct { // subscribed to new events. PreimageCache contractcourt.WitnessBeacon + // OnChannelFailure is a function closure that we'll call if the + // channel failed for some reason. Depending on the severity of the + // error, the closure potentially must force close this channel and + // disconnect the peer. + // + // NOTE: The method must return in order for the ChannelLink to be able + // to shut down properly. + OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID, + LinkFailureError) + // UpdateContractSignals is a function closure that we'll use to update // outside sub-systems with the latest signals for our inner Lightning // channel. These signals will notify the caller when the channel has @@ -253,6 +254,10 @@ type channelLink struct { started int32 shutdown int32 + // failed should be set to true in case a link error happens, making + // sure we don't process any more updates. + failed bool + // batchCounter is the number of updates which we received from remote // side, but not include in commitment transaction yet and plus the // current number of settles that have been sent, but not yet committed @@ -694,6 +699,14 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) { fwdPkg.Source, fwdPkg.Height, fwdPkg.Adds, ) needUpdate = l.processRemoteAdds(fwdPkg, adds) + + // If the link failed during processing the adds, we must + // return to ensure we won't attempted to update the state + // further. + if l.failed { + return false, fmt.Errorf("link failed while " + + "processing remote adds") + } } return needUpdate, nil @@ -770,7 +783,10 @@ func (l *channelLink) htlcManager() { if err != nil { l.errorf("unable to synchronize channel states: %v", err) if err != ErrLinkShuttingDown { - l.fail(err.Error()) + // TODO(halseth): must be revisted when + // data-loss protection is in. + l.fail(LinkFailureError{code: ErrSyncError}, + err.Error()) } return } @@ -787,8 +803,8 @@ func (l *channelLink) htlcManager() { // replay our forwarding packages to handle any htlcs that can be // processed locally, or need to be forwarded out to the switch. if err := l.resolveFwdPkgs(); err != nil { - l.errorf("unable to resolve fwd pkgs: %v", err) - l.fail(ErrInternalLinkFailure.Error()) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to resolve fwd pkgs: %v", err) return } @@ -801,9 +817,16 @@ func (l *channelLink) htlcManager() { batchTick := l.cfg.BatchTicker.Start() defer l.cfg.BatchTicker.Stop() - // TODO(roasbeef): fail chan in case of protocol violation out: for { + + // We must always check if we failed at some point processing + // the last update before processing the next. + if l.failed { + l.errorf("link failed, exiting htlcManager") + break out + } + select { // A new block has arrived, we'll check the network fee to see @@ -876,7 +899,8 @@ out: } if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) break out } @@ -893,7 +917,8 @@ out: // update, waiting for the revocation window to open // up. if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) break out } @@ -1090,8 +1115,8 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { pkt.destRef, &closedCircuitRef, ); err != nil { - // TODO(roasbeef): broadcast on-chain - l.fail("unable to settle incoming HTLC: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to settle incoming HTLC: %v", err) return } @@ -1159,7 +1184,8 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // this is a settle request, then initiate an update. if l.batchCounter >= l.cfg.BatchSize || isSettle { if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) return } } @@ -1177,7 +1203,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // "settle" list in the event that we know the preimage. index, err := l.channel.ReceiveHTLC(msg) if err != nil { - l.fail("unable to handle upstream add HTLC: %v", err) + l.fail(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream add HTLC: %v", err) return } @@ -1188,8 +1215,13 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { pre := msg.PaymentPreimage idx := msg.ID if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil { - // TODO(roasbeef): broadcast on-chain - l.fail("unable to handle upstream settle HTLC: %v", err) + l.fail( + LinkFailureError{ + code: ErrInvalidUpdate, + ForceClose: true, + }, + "unable to handle upstream settle HTLC: %v", err, + ) return } @@ -1243,7 +1275,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // message to the usual HTLC fail message. err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes()) if err != nil { - l.fail("unable to handle upstream fail HTLC: %v", err) + l.fail(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream fail HTLC: %v", err) return } @@ -1251,7 +1284,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { idx := msg.ID err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:]) if err != nil { - l.fail("unable to handle upstream fail HTLC: %v", err) + l.fail(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream fail HTLC: %v", err) return } @@ -1265,28 +1299,23 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // commitment, then we'll examine the type of error. If // it's an InvalidCommitSigError, then we'll send a // direct error. - // - // TODO(roasbeef): force close chan - var sendErr bool + var sendData []byte switch err.(type) { case *lnwallet.InvalidCommitSigError: - sendErr = true + sendData = []byte(err.Error()) case *lnwallet.InvalidHtlcSigError: - sendErr = true + sendData = []byte(err.Error()) } - if sendErr { - err := l.cfg.Peer.SendMessage(&lnwire.Error{ - ChanID: l.ChanID(), - Data: []byte(err.Error()), - }, true) - if err != nil { - l.errorf("unable to send msg to "+ - "remote peer: %v", err) - } - } - - l.fail("ChannelPoint(%v): unable to accept new "+ - "commitment: %v", l.channel.ChannelPoint(), err) + l.fail( + LinkFailureError{ + code: ErrInvalidCommitment, + ForceClose: true, + SendData: sendData, + }, + "ChannelPoint(%v): unable to accept new "+ + "commitment: %v", + l.channel.ChannelPoint(), err, + ) return } @@ -1335,7 +1364,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // so we'll reply with a signature to provide them with their // version of the latest commitment. if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) return } @@ -1345,16 +1375,26 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // revocation window. fwdPkg, adds, settleFails, err := l.channel.ReceiveRevocation(msg) if err != nil { - l.fail("unable to accept revocation: %v", err) + // TODO(halseth): force close? + l.fail(LinkFailureError{code: ErrInvalidRevocation}, + "unable to accept revocation: %v", err) return } l.processRemoteSettleFails(fwdPkg, settleFails) - needUpdate := l.processRemoteAdds(fwdPkg, adds) + + // If the link failed during processing the adds, we must + // return to ensure we won't attempted to update the state + // further. + if l.failed { + return + } + if needUpdate { if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) return } } @@ -1364,7 +1404,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // will fail the channel, if not we will apply the update. fee := lnwallet.SatPerKWeight(msg.FeePerKw) if err := l.channel.ReceiveUpdateFee(fee); err != nil { - l.fail("error receiving fee update: %v", err) + l.fail(LinkFailureError{code: ErrInvalidUpdate}, + "error receiving fee update: %v", err) return } case *lnwire.Error: @@ -1375,7 +1416,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { if isASCII(msg.Data) { errMsg = string(msg.Data) } - l.fail("ChannelPoint(%v): received error from peer: %v", + l.fail(LinkFailureError{code: ErrRemoteError}, + "ChannelPoint(%v): received error from peer: %v", l.channel.ChannelPoint(), errMsg) default: log.Warnf("ChannelPoint(%v): received unknown message of type %T", @@ -1936,8 +1978,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, fwdPkg.ID(), decodeReqs, ) if sphinxErr != nil { - l.errorf("unable to decode hop iterators: %v", sphinxErr) - l.fail(ErrInternalLinkFailure.Error()) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to decode hop iterators: %v", sphinxErr) return false } @@ -2174,7 +2216,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, err = l.channel.SettleHTLC(preimage, pd.HtlcIndex, pd.SourceRef, nil, nil) if err != nil { - l.fail("unable to settle htlc: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to settle htlc: %v", err) return false } @@ -2182,12 +2225,23 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // settled with this latest commitment update. err = l.cfg.Registry.SettleInvoice(invoiceHash) if err != nil { - l.fail("unable to settle invoice: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to settle invoice: %v", err) return false } l.infof("settling %x as exit hop", pd.RHash) + // If the link is in hodl.BogusSettle mode, replace the + // preimage with a fake one before sending it to the + // peer. + if l.cfg.DebugHTLC && + l.cfg.HodlMask.Active(hodl.BogusSettle) { + l.warnf(hodl.BogusSettle.Warning()) + preimage = [32]byte{} + copy(preimage[:], bytes.Repeat([]byte{2}, 32)) + } + // HTLC was successfully settled locally send // notification about it remote peer. l.cfg.Peer.SendMessage(&lnwire.UpdateFulfillHTLC{ @@ -2312,8 +2366,11 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, l.ShortChanID(), ) if err != nil { - l.fail("unable to create channel update "+ - "while handling the error: %v", err) + l.fail(LinkFailureError{ + code: ErrInternalError}, + "unable to create channel "+ + "update while handling "+ + "the error: %v", err) return false } @@ -2398,7 +2455,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, if fwdPkg.State == channeldb.FwdStateLockedIn { err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter) if err != nil { - l.fail("unable to set fwd filter: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to set fwd filter: %v", err) return false } } @@ -2503,12 +2561,28 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64, }, false) } -// fail helper function which is used to encapsulate the action necessary for -// proper disconnect. -func (l *channelLink) fail(format string, a ...interface{}) { +// fail is a function which is used to encapsulate the action necessary for +// properly failing the link. It takes a LinkFailureError, which will be passed +// to the OnChannelFailure closure, in order for it to determine if we should +// force close the channel, and if we should send an error message to the +// remote peer. +func (l *channelLink) fail(linkErr LinkFailureError, + format string, a ...interface{}) { reason := errors.Errorf(format, a...) - log.Error(reason) - go l.cfg.Peer.Disconnect(reason) + + // Return if we have already notified about a failure. + if l.failed { + l.warnf("Ignoring link failure (%v), as link already failed", + reason) + return + } + + l.errorf("Failing link: %s", reason) + + // Set failed, such that we won't process any more updates, and notify + // the peer about the failure. + l.failed = true + l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr) } // infof prefixes the channel's identifier before printing to info log. diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index ef6136dc..e0f3c07e 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -15,6 +15,7 @@ import ( "math" + "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" @@ -1390,11 +1391,17 @@ func TestChannelLinkSingleHopMessageOrdering(t *testing.T) { type mockPeer struct { sync.Mutex - sentMsgs chan lnwire.Message - quit chan struct{} + disconnected bool + sentMsgs chan lnwire.Message + quit chan struct{} } +var _ Peer = (*mockPeer)(nil) + func (m *mockPeer) SendMessage(msg lnwire.Message, sync bool) error { + if m.disconnected { + return fmt.Errorf("disconnected") + } select { case m.sentMsgs <- msg: case <-m.quit: @@ -1408,18 +1415,16 @@ func (m *mockPeer) WipeChannel(*wire.OutPoint) error { func (m *mockPeer) PubKey() [33]byte { return [33]byte{} } -func (m *mockPeer) Disconnect(reason error) { -} var _ Peer = (*mockPeer)(nil) func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( - ChannelLink, *lnwallet.LightningChannel, chan time.Time, func(), - chanRestoreFunc, error) { + ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error, + func(), chanRestoreFunc, error) { var chanIDBytes [8]byte if _, err := io.ReadFull(rand.Reader, chanIDBytes[:]); err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } chanID := lnwire.NewShortChanIDFromInt( @@ -1430,7 +1435,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( chanReserve, chanReserve, chanID, ) if err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } var ( @@ -1461,7 +1466,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( aliceDb := aliceChannel.State().Db aliceSwitch, err := initSwitchWithDB(aliceDb) if err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } t := make(chan time.Time) @@ -1479,6 +1484,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( }, FetchLastChannelUpdate: mockGetChanUpdateMessage, PreimageCache: pCache, + OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) { + }, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, @@ -1494,8 +1501,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( const startingHeight = 100 aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight) - if err := aliceSwitch.AddLink(aliceLink); err != nil { - return nil, nil, nil, nil, nil, err + start := func() error { + return aliceSwitch.AddLink(aliceLink) } go func() { for { @@ -1514,7 +1521,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( defer bobChannel.Stop() } - return aliceLink, bobChannel, t, cleanUp, restore, nil + return aliceLink, bobChannel, t, start, cleanUp, restore, nil } func assertLinkBandwidth(t *testing.T, link ChannelLink, @@ -1686,13 +1693,17 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { // We'll start the test by creating a single instance of const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, bobChannel, tmr, cleanUp, _, err := + aliceLink, bobChannel, tmr, start, cleanUp, _, err := newSingleLinkTestHarness(chanAmt, 0) if err != nil { t.Fatalf("unable to create link: %v", err) } defer cleanUp() + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + var ( carolChanID = lnwire.NewShortChanIDFromInt(3) mockBlob [lnwire.OnionPacketSize]byte @@ -2104,13 +2115,17 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { var mockBlob [lnwire.OnionPacketSize]byte const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, bobChannel, batchTick, cleanUp, _, err := + aliceLink, bobChannel, batchTick, start, cleanUp, _, err := newSingleLinkTestHarness(chanAmt, 0) if err != nil { t.Fatalf("unable to create link: %v", err) } defer cleanUp() + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + var ( coreLink = aliceLink.(*channelLink) defaultCommitFee = coreLink.channel.StateSnapshot().CommitFee @@ -2353,13 +2368,17 @@ func TestChannelLinkTrimCircuitsPending(t *testing.T) { // We'll start by creating a new link with our chanAmt (5 BTC). We will // only be testing Alice's behavior, so the reference to Bob's channel // state is unnecessary. - aliceLink, _, batchTicker, cleanUp, restore, err := + aliceLink, _, batchTicker, start, cleanUp, restore, err := newSingleLinkTestHarness(chanAmt, 0) if err != nil { t.Fatalf("unable to create link: %v", err) } defer cleanUp() + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + alice := newPersistentLinkHarness(t, aliceLink, batchTicker, restore) // Compute the static fees that will be used to determine the @@ -2619,13 +2638,17 @@ func TestChannelLinkTrimCircuitsNoCommit(t *testing.T) { // We'll start by creating a new link with our chanAmt (5 BTC). We will // only be testing Alice's behavior, so the reference to Bob's channel // state is unnecessary. - aliceLink, _, batchTicker, cleanUp, restore, err := + aliceLink, _, batchTicker, start, cleanUp, restore, err := newSingleLinkTestHarness(chanAmt, 0) if err != nil { t.Fatalf("unable to create link: %v", err) } defer cleanUp() + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + alice := newPersistentLinkHarness(t, aliceLink, batchTicker, restore) // We'll put Alice into hodl.Commit mode, such that the circuits for any @@ -2876,13 +2899,17 @@ func TestChannelLinkBandwidthChanReserve(t *testing.T) { // channel reserve. const chanAmt = btcutil.SatoshiPerBitcoin * 5 const chanReserve = btcutil.SatoshiPerBitcoin * 1 - aliceLink, bobChannel, batchTimer, cleanUp, _, err := + aliceLink, bobChannel, batchTimer, start, cleanUp, _, err := newSingleLinkTestHarness(chanAmt, chanReserve) if err != nil { t.Fatalf("unable to create link: %v", err) } defer cleanUp() + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + var ( mockBlob [lnwire.OnionPacketSize]byte coreLink = aliceLink.(*channelLink) @@ -2991,13 +3018,17 @@ func TestChannelLinkBandwidthChanReserve(t *testing.T) { // should therefore be 0. const bobChanAmt = btcutil.SatoshiPerBitcoin * 1 const bobChanReserve = btcutil.SatoshiPerBitcoin * 1.5 - bobLink, _, _, bobCleanUp, _, err := + bobLink, _, _, start, bobCleanUp, _, err := newSingleLinkTestHarness(bobChanAmt, bobChanReserve) if err != nil { t.Fatalf("unable to create link: %v", err) } defer bobCleanUp() + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + // Make sure bandwidth is reported as 0. assertLinkBandwidth(t, bobLink, 0) } @@ -4070,13 +4101,17 @@ func TestChannelLinkNoMoreUpdates(t *testing.T) { const chanAmt = btcutil.SatoshiPerBitcoin * 5 const chanReserve = btcutil.SatoshiPerBitcoin * 1 - aliceLink, bobChannel, _, cleanUp, _, err := + aliceLink, bobChannel, _, start, cleanUp, _, err := newSingleLinkTestHarness(chanAmt, chanReserve) if err != nil { t.Fatalf("unable to create link: %v", err) } defer cleanUp() + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + var ( coreLink = aliceLink.(*channelLink) aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs @@ -4158,13 +4193,17 @@ func TestChannelLinkWaitForRevocation(t *testing.T) { const chanAmt = btcutil.SatoshiPerBitcoin * 5 const chanReserve = btcutil.SatoshiPerBitcoin * 1 - aliceLink, bobChannel, _, cleanUp, _, err := + aliceLink, bobChannel, _, start, cleanUp, _, err := newSingleLinkTestHarness(chanAmt, chanReserve) if err != nil { t.Fatalf("unable to create link: %v", err) } defer cleanUp() + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + var ( coreLink = aliceLink.(*channelLink) aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs @@ -4256,3 +4295,223 @@ func TestChannelLinkWaitForRevocation(t *testing.T) { case <-time.After(50 * time.Millisecond): } } + +type mockPackager struct { + failLoadFwdPkgs bool +} + +func (*mockPackager) AddFwdPkg(tx *bolt.Tx, fwdPkg *channeldb.FwdPkg) error { + return nil +} + +func (*mockPackager) SetFwdFilter(tx *bolt.Tx, height uint64, + fwdFilter *channeldb.PkgFilter) error { + return nil +} + +func (*mockPackager) AckAddHtlcs(tx *bolt.Tx, + addRefs ...channeldb.AddRef) error { + return nil +} + +func (m *mockPackager) LoadFwdPkgs(tx *bolt.Tx) ([]*channeldb.FwdPkg, error) { + if m.failLoadFwdPkgs { + return nil, fmt.Errorf("failing LoadFwdPkgs") + } + return nil, nil +} + +func (*mockPackager) RemovePkg(tx *bolt.Tx, height uint64) error { + return nil +} + +func (*mockPackager) AckSettleFails(tx *bolt.Tx, + settleFailRefs ...channeldb.SettleFailRef) error { + return nil +} + +// TestChannelLinkFail tests that we will fail the channel, and force close the +// channel in certain situations. +func TestChannelLinkFail(t *testing.T) { + t.Parallel() + + testCases := []struct { + // options is used to set up mocks and configure the link + // before it is started. + options func(*channelLink) + + // link test is used to execute the given test on the channel + // link after it is started. + linkTest func(*testing.T, *channelLink, *lnwallet.LightningChannel) + + // shouldForceClose indicates whether we expect the link to + // force close the channel in response to the actions performed + // during the linkTest. + shouldForceClose bool + }{ + { + // Test that we don't force close if syncing states + // fails at startup. + func(c *channelLink) { + c.cfg.SyncStates = true + + // Make the syncChanStateCall fail by making + // the SendMessage call fail. + c.cfg.Peer.(*mockPeer).disconnected = true + }, + func(t *testing.T, c *channelLink, _ *lnwallet.LightningChannel) { + // Should fail at startup. + }, + false, + }, + { + // Test that we don't force closes the channel if + // resolving forward packages fails at startup. + func(c *channelLink) { + // We make the call to resolveFwdPkgs fail by + // making the underlying forwarder fail. + pkg := &mockPackager{ + failLoadFwdPkgs: true, + } + c.channel.State().Packager = pkg + }, + func(t *testing.T, c *channelLink, _ *lnwallet.LightningChannel) { + // Should fail at startup. + }, + false, + }, + { + // Test that we force close the channel if we receive + // an invalid Settle message. + func(c *channelLink) { + }, + func(t *testing.T, c *channelLink, _ *lnwallet.LightningChannel) { + // Recevive an htlc settle for an htlc that was + // never added. + htlcSettle := &lnwire.UpdateFulfillHTLC{ + ID: 0, + PaymentPreimage: [32]byte{}, + } + c.HandleChannelUpdate(htlcSettle) + }, + true, + }, + { + // Test that we force close the channel if we receive + // an invalid CommitSig, not containing enough HTLC + // sigs. + func(c *channelLink) { + }, + func(t *testing.T, c *channelLink, remoteChannel *lnwallet.LightningChannel) { + + // Generate an HTLC and send to the link. + htlc1 := generateHtlc(t, c, remoteChannel, 0) + sendHtlcBobToAlice(t, c, remoteChannel, htlc1) + + // Sign a commitment that will include + // signature for the HTLC just sent. + sig, htlcSigs, err := + remoteChannel.SignNextCommitment() + if err != nil { + t.Fatalf("error signing commitment: %v", + err) + } + + // Remove the HTLC sig, such that the commit + // sig will be invalid. + commitSig := &lnwire.CommitSig{ + CommitSig: sig, + HtlcSigs: htlcSigs[1:], + } + + c.HandleChannelUpdate(commitSig) + }, + true, + }, + { + // Test that we force close the channel if we receive + // an invalid CommitSig, where the sig itself is + // corrupted. + func(c *channelLink) { + }, + func(t *testing.T, c *channelLink, remoteChannel *lnwallet.LightningChannel) { + + // Generate an HTLC and send to the link. + htlc1 := generateHtlc(t, c, remoteChannel, 0) + sendHtlcBobToAlice(t, c, remoteChannel, htlc1) + + // Sign a commitment that will include + // signature for the HTLC just sent. + sig, htlcSigs, err := + remoteChannel.SignNextCommitment() + if err != nil { + t.Fatalf("error signing commitment: %v", + err) + } + + // Flip a bit on the signature, rendering it + // invalid. + sig[19] ^= 1 + commitSig := &lnwire.CommitSig{ + CommitSig: sig, + HtlcSigs: htlcSigs, + } + + c.HandleChannelUpdate(commitSig) + }, + true, + }, + } + + const chanAmt = btcutil.SatoshiPerBitcoin * 5 + const chanReserve = 0 + + // Execute each test case. + for i, test := range testCases { + link, remoteChannel, _, start, cleanUp, _, err := + newSingleLinkTestHarness(chanAmt, 0) + if err != nil { + t.Fatalf("unable to create link: %v", err) + } + + coreLink := link.(*channelLink) + + // Set up a channel used to check whether the link error + // force closed the channel. + linkErrors := make(chan LinkFailureError, 1) + coreLink.cfg.OnChannelFailure = func(_ lnwire.ChannelID, + _ lnwire.ShortChannelID, linkErr LinkFailureError) { + linkErrors <- linkErr + } + + // Set up the link before starting it. + test.options(coreLink) + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + + // Execute the test case. + test.linkTest(t, coreLink, remoteChannel) + + // Currently we expect all test cases to lead to link error. + var linkErr LinkFailureError + select { + case linkErr = <-linkErrors: + case <-time.After(10 * time.Second): + t.Fatalf("%d) Alice did not fail"+ + "channel", i) + } + + // If we expect the link to force close the channel in this + // case, check that it happens. If not, make sure it does not + // happen. + if test.shouldForceClose != linkErr.ForceClose { + t.Fatalf("%d) Expected Alice to force close(%v), "+ + "instead got(%v)", i, test.shouldForceClose, + linkErr.ForceClose) + } + + // Clean up before starting next test case. + cleanUp() + } +} diff --git a/htlcswitch/linkfailure.go b/htlcswitch/linkfailure.go new file mode 100644 index 00000000..2f44e434 --- /dev/null +++ b/htlcswitch/linkfailure.go @@ -0,0 +1,95 @@ +package htlcswitch + +import "github.com/go-errors/errors" + +var ( + // ErrLinkShuttingDown signals that the link is shutting down. + ErrLinkShuttingDown = errors.New("link shutting down") +) + +// errorCode encodes the possible types of errors that will make us fail the +// current link. +type errorCode uint8 + +const ( + // ErrInternalError indicates that something internal in the link + // failed. In this case we will send a generic error to our peer. + ErrInternalError errorCode = iota + + // ErrRemoteError indicates that our peer sent an error, prompting up + // to fail the link. + ErrRemoteError + + // ErrSyncError indicates that we failed synchronizing the state of the + // channel with our peer. + ErrSyncError + + // ErrInvalidUpdate indicates that the peer send us an invalid update. + ErrInvalidUpdate + + // ErrInvalidCommitment indicates that the remote peer sent us an + // invalid commitment signature. + ErrInvalidCommitment + + // ErrInvalidRevocation indicates that the remote peer send us an + // invalid revocation message. + ErrInvalidRevocation +) + +// LinkFailureError encapsulates an error that will make us fail the current +// link. It contains the necessary information needed to determine if we should +// force close the channel in the process, and if any error data should be sent +// to the peer. +type LinkFailureError struct { + // code is the type of error this LinkFailureError encapsulates. + code errorCode + + // ForceClose indicates whether we should force close the channel + // because of this error. + ForceClose bool + + // SendData is a byte slice that will be sent to the peer. If nil a + // generic error will be sent. + SendData []byte +} + +// A compile time check to ensure LinkFailureError implements the error +// interface. +var _ error = (*LinkFailureError)(nil) + +// Error returns a generic error for the LinkFailureError. +// +// NOTE: Part of the error interface. +func (e LinkFailureError) Error() string { + switch e.code { + case ErrInternalError: + return "internal error" + case ErrRemoteError: + return "remote error" + case ErrSyncError: + return "sync error" + case ErrInvalidUpdate: + return "invalid update" + case ErrInvalidCommitment: + return "invalid commitment" + case ErrInvalidRevocation: + return "invalid revocation" + default: + return "unknown error" + } +} + +// ShouldSendToPeer indicates whether we should send an error to the peer if +// the link fails with this LinkFailureError. +func (e LinkFailureError) ShouldSendToPeer() bool { + switch e.code { + // If the failure is a result of the peer sending us an error, we don't + // have to respond with one. + case ErrRemoteError: + return false + + // In all other cases we will attempt to send our peer an error message. + default: + return true + } +} diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 49238b7a..6c2c9af4 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -506,12 +506,6 @@ func (s *mockServer) PubKey() [33]byte { return s.id } -func (s *mockServer) Disconnect(reason error) { - fmt.Printf("server %v disconnected due to %v\n", s.name, reason) - - s.t.Fatalf("server %v was disconnected: %v", s.name, reason) -} - func (s *mockServer) WipeChannel(*wire.OutPoint) error { return nil } diff --git a/lnd_test.go b/lnd_test.go index 7105676e..4cff0f94 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -4068,6 +4068,199 @@ func waitForNTxsInMempool(miner *rpcclient.Client, n int, } } +// testFailingChannel tests that we will fail the channel by force closing ii +// in the case where a counterparty tries to settle an HTLC with the wrong +// preimage. +func testFailingChannel(net *lntest.NetworkHarness, t *harnessTest) { + ctxb := context.Background() + const ( + timeout = time.Duration(time.Second * 10) + chanAmt = maxFundingAmount + paymentAmt = 10000 + defaultCSV = 4 + ) + + // We'll introduce Carol, which will settle any incoming invoice with a + // totally unrelated preimage. + carol, err := net.NewNode("Carol", + []string{"--debughtlc", "--hodl.bogus-settle"}) + if err != nil { + t.Fatalf("unable to create new nodes: %v", err) + } + + // Let Alice connect and open a channel to Carol, + if err := net.ConnectNodes(ctxb, net.Alice, carol); err != nil { + t.Fatalf("unable to connect alice to carol: %v", err) + } + ctxt, _ := context.WithTimeout(ctxb, timeout) + chanPoint := openChannelAndAssert( + ctxt, t, net, net.Alice, carol, chanAmt, 0, false, + ) + + // With the channel open, we'll create a invoice for Carol that Alice + // will attempt to pay. + preimage := bytes.Repeat([]byte{byte(192)}, 32) + invoice := &lnrpc.Invoice{ + Memo: "testing", + RPreimage: preimage, + Value: paymentAmt, + } + resp, err := carol.AddInvoice(ctxb, invoice) + if err != nil { + t.Fatalf("unable to add invoice: %v", err) + } + carolPayReqs := []string{resp.PaymentRequest} + + // Wait for Alice to receive the channel edge from the funding manager. + ctxt, _ = context.WithTimeout(ctxb, timeout) + err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint) + if err != nil { + t.Fatalf("alice didn't see the alice->carol channel before "+ + "timeout: %v", err) + } + + // Send the payment from Alice to Carol. We expect Carol to attempt to + // settle this payment with the wrong preimage. + err = completePaymentRequests(ctxb, net.Alice, carolPayReqs, false) + if err != nil { + t.Fatalf("unable to send payments: %v", err) + } + + // Since Alice detects that Carol is trying to trick her by providing a + // fake preimage, she should fail and force close the channel. + var predErr error + err = lntest.WaitPredicate(func() bool { + pendingChansRequest := &lnrpc.PendingChannelsRequest{} + pendingChanResp, err := net.Alice.PendingChannels(ctxb, + pendingChansRequest) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } + n := len(pendingChanResp.WaitingCloseChannels) + if n != 1 { + predErr = fmt.Errorf("Expected to find %d channels "+ + "waiting close, found %d", 1, n) + return false + } + return true + }, time.Second*15) + if err != nil { + t.Fatalf("%v", predErr) + } + + // Mine a block to confirm the broadcasted commitment. + block := mineBlocks(t, net, 1)[0] + if len(block.Transactions) != 2 { + t.Fatalf("transaction wasn't mined") + } + + // The channel should now show up as force closed both for Alice and + // Carol. + err = lntest.WaitPredicate(func() bool { + pendingChansRequest := &lnrpc.PendingChannelsRequest{} + pendingChanResp, err := net.Alice.PendingChannels(ctxb, + pendingChansRequest) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } + n := len(pendingChanResp.WaitingCloseChannels) + if n != 0 { + predErr = fmt.Errorf("Expected to find %d channels "+ + "waiting close, found %d", 0, n) + return false + } + n = len(pendingChanResp.PendingForceClosingChannels) + if n != 1 { + predErr = fmt.Errorf("expected to find %d channel "+ + "pending force close, found %d", 1, n) + return false + } + return true + }, time.Second*15) + if err != nil { + t.Fatalf("%v", predErr) + } + + err = lntest.WaitPredicate(func() bool { + pendingChansRequest := &lnrpc.PendingChannelsRequest{} + pendingChanResp, err := carol.PendingChannels(ctxb, + pendingChansRequest) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } + n := len(pendingChanResp.PendingForceClosingChannels) + if n != 1 { + predErr = fmt.Errorf("expected to find %d channel "+ + "pending force close, found %d", 1, n) + return false + } + return true + }, time.Second*15) + if err != nil { + t.Fatalf("%v", predErr) + } + + // Carol will use the correct preimage to resolve the HTLC on-chain. + _, err = waitForTxInMempool(net.Miner.Node, 5*time.Second) + if err != nil { + t.Fatalf("unable to find Bob's breach tx in mempool: %v", err) + } + + // Mine enough blocks for Alice to sweep her funds from the force + // closed channel. + _, err = net.Miner.Node.Generate(defaultCSV) + if err != nil { + t.Fatalf("unable to generate blocks: %v", err) + } + + // Wait for the sweeping tx to be broadcast. + _, err = waitForTxInMempool(net.Miner.Node, 5*time.Second) + if err != nil { + t.Fatalf("unable to find Bob's breach tx in mempool: %v", err) + } + + // Mine the sweep. + _, err = net.Miner.Node.Generate(1) + if err != nil { + t.Fatalf("unable to generate blocks: %v", err) + } + + // No pending channels should be left. + err = lntest.WaitPredicate(func() bool { + pendingChansRequest := &lnrpc.PendingChannelsRequest{} + pendingChanResp, err := net.Alice.PendingChannels(ctxb, + pendingChansRequest) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } + n := len(pendingChanResp.PendingForceClosingChannels) + if n != 0 { + predErr = fmt.Errorf("expected to find %d channel "+ + "pending force close, found %d", 0, n) + return false + } + return true + }, time.Second*15) + if err != nil { + t.Fatalf("%v", predErr) + } + + // Finally, shutdown the node we created for the duration of the tests, + // only leaving the two seed nodes (Alice and Bob) within our test + // network. + if err := net.ShutdownNode(carol); err != nil { + t.Fatalf("unable to shutdown carol: %v", err) + } +} + // testRevokedCloseRetribution tests that Alice is able carry out // retribution in the event that she fails immediately after detecting Bob's // breach txn in the mempool. @@ -9183,6 +9376,10 @@ var testsCases = []*testCase{ name: "revoked uncooperative close retribution", test: testRevokedCloseRetribution, }, + { + name: "failing link", + test: testFailingChannel, + }, { name: "revoked uncooperative close retribution zero value remote output", test: testRevokedCloseRetributionZeroValueRemoteOutput, diff --git a/peer.go b/peer.go index a260a65c..9d8e7d5d 100644 --- a/peer.go +++ b/peer.go @@ -411,41 +411,11 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { lnChan.Stop() return err } - linkCfg := htlcswitch.ChannelLinkConfig{ - Peer: p, - DecodeHopIterators: p.server.sphinx.DecodeHopIterators, - ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter, - FetchLastChannelUpdate: fetchLastChanUpdate( - p.server, p.PubKey(), - ), - DebugHTLC: cfg.DebugHTLC, - HodlMask: cfg.Hodl.Mask(), - Registry: p.server.invoices, - Switch: p.server.htlcSwitch, - Circuits: p.server.htlcSwitch.CircuitModifier(), - ForwardPackets: p.server.htlcSwitch.ForwardPackets, - FwrdingPolicy: *forwardingPolicy, - FeeEstimator: p.server.cc.feeEstimator, - BlockEpochs: blockEpoch, - PreimageCache: p.server.witnessBeacon, - ChainEvents: chainEvents, - UpdateContractSignals: func(signals *contractcourt.ContractSignals) error { - return p.server.chainArb.UpdateContractSignals( - *chanPoint, signals, - ) - }, - SyncStates: true, - BatchTicker: htlcswitch.NewBatchTicker( - time.NewTicker(50 * time.Millisecond)), - FwdPkgGCTicker: htlcswitch.NewBatchTicker( - time.NewTicker(time.Minute)), - BatchSize: 10, - UnsafeReplay: cfg.UnsafeReplay, - } - link := htlcswitch.NewChannelLink(linkCfg, lnChan, - uint32(currentHeight)) - if err := p.server.htlcSwitch.AddLink(link); err != nil { + // Create the link and add it to the switch. + err = p.addLink(chanPoint, lnChan, forwardingPolicy, blockEpoch, + chainEvents, currentHeight, true) + if err != nil { lnChan.Stop() return err } @@ -454,6 +424,132 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { return nil } +// addLink creates and adds a new link from the specified channel. +func (p *peer) addLink(chanPoint *wire.OutPoint, + lnChan *lnwallet.LightningChannel, + forwardingPolicy *htlcswitch.ForwardingPolicy, + blockEpoch *chainntnfs.BlockEpochEvent, + chainEvents *contractcourt.ChainEventSubscription, + currentHeight int32, syncStates bool) error { + + // onChannelFailure will be called by the link in case the channel + // fails for some reason. + onChannelFailure := func(chanID lnwire.ChannelID, + shortChanID lnwire.ShortChannelID, + linkErr htlcswitch.LinkFailureError) { + + // The link has notified us about a failure. We launch a go + // routine to stop the link, disconnect the peer and optionally + // force close the channel. We must launch a goroutine since we + // must let OnChannelFailure return in order for the link to + // completely stop in the call to RemoveLink. + p.wg.Add(1) + go func() { + defer p.wg.Done() + + // We begin by removing the link from the switch, such + // that it won't be attempted used for any more + // updates. + // TODO(halseth): should introduce a way to atomically + // stop/pause the link and cancel back any adds in its + // mailboxes such that we can safely force close + // without the link being added again and updates being + // applied. + err := p.server.htlcSwitch.RemoveLink(chanID) + if err != nil { + peerLog.Errorf("unable to stop link(%v): %v", + shortChanID, err) + } + + // If the error encountered was severe enough, we'll + // now force close the channel. + if linkErr.ForceClose { + peerLog.Warnf("Force closing link(%v)", + shortChanID) + + closeTx, err := p.server.chainArb.ForceCloseContract(*chanPoint) + if err != nil { + peerLog.Errorf("unable to force close "+ + "link(%v): %v", shortChanID, + err) + } else { + peerLog.Infof("channel(%v) force "+ + "closed with txid %v", + shortChanID, closeTx.TxHash()) + } + } + + // Send an error to the peer, why we failed the + // channel. + if linkErr.ShouldSendToPeer() { + // If SendData is set, send it to the peer. If + // not, we'll use the standard error messages + // in the payload. We only include sendData in + // the cases where the error data does not + // contain sensitive information. + data := []byte(linkErr.Error()) + if linkErr.SendData != nil { + data = linkErr.SendData + } + err := p.SendMessage(&lnwire.Error{ + ChanID: chanID, + Data: data, + }, true) + if err != nil { + peerLog.Errorf("unable to send msg to "+ + "remote peer: %v", err) + } + } + + // Initiate disconnection. + // TODO(halseth): consider not disconnecting the peer, + // as we might still have other active channels with + // the same peer. + p.Disconnect(linkErr) + }() + } + + linkCfg := htlcswitch.ChannelLinkConfig{ + Peer: p, + DecodeHopIterators: p.server.sphinx.DecodeHopIterators, + ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter, + FetchLastChannelUpdate: fetchLastChanUpdate( + p.server, p.PubKey(), + ), + DebugHTLC: cfg.DebugHTLC, + HodlMask: cfg.Hodl.Mask(), + Registry: p.server.invoices, + Switch: p.server.htlcSwitch, + Circuits: p.server.htlcSwitch.CircuitModifier(), + ForwardPackets: p.server.htlcSwitch.ForwardPackets, + FwrdingPolicy: *forwardingPolicy, + FeeEstimator: p.server.cc.feeEstimator, + BlockEpochs: blockEpoch, + PreimageCache: p.server.witnessBeacon, + ChainEvents: chainEvents, + UpdateContractSignals: func(signals *contractcourt.ContractSignals) error { + return p.server.chainArb.UpdateContractSignals( + *chanPoint, signals, + ) + }, + OnChannelFailure: onChannelFailure, + SyncStates: syncStates, + BatchTicker: htlcswitch.NewBatchTicker( + time.NewTicker(50 * time.Millisecond)), + FwdPkgGCTicker: htlcswitch.NewBatchTicker( + time.NewTicker(time.Minute)), + BatchSize: 10, + UnsafeReplay: cfg.UnsafeReplay, + } + link := htlcswitch.NewChannelLink(linkCfg, lnChan, + uint32(currentHeight)) + + // With the channel link created, we'll now notify the htlc switch so + // this channel can be used to dispatch local payments and also + // passively forward payments. + return p.server.htlcSwitch.AddLink(link) +} + // WaitForDisconnect waits until the peer has disconnected. A peer may be // disconnected if the local or remote side terminating the connection, or an // irrecoverable protocol error has been encountered. @@ -1387,46 +1483,15 @@ out: "events: %v", err) continue } - linkConfig := htlcswitch.ChannelLinkConfig{ - Peer: p, - DecodeHopIterators: p.server.sphinx.DecodeHopIterators, - ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter, - FetchLastChannelUpdate: fetchLastChanUpdate( - p.server, p.PubKey(), - ), - DebugHTLC: cfg.DebugHTLC, - HodlMask: cfg.Hodl.Mask(), - Registry: p.server.invoices, - Switch: p.server.htlcSwitch, - Circuits: p.server.htlcSwitch.CircuitModifier(), - ForwardPackets: p.server.htlcSwitch.ForwardPackets, - FwrdingPolicy: p.server.cc.routingPolicy, - FeeEstimator: p.server.cc.feeEstimator, - BlockEpochs: blockEpoch, - PreimageCache: p.server.witnessBeacon, - ChainEvents: chainEvents, - UpdateContractSignals: func(signals *contractcourt.ContractSignals) error { - return p.server.chainArb.UpdateContractSignals( - *chanPoint, signals, - ) - }, - SyncStates: false, - BatchTicker: htlcswitch.NewBatchTicker( - time.NewTicker(50 * time.Millisecond)), - FwdPkgGCTicker: htlcswitch.NewBatchTicker( - time.NewTicker(time.Minute)), - BatchSize: 10, - UnsafeReplay: cfg.UnsafeReplay, - } - link := htlcswitch.NewChannelLink(linkConfig, newChan, - uint32(currentHeight)) - // With the channel link created, we'll now notify the - // htlc switch so this channel can be used to dispatch - // local payments and also passively forward payments. - if err := p.server.htlcSwitch.AddLink(link); err != nil { + // Create the link and add it to the switch. + err = p.addLink(chanPoint, newChan, + &p.server.cc.routingPolicy, blockEpoch, + chainEvents, currentHeight, false) + if err != nil { peerLog.Errorf("can't register new channel "+ - "link(%v) with NodeKey(%x)", chanPoint, p.PubKey()) + "link(%v) with NodeKey(%x)", chanPoint, + p.PubKey()) } close(newChanReq.done)