Merge pull request #485 from halseth/fix-negative-balance

Use remoteACKed index when calculating availableBalance.
This commit is contained in:
Olaoluwa Osuntokun 2018-02-05 16:23:17 -08:00 committed by GitHub
commit 18741831dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 668 additions and 64 deletions

@ -83,6 +83,34 @@ func ExpectedFee(f ForwardingPolicy, htlcAmt lnwire.MilliSatoshi) lnwire.MilliSa
return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
// Ticker is an interface used to wrap a time.Ticker in a struct,
// making mocking it easier.
type Ticker interface {
Start() <-chan time.Time
Stop()
}
// BatchTicker implements the Ticker interface, and wraps a time.Ticker.
type BatchTicker struct {
ticker *time.Ticker
}
// NewBatchTicker returns a new BatchTicker that wraps the passed
// time.Ticker.
func NewBatchTicker(t *time.Ticker) *BatchTicker {
return &BatchTicker{t}
}
// Start returns the tick channel for the underlying time.Ticker.
func (t *BatchTicker) Start() <-chan time.Time {
return t.ticker.C
}
// Stop stops the underlying time.Ticker.
func (t *BatchTicker) Stop() {
t.ticker.Stop()
}
// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties.
@ -167,6 +195,17 @@ type ChannelLinkConfig struct {
// reestablishment message to the remote peer. It should be done if our
// clients have been restarted, or remote peer have been reconnected.
SyncStates bool
// BatchTicker is the ticker that determines the interval that we'll
// use to check the batch to see if there're any updates we should
// flush out. By batching updates into a single commit, we attempt
// to increase throughput by maximizing the number of updates
// coalesced into a single commit.
BatchTicker Ticker
// BatchSize is the max size of a batch of updates done to the link
// before we do a state update.
BatchSize uint32
}
// channelLink is the service which drives a channel's commitment update
@ -594,8 +633,8 @@ func (l *channelLink) htlcManager() {
}
}
batchTimer := time.NewTicker(50 * time.Millisecond)
defer batchTimer.Stop()
batchTick := l.cfg.BatchTicker.Start()
defer l.cfg.BatchTicker.Stop()
// TODO(roasbeef): fail chan in case of protocol violation
out:
@ -673,7 +712,7 @@ out:
break out
}
case <-batchTimer.C:
case <-batchTick:
// If the current batch is empty, then we have no work
// here.
if l.batchCounter == 0 {
@ -905,7 +944,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
// If this newly added update exceeds the min batch size for adds, or
// this is a settle request, then initiate an update.
if l.batchCounter >= 10 || isSettle {
if l.batchCounter >= l.cfg.BatchSize || isSettle {
if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err)
return

@ -1389,13 +1389,16 @@ func TestChannelLinkSingleHopMessageOrdering(t *testing.T) {
type mockPeer struct {
sync.Mutex
sentMsgs []lnwire.Message
sentMsgs chan lnwire.Message
quit chan struct{}
}
func (m *mockPeer) SendMessage(msg lnwire.Message) error {
m.Lock()
m.sentMsgs = append(m.sentMsgs, msg)
m.Unlock()
select {
case m.sentMsgs <- msg:
case <-m.quit:
return fmt.Errorf("mockPeer shutting down")
}
return nil
}
func (m *mockPeer) WipeChannel(*wire.OutPoint) error {
@ -1406,19 +1409,11 @@ func (m *mockPeer) PubKey() [33]byte {
}
func (m *mockPeer) Disconnect(reason error) {
}
func (m *mockPeer) popSentMsg() lnwire.Message {
m.Lock()
msg := m.sentMsgs[0]
m.sentMsgs[0] = nil
m.sentMsgs = m.sentMsgs[1:]
m.Unlock()
return msg
}
var _ Peer = (*mockPeer)(nil)
func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), error) {
func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink,
*lnwallet.LightningChannel, chan time.Time, func(), error) {
globalEpoch := &chainntnfs.BlockEpochEvent{
Epochs: make(chan *chainntnfs.BlockEpoch),
Cancel: func() {
@ -1426,18 +1421,21 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro
}
chanID := lnwire.NewShortChanIDFromInt(4)
aliceChannel, _, fCleanUp, _, err := createTestChannel(
aliceChannel, bobChannel, fCleanUp, _, err := createTestChannel(
alicePrivKey, bobPrivKey, chanAmt, chanAmt, chanID,
)
if err != nil {
return nil, nil, err
return nil, nil, nil, nil, err
}
var (
invoiveRegistry = newMockRegistry()
decoder = &mockIteratorDecoder{}
obfuscator = newMockObfuscator()
alicePeer mockPeer
alicePeer = &mockPeer{
sentMsgs: make(chan lnwire.Message, 2000),
quit: make(chan struct{}),
}
globalPolicy = ForwardingPolicy{
MinHTLC: lnwire.NewMSatFromSatoshis(5),
@ -1451,9 +1449,11 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro
preimageMap: make(map[[32]byte][]byte),
}
t := make(chan time.Time)
ticker := &mockTicker{t}
aliceCfg := ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
Peer: &alicePeer,
Peer: alicePeer,
Switch: New(Config{}),
DecodeHopIterator: decoder.DecodeHopIterator,
DecodeOnionObfuscator: func(io.Reader) (ErrorEncrypter, lnwire.FailCode) {
@ -1467,20 +1467,35 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro
Registry: invoiveRegistry,
ChainEvents: &contractcourt.ChainEventSubscription{},
BlockEpochs: globalEpoch,
BatchTicker: ticker,
// Make the BatchSize large enough to not
// trigger commit update automatically during tests.
BatchSize: 10000,
}
const startingHeight = 100
aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight)
if err := aliceLink.Start(); err != nil {
return nil, nil, err
return nil, nil, nil, nil, err
}
go func() {
for {
select {
case <-aliceLink.(*channelLink).htlcUpdates:
case <-aliceLink.(*channelLink).quit:
return
}
}
}()
cleanUp := func() {
close(alicePeer.quit)
defer fCleanUp()
defer aliceLink.Stop()
defer bobChannel.Stop()
}
return aliceLink, cleanUp, nil
return aliceLink, bobChannel, t, cleanUp, nil
}
func assertLinkBandwidth(t *testing.T, link ChannelLink,
@ -1494,6 +1509,148 @@ func assertLinkBandwidth(t *testing.T, link ChannelLink,
}
}
// handleStateUpdate handles the messages sent from the link after
// the batch ticker has triggered a state update.
func handleStateUpdate(link *channelLink,
remoteChannel *lnwallet.LightningChannel) error {
sentMsgs := link.cfg.Peer.(*mockPeer).sentMsgs
var msg lnwire.Message
select {
case msg = <-sentMsgs:
case <-time.After(20 * time.Second):
return fmt.Errorf("did not receive CommitSig from Alice")
}
// The link should be sending a commit sig at this point.
commitSig, ok := msg.(*lnwire.CommitSig)
if !ok {
return fmt.Errorf("expected CommitSig, got %T", msg)
}
// Let the remote channel receive the commit sig, and
// respond with a revocation + commitsig.
err := remoteChannel.ReceiveNewCommitment(
commitSig.CommitSig, commitSig.HtlcSigs)
if err != nil {
return err
}
remoteRev, _, err := remoteChannel.RevokeCurrentCommitment()
if err != nil {
return err
}
link.HandleChannelUpdate(remoteRev)
remoteSig, remoteHtlcSigs, err := remoteChannel.SignNextCommitment()
if err != nil {
return err
}
commitSig = &lnwire.CommitSig{
CommitSig: remoteSig,
HtlcSigs: remoteHtlcSigs,
}
link.HandleChannelUpdate(commitSig)
// This should make the link respond with a revocation.
select {
case msg = <-sentMsgs:
case <-time.After(20 * time.Second):
return fmt.Errorf("did not receive RevokeAndAck from Alice")
}
revoke, ok := msg.(*lnwire.RevokeAndAck)
if !ok {
return fmt.Errorf("expected RevokeAndAck got %T", msg)
}
_, err = remoteChannel.ReceiveRevocation(revoke)
if err != nil {
return fmt.Errorf("unable to recieve "+
"revocation: %v", err)
}
return nil
}
// updateState is used exchange the messages necessary to do a full state
// transition. If initiateUpdate=true, then this call will make the link
// trigger an update by sending on the batchTick channel, if not, it will
// make the remoteChannel initiate the state update.
func updateState(batchTick chan time.Time, link *channelLink,
remoteChannel *lnwallet.LightningChannel,
initiateUpdate bool) error {
sentMsgs := link.cfg.Peer.(*mockPeer).sentMsgs
if initiateUpdate {
// Trigger update by ticking the batchTicker.
select {
case batchTick <- time.Now():
case <-link.quit:
return fmt.Errorf("link shuttin down")
}
return handleStateUpdate(link, remoteChannel)
}
// The remote is triggering the state update, emulate this by
// signing and sending CommitSig to the link.
remoteSig, remoteHtlcSigs, err := remoteChannel.SignNextCommitment()
if err != nil {
return err
}
commitSig := &lnwire.CommitSig{
CommitSig: remoteSig,
HtlcSigs: remoteHtlcSigs,
}
link.HandleChannelUpdate(commitSig)
// The link should respond with a revocation + commit sig.
var msg lnwire.Message
select {
case msg = <-sentMsgs:
case <-time.After(20 * time.Second):
return fmt.Errorf("did not receive RevokeAndAck from Alice")
}
revoke, ok := msg.(*lnwire.RevokeAndAck)
if !ok {
return fmt.Errorf("expected RevokeAndAck got %T",
msg)
}
_, err = remoteChannel.ReceiveRevocation(revoke)
if err != nil {
return fmt.Errorf("unable to recieve "+
"revocation: %v", err)
}
select {
case msg = <-sentMsgs:
case <-time.After(20 * time.Second):
return fmt.Errorf("did not receive CommitSig from Alice")
}
commitSig, ok = msg.(*lnwire.CommitSig)
if !ok {
return fmt.Errorf("expected CommitSig, got %T", msg)
}
err = remoteChannel.ReceiveNewCommitment(
commitSig.CommitSig, commitSig.HtlcSigs)
if err != nil {
return err
}
// Lastly, send a revocation back to the link.
remoteRev, _, err := remoteChannel.RevokeCurrentCommitment()
if err != nil {
return err
}
link.HandleChannelUpdate(remoteRev)
// Sleep to make sure Alice has handled the remote revocation.
time.Sleep(500 * time.Millisecond)
return nil
}
// TestChannelLinkBandwidthConsistency ensures that the reported bandwidth of a
// given ChannelLink is properly updated in response to downstream messages
// from the switch, and upstream messages from its channel peer.
@ -1509,7 +1666,7 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
// We'll start the test by creating a single instance of
const chanAmt = btcutil.SatoshiPerBitcoin * 5
aliceLink, cleanUp, err := newSingleLinkTestHarness(chanAmt)
link, bobChannel, tmr, cleanUp, err := newSingleLinkTestHarness(chanAmt)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
@ -1517,11 +1674,18 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
var (
mockBlob [lnwire.OnionPacketSize]byte
coreChan = aliceLink.(*channelLink).channel
defaultCommitFee = coreChan.StateSnapshot().CommitFee
aliceLink = link.(*channelLink)
aliceChannel = aliceLink.channel
defaultCommitFee = aliceChannel.StateSnapshot().CommitFee
aliceStartingBandwidth = aliceLink.Bandwidth()
aliceMsgs = aliceLink.cfg.Peer.(*mockPeer).sentMsgs
)
// We put Alice into HodlHTLC mode, such that she won't settle
// incoming HTLCs automatically.
aliceLink.cfg.HodlHTLC = true
aliceLink.cfg.DebugHTLC = true
estimator := &lnwallet.StaticFeeEstimator{
FeeRate: 24,
}
@ -1552,17 +1716,61 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
}
aliceLink.HandleSwitchPacket(&addPkt)
time.Sleep(time.Millisecond * 500)
// The resulting bandwidth should reflect that Alice is paying the
// htlc amount in addition to the htlc fee.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee)
// Alice should send the HTLC to Bob.
var msg lnwire.Message
select {
case msg = <-aliceMsgs:
case <-time.After(2 * time.Second):
t.Fatalf("did not receive message")
}
addHtlc, ok := msg.(*lnwire.UpdateAddHTLC)
if !ok {
t.Fatalf("expected UpdateAddHTLC, got %T", msg)
}
bobIndex, err := bobChannel.ReceiveHTLC(addHtlc)
if err != nil {
t.Fatalf("bob failed receiving htlc: %v", err)
}
// Lock in the HTLC.
if err := updateState(tmr, aliceLink, bobChannel, true); err != nil {
t.Fatalf("unable to update state: %v", err)
}
// Locking in the HTLC should not change Alice's bandwidth.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee)
// If we now send in a valid HTLC settle for the prior HTLC we added,
// then the bandwidth should remain unchanged as the remote party will
// gain additional channel balance.
err = bobChannel.SettleHTLC(invoice.Terms.PaymentPreimage, bobIndex)
if err != nil {
t.Fatalf("unable to settle htlc: %v", err)
}
htlcSettle := &lnwire.UpdateFufillHTLC{
ID: 0,
ID: bobIndex,
PaymentPreimage: invoice.Terms.PaymentPreimage,
}
aliceLink.HandleChannelUpdate(htlcSettle)
time.Sleep(time.Millisecond * 500)
// Since the settle is not locked in yet, Alice's bandwidth should still
// reflect that she has to pay the fee.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee)
// Lock in the settle.
if err := updateState(tmr, aliceLink, bobChannel, false); err != nil {
t.Fatalf("unable to update state: %v", err)
}
// Now that it is settled, Alice should have gotten the htlc fee back.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt)
// Next, we'll add another HTLC initiated by the switch (of the same
@ -1576,31 +1784,96 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
}
aliceLink.HandleSwitchPacket(&addPkt)
time.Sleep(time.Millisecond * 500)
// Again, Alice's bandwidth decreases by htlcAmt+htlcFee.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-2*htlcAmt-htlcFee)
// Alice will send the HTLC to Bob.
select {
case msg = <-aliceMsgs:
case <-time.After(2 * time.Second):
t.Fatalf("did not receive message")
}
addHtlc, ok = msg.(*lnwire.UpdateAddHTLC)
if !ok {
t.Fatalf("expected UpdateAddHTLC, got %T", msg)
}
bobIndex, err = bobChannel.ReceiveHTLC(addHtlc)
if err != nil {
t.Fatalf("bob failed receiving htlc: %v", err)
}
// Lock in the HTLC, which should not affect the bandwidth.
if err := updateState(tmr, aliceLink, bobChannel, true); err != nil {
t.Fatalf("unable to update state: %v", err)
}
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt*2-htlcFee)
// With that processed, we'll now generate an HTLC fail (sent by the
// remote peer) to cancel the HTLC we just added. This should return us
// back to the bandwidth of the link right before the HTLC was sent.
err = bobChannel.FailHTLC(bobIndex, []byte("nop"))
if err != nil {
t.Fatalf("unable to fail htlc: %v", err)
}
failMsg := &lnwire.UpdateFailHTLC{
ID: 1, // As this is the second HTLC.
ID: bobIndex,
Reason: lnwire.OpaqueReason([]byte("nop")),
}
aliceLink.HandleChannelUpdate(failMsg)
time.Sleep(time.Millisecond * 500)
// Before the Fail gets locked in, the bandwidth should remain unchanged.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt*2-htlcFee)
// Lock in the Fail.
if err := updateState(tmr, aliceLink, bobChannel, false); err != nil {
t.Fatalf("unable to update state: %v", err)
}
// Now the bancdwidth should reflect the failed HTLC.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt)
// Moving along, we'll now receive a new HTLC from the remote peer,
// with an ID of 0 as this is their first HTLC. The bandwidth should
// remain unchanged (but Alice will need to pay the fee for the extra
// HTLC).
updateMsg := &lnwire.UpdateAddHTLC{
ID: 0,
Amount: htlcAmt,
Expiry: 9,
PaymentHash: htlc.PaymentHash, // Re-using the same payment hash.
htlcAmt, totalTimelock, hops := generateHops(htlcAmt, testStartingHeight,
aliceLink)
blob, err := generateRoute(hops...)
if err != nil {
t.Fatalf("unable to gen route: %v", err)
}
aliceLink.HandleChannelUpdate(updateMsg)
time.Sleep(time.Millisecond * 500)
invoice, htlc, err = generatePayment(htlcAmt, htlcAmt,
totalTimelock, blob)
if err != nil {
t.Fatalf("unable to create payment: %v", err)
}
// We must add the invoice to the registry, such that Alice expects
// this payment.
err = aliceLink.cfg.Registry.(*mockInvoiceRegistry).AddInvoice(*invoice)
if err != nil {
t.Fatalf("unable to add invoice to registry: %v", err)
}
bobIndex, err = bobChannel.AddHTLC(htlc)
if err != nil {
t.Fatalf("unable to add htlc: %v", err)
}
aliceLink.HandleChannelUpdate(htlc)
// Alice's balance remains unchanged until this HTLC is locked in.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt)
// Lock in the HTLC.
if err := updateState(tmr, aliceLink, bobChannel, false); err != nil {
t.Fatalf("unable to update state: %v", err)
}
// Since Bob is adding this HTLC, Alice only needs to pay the fee.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee)
// Next, we'll settle the HTLC with our knowledge of the pre-image that
@ -1608,32 +1881,112 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
// of the channel should now be re-balanced to the starting point.
settlePkt := htlcPacket{
htlc: &lnwire.UpdateFufillHTLC{
ID: 2,
ID: bobIndex,
PaymentPreimage: invoice.Terms.PaymentPreimage,
},
}
aliceLink.HandleSwitchPacket(&settlePkt)
time.Sleep(time.Millisecond * 500)
// Settling this HTLC gives Alice all her original bandwidth back.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth)
// Finally, we'll test the scenario of failing an HTLC received by the
// remote node. This should result in no perceived bandwidth changes.
htlcAdd := &lnwire.UpdateAddHTLC{
ID: 1,
Amount: htlcAmt,
Expiry: 9,
PaymentHash: htlc.PaymentHash,
// Alice wil send the Settle to Bob.
select {
case msg = <-aliceMsgs:
case <-time.After(2 * time.Second):
t.Fatalf("did not receive message")
}
aliceLink.HandleChannelUpdate(htlcAdd)
settleHtlc, ok := msg.(*lnwire.UpdateFufillHTLC)
if !ok {
t.Fatalf("expected UpdateFufillHTLC, got %T", msg)
}
pre := settleHtlc.PaymentPreimage
idx := settleHtlc.ID
err = bobChannel.ReceiveHTLCSettle(pre, idx)
if err != nil {
t.Fatalf("unable to receive settle: %v", err)
}
// After a settle the link should do a state transition automatically,
// so we don't have to trigger it.
if err := handleStateUpdate(aliceLink, bobChannel); err != nil {
t.Fatalf("unable to update state: %v", err)
}
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth)
// Finally, we'll test the scenario of failing an HTLC received from the
// remote node. This should result in no perceived bandwidth changes.
htlcAmt, totalTimelock, hops = generateHops(htlcAmt, testStartingHeight,
aliceLink)
blob, err = generateRoute(hops...)
if err != nil {
t.Fatalf("unable to gen route: %v", err)
}
invoice, htlc, err = generatePayment(htlcAmt, htlcAmt, totalTimelock, blob)
if err != nil {
t.Fatalf("unable to create payment: %v", err)
}
if err := aliceLink.cfg.Registry.(*mockInvoiceRegistry).AddInvoice(*invoice); err != nil {
t.Fatalf("unable to add invoice to registry: %v", err)
}
// Since we are not using the link to handle HTLC IDs for the
// remote channel, we must set this manually. This is the second
// HTLC we add, hence it should have an ID of 1 (Alice's channel
// link will set this automatically for her side).
htlc.ID = 1
bobIndex, err = bobChannel.AddHTLC(htlc)
if err != nil {
t.Fatalf("unable to add htlc: %v", err)
}
aliceLink.HandleChannelUpdate(htlc)
time.Sleep(time.Millisecond * 500)
// No changes before the HTLC is locked in.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth)
if err := updateState(tmr, aliceLink, bobChannel, false); err != nil {
t.Fatalf("unable to update state: %v", err)
}
// After lock-in, Alice will have to pay the htlc fee.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcFee)
// Now fail this HTLC.
failPkt := htlcPacket{
incomingHTLCID: bobIndex,
htlc: &lnwire.UpdateFailHTLC{
ID: 3,
ID: bobIndex,
},
}
aliceLink.HandleSwitchPacket(&failPkt)
time.Sleep(time.Millisecond * 500)
// Alice should get all her bandwidth back.
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth)
// Message should be sent to Bob.
select {
case msg = <-aliceMsgs:
case <-time.After(2 * time.Second):
t.Fatalf("did not receive message")
}
failMsg, ok = msg.(*lnwire.UpdateFailHTLC)
if !ok {
t.Fatalf("expected UpdateFailHTLC, got %T", msg)
}
err = bobChannel.ReceiveFailHTLC(failMsg.ID, []byte("fail"))
if err != nil {
t.Fatalf("failed receiving fail htlc: %v", err)
}
// After failing an HTLC, the link will automatically trigger
// a state update.
if err := handleStateUpdate(aliceLink, bobChannel); err != nil {
t.Fatalf("unable to update state: %v", err)
}
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth)
}
@ -1646,7 +1999,7 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
var mockBlob [lnwire.OnionPacketSize]byte
const chanAmt = btcutil.SatoshiPerBitcoin * 5
aliceLink, cleanUp, err := newSingleLinkTestHarness(chanAmt)
aliceLink, bobChannel, batchTick, cleanUp, err := newSingleLinkTestHarness(chanAmt)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
@ -1656,6 +2009,7 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
coreLink = aliceLink.(*channelLink)
defaultCommitFee = coreLink.channel.StateSnapshot().CommitFee
aliceStartingBandwidth = aliceLink.Bandwidth()
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
)
estimator := &lnwallet.StaticFeeEstimator{
@ -1667,6 +2021,11 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
}
feePerKw := feePerWeight * 1000
// The starting bandwidth of the channel should be exactly the amount
// that we created the channel between her and Bob.
expectedBandwidth := lnwire.NewMSatFromSatoshis(chanAmt - defaultCommitFee)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
addLinkHTLC := func(amt lnwire.MilliSatoshi) [32]byte {
invoice, htlc, err := generatePayment(amt, amt, 5, mockBlob)
if err != nil {
@ -1676,7 +2035,6 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
htlc: htlc,
amount: amt,
})
return invoice.Terms.PaymentPreimage
}
@ -1694,13 +2052,39 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
totalHtlcAmt += htlcAmt
}
// The HTLCs should all be sent to the remote.
var msg lnwire.Message
for i := 0; i < numHTLCs; i++ {
select {
case msg = <-aliceMsgs:
case <-time.After(2 * time.Second):
t.Fatalf("did not receive message")
}
addHtlc, ok := msg.(*lnwire.UpdateAddHTLC)
if !ok {
t.Fatalf("expected UpdateAddHTLC, got %T", msg)
}
_, err := bobChannel.ReceiveHTLC(addHtlc)
if err != nil {
t.Fatalf("bob failed receiving htlc: %v", err)
}
}
select {
case msg = <-aliceMsgs:
t.Fatalf("unexpected message: %T", msg)
case <-time.After(20 * time.Millisecond):
}
// TODO(roasbeef): increase sleep
time.Sleep(time.Second * 1)
commitWeight := lnwallet.CommitWeight + lnwallet.HtlcWeight*numHTLCs
htlcFee := lnwire.NewMSatFromSatoshis(
btcutil.Amount((int64(feePerKw) * commitWeight) / 1000),
)
expectedBandwidth := aliceStartingBandwidth - totalHtlcAmt - htlcFee
expectedBandwidth = aliceStartingBandwidth - totalHtlcAmt - htlcFee
expectedBandwidth += lnwire.NewMSatFromSatoshis(defaultCommitFee)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
@ -1722,23 +2106,43 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
totalHtlcAmt += htlcAmt
}
// No messages should be sent to the remote at this point.
select {
case msg = <-aliceMsgs:
t.Fatalf("unexpected message: %T", msg)
case <-time.After(20 * time.Millisecond):
}
time.Sleep(time.Second * 2)
expectedBandwidth -= (numOverFlowHTLCs * htlcAmt)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// With the extra HTLC's added, the overflow queue should now be
// populated with our 10 additional HTLC's.
// populated with our 20 additional HTLC's.
if coreLink.overflowQueue.Length() != numOverFlowHTLCs {
t.Fatalf("wrong overflow queue length: expected %v, got %v",
numOverFlowHTLCs,
coreLink.overflowQueue.Length())
}
// At this point, we'll now settle one of the HTLC's that were added.
// The resulting bandwidth change should be non-existent as this will
// simply transfer over funds to the remote party. However, the size of
// the overflow queue should be decreasing
// We trigger a state update to lock in the HTLCs. This should
// not change Alice's bandwidth.
if err := updateState(batchTick, coreLink, bobChannel, true); err != nil {
t.Fatalf("unable to update state: %v", err)
}
time.Sleep(time.Millisecond * 500)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// At this point, we'll now settle enough HTLCs to empty the overflow
// queue. The resulting bandwidth change should be non-existent as this
// will simply transfer over funds to the remote party. However, the
// size of the overflow queue should be decreasing
for i := 0; i < numOverFlowHTLCs; i++ {
err = bobChannel.SettleHTLC(preImages[i], uint64(i))
if err != nil {
t.Fatalf("unable to settle htlc: %v", err)
}
htlcSettle := &lnwire.UpdateFufillHTLC{
ID: uint64(i),
PaymentPreimage: preImages[i],
@ -1746,19 +2150,48 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
aliceLink.HandleChannelUpdate(htlcSettle)
time.Sleep(time.Millisecond * 50)
}
time.Sleep(time.Millisecond * 500)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// As we're not actually initiating a full state update, we'll
// trigger a free-slot signal manually here.
coreLink.overflowQueue.SignalFreeSlot()
// We trigger a state update to lock in the Settles.
if err := updateState(batchTick, coreLink, bobChannel, false); err != nil {
t.Fatalf("unable to update state: %v", err)
}
// After the state update is done, Alice should start sending
// HTLCs from the overflow queue.
for i := 0; i < numOverFlowHTLCs; i++ {
var msg lnwire.Message
select {
case msg = <-aliceMsgs:
case <-time.After(2 * time.Second):
t.Fatalf("did not receive message")
}
addHtlc, ok := msg.(*lnwire.UpdateAddHTLC)
if !ok {
t.Fatalf("expected UpdateAddHTLC, got %T", msg)
}
_, err := bobChannel.ReceiveHTLC(addHtlc)
if err != nil {
t.Fatalf("bob failed receiving htlc: %v", err)
}
}
select {
case msg = <-aliceMsgs:
t.Fatalf("unexpected message: %T", msg)
case <-time.After(20 * time.Millisecond):
}
time.Sleep(time.Millisecond * 500)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// Finally, at this point, the queue itself should be fully empty. As
// enough slots have been drained from the commitment transaction to
// allocate the queue items to.
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 500)
if coreLink.overflowQueue.Length() != 0 {
t.Fatalf("wrong overflow queue length: expected %v, got %v", 0,
coreLink.overflowQueue.Length())

@ -6,6 +6,7 @@ import (
"fmt"
"sync"
"testing"
"time"
"io"
"sync/atomic"
@ -618,3 +619,14 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.S
Spend: make(chan *chainntnfs.SpendDetail),
}, nil
}
type mockTicker struct {
ticker <-chan time.Time
}
func (m *mockTicker) Start() <-chan time.Time {
return m.ticker
}
func (m *mockTicker) Stop() {
}

