Merge pull request #4174 from cfromknecht/mailbox-cancel

htlcswitch: mailbox cancellation
This commit is contained in:
Olaoluwa Osuntokun 2020-04-14 20:04:57 -07:00 committed by GitHub
commit 5955f83504
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 981 additions and 176 deletions

@ -223,6 +223,11 @@ type ChannelLinkConfig struct {
// syncing. // syncing.
FwdPkgGCTicker ticker.Ticker FwdPkgGCTicker ticker.Ticker
// PendingCommitTicker is a ticker that allows the link to determine if
// a locally initiated commitment dance gets stuck waiting for the
// remote party to revoke.
PendingCommitTicker ticker.Ticker
// BatchSize is the max size of a batch of updates done to the link // BatchSize is the max size of a batch of updates done to the link
// before we do a state update. // before we do a state update.
BatchSize uint32 BatchSize uint32
@ -509,6 +514,13 @@ func (l *channelLink) Stop() {
close(l.quit) close(l.quit)
l.wg.Wait() l.wg.Wait()
// Now that the htlcManager has completely exited, reset the packet
// courier. This allows the mailbox to re-evaluate any lingering Adds that
// were delivered but didn't make it on a commitment to be failed back
// if the link is offline for an extended period of time. The error is
// ignored since it can only fail when the daemon is exiting.
_ = l.mailBox.ResetPackets()
// As a final precaution, we will attempt to flush any uncommitted // As a final precaution, we will attempt to flush any uncommitted
// preimages to the preimage cache. The preimages should be re-delivered // preimages to the preimage cache. The preimages should be re-delivered
// after channel reestablishment, however this adds an extra layer of // after channel reestablishment, however this adds an extra layer of
@ -1003,13 +1015,12 @@ func (l *channelLink) htlcManager() {
go l.fwdPkgGarbager() go l.fwdPkgGarbager()
} }
out:
for { for {
// We must always check if we failed at some point processing // We must always check if we failed at some point processing
// the last update before processing the next. // the last update before processing the next.
if l.failed { if l.failed {
l.log.Errorf("link failed, exiting htlcManager") l.log.Errorf("link failed, exiting htlcManager")
break out return
} }
// If the previous event resulted in a non-empty batch, resume // If the previous event resulted in a non-empty batch, resume
@ -1079,7 +1090,7 @@ out:
l.cfg.Peer.WipeChannel(chanPoint) l.cfg.Peer.WipeChannel(chanPoint)
}() }()
break out return
case <-l.cfg.BatchTicker.Ticks(): case <-l.cfg.BatchTicker.Ticks():
// Attempt to extend the remote commitment chain // Attempt to extend the remote commitment chain
@ -1089,9 +1100,14 @@ out:
if err := l.updateCommitTx(); err != nil { if err := l.updateCommitTx(); err != nil {
l.fail(LinkFailureError{code: ErrInternalError}, l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err) "unable to update commitment: %v", err)
break out return
} }
case <-l.cfg.PendingCommitTicker.Ticks():
l.fail(LinkFailureError{code: ErrRemoteUnresponsive},
"unable to complete dance")
return
// A message from the switch was just received. This indicates // A message from the switch was just received. This indicates
// that the link is an intermediate hop in a multi-hop HTLC // that the link is an intermediate hop in a multi-hop HTLC
// circuit. // circuit.
@ -1114,11 +1130,11 @@ out:
fmt.Sprintf("process hodl queue: %v", fmt.Sprintf("process hodl queue: %v",
err.Error()), err.Error()),
) )
break out return
} }
case <-l.quit: case <-l.quit:
break out return
} }
} }
} }
@ -1179,7 +1195,7 @@ func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
// Settle htlcs that returned a settle resolution using the preimage // Settle htlcs that returned a settle resolution using the preimage
// in the resolution. // in the resolution.
case *invoices.HtlcSettleResolution: case *invoices.HtlcSettleResolution:
l.log.Debugf("received settle resolution for %v"+ l.log.Debugf("received settle resolution for %v "+
"with outcome: %v", circuitKey, res.Outcome) "with outcome: %v", circuitKey, res.Outcome)
return l.settleHTLC(res.Preimage, htlc.pd) return l.settleHTLC(res.Preimage, htlc.pd)
@ -1272,72 +1288,6 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
l.log.Warnf("Unable to handle downstream add HTLC: %v", l.log.Warnf("Unable to handle downstream add HTLC: %v",
err) err)
var (
localFailure = false
reason lnwire.OpaqueReason
)
// Create a temporary channel failure which we will send
// back to our peer if this is a forward, or report to
// the user if the failed payment was locally initiated.
failure := l.createFailureWithUpdate(
func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
return lnwire.NewTemporaryChannelFailure(
upd,
)
},
)
// If the payment was locally initiated (which is
// indicated by a nil obfuscator), we do not need to
// encrypt it back to the sender.
if pkt.obfuscator == nil {
var b bytes.Buffer
err := lnwire.EncodeFailure(&b, failure, 0)
if err != nil {
l.log.Errorf("unable to encode "+
"failure: %v", err)
l.mailBox.AckPacket(pkt.inKey())
return
}
reason = lnwire.OpaqueReason(b.Bytes())
localFailure = true
} else {
// If the packet is part of a forward,
// (identified by a non-nil obfuscator) we need
// to encrypt the error back to the source.
var err error
reason, err = pkt.obfuscator.EncryptFirstHop(failure)
if err != nil {
l.log.Errorf("unable to "+
"obfuscate error: %v", err)
l.mailBox.AckPacket(pkt.inKey())
return
}
}
// Create a link error containing the temporary channel
// failure and a detail which indicates the we failed to
// add the htlc.
linkError := NewDetailedLinkError(
failure, OutgoingFailureDownstreamHtlcAdd,
)
failPkt := &htlcPacket{
incomingChanID: pkt.incomingChanID,
incomingHTLCID: pkt.incomingHTLCID,
circuit: pkt.circuit,
sourceRef: pkt.sourceRef,
hasSource: true,
localFailure: localFailure,
linkFailure: linkError,
htlc: &lnwire.UpdateFailHTLC{
Reason: reason,
},
}
go l.forwardBatch(failPkt)
// Remove this packet from the link's mailbox, this // Remove this packet from the link's mailbox, this
// prevents it from being reprocessed if the link // prevents it from being reprocessed if the link
// restarts and resets it mailbox. If this response // restarts and resets it mailbox. If this response
@ -1346,7 +1296,7 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
// the switch, since the circuit was never fully opened, // the switch, since the circuit was never fully opened,
// and the forwarding package shows it as // and the forwarding package shows it as
// unacknowledged. // unacknowledged.
l.mailBox.AckPacket(pkt.inKey()) l.mailBox.FailAdd(pkt)
return return
} }
@ -1994,6 +1944,8 @@ func (l *channelLink) updateCommitTx() error {
theirCommitSig, htlcSigs, pendingHTLCs, err := l.channel.SignNextCommitment() theirCommitSig, htlcSigs, pendingHTLCs, err := l.channel.SignNextCommitment()
if err == lnwallet.ErrNoWindow { if err == lnwallet.ErrNoWindow {
l.cfg.PendingCommitTicker.Resume()
l.log.Tracef("revocation window exhausted, unable to send: "+ l.log.Tracef("revocation window exhausted, unable to send: "+
"%v, pend_updates=%v, dangling_closes%v", "%v, pend_updates=%v, dangling_closes%v",
l.channel.PendingLocalUpdateCount(), l.channel.PendingLocalUpdateCount(),
@ -2013,6 +1965,8 @@ func (l *channelLink) updateCommitTx() error {
return err return err
} }
l.cfg.PendingCommitTicker.Pause()
// The remote party now has a new pending commitment, so we'll update // The remote party now has a new pending commitment, so we'll update
// the contract court to be aware of this new set (the prior old remote // the contract court to be aware of this new set (the prior old remote
// pending). // pending).
@ -2948,27 +2902,7 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) {
} }
errChan := l.cfg.ForwardPackets(l.quit, filteredPkts...) errChan := l.cfg.ForwardPackets(l.quit, filteredPkts...)
go l.handleBatchFwdErrs(errChan) go handleBatchFwdErrs(errChan, l.log)
}
// handleBatchFwdErrs waits on the given errChan until it is closed, logging
// the errors returned from any unsuccessful forwarding attempts.
func (l *channelLink) handleBatchFwdErrs(errChan chan error) {
for {
err, ok := <-errChan
if !ok {
// Err chan has been drained or switch is shutting
// down. Either way, return.
return
}
if err == nil {
continue
}
l.log.Errorf("unhandled error while forwarding htlc packet over "+
"htlcswitch: %v", err)
}
} }
// sendHTLCError functions cancels HTLC and send cancel message back to the // sendHTLCError functions cancels HTLC and send cancel message back to the

