From 82579400b32db8e831e195d3bde623e205ece01c Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 3 Jan 2020 15:53:51 +0100 Subject: [PATCH] lnwallet: restore unsigned acked remote updates This commit updates the channel state machine to persistently store remote updates that we have received a signature for, but that we haven't yet included in a commit signature of our own. Previously those updates were only stored in memory and dropped across restarts. This lead to the production of an invalid signature and channel force closure. The remote party expects us to include those updates. --- channeldb/channel.go | 68 ++++++++- channeldb/channel_test.go | 26 +++- channeldb/db_test.go | 2 +- lnwallet/channel.go | 292 +++++++++++++++++++++++++++++++++++++- lnwallet/channel_test.go | 12 +- 5 files changed, 384 insertions(+), 16 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index de4bd6d9..ec81a600 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -66,6 +66,11 @@ var ( // party. chanCommitmentKey = []byte("chan-commitment-key") + // unsignedAckedUpdatesKey is an entry in the channel bucket that + // contains the remote updates that we have acked, but not yet signed + // for in one of our remote commits. + unsignedAckedUpdatesKey = []byte("unsigned-acked-updates-key") + // revocationStateKey stores their current revocation hash, our // preimage producer and their preimage store. revocationStateKey = []byte("revocation-state-key") @@ -1242,9 +1247,14 @@ func syncNewChannel(tx *bbolt.Tx, c *OpenChannel, addrs []net.Addr) error { // UpdateCommitment updates the local commitment state. It locks in the pending // local updates that were received by us from the remote party. The commitment // state completely describes the balance state at this point in the commitment -// chain. This method its to be called when we revoke our prior commitment +// chain. In addition to that, it persists all the remote log updates that we +// have acked, but not signed a remote commitment for yet. These need to be +// persisted to be able to produce a valid commit signature if a restart would +// occur. This method its to be called when we revoke our prior commitment // state. -func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error { +func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment, + unsignedAckedUpdates []LogUpdate) error { + c.Lock() defer c.Unlock() @@ -1287,6 +1297,20 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error { "revocations: %v", err) } + // Persist unsigned but acked remote updates that need to be + // restored after a restart. + var b bytes.Buffer + err = serializeLogUpdates(&b, unsignedAckedUpdates) + if err != nil { + return err + } + + err = chanBucket.Put(unsignedAckedUpdatesKey, b.Bytes()) + if err != nil { + return fmt.Errorf("unable to store dangline remote "+ + "updates: %v", err) + } + return nil }) if err != nil { @@ -1751,6 +1775,14 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error { return err } + // Clear unsigned acked remote updates. We are signing now for + // all that we've got. + err = chanBucket.Delete(unsignedAckedUpdatesKey) + if err != nil { + return fmt.Errorf("unable to clear dangling remote "+ + "updates: %v", err) + } + // TODO(roasbeef): use seqno to derive key for later LCP // With the bucket retrieved, we'll now serialize the commit @@ -1804,6 +1836,38 @@ func (c *OpenChannel) RemoteCommitChainTip() (*CommitDiff, error) { return cd, err } +// UnsignedAckedUpdates retrieves the persisted unsigned acked remote log +// updates that still need to be signed for. +func (c *OpenChannel) UnsignedAckedUpdates() ([]LogUpdate, error) { + var updates []LogUpdate + err := c.Db.View(func(tx *bbolt.Tx) error { + chanBucket, err := fetchChanBucket( + tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, + ) + switch err { + case nil: + case ErrNoChanDBExists, ErrNoActiveChannels, ErrChannelNotFound: + return nil + default: + return err + } + + updateBytes := chanBucket.Get(unsignedAckedUpdatesKey) + if updateBytes == nil { + return nil + } + + r := bytes.NewReader(updateBytes) + updates, err = deserializeLogUpdates(r) + return err + }) + if err != nil { + return nil, err + } + + return updates, nil +} + // InsertNextRevocation inserts the _next_ commitment point (revocation) into // the database, and also modifies the internal RemoteNextRevocation attribute // to point to the passed key. This method is to be using during final channel diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 525b2489..2580f552 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -524,10 +524,34 @@ func TestChannelStateTransition(t *testing.T) { // First update the local node's broadcastable state and also add a // CommitDiff remote node's as well in order to simulate a proper state // transition. - if err := channel.UpdateCommitment(&commitment); err != nil { + unsignedAckedUpdates := []LogUpdate{ + { + LogIndex: 2, + UpdateMsg: &lnwire.UpdateAddHTLC{ + ChanID: lnwire.ChannelID{1, 2, 3}, + }, + }, + } + + err = channel.UpdateCommitment(&commitment, unsignedAckedUpdates) + if err != nil { t.Fatalf("unable to update commitment: %v", err) } + // Assert that update is correctly written to the database. + dbUnsignedAckedUpdates, err := channel.UnsignedAckedUpdates() + if err != nil { + t.Fatalf("unable to fetch dangling remote updates: %v", err) + } + if len(dbUnsignedAckedUpdates) != 1 { + t.Fatalf("unexpected number of dangling remote updates") + } + if !reflect.DeepEqual( + dbUnsignedAckedUpdates[0], unsignedAckedUpdates[0], + ) { + t.Fatalf("unexpected update") + } + // The balances, new update, the HTLCs and the changes to the fake // commitment transaction along with the modified signature should all // have been updated. diff --git a/channeldb/db_test.go b/channeldb/db_test.go index 4deffe98..b9b1189b 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -403,7 +403,7 @@ func TestRestoreChannelShells(t *testing.T) { // Ensure that it isn't possible to modify the commitment state machine // of this restored channel. channel := nodeChans[0] - err = channel.UpdateCommitment(nil) + err = channel.UpdateCommitment(nil, nil) if err != ErrNoRestoredChannelMutation { t.Fatalf("able to mutate restored channel") } diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 08d68506..be4302d9 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -4,6 +4,7 @@ import ( "bytes" "container/list" "crypto/sha256" + "errors" "fmt" "math" "sort" @@ -1177,6 +1178,13 @@ func (u *updateLog) appendUpdate(pd *PaymentDescriptor) { u.logIndex++ } +// restoreUpdate appends a new update to the tip of the updateLog. The entry is +// also added to index accordingly. This function differs from appendUpdate in +// that it won't increment the log index counter. +func (u *updateLog) restoreUpdate(pd *PaymentDescriptor) { + u.updateIndex[u.logIndex] = u.PushBack(pd) +} + // appendHtlc appends a new HTLC offer to the tip of the update log. The entry // is also added to the offer index accordingly. func (u *updateLog) appendHtlc(pd *PaymentDescriptor) { @@ -1627,6 +1635,110 @@ func (lc *LightningChannel) logUpdateToPayDesc(logUpdate *channeldb.LogUpdate, return pd, nil } +// remoteLogUpdateToPayDesc converts a LogUpdate into a matching +// PaymentDescriptor entry that can be re-inserted into the update log. This +// method is used when we revoked a local commitment, but the connection was +// obstructed before we could sign a remote commitment that contains these +// updates. In this case, we need to re-insert the original entries back into +// the update log so we can resume as if nothing happened. The height of the +// latest local commitment is also expected to be provided. We are restoring all +// log update entries with this height, even though the real commitment height +// may be lower. In the way these fields are used elsewhere, this doesn't change +// anything. +func (lc *LightningChannel) remoteLogUpdateToPayDesc(logUpdate *channeldb.LogUpdate, + localUpdateLog *updateLog, commitHeight uint64) (*PaymentDescriptor, + error) { + + switch wireMsg := logUpdate.UpdateMsg.(type) { + + case *lnwire.UpdateAddHTLC: + pd := &PaymentDescriptor{ + RHash: wireMsg.PaymentHash, + Timeout: wireMsg.Expiry, + Amount: wireMsg.Amount, + EntryType: Add, + HtlcIndex: wireMsg.ID, + LogIndex: logUpdate.LogIndex, + addCommitHeightLocal: commitHeight, + } + pd.OnionBlob = make([]byte, len(wireMsg.OnionBlob)) + copy(pd.OnionBlob, wireMsg.OnionBlob[:]) + + // We don't need to generate an htlc script yet. This will be + // done once we sign our remote commitment. + + return pd, nil + + // For HTLCs that the remote party settled, we'll fetch the original + // offered HTLC from the local update log so we can retrieve the same + // PaymentDescriptor that ReceiveHTLCSettle would produce. + case *lnwire.UpdateFulfillHTLC: + ogHTLC := localUpdateLog.lookupHtlc(wireMsg.ID) + + return &PaymentDescriptor{ + Amount: ogHTLC.Amount, + RHash: ogHTLC.RHash, + RPreimage: wireMsg.PaymentPreimage, + LogIndex: logUpdate.LogIndex, + ParentIndex: ogHTLC.HtlcIndex, + EntryType: Settle, + removeCommitHeightLocal: commitHeight, + }, nil + + // If we received a failure for a prior outgoing HTLC, then we'll + // consult the local update log so we can retrieve the information of + // the original HTLC we're failing. + case *lnwire.UpdateFailHTLC: + ogHTLC := localUpdateLog.lookupHtlc(wireMsg.ID) + + return &PaymentDescriptor{ + Amount: ogHTLC.Amount, + RHash: ogHTLC.RHash, + ParentIndex: ogHTLC.HtlcIndex, + LogIndex: logUpdate.LogIndex, + EntryType: Fail, + FailReason: wireMsg.Reason[:], + removeCommitHeightLocal: commitHeight, + }, nil + + // HTLC fails due to malformed onion blobs are treated the exact same + // way as regular HTLC fails. + case *lnwire.UpdateFailMalformedHTLC: + ogHTLC := localUpdateLog.lookupHtlc(wireMsg.ID) + + return &PaymentDescriptor{ + Amount: ogHTLC.Amount, + RHash: ogHTLC.RHash, + ParentIndex: ogHTLC.HtlcIndex, + LogIndex: logUpdate.LogIndex, + EntryType: MalformedFail, + FailCode: wireMsg.FailureCode, + ShaOnionBlob: wireMsg.ShaOnionBlob, + removeCommitHeightLocal: commitHeight, + }, nil + + // For fee updates we'll create a FeeUpdate type to add to the log. We + // reuse the amount field to hold the fee rate. Since the amount field + // is denominated in msat we won't lose precision when storing the + // sat/kw denominated feerate. Note that we set both the add and remove + // height to the same value, as we consider the fee update locked in by + // adding and removing it at the same height. + case *lnwire.UpdateFee: + return &PaymentDescriptor{ + LogIndex: logUpdate.LogIndex, + Amount: lnwire.NewMSatFromSatoshis( + btcutil.Amount(wireMsg.FeePerKw), + ), + EntryType: FeeUpdate, + addCommitHeightLocal: commitHeight, + removeCommitHeightLocal: commitHeight, + }, nil + + default: + return nil, errors.New("unknown message type") + } +} + // restoreCommitState will restore the local commitment chain and updateLog // state to a consistent in-memory representation of the passed disk commitment. // This method is to be used upon reconnection to our channel counter party. @@ -1727,12 +1839,19 @@ func (lc *LightningChannel) restoreCommitState( ) } + // Fetch remote updates that we have acked but not yet signed for. + unsignedAckedUpdates, err := lc.channelState.UnsignedAckedUpdates() + if err != nil { + return err + } + // Finally, with the commitment states restored, we'll now restore the // state logs based on the current local+remote commit, and any pending // remote commit that exists. err = lc.restoreStateLogs( localCommit, remoteCommit, pendingRemoteCommit, pendingRemoteCommitDiff, pendingRemoteKeyChain, + unsignedAckedUpdates, ) if err != nil { return err @@ -1748,7 +1867,8 @@ func (lc *LightningChannel) restoreCommitState( func (lc *LightningChannel) restoreStateLogs( localCommitment, remoteCommitment, pendingRemoteCommit *commitment, pendingRemoteCommitDiff *channeldb.CommitDiff, - pendingRemoteKeys *CommitmentKeyRing) error { + pendingRemoteKeys *CommitmentKeyRing, + unsignedAckedUpdates []channeldb.LogUpdate) error { // We make a map of incoming HTLCs to the height of the remote // commitment they were first added, and outgoing HTLCs to the height @@ -1825,6 +1945,71 @@ func (lc *LightningChannel) restoreStateLogs( } } + // Restore unsigned acked remote log updates so that we can include them + // in our next signature. + err := lc.restorePendingRemoteUpdates( + unsignedAckedUpdates, localCommitment.height, + ) + if err != nil { + return err + } + + return nil +} + +// restorePendingRemoteUpdates restores the acked remote log updates that we +// haven't yet signed for. +func (lc *LightningChannel) restorePendingRemoteUpdates( + unsignedAckedUpdates []channeldb.LogUpdate, + localCommitmentHeight uint64) error { + + lc.log.Debugf("Restoring %v dangling remote updates", + len(unsignedAckedUpdates)) + + for _, logUpdate := range unsignedAckedUpdates { + logUpdate := logUpdate + + payDesc, err := lc.remoteLogUpdateToPayDesc( + &logUpdate, lc.localUpdateLog, localCommitmentHeight, + ) + if err != nil { + return err + } + + // Sanity check that we are not restoring a remote log update + // that we haven't received a sig for. + if payDesc.LogIndex >= lc.remoteUpdateLog.logIndex { + return fmt.Errorf("attempted to restore an "+ + "unsigned remote update: log_index=%v", + payDesc.LogIndex) + } + + // Insert the update into the log. The log update index doesn't + // need to be incremented (hence the restore calls), because its + // final value was properly persisted with the last local + // commitment update. + switch payDesc.EntryType { + case Add: + lc.remoteUpdateLog.restoreHtlc(payDesc) + + // Sanity check to be sure that we are not restoring an + // add update that the remote hasn't signed for yet. + if payDesc.HtlcIndex >= lc.remoteUpdateLog.htlcCounter { + return fmt.Errorf("attempted to restore an "+ + "unsigned remote htlc: htlc_index=%v", + payDesc.HtlcIndex) + } + + case FeeUpdate: + lc.remoteUpdateLog.restoreUpdate(payDesc) + + default: + lc.remoteUpdateLog.restoreUpdate(payDesc) + + lc.localUpdateLog.markHtlcModified(payDesc.ParentIndex) + } + } + return nil } @@ -3092,6 +3277,96 @@ func (lc *LightningChannel) createCommitDiff( }, nil } +// getUnsignedAckedUpdates returns all remote log updates that we haven't +// signed for yet ourselves. +func (lc *LightningChannel) getUnsignedAckedUpdates() []channeldb.LogUpdate { + // First, we need to convert the funding outpoint into the ID that's + // used on the wire to identify this channel. + chanID := lnwire.NewChanIDFromOutPoint(&lc.channelState.FundingOutpoint) + + // Fetch the last remote update that we have signed for. + lastRemoteCommitted := lc.remoteCommitChain.tip().theirMessageIndex + + // Fetch the last remote update that we have acked. + lastLocalCommitted := lc.localCommitChain.tail().theirMessageIndex + + // We'll now run through the remote update log to locate the items that + // we haven't signed for yet. This will be the set of items we need to + // restore if we reconnect in order to produce the signature that the + // remote party expects. + var logUpdates []channeldb.LogUpdate + for e := lc.remoteUpdateLog.Front(); e != nil; e = e.Next() { + pd := e.Value.(*PaymentDescriptor) + + // Skip all remote updates that we have already included in our + // commit chain. + if pd.LogIndex < lastRemoteCommitted { + continue + } + + // Skip all remote updates that we haven't acked yet. At the + // moment this function is called, there shouldn't be any, but + // we check it anyway to make this function more generally + // usable. + if pd.LogIndex >= lastLocalCommitted { + continue + } + + logUpdate := channeldb.LogUpdate{ + LogIndex: pd.LogIndex, + } + + // We'll map the type of the PaymentDescriptor to one of the + // four messages that it corresponds to. + switch pd.EntryType { + case Add: + htlc := &lnwire.UpdateAddHTLC{ + ChanID: chanID, + ID: pd.HtlcIndex, + Amount: pd.Amount, + Expiry: pd.Timeout, + PaymentHash: pd.RHash, + } + copy(htlc.OnionBlob[:], pd.OnionBlob) + logUpdate.UpdateMsg = htlc + + case Settle: + logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{ + ChanID: chanID, + ID: pd.ParentIndex, + PaymentPreimage: pd.RPreimage, + } + + case Fail: + logUpdate.UpdateMsg = &lnwire.UpdateFailHTLC{ + ChanID: chanID, + ID: pd.ParentIndex, + Reason: pd.FailReason, + } + + case MalformedFail: + logUpdate.UpdateMsg = &lnwire.UpdateFailMalformedHTLC{ + ChanID: chanID, + ID: pd.ParentIndex, + ShaOnionBlob: pd.ShaOnionBlob, + FailureCode: pd.FailCode, + } + + case FeeUpdate: + // The Amount field holds the feerate denominated in + // msat. Since feerates are only denominated in sat/kw, + // we can convert it without loss of precision. + logUpdate.UpdateMsg = &lnwire.UpdateFee{ + ChanID: chanID, + FeePerKw: uint32(pd.Amount.ToSatoshis()), + } + } + + logUpdates = append(logUpdates, logUpdate) + } + return logUpdates +} + // validateCommitmentSanity is used to validate the current state of the // commitment transaction in terms of the ChannelConstraints that we and our // remote peer agreed upon during the funding workflow. The predictAdded @@ -4280,15 +4555,24 @@ func (lc *LightningChannel) RevokeCurrentCommitment() (*lnwire.RevokeAndAck, []c // persistent storage. chainTail := lc.localCommitChain.tail() newCommitment := chainTail.toDiskCommit(true) - err = lc.channelState.UpdateCommitment(newCommitment) + + // Get the unsigned acked remotes updates that are currently in memory. + // We need them after a restart to sync our remote commitment with what + // is committed locally. + unsignedAckedUpdates := lc.getUnsignedAckedUpdates() + + err = lc.channelState.UpdateCommitment( + newCommitment, unsignedAckedUpdates, + ) if err != nil { return nil, nil, err } lc.log.Tracef("state transition accepted: "+ - "our_balance=%v, their_balance=%v", + "our_balance=%v, their_balance=%v, unsigned_acked_updates=%v", chainTail.ourBalance, - chainTail.theirBalance) + chainTail.theirBalance, + len(unsignedAckedUpdates)) revocationMsg.ChanID = lnwire.NewChanIDFromOutPoint( &lc.channelState.FundingOutpoint, diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 94b9d63c..9e5ecec5 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -3151,10 +3151,9 @@ func TestChanSyncOweCommitmentPendingRemote(t *testing.T) { t.Fatalf("unable to sign commitment: %v", err) } - // This commitment is expected to contain no htlcs anymore, but because - // of a bug it is still present. THIS IS NOT CORRECT! - if len(bobHtlcSigs) != 2 { - t.Fatal("expected htlc to still be pending") + // This commitment is expected to contain no htlcs anymore. + if len(bobHtlcSigs) != 0 { + t.Fatalf("no htlcs expected, but got %v", len(bobHtlcSigs)) } } @@ -6289,11 +6288,8 @@ func TestChannelRestoreUpdateLogsFailedHTLC(t *testing.T) { // sent a new signature yet. If we'd now restart and restore, the htlc // failure update should still be waiting for inclusion in Alice's next // signature. Otherwise the produced signature would be invalid. - // - // THIS IS NOT HAPPENING. The update log entry is dropped after a - // restart! assertInLogs(t, aliceChannel, 1, 0, 0, 1) - restoreAndAssert(t, aliceChannel, 1, 0, 0, 0) + restoreAndAssert(t, aliceChannel, 1, 0, 0, 1) // Now send a signature from Alice. This will give Bob a new commitment // where the HTLC is removed.