htlcswitch/mailbox: fail htlcs when delayed for 1 minute

Now that packet failure is handled by the mailbox, we can now enforce
a delivery deadline and fail the packet if it the deadilne is exceeded.
This gives senders quicker feedback about tried routes, and allows them
to try alternative paths to the destination in the meantime.
This commit is contained in:
Conner Fromknecht 2020-04-14 10:50:07 -07:00
parent 1aa2dde4a4
commit e7ece11c29
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
3 changed files with 145 additions and 14 deletions

@ -509,6 +509,13 @@ func (l *channelLink) Stop() {
close(l.quit) close(l.quit)
l.wg.Wait() l.wg.Wait()
// Now that the htlcManager has completely exited, reset the packet
// courier. This allows the mailbox to revaluate any lingering Adds that
// were delivered but didn't make it on a commitment to be failed back
// if the link is offline for an extended period of time. The error is
// ignored since it can only fail when the daemon is exiting.
_ = l.mailBox.ResetPackets()
// As a final precaution, we will attempt to flush any uncommitted // As a final precaution, we will attempt to flush any uncommitted
// preimages to the preimage cache. The preimages should be re-delivered // preimages to the preimage cache. The preimages should be re-delivered
// after channel reestablishment, however this adds an extra layer of // after channel reestablishment, however this adds an extra layer of

@ -253,9 +253,16 @@ func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
// It's possible that the courier has already advanced the // It's possible that the courier has already advanced the
// addHead, so this check prevents the addHead from getting // addHead, so this check prevents the addHead from getting
// desynchronized. // desynchronized.
//
// NOTE: While this event is rare for Settles or Fails, it could
// be very common for Adds since the mailbox has the ability to
// cancel Adds before they are delivered. When that occurs, the
// head of addPkts has only been peeked and we expect to be
// removing the head of the queue.
if entry == m.addHead { if entry == m.addHead {
m.addHead = entry.Next() m.addHead = entry.Next()
} }
m.addPkts.Remove(entry) m.addPkts.Remove(entry)
delete(m.addIndex, inKey) delete(m.addIndex, inKey)
@ -314,6 +321,18 @@ func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
} }
} }
// pktWithExpiry wraps an incoming packet and records the time at which it it
// should be canceled from the mailbox. This will be used to detect if it gets
// stuck in the mailbox and inform when to cancel back.
type pktWithExpiry struct {
pkt *htlcPacket
expiry time.Time
}
func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
return clock.TickAfter(p.expiry.Sub(clock.Now()))
}
// mailCourier is a dedicated goroutine whose job is to reliably deliver // mailCourier is a dedicated goroutine whose job is to reliably deliver
// messages of a particular type. There are two types of couriers: wire // messages of a particular type. There are two types of couriers: wire
// couriers, and mail couriers. Depending on the passed courierType, this // couriers, and mail couriers. Depending on the passed courierType, this
@ -364,6 +383,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
m.addHead = m.addPkts.Front() m.addHead = m.addPkts.Front()
close(pktDone) close(pktDone)
case <-m.quit: case <-m.quit:
m.pktCond.L.Unlock() m.pktCond.L.Unlock()
return return
@ -375,7 +395,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
var ( var (
nextPkt *htlcPacket nextPkt *htlcPacket
nextPktEl *list.Element nextPktEl *list.Element
nextAdd *htlcPacket nextAdd *pktWithExpiry
nextAddEl *list.Element nextAddEl *list.Element
nextMsg lnwire.Message nextMsg lnwire.Message
) )
@ -392,13 +412,18 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
// doesn't make it into a commitment, then it'll be // doesn't make it into a commitment, then it'll be
// re-delivered once the link comes back online. // re-delivered once the link comes back online.
case pktCourier: case pktCourier:
// Peek at the next item to deliver, prioritizing // Peek at the head of the Settle/Fails and Add queues.
// Settle/Fail packets over Adds. // We peak both even if there is a Settle/Fail present
// because we need to set a deadline for the next
// pending Add if it's present. Due to clock
// monotonicity, we know that the head of the Adds is
// the next to expire.
if m.pktHead != nil { if m.pktHead != nil {
nextPkt = m.pktHead.Value.(*htlcPacket) nextPkt = m.pktHead.Value.(*htlcPacket)
nextPktEl = m.pktHead nextPktEl = m.pktHead
} else { }
nextAdd = m.addHead.Value.(*htlcPacket) if m.addHead != nil {
nextAdd = m.addHead.Value.(*pktWithExpiry)
nextAddEl = m.addHead nextAddEl = m.addHead
} }
} }
@ -433,6 +458,8 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
var ( var (
pktOutbox chan *htlcPacket pktOutbox chan *htlcPacket
addOutbox chan *htlcPacket addOutbox chan *htlcPacket
add *htlcPacket
deadline <-chan time.Time
) )
// Prioritize delivery of Settle/Fail packets over Adds. // Prioritize delivery of Settle/Fail packets over Adds.
@ -453,6 +480,22 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
addOutbox = m.pktOutbox addOutbox = m.pktOutbox
} }
// If we have a pending Add, we'll also construct the
// deadline so we can fail it back if we are unable to
// deliver any message in time. We also dereference the
// nextAdd's packet, since we will need access to it in
// the case we are delivering it and/or if the deadline
// expires.
//
// NOTE: It's possible after this point for add to be
// nil, but this can only occur when addOutbox is also
// nil, hence we won't accidentally deliver a nil
// packet.
if nextAdd != nil {
add = nextAdd.pkt
deadline = nextAdd.deadline(m.cfg.clock)
}
select { select {
case pktOutbox <- nextPkt: case pktOutbox <- nextPkt:
m.pktCond.L.Lock() m.pktCond.L.Lock()
@ -463,7 +506,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
} }
m.pktCond.L.Unlock() m.pktCond.L.Unlock()
case addOutbox <- nextAdd: case addOutbox <- add:
m.pktCond.L.Lock() m.pktCond.L.Lock()
// Only advance the addHead if this Add is still // Only advance the addHead if this Add is still
// at the head of the queue. // at the head of the queue.
@ -472,6 +515,9 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
} }
m.pktCond.L.Unlock() m.pktCond.L.Unlock()
case <-deadline:
m.FailAdd(add)
case pktDone := <-m.pktReset: case pktDone := <-m.pktReset:
m.pktCond.L.Lock() m.pktCond.L.Lock()
m.pktHead = m.htlcPkts.Front() m.pktHead = m.htlcPkts.Front()
@ -534,7 +580,10 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
return nil return nil
} }
entry := m.addPkts.PushBack(pkt) entry := m.addPkts.PushBack(&pktWithExpiry{
pkt: pkt,
expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
})
m.addIndex[pkt.inKey()] = entry m.addIndex[pkt.inKey()] = entry
if m.addHead == nil { if m.addHead == nil {
m.addHead = entry m.addHead = entry

@ -179,8 +179,8 @@ func TestMailBoxResetAfterShutdown(t *testing.T) {
type mailboxContext struct { type mailboxContext struct {
t *testing.T t *testing.T
clock *clock.TestClock
mailbox MailBox mailbox MailBox
clock *clock.TestClock
forwards chan *htlcPacket forwards chan *htlcPacket
} }
@ -294,7 +294,15 @@ func (c *mailboxContext) checkFails(adds []*htlcPacket) {
// TestMailBoxFailAdd asserts that FailAdd returns a response to the switch // TestMailBoxFailAdd asserts that FailAdd returns a response to the switch
// under various interleavings with other operations on the mailbox. // under various interleavings with other operations on the mailbox.
func TestMailBoxFailAdd(t *testing.T) { func TestMailBoxFailAdd(t *testing.T) {
ctx := newMailboxContext(t, time.Now(), time.Minute) var (
batchDelay = time.Second
expiry = time.Minute
firstBatchStart = time.Now()
secondBatchStart = time.Now().Add(batchDelay)
thirdBatchStart = time.Now().Add(2 * batchDelay)
thirdBatchExpiry = thirdBatchStart.Add(expiry)
)
ctx := newMailboxContext(t, firstBatchStart, expiry)
defer ctx.mailbox.Stop() defer ctx.mailbox.Stop()
failAdds := func(adds []*htlcPacket) { failAdds := func(adds []*htlcPacket) {
@ -306,19 +314,54 @@ func TestMailBoxFailAdd(t *testing.T) {
const numBatchPackets = 5 const numBatchPackets = 5
// Send 10 adds, and pull them from the mailbox. // Send 10 adds, and pull them from the mailbox.
adds := ctx.sendAdds(0, numBatchPackets) firstBatch := ctx.sendAdds(0, numBatchPackets)
ctx.receivePkts(adds) ctx.receivePkts(firstBatch)
// Fail all of these adds, simulating an error adding the HTLCs to the // Fail all of these adds, simulating an error adding the HTLCs to the
// commitment. We should see a failure message for each. // commitment. We should see a failure message for each.
go failAdds(adds) go failAdds(firstBatch)
ctx.checkFails(adds) ctx.checkFails(firstBatch)
// As a sanity check, Fail all of them again and assert that no // As a sanity check, Fail all of them again and assert that no
// duplicate fails are sent. // duplicate fails are sent.
go failAdds(adds) go failAdds(firstBatch)
ctx.checkFails(nil) ctx.checkFails(nil)
// Now, send a second batch of adds after a short delay and deliver them
// to the link.
ctx.clock.SetTime(secondBatchStart)
secondBatch := ctx.sendAdds(numBatchPackets, numBatchPackets)
ctx.receivePkts(secondBatch)
// Reset the packet queue w/o changing the current time. This simulates
// the link flapping and coming back up before the second batch's
// expiries have elapsed. We should see no failures sent back.
err := ctx.mailbox.ResetPackets()
if err != nil {
t.Fatalf("unable to reset packets: %v", err)
}
ctx.checkFails(nil)
// Redeliver the second batch to the link and hold them there.
ctx.receivePkts(secondBatch)
// Send a third batch of adds shortly after the second batch.
ctx.clock.SetTime(thirdBatchStart)
thirdBatch := ctx.sendAdds(2*numBatchPackets, numBatchPackets)
// Advance the clock so that the third batch expires. We expect to only
// see fails for the third batch, since the second batch is still being
// held by the link.
ctx.clock.SetTime(thirdBatchExpiry)
ctx.checkFails(thirdBatch)
// Finally, reset the link which should cause the second batch to be
// cancelled immediately.
err = ctx.mailbox.ResetPackets()
if err != nil {
t.Fatalf("unable to reset packets: %v", err)
}
ctx.checkFails(secondBatch)
} }
// TestMailBoxPacketPrioritization asserts that the mailbox will prioritize // TestMailBoxPacketPrioritization asserts that the mailbox will prioritize
@ -420,6 +463,38 @@ func TestMailBoxPacketPrioritization(t *testing.T) {
} }
} }
// TestMailBoxAddExpiry asserts that the mailbox will cancel back Adds that have
// reached their expiry time.
func TestMailBoxAddExpiry(t *testing.T) {
var (
expiry = time.Minute
batchDelay = time.Second
firstBatchStart = time.Now()
firstBatchExpiry = firstBatchStart.Add(expiry)
secondBatchStart = firstBatchStart.Add(batchDelay)
secondBatchExpiry = secondBatchStart.Add(expiry)
)
ctx := newMailboxContext(t, firstBatchStart, expiry)
defer ctx.mailbox.Stop()
// Each batch will consist of 10 messages.
const numBatchPackets = 10
firstBatch := ctx.sendAdds(0, numBatchPackets)
ctx.clock.SetTime(secondBatchStart)
ctx.checkFails(nil)
secondBatch := ctx.sendAdds(numBatchPackets, numBatchPackets)
ctx.clock.SetTime(firstBatchExpiry)
ctx.checkFails(firstBatch)
ctx.clock.SetTime(secondBatchExpiry)
ctx.checkFails(secondBatch)
}
// TestMailOrchestrator asserts that the orchestrator properly buffers packets // TestMailOrchestrator asserts that the orchestrator properly buffers packets
// for channels that haven't been made live, such that they are delivered // for channels that haven't been made live, such that they are delivered
// immediately after BindLiveShortChanID. It also tests that packets are delivered // immediately after BindLiveShortChanID. It also tests that packets are delivered