contractcourt: add storage for the confirmed CommitSet

In this commit, we add storage to the Briefcase for reading/writing a
confirmed CommitSet. This will be used in follow up commits to ensure
that we're able to survive restarts after we mark a channel as pending
closed. Along the way, we also re-add the FetchChainActions struct as
legacy nodes will need this storage.
This commit is contained in:
Olaoluwa Osuntokun 2019-05-23 20:20:06 -07:00
parent 086f4eb8b3
commit 364c0dd9f1
No known key found for this signature in database
GPG Key ID: CE58F7F8E20FD9A2
2 changed files with 257 additions and 0 deletions

@ -9,6 +9,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
)
@ -81,6 +82,24 @@ type ArbitratorLog interface {
// contract resolutions from persistent storage.
FetchContractResolutions() (*ContractResolutions, error)
// InsertConfirmedCommitSet stores the known set of active HTLCs at the
// time channel closure. We'll use this to reconstruct our set of chain
// actions anew based on the confirmed and pending commitment state.
InsertConfirmedCommitSet(c *CommitSet) error
// FetchConfirmedCommitSet fetches the known confirmed active HTLC set
// from the database.
FetchConfirmedCommitSet() (*CommitSet, error)
// FetchChainActions attempts to fetch the set of previously stored
// chain actions. We'll use this upon restart to properly advance our
// state machine forward.
//
// NOTE: This method only exists in order to be able to serve nodes had
// channels in the process of closing before the CommitSet struct was
// introduced.
FetchChainActions() (ChainActionMap, error)
// WipeHistory is to be called ONLY once *all* contracts have been
// fully resolved, and the channel closure if finalized. This method
// will delete all on-disk state within the persistent log.
@ -247,6 +266,11 @@ var (
// actionsBucketKey is the key under the logScope that we'll use to
// store all chain actions once they're determined.
actionsBucketKey = []byte("chain-actions")
// commitSetKey is the primary key under the logScope that we'll use to
// store the confirmed active HTLC sets once we learn that a channel
// has closed out on chain.
commitSetKey = []byte("commit-set")
)
var (
@ -265,6 +289,11 @@ var (
// errNoActions is retuned when the log doesn't contain any stored
// chain actions.
errNoActions = fmt.Errorf("no chain actions exist")
// errNoCommitSet is return when the log doesn't contained a CommitSet.
// This can happen if the channel hasn't closed yet, or a client is
// running an older version that didn't yet write this state.
errNoCommitSet = fmt.Errorf("no commit set exists")
)
// boltArbitratorLog is an implementation of the ArbitratorLog interface backed
@ -708,6 +737,103 @@ func (b *boltArbitratorLog) FetchContractResolutions() (*ContractResolutions, er
return c, err
}
// FetchChainActions attempts to fetch the set of previously stored chain
// actions. We'll use this upon restart to properly advance our state machine
// forward.
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) FetchChainActions() (ChainActionMap, error) {
actionsMap := make(ChainActionMap)
err := b.db.View(func(tx *bbolt.Tx) error {
scopeBucket := tx.Bucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}
actionsBucket := scopeBucket.Bucket(actionsBucketKey)
if actionsBucket == nil {
return errNoActions
}
return actionsBucket.ForEach(func(action, htlcBytes []byte) error {
if htlcBytes == nil {
return nil
}
chainAction := ChainAction(action[0])
htlcReader := bytes.NewReader(htlcBytes)
htlcs, err := channeldb.DeserializeHtlcs(htlcReader)
if err != nil {
return err
}
actionsMap[chainAction] = htlcs
return nil
})
})
if err != nil {
return nil, err
}
return actionsMap, nil
}
// InsertConfirmedCommitSet stores the known set of active HTLCs at the time
// channel closure. We'll use this to reconstruct our set of chain actions anew
// based on the confirmed and pending commitment state.
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error {
return b.db.Update(func(tx *bbolt.Tx) error {
scopeBucket, err := tx.CreateBucketIfNotExists(b.scopeKey[:])
if err != nil {
return err
}
var b bytes.Buffer
if err := encodeCommitSet(&b, c); err != nil {
return err
}
return scopeBucket.Put(commitSetKey, b.Bytes())
})
}
// FetchConfirmedCommitSet fetches the known confirmed active HTLC set from the
// database.
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
var c *CommitSet
err := b.db.View(func(tx *bbolt.Tx) error {
scopeBucket := tx.Bucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}
commitSetBytes := scopeBucket.Get(commitSetKey)
if commitSetBytes == nil {
return errNoCommitSet
}
commitSet, err := decodeCommitSet(bytes.NewReader(commitSetBytes))
if err != nil {
return err
}
c = commitSet
return nil
})
if err != nil {
return nil, err
}
return c, nil
}
// WipeHistory is to be called ONLY once *all* contracts have been fully
// resolved, and the channel closure if finalized. This method will delete all
// on-disk state within the persistent log.
@ -957,3 +1083,76 @@ func decodeCommitResolution(r io.Reader,
return binary.Read(r, endian, &c.MaturityDelay)
}
func encodeHtlcSetKey(w io.Writer, h *HtlcSetKey) error {
err := binary.Write(w, endian, h.IsRemote)
if err != nil {
return err
}
return binary.Write(w, endian, h.IsPending)
}
func encodeCommitSet(w io.Writer, c *CommitSet) error {
if err := encodeHtlcSetKey(w, c.ConfCommitKey); err != nil {
return err
}
numSets := uint8(len(c.HtlcSets))
if err := binary.Write(w, endian, numSets); err != nil {
return err
}
for htlcSetKey, htlcs := range c.HtlcSets {
htlcSetKey := htlcSetKey
if err := encodeHtlcSetKey(w, &htlcSetKey); err != nil {
return err
}
if err := channeldb.SerializeHtlcs(w, htlcs...); err != nil {
return err
}
}
return nil
}
func decodeHtlcSetKey(r io.Reader, h *HtlcSetKey) error {
err := binary.Read(r, endian, &h.IsRemote)
if err != nil {
return err
}
return binary.Read(r, endian, &h.IsPending)
}
func decodeCommitSet(r io.Reader) (*CommitSet, error) {
c := &CommitSet{
ConfCommitKey: &HtlcSetKey{},
HtlcSets: make(map[HtlcSetKey][]channeldb.HTLC),
}
if err := decodeHtlcSetKey(r, c.ConfCommitKey); err != nil {
return nil, err
}
var numSets uint8
if err := binary.Read(r, endian, &numSets); err != nil {
return nil, err
}
for i := uint8(0); i < numSets; i++ {
var htlcSetKey HtlcSetKey
if err := decodeHtlcSetKey(r, &htlcSetKey); err != nil {
return nil, err
}
htlcs, err := channeldb.DeserializeHtlcs(r)
if err != nil {
return nil, err
}
c.HtlcSets[htlcSetKey] = htlcs
}
return c, nil
}