@ -1699,10 +1699,11 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
UpdateContractSignals: func(*contractcourt.ContractSignals) error { UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil return nil
}, },
Registry: invoiceRegistry, Registry: invoiceRegistry,
ChainEvents: &contractcourt.ChainEventSubscription{}, ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker, BatchTicker: bticker,
FwdPkgGCTicker: ticker.NewForce(15 * time.Second), FwdPkgGCTicker: ticker.NewForce(15 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests. // to not trigger commit updates automatically during tests.
BatchSize: 10000, BatchSize: 10000,
@ -4203,10 +4204,11 @@ func (h *persistentLinkHarness) restartLink(
UpdateContractSignals: func(*contractcourt.ContractSignals) error { UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil return nil
}, },
Registry: h.coreLink.cfg.Registry, Registry: h.coreLink.cfg.Registry,
ChainEvents: &contractcourt.ChainEventSubscription{}, ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker, BatchTicker: bticker,
FwdPkgGCTicker: ticker.New(5 * time.Second), FwdPkgGCTicker: ticker.New(5 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests. // to not trigger commit updates automatically during tests.
BatchSize: 10000, BatchSize: 10000,
@ -6134,6 +6136,91 @@ func TestChannelLinkReceiveEmptySig(t *testing.T) {
aliceLink.Stop() aliceLink.Stop()
} }
// TestPendingCommitTicker tests that a link will fail itself after a timeout if
// the commitment dance stalls out. It sends one HTLC and lets Alice sign a
// commitment, then sends a second HTLC without completing the dance and
// expects the link to report ErrRemoteUnresponsive through OnChannelFailure.
func TestPendingCommitTicker(t *testing.T) {
t.Parallel()
const chanAmt = btcutil.SatoshiPerBitcoin * 5
const chanReserve = btcutil.SatoshiPerBitcoin * 1
aliceLink, bobChannel, batchTicker, start, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, chanReserve)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
var (
coreLink = aliceLink.(*channelLink)
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
)
// Override the harness's pending commit ticker with a forced ticker that
// uses a one millisecond interval, so the pending commit timeout is low
// enough to observe within the test.
coreLink.cfg.PendingCommitTicker = ticker.NewForce(time.Millisecond)
// Route any failure reported through OnChannelFailure onto linkErrs so
// the assertions below can observe it.
linkErrs := make(chan LinkFailureError)
coreLink.cfg.OnChannelFailure = func(_ lnwire.ChannelID,
_ lnwire.ShortChannelID, linkErr LinkFailureError) {
linkErrs <- linkErr
}
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
defer cleanUp()
ctx := linkTestContext{
t: t,
aliceLink: aliceLink,
bobChannel: bobChannel,
aliceMsgs: aliceMsgs,
}
// Send an HTLC from Alice to Bob, and signal the batch ticker to sign
// a commitment.
htlc, _ := generateHtlcAndInvoice(t, 0)
ctx.sendHtlcAliceToBob(0, htlc)
ctx.receiveHtlcAliceToBob()
batchTicker <- time.Now()
// Alice should respond to the batch tick by sending a CommitSig.
select {
case msg := <-aliceMsgs:
if _, ok := msg.(*lnwire.CommitSig); !ok {
t.Fatalf("expected CommitSig, got: %T", msg)
}
case <-time.After(time.Second):
t.Fatalf("alice did not send commit sig")
}
// Check that Alice hasn't failed.
select {
case linkErr := <-linkErrs:
t.Fatalf("link failed unexpectedly: %v", linkErr)
case <-time.After(50 * time.Millisecond):
}
// Without completing the dance, send another HTLC from Alice to Bob.
// Since the revocation window has been exhausted, we should see the
// link fail itself immediately due to the low pending commit timeout.
// In production this would be much longer, e.g. a minute.
htlc, _ = generateHtlcAndInvoice(t, 1)
ctx.sendHtlcAliceToBob(1, htlc)
ctx.receiveHtlcAliceToBob()
batchTicker <- time.Now()
// Assert that we get the expected link failure from Alice.
select {
case linkErr := <-linkErrs:
if linkErr.code != ErrRemoteUnresponsive {
t.Fatalf("error code mismatch, "+
"want: ErrRemoteUnresponsive, got: %v",
linkErr.code)
}
case <-time.After(time.Second):
t.Fatalf("did not receive failure")
}
}
// assertFailureCode asserts that an error is of type ClearTextError and that // assertFailureCode asserts that an error is of type ClearTextError and that
// the failure code is as expected. // the failure code is as expected.
func assertFailureCode(t *testing.T, err error, code lnwire.FailCode) { func assertFailureCode(t *testing.T, err error, code lnwire.FailCode) {

@ -20,6 +20,10 @@ const (
// to fail the link. // to fail the link.
ErrRemoteError ErrRemoteError
// ErrRemoteUnresponsive indicates that our peer took too long to
// complete a commitment dance.
ErrRemoteUnresponsive
// ErrSyncError indicates that we failed synchronizing the state of the // ErrSyncError indicates that we failed synchronizing the state of the
// channel with our peer. // channel with our peer.
ErrSyncError ErrSyncError
@ -71,6 +75,8 @@ func (e LinkFailureError) Error() string {
return "internal error" return "internal error"
case ErrRemoteError: case ErrRemoteError:
return "remote error" return "remote error"
case ErrRemoteUnresponsive:
return "remote unresponsive"
case ErrSyncError: case ErrSyncError:
return "sync error" return "sync error"
case ErrInvalidUpdate: case ErrInvalidUpdate:
@ -90,13 +96,23 @@ func (e LinkFailureError) Error() string {
// the link fails with this LinkFailureError. // the link fails with this LinkFailureError.
func (e LinkFailureError) ShouldSendToPeer() bool { func (e LinkFailureError) ShouldSendToPeer() bool {
switch e.code { switch e.code {
// If the failure is a result of the peer sending us an error, we don't
// have to respond with one.
case ErrRemoteError:
return false
// In all other cases we will attempt to send our peer an error message. // Since sending an error can lead some nodes to force close the
default: // channel, create a whitelist of the failures we want to send so that
// newly added error codes aren't automatically sent to the remote peer.
case
ErrInternalError,
ErrRemoteError,
ErrSyncError,
ErrInvalidUpdate,
ErrInvalidCommitment,
ErrInvalidRevocation,
ErrRecoveryError:
return true return true
// In all other cases we will not attempt to send our peer an error.
default:
return false
} }
} }

@ -1,17 +1,26 @@
package htlcswitch package htlcswitch
import ( import (
"bytes"
"container/list" "container/list"
"errors" "errors"
"fmt"
"sync" "sync"
"time" "time"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
// ErrMailBoxShuttingDown is returned when the mailbox is interrupted by a var (
// shutdown request. // ErrMailBoxShuttingDown is returned when the mailbox is interrupted by
var ErrMailBoxShuttingDown = errors.New("mailbox is shutting down") // a shutdown request.
ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")
// ErrPacketAlreadyExists signals that an attempt to add a packet failed
// because it already exists in the mailbox.
ErrPacketAlreadyExists = errors.New("mailbox already has packet")
)
// MailBox is an interface which represents a concurrent-safe, in-order // MailBox is an interface which represents a concurrent-safe, in-order
// delivery queue for messages from the network and also from the main switch. // delivery queue for messages from the network and also from the main switch.
@ -31,8 +40,17 @@ type MailBox interface {
// AckPacket removes a packet from the mailboxes in-memory replay // AckPacket removes a packet from the mailboxes in-memory replay
// buffer. This will prevent a packet from being delivered after a link // buffer. This will prevent a packet from being delivered after a link
// restarts if the switch has remained online. // restarts if the switch has remained online. The returned boolean
AckPacket(CircuitKey) // indicates whether or not a packet with the passed incoming circuit
// key was removed.
AckPacket(CircuitKey) bool
// FailAdd fails an UpdateAddHTLC that exists within the mailbox,
// removing it from the in-memory replay buffer. This will prevent the
// packet from being delivered after the link restarts if the switch has
// remained online. The generated LinkError will show an
// OutgoingFailureDownstreamHtlcAdd FailureDetail.
FailAdd(pkt *htlcPacket)
// MessageOutBox returns a channel that any new messages ready for // MessageOutBox returns a channel that any new messages ready for
// delivery will be sent on. // delivery will be sent on.
@ -56,12 +74,37 @@ type MailBox interface {
Stop() Stop()
} }
// mailBoxConfig bundles the dependencies and parameters that a memoryMailBox
// is constructed with.
type mailBoxConfig struct {
// shortChanID is the short channel id of the channel this mailbox
// belongs to.
shortChanID lnwire.ShortChannelID
// fetchUpdate retrieves the most recent channel update for the channel
// this mailbox belongs to.
fetchUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
// forwardPackets sends a variadic number of htlcPackets to the switch
// to be routed. A quit channel should be provided so that the call can
// properly exit during shutdown.
forwardPackets func(chan struct{}, ...*htlcPacket) chan error
// clock is a time source for the mailbox.
clock clock.Clock
// expiry is the interval after which Adds will be cancelled if they
// have not yet been delivered. The computed deadline will expire this
// long after the Adds are added via AddPacket.
expiry time.Duration
}
// memoryMailBox is an implementation of the MailBox struct backed by purely // memoryMailBox is an implementation of the MailBox struct backed by purely
// in-memory queues. // in-memory queues.
type memoryMailBox struct { type memoryMailBox struct {
started sync.Once started sync.Once
stopped sync.Once stopped sync.Once
cfg *mailBoxConfig
wireMessages *list.List wireMessages *list.List
wireMtx sync.Mutex wireMtx sync.Mutex
wireCond *sync.Cond wireCond *sync.Cond
@ -69,29 +112,42 @@ type memoryMailBox struct {
messageOutbox chan lnwire.Message messageOutbox chan lnwire.Message
msgReset chan chan struct{} msgReset chan chan struct{}
htlcPkts *list.List // repPkts is a queue for reply packets, e.g. Settles and Fails.
pktIndex map[CircuitKey]*list.Element repPkts *list.List
pktHead *list.Element repIndex map[CircuitKey]*list.Element
pktMtx sync.Mutex repHead *list.Element
pktCond *sync.Cond
// addPkts is a dedicated queue for Adds.
addPkts *list.List
addIndex map[CircuitKey]*list.Element
addHead *list.Element
pktMtx sync.Mutex
pktCond *sync.Cond
pktOutbox chan *htlcPacket pktOutbox chan *htlcPacket
pktReset chan chan struct{} pktReset chan chan struct{}
wg sync.WaitGroup wireShutdown chan struct{}
quit chan struct{} pktShutdown chan struct{}
quit chan struct{}
} }
// newMemoryMailBox creates a new instance of the memoryMailBox. // newMemoryMailBox creates a new instance of the memoryMailBox.
func newMemoryMailBox() *memoryMailBox { func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
box := &memoryMailBox{ box := &memoryMailBox{
cfg: cfg,
wireMessages: list.New(), wireMessages: list.New(),
htlcPkts: list.New(), repPkts: list.New(),
addPkts: list.New(),
messageOutbox: make(chan lnwire.Message), messageOutbox: make(chan lnwire.Message),
pktOutbox: make(chan *htlcPacket), pktOutbox: make(chan *htlcPacket),
msgReset: make(chan chan struct{}, 1), msgReset: make(chan chan struct{}, 1),
pktReset: make(chan chan struct{}, 1), pktReset: make(chan chan struct{}, 1),
pktIndex: make(map[CircuitKey]*list.Element), repIndex: make(map[CircuitKey]*list.Element),
addIndex: make(map[CircuitKey]*list.Element),
wireShutdown: make(chan struct{}),
pktShutdown: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
box.wireCond = sync.NewCond(&box.wireMtx) box.wireCond = sync.NewCond(&box.wireMtx)
@ -122,7 +178,6 @@ const (
// NOTE: This method is part of the MailBox interface. // NOTE: This method is part of the MailBox interface.
func (m *memoryMailBox) Start() { func (m *memoryMailBox) Start() {
m.started.Do(func() { m.started.Do(func() {
m.wg.Add(2)
go m.mailCourier(wireCourier) go m.mailCourier(wireCourier)
go m.mailCourier(pktCourier) go m.mailCourier(pktCourier)
}) })
@ -157,6 +212,7 @@ func (m *memoryMailBox) signalUntilReset(cType courierType,
done chan struct{}) error { done chan struct{}) error {
for { for {
switch cType { switch cType {
case wireCourier: case wireCourier:
m.wireCond.Signal() m.wireCond.Signal()
@ -176,27 +232,59 @@ func (m *memoryMailBox) signalUntilReset(cType courierType,
} }
// AckPacket removes the packet identified by it's incoming circuit key from the // AckPacket removes the packet identified by it's incoming circuit key from the
// queue of packets to be delivered. // queue of packets to be delivered. The returned boolean indicates whether or
// not a packet with the passed incoming circuit key was removed.
// //
// NOTE: It is safe to call this method multiple times for the same circuit key. // NOTE: It is safe to call this method multiple times for the same circuit key.
func (m *memoryMailBox) AckPacket(inKey CircuitKey) { func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
m.pktCond.L.Lock() m.pktCond.L.Lock()
entry, ok := m.pktIndex[inKey] defer m.pktCond.L.Unlock()
if !ok {
m.pktCond.L.Unlock() if entry, ok := m.repIndex[inKey]; ok {
return // Check whether we are removing the head of the queue. If so,
// we must advance the head to the next packet before removing.
// It's possible that the courier has already advanced the
// repHead, so this check prevents the repHead from getting
// desynchronized.
if entry == m.repHead {
m.repHead = entry.Next()
}
m.repPkts.Remove(entry)
delete(m.repIndex, inKey)
return true
} }
m.htlcPkts.Remove(entry) if entry, ok := m.addIndex[inKey]; ok {
delete(m.pktIndex, inKey) // Check whether we are removing the head of the queue. If so,
m.pktCond.L.Unlock() // we must advance the head to the next add before removing.
// It's possible that the courier has already advanced the
// addHead, so this check prevents the addHead from getting
// desynchronized.
//
// NOTE: While this event is rare for Settles or Fails, it could
// be very common for Adds since the mailbox has the ability to
// cancel Adds before they are delivered. When that occurs, the
// head of addPkts has only been peeked and we expect to be
// removing the head of the queue.
if entry == m.addHead {
m.addHead = entry.Next()
}
m.addPkts.Remove(entry)
delete(m.addIndex, inKey)
return true
}
return false
} }
// HasPacket queries the packets for a circuit key, this is used to drop packets // HasPacket queries the packets for a circuit key, this is used to drop packets
// bound for the switch that already have a queued response. // bound for the switch that already have a queued response.
func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool { func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
m.pktCond.L.Lock() m.pktCond.L.Lock()
_, ok := m.pktIndex[inKey] _, ok := m.repIndex[inKey]
m.pktCond.L.Unlock() m.pktCond.L.Unlock()
return ok return ok
@ -209,17 +297,61 @@ func (m *memoryMailBox) Stop() {
m.stopped.Do(func() { m.stopped.Do(func() {
close(m.quit) close(m.quit)
m.wireCond.Signal() m.signalUntilShutdown(wireCourier)
m.pktCond.Signal() m.signalUntilShutdown(pktCourier)
}) })
} }
// signalUntilShutdown keeps signalling the condition variable that belongs to
// the given courier type, once every millisecond, and only returns after that
// courier's shutdown channel has been closed by the exiting courier goroutine.
func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
	var (
		courierCond *sync.Cond
		exited      chan struct{}
	)

	// Pick the condition variable and shutdown channel for this courier.
	if cType == wireCourier {
		courierCond = m.wireCond
		exited = m.wireShutdown
	} else if cType == pktCourier {
		courierCond = m.pktCond
		exited = m.pktShutdown
	}

	for {
		select {
		// The courier has closed its shutdown channel, we are done.
		case <-exited:
			return

		// Otherwise wake the courier so it can observe the quit signal.
		case <-time.After(time.Millisecond):
			courierCond.Signal()
		}
	}
}
// pktWithExpiry wraps an incoming packet and records the time at which it
// should be canceled from the mailbox. This will be used to detect if it gets
// stuck in the mailbox and inform when to cancel back.
type pktWithExpiry struct {
// pkt is the wrapped packet.
pkt *htlcPacket
// expiry is the point in time at which the packet should be canceled.
expiry time.Time
}
// deadline returns the channel produced by the passed clock's TickAfter for
// the duration between the clock's current time and the packet's expiry.
func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
	remaining := p.expiry.Sub(clock.Now())

	return clock.TickAfter(remaining)
}
// mailCourier is a dedicated goroutine whose job is to reliably deliver // mailCourier is a dedicated goroutine whose job is to reliably deliver
// messages of a particular type. There are two types of couriers: wire // messages of a particular type. There are two types of couriers: wire
// couriers, and mail couriers. Depending on the passed courierType, this // couriers, and mail couriers. Depending on the passed courierType, this
// goroutine will assume one of two roles. // goroutine will assume one of two roles.
func (m *memoryMailBox) mailCourier(cType courierType) { func (m *memoryMailBox) mailCourier(cType courierType) {
defer m.wg.Done() switch cType {
case wireCourier:
defer close(m.wireShutdown)
case pktCourier:
defer close(m.pktShutdown)
}
// TODO(roasbeef): refactor... // TODO(roasbeef): refactor...
@ -246,7 +378,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
case pktCourier: case pktCourier:
m.pktCond.L.Lock() m.pktCond.L.Lock()
for m.pktHead == nil { for m.repHead == nil && m.addHead == nil {
m.pktCond.Wait() m.pktCond.Wait()
select { select {
@ -255,9 +387,11 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
// any un-ACK'd messages are re-delivered upon // any un-ACK'd messages are re-delivered upon
// reconnect. // reconnect.
case pktDone := <-m.pktReset: case pktDone := <-m.pktReset:
m.pktHead = m.htlcPkts.Front() m.repHead = m.repPkts.Front()
m.addHead = m.addPkts.Front()
close(pktDone) close(pktDone)
case <-m.quit: case <-m.quit:
m.pktCond.L.Unlock() m.pktCond.L.Unlock()
return return
@ -267,8 +401,11 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
} }
var ( var (
nextPkt *htlcPacket nextRep *htlcPacket
nextMsg lnwire.Message nextRepEl *list.Element
nextAdd *pktWithExpiry
nextAddEl *list.Element
nextMsg lnwire.Message
) )
switch cType { switch cType {
// Grab the datum off the front of the queue, shifting the // Grab the datum off the front of the queue, shifting the
@ -283,8 +420,20 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
// doesn't make it into a commitment, then it'll be // doesn't make it into a commitment, then it'll be
// re-delivered once the link comes back online. // re-delivered once the link comes back online.
case pktCourier: case pktCourier:
nextPkt = m.pktHead.Value.(*htlcPacket) // Peek at the head of the Settle/Fails and Add queues.
m.pktHead = m.pktHead.Next() // We peak both even if there is a Settle/Fail present
// because we need to set a deadline for the next
// pending Add if it's present. Due to clock
// monotonicity, we know that the head of the Adds is
// the next to expire.
if m.repHead != nil {
nextRep = m.repHead.Value.(*htlcPacket)
nextRepEl = m.repHead
}
if m.addHead != nil {
nextAdd = m.addHead.Value.(*pktWithExpiry)
nextAddEl = m.addHead
}
} }
// Now that we're done with the condition, we can unlock it to // Now that we're done with the condition, we can unlock it to
@ -314,14 +463,77 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
} }
case pktCourier: case pktCourier:
var (
pktOutbox chan *htlcPacket
addOutbox chan *htlcPacket
add *htlcPacket
deadline <-chan time.Time
)
// Prioritize delivery of Settle/Fail packets over Adds.
// This ensures that we actively clear the commitment of
// existing HTLCs before trying to add new ones. This
// can help to improve forwarding performance since the
// time to sign a commitment is linear in the number of
// HTLCs manifested on the commitments.
//
// NOTE: Both types are eventually delivered over the
// same channel, but we can control which is delivered
// by exclusively making one nil and the other non-nil.
// We know from our loop condition that at least one of
// nextRep and nextAdd is non-nil.
if nextRep != nil {
pktOutbox = m.pktOutbox
} else {
addOutbox = m.pktOutbox
}
// If we have a pending Add, we'll also construct the
// deadline so we can fail it back if we are unable to
// deliver any message in time. We also dereference the
// nextAdd's packet, since we will need access to it in
// the case we are delivering it and/or if the deadline
// expires.
//
// NOTE: It's possible after this point for add to be
// nil, but this can only occur when addOutbox is also
// nil, hence we won't accidentally deliver a nil
// packet.
if nextAdd != nil {
add = nextAdd.pkt
deadline = nextAdd.deadline(m.cfg.clock)
}
select { select {
case m.pktOutbox <- nextPkt: case pktOutbox <- nextRep:
m.pktCond.L.Lock()
// Only advance the repHead if this Settle or
// Fail is still at the head of the queue.
if m.repHead != nil && m.repHead == nextRepEl {
m.repHead = m.repHead.Next()
}
m.pktCond.L.Unlock()
case addOutbox <- add:
m.pktCond.L.Lock()
// Only advance the addHead if this Add is still
// at the head of the queue.
if m.addHead != nil && m.addHead == nextAddEl {
m.addHead = m.addHead.Next()
}
m.pktCond.L.Unlock()
case <-deadline:
m.FailAdd(add)
case pktDone := <-m.pktReset: case pktDone := <-m.pktReset:
m.pktCond.L.Lock() m.pktCond.L.Lock()
m.pktHead = m.htlcPkts.Front() m.repHead = m.repPkts.Front()
m.addHead = m.addPkts.Front()
m.pktCond.L.Unlock() m.pktCond.L.Unlock()
close(pktDone) close(pktDone)
case <-m.quit: case <-m.quit:
return return
} }
@ -353,18 +565,41 @@ func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
// NOTE: This method is safe for concrete use and part of the MailBox // NOTE: This method is safe for concrete use and part of the MailBox
// interface. // interface.
func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error { func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
// First, we'll lock the condition, and add the packet to the end of
// the htlc packet inbox.
m.pktCond.L.Lock() m.pktCond.L.Lock()
if _, ok := m.pktIndex[pkt.inKey()]; ok { switch htlc := pkt.htlc.(type) {
m.pktCond.L.Unlock()
return nil
}
entry := m.htlcPkts.PushBack(pkt) // Split off Settle/Fail packets into the repPkts queue.
m.pktIndex[pkt.inKey()] = entry case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
if m.pktHead == nil { if _, ok := m.repIndex[pkt.inKey()]; ok {
m.pktHead = entry m.pktCond.L.Unlock()
return ErrPacketAlreadyExists
}
entry := m.repPkts.PushBack(pkt)
m.repIndex[pkt.inKey()] = entry
if m.repHead == nil {
m.repHead = entry
}
// Split off Add packets into the addPkts queue.
case *lnwire.UpdateAddHTLC:
if _, ok := m.addIndex[pkt.inKey()]; ok {
m.pktCond.L.Unlock()
return ErrPacketAlreadyExists
}
entry := m.addPkts.PushBack(&pktWithExpiry{
pkt: pkt,
expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
})
m.addIndex[pkt.inKey()] = entry
if m.addHead == nil {
m.addHead = entry
}
default:
m.pktCond.L.Unlock()
return fmt.Errorf("unknown htlc type: %T", htlc)
} }
m.pktCond.L.Unlock() m.pktCond.L.Unlock()
@ -375,6 +610,80 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
return nil return nil
} }
// FailAdd removes the given UpdateAddHTLC packet from the mailbox via
// AckPacket and, if it was still present, sends a matching UpdateFailHTLC
// through the configured forwardPackets closure. The LinkError attached to the
// fail packet carries the OutgoingFailureDownstreamHtlcAdd FailureDetail.
func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
	// Drop the packet from the mailbox first. When nothing was removed the
	// packet has already been acked, so we stop here rather than push a
	// duplicate fail message through the switch.
	if removed := m.AckPacket(pkt.inKey()); !removed {
		return
	}

	// Build the failure message: a temporary channel failure carrying the
	// latest channel update when one can be fetched, otherwise a temporary
	// node failure.
	var failure lnwire.FailureMessage
	if update, err := m.cfg.fetchUpdate(m.cfg.shortChanID); err == nil {
		failure = lnwire.NewTemporaryChannelFailure(update)
	} else {
		failure = &lnwire.FailTemporaryNodeFailure{}
	}

	// A nil obfuscator indicates a locally initiated payment, whose
	// failure is encoded in the clear. Otherwise the packet is part of a
	// forward and the failure is encrypted back to the source.
	var (
		isLocal = pkt.obfuscator == nil
		reason  lnwire.OpaqueReason
	)
	if isLocal {
		var buf bytes.Buffer
		if err := lnwire.EncodeFailure(&buf, failure, 0); err != nil {
			log.Errorf("Unable to encode failure: %v", err)
			return
		}
		reason = lnwire.OpaqueReason(buf.Bytes())
	} else {
		encrypted, err := pkt.obfuscator.EncryptFirstHop(failure)
		if err != nil {
			log.Errorf("Unable to obfuscate error: %v", err)
			return
		}
		reason = encrypted
	}

	// Wrap the failure in a link error whose detail indicates that we
	// failed to add the htlc.
	linkError := NewDetailedLinkError(
		failure, OutgoingFailureDownstreamHtlcAdd,
	)

	// Assemble the fail packet addressed to the incoming side of the
	// original Add.
	failPkt := &htlcPacket{
		incomingChanID: pkt.incomingChanID,
		incomingHTLCID: pkt.incomingHTLCID,
		circuit:        pkt.circuit,
		sourceRef:      pkt.sourceRef,
		hasSource:      true,
		localFailure:   isLocal,
		linkFailure:    linkError,
		htlc: &lnwire.UpdateFailHTLC{
			Reason: reason,
		},
	}

	// Hand the fail packet to the switch and log any forwarding errors in
	// the background.
	errChan := m.cfg.forwardPackets(m.quit, failPkt)
	go handleBatchFwdErrs(errChan, log)
}
// MessageOutBox returns a channel that any new messages ready for delivery // MessageOutBox returns a channel that any new messages ready for delivery
// will be sent on. // will be sent on.
// //
@ -399,6 +708,8 @@ func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
type mailOrchestrator struct { type mailOrchestrator struct {
mu sync.RWMutex mu sync.RWMutex
cfg *mailOrchConfig
// mailboxes caches exactly one mailbox for all known channels. // mailboxes caches exactly one mailbox for all known channels.
mailboxes map[lnwire.ChannelID]MailBox mailboxes map[lnwire.ChannelID]MailBox
@ -419,9 +730,29 @@ type mailOrchestrator struct {
unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
} }
// mailOrchConfig bundles the dependencies that the mailOrchestrator passes on
// to every mailbox it creates.
type mailOrchConfig struct {
// forwardPackets sends a variadic number of htlcPackets to the switch to
// be routed. A quit channel should be provided so that the call can
// properly exit during shutdown.
forwardPackets func(chan struct{}, ...*htlcPacket) chan error
// fetchUpdate retrieves the most recent channel update for the channel
// identified by the given short channel ID.
fetchUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
// clock is a time source for the generated mailboxes.
clock clock.Clock
// expiry is the interval after which Adds will be cancelled if they
// have not yet been delivered. The computed deadline will expire this
// long after the Adds are added to a mailbox via AddPacket.
expiry time.Duration
}
// newMailOrchestrator initializes a fresh mailOrchestrator. // newMailOrchestrator initializes a fresh mailOrchestrator.
func newMailOrchestrator() *mailOrchestrator { func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
return &mailOrchestrator{ return &mailOrchestrator{
cfg: cfg,
mailboxes: make(map[lnwire.ChannelID]MailBox), mailboxes: make(map[lnwire.ChannelID]MailBox),
liveIndex: make(map[lnwire.ShortChannelID]lnwire.ChannelID), liveIndex: make(map[lnwire.ShortChannelID]lnwire.ChannelID),
unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket), unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
@ -437,7 +768,9 @@ func (mo *mailOrchestrator) Stop() {
// GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or // GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
// creates and returns a new mailbox if none is found. // creates and returns a new mailbox if none is found.
func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox { func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID,
shortChanID lnwire.ShortChannelID) MailBox {
// First, try lookup the mailbox directly using only the shared mutex. // First, try lookup the mailbox directly using only the shared mutex.
mo.mu.RLock() mo.mu.RLock()
mailbox, ok := mo.mailboxes[chanID] mailbox, ok := mo.mailboxes[chanID]
@ -450,7 +783,7 @@ func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox
// Otherwise, we will try again with exclusive lock, creating a mailbox // Otherwise, we will try again with exclusive lock, creating a mailbox
// if one still has not been created. // if one still has not been created.
mo.mu.Lock() mo.mu.Lock()
mailbox = mo.exclusiveGetOrCreateMailBox(chanID) mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID)
mo.mu.Unlock() mo.mu.Unlock()
return mailbox return mailbox
@ -462,11 +795,17 @@ func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox
// //
// NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock. // NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox( func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
chanID lnwire.ChannelID) MailBox { chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
mailbox, ok := mo.mailboxes[chanID] mailbox, ok := mo.mailboxes[chanID]
if !ok { if !ok {
mailbox = newMemoryMailBox() mailbox = newMemoryMailBox(&mailBoxConfig{
shortChanID: shortChanID,
fetchUpdate: mo.cfg.fetchUpdate,
forwardPackets: mo.cfg.forwardPackets,
clock: mo.cfg.clock,
expiry: mo.cfg.expiry,
})
mailbox.Start() mailbox.Start()
mo.mailboxes[chanID] = mailbox mo.mailboxes[chanID] = mailbox
} }
@ -546,7 +885,7 @@ func (mo *mailOrchestrator) Deliver(
// index should only be set if the mailbox had been initialized // index should only be set if the mailbox had been initialized
// beforehand. However, this does ensure that this case is // beforehand. However, this does ensure that this case is
// handled properly in the event that it could happen. // handled properly in the event that it could happen.
mailbox = mo.exclusiveGetOrCreateMailBox(chanID) mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid)
mo.mu.Unlock() mo.mu.Unlock()
// Deliver the packet to the mailbox if it was found or created. // Deliver the packet to the mailbox if it was found or created.

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -19,7 +20,10 @@ func TestMailBoxCouriers(t *testing.T) {
// First, we'll create new instance of the current default mailbox // First, we'll create new instance of the current default mailbox
// type. // type.
mailBox := newMemoryMailBox() mailBox := newMemoryMailBox(&mailBoxConfig{
clock: clock.NewDefaultClock(),
expiry: time.Minute,
})
mailBox.Start() mailBox.Start()
defer mailBox.Stop() defer mailBox.Stop()
@ -34,10 +38,16 @@ func TestMailBoxCouriers(t *testing.T) {
outgoingChanID: lnwire.NewShortChanIDFromInt(uint64(prand.Int63())), outgoingChanID: lnwire.NewShortChanIDFromInt(uint64(prand.Int63())),
incomingChanID: lnwire.NewShortChanIDFromInt(uint64(prand.Int63())), incomingChanID: lnwire.NewShortChanIDFromInt(uint64(prand.Int63())),
amount: lnwire.MilliSatoshi(prand.Int63()), amount: lnwire.MilliSatoshi(prand.Int63()),
htlc: &lnwire.UpdateAddHTLC{
ID: uint64(i),
},
} }
sentPackets[i] = pkt sentPackets[i] = pkt
mailBox.AddPacket(pkt) err := mailBox.AddPacket(pkt)
if err != nil {
t.Fatalf("unable to add packet: %v", err)
}
} }
// Next, we'll do the same, but this time adding wire messages. // Next, we'll do the same, but this time adding wire messages.
@ -148,6 +158,387 @@ func TestMailBoxCouriers(t *testing.T) {
} }
} }
// TestMailBoxResetAfterShutdown verifies that both ResetMessages and
// ResetPackets report ErrMailBoxShuttingDown once the mailbox has been
// stopped.
func TestMailBoxResetAfterShutdown(t *testing.T) {
	t.Parallel()

	// Bring the mailbox up and immediately tear it down again.
	mailBox := newMemoryMailBox(&mailBoxConfig{})
	mailBox.Start()
	mailBox.Stop()

	// Resetting either courier after shutdown must fail, messages first
	// and packets second.
	resets := []func() error{
		mailBox.ResetMessages,
		mailBox.ResetPackets,
	}
	for _, reset := range resets {
		if err := reset(); err != ErrMailBoxShuttingDown {
			t.Fatalf("expected ErrMailBoxShuttingDown, got: %v", err)
		}
	}
}
// mailboxContext is a test harness that groups a mailbox with the test clock
// and forwarding channel used to observe its behavior.
type mailboxContext struct {
// t is the test that owns this context; helpers report failures on it.
t *testing.T
// mailbox is the mailbox under test.
mailbox MailBox
// clock is the manually controlled time source.
clock *clock.TestClock
// forwards receives every packet passed to the forward method.
forwards chan *htlcPacket
}
// newMailboxContext builds a mailboxContext whose mailbox is driven by a test
// clock initialized to startTime and configured with the given expiry. The
// mailbox is started before the context is returned.
func newMailboxContext(t *testing.T, startTime time.Time,
	expiry time.Duration) *mailboxContext {

	testClock := clock.NewTestClock(startTime)
	ctx := &mailboxContext{
		t:        t,
		clock:    testClock,
		forwards: make(chan *htlcPacket, 1),
	}

	// The update returned for any channel simply echoes the short channel
	// ID that was requested.
	fetchUpdate := func(sid lnwire.ShortChannelID) (
		*lnwire.ChannelUpdate, error) {

		update := &lnwire.ChannelUpdate{ShortChannelID: sid}
		return update, nil
	}

	ctx.mailbox = newMemoryMailBox(&mailBoxConfig{
		fetchUpdate:    fetchUpdate,
		forwardPackets: ctx.forward,
		clock:          testClock,
		expiry:         expiry,
	})
	ctx.mailbox.Start()

	return ctx
}
// forward stands in for the switch's packet forwarding: every packet is pushed
// onto the forwards channel in order, and an already-closed error channel is
// returned. The quit channel argument is ignored.
func (c *mailboxContext) forward(_ chan struct{},
	pkts ...*htlcPacket) chan error {

	for i := range pkts {
		c.forwards <- pkts[i]
	}

	// No errors are ever reported, so hand back a closed channel.
	done := make(chan error)
	close(done)

	return done
}
// sendAdds creates num Add packets whose incoming HTLC IDs run from start to
// start+num-1, adds each of them to the mailbox, and returns them in the order
// they were added. The test fails if any packet is rejected by the mailbox.
func (c *mailboxContext) sendAdds(start, num int) []*htlcPacket {
	c.t.Helper()

	adds := make([]*htlcPacket, 0, num)
	for offset := 0; offset < num; offset++ {
		htlcID := uint64(start + offset)

		// Random channel IDs and amount, drawn in the same order as
		// the fields are declared below.
		outgoing := lnwire.NewShortChanIDFromInt(
			uint64(prand.Int63()),
		)
		incoming := lnwire.NewShortChanIDFromInt(
			uint64(prand.Int63()),
		)
		amt := lnwire.MilliSatoshi(prand.Int63())

		add := &htlcPacket{
			outgoingChanID: outgoing,
			incomingChanID: incoming,
			incomingHTLCID: htlcID,
			amount:         amt,
			htlc: &lnwire.UpdateAddHTLC{
				ID: htlcID,
			},
		}

		if err := c.mailbox.AddPacket(add); err != nil {
			c.t.Fatalf("unable to add packet: %v", err)
		}
		adds = append(adds, add)
	}

	return adds
}
// receivePkts asserts that the mailbox delivers exactly the given packets, in
// order, on its packet outbox. Each packet must arrive within 50ms and be
// deeply equal to the expected one, otherwise the test is failed.
func (c *mailboxContext) receivePkts(pkts []*htlcPacket) {
	c.t.Helper()

	for i, expPkt := range pkts {
		select {
		case pkt := <-c.mailbox.PacketOutBox():
			if !reflect.DeepEqual(expPkt, pkt) {
				c.t.Fatalf("inkey mismatch #%d, want: %v vs "+
					"got: %v", i, expPkt.inKey(), pkt.inKey())
			}

		case <-time.After(50 * time.Millisecond):
			// This helper waits for delivered packets rather than
			// fails, so the timeout message says so.
			c.t.Fatalf("did not receive pkt for index %d", i)
		}
	}
}
// checkFails asserts that a fail is forwarded for each of the given adds, in
// order and with a matching incoming circuit key, and that no further packet
// is forwarded afterwards. Passing nil asserts that nothing is forwarded.
func (c *mailboxContext) checkFails(adds []*htlcPacket) {
	c.t.Helper()

	const timeout = 50 * time.Millisecond

	for idx, add := range adds {
		select {
		case fail := <-c.forwards:
			want, got := add.inKey(), fail.inKey()
			if want != got {
				c.t.Fatalf("inkey mismatch #%d, add: %v vs fail: %v",
					idx, want, got)
			}

		case <-time.After(timeout):
			c.t.Fatalf("did not receive fail for index %d", idx)
		}
	}

	// Nothing else should show up once all expected fails are consumed.
	select {
	case extra := <-c.forwards:
		c.t.Fatalf("unexpected forward: %v", extra)
	case <-time.After(timeout):
	}
}
// TestMailBoxFailAdd asserts that FailAdd returns a response to the switch
// under various interleavings with other operations on the mailbox.
func TestMailBoxFailAdd(t *testing.T) {
var (
batchDelay = time.Second
expiry = time.Minute
firstBatchStart = time.Now()
secondBatchStart = time.Now().Add(batchDelay)
thirdBatchStart = time.Now().Add(2 * batchDelay)
thirdBatchExpiry = thirdBatchStart.Add(expiry)
)
ctx := newMailboxContext(t, firstBatchStart, expiry)
defer ctx.mailbox.Stop()
// failAdds calls FailAdd on every packet in the given batch.
failAdds := func(adds []*htlcPacket) {
for _, add := range adds {
ctx.mailbox.FailAdd(add)
}
}
const numBatchPackets = 5
// Send the first batch of adds, and pull them from the mailbox.
firstBatch := ctx.sendAdds(0, numBatchPackets)
ctx.receivePkts(firstBatch)
// Fail all of these adds, simulating an error adding the HTLCs to the
// commitment. We should see a failure message for each. The fails are
// issued from a goroutine because the forwards channel only buffers a
// single packet and is drained by checkFails.
go failAdds(firstBatch)
ctx.checkFails(firstBatch)
// As a sanity check, fail all of them again and assert that no
// duplicate fails are sent.
go failAdds(firstBatch)
ctx.checkFails(nil)
// Now, send a second batch of adds after a short delay and deliver them
// to the link.
ctx.clock.SetTime(secondBatchStart)
secondBatch := ctx.sendAdds(numBatchPackets, numBatchPackets)
ctx.receivePkts(secondBatch)
// Reset the packet queue w/o changing the current time. This simulates
// the link flapping and coming back up before the second batch's
// expiries have elapsed. We should see no failures sent back.
err := ctx.mailbox.ResetPackets()
if err != nil {
t.Fatalf("unable to reset packets: %v", err)
}
ctx.checkFails(nil)
// Redeliver the second batch to the link and hold them there.
ctx.receivePkts(secondBatch)
// Send a third batch of adds shortly after the second batch.
ctx.clock.SetTime(thirdBatchStart)
thirdBatch := ctx.sendAdds(2*numBatchPackets, numBatchPackets)
// Advance the clock so that the third batch expires. We expect to only
// see fails for the third batch, since the second batch is still being
// held by the link.
ctx.clock.SetTime(thirdBatchExpiry)
ctx.checkFails(thirdBatch)
// Finally, reset the link which should cause the second batch to be
// cancelled immediately.
err = ctx.mailbox.ResetPackets()
if err != nil {
t.Fatalf("unable to reset packets: %v", err)
}
ctx.checkFails(secondBatch)
}
// TestMailBoxPacketPrioritization checks that, when Settle/Fail packets and
// Add packets are both waiting in the mailbox, the Settle/Fail packets are
// handed out first.
func TestMailBoxPacketPrioritization(t *testing.T) {
	t.Parallel()

	// Start a fresh instance of the default mailbox type.
	mailBox := newMemoryMailBox(&mailBoxConfig{
		clock:  clock.NewDefaultClock(),
		expiry: time.Minute,
	})
	mailBox.Start()
	defer mailBox.Stop()

	const numPackets = 5

	_, _, aliceChanID, bobChanID := genIDs()

	// Packets are added in this order:
	//   0: Settle1
	//   1: Add1
	//   2: Add2
	//   3: Fail
	//   4: Settle2
	sentPackets := make([]*htlcPacket, numPackets)
	for i := range sentPackets {
		id := uint64(i)
		pkt := &htlcPacket{
			outgoingChanID: aliceChanID,
			outgoingHTLCID: id,
			incomingChanID: bobChanID,
			incomingHTLCID: id,
			amount:         lnwire.MilliSatoshi(prand.Int63()),
		}

		switch {
		// The first and last packets are Settles. A non-Add goes
		// first to make the test deterministic w/o needing to sleep.
		case i == 0 || i == 4:
			pkt.htlc = &lnwire.UpdateFulfillHTLC{ID: id}

		// The second and third packets are Adds.
		case i == 1 || i == 2:
			pkt.htlc = &lnwire.UpdateAddHTLC{ID: id}

		// The fourth packet is a Fail.
		case i == 3:
			pkt.htlc = &lnwire.UpdateFailHTLC{ID: id}
		}

		sentPackets[i] = pkt

		if err := mailBox.AddPacket(pkt); err != nil {
			t.Fatalf("failed to add packet: %v", err)
		}
	}

	// Due to the prioritization between the split queues, the packets are
	// expected to come out as Settle1, Fail, Settle2, Add1, Add2. Each
	// entry is the index into sentPackets expected at that position.
	deliveryOrder := []int{0, 3, 4, 1, 2}
	for i, sentIdx := range deliveryOrder {
		expPkt := sentPackets[sentIdx]

		select {
		case pkt := <-mailBox.PacketOutBox():
			if !reflect.DeepEqual(expPkt, pkt) {
				t.Fatalf("recvd packet mismatch %d, want: %v, got: %v",
					i, spew.Sdump(expPkt), spew.Sdump(pkt))
			}

		case <-time.After(50 * time.Millisecond):
			t.Fatalf("didn't receive packet %d before timeout", i)
		}
	}
}
// TestMailBoxAddExpiry asserts that the mailbox will cancel back Adds that have
// reached their expiry time.
func TestMailBoxAddExpiry(t *testing.T) {
var (
expiry = time.Minute
batchDelay = time.Second
firstBatchStart = time.Now()
firstBatchExpiry = firstBatchStart.Add(expiry)
secondBatchStart = firstBatchStart.Add(batchDelay)
secondBatchExpiry = secondBatchStart.Add(expiry)
)
ctx := newMailboxContext(t, firstBatchStart, expiry)
defer ctx.mailbox.Stop()
// Each batch will consist of 10 messages.
const numBatchPackets = 10
// Add the first batch while the clock reads firstBatchStart.
firstBatch := ctx.sendAdds(0, numBatchPackets)
// Advance the clock by the batch delay. No fails are expected yet,
// since the first batch's expiry has not been reached.
ctx.clock.SetTime(secondBatchStart)
ctx.checkFails(nil)
// Add the second batch while the clock reads secondBatchStart.
secondBatch := ctx.sendAdds(numBatchPackets, numBatchPackets)
// Move the clock to the first batch's expiry. Only the first batch
// should be failed back.
ctx.clock.SetTime(firstBatchExpiry)
ctx.checkFails(firstBatch)
// Move the clock to the second batch's expiry. The second batch should
// now be failed back as well.
ctx.clock.SetTime(secondBatchExpiry)
ctx.checkFails(secondBatch)
}
// TestMailBoxDuplicateAddPacket verifies that adding a second htlcPacket with
// the same incoming circuit key is rejected with ErrPacketAlreadyExists, for
// Add, Settle and Fail packets alike.
func TestMailBoxDuplicateAddPacket(t *testing.T) {
	t.Parallel()

	mailBox := newMemoryMailBox(&mailBoxConfig{
		clock: clock.NewDefaultClock(),
	})
	mailBox.Start()
	defer mailBox.Stop()

	// One packet per HTLC type, each with a distinct incoming HTLC ID.
	pkts := []*htlcPacket{
		{
			incomingHTLCID: 0,
			htlc:           &lnwire.UpdateAddHTLC{},
		},
		{
			incomingHTLCID: 1,
			htlc:           &lnwire.UpdateFulfillHTLC{},
		},
		{
			incomingHTLCID: 2,
			htlc:           &lnwire.UpdateFailHTLC{},
		},
	}

	for _, pkt := range pkts {
		// The first add should succeed.
		if err := mailBox.AddPacket(pkt); err != nil {
			t.Fatalf("unable to add packet: %v", err)
		}

		// Adding again with the same incoming circuit key should
		// fail.
		if err := mailBox.AddPacket(pkt); err != ErrPacketAlreadyExists {
			t.Fatalf("expected ErrPacketAlreadyExists, got: %v", err)
		}
	}
}
// TestMailOrchestrator asserts that the orchestrator properly buffers packets // TestMailOrchestrator asserts that the orchestrator properly buffers packets
// for channels that haven't been made live, such that they are delivered // for channels that haven't been made live, such that they are delivered
// immediately after BindLiveShortChanID. It also tests that packets are delivered // immediately after BindLiveShortChanID. It also tests that packets are delivered
@ -156,7 +547,10 @@ func TestMailOrchestrator(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create a new instance of our orchestrator. // First, we'll create a new instance of our orchestrator.
mo := newMailOrchestrator() mo := newMailOrchestrator(&mailOrchConfig{
clock: clock.NewDefaultClock(),
expiry: time.Minute,
})
defer mo.Stop() defer mo.Stop()
// We'll be delivering 10 htlc packets via the orchestrator. // We'll be delivering 10 htlc packets via the orchestrator.
@ -174,6 +568,9 @@ func TestMailOrchestrator(t *testing.T) {
incomingChanID: bobChanID, incomingChanID: bobChanID,
incomingHTLCID: uint64(i), incomingHTLCID: uint64(i),
amount: lnwire.MilliSatoshi(prand.Int63()), amount: lnwire.MilliSatoshi(prand.Int63()),
htlc: &lnwire.UpdateAddHTLC{
ID: uint64(i),
},
} }
sentPackets[i] = pkt sentPackets[i] = pkt
@ -181,7 +578,7 @@ func TestMailOrchestrator(t *testing.T) {
} }
// Now, initialize a new mailbox for Alice's chanid. // Now, initialize a new mailbox for Alice's chanid.
mailbox := mo.GetOrCreateMailBox(chanID1) mailbox := mo.GetOrCreateMailBox(chanID1, aliceChanID)
// Verify that no messages are received, since Alice's mailbox has not // Verify that no messages are received, since Alice's mailbox has not
// been made live. // been made live.
@ -226,7 +623,7 @@ func TestMailOrchestrator(t *testing.T) {
// For the second half of the test, create a new mailbox for Bob and // For the second half of the test, create a new mailbox for Bob and
// immediately make it live with an assigned short chan id. // immediately make it live with an assigned short chan id.
mailbox = mo.GetOrCreateMailBox(chanID2) mailbox = mo.GetOrCreateMailBox(chanID2, bobChanID)
mo.BindLiveShortChanID(mailbox, chanID2, bobChanID) mo.BindLiveShortChanID(mailbox, chanID2, bobChanID)
// Create the second half of our htlcs, and deliver them via the // Create the second half of our htlcs, and deliver them via the
@ -239,6 +636,9 @@ func TestMailOrchestrator(t *testing.T) {
incomingChanID: bobChanID, incomingChanID: bobChanID,
incomingHTLCID: uint64(halfPackets + i), incomingHTLCID: uint64(halfPackets + i),
amount: lnwire.MilliSatoshi(prand.Int63()), amount: lnwire.MilliSatoshi(prand.Int63()),
htlc: &lnwire.UpdateAddHTLC{
ID: uint64(halfPackets + i),
},
} }
sentPackets[i] = pkt sentPackets[i] = pkt

@ -177,6 +177,8 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
LogEventTicker: ticker.NewForce(DefaultLogInterval), LogEventTicker: ticker.NewForce(DefaultLogInterval),
AckEventTicker: ticker.NewForce(DefaultAckInterval), AckEventTicker: ticker.NewForce(DefaultAckInterval),
HtlcNotifier: &mockHTLCNotifier{}, HtlcNotifier: &mockHTLCNotifier{},
Clock: clock.NewDefaultClock(),
HTLCExpiry: time.Hour,
} }
return New(cfg, startingHeight) return New(cfg, startingHeight)

@ -9,11 +9,13 @@ import (
"time" "time"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
@ -35,6 +37,10 @@ const (
// DefaultAckInterval is the duration between attempts to ack any settle // DefaultAckInterval is the duration between attempts to ack any settle
// fails in a forwarding package. // fails in a forwarding package.
DefaultAckInterval = 15 * time.Second DefaultAckInterval = 15 * time.Second
// DefaultHTLCExpiry is the duration after which Adds will be cancelled
// if they could not get added to an outgoing commitment.
DefaultHTLCExpiry = time.Minute
) )
var ( var (
@ -173,6 +179,15 @@ type Config struct {
// RejectHTLC is a flag that instructs the htlcswitch to reject any // RejectHTLC is a flag that instructs the htlcswitch to reject any
// HTLCs that are not from the source hop. // HTLCs that are not from the source hop.
RejectHTLC bool RejectHTLC bool
// Clock is a time source for the switch.
Clock clock.Clock
// HTLCExpiry is the interval after which Adds will be cancelled if they
// have not yet been delivered to a link. The computed deadline will
// expire this long after the Adds are added to a mailbox via
// AddPacket.
HTLCExpiry time.Duration
} }
// Switch is the central messaging bus for all incoming/outgoing HTLCs. // Switch is the central messaging bus for all incoming/outgoing HTLCs.
@ -282,12 +297,11 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) {
return nil, err return nil, err
} }
return &Switch{ s := &Switch{
bestHeight: currentHeight, bestHeight: currentHeight,
cfg: &cfg, cfg: &cfg,
circuits: circuitMap, circuits: circuitMap,
linkIndex: make(map[lnwire.ChannelID]ChannelLink), linkIndex: make(map[lnwire.ChannelID]ChannelLink),
mailOrchestrator: newMailOrchestrator(),
forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink), forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink),
interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink), interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink), pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink),
@ -296,7 +310,16 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) {
chanCloseRequests: make(chan *ChanClose), chanCloseRequests: make(chan *ChanClose),
resolutionMsgs: make(chan *resolutionMsg), resolutionMsgs: make(chan *resolutionMsg),
quit: make(chan struct{}), quit: make(chan struct{}),
}, nil }
s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
fetchUpdate: s.cfg.FetchLastChannelUpdate,
forwardPackets: s.ForwardPackets,
clock: s.cfg.Clock,
expiry: s.cfg.HTLCExpiry,
})
return s, nil
} }
// resolutionMsg is a struct that wraps an existing ResolutionMsg with a done // resolutionMsg is a struct that wraps an existing ResolutionMsg with a done
@ -1972,13 +1995,13 @@ func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
// link quit channel, meaning the send will fail only if the // link quit channel, meaning the send will fail only if the
// switch receives a shutdown request. // switch receives a shutdown request.
errChan := s.ForwardPackets(nil, switchPackets...) errChan := s.ForwardPackets(nil, switchPackets...)
go handleBatchFwdErrs(errChan) go handleBatchFwdErrs(errChan, log)
} }
} }
// handleBatchFwdErrs waits on the given errChan until it is closed, logging the // handleBatchFwdErrs waits on the given errChan until it is closed, logging the
// errors returned from any unsuccessful forwarding attempts. // errors returned from any unsuccessful forwarding attempts.
func handleBatchFwdErrs(errChan chan error) { func handleBatchFwdErrs(errChan chan error, l btclog.Logger) {
for { for {
err, ok := <-errChan err, ok := <-errChan
if !ok { if !ok {
@ -1991,7 +2014,7 @@ func handleBatchFwdErrs(errChan chan error) {
continue continue
} }
log.Errorf("unhandled error while reforwarding htlc "+ l.Errorf("Unhandled error while reforwarding htlc "+
"settle/fail over htlcswitch: %v", err) "settle/fail over htlcswitch: %v", err)
} }
} }
@ -2036,7 +2059,8 @@ func (s *Switch) AddLink(link ChannelLink) error {
// Get and attach the mailbox for this link, which buffers packets in // Get and attach the mailbox for this link, which buffers packets in
// case there packets that we tried to deliver while this link was // case there packets that we tried to deliver while this link was
// offline. // offline.
mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID) shortChanID := link.ShortChanID()
mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
link.AttachMailBox(mailbox) link.AttachMailBox(mailbox)
if err := link.Start(); err != nil { if err := link.Start(); err != nil {
@ -2044,7 +2068,6 @@ func (s *Switch) AddLink(link ChannelLink) error {
return err return err
} }
shortChanID := link.ShortChanID()
if shortChanID == hop.Source { if shortChanID == hop.Source {
log.Infof("Adding pending link chan_id=%v, short_chan_id=%v", log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
chanID, shortChanID) chanID, shortChanID)
@ -2216,7 +2239,7 @@ func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
// Finally, alert the mail orchestrator to the change of short channel // Finally, alert the mail orchestrator to the change of short channel
// ID, and deliver any unclaimed packets to the link. // ID, and deliver any unclaimed packets to the link.
mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID) mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
s.mailOrchestrator.BindLiveShortChanID( s.mailOrchestrator.BindLiveShortChanID(
mailbox, chanID, shortChanID, mailbox, chanID, shortChanID,
) )

@ -1167,6 +1167,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
BatchSize: 10, BatchSize: 10,
BatchTicker: ticker.NewForce(testBatchTimeout), BatchTicker: ticker.NewForce(testBatchTimeout),
FwdPkgGCTicker: ticker.NewForce(fwdPkgTimeout), FwdPkgGCTicker: ticker.NewForce(fwdPkgTimeout),
PendingCommitTicker: ticker.NewForce(time.Minute),
MinFeeUpdateTimeout: minFeeUpdateTimeout, MinFeeUpdateTimeout: minFeeUpdateTimeout,
MaxFeeUpdateTimeout: maxFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},

@ -669,6 +669,7 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
SyncStates: syncStates, SyncStates: syncStates,
BatchTicker: ticker.New(50 * time.Millisecond), BatchTicker: ticker.New(50 * time.Millisecond),
FwdPkgGCTicker: ticker.New(time.Minute), FwdPkgGCTicker: ticker.New(time.Minute),
PendingCommitTicker: ticker.New(time.Minute),
BatchSize: 10, BatchSize: 10,
UnsafeReplay: cfg.UnsafeReplay, UnsafeReplay: cfg.UnsafeReplay,
MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout, MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout,

@ -496,6 +496,8 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval), AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval),
AllowCircularRoute: cfg.AllowCircularRoute, AllowCircularRoute: cfg.AllowCircularRoute,
RejectHTLC: cfg.RejectHTLC, RejectHTLC: cfg.RejectHTLC,
Clock: clock.NewDefaultClock(),
HTLCExpiry: htlcswitch.DefaultHTLCExpiry,
}, uint32(currentHeight)) }, uint32(currentHeight))
if err != nil { if err != nil {
return nil, err return nil, err