lnwallet: restore unsigned acked remote updates

This commit updates the channel state machine to
persistently store remote updates that we have received a
signature for, but that we haven't yet included in a commit
signature of our own.

Previously those updates were only stored in memory and
dropped across restarts. This led to the production of
an invalid signature and channel force closure. The remote
party expects us to include those updates.
This commit is contained in:
Joost Jager 2020-01-03 15:53:51 +01:00
parent ed8fa35ed4
commit 82579400b3
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
5 changed files with 384 additions and 16 deletions

@ -66,6 +66,11 @@ var (
// party.
chanCommitmentKey = []byte("chan-commitment-key")
// unsignedAckedUpdatesKey is an entry in the channel bucket that
// contains the remote updates that we have acked, but not yet signed
// for in one of our remote commits.
unsignedAckedUpdatesKey = []byte("unsigned-acked-updates-key")
// revocationStateKey stores their current revocation hash, our
// preimage producer and their preimage store.
revocationStateKey = []byte("revocation-state-key")
@ -1242,9 +1247,14 @@ func syncNewChannel(tx *bbolt.Tx, c *OpenChannel, addrs []net.Addr) error {
// UpdateCommitment updates the local commitment state. It locks in the pending
// local updates that were received by us from the remote party. The commitment
// state completely describes the balance state at this point in the commitment
// chain. This method its to be called when we revoke our prior commitment
// chain. In addition to that, it persists all the remote log updates that we
// have acked, but not signed a remote commitment for yet. These need to be
// persisted to be able to produce a valid commit signature if a restart would
// occur. This method is to be called when we revoke our prior commitment
// state.
func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error {
func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment,
unsignedAckedUpdates []LogUpdate) error {
c.Lock()
defer c.Unlock()
@ -1287,6 +1297,20 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error {
"revocations: %v", err)
}
// Persist unsigned but acked remote updates that need to be
// restored after a restart.
var b bytes.Buffer
err = serializeLogUpdates(&b, unsignedAckedUpdates)
if err != nil {
return err
}
err = chanBucket.Put(unsignedAckedUpdatesKey, b.Bytes())
if err != nil {
return fmt.Errorf("unable to store dangline remote "+
"updates: %v", err)
}
return nil
})
if err != nil {
@ -1751,6 +1775,14 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error {
return err
}
// Clear unsigned acked remote updates. We are signing now for
// all that we've got.
err = chanBucket.Delete(unsignedAckedUpdatesKey)
if err != nil {
return fmt.Errorf("unable to clear dangling remote "+
"updates: %v", err)
}
// TODO(roasbeef): use seqno to derive key for later LCP
// With the bucket retrieved, we'll now serialize the commit
@ -1804,6 +1836,38 @@ func (c *OpenChannel) RemoteCommitChainTip() (*CommitDiff, error) {
return cd, err
}
// UnsignedAckedUpdates loads the remote log updates stored under
// unsignedAckedUpdatesKey in this channel's bucket, i.e. the acked remote
// updates that we still need to sign for. A nil slice and nil error are
// returned when fetchChanBucket reports ErrNoChanDBExists, ErrNoActiveChannels
// or ErrChannelNotFound, or when nothing is stored under the key.
func (c *OpenChannel) UnsignedAckedUpdates() ([]LogUpdate, error) {
	var updates []LogUpdate

	readUpdates := func(tx *bbolt.Tx) error {
		chanBucket, err := fetchChanBucket(
			tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
		)

		// These lookup failures are not treated as errors: there is
		// simply nothing to return in that case.
		if err == ErrNoChanDBExists || err == ErrNoActiveChannels ||
			err == ErrChannelNotFound {

			return nil
		}
		if err != nil {
			return err
		}

		rawUpdates := chanBucket.Get(unsignedAckedUpdatesKey)
		if rawUpdates == nil {
			return nil
		}

		updates, err = deserializeLogUpdates(bytes.NewReader(rawUpdates))
		return err
	}

	if err := c.Db.View(readUpdates); err != nil {
		return nil, err
	}

	return updates, nil
}
// InsertNextRevocation inserts the _next_ commitment point (revocation) into
// the database, and also modifies the internal RemoteNextRevocation attribute
// to point to the passed key. This method is to be used during final channel

@ -524,10 +524,34 @@ func TestChannelStateTransition(t *testing.T) {
// First update the local node's broadcastable state and also add a
// CommitDiff remote node's as well in order to simulate a proper state
// transition.
if err := channel.UpdateCommitment(&commitment); err != nil {
unsignedAckedUpdates := []LogUpdate{
{
LogIndex: 2,
UpdateMsg: &lnwire.UpdateAddHTLC{
ChanID: lnwire.ChannelID{1, 2, 3},
},
},
}
err = channel.UpdateCommitment(&commitment, unsignedAckedUpdates)
if err != nil {
t.Fatalf("unable to update commitment: %v", err)
}
// Assert that update is correctly written to the database.
dbUnsignedAckedUpdates, err := channel.UnsignedAckedUpdates()
if err != nil {
t.Fatalf("unable to fetch dangling remote updates: %v", err)
}
if len(dbUnsignedAckedUpdates) != 1 {
t.Fatalf("unexpected number of dangling remote updates")
}
if !reflect.DeepEqual(
dbUnsignedAckedUpdates[0], unsignedAckedUpdates[0],
) {
t.Fatalf("unexpected update")
}
// The balances, new update, the HTLCs and the changes to the fake
// commitment transaction along with the modified signature should all
// have been updated.

@ -403,7 +403,7 @@ func TestRestoreChannelShells(t *testing.T) {
// Ensure that it isn't possible to modify the commitment state machine
// of this restored channel.
channel := nodeChans[0]
err = channel.UpdateCommitment(nil)
err = channel.UpdateCommitment(nil, nil)
if err != ErrNoRestoredChannelMutation {
t.Fatalf("able to mutate restored channel")
}

@ -4,6 +4,7 @@ import (
"bytes"
"container/list"
"crypto/sha256"
"errors"
"fmt"
"math"
"sort"
@ -1177,6 +1178,13 @@ func (u *updateLog) appendUpdate(pd *PaymentDescriptor) {
u.logIndex++
}
// restoreUpdate appends a previously persisted update to the tip of the
// updateLog and registers it in the update index under the entry's own
// LogIndex. This function differs from appendUpdate in that it won't increment
// the log index counter.
//
// The index key is taken from pd.LogIndex rather than from the log's counter:
// the counter is not advanced here, so keying by it would make every restored
// entry overwrite the previous one in the index and would not match the log
// index carried by the entry itself.
func (u *updateLog) restoreUpdate(pd *PaymentDescriptor) {
	u.updateIndex[pd.LogIndex] = u.PushBack(pd)
}
// appendHtlc appends a new HTLC offer to the tip of the update log. The entry
// is also added to the offer index accordingly.
func (u *updateLog) appendHtlc(pd *PaymentDescriptor) {
@ -1627,6 +1635,110 @@ func (lc *LightningChannel) logUpdateToPayDesc(logUpdate *channeldb.LogUpdate,
return pd, nil
}
// remoteLogUpdateToPayDesc converts a LogUpdate into a matching
// PaymentDescriptor entry that can be re-inserted into the update log. This
// method is used when we revoked a local commitment, but the connection was
// obstructed before we could sign a remote commitment that contains these
// updates. In this case, we need to re-insert the original entries back into
// the update log so we can resume as if nothing happened. The height of the
// latest local commitment is also expected to be provided. We are restoring all
// log update entries with this height, even though the real commitment height
// may be lower. In the way these fields are used elsewhere, this doesn't change
// anything.
//
// An error is returned when the update carries a message type that is not
// handled here, or when a settle/fail refers to an HTLC that cannot be found
// in localUpdateLog.
func (lc *LightningChannel) remoteLogUpdateToPayDesc(logUpdate *channeldb.LogUpdate,
	localUpdateLog *updateLog, commitHeight uint64) (*PaymentDescriptor,
	error) {

	switch wireMsg := logUpdate.UpdateMsg.(type) {
	case *lnwire.UpdateAddHTLC:
		pd := &PaymentDescriptor{
			RHash:                wireMsg.PaymentHash,
			Timeout:              wireMsg.Expiry,
			Amount:               wireMsg.Amount,
			EntryType:            Add,
			HtlcIndex:            wireMsg.ID,
			LogIndex:             logUpdate.LogIndex,
			addCommitHeightLocal: commitHeight,
		}

		// Copy the onion blob so the descriptor doesn't alias the
		// wire message.
		pd.OnionBlob = make([]byte, len(wireMsg.OnionBlob))
		copy(pd.OnionBlob, wireMsg.OnionBlob[:])

		// We don't need to generate an htlc script yet. This will be
		// done once we sign our remote commitment.

		return pd, nil

	// For HTLCs that the remote party settled, we'll fetch the original
	// offered HTLC from the local update log so we can retrieve the same
	// PaymentDescriptor that ReceiveHTLCSettle would produce.
	case *lnwire.UpdateFulfillHTLC:
		ogHTLC := localUpdateLog.lookupHtlc(wireMsg.ID)

		// NOTE(review): assumes lookupHtlc returns nil when the HTLC
		// is not present in the log - confirm against its definition.
		if ogHTLC == nil {
			return nil, fmt.Errorf("unable to find htlc with "+
				"id=%v in local update log", wireMsg.ID)
		}

		return &PaymentDescriptor{
			Amount:                  ogHTLC.Amount,
			RHash:                   ogHTLC.RHash,
			RPreimage:               wireMsg.PaymentPreimage,
			LogIndex:                logUpdate.LogIndex,
			ParentIndex:             ogHTLC.HtlcIndex,
			EntryType:               Settle,
			removeCommitHeightLocal: commitHeight,
		}, nil

	// If we received a failure for a prior outgoing HTLC, then we'll
	// consult the local update log so we can retrieve the information of
	// the original HTLC we're failing.
	case *lnwire.UpdateFailHTLC:
		ogHTLC := localUpdateLog.lookupHtlc(wireMsg.ID)
		if ogHTLC == nil {
			return nil, fmt.Errorf("unable to find htlc with "+
				"id=%v in local update log", wireMsg.ID)
		}

		return &PaymentDescriptor{
			Amount:                  ogHTLC.Amount,
			RHash:                   ogHTLC.RHash,
			ParentIndex:             ogHTLC.HtlcIndex,
			LogIndex:                logUpdate.LogIndex,
			EntryType:               Fail,
			FailReason:              wireMsg.Reason[:],
			removeCommitHeightLocal: commitHeight,
		}, nil

	// HTLC fails due to malformed onion blobs are treated the exact same
	// way as regular HTLC fails.
	case *lnwire.UpdateFailMalformedHTLC:
		ogHTLC := localUpdateLog.lookupHtlc(wireMsg.ID)
		if ogHTLC == nil {
			return nil, fmt.Errorf("unable to find htlc with "+
				"id=%v in local update log", wireMsg.ID)
		}

		return &PaymentDescriptor{
			Amount:                  ogHTLC.Amount,
			RHash:                   ogHTLC.RHash,
			ParentIndex:             ogHTLC.HtlcIndex,
			LogIndex:                logUpdate.LogIndex,
			EntryType:               MalformedFail,
			FailCode:                wireMsg.FailureCode,
			ShaOnionBlob:            wireMsg.ShaOnionBlob,
			removeCommitHeightLocal: commitHeight,
		}, nil

	// For fee updates we'll create a FeeUpdate type to add to the log. We
	// reuse the amount field to hold the fee rate. Since the amount field
	// is denominated in msat we won't lose precision when storing the
	// sat/kw denominated feerate. Note that we set both the add and remove
	// height to the same value, as we consider the fee update locked in by
	// adding and removing it at the same height.
	case *lnwire.UpdateFee:
		return &PaymentDescriptor{
			LogIndex: logUpdate.LogIndex,
			Amount: lnwire.NewMSatFromSatoshis(
				btcutil.Amount(wireMsg.FeePerKw),
			),
			EntryType:               FeeUpdate,
			addCommitHeightLocal:    commitHeight,
			removeCommitHeightLocal: commitHeight,
		}, nil

	default:
		return nil, errors.New("unknown message type")
	}
}
// restoreCommitState will restore the local commitment chain and updateLog
// state to a consistent in-memory representation of the passed disk commitment.
// This method is to be used upon reconnection to our channel counter party.
@ -1727,12 +1839,19 @@ func (lc *LightningChannel) restoreCommitState(
)
}
// Fetch remote updates that we have acked but not yet signed for.
unsignedAckedUpdates, err := lc.channelState.UnsignedAckedUpdates()
if err != nil {
return err
}
// Finally, with the commitment states restored, we'll now restore the
// state logs based on the current local+remote commit, and any pending
// remote commit that exists.
err = lc.restoreStateLogs(
localCommit, remoteCommit, pendingRemoteCommit,
pendingRemoteCommitDiff, pendingRemoteKeyChain,
unsignedAckedUpdates,
)
if err != nil {
return err
@ -1748,7 +1867,8 @@ func (lc *LightningChannel) restoreCommitState(
func (lc *LightningChannel) restoreStateLogs(
localCommitment, remoteCommitment, pendingRemoteCommit *commitment,
pendingRemoteCommitDiff *channeldb.CommitDiff,
pendingRemoteKeys *CommitmentKeyRing) error {
pendingRemoteKeys *CommitmentKeyRing,
unsignedAckedUpdates []channeldb.LogUpdate) error {
// We make a map of incoming HTLCs to the height of the remote
// commitment they were first added, and outgoing HTLCs to the height
@ -1825,6 +1945,71 @@ func (lc *LightningChannel) restoreStateLogs(
}
}
// Restore unsigned acked remote log updates so that we can include them
// in our next signature.
err := lc.restorePendingRemoteUpdates(
unsignedAckedUpdates, localCommitment.height,
)
if err != nil {
return err
}
return nil
}
// restorePendingRemoteUpdates restores the acked remote log updates that we
// haven't yet signed for. Each persisted update is converted back into a
// PaymentDescriptor, stamped with localCommitmentHeight, and re-inserted into
// the remote update log. For every entry that is neither an Add nor a
// FeeUpdate, the HTLC it refers to is additionally marked as modified in the
// local update log.
//
// An error is returned when an update cannot be converted, or when its log
// index or HTLC index is not below the corresponding counter of the remote
// update log. All sanity checks for an entry run before that entry is
// inserted, so a rejected entry is never left behind in the log.
func (lc *LightningChannel) restorePendingRemoteUpdates(
	unsignedAckedUpdates []channeldb.LogUpdate,
	localCommitmentHeight uint64) error {

	lc.log.Debugf("Restoring %v dangling remote updates",
		len(unsignedAckedUpdates))

	for _, logUpdate := range unsignedAckedUpdates {
		// Take a per-iteration copy so that the pointer handed out
		// below doesn't refer to the shared loop variable.
		logUpdate := logUpdate

		payDesc, err := lc.remoteLogUpdateToPayDesc(
			&logUpdate, lc.localUpdateLog, localCommitmentHeight,
		)
		if err != nil {
			return err
		}

		// Sanity check that we are not restoring a remote log update
		// that we haven't received a sig for.
		if payDesc.LogIndex >= lc.remoteUpdateLog.logIndex {
			return fmt.Errorf("attempted to restore an "+
				"unsigned remote update: log_index=%v",
				payDesc.LogIndex)
		}

		// Insert the update into the log. The log update index doesn't
		// need to be incremented (hence the restore calls), because its
		// final value was properly persisted with the last local
		// commitment update.
		switch payDesc.EntryType {
		case Add:
			// Sanity check to be sure that we are not restoring an
			// add update that the remote hasn't signed for yet.
			// This is verified before touching the log.
			//
			// NOTE(review): assumes restoreHtlc does not modify
			// htlcCounter, so checking first is equivalent on the
			// success path - confirm against its definition.
			if payDesc.HtlcIndex >= lc.remoteUpdateLog.htlcCounter {
				return fmt.Errorf("attempted to restore an "+
					"unsigned remote htlc: htlc_index=%v",
					payDesc.HtlcIndex)
			}

			lc.remoteUpdateLog.restoreHtlc(payDesc)

		case FeeUpdate:
			lc.remoteUpdateLog.restoreUpdate(payDesc)

		default:
			// Settles and fails remove an HTLC from the local log,
			// so flag the parent entry as modified.
			lc.remoteUpdateLog.restoreUpdate(payDesc)
			lc.localUpdateLog.markHtlcModified(payDesc.ParentIndex)
		}
	}

	return nil
}
@ -3092,6 +3277,96 @@ func (lc *LightningChannel) createCommitDiff(
}, nil
}
// getUnsignedAckedUpdates collects the entries of the remote update log that
// we have acked but not yet signed for, and converts each of them back into
// the wire message it corresponds to. The selected entries are those whose
// log index is at least the theirMessageIndex of the remote commit chain tip
// and below the theirMessageIndex of the local commit chain tail.
func (lc *LightningChannel) getUnsignedAckedUpdates() []channeldb.LogUpdate {
	// The wire messages identify the channel by an ID derived from the
	// funding outpoint.
	chanID := lnwire.NewChanIDFromOutPoint(&lc.channelState.FundingOutpoint)

	// Lower bound: everything below this index has already been signed
	// for in our remote commit chain.
	signedUpTo := lc.remoteCommitChain.tip().theirMessageIndex

	// Upper bound: everything from this index on hasn't been acked yet.
	// At the moment this function is called there shouldn't be any such
	// entries, but filtering them keeps the function generally usable.
	ackedUpTo := lc.localCommitChain.tail().theirMessageIndex

	var unsigned []channeldb.LogUpdate
	for elem := lc.remoteUpdateLog.Front(); elem != nil; elem = elem.Next() {
		entry := elem.Value.(*PaymentDescriptor)

		if entry.LogIndex < signedUpTo || entry.LogIndex >= ackedUpTo {
			continue
		}

		update := channeldb.LogUpdate{LogIndex: entry.LogIndex}

		// Translate the entry type into one of the five update
		// messages handled here. Any other entry type leaves the
		// message unset.
		switch entry.EntryType {
		case Add:
			add := &lnwire.UpdateAddHTLC{
				ChanID:      chanID,
				ID:          entry.HtlcIndex,
				Amount:      entry.Amount,
				Expiry:      entry.Timeout,
				PaymentHash: entry.RHash,
			}
			copy(add.OnionBlob[:], entry.OnionBlob)
			update.UpdateMsg = add

		case Settle:
			update.UpdateMsg = &lnwire.UpdateFulfillHTLC{
				ChanID:          chanID,
				ID:              entry.ParentIndex,
				PaymentPreimage: entry.RPreimage,
			}

		case Fail:
			update.UpdateMsg = &lnwire.UpdateFailHTLC{
				ChanID: chanID,
				ID:     entry.ParentIndex,
				Reason: entry.FailReason,
			}

		case MalformedFail:
			update.UpdateMsg = &lnwire.UpdateFailMalformedHTLC{
				ChanID:       chanID,
				ID:           entry.ParentIndex,
				ShaOnionBlob: entry.ShaOnionBlob,
				FailureCode:  entry.FailCode,
			}

		case FeeUpdate:
			// The Amount field holds the feerate denominated in
			// msat. Since feerates are only denominated in sat/kw,
			// we can convert it without loss of precision.
			feeRate := uint32(entry.Amount.ToSatoshis())
			update.UpdateMsg = &lnwire.UpdateFee{
				ChanID:   chanID,
				FeePerKw: feeRate,
			}
		}

		unsigned = append(unsigned, update)
	}

	return unsigned
}
// validateCommitmentSanity is used to validate the current state of the
// commitment transaction in terms of the ChannelConstraints that we and our
// remote peer agreed upon during the funding workflow. The predictAdded
@ -4280,15 +4555,24 @@ func (lc *LightningChannel) RevokeCurrentCommitment() (*lnwire.RevokeAndAck, []c
// persistent storage.
chainTail := lc.localCommitChain.tail()
newCommitment := chainTail.toDiskCommit(true)
err = lc.channelState.UpdateCommitment(newCommitment)
// Get the unsigned acked remotes updates that are currently in memory.
// We need them after a restart to sync our remote commitment with what
// is committed locally.
unsignedAckedUpdates := lc.getUnsignedAckedUpdates()
err = lc.channelState.UpdateCommitment(
newCommitment, unsignedAckedUpdates,
)
if err != nil {
return nil, nil, err
}
lc.log.Tracef("state transition accepted: "+
"our_balance=%v, their_balance=%v",
"our_balance=%v, their_balance=%v, unsigned_acked_updates=%v",
chainTail.ourBalance,
chainTail.theirBalance)
chainTail.theirBalance,
len(unsignedAckedUpdates))
revocationMsg.ChanID = lnwire.NewChanIDFromOutPoint(
&lc.channelState.FundingOutpoint,

@ -3151,10 +3151,9 @@ func TestChanSyncOweCommitmentPendingRemote(t *testing.T) {
t.Fatalf("unable to sign commitment: %v", err)
}
// This commitment is expected to contain no htlcs anymore, but because
// of a bug it is still present. THIS IS NOT CORRECT!
if len(bobHtlcSigs) != 2 {
t.Fatal("expected htlc to still be pending")
// This commitment is expected to contain no htlcs anymore.
if len(bobHtlcSigs) != 0 {
t.Fatalf("no htlcs expected, but got %v", len(bobHtlcSigs))
}
}
@ -6289,11 +6288,8 @@ func TestChannelRestoreUpdateLogsFailedHTLC(t *testing.T) {
// sent a new signature yet. If we'd now restart and restore, the htlc
// failure update should still be waiting for inclusion in Alice's next
// signature. Otherwise the produced signature would be invalid.
//
// THIS IS NOT HAPPENING. The update log entry is dropped after a
// restart!
assertInLogs(t, aliceChannel, 1, 0, 0, 1)
restoreAndAssert(t, aliceChannel, 1, 0, 0, 0)
restoreAndAssert(t, aliceChannel, 1, 0, 0, 1)
// Now send a signature from Alice. This will give Bob a new commitment
// where the HTLC is removed.