@ -15,6 +15,8 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
)
@ -672,6 +674,62 @@ func TestScopeIsolation(t *testing.T) {
}
}
// TestCommitSetStorage tests that we're able to properly read/write active
// commitment sets.
func TestCommitSetStorage(t *testing.T) {
t.Parallel()
testLog, cleanUp, err := newTestBoltArbLog(
testChainHash, testChanPoint1,
)
if err != nil {
t.Fatalf("unable to create test log: %v", err)
}
defer cleanUp()
activeHTLCs := []channeldb.HTLC{
{
Amt: 1000,
OnionBlob: make([]byte, 0),
Signature: make([]byte, 0),
},
}
confTypes := []HtlcSetKey{
LocalHtlcSet, RemoteHtlcSet, RemotePendingHtlcSet,
}
for _, pendingRemote := range []bool{true, false} {
for _, confType := range confTypes {
commitSet := &CommitSet{
ConfCommitKey: &confType,
HtlcSets: make(map[HtlcSetKey][]channeldb.HTLC),
}
commitSet.HtlcSets[LocalHtlcSet] = activeHTLCs
commitSet.HtlcSets[RemoteHtlcSet] = activeHTLCs
if pendingRemote {
commitSet.HtlcSets[RemotePendingHtlcSet] = activeHTLCs
}
err := testLog.InsertConfirmedCommitSet(commitSet)
if err != nil {
t.Fatalf("unable to write commit set: %v", err)
}
diskCommitSet, err := testLog.FetchConfirmedCommitSet()
if err != nil {
t.Fatalf("unable to read commit set: %v", err)
}
if !reflect.DeepEqual(commitSet, diskCommitSet) {
t.Fatalf("commit set mismatch: expected %v, got %v",
spew.Sdump(commitSet), spew.Sdump(diskCommitSet))
}
}
}
}
func init() {
testSignDesc.KeyDesc.PubKey, _ = btcec.ParsePubKey(key1, btcec.S256())