htlcswitch/link_test: add link trimming tests

This commit is contained in:
Conner Fromknecht 2018-04-28 23:21:43 -07:00
parent 9c178f3d7f
commit 308ad1caf6
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

@ -6,6 +6,7 @@ import (
"encoding/binary"
"fmt"
"io"
"reflect"
"runtime"
"strings"
"sync"
@ -1413,30 +1414,31 @@ func (m *mockPeer) Disconnect(reason error) {
var _ Peer = (*mockPeer)(nil)
func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
ChannelLink, *lnwallet.LightningChannel, chan time.Time, func(), error) {
globalEpoch := &chainntnfs.BlockEpochEvent{
Epochs: make(chan *chainntnfs.BlockEpoch),
Cancel: func() {
},
}
ChannelLink, *lnwallet.LightningChannel, chan time.Time, func(),
chanRestoreFunc, error) {
var chanIDBytes [8]byte
if _, err := io.ReadFull(rand.Reader, chanIDBytes[:]); err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
chanID := lnwire.NewShortChanIDFromInt(
binary.BigEndian.Uint64(chanIDBytes[:]))
aliceChannel, bobChannel, fCleanUp, _, err := createTestChannel(
aliceChannel, bobChannel, fCleanUp, restore, err := createTestChannel(
alicePrivKey, bobPrivKey, chanAmt, chanAmt,
chanReserve, chanReserve, chanID,
)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
var (
globalEpoch = &chainntnfs.BlockEpochEvent{
Epochs: make(chan *chainntnfs.BlockEpoch),
Cancel: func() {
},
}
invoiceRegistry = newMockRegistry()
decoder = newMockIteratorDecoder()
obfuscator = NewMockObfuscator()
@ -1444,7 +1446,6 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
sentMsgs: make(chan lnwire.Message, 2000),
quit: make(chan struct{}),
}
globalPolicy = ForwardingPolicy{
MinHTLC: lnwire.NewMSatFromSatoshis(5),
BaseFee: lnwire.NewMSatFromSatoshis(1),
@ -1461,7 +1462,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
aliceSwitch, err := New(Config{DB: aliceDb})
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
t := make(chan time.Time)
@ -1494,11 +1495,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
const startingHeight = 100
aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight)
mailbox := newMemoryMailBox()
mailbox.Start()
aliceLink.AttachMailBox(mailbox)
if err := aliceLink.Start(); err != nil {
return nil, nil, nil, nil, err
if err := aliceSwitch.AddLink(aliceLink); err != nil {
return nil, nil, nil, nil, nil, err
}
go func() {
for {
@ -1517,7 +1515,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
defer bobChannel.Stop()
}
return aliceLink, bobChannel, t, cleanUp, nil
return aliceLink, bobChannel, t, cleanUp, restore, nil
}
func assertLinkBandwidth(t *testing.T, link ChannelLink,
@ -1689,7 +1687,8 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
// We'll start the test by creating a single instance of
const chanAmt = btcutil.SatoshiPerBitcoin * 5
aliceLink, bobChannel, tmr, cleanUp, err := newSingleLinkTestHarness(chanAmt, 0)
aliceLink, bobChannel, tmr, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
@ -2106,7 +2105,7 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
var mockBlob [lnwire.OnionPacketSize]byte
const chanAmt = btcutil.SatoshiPerBitcoin * 5
aliceLink, bobChannel, batchTick, cleanUp, err :=
aliceLink, bobChannel, batchTick, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
@ -2315,6 +2314,557 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
}
}
// genAddsAndCircuits creates `numHtlcs` sequential ADD packets and there
// corresponding circuits. The provided `htlc` is used in all test packets.
func genAddsAndCircuits(numHtlcs int, htlc *lnwire.UpdateAddHTLC) (
[]*htlcPacket, []*PaymentCircuit) {
addPkts := make([]*htlcPacket, 0, numHtlcs)
circuits := make([]*PaymentCircuit, 0, numHtlcs)
for i := 0; i < numHtlcs; i++ {
addPkt := htlcPacket{
htlc: htlc,
incomingChanID: sourceHop,
incomingHTLCID: uint64(i),
obfuscator: NewMockObfuscator(),
}
circuit := makePaymentCircuit(&htlc.PaymentHash, &addPkt)
addPkt.circuit = &circuit
addPkts = append(addPkts, &addPkt)
circuits = append(circuits, &circuit)
}
return addPkts, circuits
}
// TestChannelLinkTrimCircuitsPending checks that the switch and link properly
// trim circuits if there are open circuits corresponding to ADDs on a pending
// commmitment transaction.
func TestChannelLinkTrimCircuitsPending(t *testing.T) {
t.Parallel()
const (
chanAmt = btcutil.SatoshiPerBitcoin * 5
numHtlcs = 4
halfHtlcs = numHtlcs / 2
)
// We'll start by creating a new link with our chanAmt (5 BTC). We will
// only be testing Alice's behavior, so the reference to Bob's channel
// state is unnecessary.
aliceLink, _, batchTicker, cleanUp, restore, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
alice := newPersistentLinkHarness(t, aliceLink, batchTicker, restore)
// Compute the static fees that will be used to determine the
// correctness of Alice's bandwidth when forwarding HTLCs.
estimator := &lnwallet.StaticFeeEstimator{
FeeRate: 24,
}
feeRate, err := estimator.EstimateFeePerVSize(1)
if err != nil {
t.Fatalf("unable to query fee estimator: %v", err)
}
defaultCommitFee := alice.channel.StateSnapshot().CommitFee
htlcFee := lnwire.NewMSatFromSatoshis(
feeRate.FeePerKWeight().FeeForWeight(lnwallet.HtlcWeight),
)
// The starting bandwidth of the channel should be exactly the amount
// that we created the channel between her and Bob, minus the commitment
// fee.
expectedBandwidth := lnwire.NewMSatFromSatoshis(chanAmt - defaultCommitFee)
assertLinkBandwidth(t, alice.link, expectedBandwidth)
// Capture Alice's starting bandwidth to perform later, relative
// bandwidth assertions.
aliceStartingBandwidth := alice.link.Bandwidth()
// Next, we'll create an HTLC worth 1 BTC that will be used as a dummy
// message for the test.
var mockBlob [lnwire.OnionPacketSize]byte
htlcAmt := lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin)
_, htlc, err := generatePayment(htlcAmt, htlcAmt, 5, mockBlob)
if err != nil {
t.Fatalf("unable to create payment: %v", err)
}
// Create `numHtlc` htlcPackets and payment circuits that will be used
// to drive the test. All of the packets will use the same dummy HTLC.
addPkts, circuits := genAddsAndCircuits(numHtlcs, htlc)
// To begin the test, start by committing the circuits belong to our
// first two HTLCs.
fwdActions := alice.commitCircuits(circuits[:halfHtlcs])
// Both of these circuits should have successfully added, as this is the
// first attempt to send them.
if len(fwdActions.Adds) != halfHtlcs {
t.Fatalf("expected %d circuits to be added", halfHtlcs)
}
alice.assertNumPendingNumOpenCircuits(2, 0)
// Since both were committed successfully, we will now deliver them to
// Alice's link.
for _, addPkt := range addPkts[:halfHtlcs] {
if err := alice.link.HandleSwitchPacket(addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err)
}
}
// Wait until Alice's link has sent both HTLCs via the peer.
alice.checkSent(addPkts[:halfHtlcs])
// The resulting bandwidth should reflect that Alice is paying both
// htlc amounts, in addition to both htlc fees.
assertLinkBandwidth(t, alice.link,
aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee),
)
// Now, initiate a state transition by Alice so that the pending HTLCs
// are locked in. This will *not* involve any participation by Bob,
// which ensures the commitment will remain in a pending state.
alice.trySignNextCommitment()
alice.assertNumPendingNumOpenCircuits(2, 2)
// Restart Alice's link, which simulates a disconnection with the remote
// peer.
cleanUp = alice.restart(false)
defer cleanUp()
alice.assertNumPendingNumOpenCircuits(2, 2)
// Make a second attempt to commit the first two circuits. This can
// happen if the incoming link flaps, but also allows us to verify that
// the circuits were trimmed properly.
fwdActions = alice.commitCircuits(circuits[:halfHtlcs])
// Since Alice has a pending commitment with the first two HTLCs, the
// restart should not have trimmed them from the circuit map.
// Therefore, we expect both of these circuits to be dropped by the
// switch, as keystones should still be set.
if len(fwdActions.Drops) != halfHtlcs {
t.Fatalf("expected %d packets to be dropped", halfHtlcs)
}
// The resulting bandwidth should remain unchanged from before,
// reflecting that Alice is paying both htlc amounts, in addition to
// both htlc fees.
assertLinkBandwidth(t, alice.link,
aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee),
)
// Now, restart Alice's link *and* the entire switch. This will ensure
// that entire circuit map is reloaded from disk, and we can now test
// against the behavioral differences of committing circuits that
// conflict with duplicate circuits after a restart.
cleanUp = alice.restart(true)
defer cleanUp()
alice.assertNumPendingNumOpenCircuits(2, 2)
// Alice should not send out any messages. Even though Alice has a
// pending commitment transaction, channel reestablishment is not
// enabled in this test.
select {
case <-alice.msgs:
t.Fatalf("message should not have been sent by Alice")
case <-time.After(time.Second):
}
// We will now try to commit the circuits for all of our HTLCs. The
// first two are already on the pending commitment transaction, the
// latter two are new HTLCs.
fwdActions = alice.commitCircuits(circuits)
// The first two circuits should have been dropped, as they are still on
// the pending commitment transaction, and the restart should not have
// trimmed the circuits for these valid HTLCs.
if len(fwdActions.Drops) != halfHtlcs {
t.Fatalf("expected %d packets to be dropped", halfHtlcs)
}
// The latter two circuits are unknown the circuit map, and should
// report being added.
if len(fwdActions.Adds) != halfHtlcs {
t.Fatalf("expected %d packets to be added", halfHtlcs)
}
// Deliver the latter two HTLCs to Alice's links so that they can be
// processed and added to the in-memory commitment state.
for _, addPkt := range addPkts[halfHtlcs:] {
if err := alice.link.HandleSwitchPacket(addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err)
}
}
// Wait for Alice to send the two latter HTLCs via the peer.
alice.checkSent(addPkts[halfHtlcs:])
// With two HTLCs on the pending commit, and two added to the in-memory
// commitment state, the resulting bandwidth should reflect that Alice
// is paying the all htlc amounts in addition to all htlc fees.
assertLinkBandwidth(t, alice.link,
aliceStartingBandwidth-numHtlcs*(htlcAmt+htlcFee),
)
// We will try to initiate a state transition for Alice, which will
// ensure the circuits for the two in-memory HTLCs are opened. However,
// since we have a pending commitment, these HTLCs will not actually be
// included in a commitment.
alice.trySignNextCommitment()
alice.assertNumPendingNumOpenCircuits(4, 4)
// Restart Alice's link to simulate a disconnect. Since the switch
// remains up throughout, the two latter HTLCs will remain in the link's
// mailbox, and will reprocessed upon being reattached to the link.
cleanUp = alice.restart(false)
defer cleanUp()
alice.assertNumPendingNumOpenCircuits(4, 4)
// Again, try to recommit all of our circuits.
fwdActions = alice.commitCircuits(circuits)
// It is expected that all of these will get dropped by the switch.
// The first two circuits are still open as a result of being on the
// commitment transaction. The latter two should have had their open
// circuits trimmed, *but* since the HTLCs are still in Alice's mailbox,
// the switch knows not to fail them as a result of the latter two
// circuits never having been loaded from disk.
if len(fwdActions.Drops) != numHtlcs {
t.Fatalf("expected %d packets to be dropped", numHtlcs)
}
// Wait for the latter two htlcs to be pulled from the mailbox, added to
// the in-memory channel state, and sent out via the peer.
alice.checkSent(addPkts[halfHtlcs:])
// This should result in reconstructing the same bandwidth as our last
// assertion. There are two HTLCs on the pending commit, and two added
// to the in-memory commitment state, the resulting bandwidth should
// reflect that Alice is paying the all htlc amounts in addition to all
// htlc fees.
assertLinkBandwidth(t, alice.link,
aliceStartingBandwidth-numHtlcs*(htlcAmt+htlcFee),
)
// Again, we will try to initiate a state transition for Alice, which
// will ensure the circuits for the two in-memory HTLCs are opened.
// As before, these HTLCs will not actually be included in a commitment
// since we have a pending commitment.
alice.trySignNextCommitment()
alice.assertNumPendingNumOpenCircuits(4, 4)
// As a final persistence check, we will restart the link and switch,
// wiping the latter two HTLCs from memory, and forcing their circuits
// to be reloaded from disk.
cleanUp = alice.restart(true)
defer cleanUp()
alice.assertNumPendingNumOpenCircuits(4, 2)
// Alice's mailbox will be empty after the restart, and no channel
// reestablishment is configured, so no messages will be sent upon
// restart.
select {
case <-alice.msgs:
t.Fatalf("message should not have been sent by Alice")
case <-time.After(time.Second):
}
// Finally, make one last attempt to commit all circuits.
fwdActions = alice.commitCircuits(circuits)
// The first two HTLCs should still be dropped by the htlcswitch. Their
// existence on the pending commitment transaction should prevent their
// open circuits from being trimmed.
if len(fwdActions.Drops) != halfHtlcs {
t.Fatalf("expected %d packets to be dropped", halfHtlcs)
}
// The latter two HTLCs should now be failed by the switch. These will
// have been trimmed by the link or switch restarting, and since the
// HTLCs are known to be lost from memory (since their circuits were
// loaded from disk), it is safe fail them back as they won't ever be
// delivered to the outgoing link.
if len(fwdActions.Fails) != halfHtlcs {
t.Fatalf("expected %d packets to be dropped", halfHtlcs)
}
// Since the latter two HTLCs have been completely dropped from memory,
// only the first two HTLCs we added should still be reflected in the
// channel bandwidth.
assertLinkBandwidth(t, alice.link,
aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee),
)
}
// TestChannelLinkTrimCircuitsNoCommit checks that the switch and link properly trim
// circuits if the ADDs corresponding to open circuits are never committed.
func TestChannelLinkTrimCircuitsNoCommit(t *testing.T) {
t.Parallel()
const (
chanAmt = btcutil.SatoshiPerBitcoin * 5
numHtlcs = 4
halfHtlcs = numHtlcs / 2
)
// We'll start by creating a new link with our chanAmt (5 BTC). We will
// only be testing Alice's behavior, so the reference to Bob's channel
// state is unnecessary.
aliceLink, _, batchTicker, cleanUp, restore, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
alice := newPersistentLinkHarness(t, aliceLink, batchTicker, restore)
// We'll put Alice into hodl.Commit mode, such that the circuits for any
// outgoing ADDs are opened, but the changes are not committed in the
// channel state.
alice.coreLink.cfg.HodlMask = hodl.Commit.Mask()
alice.coreLink.cfg.DebugHTLC = true
// Compute the static fees that will be used to determine the
// correctness of Alice's bandwidth when forwarding HTLCs.
estimator := &lnwallet.StaticFeeEstimator{
FeeRate: 24,
}
feeRate, err := estimator.EstimateFeePerVSize(1)
if err != nil {
t.Fatalf("unable to query fee estimator: %v", err)
}
defaultCommitFee := alice.channel.StateSnapshot().CommitFee
htlcFee := lnwire.NewMSatFromSatoshis(
feeRate.FeePerKWeight().FeeForWeight(lnwallet.HtlcWeight),
)
// The starting bandwidth of the channel should be exactly the amount
// that we created the channel between her and Bob, minus the commitment
// fee.
expectedBandwidth := lnwire.NewMSatFromSatoshis(chanAmt - defaultCommitFee)
assertLinkBandwidth(t, alice.link, expectedBandwidth)
// Capture Alice's starting bandwidth to perform later, relative
// bandwidth assertions.
aliceStartingBandwidth := alice.link.Bandwidth()
// Next, we'll create an HTLC worth 1 BTC that will be used as a dummy
// message for the test.
var mockBlob [lnwire.OnionPacketSize]byte
htlcAmt := lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin)
_, htlc, err := generatePayment(htlcAmt, htlcAmt, 5, mockBlob)
if err != nil {
t.Fatalf("unable to create payment: %v", err)
}
// Create `numHtlc` htlcPackets and payment circuits that will be used
// to drive the test. All of the packets will use the same dummy HTLC.
addPkts, circuits := genAddsAndCircuits(numHtlcs, htlc)
// To begin the test, start by committing the circuits belong to our
// first two HTLCs.
fwdActions := alice.commitCircuits(circuits[:halfHtlcs])
// Both of these circuits should have successfully added, as this is the
// first attempt to send them.
if len(fwdActions.Adds) != halfHtlcs {
t.Fatalf("expected %d circuits to be added", halfHtlcs)
}
// Since both were committed successfully, we will now deliver them to
// Alice's link.
for _, addPkt := range addPkts[:halfHtlcs] {
if err := alice.link.HandleSwitchPacket(addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err)
}
}
// Wait until Alice's link has sent both HTLCs via the peer.
alice.checkSent(addPkts[:halfHtlcs])
// The resulting bandwidth should reflect that Alice is paying both
// htlc amounts, in addition to both htlc fees.
assertLinkBandwidth(t, alice.link,
aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee),
)
alice.assertNumPendingNumOpenCircuits(2, 0)
// Now, init a state transition by Alice to try and commit the HTLCs.
// Since she is in hodl.Commit mode, this will fail, but the circuits
// will be opened persistently.
alice.trySignNextCommitment()
alice.assertNumPendingNumOpenCircuits(2, 2)
// Restart Alice's link, which simulates a disconnection with the remote
// peer. Alice's link and switch should trim the circuits that were
// opened but not committed.
cleanUp = alice.restart(false, hodl.Commit)
defer cleanUp()
alice.assertNumPendingNumOpenCircuits(2, 2)
// The first two HTLCs should have been reset in Alice's mailbox since
// the switch was not shutdown. Knowing this the switch should drop the
// two circuits, even if the circuits were trimmed.
fwdActions = alice.commitCircuits(circuits[:halfHtlcs])
if len(fwdActions.Drops) != halfHtlcs {
t.Fatalf("expected %d packets to be dropped since "+
"the switch has not been restarted", halfHtlcs)
}
// Wait for alice to process the first two HTLCs resend them via the
// peer.
alice.checkSent(addPkts[:halfHtlcs])
// The resulting bandwidth should reflect that Alice is paying both htlc
// amounts, in addition to both htlc fees.
assertLinkBandwidth(t, alice.link,
aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee),
)
// Again, initiate another state transition by Alice to try and commit
// the HTLCs. Since she is in hodl.Commit mode, this will fail, but the
// circuits will be opened persistently.
alice.trySignNextCommitment()
alice.assertNumPendingNumOpenCircuits(2, 2)
// Now, we we will do a full restart of the link and switch, configuring
// Alice again in hodl.Commit mode. Since none of the HTLCs were
// actually committed, the previously opened circuits should be trimmed
// by both the link and switch.
cleanUp = alice.restart(true, hodl.Commit)
defer cleanUp()
alice.assertNumPendingNumOpenCircuits(2, 0)
// Attempt another commit of our first two circuits. Both should fail,
// as the opened circuits should have been trimmed, and circuit map
// recognizes that these HTLCs were lost during the restart.
fwdActions = alice.commitCircuits(circuits[:halfHtlcs])
if len(fwdActions.Fails) != halfHtlcs {
t.Fatalf("expected %d packets to be failed", halfHtlcs)
}
// Bob should not receive any HTLCs from Alice, since Alice's mailbox is
// empty and there is no pending commitment.
select {
case <-alice.msgs:
t.Fatalf("received unexpected message from Alice")
case <-time.After(time.Second):
}
// Alice's bandwidth should have reverted back to her starting value.
assertLinkBandwidth(t, alice.link, aliceStartingBandwidth)
// Now, try to commit the last two payment circuits, which are unused
// thus far. These should succeed without hestiation.
fwdActions = alice.commitCircuits(circuits[halfHtlcs:])
if len(fwdActions.Adds) != halfHtlcs {
t.Fatalf("expected %d packets to be added", halfHtlcs)
}
// Deliver the last two HTLCs to the link via Alice's mailbox.
for _, addPkt := range addPkts[halfHtlcs:] {
if err := alice.link.HandleSwitchPacket(addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err)
}
}
// Verify that Alice processed and sent out the ADD packets via the
// peer.
alice.checkSent(addPkts[halfHtlcs:])
// The resulting bandwidth should reflect that Alice is paying both htlc
// amounts, in addition to both htlc fees.
assertLinkBandwidth(t, alice.link,
aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee),
)
// Now, initiate a state transition for Alice. Since we are hodl.Commit
// mode, this will only open the circuits that were added to the
// in-memory channel state.
alice.trySignNextCommitment()
alice.assertNumPendingNumOpenCircuits(4, 2)
// Restart Alice's link, and place her back in hodl.Commit mode. On
// restart, all previously opened circuits should be trimmed by both the
// link and the switch.
cleanUp = alice.restart(false, hodl.Commit)
defer cleanUp()
alice.assertNumPendingNumOpenCircuits(4, 2)
// Now, try to commit all of known circuits.
fwdActions = alice.commitCircuits(circuits)
// The first two HTLCs will fail to commit for the same reason as
// before, the circuits have been trimmed.
if len(fwdActions.Fails) != halfHtlcs {
t.Fatalf("expected %d packet to be failed", halfHtlcs)
}
// The last two HTLCs will be dropped, as thought the circuits are
// trimmed, the switch is aware that the HTLCs are still in Alice's
// mailbox.
if len(fwdActions.Drops) != halfHtlcs {
t.Fatalf("expected %d packet to be dropped", halfHtlcs)
}
// Wait until Alice reprocesses the last two HTLCs and sends them via
// the peer.
alice.checkSent(addPkts[halfHtlcs:])
// Her bandwidth should now reflect having sent only those two HTLCs.
assertLinkBandwidth(t, alice.link,
aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee),
)
// Now, initiate a state transition for Alice. Since we are hodl.Commit
// mode, this will only open the circuits that were added to the
// in-memory channel state.
alice.trySignNextCommitment()
alice.assertNumPendingNumOpenCircuits(4, 2)
// Finally, do one last restart of both the link and switch. This will
// flush the HTLCs from the mailbox. The circuits should now be trimmed
// for all of the HTLCs.
cleanUp = alice.restart(true, hodl.Commit)
defer cleanUp()
alice.assertNumPendingNumOpenCircuits(4, 0)
// Bob should not receive any HTLCs from Alice, as none of the HTLCs are
// in Alice's mailbox, and channel reestablishment is disabled.
select {
case <-alice.msgs:
t.Fatalf("received unexpected message from Alice")
case <-time.After(time.Second):
}
// Attempt to commit the last two circuits, both should now fail since
// though they were opened before shutting down, the circuits have been
// properly trimmed.
fwdActions = alice.commitCircuits(circuits[halfHtlcs:])
if len(fwdActions.Fails) != halfHtlcs {
t.Fatalf("expected %d packet to be failed", halfHtlcs)
}
// Alice balance should not have changed since the start.
assertLinkBandwidth(t, alice.link, aliceStartingBandwidth)
}
// TestChannelLinkBandwidthChanReserve checks that the bandwidth available
// on the channel link reflects the channel reserve that must be kept
// at all times.
@ -2325,7 +2875,7 @@ func TestChannelLinkBandwidthChanReserve(t *testing.T) {
// channel reserve.
const chanAmt = btcutil.SatoshiPerBitcoin * 5
const chanReserve = btcutil.SatoshiPerBitcoin * 1
aliceLink, bobChannel, batchTimer, cleanUp, err :=
aliceLink, bobChannel, batchTimer, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, chanReserve)
if err != nil {
t.Fatalf("unable to create link: %v", err)
@ -2440,8 +2990,8 @@ func TestChannelLinkBandwidthChanReserve(t *testing.T) {
// should therefore be 0.
const bobChanAmt = btcutil.SatoshiPerBitcoin * 1
const bobChanReserve = btcutil.SatoshiPerBitcoin * 1.5
bobLink, _, _, bobCleanUp, err := newSingleLinkTestHarness(bobChanAmt,
bobChanReserve)
bobLink, _, _, bobCleanUp, _, err :=
newSingleLinkTestHarness(bobChanAmt, bobChanReserve)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
@ -3111,3 +3661,253 @@ func TestChannelLinkAcceptOverpay(t *testing.T) {
expectedCarolBandwidth, n.carolChannelLink.Bandwidth())
}
}
// chanRestoreFunc is a method signature for functions that can reload both
// endpoints of a link from their persistent storage engines.
type chanRestoreFunc func() (*lnwallet.LightningChannel, *lnwallet.LightningChannel, error)
// persistentLinkHarness is used to control the lifecylce of a link and the
// switch that operates it. It supports the ability to restart either the link
// or both the link and the switch.
type persistentLinkHarness struct {
t *testing.T
link ChannelLink
coreLink *channelLink
channel *lnwallet.LightningChannel
batchTicker chan time.Time
msgs chan lnwire.Message
restoreChan chanRestoreFunc
}
// newPersistentLinkHarness initializes a new persistentLinkHarness and derives
// the supporting references from the active link.
func newPersistentLinkHarness(t *testing.T, link ChannelLink,
batchTicker chan time.Time,
restore chanRestoreFunc) *persistentLinkHarness {
coreLink := link.(*channelLink)
return &persistentLinkHarness{
t: t,
link: link,
coreLink: coreLink,
channel: coreLink.channel,
batchTicker: batchTicker,
msgs: coreLink.cfg.Peer.(*mockPeer).sentMsgs,
restoreChan: restore,
}
}
// restart facilitates a shutdown and restart of the link maintained by the
// harness. The primary purpose of this method is to ensure the consistency of
// the supporting references is maintained across restarts.
//
// If `restartSwitch` is set, the entire switch will also be restarted,
// and will be reinitialized with the contents of the channeldb backing Alice's
// channel.
//
// Any number of hodl flags can be passed as additional arguments to this
// method. If none are provided, the mask will be extracted as hodl.MaskNone.
func (h *persistentLinkHarness) restart(restartSwitch bool,
hodlFlags ...hodl.Flag) func() {
// First, remove the link from the switch.
h.coreLink.cfg.Switch.RemoveLink(h.link.ChanID())
var htlcSwitch *Switch
if restartSwitch {
// If a switch restart is requested, we will stop it and
// leave htlcSwitch nil, which will trigger the creation
// of a fresh instance in restartLink.
h.coreLink.cfg.Switch.Stop()
} else {
// Otherwise, we capture the switch's reference so that
// it can be carried over to the restarted link.
htlcSwitch = h.coreLink.cfg.Switch
}
// Since our in-memory state may have diverged from our persistent
// state, we will restore the persisted state to ensure we always start
// the link in a consistent state.
var err error
h.channel, _, err = h.restoreChan()
if err != nil {
h.t.Fatalf("unable to restore channels: %v", err)
}
// Now, restart the link using the channel state. This will take care of
// adding the link to an existing switch, or creating a new one using
// the database owned by the link.
var cleanUp func()
h.link, h.batchTicker, cleanUp, err = restartLink(
h.channel, htlcSwitch, hodlFlags,
)
if err != nil {
h.t.Fatalf("unable to restart alicelink: %v", err)
}
// Repopulate the remaining fields in the harness.
h.coreLink = h.link.(*channelLink)
h.msgs = h.coreLink.cfg.Peer.(*mockPeer).sentMsgs
return cleanUp
}
// checkSent reads the links message stream and verify that the messages are
// dequeued in the same order as provided by `pkts`.
func (h *persistentLinkHarness) checkSent(pkts []*htlcPacket) {
for _, pkt := range pkts {
var msg lnwire.Message
select {
case msg = <-h.msgs:
case <-time.After(15 * time.Second):
h.t.Fatalf("did not receive message")
}
if !reflect.DeepEqual(msg, pkt.htlc) {
h.t.Fatalf("unexpected packet, want %v, got %v",
pkt.htlc, msg)
}
}
}
// commitCircuits accepts a list of circuits and tries to commit them to the
// switch's circuit map. The forwarding actions are returned if there was no
// failure.
func (h *persistentLinkHarness) commitCircuits(circuits []*PaymentCircuit) *CircuitFwdActions {
fwdActions, err := h.coreLink.cfg.Switch.commitCircuits(circuits...)
if err != nil {
h.t.Fatalf("unable to commit circuit: %v", err)
}
return fwdActions
}
func (h *persistentLinkHarness) assertNumPendingNumOpenCircuits(
wantPending, wantOpen int) {
_, _, line, _ := runtime.Caller(1)
numPending := h.coreLink.cfg.Switch.circuits.NumPending()
if numPending != wantPending {
h.t.Fatalf("line: %d: wrong number of pending circuits: "+
"want %d, got %d", line, wantPending, numPending)
}
numOpen := h.coreLink.cfg.Switch.circuits.NumOpen()
if numOpen != wantOpen {
h.t.Fatalf("line: %d: wrong number of open circuits: "+
"want %d, got %d", line, wantOpen, numOpen)
}
}
// trySignNextCommitment signals the batch ticker so that the link will try to
// update its commitment transaction.
func (h *persistentLinkHarness) trySignNextCommitment() {
select {
case h.batchTicker <- time.Now():
// Give the link enough time to process the request.
time.Sleep(time.Millisecond * 500)
case <-time.After(15 * time.Second):
h.t.Fatalf("did not initiate state transition")
}
}
// restartLink creates a new channel link from the given channel state, and adds
// to an htlcswitch. If none is provided by the caller, a new one will be
// created using Alice's database.
func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch,
hodlFlags []hodl.Flag) (ChannelLink, chan time.Time, func(), error) {
var (
globalEpoch = &chainntnfs.BlockEpochEvent{
Epochs: make(chan *chainntnfs.BlockEpoch),
Cancel: func() {
},
}
invoiceRegistry = newMockRegistry()
decoder = newMockIteratorDecoder()
obfuscator = NewMockObfuscator()
alicePeer = &mockPeer{
sentMsgs: make(chan lnwire.Message, 2000),
quit: make(chan struct{}),
}
globalPolicy = ForwardingPolicy{
MinHTLC: lnwire.NewMSatFromSatoshis(5),
BaseFee: lnwire.NewMSatFromSatoshis(1),
TimeLockDelta: 6,
}
pCache = &mockPreimageCache{
// hash -> preimage
preimageMap: make(map[[32]byte][]byte),
}
)
aliceDb := aliceChannel.State().Db
if aliceSwitch == nil {
var err error
aliceSwitch, err = New(Config{DB: aliceDb})
if err != nil {
return nil, nil, nil, err
}
}
t := make(chan time.Time)
ticker := &mockTicker{t}
aliceCfg := ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
Peer: alicePeer,
Switch: aliceSwitch,
Circuits: aliceSwitch.CircuitModifier(),
ForwardPackets: aliceSwitch.ForwardPackets,
DecodeHopIterators: decoder.DecodeHopIterators,
ExtractErrorEncrypter: func(*btcec.PublicKey) (
ErrorEncrypter, lnwire.FailCode) {
return obfuscator, lnwire.CodeNone
},
FetchLastChannelUpdate: mockGetChanUpdateMessage,
PreimageCache: pCache,
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
Registry: invoiceRegistry,
ChainEvents: &contractcourt.ChainEventSubscription{},
BlockEpochs: globalEpoch,
BatchTicker: ticker,
FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)),
// Make the BatchSize large enough to not
// trigger commit update automatically during tests.
BatchSize: 10000,
// Set any hodl flags requested for the new link.
HodlMask: hodl.MaskFromFlags(hodlFlags...),
DebugHTLC: len(hodlFlags) > 0,
}
const startingHeight = 100
aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight)
if err := aliceSwitch.AddLink(aliceLink); err != nil {
return nil, nil, nil, err
}
go func() {
for {
select {
case <-aliceLink.(*channelLink).htlcUpdates:
case <-aliceLink.(*channelLink).quit:
return
}
}
}()
cleanUp := func() {
close(alicePeer.quit)
defer aliceLink.Stop()
}
return aliceLink, t, cleanUp, nil
}