diff --git a/htlcswitch/link.go b/htlcswitch/link.go index afdd50cc..27102d55 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -509,6 +509,13 @@ func (l *channelLink) Stop() { close(l.quit) l.wg.Wait() + // Now that the htlcManager has completely exited, reset the packet + // courier. This allows the mailbox to re-evaluate any lingering Adds that + // were delivered but didn't make it on a commitment to be failed back + // if the link is offline for an extended period of time. The error is + // ignored since it can only fail when the daemon is exiting. + _ = l.mailBox.ResetPackets() + // As a final precaution, we will attempt to flush any uncommitted // preimages to the preimage cache. The preimages should be re-delivered // after channel reestablishment, however this adds an extra layer of diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 4eff7026..dd5b4a64 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -253,9 +253,16 @@ func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool { // It's possible that the courier has already advanced the // addHead, so this check prevents the addHead from getting // desynchronized. + // + // NOTE: While this event is rare for Settles or Fails, it could + // be very common for Adds since the mailbox has the ability to + // cancel Adds before they are delivered. When that occurs, the + // head of addPkts has only been peeked and we expect to be + // removing the head of the queue. if entry == m.addHead { m.addHead = entry.Next() } + m.addPkts.Remove(entry) delete(m.addIndex, inKey) @@ -314,6 +321,18 @@ func (m *memoryMailBox) signalUntilShutdown(cType courierType) { } } +// pktWithExpiry wraps an incoming packet and records the time at which it +// should be canceled from the mailbox. This will be used to detect if it gets +// stuck in the mailbox and inform when to cancel back. 
+type pktWithExpiry struct { + pkt *htlcPacket + expiry time.Time +} + +func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time { + return clock.TickAfter(p.expiry.Sub(clock.Now())) +} + // mailCourier is a dedicated goroutine whose job is to reliably deliver // messages of a particular type. There are two types of couriers: wire // couriers, and mail couriers. Depending on the passed courierType, this @@ -364,6 +383,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) { m.addHead = m.addPkts.Front() close(pktDone) + case <-m.quit: m.pktCond.L.Unlock() return @@ -375,7 +395,7 @@ var ( nextPkt *htlcPacket nextPktEl *list.Element - nextAdd *htlcPacket + nextAdd *pktWithExpiry nextAddEl *list.Element nextMsg lnwire.Message ) @@ -392,13 +412,18 @@ // doesn't make it into a commitment, then it'll be // re-delivered once the link comes back online. case pktCourier: - // Peek at the next item to deliver, prioritizing - // Settle/Fail packets over Adds. + // Peek at the head of the Settle/Fails and Add queues. + // We peek both even if there is a Settle/Fail present + // because we need to set a deadline for the next + // pending Add if it's present. Due to clock + // monotonicity, we know that the head of the Adds is + // the next to expire. if m.pktHead != nil { nextPkt = m.pktHead.Value.(*htlcPacket) nextPktEl = m.pktHead - } else { - nextAdd = m.addHead.Value.(*htlcPacket) + } + if m.addHead != nil { + nextAdd = m.addHead.Value.(*pktWithExpiry) nextAddEl = m.addHead } } @@ -433,6 +458,8 @@ var ( pktOutbox chan *htlcPacket addOutbox chan *htlcPacket + add *htlcPacket + deadline <-chan time.Time ) // Prioritize delivery of Settle/Fail packets over Adds. 
@@ -453,6 +480,22 @@ func (m *memoryMailBox) mailCourier(cType courierType) { addOutbox = m.pktOutbox } + // If we have a pending Add, we'll also construct the + // deadline so we can fail it back if we are unable to + // deliver any message in time. We also dereference the + // nextAdd's packet, since we will need access to it in + // the case we are delivering it and/or if the deadline + // expires. + // + // NOTE: It's possible after this point for add to be + // nil, but this can only occur when addOutbox is also + // nil, hence we won't accidentally deliver a nil + // packet. + if nextAdd != nil { + add = nextAdd.pkt + deadline = nextAdd.deadline(m.cfg.clock) + } + select { case pktOutbox <- nextPkt: m.pktCond.L.Lock() @@ -463,7 +506,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) { } m.pktCond.L.Unlock() - case addOutbox <- nextAdd: + case addOutbox <- add: m.pktCond.L.Lock() // Only advance the addHead if this Add is still // at the head of the queue. @@ -472,6 +515,9 @@ func (m *memoryMailBox) mailCourier(cType courierType) { } m.pktCond.L.Unlock() + case <-deadline: + m.FailAdd(add) + case pktDone := <-m.pktReset: m.pktCond.L.Lock() m.pktHead = m.htlcPkts.Front() @@ -534,7 +580,10 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error { return nil } - entry := m.addPkts.PushBack(pkt) + entry := m.addPkts.PushBack(&pktWithExpiry{ + pkt: pkt, + expiry: m.cfg.clock.Now().Add(m.cfg.expiry), + }) m.addIndex[pkt.inKey()] = entry if m.addHead == nil { m.addHead = entry diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go index 6a7cf026..999468dc 100644 --- a/htlcswitch/mailbox_test.go +++ b/htlcswitch/mailbox_test.go @@ -179,8 +179,8 @@ func TestMailBoxResetAfterShutdown(t *testing.T) { type mailboxContext struct { t *testing.T - clock *clock.TestClock mailbox MailBox + clock *clock.TestClock forwards chan *htlcPacket } @@ -294,7 +294,15 @@ func (c *mailboxContext) checkFails(adds []*htlcPacket) { // TestMailBoxFailAdd asserts 
that FailAdd returns a response to the switch // under various interleavings with other operations on the mailbox. func TestMailBoxFailAdd(t *testing.T) { - ctx := newMailboxContext(t, time.Now(), time.Minute) + var ( + batchDelay = time.Second + expiry = time.Minute + firstBatchStart = time.Now() + secondBatchStart = time.Now().Add(batchDelay) + thirdBatchStart = time.Now().Add(2 * batchDelay) + thirdBatchExpiry = thirdBatchStart.Add(expiry) + ) + ctx := newMailboxContext(t, firstBatchStart, expiry) defer ctx.mailbox.Stop() failAdds := func(adds []*htlcPacket) { @@ -306,19 +314,54 @@ func TestMailBoxFailAdd(t *testing.T) { const numBatchPackets = 5 // Send 10 adds, and pull them from the mailbox. - adds := ctx.sendAdds(0, numBatchPackets) - ctx.receivePkts(adds) + firstBatch := ctx.sendAdds(0, numBatchPackets) + ctx.receivePkts(firstBatch) // Fail all of these adds, simulating an error adding the HTLCs to the // commitment. We should see a failure message for each. - go failAdds(adds) - ctx.checkFails(adds) + go failAdds(firstBatch) + ctx.checkFails(firstBatch) // As a sanity check, Fail all of them again and assert that no // duplicate fails are sent. - go failAdds(adds) + go failAdds(firstBatch) ctx.checkFails(nil) + // Now, send a second batch of adds after a short delay and deliver them + // to the link. + ctx.clock.SetTime(secondBatchStart) + secondBatch := ctx.sendAdds(numBatchPackets, numBatchPackets) + ctx.receivePkts(secondBatch) + + // Reset the packet queue w/o changing the current time. This simulates + // the link flapping and coming back up before the second batch's + // expiries have elapsed. We should see no failures sent back. + err := ctx.mailbox.ResetPackets() + if err != nil { + t.Fatalf("unable to reset packets: %v", err) + } + ctx.checkFails(nil) + + // Redeliver the second batch to the link and hold them there. + ctx.receivePkts(secondBatch) + + // Send a third batch of adds shortly after the second batch. 
+ ctx.clock.SetTime(thirdBatchStart) + thirdBatch := ctx.sendAdds(2*numBatchPackets, numBatchPackets) + + // Advance the clock so that the third batch expires. We expect to only + // see fails for the third batch, since the second batch is still being + // held by the link. + ctx.clock.SetTime(thirdBatchExpiry) + ctx.checkFails(thirdBatch) + + // Finally, reset the link which should cause the second batch to be + // cancelled immediately. + err = ctx.mailbox.ResetPackets() + if err != nil { + t.Fatalf("unable to reset packets: %v", err) + } + ctx.checkFails(secondBatch) } // TestMailBoxPacketPrioritization asserts that the mailbox will prioritize @@ -420,6 +463,38 @@ func TestMailBoxPacketPrioritization(t *testing.T) { } } +// TestMailBoxAddExpiry asserts that the mailbox will cancel back Adds that have +// reached their expiry time. +func TestMailBoxAddExpiry(t *testing.T) { + var ( + expiry = time.Minute + batchDelay = time.Second + firstBatchStart = time.Now() + firstBatchExpiry = firstBatchStart.Add(expiry) + secondBatchStart = firstBatchStart.Add(batchDelay) + secondBatchExpiry = secondBatchStart.Add(expiry) + ) + + ctx := newMailboxContext(t, firstBatchStart, expiry) + defer ctx.mailbox.Stop() + + // Each batch will consist of 10 messages. + const numBatchPackets = 10 + + firstBatch := ctx.sendAdds(0, numBatchPackets) + + ctx.clock.SetTime(secondBatchStart) + ctx.checkFails(nil) + + secondBatch := ctx.sendAdds(numBatchPackets, numBatchPackets) + + ctx.clock.SetTime(firstBatchExpiry) + ctx.checkFails(firstBatch) + + ctx.clock.SetTime(secondBatchExpiry) + ctx.checkFails(secondBatch) +} + // TestMailOrchestrator asserts that the orchestrator properly buffers packets // for channels that haven't been made live, such that they are delivered // immediately after BindLiveShortChanID. It also tests that packets are delivered