@ -437,17 +437,21 @@ type threeHopNetwork struct {
aliceServer *mockServer
aliceChannelLink *channelLink
aliceBlockEpoch chan *chainntnfs.BlockEpoch
aliceTicker *time.Ticker
firstBobChannelLink *channelLink
bobFirstBlockEpoch chan *chainntnfs.BlockEpoch
firstBobTicker *time.Ticker
bobServer *mockServer
secondBobChannelLink *channelLink
bobSecondBlockEpoch chan *chainntnfs.BlockEpoch
secondBobTicker *time.Ticker
carolChannelLink *channelLink
carolServer *mockServer
carolBlockEpoch chan *chainntnfs.BlockEpoch
carolTicker *time.Ticker
feeEstimator *mockFeeEstimator
@ -625,6 +629,11 @@ func (n *threeHopNetwork) stop() {
done <- struct{}{}
}()
n.aliceTicker.Stop()
n.firstBobTicker.Stop()
n.secondBobTicker.Stop()
n.carolTicker.Stop()
for i := 0; i < 3; i++ {
<-done
}
@ -743,6 +752,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
Cancel: func() {
},
}
aliceTicker := time.NewTicker(50 * time.Millisecond)
aliceChannelLink := NewChannelLink(
ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
@ -763,6 +773,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
},
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchTicker: &mockTicker{aliceTicker.C},
BatchSize: 10,
},
aliceChannel,
startingHeight,
@ -772,7 +784,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
}
go func() {
for {
<-aliceChannelLink.(*channelLink).htlcUpdates
select {
case <-aliceChannelLink.(*channelLink).htlcUpdates:
case <-aliceChannelLink.(*channelLink).quit:
return
}
}
}()
@ -782,6 +798,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
Cancel: func() {
},
}
firstBobTicker := time.NewTicker(50 * time.Millisecond)
firstBobChannelLink := NewChannelLink(
ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
@ -802,6 +819,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
},
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchTicker: &mockTicker{firstBobTicker.C},
BatchSize: 10,
},
firstBobChannel,
startingHeight,
@ -811,7 +830,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
}
go func() {
for {
<-firstBobChannelLink.(*channelLink).htlcUpdates
select {
case <-firstBobChannelLink.(*channelLink).htlcUpdates:
case <-firstBobChannelLink.(*channelLink).quit:
return
}
}
}()
@ -821,6 +844,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
Cancel: func() {
},
}
secondBobTicker := time.NewTicker(50 * time.Millisecond)
secondBobChannelLink := NewChannelLink(
ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
@ -841,6 +865,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
},
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchTicker: &mockTicker{secondBobTicker.C},
BatchSize: 10,
},
secondBobChannel,
startingHeight,
@ -850,7 +876,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
}
go func() {
for {
<-secondBobChannelLink.(*channelLink).htlcUpdates
select {
case <-secondBobChannelLink.(*channelLink).htlcUpdates:
case <-secondBobChannelLink.(*channelLink).quit:
return
}
}
}()
@ -860,6 +890,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
Cancel: func() {
},
}
carolTicker := time.NewTicker(50 * time.Millisecond)
carolChannelLink := NewChannelLink(
ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
@ -880,6 +911,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
},
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchTicker: &mockTicker{carolTicker.C},
BatchSize: 10,
},
carolChannel,
startingHeight,
@ -889,7 +922,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
}
go func() {
for {
<-carolChannelLink.(*channelLink).htlcUpdates
select {
case <-carolChannelLink.(*channelLink).htlcUpdates:
case <-carolChannelLink.(*channelLink).quit:
return
}
}
}()
@ -897,17 +934,21 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
aliceServer: aliceServer,
aliceChannelLink: aliceChannelLink.(*channelLink),
aliceBlockEpoch: aliceEpochChan,
aliceTicker: aliceTicker,
firstBobChannelLink: firstBobChannelLink.(*channelLink),
bobFirstBlockEpoch: bobFirstEpochChan,
firstBobTicker: firstBobTicker,
bobServer: bobServer,
secondBobChannelLink: secondBobChannelLink.(*channelLink),
bobSecondBlockEpoch: bobSecondEpochChan,
secondBobTicker: secondBobTicker,
carolChannelLink: carolChannelLink.(*channelLink),
carolServer: carolServer,
carolBlockEpoch: carolBlockEpoch,
carolTicker: carolTicker,
feeEstimator: feeEstimator,
globalPolicy: globalPolicy,

