htlcswitch: thread clock from switch to mailbox

This commit is contained in:
Conner Fromknecht 2020-04-14 10:49:26 -07:00
parent 63f3d0b012
commit 37dca27a3d
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
5 changed files with 55 additions and 4 deletions

@ -7,6 +7,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -79,6 +80,14 @@ type mailBoxConfig struct {
// be routed. A quit channel should be provided so that the call can // be routed. A quit channel should be provided so that the call can
// properly exit during shutdown. // properly exit during shutdown.
forwardPackets func(chan struct{}, ...*htlcPacket) chan error forwardPackets func(chan struct{}, ...*htlcPacket) chan error
// clock is a time source for the mailbox.
clock clock.Clock
// expiry is the interval after which Adds will be cancelled if they
// have not been yet been delivered. The computed deadline will expiry
// this long after the Adds are added via AddPacket.
expiry time.Duration
} }
// memoryMailBox is an implementation of the MailBox struct backed by purely // memoryMailBox is an implementation of the MailBox struct backed by purely
@ -586,6 +595,14 @@ type mailOrchConfig struct {
// fetchUpdate retreives the most recent channel update for the channel // fetchUpdate retreives the most recent channel update for the channel
// this mailbox belongs to. // this mailbox belongs to.
fetchUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) fetchUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
// clock is a time source for the generated mailboxes.
clock clock.Clock
// expiry is the interval after which Adds will be cancelled if they
// have not been yet been delivered. The computed deadline will expiry
// this long after the Adds are added to a mailbox via AddPacket.
expiry time.Duration
} }
// newMailOrchestrator initializes a fresh mailOrchestrator. // newMailOrchestrator initializes a fresh mailOrchestrator.
@ -642,6 +659,8 @@ func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
shortChanID: shortChanID, shortChanID: shortChanID,
fetchUpdate: mo.cfg.fetchUpdate, fetchUpdate: mo.cfg.fetchUpdate,
forwardPackets: mo.cfg.forwardPackets, forwardPackets: mo.cfg.forwardPackets,
clock: mo.cfg.clock,
expiry: mo.cfg.expiry,
}) })
mailbox.Start() mailbox.Start()
mo.mailboxes[chanID] = mailbox mo.mailboxes[chanID] = mailbox

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -19,7 +20,10 @@ func TestMailBoxCouriers(t *testing.T) {
// First, we'll create new instance of the current default mailbox // First, we'll create new instance of the current default mailbox
// type. // type.
mailBox := newMemoryMailBox(&mailBoxConfig{}) mailBox := newMemoryMailBox(&mailBoxConfig{
clock: clock.NewDefaultClock(),
expiry: time.Minute,
})
mailBox.Start() mailBox.Start()
defer mailBox.Stop() defer mailBox.Stop()
@ -172,14 +176,17 @@ func TestMailBoxResetAfterShutdown(t *testing.T) {
type mailboxContext struct { type mailboxContext struct {
t *testing.T t *testing.T
clock *clock.TestClock
mailbox MailBox mailbox MailBox
forwards chan *htlcPacket forwards chan *htlcPacket
} }
func newMailboxContext(t *testing.T) *mailboxContext { func newMailboxContext(t *testing.T, startTime time.Time,
expiry time.Duration) *mailboxContext {
ctx := &mailboxContext{ ctx := &mailboxContext{
t: t, t: t,
clock: clock.NewTestClock(startTime),
forwards: make(chan *htlcPacket, 1), forwards: make(chan *htlcPacket, 1),
} }
ctx.mailbox = newMemoryMailBox(&mailBoxConfig{ ctx.mailbox = newMemoryMailBox(&mailBoxConfig{
@ -190,6 +197,8 @@ func newMailboxContext(t *testing.T) *mailboxContext {
}, nil }, nil
}, },
forwardPackets: ctx.forward, forwardPackets: ctx.forward,
clock: ctx.clock,
expiry: expiry,
}) })
ctx.mailbox.Start() ctx.mailbox.Start()
@ -282,7 +291,7 @@ func (c *mailboxContext) checkFails(adds []*htlcPacket) {
// TestMailBoxFailAdd asserts that FailAdd returns a response to the switch // TestMailBoxFailAdd asserts that FailAdd returns a response to the switch
// under various interleavings with other operations on the mailbox. // under various interleavings with other operations on the mailbox.
func TestMailBoxFailAdd(t *testing.T) { func TestMailBoxFailAdd(t *testing.T) {
ctx := newMailboxContext(t) ctx := newMailboxContext(t, time.Now(), time.Minute)
defer ctx.mailbox.Stop() defer ctx.mailbox.Stop()
failAdds := func(adds []*htlcPacket) { failAdds := func(adds []*htlcPacket) {
@ -316,7 +325,10 @@ func TestMailOrchestrator(t *testing.T) {
t.Parallel() t.Parallel()
// First, we'll create a new instance of our orchestrator. // First, we'll create a new instance of our orchestrator.
mo := newMailOrchestrator(&mailOrchConfig{}) mo := newMailOrchestrator(&mailOrchConfig{
clock: clock.NewDefaultClock(),
expiry: time.Minute,
})
defer mo.Stop() defer mo.Stop()
// We'll be delivering 10 htlc packets via the orchestrator. // We'll be delivering 10 htlc packets via the orchestrator.

@ -177,6 +177,8 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
LogEventTicker: ticker.NewForce(DefaultLogInterval), LogEventTicker: ticker.NewForce(DefaultLogInterval),
AckEventTicker: ticker.NewForce(DefaultAckInterval), AckEventTicker: ticker.NewForce(DefaultAckInterval),
HtlcNotifier: &mockHTLCNotifier{}, HtlcNotifier: &mockHTLCNotifier{},
Clock: clock.NewDefaultClock(),
HTLCExpiry: time.Hour,
} }
return New(cfg, startingHeight) return New(cfg, startingHeight)

@ -15,6 +15,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
@ -36,6 +37,10 @@ const (
// DefaultAckInterval is the duration between attempts to ack any settle // DefaultAckInterval is the duration between attempts to ack any settle
// fails in a forwarding package. // fails in a forwarding package.
DefaultAckInterval = 15 * time.Second DefaultAckInterval = 15 * time.Second
// DefaultHTLCExpiry is the duration after which Adds will be cancelled
// if they could not get added to an outgoing commitment.
DefaultHTLCExpiry = time.Minute
) )
var ( var (
@ -174,6 +179,15 @@ type Config struct {
// RejectHTLC is a flag that instructs the htlcswitch to reject any // RejectHTLC is a flag that instructs the htlcswitch to reject any
// HTLCs that are not from the source hop. // HTLCs that are not from the source hop.
RejectHTLC bool RejectHTLC bool
// Clock is a time source for the switch.
Clock clock.Clock
// HTLCExpiry is the interval after which Adds will be cancelled if they
// have not been yet been delivered to a link. The computed deadline
// will expiry this long after the Adds are added to a mailbox via
// AddPacket.
HTLCExpiry time.Duration
} }
// Switch is the central messaging bus for all incoming/outgoing HTLCs. // Switch is the central messaging bus for all incoming/outgoing HTLCs.
@ -301,6 +315,8 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) {
s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{ s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
fetchUpdate: s.cfg.FetchLastChannelUpdate, fetchUpdate: s.cfg.FetchLastChannelUpdate,
forwardPackets: s.ForwardPackets, forwardPackets: s.ForwardPackets,
clock: s.cfg.Clock,
expiry: s.cfg.HTLCExpiry,
}) })
return s, nil return s, nil

@ -496,6 +496,8 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval), AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval),
AllowCircularRoute: cfg.AllowCircularRoute, AllowCircularRoute: cfg.AllowCircularRoute,
RejectHTLC: cfg.RejectHTLC, RejectHTLC: cfg.RejectHTLC,
Clock: clock.NewDefaultClock(),
HTLCExpiry: htlcswitch.DefaultHTLCExpiry,
}, uint32(currentHeight)) }, uint32(currentHeight))
if err != nil { if err != nil {
return nil, err return nil, err