diff --git a/channeldb/channel.go b/channeldb/channel.go index 7ffc8f17..cae5ae12 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -369,6 +369,8 @@ type OpenChannel struct { // TODO(roasbeef): eww Db *DB + // TODO(roasbeef): just need to store local and remote HTLC's? + sync.RWMutex } @@ -382,43 +384,171 @@ func (c *OpenChannel) FullSync() error { return c.Db.Update(c.fullSync) } -// fullSync is an internal versino of the FullSync method which allows callers -// to sync the contents of an OpenChannel while re-using an existing database -// transaction. -func (c *OpenChannel) fullSync(tx *bolt.Tx) error { - // TODO(roasbeef): add helper funcs to create scoped update +// updateChanBucket is a helper function that returns a writeable bucket that a +// channel's data resides in given: the public key for the node, the outpoint, +// and the chainhash that the channel resides on. +func updateChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey, + outPoint *wire.OutPoint, chainHash chainhash.Hash) (*bolt.Bucket, error) { + // First fetch the top level bucket which stores all data related to // current, active channels. - chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) + openChanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) if err != nil { - return err + return nil, err } // Within this top level bucket, fetch the bucket dedicated to storing // open channel data specific to the remote node. - nodePub := c.IdentityPub.SerializeCompressed() - nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(nodePub) + nodePub := nodeKey.SerializeCompressed() + nodeChanBucket, err := openChanBucket.CreateBucketIfNotExists(nodePub) + if err != nil { + return nil, err + } + + // We'll then recurse down an additional layer in order to fetch the + // bucket for this particular chain. + chainBucket, err := nodeChanBucket.CreateBucketIfNotExists(chainHash[:]) + if err != nil { + return nil, err + } + + // With the bucket for the node fetched, we can now go down another + // level, creating the bucket (if it doesn't exist), for this channel + // itself. + var chanPointBuf bytes.Buffer + chanPointBuf.Grow(outPointSize) + if err := writeOutpoint(&chanPointBuf, outPoint); err != nil { + return nil, err + } + chanBucket, err := chainBucket.CreateBucketIfNotExists( + chanPointBuf.Bytes(), + ) + if err != nil { + return nil, err + } + + return chanBucket, nil +} + +// readChanBucket is a helper function that returns a readable bucket that a +// channel's data resides in given: the public key for the node, the outpoint, +// and the chainhash that the channel resides on. +func readChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey, + outPoint *wire.OutPoint, chainHash chainhash.Hash) (*bolt.Bucket, error) { + + // First fetch the top level bucket which stores all data related to + // current, active channels. + openChanBucket := tx.Bucket(openChannelBucket) + if openChanBucket == nil { + return nil, ErrNoChanDBExists + } + + // Within this top level bucket, fetch the bucket dedicated to storing + // open channel data specific to the remote node. + nodePub := nodeKey.SerializeCompressed() + nodeChanBucket := openChanBucket.Bucket(nodePub) + if nodeChanBucket == nil { + return nil, ErrNoActiveChannels + } + + // We'll then recurse down an additional layer in order to fetch the + // bucket for this particular chain. + chainBucket := nodeChanBucket.Bucket(chainHash[:]) + if chainBucket == nil { + return nil, ErrNoActiveChannels + } + + // With the bucket for the node fetched, we can now go down another + // level, for this channel iteslf. + var chanPointBuf bytes.Buffer + chanPointBuf.Grow(outPointSize) + if err := writeOutpoint(&chanPointBuf, outPoint); err != nil { + return nil, err + } + + chanBucket := chainBucket.Bucket(chanPointBuf.Bytes()) + if chanBucket == nil { + return nil, ErrNoActiveChannels + } + + return chanBucket, nil +} + +// fullSync is an internal version of the FullSync method which allows callers +// to sync the contents of an OpenChannel while re-using an existing database +// transaction. +func (c *OpenChannel) fullSync(tx *bolt.Tx) error { + chanBucket, err := updateChanBucket(tx, c.IdentityPub, + &c.FundingOutpoint, c.ChainHash) if err != nil { return err } - // Add this channel ID to the node's active channel index if - // it doesn't already exist. - chanIndexBucket, err := nodeChanBucket.CreateBucketIfNotExists(chanIDBucket) - if err != nil { - return err - } - var b bytes.Buffer - if err := writeOutpoint(&b, &c.FundingOutpoint); err != nil { - return err - } - if chanIndexBucket.Get(b.Bytes()) == nil { - if err := chanIndexBucket.Put(b.Bytes(), nil); err != nil { + return putOpenChannel(chanBucket, c) +} + return err } + + channel.IsPending = false + channel.ShortChanID = openLoc + + return putOpenChannel(chanBucket, channel) + }) +} + +// putChannel serializes, and stores the current state of the channel in its +// entirety. +func putOpenChannel(chanBucket *bolt.Bucket, channel *OpenChannel) error { + // First, we'll write out all the relatively static fields, that are + // decided upon initial channel creation. + if err := putChanInfo(chanBucket, channel); err != nil { + return fmt.Errorf("unable to store chan info: %v", err) } - return putOpenChannel(chanBucket, nodeChanBucket, c) + // With the static channel info written out, we'll now write out the + // current commitment state for both parties. + if err := putChanCommitments(chanBucket, channel); err != nil { + return fmt.Errorf("unable to store chan commitments: %v", err) + } + + // Finally, we'll write out the revocation state for both parties + // within a distinct key space. + if err := putChanRevocationState(chanBucket, channel); err != nil { + return fmt.Errorf("unable to store chan revocations: %v", err) + } + + return nil +} + +// fetchOpenChannel retrieves, and deserializes (including decrypting +// sensitive) the complete channel currently active with the passed nodeID. +func fetchOpenChannel(chanBucket *bolt.Bucket, + chanPoint *wire.OutPoint) (*OpenChannel, error) { + + channel := &OpenChannel{ + FundingOutpoint: *chanPoint, + } + + // First, we'll read all the static information that changes less + // frequently from disk. + if err := fetchChanInfo(chanBucket, channel); err != nil { + return nil, fmt.Errorf("unable to fetch chan info: %v", err) + } + + // With the static information read, we'll now read the current + // commitment state for both sides of the channel. + if err := fetchChanCommitments(chanBucket, channel); err != nil { + return nil, fmt.Errorf("unable to fetch chan commitments: %v", err) + } + + // Finally, we'll retrieve the current revocation state so we can + // properly + if err := fetchChanRevocationState(chanBucket, channel); err != nil { + return nil, fmt.Errorf("unable to fetch chan revocations: %v", err) + } + + return channel, nil } // SyncPending writes the contents of the channel to the database while it's in @@ -460,9 +590,10 @@ func (c *OpenChannel) SyncPending(addr *net.TCPAddr, pendingHeight uint32) error // relationship for this channel. The LinkNode metadata // contains reachability, up-time, and service bits related // information. - // TODO(roasbeef): net info should be in lnwire.NetAddress linkNode := c.Db.NewLinkNode(wire.MainNet, c.IdentityPub, addr) + // TODO(roasbeef): do away with link node all together? + return putLinkNode(nodeInfoBucket, linkNode) }) } @@ -538,32 +669,8 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx, return nil }) -} -// UpdateHTLCs.... -func (c *OpenChannel) UpdateHTLCs(htlcs []*HTLC) error { - c.Lock() - defer c.Unlock() - return c.Db.Update(func(tx *bolt.Tx) error { - chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) - if err != nil { - return err - } - - id := c.IdentityPub.SerializeCompressed() - nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id) - if err != nil { - return err - } - - if err := putCurrentHtlcs(nodeChanBucket, htlcs, - &c.FundingOutpoint); err != nil { - return err - } - - return nil - }) } // HTLC is the on-disk representation of a hash time-locked contract. HTLCs are @@ -614,11 +721,49 @@ type HTLC struct { LogIndex uint64 } - // AddRemoteInclusionHeight... - AddRemoteInclusionHeight uint64 +func serializeHtlcs(b io.Writer, htlcs []HTLC) error { + numHtlcs := uint16(len(htlcs)) + if err := writeElement(b, numHtlcs); err != nil { + return err + } - // DescriptorIndex... - DescriptorIndex uint64 + for _, htlc := range htlcs { + if err := writeElements(b, + htlc.Signature, htlc.RHash, htlc.Amt, htlc.RefundTimeout, + htlc.OutputIndex, htlc.Incoming, htlc.OnionBlob[:], + htlc.HtlcIndex, htlc.LogIndex, + ); err != nil { + return err + } + } + + return nil +} + +func deserializeHtlcs(r io.Reader) ([]HTLC, error) { + var numHtlcs uint16 + if err := readElement(r, &numHtlcs); err != nil { + return nil, err + } + + var htlcs []HTLC + if numHtlcs == 0 { + return htlcs, nil + } + + htlcs = make([]HTLC, numHtlcs) + for i := uint16(0); i < numHtlcs; i++ { + if err := readElements(r, + &htlcs[i].Signature, &htlcs[i].RHash, &htlcs[i].Amt, + &htlcs[i].RefundTimeout, &htlcs[i].OutputIndex, + &htlcs[i].Incoming, &htlcs[i].OnionBlob, + &htlcs[i].HtlcIndex, &htlcs[i].LogIndex, + ); err != nil { + return htlcs, err + } + } + + return htlcs, nil } // Copy returns a full copy of the target HTLC. @@ -635,44 +780,7 @@ func (h *HTLC) Copy() HTLC { return clone } -// ChannelDelta is a snapshot of the commitment state at a particular point in -// the commitment chain. With each state transition, a snapshot of the current -// state along with all non-settled HTLCs are recorded. These snapshots detail -// the state of the _remote_ party's commitment at a particular state number. -// For ourselves (the local node) we ONLY store our most recent (unrevoked) -// state for safety purposes. -type ChannelDelta struct { - // OurMessageIndex... - OurMessageIndex uint64 - // TheirMessageIndex... - TheirMessageIndex uint64 - - // LocalBalance is our current balance at this particular update - // number. - LocalBalance lnwire.MilliSatoshi - - // RemoteBalanceis the balance of the remote node at this particular - // update number. - RemoteBalance lnwire.MilliSatoshi - - // CommitFee is the fee that has been subtracted from the channel - // initiator's balance at this point in the commitment chain. - CommitFee btcutil.Amount - - // FeePerKw is the fee per kw used to calculate the commit fee at this point - // in the commit chain. - FeePerKw btcutil.Amount - - // UpdateNum is the update number that this ChannelDelta represents the - // total number of commitment updates to this point. This can be viewed - // as sort of a "commitment height" as this number is monotonically - // increasing. - UpdateNum uint64 - - // Htlcs is the set of HTLC's that are pending at this particular - // commitment height. - Htlcs []*HTLC } // CommitDiff... @@ -803,21 +911,22 @@ func (c *OpenChannel) InsertNextRevocation(revKey *btcec.PublicKey) error { c.Lock() defer c.Unlock() - return c.Db.Update(func(tx *bolt.Tx) error { - chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) + c.RemoteNextRevocation = revKey + + err := c.Db.Update(func(tx *bolt.Tx) error { + chanBucket, err := updateChanBucket(tx, c.IdentityPub, + &c.FundingOutpoint, c.ChainHash) if err != nil { return err } - id := c.IdentityPub.SerializeCompressed() - nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id) - if err != nil { - return err - } - - c.RemoteNextRevocation = revKey - return putChanRevocationState(nodeChanBucket, c) + return putChanRevocationState(chanBucket, c) }) + if err != nil { + return err + } + + return nil } // AppendToRevocationLog records the new state transition within an on-disk @@ -935,26 +1044,29 @@ func (c *OpenChannel) RevocationLogTail() (*ChannelDelta, error) { // order to allow multiple instances of a particular open channel to obtain a // consistent view of the number of channel updates to data. func (c *OpenChannel) CommitmentHeight() (uint64, error) { - // TODO(roasbeef): this is super hacky, remedy during refactor!!! - o := &OpenChannel{ - FundingOutpoint: c.FundingOutpoint, - } - + var height uint64 err := c.Db.View(func(tx *bolt.Tx) error { // Get the bucket dedicated to storing the metadata for open // channels. - openChanBucket := tx.Bucket(openChannelBucket) - if openChanBucket == nil { - return ErrNoActiveChannels + chanBucket, err := readChanBucket(tx, c.IdentityPub, + &c.FundingOutpoint, c.ChainHash) + if err != nil { + return err } - return fetchChanNumUpdates(openChanBucket, o) + commit, err := fetchChanCommitment(chanBucket, true) + if err != nil { + return err + } + + height = commit.CommitHeight + return nil }) if err != nil { return 0, nil } - return o.NumUpdates, nil + return height, nil } // FindPreviousState scans through the append-only log in an attempt to recover @@ -1012,10 +1124,10 @@ const ( // _revoked_ channel state. BreachClose - // FundingCanceled indicates that the channel never was fully opened before it - // was marked as closed in the database. This can happen if we or the remote - // fail at some point during the opening workflow, or we timeout waiting for - // the funding transaction to be confirmed. + // FundingCanceled indicates that the channel never was fully opened + // before it was marked as closed in the database. This can happen if + // we or the remote fail at some point during the opening workflow, or + // we timeout waiting for the funding transaction to be confirmed. FundingCanceled ) @@ -1143,43 +1255,36 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary) error { // ChannelSnapshot is a frozen snapshot of the current channel state. A // snapshot is detached from the original channel that generated it, providing // read-only access to the current or prior state of an active channel. +// +// TODO(roasbeef): remove all together? pretty much just commitment type ChannelSnapshot struct { // RemoteIdentity is the identity public key of the remote node that we // are maintaining the open channel with. RemoteIdentity btcec.PublicKey - // ChannelPoint is the channel point that uniquly identifies the - // channel whose delta this is. + // ChanPoint is the outpoint that created the channel. This output is + // found within the funding transaction and uniquely identified the + // channel on the resident chain. ChannelPoint wire.OutPoint - // Capacity is the total capacity of the channel in satoshis. + // ChainHash is the genesis hash of the chain that the channel resides + // within. + ChainHash chainhash.Hash + + // Capacity is the total capacity of the channel. Capacity btcutil.Amount - // LocalBalance is the amount of mSAT allocated to the local party. - LocalBalance lnwire.MilliSatoshi + // TotalMSatSent is the total number of milli-satoshis we've sent + // within this channel. + TotalMSatSent lnwire.MilliSatoshi - // RemoteBalance is the amount of mSAT allocated to the remote party. - RemoteBalance lnwire.MilliSatoshi + // TotalMSatReceived is the total number of milli-satoshis we've + // received within this channel. + TotalMSatReceived lnwire.MilliSatoshi - // NumUpdates is the number of updates that have taken place within the - // commitment transaction itself. - NumUpdates uint64 - - // CommitFee is the total fee paid on the commitment transaction at - // this current commitment state. - CommitFee btcutil.Amount - - // TotalMilliSatoshisSent is the total number of mSAT sent by the local - // party at this current commitment instance. - TotalMilliSatoshisSent lnwire.MilliSatoshi - - // TotalMilliSatoshisReceived is the total number of mSAT received by - // the local party current commitment instance. - TotalMilliSatoshisReceived lnwire.MilliSatoshi - - // Htlcs is the current set of outstanding HTLC's live on the - // commitment transaction at this instance. - Htlcs []HTLC + // ChannelCommitment is the current up-to-date commitment for the + // target channel. + ChannelCommitment } // Snapshot returns a read-only snapshot of the current channel state. This @@ -1189,22 +1294,26 @@ func (c *OpenChannel) Snapshot() *ChannelSnapshot { c.RLock() defer c.RUnlock() + localCommit := c.LocalCommitment snapshot := &ChannelSnapshot{ - RemoteIdentity: *c.IdentityPub, - ChannelPoint: c.FundingOutpoint, - Capacity: c.Capacity, - LocalBalance: c.LocalBalance, - RemoteBalance: c.RemoteBalance, - NumUpdates: c.NumUpdates, - CommitFee: c.CommitFee, - TotalMilliSatoshisSent: c.TotalMSatSent, - TotalMilliSatoshisReceived: c.TotalMSatReceived, + RemoteIdentity: *c.IdentityPub, + ChannelPoint: c.FundingOutpoint, + Capacity: c.Capacity, + TotalMSatSent: c.TotalMSatSent, + TotalMSatReceived: c.TotalMSatReceived, + ChainHash: c.ChainHash, + ChannelCommitment: ChannelCommitment{ + LocalBalance: localCommit.LocalBalance, + RemoteBalance: localCommit.RemoteBalance, + CommitHeight: localCommit.CommitHeight, + CommitFee: localCommit.CommitFee, + }, } - // Copy over the current set of HTLCs to ensure the caller can't - // mutate our internal state. - snapshot.Htlcs = make([]HTLC, len(c.Htlcs)) - for i, h := range c.Htlcs { + // Copy over the current set of HTLCs to ensure the caller can't mutate + // our internal state. + snapshot.Htlcs = make([]HTLC, len(localCommit.Htlcs)) + for i, h := range localCommit.Htlcs { snapshot.Htlcs[i] = h.Copy() } @@ -1228,37 +1337,11 @@ func putChannelCloseSummary(tx *bolt.Tx, chanID []byte, } func serializeChannelCloseSummary(w io.Writer, cs *ChannelCloseSummary) error { - if err := binary.Write(w, byteOrder, cs.IsPending); err != nil { - return err - } - - if err := writeOutpoint(w, &cs.ChanPoint); err != nil { - return err - } - if _, err := w.Write(cs.ClosingTXID[:]); err != nil { - return err - } - - if err := binary.Write(w, byteOrder, cs.SettledBalance); err != nil { - return err - } - if err := binary.Write(w, byteOrder, cs.TimeLockedBalance); err != nil { - return err - } - if err := binary.Write(w, byteOrder, cs.Capacity); err != nil { - return err - } - - if _, err := w.Write([]byte{byte(cs.CloseType)}); err != nil { - return err - } - - pub := cs.RemotePub.SerializeCompressed() - if _, err := w.Write(pub); err != nil { - return err - } - - return nil + return writeElements(w, + cs.ChanPoint, cs.ChainHash, cs.ClosingTXID, cs.RemotePub, cs.Capacity, + cs.SettledBalance, cs.TimeLockedBalance, cs.CloseType, + cs.IsPending, + ) } func fetchChannelCloseSummary(tx *bolt.Tx, @@ -1281,40 +1364,11 @@ func fetchChannelCloseSummary(tx *bolt.Tx, func deserializeCloseChannelSummary(r io.Reader) (*ChannelCloseSummary, error) { c := &ChannelCloseSummary{} - var err error - - if err := binary.Read(r, byteOrder, &c.IsPending); err != nil { - return nil, err - } - - if err := readOutpoint(r, &c.ChanPoint); err != nil { - return nil, err - } - if _, err := io.ReadFull(r, c.ClosingTXID[:]); err != nil { - return nil, err - } - - if err := binary.Read(r, byteOrder, &c.SettledBalance); err != nil { - return nil, err - } - if err := binary.Read(r, byteOrder, &c.TimeLockedBalance); err != nil { - return nil, err - } - if err := binary.Read(r, byteOrder, &c.Capacity); err != nil { - return nil, err - } - - var closeType [1]byte - if err := binary.Read(r, byteOrder, closeType[:]); err != nil { - return nil, err - } - c.CloseType = ClosureType(closeType[0]) - - var pub [33]byte - if _, err := io.ReadFull(r, pub[:]); err != nil { - return nil, err - } - c.RemotePub, err = btcec.ParsePubKey(pub[:], btcec.S256()) + err := readElements(r, + &c.ChanPoint, &c.ChainHash, &c.ClosingTXID, &c.RemotePub, &c.Capacity, + &c.SettledBalance, &c.TimeLockedBalance, &c.CloseType, + &c.IsPending, + ) if err != nil { return nil, err } @@ -1322,625 +1376,182 @@ func deserializeCloseChannelSummary(r io.Reader) (*ChannelCloseSummary, error) { return c, nil } -// putChannel serializes, and stores the current state of the channel in its -// entirety. -func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, - channel *OpenChannel) error { - - // First write out all the "common" fields using the field's prefix - // append with the channel's ID. These fields go into a top-level - // bucket to allow for ease of metric aggregation via efficient prefix - // scans. - if err := putChanCapacity(openChanBucket, channel); err != nil { - return err - } - if err := putChanFeePerKw(openChanBucket, channel); err != nil { - return err - } - if err := putChanNumUpdates(openChanBucket, channel); err != nil { - return err - } - if err := putOurMessageIndex(openChanBucket, channel); err != nil { - return err - } - if err := putTheirMessageIndex(openChanBucket, channel); err != nil { - return err - } - if err := putChanAmountsTransferred(openChanBucket, channel); err != nil { - return err - } - if err := putChanIsPending(openChanBucket, channel); err != nil { - return err - } - if err := putChanConfInfo(openChanBucket, channel); err != nil { - return err - } - if err := putChanCommitFee(openChanBucket, channel); err != nil { +func putChanInfo(chanBucket *bolt.Bucket, channel *OpenChannel) error { + var w bytes.Buffer + if err := writeElements(&w, + channel.ChanType, channel.ChainHash, channel.FundingOutpoint, + channel.ShortChanID, channel.IsPending, channel.IsInitiator, + channel.FundingBroadcastHeight, channel.NumConfsRequired, + channel.IdentityPub, channel.Capacity, channel.TotalMSatSent, + channel.TotalMSatReceived, + ); err != nil { return err } - // Next, write out the fields of the channel update less frequently. - if err := putChannelIDs(nodeChanBucket, channel); err != nil { + writeChanConfig := func(b io.Writer, c *ChannelConfig) error { + return writeElements(b, + c.DustLimit, c.MaxPendingAmount, c.ChanReserve, c.MinHTLC, + c.MaxAcceptedHtlcs, c.CsvDelay, c.MultiSigKey, + c.RevocationBasePoint, c.PaymentBasePoint, c.DelayBasePoint, + ) + } + if err := writeChanConfig(&w, &channel.LocalChanCfg); err != nil { return err } - if err := putChanConfigs(nodeChanBucket, channel); err != nil { - return err - } - if err := putChanCommitTxns(nodeChanBucket, channel); err != nil { - return err - } - if err := putChanFundingInfo(nodeChanBucket, channel); err != nil { - return err - } - if err := putChanRevocationState(nodeChanBucket, channel); err != nil { - return err - } - if err := putCurrentHtlcs(nodeChanBucket, channel.Htlcs, - &channel.FundingOutpoint); err != nil { + if err := writeChanConfig(&w, &channel.RemoteChanCfg); err != nil { return err } - return nil + return chanBucket.Put(chanInfoKey, w.Bytes()) } -// fetchOpenChannel retrieves, and deserializes (including decrypting -// sensitive) the complete channel currently active with the passed nodeID. -func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, - chanID *wire.OutPoint) (*OpenChannel, error) { - - var err error - channel := &OpenChannel{ - FundingOutpoint: *chanID, +func serializeChanCommit(w io.Writer, c *ChannelCommitment) error { + if err := writeElements(w, + c.CommitHeight, c.LocalLogIndex, c.LocalHtlcIndex, + c.RemoteLogIndex, c.RemoteHtlcIndex, c.LocalBalance, + c.RemoteBalance, c.CommitFee, c.FeePerKw, c.CommitTx, + c.CommitSig, + ); err != nil { + return err } - // First, read out the fields of the channel update less frequently. - if err = fetchChannelIDs(nodeChanBucket, channel); err != nil { - return nil, fmt.Errorf("unable to read chan ID's: %v", err) - } - if err = fetchChanConfigs(nodeChanBucket, channel); err != nil { - return nil, fmt.Errorf("unable to read chan config: %v", err) - } - if err = fetchChanCommitTxns(nodeChanBucket, channel); err != nil { - return nil, fmt.Errorf("unable to read commit txns: %v", err) - } - if err = fetchChanFundingInfo(nodeChanBucket, channel); err != nil { - return nil, fmt.Errorf("unable to read funding info: %v", err) - } - if err = fetchChanRevocationState(nodeChanBucket, channel); err != nil { - return nil, err - } - channel.Htlcs, err = fetchCurrentHtlcs(nodeChanBucket, chanID) - if err != nil { - return nil, fmt.Errorf("unable to read current htlc's: %v", err) - } - - // With the existence of an open channel bucket with this node verified, - // perform a full read of the entire struct. Starting with the prefixed - // fields residing in the parent bucket. - // TODO(roasbeef): combine the below into channel config like key - if err = fetchChanCapacity(openChanBucket, channel); err != nil { - return nil, fmt.Errorf("unable to read chan capacity: %v", err) - } - if err = fetchChanMinFeePerKw(openChanBucket, channel); err != nil { - return nil, fmt.Errorf("unable to read fee-per-kb: %v", err) - } - if err = fetchChanNumUpdates(openChanBucket, channel); err != nil { - return nil, fmt.Errorf("unable to read num updates: %v", err) - } - if err = fetchOurMessageIndex(openChanBucket, channel); err != nil { - return nil, fmt.Errorf("unable to read our message index: %v", err) - } - if err = fetchTheirMessageIndex(openChanBucket, channel); err != nil { - return nil, fmt.Errorf("unable to read their message index: %v", err) - } - if err = fetchChanAmountsTransferred(openChanBucket, channel); err != nil { - return nil, fmt.Errorf("unable to read sat transferred: %v", err) - } - if err = fetchChanIsPending(openChanBucket, channel); err != nil { - return nil, err - } - if err := fetchChanConfInfo(openChanBucket, channel); err != nil { - return nil, err - } - if err = fetchChanCommitFee(openChanBucket, channel); err != nil { - return nil, err - } - - return channel, nil + return serializeHtlcs(w, c.Htlcs) } -func deleteOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, - channelID []byte, o *wire.OutPoint) error { +func putChanCommitment(chanBucket *bolt.Bucket, c *ChannelCommitment, + local bool) error { - // First we'll delete all the "common" top level items stored outside - // the node's channel bucket. - if err := deleteChanCapacity(openChanBucket, channelID); err != nil { - return err - } - if err := deleteChanMinFeePerKw(openChanBucket, channelID); err != nil { - return err - } - if err := deleteChanNumUpdates(openChanBucket, channelID); err != nil { - return err - } - if err := deleteOurMessageIndex(openChanBucket, channelID); err != nil { - return err - } - if err := deleteTheirMessageIndex(openChanBucket, channelID); err != nil { - return err - } - if err := deleteChanAmountsTransferred(openChanBucket, channelID); err != nil { - return err - } - if err := deleteChanIsPending(openChanBucket, channelID); err != nil { - return err - } - if err := deleteChanConfInfo(openChanBucket, channelID); err != nil { - return err - } - if err := deleteChanCommitFee(openChanBucket, channelID); err != nil { - return err - } - - // Finally, delete all the fields directly within the node's channel - // bucket. - if err := deleteChannelIDs(nodeChanBucket, channelID); err != nil { - return err - } - if err := deleteChanConfigs(nodeChanBucket, channelID); err != nil { - return err - } - if err := deleteChanCommitTxns(nodeChanBucket, channelID); err != nil { - return err - } - if err := deleteChanFundingInfo(nodeChanBucket, channelID); err != nil { - return err - } - if err := deleteChanRevocationState(nodeChanBucket, channelID); err != nil { - return err - } - if err := deleteCurrentHtlcs(nodeChanBucket, o); err != nil { - return err - } - - return nil -} - -func putChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - // Some scratch bytes re-used for serializing each of the uint64's. - scratch1 := make([]byte, 8) - scratch2 := make([]byte, 8) - scratch3 := make([]byte, 8) - - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix[3:], b.Bytes()) - - copy(keyPrefix[:3], chanCapacityPrefix) - byteOrder.PutUint64(scratch1, uint64(channel.Capacity)) - if err := openChanBucket.Put(keyPrefix, scratch1); err != nil { - return err - } - - copy(keyPrefix[:3], selfBalancePrefix) - byteOrder.PutUint64(scratch2, uint64(channel.LocalBalance)) - if err := openChanBucket.Put(keyPrefix, scratch2); err != nil { - return err - } - - copy(keyPrefix[:3], theirBalancePrefix) - byteOrder.PutUint64(scratch3, uint64(channel.RemoteBalance)) - return openChanBucket.Put(keyPrefix, scratch3) -} - -func deleteChanCapacity(openChanBucket *bolt.Bucket, chanID []byte) error { - keyPrefix := make([]byte, 3+len(chanID)) - copy(keyPrefix[3:], chanID) - - copy(keyPrefix[:3], chanCapacityPrefix) - if err := openChanBucket.Delete(keyPrefix); err != nil { - return err - } - - copy(keyPrefix[:3], selfBalancePrefix) - if err := openChanBucket.Delete(keyPrefix); err != nil { - return err - } - - copy(keyPrefix[:3], theirBalancePrefix) - return openChanBucket.Delete(keyPrefix) -} - -func fetchChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - // A byte slice re-used to compute each key prefix below. - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix[3:], b.Bytes()) - - copy(keyPrefix[:3], chanCapacityPrefix) - capacityBytes := openChanBucket.Get(keyPrefix) - channel.Capacity = btcutil.Amount(byteOrder.Uint64(capacityBytes)) - - copy(keyPrefix[:3], selfBalancePrefix) - selfBalanceBytes := openChanBucket.Get(keyPrefix) - channel.LocalBalance = lnwire.MilliSatoshi(byteOrder.Uint64(selfBalanceBytes)) - - copy(keyPrefix[:3], theirBalancePrefix) - theirBalanceBytes := openChanBucket.Get(keyPrefix) - channel.RemoteBalance = lnwire.MilliSatoshi(byteOrder.Uint64(theirBalanceBytes)) - - return nil -} - -func putChanFeePerKw(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - scratch := make([]byte, 8) - byteOrder.PutUint64(scratch, uint64(channel.FeePerKw)) - - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix, minFeePerKwPrefix) - copy(keyPrefix[3:], b.Bytes()) - - return openChanBucket.Put(keyPrefix, scratch) -} - -func deleteChanMinFeePerKw(openChanBucket *bolt.Bucket, chanID []byte) error { - keyPrefix := make([]byte, 3+len(chanID)) - copy(keyPrefix, minFeePerKwPrefix) - copy(keyPrefix[3:], chanID) - return openChanBucket.Delete(keyPrefix) -} - -func fetchChanMinFeePerKw(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix, minFeePerKwPrefix) - copy(keyPrefix[3:], b.Bytes()) - - feeBytes := openChanBucket.Get(keyPrefix) - channel.FeePerKw = btcutil.Amount(byteOrder.Uint64(feeBytes)) - - return nil -} - -func putChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - scratch := make([]byte, 8) - byteOrder.PutUint64(scratch, channel.NumUpdates) - - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix, updatePrefix) - copy(keyPrefix[3:], b.Bytes()) - - return openChanBucket.Put(keyPrefix, scratch) -} - -func deleteChanNumUpdates(openChanBucket *bolt.Bucket, chanID []byte) error { - keyPrefix := make([]byte, 3+len(chanID)) - copy(keyPrefix, updatePrefix) - copy(keyPrefix[3:], chanID) - return openChanBucket.Delete(keyPrefix) -} - -func fetchChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix, updatePrefix) - copy(keyPrefix[3:], b.Bytes()) - - updateBytes := openChanBucket.Get(keyPrefix) - channel.NumUpdates = byteOrder.Uint64(updateBytes) - - return nil -} - -func putChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - scratch1 := make([]byte, 8) - scratch2 := make([]byte, 8) - - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix[3:], b.Bytes()) - - copy(keyPrefix[:3], satSentPrefix) - byteOrder.PutUint64(scratch1, uint64(channel.TotalMSatSent)) - if err := openChanBucket.Put(keyPrefix, scratch1); err != nil { - return err - } - - copy(keyPrefix[:3], satReceivedPrefix) - byteOrder.PutUint64(scratch2, uint64(channel.TotalMSatReceived)) - return openChanBucket.Put(keyPrefix, scratch2) -} - -func deleteChanAmountsTransferred(openChanBucket *bolt.Bucket, chanID []byte) error { - keyPrefix := make([]byte, 3+len(chanID)) - copy(keyPrefix[3:], chanID) - - copy(keyPrefix[:3], satSentPrefix) - if err := openChanBucket.Delete(keyPrefix); err != nil { - return err - } - - copy(keyPrefix[:3], satReceivedPrefix) - return openChanBucket.Delete(keyPrefix) -} - -func fetchChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix[3:], b.Bytes()) - - copy(keyPrefix[:3], satSentPrefix) - totalSentBytes := openChanBucket.Get(keyPrefix) - channel.TotalMSatSent = lnwire.MilliSatoshi(byteOrder.Uint64(totalSentBytes)) - - copy(keyPrefix[:3], satReceivedPrefix) - totalReceivedBytes := openChanBucket.Get(keyPrefix) - channel.TotalMSatReceived = lnwire.MilliSatoshi(byteOrder.Uint64(totalReceivedBytes)) - - return nil -} - -func putChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - scratch := make([]byte, 2) - - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix[3:], b.Bytes()) - copy(keyPrefix[:3], isPendingPrefix) - - if channel.IsPending { - byteOrder.PutUint16(scratch, uint16(1)) - return openChanBucket.Put(keyPrefix, scratch) - } - - byteOrder.PutUint16(scratch, uint16(0)) - return openChanBucket.Put(keyPrefix, scratch) -} - -func deleteChanIsPending(openChanBucket *bolt.Bucket, chanID []byte) error { - keyPrefix := make([]byte, 3+len(chanID)) - copy(keyPrefix[3:], chanID) - copy(keyPrefix[:3], isPendingPrefix) - return openChanBucket.Delete(keyPrefix) -} - -func fetchChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix[3:], b.Bytes()) - copy(keyPrefix[:3], isPendingPrefix) - - isPending := byteOrder.Uint16(openChanBucket.Get(keyPrefix)) - if isPending == 1 { - channel.IsPending = true + var key []byte + copy(key[:], chanCommitmentKey) + if local { + key = append(key, byte(0x00)) } else { - channel.IsPending = false + key = append(key, byte(0x01)) } - return nil -} - -func putChanConfInfo(openChanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { + if err := serializeChanCommit(&b, c); err != nil { return err } - keyPrefix := make([]byte, len(confInfoPrefix)+b.Len()) - copy(keyPrefix[:len(confInfoPrefix)], confInfoPrefix) - copy(keyPrefix[len(confInfoPrefix):], b.Bytes()) - - // We store the conf info in the following format: broadcast || open. - var scratch [12]byte - byteOrder.PutUint32(scratch[:], channel.FundingBroadcastHeight) - byteOrder.PutUint64(scratch[4:], channel.ShortChanID.ToUint64()) - - return openChanBucket.Put(keyPrefix, scratch[:]) + return chanBucket.Put(key, b.Bytes()) } -func fetchChanConfInfo(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, len(confInfoPrefix)+b.Len()) - copy(keyPrefix[:len(confInfoPrefix)], confInfoPrefix) - copy(keyPrefix[len(confInfoPrefix):], b.Bytes()) - - confInfoBytes := openChanBucket.Get(keyPrefix) - channel.FundingBroadcastHeight = byteOrder.Uint32(confInfoBytes[:4]) - channel.ShortChanID = lnwire.NewShortChanIDFromInt( - byteOrder.Uint64(confInfoBytes[4:]), - ) - - return nil -} - -func deleteChanConfInfo(openChanBucket *bolt.Bucket, chanID []byte) error { - keyPrefix := make([]byte, len(confInfoPrefix)+len(chanID)) - copy(keyPrefix[:len(confInfoPrefix)], confInfoPrefix) - copy(keyPrefix[len(confInfoPrefix):], chanID) - return openChanBucket.Delete(keyPrefix) -} - -func putChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { - // TODO(roasbeef): just pass in chanID everywhere for puts - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - // Construct the id key: cid || channelID. - // TODO(roasbeef): abstract out to func - idKey := make([]byte, len(chanIDKey)+b.Len()) - copy(idKey[:3], chanIDKey) - copy(idKey[3:], b.Bytes()) - - idBytes := channel.IdentityPub.SerializeCompressed() - return nodeChanBucket.Put(idKey, idBytes) -} - -func deleteChannelIDs(nodeChanBucket *bolt.Bucket, chanID []byte) error { - idKey := make([]byte, len(chanIDKey)+len(chanID)) - copy(idKey[:3], chanIDKey) - copy(idKey[3:], chanID) - return nodeChanBucket.Delete(idKey) -} - -func fetchChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { - var ( - err error - b bytes.Buffer - ) - - if err = writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - // Construct the id key: cid || channelID. - idKey := make([]byte, len(chanIDKey)+b.Len()) - copy(idKey[:3], chanIDKey) - copy(idKey[3:], b.Bytes()) - - idBytes := nodeChanBucket.Get(idKey) - channel.IdentityPub, err = btcec.ParsePubKey(idBytes, btcec.S256()) +func putChanCommitments(chanBucket *bolt.Bucket, channel *OpenChannel) error { + err := putChanCommitment(chanBucket, &channel.LocalCommitment, true) if err != nil { return err } - return nil + return putChanCommitment(chanBucket, &channel.RemoteCommitment, false) } -func putChanCommitFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - scratch := make([]byte, 8) - byteOrder.PutUint64(scratch, uint64(channel.CommitFee)) +func putChanRevocationState(chanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { + err := writeElements( + &b, channel.RemoteCurrentRevocation, channel.RevocationProducer, + channel.RevocationStore, + ) + if err != nil { return err } - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix, commitFeePrefix) - copy(keyPrefix[3:], b.Bytes()) + // TODO(roasbeef): don't keep producer on disk - return openChanBucket.Put(keyPrefix, scratch) + // If the next revocation is present, which is only the case after the + // FundingLocked message has been sent, then we'll write it to disk. + if channel.RemoteNextRevocation != nil { + err = writeElements(&b, channel.RemoteNextRevocation) + if err != nil { + return err + } + } + + return chanBucket.Put(revocationStateKey, b.Bytes()) } -func fetchChanCommitFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { +func fetchChanInfo(chanBucket *bolt.Bucket, channel *OpenChannel) error { + infoBytes := chanBucket.Get(chanInfoKey) + if infoBytes == nil { + return ErrNoChanInfoFound + } + r := bytes.NewReader(infoBytes) + + if err := readElements(r, + &channel.ChanType, &channel.ChainHash, &channel.FundingOutpoint, + &channel.ShortChanID, &channel.IsPending, &channel.IsInitiator, + &channel.FundingBroadcastHeight, &channel.NumConfsRequired, + &channel.IdentityPub, &channel.Capacity, &channel.TotalMSatSent, + &channel.TotalMSatReceived, + ); err != nil { return err } - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix, commitFeePrefix) - copy(keyPrefix[3:], b.Bytes()) - - commitFeeBytes := openChanBucket.Get(keyPrefix) - channel.CommitFee = btcutil.Amount(byteOrder.Uint64(commitFeeBytes)) + readChanConfig := func(b io.Reader, c *ChannelConfig) error { + return readElements(b, + &c.DustLimit, &c.MaxPendingAmount, &c.ChanReserve, + &c.MinHTLC, &c.MaxAcceptedHtlcs, &c.CsvDelay, + &c.MultiSigKey, &c.RevocationBasePoint, + &c.PaymentBasePoint, &c.DelayBasePoint, + ) + } + if err := readChanConfig(r, &channel.LocalChanCfg); err != nil { + return err + } + if err := readChanConfig(r, &channel.RemoteChanCfg); err != nil { + return err + } return nil } -func deleteChanCommitFee(openChanBucket *bolt.Bucket, chanID []byte) error { - commitFeeKey := make([]byte, 3+len(chanID)) - copy(commitFeeKey, commitFeePrefix) - copy(commitFeeKey[3:], chanID) +func deserializeChanCommit(r io.Reader) (ChannelCommitment, error) { + var c ChannelCommitment - return openChanBucket.Delete(commitFeeKey) -} - -func putChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { - var bc bytes.Buffer - if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil { - return err - } - txnsKey := make([]byte, len(commitTxnsKey)+bc.Len()) - copy(txnsKey[:3], commitTxnsKey) - copy(txnsKey[3:], bc.Bytes()) - - var b bytes.Buffer - - if err := channel.CommitTx.Serialize(&b); err != nil { - return err + err := readElements(r, + &c.CommitHeight, &c.LocalLogIndex, &c.LocalHtlcIndex, &c.RemoteLogIndex, + &c.RemoteHtlcIndex, &c.LocalBalance, &c.RemoteBalance, + &c.CommitFee, &c.FeePerKw, &c.CommitTx, &c.CommitSig, + ) + if err != nil { + return c, err } - if err := wire.WriteVarBytes(&b, 0, channel.CommitSig); err != nil { - return err + c.Htlcs, err = deserializeHtlcs(r) + if err != nil { + return c, err } - return nodeChanBucket.Put(txnsKey, b.Bytes()) + return c, nil } -func deleteChanCommitTxns(nodeChanBucket *bolt.Bucket, chanID []byte) error { - txnsKey := make([]byte, len(commitTxnsKey)+len(chanID)) - copy(txnsKey[:3], commitTxnsKey) - copy(txnsKey[3:], chanID) - return nodeChanBucket.Delete(txnsKey) +func fetchChanCommitment(chanBucket *bolt.Bucket, local bool) (ChannelCommitment, error) { + var key []byte + copy(key[:], chanCommitmentKey) + if local { + key = append(key, byte(0x00)) + } else { + key = append(key, byte(0x01)) + } + + commitBytes := chanBucket.Get(key) + if commitBytes == nil { + return ChannelCommitment{}, ErrNoCommitmentsFound + } + + r := bytes.NewReader(commitBytes) + return deserializeChanCommit(r) } -func fetchChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { - var bc bytes.Buffer +func fetchChanCommitments(chanBucket *bolt.Bucket, channel *OpenChannel) error { var err error - if err = writeOutpoint(&bc, &channel.FundingOutpoint); err != nil { + + channel.LocalCommitment, err = fetchChanCommitment(chanBucket, true) + if err != nil { return err } - txnsKey := make([]byte, len(commitTxnsKey)+bc.Len()) - copy(txnsKey[:3], commitTxnsKey) - copy(txnsKey[3:], bc.Bytes()) - - txnBytes := bytes.NewReader(nodeChanBucket.Get(txnsKey)) - - channel.CommitTx = *wire.NewMsgTx(2) - if err = channel.CommitTx.Deserialize(txnBytes); err != nil { - return err - } - - channel.CommitSig, err = wire.ReadVarBytes(txnBytes, 0, 80, "") + channel.RemoteCommitment, err = fetchChanCommitment(chanBucket, false) if err != nil { return err } @@ -1948,803 +1559,99 @@ func fetchChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) erro return nil } -func putChanConfigs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer +func fetchChanRevocationState(chanBucket *bolt.Bucket, channel *OpenChannel) error { + revBytes := chanBucket.Get(revocationStateKey) + if revBytes == nil { + return ErrNoRevocationsFound + } + r := bytes.NewReader(revBytes) - putChanConfig := func(cfg *ChannelConfig) error { - err := binary.Write(&b, byteOrder, cfg.DustLimit) - if err != nil { - return err - } - err = binary.Write(&b, byteOrder, cfg.MaxPendingAmount) - if err != nil { - return err - } - err = binary.Write(&b, byteOrder, cfg.ChanReserve) - if err != nil { - return err - } - err = binary.Write(&b, byteOrder, cfg.MinHTLC) - if err != nil { - return err - } - err = binary.Write(&b, byteOrder, cfg.CsvDelay) - if err != nil { - return err - } - err = binary.Write(&b, byteOrder, cfg.MaxAcceptedHtlcs) - if err != nil { - return err - } - - _, err = b.Write(cfg.MultiSigKey.SerializeCompressed()) - if err != nil { - return err - } - _, err = b.Write(cfg.RevocationBasePoint.SerializeCompressed()) - if err != nil { - return err - } - _, err = b.Write(cfg.PaymentBasePoint.SerializeCompressed()) - if err != nil { - return err - } - _, err = b.Write(cfg.DelayBasePoint.SerializeCompressed()) - if err != nil { - return err - } + err := readElements( + r, &channel.RemoteCurrentRevocation, &channel.RevocationProducer, + &channel.RevocationStore, + ) + if err != nil { + return err + } + // If there aren't any bytes left in the buffer, then we don't yet have + // the next remote revocation, so we can exit early here. + if r.Len() == 0 { return nil } - putChanConfig(&channel.LocalChanCfg) - putChanConfig(&channel.RemoteChanCfg) - - var bc bytes.Buffer - if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil { - return err - } - configKey := make([]byte, len(chanConfigPrefix)+len(bc.Bytes())) - copy(configKey, chanConfigPrefix) - copy(configKey, bc.Bytes()) - - return nodeChanBucket.Put(configKey, b.Bytes()) + // Otherwise we'll read the next revocation for the remote party which + // is always the last item within the buffer. + return readElements(r, &channel.RemoteNextRevocation) } -func fetchChanConfigs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { - var bc bytes.Buffer - if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil { +func deleteOpenChannel(chanBucket *bolt.Bucket, chanPointBytes []byte) error { + + if err := chanBucket.Delete(chanInfoKey); err != nil { return err } - configKey := make([]byte, len(chanConfigPrefix)+len(bc.Bytes())) - copy(configKey, chanConfigPrefix) - copy(configKey, bc.Bytes()) - configBytes := nodeChanBucket.Get(configKey) - if configBytes == nil { - return fmt.Errorf("unable to find channel config for %v: ", - channel.FundingOutpoint) - } - configReader := bytes.NewReader(configBytes) - - fetchChanConfig := func() (*ChannelConfig, error) { - cfg := &ChannelConfig{} - - err := binary.Read(configReader, byteOrder, &cfg.DustLimit) - if err != nil { - return nil, err - } - err = binary.Read(configReader, byteOrder, &cfg.MaxPendingAmount) - if err != nil { - return nil, err - } - err = binary.Read(configReader, byteOrder, &cfg.ChanReserve) - if err != nil { - return nil, err - } - err = binary.Read(configReader, byteOrder, &cfg.MinHTLC) - if err != nil { - return nil, err - } - err = binary.Read(configReader, byteOrder, &cfg.CsvDelay) - if err != nil { - return nil, err - } - err = binary.Read(configReader, byteOrder, &cfg.MaxAcceptedHtlcs) - if err != nil { - return nil, err - } - - var pub [33]byte - readKey := func() (*btcec.PublicKey, error) { - if _, err := io.ReadFull(configReader, pub[:]); err != nil { - return nil, err - } - return btcec.ParsePubKey(pub[:], btcec.S256()) - } - - cfg.MultiSigKey, err = readKey() - if err != nil { - return nil, err - } - cfg.RevocationBasePoint, err = readKey() - if err != nil { - return nil, err - } - cfg.PaymentBasePoint, err = readKey() - if err != nil { - return nil, err - } - cfg.DelayBasePoint, err = readKey() - if err != nil { - return nil, err - } - - return cfg, nil - } - - var err error - cfg, err := fetchChanConfig() + err := chanBucket.Delete(append(chanCommitmentKey, byte(0x00))) if err != nil { return err } - channel.LocalChanCfg = *cfg - - cfg, err = fetchChanConfig() - if err != nil { - return err - } - channel.RemoteChanCfg = *cfg - - return nil -} - -func deleteChanConfigs(nodeChanBucket *bolt.Bucket, chanID []byte) error { - configKey := make([]byte, len(chanConfigPrefix)+len(chanID)) - copy(configKey, chanConfigPrefix) - copy(configKey, chanID) - return nodeChanBucket.Delete(configKey) -} - -func putChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { - var bc bytes.Buffer - if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil { - return err - } - fundTxnKey := make([]byte, len(fundingTxnKey)+bc.Len()) - copy(fundTxnKey[:3], fundingTxnKey) - copy(fundTxnKey[3:], bc.Bytes()) - - var b bytes.Buffer - - var boolByte [1]byte - if channel.IsInitiator { - boolByte[0] = 1 - } else { - boolByte[0] = 0 - } - - if err := binary.Write(&b, byteOrder, boolByte[:]); err != nil { - return err - } - - // TODO(roasbeef): make first field instead? - if _, err := b.Write([]byte{uint8(channel.ChanType)}); err != nil { - return err - } - if _, err := b.Write(channel.ChainHash[:]); err != nil { - return err - } - - var scratch [2]byte - byteOrder.PutUint16(scratch[:], channel.NumConfsRequired) - if _, err := b.Write(scratch[:]); err != nil { - return err - } - - return nodeChanBucket.Put(fundTxnKey, b.Bytes()) -} - -func deleteChanFundingInfo(nodeChanBucket *bolt.Bucket, chanID []byte) error { - fundTxnKey := make([]byte, len(fundingTxnKey)+len(chanID)) - copy(fundTxnKey[:3], fundingTxnKey) - copy(fundTxnKey[3:], chanID) - return nodeChanBucket.Delete(fundTxnKey) -} - -func fetchChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - fundTxnKey := make([]byte, len(fundingTxnKey)+b.Len()) - copy(fundTxnKey[:3], fundingTxnKey) - copy(fundTxnKey[3:], b.Bytes()) - - infoBytes := bytes.NewReader(nodeChanBucket.Get(fundTxnKey)) - - var err error - var boolByte [1]byte - err = binary.Read(infoBytes, byteOrder, boolByte[:]) - - if err != nil { - return err - } - if boolByte[0] == 1 { - channel.IsInitiator = true - } else { - channel.IsInitiator = false - } - - var chanType [1]byte - err = binary.Read(infoBytes, byteOrder, chanType[:]) - - if err != nil { - return err - } - channel.ChanType = ChannelType(chanType[0]) - err = binary.Read(infoBytes, byteOrder, channel.ChainHash[:]) - + err = chanBucket.Delete(append(chanCommitmentKey, byte(0x01))) if err != nil { return err } - var scratch [2]byte - if _, err := infoBytes.Read(scratch[:]); err != nil { - return err - } - channel.NumConfsRequired = byteOrder.Uint16(scratch[:]) - - return nil -} - -func putChanRevocationState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - - curRevKey := channel.RemoteCurrentRevocation.SerializeCompressed() - if err := wire.WriteVarBytes(&b, 0, curRevKey); err != nil { + if err := chanBucket.Delete(revocationStateKey); err != nil { return err } - // TODO(roasbeef): shouldn't be storing on disk, should re-derive as - // needed - if err := channel.RevocationProducer.Encode(&b); err != nil { - return err - } - if err := channel.RevocationStore.Encode(&b); err != nil { - return err - } - - var bc bytes.Buffer - if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil { - return err - } - - // We place the next revocation key at the very end, as under certain - // circumstances (when a channel is initially funded), this value will - // not yet have been set. - // - // TODO(roasbeef): segment the storage? - if channel.RemoteNextRevocation != nil { - nextRevKey := channel.RemoteNextRevocation.SerializeCompressed() - if err := wire.WriteVarBytes(&b, 0, nextRevKey); err != nil { - return err - } - } - - revocationKey := make([]byte, len(revocationStateKey)+bc.Len()) - copy(revocationKey[:3], revocationStateKey) - copy(revocationKey[3:], bc.Bytes()) - return nodeChanBucket.Put(revocationKey, b.Bytes()) -} - -func deleteChanRevocationState(nodeChanBucket *bolt.Bucket, chanID []byte) error { - revocationKey := make([]byte, len(revocationStateKey)+len(chanID)) - copy(revocationKey[:3], revocationStateKey) - copy(revocationKey[3:], chanID) - return nodeChanBucket.Delete(revocationKey) -} - -func fetchChanRevocationState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - preimageKey := make([]byte, len(revocationStateKey)+b.Len()) - copy(preimageKey[:3], revocationStateKey) - copy(preimageKey[3:], b.Bytes()) - - reader := bytes.NewReader(nodeChanBucket.Get(preimageKey)) - - curRevKeyBytes, err := wire.ReadVarBytes(reader, 0, 1000, "") - if err != nil { - return err - } - channel.RemoteCurrentRevocation, err = btcec.ParsePubKey(curRevKeyBytes, btcec.S256()) - if err != nil { - return err - } - - // TODO(roasbeef): should be rederiving on fly, or encrypting on disk. - var root [32]byte - if _, err := io.ReadFull(reader, root[:]); err != nil { - return err - } - channel.RevocationProducer, err = shachain.NewRevocationProducerFromBytes(root[:]) - if err != nil { - return err - } - - channel.RevocationStore, err = shachain.NewRevocationStoreFromBytes(reader) - if err != nil { - return err - } - - // We'll attempt to see if the remote party's next revocation key is - // currently set, if so then we'll read and deserialize it. Otherwise, - // we can exit early. - if reader.Len() != 0 { - nextRevKeyBytes, err := wire.ReadVarBytes(reader, 0, 1000, "") - if err != nil { - return err - } - channel.RemoteNextRevocation, err = btcec.ParsePubKey( - nextRevKeyBytes, btcec.S256(), - ) - if err != nil { - return err - } + if diff := chanBucket.Get(commitDiffKey); diff != nil { + return chanBucket.Delete(commitDiffKey) } return nil + } -func putOurMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - scratch := make([]byte, 8) - byteOrder.PutUint64(scratch, channel.OurMessageIndex) +func makeLogKey(updateNum uint64) [8]byte { + var key [8]byte + byteOrder.PutUint64(key[:], updateNum) + return key +} + +func appendChannelLogEntry(log *bolt.Bucket, + commit *ChannelCommitment) error { var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { + if err := serializeChanCommit(&b, commit); err != nil { return err } - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix, ourIndexPrefix) - copy(keyPrefix[3:], b.Bytes()) - - return openChanBucket.Put(keyPrefix, scratch) -} - -func deleteOurMessageIndex(openChanBucket *bolt.Bucket, chanID []byte) error { - keyPrefix := make([]byte, 3+len(chanID)) - copy(keyPrefix, ourIndexPrefix) - copy(keyPrefix[3:], chanID) - return openChanBucket.Delete(keyPrefix) -} - -func fetchOurMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix, ourIndexPrefix) - copy(keyPrefix[3:], b.Bytes()) - - updateBytes := openChanBucket.Get(keyPrefix) - channel.OurMessageIndex = byteOrder.Uint64(updateBytes) - - return nil -} - -func putTheirMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - scratch := make([]byte, 8) - byteOrder.PutUint64(scratch, channel.TheirMessageIndex) - - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix, theirIndexPrefix) - copy(keyPrefix[3:], b.Bytes()) - - return openChanBucket.Put(keyPrefix, scratch) -} - -func deleteTheirMessageIndex(openChanBucket *bolt.Bucket, chanID []byte) error { - keyPrefix := make([]byte, 3+len(chanID)) - copy(keyPrefix, theirIndexPrefix) - copy(keyPrefix[3:], chanID) - return openChanBucket.Delete(keyPrefix) -} - -func fetchTheirMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error { - var b bytes.Buffer - if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil { - return err - } - - keyPrefix := make([]byte, 3+b.Len()) - copy(keyPrefix, theirIndexPrefix) - copy(keyPrefix[3:], b.Bytes()) - - updateBytes := openChanBucket.Get(keyPrefix) - channel.TheirMessageIndex = byteOrder.Uint64(updateBytes) - - return nil -} - -func serializeHTLC(w io.Writer, h *HTLC) error { - if err := wire.WriteVarBytes(w, 0, h.Signature); err != nil { - return err - } - - if _, err := w.Write(h.RHash[:]); err != nil { - return err - } - - if err := binary.Write(w, byteOrder, h.Amt); err != nil { - return err - } - if err := binary.Write(w, byteOrder, h.RefundTimeout); err != nil { - return err - } - if err := binary.Write(w, byteOrder, h.OutputIndex); err != nil { - return err - } - - var boolByte [1]byte - if h.Incoming { - boolByte[0] = 1 - } else { - boolByte[0] = 0 - } - - if err := binary.Write(w, byteOrder, boolByte[:]); err != nil { - return err - } - - var onionLength [2]byte - byteOrder.PutUint16(onionLength[:], uint16(len(h.OnionBlob))) - if _, err := w.Write(onionLength[:]); err != nil { - return err - } - - if _, err := w.Write(h.OnionBlob); err != nil { - return err - } - - if err := binary.Write(w, byteOrder, h.AddLocalInclusionHeight); err != nil { - return err - } - - if err := binary.Write(w, byteOrder, h.AddRemoteInclusionHeight); err != nil { - return err - } - - if err := binary.Write(w, byteOrder, h.DescriptorIndex); err != nil { - return err - } - - return nil -} - -func deserializeHTLC(r io.Reader) (*HTLC, error) { - h := &HTLC{} - var err error - - h.Signature, err = wire.ReadVarBytes(r, 0, 80, "") - if err != nil { - return nil, err - } - - if _, err := io.ReadFull(r, h.RHash[:]); err != nil { - return nil, err - } - - if err := binary.Read(r, byteOrder, &h.Amt); err != nil { - return nil, err - } - if err := binary.Read(r, byteOrder, &h.RefundTimeout); err != nil { - return nil, err - } - if err := binary.Read(r, byteOrder, &h.OutputIndex); err != nil { - return nil, err - } - - var scratch [1]byte - if err := binary.Read(r, byteOrder, scratch[:]); err != nil { - return nil, err - } - - if boolByte[0] == 1 { - h.Incoming = true - } else { - h.Incoming = false - } - - var onionLength [2]byte - if _, err := r.Read(onionLength[:]); err != nil { - return nil, err - } - - l := byteOrder.Uint16(onionLength[:]) - if l != 0 { - h.OnionBlob = make([]byte, l) - if _, err := io.ReadFull(r, h.OnionBlob); err != nil { - return nil, err - } - } - - if err := binary.Read(r, byteOrder, &h.AddLocalInclusionHeight); err != nil { - return nil, err - } - - if err := binary.Read(r, byteOrder, &h.AddRemoteInclusionHeight); err != nil { - return nil, err - } - - if err := binary.Read(r, byteOrder, &h.DescriptorIndex); err != nil { - return nil, err - } - - return h, nil -} - -func makeHtlcKey(o *wire.OutPoint) [39]byte { - var ( - n int - k [39]byte - ) - - // chk || txid || index - n += copy(k[:], currentHtlcKey) - n += copy(k[n:], o.Hash[:]) - var scratch [4]byte - byteOrder.PutUint32(scratch[:], o.Index) - copy(k[n:], scratch[:]) - - return k -} - -func putCurrentHtlcs(nodeChanBucket *bolt.Bucket, htlcs []*HTLC, - o *wire.OutPoint) error { - var b bytes.Buffer - - for _, htlc := range htlcs { - if err := serializeHTLC(&b, htlc); err != nil { - return err - } - } - - htlcKey := makeHtlcKey(o) - return nodeChanBucket.Put(htlcKey[:], b.Bytes()) -} - -func fetchCurrentHtlcs(nodeChanBucket *bolt.Bucket, - o *wire.OutPoint) ([]*HTLC, error) { - - htlcKey := makeHtlcKey(o) - htlcBytes := nodeChanBucket.Get(htlcKey[:]) - if htlcBytes == nil { - return nil, nil - } - - // TODO(roasbeef): can preallocate here - var htlcs []*HTLC - htlcReader := bytes.NewReader(htlcBytes) - for htlcReader.Len() != 0 { - htlc, err := deserializeHTLC(htlcReader) - if err != nil { - return nil, err - } - - htlcs = append(htlcs, htlc) - } - - return htlcs, nil -} - -func deleteCurrentHtlcs(nodeChanBucket *bolt.Bucket, o *wire.OutPoint) error { - htlcKey := makeHtlcKey(o) - return nodeChanBucket.Delete(htlcKey[:]) -} - -func serializeChannelDelta(w io.Writer, delta *ChannelDelta) error { - // TODO(roasbeef): could use compression here to reduce on-disk space. - var scratch [8]byte - byteOrder.PutUint64(scratch[:], uint64(delta.LocalBalance)) - if _, err := w.Write(scratch[:]); err != nil { - return err - } - byteOrder.PutUint64(scratch[:], uint64(delta.RemoteBalance)) - if _, err := w.Write(scratch[:]); err != nil { - return err - } - - byteOrder.PutUint64(scratch[:], delta.UpdateNum) - if _, err := w.Write(scratch[:]); err != nil { - return err - } - - numHtlcs := uint64(len(delta.Htlcs)) - if err := wire.WriteVarInt(w, 0, numHtlcs); err != nil { - return err - } - for _, htlc := range delta.Htlcs { - if err := serializeHTLC(w, htlc); err != nil { - return err - } - } - - byteOrder.PutUint64(scratch[:], uint64(delta.CommitFee)) - if _, err := w.Write(scratch[:]); err != nil { - return err - } - - byteOrder.PutUint64(scratch[:], uint64(delta.FeePerKw)) - if _, err := w.Write(scratch[:]); err != nil { - return err - } - - return nil -} - -func deserializeChannelDelta(r io.Reader) (*ChannelDelta, error) { - var ( - err error - scratch [8]byte - ) - - delta := &ChannelDelta{} - - if _, err := r.Read(scratch[:]); err != nil { - return nil, err - } - delta.LocalBalance = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:])) - if _, err := r.Read(scratch[:]); err != nil { - return nil, err - } - delta.RemoteBalance = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:])) - - if _, err := r.Read(scratch[:]); err != nil { - return nil, err - } - delta.UpdateNum = byteOrder.Uint64(scratch[:]) - - numHtlcs, err := wire.ReadVarInt(r, 0) - if err != nil { - return nil, err - } - delta.Htlcs = make([]*HTLC, numHtlcs) - for i := uint64(0); i < numHtlcs; i++ { - htlc, err := deserializeHTLC(r) - if err != nil { - return nil, err - } - - delta.Htlcs[i] = htlc - } - if _, err := r.Read(scratch[:]); err != nil { - return nil, err - } - delta.CommitFee = btcutil.Amount(byteOrder.Uint64(scratch[:])) - - if _, err := r.Read(scratch[:]); err != nil { - return nil, err - } - delta.FeePerKw = btcutil.Amount(byteOrder.Uint64(scratch[:])) - - return delta, nil -} - -func makeLogKey(o *wire.OutPoint, updateNum uint64) [44]byte { - var ( - scratch [8]byte - n int - - // txid (32) || index (4) || update_num (8) - // 32 + 4 + 8 = 44 - k [44]byte - ) - - n += copy(k[:], o.Hash[:]) - - byteOrder.PutUint32(scratch[:4], o.Index) - n += copy(k[n:], scratch[:4]) - - byteOrder.PutUint64(scratch[:], updateNum) - copy(k[n:], scratch[:]) - - return k -} - -func appendChannelLogEntry(log *bolt.Bucket, delta *ChannelDelta, - chanPoint *wire.OutPoint) error { - - var b bytes.Buffer - if err := serializeChannelDelta(&b, delta); err != nil { - return err - } - - logEntrykey := makeLogKey(chanPoint, delta.UpdateNum) + logEntrykey := makeLogKey(commit.CommitHeight) return log.Put(logEntrykey[:], b.Bytes()) } -func fetchChannelLogEntry(log *bolt.Bucket, chanPoint *wire.OutPoint, - updateNum uint64) (*ChannelDelta, error) { +func fetchChannelLogEntry(log *bolt.Bucket, + updateNum uint64) (ChannelCommitment, error) { - logEntrykey := makeLogKey(chanPoint, updateNum) - deltaBytes := log.Get(logEntrykey[:]) - if deltaBytes == nil { - return nil, fmt.Errorf("log entry not found") + logEntrykey := makeLogKey(updateNum) + commitBytes := log.Get(logEntrykey[:]) + if commitBytes == nil { + return ChannelCommitment{}, fmt.Errorf("log entry not found") } - deltaReader := bytes.NewReader(deltaBytes) - - return deserializeChannelDelta(deltaReader) + commitReader := bytes.NewReader(commitBytes) + return deserializeChanCommit(commitReader) } -func wipeChannelLogEntries(log *bolt.Bucket, o *wire.OutPoint) error { - var ( - n int - logPrefix [32 + 4]byte - scratch [4]byte - ) +func wipeChannelLogEntries(log *bolt.Bucket) error { + // TODO(roasbeef): comment - // First we'll construct a key prefix that we'll use to scan through - // and delete all the log entries related to this channel. The format - // for log entries within the database is: txid || index || update_num. - // We'll construct a prefix key with the first two thirds of the full - // key to scan with and delete all entries. - n += copy(logPrefix[:], o.Hash[:]) - byteOrder.PutUint32(scratch[:], o.Index) - copy(logPrefix[n:], scratch[:]) - - // With the prefix constructed, scan through the log bucket from the - // starting point of the log entries for this channel. We'll keep - // deleting keys until the prefix no longer matches. logCursor := log.Cursor() - for logKey, _ := logCursor.Seek(logPrefix[:]); bytes.HasPrefix(logKey, logPrefix[:]); logKey, _ = logCursor.Next() { - if err := log.Delete(logKey); err != nil { + for k, _ := logCursor.First(); k != nil; k, _ = logCursor.Next() { + if err := logCursor.Delete(); err != nil { return err } } return nil } - -func writeOutpoint(w io.Writer, o *wire.OutPoint) error { - // TODO(roasbeef): make all scratch buffers on the stack - scratch := make([]byte, 4) - - // TODO(roasbeef): write raw 32 bytes instead of wasting the extra - // byte. - if err := wire.WriteVarBytes(w, 0, o.Hash[:]); err != nil { - return err - } - - byteOrder.PutUint32(scratch, o.Index) - _, err := w.Write(scratch) - return err -} - -func readOutpoint(r io.Reader, o *wire.OutPoint) error { - scratch := make([]byte, 4) - - txid, err := wire.ReadVarBytes(r, 0, 32, "prevout") - if err != nil { - return err - } - copy(o.Hash[:], txid) - - if _, err := r.Read(scratch); err != nil { - return err - } - o.Index = byteOrder.Uint32(scratch) - - return nil -}