@ -4948,7 +4948,8 @@ func (lc *LightningChannel) availableBalance() (lnwire.MilliSatoshi, int64) {
// Next we'll grab the current set of log updates that are still active
// and haven't been garbage collected.
htlcView := lc.fetchHTLCView(lc.remoteUpdateLog.logIndex,
remoteACKedIndex := lc.localCommitChain.tip().theirMessageIndex
htlcView := lc.fetchHTLCView(remoteACKedIndex,
lc.localUpdateLog.logIndex)
feePerKw := lc.channelState.LocalCommitment.FeePerKw
dustLimit := lc.channelState.LocalChanCfg.DustLimit

@ -3914,6 +3914,13 @@ func TestChanAvailableBandwidth(t *testing.T) {
t.Fatalf("unable to recv htlc cancel: %v", err)
}
// We must do a state transition before the balance is available
// for Alice.
if err := forceStateTransition(aliceChannel, bobChannel); err != nil {
t.Fatalf("unable to complete alice's state "+
"transition: %v", err)
}
// With the HTLC's settled in the log, we'll now assert that if we
// initiate a state transition, then our guess was correct.
assertBandwidthEstimateCorrect(false)
@ -4293,4 +4300,67 @@ func TestChannelUnilateralCloseHtlcResolution(t *testing.T) {
}
}
// TestDesyncHTLCs checks that we cannot add HTLCs that would make the
// balance negative, when the remote and local update logs are desynced.
func TestDesyncHTLCs(t *testing.T) {
t.Parallel()
// We'll kick off the test by creating our channels which both are
// loaded with 5 BTC each.
aliceChannel, bobChannel, cleanUp, err := createTestChannels(1)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer cleanUp()
// First add one HTLC of value 4.1 BTC.
htlcAmt := lnwire.NewMSatFromSatoshis(4.1 * btcutil.SatoshiPerBitcoin)
htlc, _ := createHTLC(0, htlcAmt)
aliceIndex, err := aliceChannel.AddHTLC(htlc)
if err != nil {
t.Fatalf("unable to add htlc: %v", err)
}
bobIndex, err := bobChannel.ReceiveHTLC(htlc)
if err != nil {
t.Fatalf("unable to recv htlc: %v", err)
}
// Lock this HTLC in.
if err := forceStateTransition(aliceChannel, bobChannel); err != nil {
t.Fatalf("unable to complete state update: %v", err)
}
// Now let let Bob fail this HTLC.
if err := bobChannel.FailHTLC(bobIndex, []byte("failreason")); err != nil {
t.Fatalf("unable to cancel HTLC: %v", err)
}
if err := aliceChannel.ReceiveFailHTLC(aliceIndex, []byte("bad")); err != nil {
t.Fatalf("unable to recv htlc cancel: %v", err)
}
// Alice now has gotten all here original balance (5 BTC) back,
// however, adding a new HTLC at this point SHOULD fail, since
// if she add the HTLC and sign the next state, Bob cannot assume
// she received the FailHTLC, and must assume she doesn't have
// the necessary balance available.
//
// We try adding an HTLC of value 1 BTC, which should fail
// because the balance is unavailable.
htlcAmt = lnwire.NewMSatFromSatoshis(1 * btcutil.SatoshiPerBitcoin)
htlc, _ = createHTLC(1, htlcAmt)
if _, err = aliceChannel.AddHTLC(htlc); err != ErrInsufficientBalance {
t.Fatalf("expected ErrInsufficientBalance, instead received: %v",
err)
}
// Now do a state transition, which will ACK the FailHTLC, making
// Alice able to add the new HTLC.
if err := forceStateTransition(aliceChannel, bobChannel); err != nil {
t.Fatalf("unable to complete state update: %v", err)
}
if _, err = aliceChannel.AddHTLC(htlc); err != nil {
t.Fatalf("unable to add htlc: %v", err)
}
}
// TODO(roasbeef): testing.Quick test case for retrans!!!

@ -259,6 +259,8 @@ func (s *sigPool) SubmitSignBatch(signJobs []signJob) {
case s.signJobs <- job:
case <-job.cancel:
// TODO(roasbeef): return error?
case <-s.quit:
return
}
}
}

@ -397,6 +397,9 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
)
},
SyncStates: true,
BatchTicker: htlcswitch.NewBatchTicker(
time.NewTicker(50 * time.Millisecond)),
BatchSize: 10,
}
link := htlcswitch.NewChannelLink(linkCfg, lnChan,
uint32(currentHeight))
@ -1289,6 +1292,9 @@ out:
)
},
SyncStates: false,
BatchTicker: htlcswitch.NewBatchTicker(
time.NewTicker(50 * time.Millisecond)),
BatchSize: 10,
}
link := htlcswitch.NewChannelLink(linkConfig, newChan,
uint32(currentHeight))