From 215a47e6bf90a94f405321703c00195d8767c3a1 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 27 Jul 2018 04:35:35 -0700 Subject: [PATCH 1/8] channeldb/forwarding_package: loosen Add/SettleFailRef acking This commit loosens the fwdpkg reference acking to be more tolerant of prior deletions. Specifically, we won't fail if certain channels are not found or fwdpkgs do not exist. This will make us more tolerant to future changes where we: - remove fwdpkgs on channel close - defensively cleanup stray responses --- channeldb/forwarding_package.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/channeldb/forwarding_package.go b/channeldb/forwarding_package.go index 42497183..5817fc21 100644 --- a/channeldb/forwarding_package.go +++ b/channeldb/forwarding_package.go @@ -3,11 +3,11 @@ package channeldb import ( "bytes" "encoding/binary" + "errors" "fmt" "io" "github.com/coreos/bbolt" - "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/lnwire" ) @@ -759,7 +759,11 @@ func ackAddHtlcsAtHeight(sourceBkt *bolt.Bucket, height uint64, heightKey := makeLogKey(height) heightBkt := sourceBkt.Bucket(heightKey[:]) if heightBkt == nil { - return ErrCorruptedFwdPkg + // If the height bucket isn't found, this could be because the + // forwarding package was already removed. We'll return nil to + // signal that the operation is successful, as there is nothing + // to ack. + return nil } // Load ack filter from disk. @@ -824,12 +828,17 @@ func ackSettleFails(tx *bolt.Tx, settleFailRefs []SettleFailRef) error { } // With the references organized by destination and height, we now load - // each remote bucket, and update the settle fail filter for any + // each remote bucket, and update the settle fail filter for any // settle/fail htlcs. 
for dest, destHeights := range destHeightDiffs { destKey := makeLogKey(dest.ToUint64()) destBkt := fwdPkgBkt.Bucket(destKey[:]) if destBkt == nil { + // If the destination bucket is not found, this is + // likely the result of the destination channel being + // closed and having its forwarding packages wiped. We + // won't treat this as an error, because the response + // will no longer be retransmitted internally. continue } @@ -852,6 +861,9 @@ func ackSettleFailsAtHeight(destBkt *bolt.Bucket, height uint64, heightKey := makeLogKey(height) heightBkt := destBkt.Bucket(heightKey[:]) if heightBkt == nil { + // If the height bucket isn't found, this could be because the + // forwarding package was already removed. We'll return nil to + // signal that the operation is successful, as there is nothing to ack. return nil } From 81778664a723894ffb3653bc97f26fc92f71d953 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 27 Jul 2018 02:20:05 -0700 Subject: [PATCH 2/8] channeldb/channel: expose AckAddHtlc and AckSettleFail --- channeldb/channel.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/channeldb/channel.go b/channeldb/channel.go index 740d23ea..d0a28821 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -1597,6 +1597,31 @@ func (c *OpenChannel) LoadFwdPkgs() ([]*FwdPkg, error) { return fwdPkgs, nil } +// AckAddHtlcs updates the AckAddFilter containing any of the provided AddRefs +// indicating that a response to this Add has been committed to the remote party. +// Doing so will prevent these Add HTLCs from being reforwarded internally. +func (c *OpenChannel) AckAddHtlcs(addRefs ...AddRef) error { + c.Lock() + defer c.Unlock() + + return c.Db.Update(func(tx *bolt.Tx) error { + return c.Packager.AckAddHtlcs(tx, addRefs...) 
+ }) +} + +// AckSettleFails updates the SettleFailFilter containing any of the provided +// SettleFailRefs, indicating that the response has been delivered to the +// incoming link, corresponding to a particular AddRef. Doing so will prevent +// the responses from being retransmitted internally. +func (c *OpenChannel) AckSettleFails(settleFailRefs ...SettleFailRef) error { + c.Lock() + defer c.Unlock() + + return c.Db.Update(func(tx *bolt.Tx) error { + return c.Packager.AckSettleFails(tx, settleFailRefs...) + }) +} + // SetFwdFilter atomically sets the forwarding filter for the forwarding package // identified by `height`. func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error { From af6c4e5174984936fa8518d6fcf8c07f0c254c79 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 27 Jul 2018 02:20:53 -0700 Subject: [PATCH 3/8] channeldb/channel: adds readLogKey for chanids Adds helper method to parse short chanids used as keys in the forwarding package. --- channeldb/channel.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/channeldb/channel.go b/channeldb/channel.go index d0a28821..e5dc42ea 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -2440,12 +2440,20 @@ func deleteOpenChannel(chanBucket *bolt.Bucket, chanPointBytes []byte) error { } +// makeLogKey converts a uint64 into an 8 byte array. func makeLogKey(updateNum uint64) [8]byte { var key [8]byte byteOrder.PutUint64(key[:], updateNum) return key } +// readLogKey parses the first 8 bytes of a byte slice into a uint64. +// +// NOTE: The slice must be at least 8 bytes long. 
+func readLogKey(b []byte) uint64 { + return byteOrder.Uint64(b) +} + func appendChannelLogEntry(log *bolt.Bucket, commit *ChannelCommitment) error { From 0dca373bcd63d49df5b2a486237c8006aa2d85d4 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 7 Aug 2018 20:12:31 -0700 Subject: [PATCH 4/8] lnwallet/errors: create concrete errors for settle/fail --- lnwallet/errors.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/lnwallet/errors.go b/lnwallet/errors.go index a1f83edf..f36d0b99 100644 --- a/lnwallet/errors.go +++ b/lnwallet/errors.go @@ -125,3 +125,49 @@ func ErrChanTooSmall(chanSize, minChanSize btcutil.Amount) ReservationError { chanSize, minChanSize), } } + +// ErrHtlcIndexAlreadyFailed is returned when the HTLC index has already been +// failed, but has not been committed by our commitment state. +type ErrHtlcIndexAlreadyFailed uint64 + +// Error returns a message indicating the index that had already been failed. +func (e ErrHtlcIndexAlreadyFailed) Error() string { + return fmt.Sprintf("HTLC with ID %d has already been failed", e) +} + +// ErrHtlcIndexAlreadySettled is returned when the HTLC index has already been +// settled, but has not been committed by our commitment state. +type ErrHtlcIndexAlreadySettled uint64 + +// Error returns a message indicating the index that had already been settled. +func (e ErrHtlcIndexAlreadySettled) Error() string { + return fmt.Sprintf("HTLC with ID %d has already been settled", e) +} + +// ErrInvalidSettlePreimage is returned when trying to settle an HTLC, but the +// preimage does not correspond to the payment hash. +type ErrInvalidSettlePreimage struct { + preimage []byte + rhash []byte +} + +// Error returns an error message with the offending preimage and intended +// payment hash. 
+func (e ErrInvalidSettlePreimage) Error() string { + return fmt.Sprintf("Invalid payment preimage %x for hash %x", + e.preimage, e.rhash) +} + +// ErrUnknownHtlcIndex is returned when locally settling or failing an HTLC, but +// the HTLC index is not known to the channel. This typically indicates that the +// HTLC was already settled in a prior commitment. +type ErrUnknownHtlcIndex struct { + chanID lnwire.ShortChannelID + index uint64 +} + +// Error returns an error logging the channel and HTLC index that was unknown. +func (e ErrUnknownHtlcIndex) Error() string { + return fmt.Sprintf("No HTLC with ID %d in channel %v", + e.index, e.chanID) +} From 05308ec22c4a3db0333a0c9c32ae44580b248606 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 27 Jul 2018 03:22:15 -0700 Subject: [PATCH 5/8] lnwallet/channel: concrete HTLC errors and ack AddRefs --- lnwallet/channel.go | 52 ++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 379ccbb7..f654d1c6 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -4433,6 +4433,22 @@ func (lc *LightningChannel) LoadFwdPkgs() ([]*channeldb.FwdPkg, error) { return lc.channelState.LoadFwdPkgs() } +// AckAddHtlcs sets a bit in the FwdFilter of a forwarding package belonging to +// this channel, that corresponds to the given AddRef. This method also succeeds +// if no forwarding package is found. +func (lc *LightningChannel) AckAddHtlcs(addRef channeldb.AddRef) error { + return lc.channelState.AckAddHtlcs(addRef) +} + +// AckSettleFails sets a bit in the SettleFailFilter of a forwarding package +// belonging to this channel, that corresponds to the given SettleFailRef. This +// method also succeeds if no forwarding package is found. +func (lc *LightningChannel) AckSettleFails( + settleFailRefs ...channeldb.SettleFailRef) error { + + return lc.channelState.AckSettleFails(settleFailRefs...) 
+} + // SetFwdFilter writes the forwarding decision for a given remote commitment // height. func (lc *LightningChannel) SetFwdFilter(height uint64, @@ -4572,21 +4588,18 @@ func (lc *LightningChannel) SettleHTLC(preimage [32]byte, htlc := lc.remoteUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { - return fmt.Errorf("No HTLC with ID %d in channel %v", htlcIndex, - lc.ShortChanID()) + return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} } // Now that we know the HTLC exists, before checking to see if the // preimage matches, we'll ensure that we haven't already attempted to // modify the HTLC. if lc.remoteUpdateLog.htlcHasModification(htlcIndex) { - return fmt.Errorf("HTLC with ID %d has already been settled", - htlcIndex) + return ErrHtlcIndexAlreadySettled(htlcIndex) } if htlc.RHash != sha256.Sum256(preimage[:]) { - return fmt.Errorf("Invalid payment preimage %x for hash %x", - preimage[:], htlc.RHash[:]) + return ErrInvalidSettlePreimage{preimage[:], htlc.RHash[:]} } pd := &PaymentDescriptor{ @@ -4620,21 +4633,18 @@ func (lc *LightningChannel) ReceiveHTLCSettle(preimage [32]byte, htlcIndex uint6 htlc := lc.localUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { - return fmt.Errorf("No HTLC with ID %d in channel %v", htlcIndex, - lc.ShortChanID()) + return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} } // Now that we know the HTLC exists, before checking to see if the // preimage matches, we'll ensure that they haven't already attempted // to modify the HTLC. 
if lc.localUpdateLog.htlcHasModification(htlcIndex) { - return fmt.Errorf("HTLC with ID %d has already been settled", - htlcIndex) + return ErrHtlcIndexAlreadySettled(htlcIndex) } if htlc.RHash != sha256.Sum256(preimage[:]) { - return fmt.Errorf("Invalid payment preimage %x for hash %x", - preimage[:], htlc.RHash[:]) + return ErrInvalidSettlePreimage{preimage[:], htlc.RHash[:]} } pd := &PaymentDescriptor{ @@ -4688,15 +4698,13 @@ func (lc *LightningChannel) FailHTLC(htlcIndex uint64, reason []byte, htlc := lc.remoteUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { - return fmt.Errorf("No HTLC with ID %d in channel %v", htlcIndex, - lc.ShortChanID()) + return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} } // Now that we know the HTLC exists, we'll ensure that we haven't // already attempted to fail the HTLC. if lc.remoteUpdateLog.htlcHasModification(htlcIndex) { - return fmt.Errorf("HTLC with ID %d has already been failed", - htlcIndex) + return ErrHtlcIndexAlreadyFailed(htlcIndex) } pd := &PaymentDescriptor{ @@ -4740,15 +4748,13 @@ func (lc *LightningChannel) MalformedFailHTLC(htlcIndex uint64, htlc := lc.remoteUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { - return fmt.Errorf("No HTLC with ID %d in channel %v", htlcIndex, - lc.ShortChanID()) + return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} } // Now that we know the HTLC exists, we'll ensure that we haven't // already attempted to fail the HTLC. 
if lc.remoteUpdateLog.htlcHasModification(htlcIndex) { - return fmt.Errorf("HTLC with ID %d has already been failed", - htlcIndex) + return ErrHtlcIndexAlreadyFailed(htlcIndex) } pd := &PaymentDescriptor{ @@ -4785,15 +4791,13 @@ func (lc *LightningChannel) ReceiveFailHTLC(htlcIndex uint64, reason []byte, htlc := lc.localUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { - return fmt.Errorf("No HTLC with ID %d in channel %v", htlcIndex, - lc.ShortChanID()) + return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} } // Now that we know the HTLC exists, we'll ensure that they haven't // already attempted to fail the HTLC. if lc.localUpdateLog.htlcHasModification(htlcIndex) { - return fmt.Errorf("HTLC with ID %d has already been failed", - htlcIndex) + return ErrHtlcIndexAlreadyFailed(htlcIndex) } pd := &PaymentDescriptor{ From 81b4af2ec8213d95a31b0b3af1468de3a838d74f Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 27 Jul 2018 03:21:12 -0700 Subject: [PATCH 6/8] htlcswitch/link: cleanup spurious fail/settle responses --- htlcswitch/link.go | 122 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 111 insertions(+), 11 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 0c1df9dc..54281500 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1043,6 +1043,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // mailbox, and the HTLC being added to the commitment state. 
if l.cfg.DebugHTLC && l.cfg.HodlMask.Active(hodl.AddOutgoing) { l.warnf(hodl.AddOutgoing.Warning()) + l.mailBox.AckPacket(pkt.inKey()) return } @@ -1097,6 +1098,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { err := lnwire.EncodeFailure(&b, failure, 0) if err != nil { l.errorf("unable to encode failure: %v", err) + l.mailBox.AckPacket(pkt.inKey()) return } reason = lnwire.OpaqueReason(b.Bytes()) @@ -1106,6 +1108,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { reason, err = pkt.obfuscator.EncryptFirstHop(failure) if err != nil { l.errorf("unable to obfuscate error: %v", err) + l.mailBox.AckPacket(pkt.inKey()) return } } @@ -1162,22 +1165,38 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // commitment state. if l.cfg.DebugHTLC && l.cfg.HodlMask.Active(hodl.SettleOutgoing) { l.warnf(hodl.SettleOutgoing.Warning()) + l.mailBox.AckPacket(pkt.inKey()) return } // An HTLC we forward to the switch has just settled somewhere // upstream. Therefore we settle the HTLC within the our local // state machine. - closedCircuitRef := pkt.inKey() - if err := l.channel.SettleHTLC( + inKey := pkt.inKey() + err := l.channel.SettleHTLC( htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef, pkt.destRef, - &closedCircuitRef, - ); err != nil { - l.fail(LinkFailureError{code: ErrInternalError}, - "unable to settle incoming HTLC: %v", err) + &inKey, + ) + if err != nil { + l.errorf("unable to settle incoming HTLC for "+ + "circuit-key=%v: %v", inKey, err) + + // If the HTLC index for Settle response was not known + // to our commitment state, it has already been + // cleaned up by a prior response. We'll thus try to + // clean up any lingering state to ensure we don't + // continue reforwarding. 
+ if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { + l.cleanupSpuriousResponse(pkt) + } + + // Remove the packet from the link's mailbox to ensure + // it doesn't get replayed after a reconnection. + l.mailBox.AckPacket(inKey) + return } @@ -1204,20 +1223,37 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // state. if l.cfg.DebugHTLC && l.cfg.HodlMask.Active(hodl.FailOutgoing) { l.warnf(hodl.FailOutgoing.Warning()) + l.mailBox.AckPacket(pkt.inKey()) return } // An HTLC cancellation has been triggered somewhere upstream, // we'll remove then HTLC from our local state machine. - closedCircuitRef := pkt.inKey() - if err := l.channel.FailHTLC( + inKey := pkt.inKey() + err := l.channel.FailHTLC( pkt.incomingHTLCID, htlc.Reason, pkt.sourceRef, pkt.destRef, - &closedCircuitRef, - ); err != nil { - log.Errorf("unable to cancel HTLC: %v", err) + &inKey, + ) + if err != nil { + l.errorf("unable to cancel incoming HTLC for "+ + "circuit-key=%v: %v", inKey, err) + + // If the HTLC index for Fail response was not known to + // our commitment state, it has already been cleaned up + // by a prior response. We'll thus try to clean up any + // lingering state to ensure we don't continue + // reforwarding. + if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { + l.cleanupSpuriousResponse(pkt) + } + + // Remove the packet from the link's mailbox to ensure + // it doesn't get replayed after a reconnection. + l.mailBox.AckPacket(inKey) + return } @@ -1252,6 +1288,70 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { } } +// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef +// associated with this packet. If successful in doing so, it will also purge +// the open circuit from the circuit map and remove the packet from the link's +// mailbox. 
+func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) { + inKey := pkt.inKey() + + l.debugf("Cleaning up spurious response for incoming circuit-key=%v", + inKey) + + // If the htlc packet doesn't have a source reference, it is unsafe to + // proceed, as skipping this ack may cause the htlc to be reforwarded. + if pkt.sourceRef == nil { + l.errorf("uanble to cleanup response for incoming "+ + "circuit-key=%v, does not contain source reference", + inKey) + return + } + + // If the source reference is present, we will try to prevent this link + // from resending the packet to the switch. To do so, we ack the AddRef + // of the incoming HTLC belonging to this link. + err := l.channel.AckAddHtlcs(*pkt.sourceRef) + if err != nil { + l.errorf("unable to ack AddRef for incoming "+ + "circuit-key=%v: %v", inKey, err) + + // If this operation failed, it is unsafe to attempt removal of + // the destination reference or circuit, so we exit early. The + // cleanup may proceed with a different packet in the future + // that succeeds on this step. + return + } + + // Now that we know this link will stop retransmitting Adds to the + // switch, we can begin to teardown the response reference and circuit + // map. + // + // If the packet includes a destination reference, then a response for + // this HTLC was locked into the outgoing channel. Attempt to remove + // this reference, so we stop retransmitting the response internally. + // Even if this fails, we will proceed in trying to delete the circuit. + // When retransmitting responses, the destination references will be + // cleaned up if an open circuit is not found in the circuit map. 
+ if pkt.destRef != nil { + err := l.channel.AckSettleFails(*pkt.destRef) + if err != nil { + l.errorf("unable to ack SettleFailRef "+ + "for incoming circuit-key=%v: %v", + inKey, err) + } + } + + l.debugf("Deleting circuit for incoming circuit-key=%x", inKey) + + // With all known references acked, we can now safely delete the circuit + // from the switch's circuit map, as the state is no longer needed. + err = l.cfg.Circuits.DeleteCircuits(inKey) + if err != nil { + l.errorf("unable to delete circuit for "+ + "circuit-key=%v: %v", inKey, err) + } +} + // handleUpstreamMsg processes wire messages related to commitment state // updates from the upstream peer. The upstream peer is the peer whom we have a // direct channel with, updating our respective commitment chains. From 6a8507cfb590f607fa7560918ff3df14c567bf70 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 27 Jul 2018 03:33:59 -0700 Subject: [PATCH 7/8] htlcswitch/switch: ignore pending/local responses on reforward --- htlcswitch/switch.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 4a547e59..e5c7bfbf 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1628,15 +1628,35 @@ func (s *Switch) Start() error { // forwarding packages and reforwards any Settle or Fail HTLCs found. This is // used to resurrect the switch's mailboxes after a restart. func (s *Switch) reforwardResponses() error { - activeChannels, err := s.cfg.DB.FetchAllOpenChannels() + openChannels, err := s.cfg.DB.FetchAllOpenChannels() if err != nil { return err } - for _, activeChannel := range activeChannels { - shortChanID := activeChannel.ShortChanID() + for _, openChannel := range openChannels { + shortChanID := openChannel.ShortChanID() + + // Locally-initiated payments never need reforwarding. 
+ if shortChanID == sourceHop { + continue + } + + // If the channel is pending, it should have no forwarding + // packages, and nothing to reforward. + if openChannel.IsPending { + continue + } + + // Channels in open or waiting-close may still have responses in + // their forwarding packages. We will continue to reattempt + // forwarding on startup until the channel is fully-closed. + // + // Load this channel's forwarding packages, and deliver them to + // the switch. fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID) if err != nil { + log.Errorf("unable to load forwarding "+ + "packages for %v: %v", shortChanID, err) return err } From 5d131e5164b3dbabda4869ac6e1cfda1f5a61758 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 20 Aug 2018 17:15:01 -0700 Subject: [PATCH 8/8] htlcswitch/link_test: assert delete spurious response --- htlcswitch/link_test.go | 333 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 333 insertions(+) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 9cddc4fc..a5f417be 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1465,6 +1465,7 @@ func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { if m.disconnected { return fmt.Errorf("disconnected") } + select { case m.sentMsgs <- msgs[0]: case <-m.quit: @@ -4195,6 +4196,9 @@ func receiveRevAndAckAliceToBob(t *testing.T, aliceMsgs chan lnwire.Message, func receiveCommitSigAliceToBob(t *testing.T, aliceMsgs chan lnwire.Message, aliceLink ChannelLink, bobChannel *lnwallet.LightningChannel, expHtlcs int) { + + t.Helper() + var msg lnwire.Message select { case msg = <-aliceMsgs: @@ -4234,6 +4238,9 @@ func sendRevAndAckBobToAlice(t *testing.T, aliceLink ChannelLink, // Bob, then hands this to Bob. 
func receiveSettleAliceToBob(t *testing.T, aliceMsgs chan lnwire.Message, aliceLink ChannelLink, bobChannel *lnwallet.LightningChannel) { + + t.Helper() + var msg lnwire.Message select { case msg = <-aliceMsgs: @@ -4253,6 +4260,31 @@ func receiveSettleAliceToBob(t *testing.T, aliceMsgs chan lnwire.Message, } } +// receiveFailAliceToBob waits for Alice to send a HTLC fail message to +// Bob, then hands this to Bob. +func receiveFailAliceToBob(t *testing.T, aliceMsgs chan lnwire.Message, + aliceLink ChannelLink, bobChannel *lnwallet.LightningChannel) { + + t.Helper() + + var msg lnwire.Message + select { + case msg = <-aliceMsgs: + case <-time.After(15 * time.Second): + t.Fatalf("did not receive message") + } + + failMsg, ok := msg.(*lnwire.UpdateFailHTLC) + if !ok { + t.Fatalf("expected UpdateFailHTLC, got %T", msg) + } + + err := bobChannel.ReceiveFailHTLC(failMsg.ID, failMsg.Reason) + if err != nil { + t.Fatalf("unable to apply received fail htlc: %v", err) + } +} + // TestChannelLinkNoMoreUpdates tests that we won't send a new commitment // when there are no new updates to sign. func TestChannelLinkNoMoreUpdates(t *testing.T) { @@ -4455,6 +4487,307 @@ func TestChannelLinkWaitForRevocation(t *testing.T) { } } +// TestChannelLinkCleanupSpuriousResponses tests that we properly cleanup +// references in the event that internal retransmission continues as a result of +// not properly cleaning up Add/SettleFailRefs. 
+func TestChannelLinkCleanupSpuriousResponses(t *testing.T) { + t.Parallel() + + const chanAmt = btcutil.SatoshiPerBitcoin * 5 + const chanReserve = btcutil.SatoshiPerBitcoin * 1 + aliceLink, bobChannel, _, start, cleanUp, _, err := + newSingleLinkTestHarness(chanAmt, chanReserve) + if err != nil { + t.Fatalf("unable to create link: %v", err) + } + defer cleanUp() + + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + + var ( + coreLink = aliceLink.(*channelLink) + aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs + ) + + // Set Alice in hodl ExitSettle mode so that she won't respond + // immediately to the htlcs meant for her. This allows us to control + // the responses she gives back to Bob. + coreLink.cfg.DebugHTLC = true + coreLink.cfg.HodlMask = hodl.ExitSettle.Mask() + + // Add two HTLCs to Alice's registry, that Bob can pay. + htlc1 := generateHtlc(t, coreLink, bobChannel, 0) + htlc2 := generateHtlc(t, coreLink, bobChannel, 1) + + // We start with the following scenario: Bob sends Alice two HTLCs, and a + // commitment dance ensues, leaving two HTLCs that Alice can respond + // to. Since Alice is in ExitSettle mode, we will then take over and + // provide targeted fail messages to test the link's ability to cleanup + // spurious responses. + // + // Bob Alice + // |------ add-1 ----->| + // |------ add-2 ----->| + // |------ sig ----->| commits add-1 + add-2 + // |<----- rev ------| + // |<----- sig ------| commits add-1 + add-2 + // |------ rev ----->| + sendHtlcBobToAlice(t, aliceLink, bobChannel, htlc1) + sendHtlcBobToAlice(t, aliceLink, bobChannel, htlc2) + sendCommitSigBobToAlice(t, aliceLink, bobChannel, 2) + receiveRevAndAckAliceToBob(t, aliceMsgs, aliceLink, bobChannel) + receiveCommitSigAliceToBob(t, aliceMsgs, aliceLink, bobChannel, 2) + sendRevAndAckBobToAlice(t, aliceLink, bobChannel) + + // Give Alice time to process the revocation. 
+ time.Sleep(time.Second) + + aliceFwdPkgs, err := coreLink.channel.LoadFwdPkgs() + if err != nil { + t.Fatalf("unable to load alice's fwdpkgs: %v", err) + } + + // Alice should have exactly one forwarding package. + if len(aliceFwdPkgs) != 1 { + t.Fatalf("alice should have 1 fwd pkgs, has %d instead", + len(aliceFwdPkgs)) + } + + // We'll stash the height of these AddRefs, so that we can reconstruct + // the proper references later. + addHeight := aliceFwdPkgs[0].Height + + // The first fwdpkg should have exactly 2 entries, one for each Add that + // was added during the last dance. + if aliceFwdPkgs[0].AckFilter.Count() != 2 { + t.Fatalf("alice fwdpkg should have 2 Adds, has %d instead", + aliceFwdPkgs[0].AckFilter.Count()) + } + + // Both of the entries in the FwdFilter should be unacked. + for i := 0; i < 2; i++ { + if aliceFwdPkgs[0].AckFilter.Contains(uint16(i)) { + t.Fatalf("alice fwdpkg index %d should not "+ + "have ack", i) + } + } + + // Now, construct a Fail packet for Bob settling the first HTLC. This + // packet will NOT include a sourceRef, meaning the AddRef on disk will + // not be acked after committing this response. + fail0 := &htlcPacket{ + incomingChanID: bobChannel.ShortChanID(), + incomingHTLCID: 0, + obfuscator: NewMockObfuscator(), + htlc: &lnwire.UpdateFailHTLC{}, + } + aliceLink.HandleSwitchPacket(fail0) + + // Bob Alice + // |<----- fal-1 ------| + // |<----- sig ------| commits fal-1 + receiveFailAliceToBob(t, aliceMsgs, aliceLink, bobChannel) + receiveCommitSigAliceToBob(t, aliceMsgs, aliceLink, bobChannel, 1) + + aliceFwdPkgs, err = coreLink.channel.LoadFwdPkgs() + if err != nil { + t.Fatalf("unable to load alice's fwdpkgs: %v", err) + } + + // Alice should still only have one fwdpkg, as she hasn't yet received + // another revocation from Bob. + if len(aliceFwdPkgs) != 1 { + t.Fatalf("alice should have 1 fwd pkgs, has %d instead", + len(aliceFwdPkgs)) + } + + // Assert the fwdpkg still has 2 entries for the original Adds. 
+ if aliceFwdPkgs[0].AckFilter.Count() != 2 { + t.Fatalf("alice fwdpkg should have 2 Adds, has %d instead", + aliceFwdPkgs[0].AckFilter.Count()) + } + + // Since the fail packet was missing the AddRef, the forward filter for + // either HTLC should not have been modified. + for i := 0; i < 2; i++ { + if aliceFwdPkgs[0].AckFilter.Contains(uint16(i)) { + t.Fatalf("alice fwdpkg index %d should not "+ + "have ack", i) + } + } + + // Complete the rest of the commitment dance, now that the forwarding + // packages have been verified. + // + // Bob Alice + // |------ rev ----->| + // |------ sig ----->| + // |<----- rev ------| + sendRevAndAckBobToAlice(t, aliceLink, bobChannel) + sendCommitSigBobToAlice(t, aliceLink, bobChannel, 1) + receiveRevAndAckAliceToBob(t, aliceMsgs, aliceLink, bobChannel) + + // Next, we'll construct a fail packet for add-2 (index 1), which we'll + // send to Bob and lock in. Since the AddRef is set on this instance, we + // should see the second HTLCs AddRef update the forward filter for the + // first fwd pkg. + fail1 := &htlcPacket{ + sourceRef: &channeldb.AddRef{ + Height: addHeight, + Index: 1, + }, + incomingChanID: bobChannel.ShortChanID(), + incomingHTLCID: 1, + obfuscator: NewMockObfuscator(), + htlc: &lnwire.UpdateFailHTLC{}, + } + aliceLink.HandleSwitchPacket(fail1) + + // Bob Alice + // |<----- fal-1 ------| + // |<----- sig ------| commits fal-1 + receiveFailAliceToBob(t, aliceMsgs, aliceLink, bobChannel) + receiveCommitSigAliceToBob(t, aliceMsgs, aliceLink, bobChannel, 0) + + aliceFwdPkgs, err = coreLink.channel.LoadFwdPkgs() + if err != nil { + t.Fatalf("unable to load alice's fwdpkgs: %v", err) + } + + // Now that another commitment dance has completed, Alice should have 2 + // forwarding packages. + if len(aliceFwdPkgs) != 2 { + t.Fatalf("alice should have 2 fwd pkgs, has %d instead", + len(aliceFwdPkgs)) + } + + // The most recent package should have no new HTLCs, so it should be + // empty. 
+ if aliceFwdPkgs[1].AckFilter.Count() != 0 { + t.Fatalf("alice fwdpkg height=%d should have 0 Adds, "+ + "has %d instead", aliceFwdPkgs[1].Height, + aliceFwdPkgs[1].AckFilter.Count()) + } + + // The index for the first AddRef should still be unacked, as the + // sourceRef was missing on the htlcPacket. + if aliceFwdPkgs[0].AckFilter.Contains(0) { + t.Fatalf("alice fwdpkg height=%d index=0 should not "+ + "have an ack", aliceFwdPkgs[0].Height) + } + + // The index for the second AddRef should now be acked, as it was + // properly constructed and committed in Alice's last commit sig. + if !aliceFwdPkgs[0].AckFilter.Contains(1) { + t.Fatalf("alice fwdpkg height=%d index=1 should have "+ + "an ack", aliceFwdPkgs[0].Height) + } + + // Complete the rest of the commitment dance. + // + // Bob Alice + // |------ rev ----->| + // |------ sig ----->| + // |<----- rev ------| + sendRevAndAckBobToAlice(t, aliceLink, bobChannel) + sendCommitSigBobToAlice(t, aliceLink, bobChannel, 0) + receiveRevAndAckAliceToBob(t, aliceMsgs, aliceLink, bobChannel) + + // We'll do a quick sanity check, and blindly send the same fail packet + // for the first HTLC. Since this HTLC index has already been settled, + // this should trigger an attempt to cleanup the spurious response. + // However, we expect it to result in a NOP since it is still missing + // its sourceRef. + aliceLink.HandleSwitchPacket(fail0) + + // Allow the link enough time to process and reject the duplicate + // packet, we'll also check that this doesn't trigger Alice to send the + // fail to Bob. + select { + case <-aliceMsgs: + t.Fatalf("message sent for duplicate fail") + case <-time.After(time.Second): + } + + aliceFwdPkgs, err = coreLink.channel.LoadFwdPkgs() + if err != nil { + t.Fatalf("unable to load alice's fwdpkgs: %v", err) + } + + // Alice should now have 3 forwarding packages, and the latest should be + // empty. 
+ if len(aliceFwdPkgs) != 3 { + t.Fatalf("alice should have 3 fwd pkgs, has %d instead", + len(aliceFwdPkgs)) + } + if aliceFwdPkgs[2].AckFilter.Count() != 0 { + t.Fatalf("alice fwdpkg height=%d should have 0 Adds, "+ + "has %d instead", aliceFwdPkgs[2].Height, + aliceFwdPkgs[2].AckFilter.Count()) + } + + // The state of the forwarding packages should be unmodified from the + // prior assertion, since the duplicate Fail for index 0 should have + // been ignored. + if aliceFwdPkgs[0].AckFilter.Contains(0) { + t.Fatalf("alice fwdpkg height=%d index=0 should not "+ + "have an ack", aliceFwdPkgs[0].Height) + } + if !aliceFwdPkgs[0].AckFilter.Contains(1) { + t.Fatalf("alice fwdpkg height=%d index=1 should have "+ + "an ack", aliceFwdPkgs[0].Height) + } + + // Finally, construct a new Fail packet for the first HTLC, this time + // with the sourceRef properly constructed. When the link handles this + // duplicate, it should clean up the remaining AddRef state maintained + // in Alice's link, but it should not result in anything being sent to + // Bob. + fail0 = &htlcPacket{ + sourceRef: &channeldb.AddRef{ + Height: addHeight, + Index: 0, + }, + incomingChanID: bobChannel.ShortChanID(), + incomingHTLCID: 0, + obfuscator: NewMockObfuscator(), + htlc: &lnwire.UpdateFailHTLC{}, + } + aliceLink.HandleSwitchPacket(fail0) + + // Allow the link enough time to process and reject the duplicate + // packet, we'll also check that this doesn't trigger Alice to send the + // fail to Bob. + select { + case <-aliceMsgs: + t.Fatalf("message sent for duplicate fail") + case <-time.After(time.Second): + } + + aliceFwdPkgs, err = coreLink.channel.LoadFwdPkgs() + if err != nil { + t.Fatalf("unable to load alice's fwdpkgs: %v", err) + } + + // Since no state transitions have been performed for the duplicate + // packets, Alice should still have the same 3 forwarding packages. 
+ if len(aliceFwdPkgs) != 3 { + t.Fatalf("alice should have 3 fwd pkgs, has %d instead", + len(aliceFwdPkgs)) + } + + // Assert that all indices in our original forwarding package have now been acked + // as a result of our spurious cleanup logic. + for i := 0; i < 2; i++ { + if !aliceFwdPkgs[0].AckFilter.Contains(uint16(i)) { + t.Fatalf("alice fwdpkg height=%d index=%d "+ + "should have ack", aliceFwdPkgs[0].Height, i) + } + } +} + type mockPackager struct { failLoadFwdPkgs bool }