diff --git a/channeldb/channel.go b/channeldb/channel.go index 740d23ea..e5dc42ea 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -1597,6 +1597,31 @@ func (c *OpenChannel) LoadFwdPkgs() ([]*FwdPkg, error) { return fwdPkgs, nil } +// AckAddHtlcs updates the AckAddFilter containing any of the provided AddRefs +// indicating that a response to this Add has been committed to the remote party. +// Doing so will prevent these Add HTLCs from being reforwarded internally. +func (c *OpenChannel) AckAddHtlcs(addRefs ...AddRef) error { + c.Lock() + defer c.Unlock() + + return c.Db.Update(func(tx *bolt.Tx) error { + return c.Packager.AckAddHtlcs(tx, addRefs...) + }) +} + +// AckSettleFails updates the SettleFailFilter containing any of the provided +// SettleFailRefs, indicating that the response has been delivered to the +// incoming link, corresponding to a particular AddRef. Doing so will prevent +// the responses from being retransmitted internally. +func (c *OpenChannel) AckSettleFails(settleFailRefs ...SettleFailRef) error { + c.Lock() + defer c.Unlock() + + return c.Db.Update(func(tx *bolt.Tx) error { + return c.Packager.AckSettleFails(tx, settleFailRefs...) + }) +} + // SetFwdFilter atomically sets the forwarding filter for the forwarding package // identified by `height`. func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error { @@ -2415,12 +2440,20 @@ func deleteOpenChannel(chanBucket *bolt.Bucket, chanPointBytes []byte) error { } +// makeLogKey converts a uint64 into an 8 byte array. func makeLogKey(updateNum uint64) [8]byte { var key [8]byte byteOrder.PutUint64(key[:], updateNum) return key } +// readLogKey parses the first 8 bytes of a byte slice into a uint64. +// +// NOTE: The slice must be at least 8 bytes long. 
+func readLogKey(b []byte) uint64 { + return byteOrder.Uint64(b) +} + func appendChannelLogEntry(log *bolt.Bucket, commit *ChannelCommitment) error { diff --git a/channeldb/forwarding_package.go b/channeldb/forwarding_package.go index 42497183..5817fc21 100644 --- a/channeldb/forwarding_package.go +++ b/channeldb/forwarding_package.go @@ -3,11 +3,11 @@ package channeldb import ( "bytes" "encoding/binary" + "errors" "fmt" "io" "github.com/coreos/bbolt" - "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/lnwire" ) @@ -759,7 +759,11 @@ func ackAddHtlcsAtHeight(sourceBkt *bolt.Bucket, height uint64, heightKey := makeLogKey(height) heightBkt := sourceBkt.Bucket(heightKey[:]) if heightBkt == nil { - return ErrCorruptedFwdPkg + // If the height bucket isn't found, this could be because the + // forwarding package was already removed. We'll return nil to + // signal that the operation is successful, as there is nothing + // to ack. + return nil } // Load ack filter from disk. @@ -824,12 +828,17 @@ func ackSettleFails(tx *bolt.Tx, settleFailRefs []SettleFailRef) error { } // With the references organized by destination and height, we now load - // each remote bucket, and update the settle fail filter for any + // each remote bucket, and update the settle fail filter for any // settle/fail htlcs. for dest, destHeights := range destHeightDiffs { destKey := makeLogKey(dest.ToUint64()) destBkt := fwdPkgBkt.Bucket(destKey[:]) if destBkt == nil { + // If the destination bucket is not found, this is + // likely the result of the destination channel being + // closed and having its forwarding packages wiped. We + // won't treat this as an error, because the response + // will no longer be retransmitted internally. 
continue } @@ -852,6 +861,9 @@ func ackSettleFailsAtHeight(destBkt *bolt.Bucket, height uint64, heightKey := makeLogKey(height) heightBkt := destBkt.Bucket(heightKey[:]) if heightBkt == nil { + // If the height bucket isn't found, this could be because the + // forwarding package was already removed. We'll return nil to + // signal that the operation is successful, as there is nothing to ack. return nil } diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 0c1df9dc..54281500 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1043,6 +1043,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // mailbox, and the HTLC being added to the commitment state. if l.cfg.DebugHTLC && l.cfg.HodlMask.Active(hodl.AddOutgoing) { l.warnf(hodl.AddOutgoing.Warning()) + l.mailBox.AckPacket(pkt.inKey()) return } @@ -1097,6 +1098,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { err := lnwire.EncodeFailure(&b, failure, 0) if err != nil { l.errorf("unable to encode failure: %v", err) + l.mailBox.AckPacket(pkt.inKey()) return } reason = lnwire.OpaqueReason(b.Bytes()) @@ -1106,6 +1108,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { reason, err = pkt.obfuscator.EncryptFirstHop(failure) if err != nil { l.errorf("unable to obfuscate error: %v", err) + l.mailBox.AckPacket(pkt.inKey()) return } } @@ -1162,22 +1165,38 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // commitment state. if l.cfg.DebugHTLC && l.cfg.HodlMask.Active(hodl.SettleOutgoing) { l.warnf(hodl.SettleOutgoing.Warning()) + l.mailBox.AckPacket(pkt.inKey()) return } // An HTLC we forward to the switch has just settled somewhere // upstream. Therefore we settle the HTLC within the our local // state machine. 
- closedCircuitRef := pkt.inKey() - if err := l.channel.SettleHTLC( + inKey := pkt.inKey() + err := l.channel.SettleHTLC( htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef, pkt.destRef, - &closedCircuitRef, - ); err != nil { - l.fail(LinkFailureError{code: ErrInternalError}, - "unable to settle incoming HTLC: %v", err) + &inKey, + ) + if err != nil { + l.errorf("unable to settle incoming HTLC for "+ + "circuit-key=%v: %v", inKey, err) + + // If the HTLC index for Settle response was not known + // to our commitment state, it has already been + // cleaned up by a prior response. We'll thus try to + // clean up any lingering state to ensure we don't + // continue reforwarding. + if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { + l.cleanupSpuriousResponse(pkt) + } + + // Remove the packet from the link's mailbox to ensure + // it doesn't get replayed after a reconnection. + l.mailBox.AckPacket(inKey) + return } @@ -1204,20 +1223,37 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // state. if l.cfg.DebugHTLC && l.cfg.HodlMask.Active(hodl.FailOutgoing) { l.warnf(hodl.FailOutgoing.Warning()) + l.mailBox.AckPacket(pkt.inKey()) return } // An HTLC cancellation has been triggered somewhere upstream, // we'll remove then HTLC from our local state machine. - closedCircuitRef := pkt.inKey() - if err := l.channel.FailHTLC( + inKey := pkt.inKey() + err := l.channel.FailHTLC( pkt.incomingHTLCID, htlc.Reason, pkt.sourceRef, pkt.destRef, - &closedCircuitRef, - ); err != nil { - log.Errorf("unable to cancel HTLC: %v", err) + &inKey, + ) + if err != nil { + l.errorf("unable to cancel incoming HTLC for "+ + "circuit-key=%v: %v", inKey, err) + + // If the HTLC index for Fail response was not known to + // our commitment state, it has already been cleaned up + // by a prior response. We'll thus try to clean up any + // lingering state to ensure we don't continue + // reforwarding. 
+ if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { + l.cleanupSpuriousResponse(pkt) + } + + // Remove the packet from the link's mailbox to ensure + // it doesn't get replayed after a reconnection. + l.mailBox.AckPacket(inKey) + return } @@ -1252,6 +1288,70 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { } } +// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef +// associated with this packet. If successful in doing so, it will also purge +// the open circuit from the circuit map and remove the packet from the link's +// mailbox. +func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) { + inKey := pkt.inKey() + + l.debugf("Cleaning up spurious response for incoming circuit-key=%v", + inKey) + + // If the htlc packet doesn't have a source reference, it is unsafe to + // proceed, as skipping this ack may cause the htlc to be reforwarded. + if pkt.sourceRef == nil { + l.errorf("uanble to cleanup response for incoming "+ + "circuit-key=%v, does not contain source reference", + inKey) + return + } + + // If the source reference is present, we will try to prevent this link + // from resending the packet to the switch. To do so, we ack the AddRef + // of the incoming HTLC belonging to this link. + err := l.channel.AckAddHtlcs(*pkt.sourceRef) + if err != nil { + l.errorf("unable to ack AddRef for incoming "+ + "circuit-key=%v: %v", inKey, err) + + // If this operation failed, it is unsafe to attempt removal of + // the destination reference or circuit, so we exit early. The + // cleanup may proceed with a different packet in the future + // that succeeds on this step. + return + } + + // Now that we know this link will stop retransmitting Adds to the + // switch, we can begin to teardown the response reference and circuit + // map. + // + // If the packet includes a destination reference, then a response for + // this HTLC was locked into the outgoing channel. 
Attempt to remove + // this reference, so we stop retransmitting the response internally. + // Even if this fails, we will proceed in trying to delete the circuit. + // When retransmitting responses, the destination references will be + // cleaned up if an open circuit is not found in the circuit map. + if pkt.destRef != nil { + err := l.channel.AckSettleFails(*pkt.destRef) + if err != nil { + l.errorf("unable to ack SettleFailRef "+ + "for incoming circuit-key=%v: %v", + inKey, err) + } + } + + l.debugf("Deleting circuit for incoming circuit-key=%x", inKey) + + // With all known references acked, we can now safely delete the circuit + // from the switch's circuit map, as the state is no longer needed. + err = l.cfg.Circuits.DeleteCircuits(inKey) + if err != nil { + l.errorf("unable to delete circuit for "+ + "circuit-key=%v: %v", inKey, err) + } +} + // handleUpstreamMsg processes wire messages related to commitment state // updates from the upstream peer. The upstream peer is the peer whom we have a // direct channel with, updating our respective commitment chains. diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 9cddc4fc..a5f417be 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1465,6 +1465,7 @@ func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { if m.disconnected { return fmt.Errorf("disconnected") } + select { case m.sentMsgs <- msgs[0]: case <-m.quit: @@ -4195,6 +4196,9 @@ func receiveRevAndAckAliceToBob(t *testing.T, aliceMsgs chan lnwire.Message, func receiveCommitSigAliceToBob(t *testing.T, aliceMsgs chan lnwire.Message, aliceLink ChannelLink, bobChannel *lnwallet.LightningChannel, expHtlcs int) { + + t.Helper() + var msg lnwire.Message select { case msg = <-aliceMsgs: @@ -4234,6 +4238,9 @@ func sendRevAndAckBobToAlice(t *testing.T, aliceLink ChannelLink, // Bob, then hands this to Bob. 
func receiveSettleAliceToBob(t *testing.T, aliceMsgs chan lnwire.Message, aliceLink ChannelLink, bobChannel *lnwallet.LightningChannel) { + + t.Helper() + var msg lnwire.Message select { case msg = <-aliceMsgs: @@ -4253,6 +4260,31 @@ func receiveSettleAliceToBob(t *testing.T, aliceMsgs chan lnwire.Message, } } +// receiveFailAliceToBob waits for Alice to send a HTLC fail message to +// Bob, then hands this to Bob. +func receiveFailAliceToBob(t *testing.T, aliceMsgs chan lnwire.Message, + aliceLink ChannelLink, bobChannel *lnwallet.LightningChannel) { + + t.Helper() + + var msg lnwire.Message + select { + case msg = <-aliceMsgs: + case <-time.After(15 * time.Second): + t.Fatalf("did not receive message") + } + + failMsg, ok := msg.(*lnwire.UpdateFailHTLC) + if !ok { + t.Fatalf("expected UpdateFailHTLC, got %T", msg) + } + + err := bobChannel.ReceiveFailHTLC(failMsg.ID, failMsg.Reason) + if err != nil { + t.Fatalf("unable to apply received fail htlc: %v", err) + } +} + // TestChannelLinkNoMoreUpdates tests that we won't send a new commitment // when there are no new updates to sign. func TestChannelLinkNoMoreUpdates(t *testing.T) { @@ -4455,6 +4487,307 @@ func TestChannelLinkWaitForRevocation(t *testing.T) { } } +// TestChannelLinkCleanupSpuriousResponses tests that we properly cleanup +// references in the event that internal retransmission continues as a result of +// not properly cleaning up Add/SettleFailRefs. 
+func TestChannelLinkCleanupSpuriousResponses(t *testing.T) { + t.Parallel() + + const chanAmt = btcutil.SatoshiPerBitcoin * 5 + const chanReserve = btcutil.SatoshiPerBitcoin * 1 + aliceLink, bobChannel, _, start, cleanUp, _, err := + newSingleLinkTestHarness(chanAmt, chanReserve) + if err != nil { + t.Fatalf("unable to create link: %v", err) + } + defer cleanUp() + + if err := start(); err != nil { + t.Fatalf("unable to start test harness: %v", err) + } + + var ( + coreLink = aliceLink.(*channelLink) + aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs + ) + + // Settle Alice in hodl ExitSettle mode so that she won't respond + // immediately to the htlc's meant for her. This allows us to control + // the responses she gives back to Bob. + coreLink.cfg.DebugHTLC = true + coreLink.cfg.HodlMask = hodl.ExitSettle.Mask() + + // Add two HTLCs to Alice's registry, that Bob can pay. + htlc1 := generateHtlc(t, coreLink, bobChannel, 0) + htlc2 := generateHtlc(t, coreLink, bobChannel, 1) + + // We start with the following scenario: Bob sends Alice two HTLCs, and a + // commitment dance ensues, leaving two HTLCs that Alice can respond + // to. Since Alice is in ExitSettle mode, we will then take over and + // provide targeted fail messages to test the link's ability to cleanup + // spurious responses. + // + // Bob Alice + // |------ add-1 ----->| + // |------ add-2 ----->| + // |------ sig ----->| commits add-1 + add-2 + // |<----- rev ------| + // |<----- sig ------| commits add-1 + add-2 + // |------ rev ----->| + sendHtlcBobToAlice(t, aliceLink, bobChannel, htlc1) + sendHtlcBobToAlice(t, aliceLink, bobChannel, htlc2) + sendCommitSigBobToAlice(t, aliceLink, bobChannel, 2) + receiveRevAndAckAliceToBob(t, aliceMsgs, aliceLink, bobChannel) + receiveCommitSigAliceToBob(t, aliceMsgs, aliceLink, bobChannel, 2) + sendRevAndAckBobToAlice(t, aliceLink, bobChannel) + + // Give Alice time to process the revocation. 
+ time.Sleep(time.Second) + + aliceFwdPkgs, err := coreLink.channel.LoadFwdPkgs() + if err != nil { + t.Fatalf("unable to load alice's fwdpkgs: %v", err) + } + + // Alice should have exactly one forwarding package. + if len(aliceFwdPkgs) != 1 { + t.Fatalf("alice should have 1 fwd pkgs, has %d instead", + len(aliceFwdPkgs)) + } + + // We'll stash the height of these AddRefs, so that we can reconstruct + // the proper references later. + addHeight := aliceFwdPkgs[0].Height + + // The first fwdpkg should have exactly 2 entries, one for each Add that + // was added during the last dance. + if aliceFwdPkgs[0].AckFilter.Count() != 2 { + t.Fatalf("alice fwdpkg should have 2 Adds, has %d instead", + aliceFwdPkgs[0].AckFilter.Count()) + } + + // Both of the entries in the FwdFilter should be unacked. + for i := 0; i < 2; i++ { + if aliceFwdPkgs[0].AckFilter.Contains(uint16(i)) { + t.Fatalf("alice fwdpkg index %d should not "+ + "have ack", i) + } + } + + // Now, construct a Fail packet for Bob settling the first HTLC. This + // packet will NOT include a sourceRef, meaning the AddRef on disk will + // not be acked after committing this response. + fail0 := &htlcPacket{ + incomingChanID: bobChannel.ShortChanID(), + incomingHTLCID: 0, + obfuscator: NewMockObfuscator(), + htlc: &lnwire.UpdateFailHTLC{}, + } + aliceLink.HandleSwitchPacket(fail0) + + // Bob Alice + // |<----- fal-1 ------| + // |<----- sig ------| commits fal-1 + receiveFailAliceToBob(t, aliceMsgs, aliceLink, bobChannel) + receiveCommitSigAliceToBob(t, aliceMsgs, aliceLink, bobChannel, 1) + + aliceFwdPkgs, err = coreLink.channel.LoadFwdPkgs() + if err != nil { + t.Fatalf("unable to load alice's fwdpkgs: %v", err) + } + + // Alice should still only have one fwdpkg, as she hasn't yet received + // another revocation from Bob. + if len(aliceFwdPkgs) != 1 { + t.Fatalf("alice should have 1 fwd pkgs, has %d instead", + len(aliceFwdPkgs)) + } + + // Assert the fwdpkg still has 2 entries for the original Adds. 
+ if aliceFwdPkgs[0].AckFilter.Count() != 2 { + t.Fatalf("alice fwdpkg should have 2 Adds, has %d instead", + aliceFwdPkgs[0].AckFilter.Count()) + } + + // Since the fail packet was missing the AddRef, the forward filter for + // either HTLC should not have been modified. + for i := 0; i < 2; i++ { + if aliceFwdPkgs[0].AckFilter.Contains(uint16(i)) { + t.Fatalf("alice fwdpkg index %d should not "+ + "have ack", i) + } + } + + // Complete the rest of the commitment dance, now that the forwarding + // packages have been verified. + // + // Bob Alice + // |------ rev ----->| + // |------ sig ----->| + // |<----- rev ------| + sendRevAndAckBobToAlice(t, aliceLink, bobChannel) + sendCommitSigBobToAlice(t, aliceLink, bobChannel, 1) + receiveRevAndAckAliceToBob(t, aliceMsgs, aliceLink, bobChannel) + + // Next, we'll construct a fail packet for add-2 (index 1), which we'll + // send to Bob and lock in. Since the AddRef is set on this instance, we + // should see the second HTLCs AddRef update the forward filter for the + // first fwd pkg. + fail1 := &htlcPacket{ + sourceRef: &channeldb.AddRef{ + Height: addHeight, + Index: 1, + }, + incomingChanID: bobChannel.ShortChanID(), + incomingHTLCID: 1, + obfuscator: NewMockObfuscator(), + htlc: &lnwire.UpdateFailHTLC{}, + } + aliceLink.HandleSwitchPacket(fail1) + + // Bob Alice + // |<----- fal-1 ------| + // |<----- sig ------| commits fal-1 + receiveFailAliceToBob(t, aliceMsgs, aliceLink, bobChannel) + receiveCommitSigAliceToBob(t, aliceMsgs, aliceLink, bobChannel, 0) + + aliceFwdPkgs, err = coreLink.channel.LoadFwdPkgs() + if err != nil { + t.Fatalf("unable to load alice's fwdpkgs: %v", err) + } + + // Now that another commitment dance has completed, Alice should have 2 + // forwarding packages. + if len(aliceFwdPkgs) != 2 { + t.Fatalf("alice should have 2 fwd pkgs, has %d instead", + len(aliceFwdPkgs)) + } + + // The most recent package should have no new HTLCs, so it should be + // empty. 
+ if aliceFwdPkgs[1].AckFilter.Count() != 0 { + t.Fatalf("alice fwdpkg height=%d should have 0 Adds, "+ + "has %d instead", aliceFwdPkgs[1].Height, + aliceFwdPkgs[1].AckFilter.Count()) + } + + // The index for the first AddRef should still be unacked, as the + // sourceRef was missing on the htlcPacket. + if aliceFwdPkgs[0].AckFilter.Contains(0) { + t.Fatalf("alice fwdpkg height=%d index=0 should not "+ + "have an ack", aliceFwdPkgs[0].Height) + } + + // The index for the second AddRef should now be acked, as it was + // properly constructed and committed in Alice's last commit sig. + if !aliceFwdPkgs[0].AckFilter.Contains(1) { + t.Fatalf("alice fwdpkg height=%d index=1 should have "+ + "an ack", aliceFwdPkgs[0].Height) + } + + // Complete the rest of the commitment dance. + // + // Bob Alice + // |------ rev ----->| + // |------ sig ----->| + // |<----- rev ------| + sendRevAndAckBobToAlice(t, aliceLink, bobChannel) + sendCommitSigBobToAlice(t, aliceLink, bobChannel, 0) + receiveRevAndAckAliceToBob(t, aliceMsgs, aliceLink, bobChannel) + + // We'll do a quick sanity check, and blindly send the same fail packet + // for the first HTLC. Since this HTLC index has already been settled, + // this should trigger an attempt to cleanup the spurious response. + // However, we expect it to result in a NOP since it is still missing + // its sourceRef. + aliceLink.HandleSwitchPacket(fail0) + + // Allow the link enough time to process and reject the duplicate + // packet, we'll also check that this doesn't trigger Alice to send the + // fail to Bob. + select { + case <-aliceMsgs: + t.Fatalf("message sent for duplicate fail") + case <-time.After(time.Second): + } + + aliceFwdPkgs, err = coreLink.channel.LoadFwdPkgs() + if err != nil { + t.Fatalf("unable to load alice's fwdpkgs: %v", err) + } + + // Alice should now have 3 forwarding packages, and the latest should be + // empty. 
+ if len(aliceFwdPkgs) != 3 { + t.Fatalf("alice should have 3 fwd pkgs, has %d instead", + len(aliceFwdPkgs)) + } + if aliceFwdPkgs[2].AckFilter.Count() != 0 { + t.Fatalf("alice fwdpkg height=%d should have 0 Adds, "+ + "has %d instead", aliceFwdPkgs[2].Height, + aliceFwdPkgs[2].AckFilter.Count()) + } + + // The state of the forwarding packages should be unmodified from the + // prior assertion, since the duplicate Fail for index 0 should have + // been ignored. + if aliceFwdPkgs[0].AckFilter.Contains(0) { + t.Fatalf("alice fwdpkg height=%d index=0 should not "+ + "have an ack", aliceFwdPkgs[0].Height) + } + if !aliceFwdPkgs[0].AckFilter.Contains(1) { + t.Fatalf("alice fwdpkg height=%d index=1 should have "+ + "an ack", aliceFwdPkgs[0].Height) + } + + // Finally, construct a new Fail packet for the first HTLC, this time + // with the sourceRef properly constructed. When the link handles this + // duplicate, it should clean up the remaining AddRef state maintained + // in Alice's link, but it should not result in anything being sent to + // Bob. + fail0 = &htlcPacket{ + sourceRef: &channeldb.AddRef{ + Height: addHeight, + Index: 0, + }, + incomingChanID: bobChannel.ShortChanID(), + incomingHTLCID: 0, + obfuscator: NewMockObfuscator(), + htlc: &lnwire.UpdateFailHTLC{}, + } + aliceLink.HandleSwitchPacket(fail0) + + // Allow the link enough time to process and reject the duplicate + // packet, we'll also check that this doesn't trigger Alice to send the + // fail to Bob. + select { + case <-aliceMsgs: + t.Fatalf("message sent for duplicate fail") + case <-time.After(time.Second): + } + + aliceFwdPkgs, err = coreLink.channel.LoadFwdPkgs() + if err != nil { + t.Fatalf("unable to load alice's fwdpkgs: %v", err) + } + + // Since no state transitions have been performed for the duplicate + // packets, Alice should still have the same 3 forwarding packages. 
+ if len(aliceFwdPkgs) != 3 { + t.Fatalf("alice should have 3 fwd pkgs, has %d instead", + len(aliceFwdPkgs)) + } + + // Assert that all indices in our original forwarded have now been acked + // as a result of our spurious cleanup logic. + for i := 0; i < 2; i++ { + if !aliceFwdPkgs[0].AckFilter.Contains(uint16(i)) { + t.Fatalf("alice fwdpkg height=%d index=%d "+ + "should have ack", aliceFwdPkgs[0].Height, i) + } + } +} + type mockPackager struct { failLoadFwdPkgs bool } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index b7df23f8..8f94a9a0 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1636,15 +1636,35 @@ func (s *Switch) Start() error { // forwarding packages and reforwards any Settle or Fail HTLCs found. This is // used to resurrect the switch's mailboxes after a restart. func (s *Switch) reforwardResponses() error { - activeChannels, err := s.cfg.DB.FetchAllOpenChannels() + openChannels, err := s.cfg.DB.FetchAllOpenChannels() if err != nil { return err } - for _, activeChannel := range activeChannels { - shortChanID := activeChannel.ShortChanID() + for _, openChannel := range openChannels { + shortChanID := openChannel.ShortChanID() + + // Locally-initiated payments never need reforwarding. + if shortChanID == sourceHop { + continue + } + + // If the channel is pending, it should have no forwarding + // packages, and nothing to reforward. + if openChannel.IsPending { + continue + } + + // Channels in open or waiting-close may still have responses in + // their forwarding packages. We will continue to reattempt + // forwarding on startup until the channel is fully-closed. + // + // Load this channel's forwarding packages, and deliver them to + // the switch. 
fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID) if err != nil { + log.Errorf("unable to load forwarding "+ + "packages for %v: %v", shortChanID, err) return err } diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 379ccbb7..f654d1c6 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -4433,6 +4433,22 @@ func (lc *LightningChannel) LoadFwdPkgs() ([]*channeldb.FwdPkg, error) { return lc.channelState.LoadFwdPkgs() } +// AckAddHtlcs sets a bit in the FwdFilter of a forwarding package belonging to +// this channel, that corresponds to the given AddRef. This method also succeeds +// if no forwarding package is found. +func (lc *LightningChannel) AckAddHtlcs(addRef channeldb.AddRef) error { + return lc.channelState.AckAddHtlcs(addRef) +} + +// AckSettleFails sets a bit in the SettleFailFilter of a forwarding package +// belonging to this channel, that corresponds to the given SettleFailRef. This +// method also succeeds if no forwarding package is found. +func (lc *LightningChannel) AckSettleFails( + settleFailRefs ...channeldb.SettleFailRef) error { + + return lc.channelState.AckSettleFails(settleFailRefs...) +} + // SetFwdFilter writes the forwarding decision for a given remote commitment // height. func (lc *LightningChannel) SetFwdFilter(height uint64, @@ -4572,21 +4588,18 @@ func (lc *LightningChannel) SettleHTLC(preimage [32]byte, htlc := lc.remoteUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { - return fmt.Errorf("No HTLC with ID %d in channel %v", htlcIndex, - lc.ShortChanID()) + return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} } // Now that we know the HTLC exists, before checking to see if the // preimage matches, we'll ensure that we haven't already attempted to // modify the HTLC. 
if lc.remoteUpdateLog.htlcHasModification(htlcIndex) { - return fmt.Errorf("HTLC with ID %d has already been settled", - htlcIndex) + return ErrHtlcIndexAlreadySettled(htlcIndex) } if htlc.RHash != sha256.Sum256(preimage[:]) { - return fmt.Errorf("Invalid payment preimage %x for hash %x", - preimage[:], htlc.RHash[:]) + return ErrInvalidSettlePreimage{preimage[:], htlc.RHash[:]} } pd := &PaymentDescriptor{ @@ -4620,21 +4633,18 @@ func (lc *LightningChannel) ReceiveHTLCSettle(preimage [32]byte, htlcIndex uint6 htlc := lc.localUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { - return fmt.Errorf("No HTLC with ID %d in channel %v", htlcIndex, - lc.ShortChanID()) + return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} } // Now that we know the HTLC exists, before checking to see if the // preimage matches, we'll ensure that they haven't already attempted // to modify the HTLC. if lc.localUpdateLog.htlcHasModification(htlcIndex) { - return fmt.Errorf("HTLC with ID %d has already been settled", - htlcIndex) + return ErrHtlcIndexAlreadySettled(htlcIndex) } if htlc.RHash != sha256.Sum256(preimage[:]) { - return fmt.Errorf("Invalid payment preimage %x for hash %x", - preimage[:], htlc.RHash[:]) + return ErrInvalidSettlePreimage{preimage[:], htlc.RHash[:]} } pd := &PaymentDescriptor{ @@ -4688,15 +4698,13 @@ func (lc *LightningChannel) FailHTLC(htlcIndex uint64, reason []byte, htlc := lc.remoteUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { - return fmt.Errorf("No HTLC with ID %d in channel %v", htlcIndex, - lc.ShortChanID()) + return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} } // Now that we know the HTLC exists, we'll ensure that we haven't // already attempted to fail the HTLC. 
if lc.remoteUpdateLog.htlcHasModification(htlcIndex) { - return fmt.Errorf("HTLC with ID %d has already been failed", - htlcIndex) + return ErrHtlcIndexAlreadyFailed(htlcIndex) } pd := &PaymentDescriptor{ @@ -4740,15 +4748,13 @@ func (lc *LightningChannel) MalformedFailHTLC(htlcIndex uint64, htlc := lc.remoteUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { - return fmt.Errorf("No HTLC with ID %d in channel %v", htlcIndex, - lc.ShortChanID()) + return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} } // Now that we know the HTLC exists, we'll ensure that we haven't // already attempted to fail the HTLC. if lc.remoteUpdateLog.htlcHasModification(htlcIndex) { - return fmt.Errorf("HTLC with ID %d has already been failed", - htlcIndex) + return ErrHtlcIndexAlreadyFailed(htlcIndex) } pd := &PaymentDescriptor{ @@ -4785,15 +4791,13 @@ func (lc *LightningChannel) ReceiveFailHTLC(htlcIndex uint64, reason []byte, htlc := lc.localUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { - return fmt.Errorf("No HTLC with ID %d in channel %v", htlcIndex, - lc.ShortChanID()) + return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} } // Now that we know the HTLC exists, we'll ensure that they haven't // already attempted to fail the HTLC. if lc.localUpdateLog.htlcHasModification(htlcIndex) { - return fmt.Errorf("HTLC with ID %d has already been failed", - htlcIndex) + return ErrHtlcIndexAlreadyFailed(htlcIndex) } pd := &PaymentDescriptor{ diff --git a/lnwallet/errors.go b/lnwallet/errors.go index a1f83edf..f36d0b99 100644 --- a/lnwallet/errors.go +++ b/lnwallet/errors.go @@ -125,3 +125,49 @@ func ErrChanTooSmall(chanSize, minChanSize btcutil.Amount) ReservationError { chanSize, minChanSize), } } + +// ErrHtlcIndexAlreadyFailed is returned when the HTLC index has already been +// failed, but has not been committed by our commitment state. +type ErrHtlcIndexAlreadyFailed uint64 + +// Error returns a message indicating the index that had already been failed. 
+func (e ErrHtlcIndexAlreadyFailed) Error() string { + return fmt.Sprintf("HTLC with ID %d has already been failed", e) +} + +// ErrHtlcIndexAlreadySettled is returned when the HTLC index has already been +// settled, but has not been committed by our commitment state. +type ErrHtlcIndexAlreadySettled uint64 + +// Error returns a message indicating the index that had already been settled. +func (e ErrHtlcIndexAlreadySettled) Error() string { + return fmt.Sprintf("HTLC with ID %d has already been settled", e) +} + +// ErrInvalidSettlePreimage is returned when trying to settle an HTLC, but the +// preimage does not correspond to the payment hash. +type ErrInvalidSettlePreimage struct { + preimage []byte + rhash []byte +} + +// Error returns an error message with the offending preimage and intended +// payment hash. +func (e ErrInvalidSettlePreimage) Error() string { + return fmt.Sprintf("Invalid payment preimage %x for hash %x", + e.preimage, e.rhash) +} + +// ErrUnknownHtlcIndex is returned when locally settling or failing an HTLC, but +// the HTLC index is not known to the channel. This typically indicates that the +// HTLC was already settled in a prior commitment. +type ErrUnknownHtlcIndex struct { + chanID lnwire.ShortChannelID + index uint64 +} + +// Error returns an error logging the channel and HTLC index that was unknown. +func (e ErrUnknownHtlcIndex) Error() string { + return fmt.Sprintf("No HTLC with ID %d in channel %v", + e.index, e.chanID) +}