Merge pull request #4432 from Crypt-iQ/peer_unsigned_fix_0702

channeldb+lnwallet: store updates the peer should sign under new key
This commit is contained in:
Olaoluwa Osuntokun 2020-07-29 16:54:23 -07:00 committed by GitHub
commit d36d22107c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 411 additions and 9 deletions

@ -79,6 +79,11 @@ var (
// for in one of our remote commits.
unsignedAckedUpdatesKey = []byte("unsigned-acked-updates-key")
// remoteUnsignedLocalUpdatesKey is an entry in the channel bucket that
// contains the local updates that the remote party has acked, but
// has not yet signed for in one of their local commits.
remoteUnsignedLocalUpdatesKey = []byte("remote-unsigned-local-updates-key")
// revocationStateKey stores their current revocation hash, our
// preimage producer and their preimage store.
revocationStateKey = []byte("revocation-state-key")
@ -1448,6 +1453,39 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment,
"updates: %v", err)
}
// Persist the remote unsigned local updates that are not included
// in our new commitment.
updateBytes := chanBucket.Get(remoteUnsignedLocalUpdatesKey)
if updateBytes == nil {
return nil
}
r := bytes.NewReader(updateBytes)
updates, err := deserializeLogUpdates(r)
if err != nil {
return err
}
var validUpdates []LogUpdate
for _, upd := range updates {
// Filter for updates that are not on our local
// commitment.
if upd.LogIndex >= newCommitment.LocalLogIndex {
validUpdates = append(validUpdates, upd)
}
}
var b2 bytes.Buffer
err = serializeLogUpdates(&b2, validUpdates)
if err != nil {
return fmt.Errorf("unable to serialize log updates: %v", err)
}
err = chanBucket.Put(remoteUnsignedLocalUpdatesKey, b2.Bytes())
if err != nil {
return fmt.Errorf("unable to restore chanbucket: %v", err)
}
return nil
})
if err != nil {
@ -2065,6 +2103,39 @@ func (c *OpenChannel) UnsignedAckedUpdates() ([]LogUpdate, error) {
return updates, nil
}
// RemoteUnsignedLocalUpdates retrieves the persisted, unsigned local log
// updates that the remote still needs to sign for.
func (c *OpenChannel) RemoteUnsignedLocalUpdates() ([]LogUpdate, error) {
var updates []LogUpdate
err := kvdb.View(c.Db, func(tx kvdb.RTx) error {
chanBucket, err := fetchChanBucket(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
switch err {
case nil:
break
case ErrNoChanDBExists, ErrNoActiveChannels, ErrChannelNotFound:
return nil
default:
return err
}
updateBytes := chanBucket.Get(remoteUnsignedLocalUpdatesKey)
if updateBytes == nil {
return nil
}
r := bytes.NewReader(updateBytes)
updates, err = deserializeLogUpdates(r)
return err
})
if err != nil {
return nil, err
}
return updates, nil
}
// InsertNextRevocation inserts the _next_ commitment point (revocation) into
// the database, and also modifies the internal RemoteNextRevocation attribute
// to point to the passed key. This method is to be using during final channel
@ -2101,8 +2172,12 @@ func (c *OpenChannel) InsertNextRevocation(revKey *btcec.PublicKey) error {
// this log can be consulted in order to reconstruct the state needed to
// rectify the situation. This method will add the current commitment for the
// remote party to the revocation log, and promote the current pending
// commitment to the current remote commitment.
func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error {
// commitment to the current remote commitment. The updates parameter is the
// set of local updates that the peer still needs to send us a signature for.
// We store this set of updates in case we go down.
func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg,
updates []LogUpdate) error {
c.Lock()
defer c.Unlock()
@ -2226,6 +2301,20 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error {
return fmt.Errorf("unable to store under unsignedAckedUpdatesKey: %v", err)
}
// Persist the local updates the peer hasn't yet signed so they
// can be restored after restart.
var b2 bytes.Buffer
err = serializeLogUpdates(&b2, updates)
if err != nil {
return err
}
err = chanBucket.Put(remoteUnsignedLocalUpdatesKey, b2.Bytes())
if err != nil {
return fmt.Errorf("unable to restore remote unsigned "+
"local updates: %v", err)
}
newRemoteCommit = &newCommit.Commitment
return nil

@ -797,7 +797,7 @@ func TestChannelStateTransition(t *testing.T) {
fwdPkg := NewFwdPkg(channel.ShortChanID(), oldRemoteCommit.CommitHeight,
diskCommitDiff.LogUpdates, nil)
err = channel.AdvanceCommitChainTail(fwdPkg)
err = channel.AdvanceCommitChainTail(fwdPkg, nil)
if err != nil {
t.Fatalf("unable to append to revocation log: %v", err)
}
@ -845,7 +845,7 @@ func TestChannelStateTransition(t *testing.T) {
fwdPkg = NewFwdPkg(channel.ShortChanID(), oldRemoteCommit.CommitHeight, nil, nil)
err = channel.AdvanceCommitChainTail(fwdPkg)
err = channel.AdvanceCommitChainTail(fwdPkg, nil)
if err != nil {
t.Fatalf("unable to append to revocation log: %v", err)
}

@ -398,7 +398,7 @@ func TestRestoreChannelShells(t *testing.T) {
if err != ErrNoRestoredChannelMutation {
t.Fatalf("able to mutate restored channel")
}
err = channel.AdvanceCommitChainTail(nil)
err = channel.AdvanceCommitChainTail(nil, nil)
if err != ErrNoRestoredChannelMutation {
t.Fatalf("able to mutate restored channel")
}

@ -1527,6 +1527,87 @@ func (lc *LightningChannel) logUpdateToPayDesc(logUpdate *channeldb.LogUpdate,
return pd, nil
}
// localLogUpdateToPayDesc converts a LogUpdate into a matching PaymentDescriptor
// entry that can be re-inserted into the local update log. This method is used
// when we sent an update+sig, receive a revocation, but drop right before the
// counterparty can sign for the update we just sent. In this case, we need to
// re-insert the original entries back into the update log so we'll be expecting
// the peer to sign them. The height of the remote commitment is expected to be
// provided and we restore all log update entries with this height, even though
// the real height may be lower. In the way these fields are used elsewhere, this
// doesn't change anything.
func (lc *LightningChannel) localLogUpdateToPayDesc(logUpdate *channeldb.LogUpdate,
remoteUpdateLog *updateLog, commitHeight uint64) (*PaymentDescriptor,
error) {
// Since Add updates aren't saved to disk under this key, the update will
// never be an Add.
switch wireMsg := logUpdate.UpdateMsg.(type) {
// For HTLCs that we settled, we'll fetch the original offered HTLC from
// the remote update log so we can retrieve the same PaymentDescriptor that
// ReceiveHTLCSettle would produce.
case *lnwire.UpdateFulfillHTLC:
ogHTLC := remoteUpdateLog.lookupHtlc(wireMsg.ID)
return &PaymentDescriptor{
Amount: ogHTLC.Amount,
RHash: ogHTLC.RHash,
RPreimage: wireMsg.PaymentPreimage,
LogIndex: logUpdate.LogIndex,
ParentIndex: ogHTLC.HtlcIndex,
EntryType: Settle,
removeCommitHeightRemote: commitHeight,
}, nil
// If we sent a failure for a prior incoming HTLC, then we'll consult the
// remote update log so we can retrieve the information of the original
// HTLC we're failing.
case *lnwire.UpdateFailHTLC:
ogHTLC := remoteUpdateLog.lookupHtlc(wireMsg.ID)
return &PaymentDescriptor{
Amount: ogHTLC.Amount,
RHash: ogHTLC.RHash,
ParentIndex: ogHTLC.HtlcIndex,
LogIndex: logUpdate.LogIndex,
EntryType: Fail,
FailReason: wireMsg.Reason[:],
removeCommitHeightRemote: commitHeight,
}, nil
// HTLC fails due to malformed onion blocks are treated the exact same
// way as regular HTLC fails.
case *lnwire.UpdateFailMalformedHTLC:
ogHTLC := remoteUpdateLog.lookupHtlc(wireMsg.ID)
return &PaymentDescriptor{
Amount: ogHTLC.Amount,
RHash: ogHTLC.RHash,
ParentIndex: ogHTLC.HtlcIndex,
LogIndex: logUpdate.LogIndex,
EntryType: MalformedFail,
FailCode: wireMsg.FailureCode,
ShaOnionBlob: wireMsg.ShaOnionBlob,
removeCommitHeightRemote: commitHeight,
}, nil
case *lnwire.UpdateFee:
return &PaymentDescriptor{
LogIndex: logUpdate.LogIndex,
Amount: lnwire.NewMSatFromSatoshis(
btcutil.Amount(wireMsg.FeePerKw),
),
EntryType: FeeUpdate,
addCommitHeightRemote: commitHeight,
removeCommitHeightRemote: commitHeight,
}, nil
default:
return nil, fmt.Errorf("unknown message type: %T", wireMsg)
}
}
// remoteLogUpdateToPayDesc converts a LogUpdate into a matching
// PaymentDescriptor entry that can be re-inserted into the update log. This
// method is used when we revoked a local commitment, but the connection was
@ -1736,13 +1817,19 @@ func (lc *LightningChannel) restoreCommitState(
return err
}
// Fetch the local updates the peer still needs to sign for.
remoteUnsignedLocalUpdates, err := lc.channelState.RemoteUnsignedLocalUpdates()
if err != nil {
return err
}
// Finally, with the commitment states restored, we'll now restore the
// state logs based on the current local+remote commit, and any pending
// remote commit that exists.
err = lc.restoreStateLogs(
localCommit, remoteCommit, pendingRemoteCommit,
pendingRemoteCommitDiff, pendingRemoteKeyChain,
unsignedAckedUpdates,
unsignedAckedUpdates, remoteUnsignedLocalUpdates,
)
if err != nil {
return err
@ -1759,7 +1846,8 @@ func (lc *LightningChannel) restoreStateLogs(
localCommitment, remoteCommitment, pendingRemoteCommit *commitment,
pendingRemoteCommitDiff *channeldb.CommitDiff,
pendingRemoteKeys *CommitmentKeyRing,
unsignedAckedUpdates []channeldb.LogUpdate) error {
unsignedAckedUpdates,
remoteUnsignedLocalUpdates []channeldb.LogUpdate) error {
// We make a map of incoming HTLCs to the height of the remote
// commitment they were first added, and outgoing HTLCs to the height
@ -1817,6 +1905,34 @@ func (lc *LightningChannel) restoreStateLogs(
outgoingLocalAddHeights[htlcIdx] = localCommitment.height
}
// If there are local updates that the peer needs to sign for, then the
// corresponding add is no longer on the remote commitment, but is still on
// our local commitment.
// ----fail--->
// ----sig---->
// <---rev-----
// To ensure proper channel operation, we restore the add's addCommitHeightRemote
// field to the height of the remote commitment.
for _, logUpdate := range remoteUnsignedLocalUpdates {
var htlcIdx uint64
switch wireMsg := logUpdate.UpdateMsg.(type) {
case *lnwire.UpdateFulfillHTLC:
htlcIdx = wireMsg.ID
case *lnwire.UpdateFailHTLC:
htlcIdx = wireMsg.ID
case *lnwire.UpdateFailMalformedHTLC:
htlcIdx = wireMsg.ID
default:
continue
}
// The htlcIdx is stored in the map with the remote commitment
// height so the related add's addCommitHeightRemote field can be
// restored.
incomingRemoteAddHeights[htlcIdx] = remoteCommitment.height
}
// For each incoming HTLC within the local commitment, we add it to the
// remote update log. Since HTLCs are added first to the receiver's
// commitment, we don't have to restore outgoing HTLCs, as they will be
@ -1873,7 +1989,11 @@ func (lc *LightningChannel) restoreStateLogs(
return err
}
return nil
// Restore unsigned acked local log updates so we expect the peer to
// sign for them.
return lc.restorePeerLocalUpdates(
remoteUnsignedLocalUpdates, remoteCommitment.height,
)
}
// restorePendingRemoteUpdates restores the acked remote log updates that we
@ -1956,6 +2076,38 @@ func (lc *LightningChannel) restorePendingRemoteUpdates(
return nil
}
// restorePeerLocalUpdates restores the acked local log updates the peer still
// needs to sign for.
func (lc *LightningChannel) restorePeerLocalUpdates(updates []channeldb.LogUpdate,
remoteCommitmentHeight uint64) error {
lc.log.Debugf("Restoring %v local updates that the peer should sign",
len(updates))
for _, logUpdate := range updates {
logUpdate := logUpdate
payDesc, err := lc.localLogUpdateToPayDesc(
&logUpdate, lc.remoteUpdateLog, remoteCommitmentHeight,
)
if err != nil {
return err
}
lc.localUpdateLog.restoreUpdate(payDesc)
// Since Add updates are not stored and FeeUpdates don't have a
// corresponding entry in the remote update log, we only need to
// mark the htlc as modified if the update was Settle, Fail, or
// MalformedFail.
if payDesc.EntryType != FeeUpdate {
lc.remoteUpdateLog.markHtlcModified(payDesc.ParentIndex)
}
}
return nil
}
// restorePendingLocalUpdates restores the local log updates leading up to the
// given pending remote commitment.
func (lc *LightningChannel) restorePendingLocalUpdates(
@ -4625,6 +4777,15 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) (
}
}
// We use the remote commitment chain's tip as it will soon become the tail
// once advanceTail is called.
remoteMessageIndex := lc.remoteCommitChain.tip().ourMessageIndex
localMessageIndex := lc.localCommitChain.tail().ourMessageIndex
localPeerUpdates := lc.unsignedLocalUpdates(
remoteMessageIndex, localMessageIndex, chanID,
)
// Now that we have gathered the set of HTLCs to forward, separated by
// type, construct a forwarding package using the height that the remote
// commitment chain will be extended after persisting the revocation.
@ -4637,7 +4798,7 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) (
// sync now to ensure the revocation producer state is consistent with
// the current commitment height and also to advance the on-disk
// commitment chain.
err = lc.channelState.AdvanceCommitChainTail(fwdPkg)
err = lc.channelState.AdvanceCommitChainTail(fwdPkg, localPeerUpdates)
if err != nil {
return nil, nil, nil, nil, err
}
@ -6750,3 +6911,70 @@ func (lc *LightningChannel) NextLocalHtlcIndex() (uint64, error) {
func (lc *LightningChannel) FwdMinHtlc() lnwire.MilliSatoshi {
return lc.channelState.LocalChanCfg.MinHTLC
}
// unsignedLocalUpdates retrieves the unsigned local updates that we should
// store upon receiving a revocation. This function is called from
// ReceiveRevocation. remoteMessageIndex is the height into the local update
// log that the remote commitment chain tip includes. localMessageIndex
// is the height into the local update log that the local commitment tail
// includes. Our local updates that are unsigned by the remote should
// have height greater than or equal to localMessageIndex (not on our commit),
// and height less than remoteMessageIndex (on the remote commit).
//
// NOTE: remoteMessageIndex is the height on the tip because this is called
// before the tail is advanced to the tip during ReceiveRevocation.
func (lc *LightningChannel) unsignedLocalUpdates(remoteMessageIndex,
localMessageIndex uint64, chanID lnwire.ChannelID) []channeldb.LogUpdate {
var localPeerUpdates []channeldb.LogUpdate
for e := lc.localUpdateLog.Front(); e != nil; e = e.Next() {
pd := e.Value.(*PaymentDescriptor)
// We don't save add updates as they are restored from the
// remote commitment in restoreStateLogs.
if pd.EntryType == Add {
continue
}
// This is a settle/fail that is on the remote commitment, but
// not on the local commitment. We expect this update to be
// covered in the next commitment signature that the remote
// sends.
if pd.LogIndex < remoteMessageIndex && pd.LogIndex >= localMessageIndex {
logUpdate := channeldb.LogUpdate{
LogIndex: pd.LogIndex,
}
switch pd.EntryType {
case FeeUpdate:
logUpdate.UpdateMsg = &lnwire.UpdateFee{
ChanID: chanID,
FeePerKw: uint32(pd.Amount.ToSatoshis()),
}
case Settle:
logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{
ChanID: chanID,
ID: pd.ParentIndex,
PaymentPreimage: pd.RPreimage,
}
case Fail:
logUpdate.UpdateMsg = &lnwire.UpdateFailHTLC{
ChanID: chanID,
ID: pd.ParentIndex,
Reason: pd.FailReason,
}
case MalformedFail:
logUpdate.UpdateMsg = &lnwire.UpdateFailMalformedHTLC{
ChanID: chanID,
ID: pd.ParentIndex,
ShaOnionBlob: pd.ShaOnionBlob,
FailureCode: pd.FailCode,
}
}
localPeerUpdates = append(localPeerUpdates, logUpdate)
}
}
return localPeerUpdates
}

@ -9212,3 +9212,88 @@ func TestChannelUnsignedAckedFailure(t *testing.T) {
err = newAliceChannel.ReceiveNewCommitment(bobSig, bobHtlcSigs)
require.NoError(t, err)
}
// TestChannelLocalUnsignedUpdatesFailure checks that updates from the local
// log are restored if the remote hasn't sent us a signature covering them.
//
// The full state transition is:
//
// Alice Bob
// <----add-----
// <----sig-----
// -----rev---->
// -----sig---->
// <----rev-----
// ----fail---->
// -----sig---->
// <----rev-----
// *reconnect*
// <----sig-----
//
// Alice should reject the last signature since the settle is not restored
// into the local update log and thus calculates Bob's signature as invalid.
func TestChannelLocalUnsignedUpdatesFailure(t *testing.T) {
t.Parallel()
// Create a test channel so that we can test the buggy behavior.
aliceChannel, bobChannel, cleanUp, err := CreateTestChannels(
channeldb.SingleFunderTweaklessBit,
)
require.NoError(t, err)
defer cleanUp()
// First we create an htlc that Bob sends to Alice.
htlc, _ := createHTLC(0, lnwire.MilliSatoshi(500000))
// <----add-----
_, err = bobChannel.AddHTLC(htlc, nil)
require.NoError(t, err)
_, err = aliceChannel.ReceiveHTLC(htlc)
require.NoError(t, err)
// Force a state transition to lock in this add on both commitments.
// <----sig-----
// -----rev---->
// -----sig---->
// <----rev-----
err = ForceStateTransition(bobChannel, aliceChannel)
require.NoError(t, err)
// Now Alice should fail the htlc back to Bob.
// -----fail--->
err = aliceChannel.FailHTLC(0, []byte("failreason"), nil, nil, nil)
require.NoError(t, err)
err = bobChannel.ReceiveFailHTLC(0, []byte("bad"))
require.NoError(t, err)
// Alice should send a commitment signature to Bob.
// -----sig---->
aliceSig, aliceHtlcSigs, _, err := aliceChannel.SignNextCommitment()
require.NoError(t, err)
err = bobChannel.ReceiveNewCommitment(aliceSig, aliceHtlcSigs)
require.NoError(t, err)
// Bob should reply with a revocation and Alice should save the fail as
// an unsigned local update.
// <----rev-----
bobRevocation, _, err := bobChannel.RevokeCurrentCommitment()
require.NoError(t, err)
_, _, _, _, err = aliceChannel.ReceiveRevocation(bobRevocation)
require.NoError(t, err)
// Restart Alice and assert that she can receive Bob's next commitment
// signature.
// *reconnect*
newAliceChannel, err := NewLightningChannel(
aliceChannel.Signer, aliceChannel.channelState,
aliceChannel.sigPool,
)
require.NoError(t, err)
// Bob sends the final signature and Alice should not reject it.
// <----sig-----
bobSig, bobHtlcSigs, _, err := bobChannel.SignNextCommitment()
require.NoError(t, err)
err = newAliceChannel.ReceiveNewCommitment(bobSig, bobHtlcSigs)
require.NoError(t, err)
}