channeldb: modify storage of OpenChannel struct to use new codec.go

In this commit we comptely overhaul the existing storage of the
OpenChannel struct to use the new common serialization defined within
the codec.go file. Additionally, we’ve modified the structure of the
channel database on disk. Rather then use the existing prefix based
segmentation, everything is now bucket based. This has resulted in much
simpler and easier to follow code. The bucket progression is:
openChannelBucket -> nodeBucket -> chainBucket -> channelBucket. We add
a chainBucket as it’s possible that in the future we may have several
channels on distinct chains with a given node.

With the above changes, we’re able to delete much of the existing code
within the file, drastically reducing its size.
This commit is contained in:
Olaoluwa Osuntokun 2017-11-09 20:22:10 -08:00
parent 040c6bdf22
commit 4ae0b008ed
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -369,6 +369,8 @@ type OpenChannel struct {
// TODO(roasbeef): eww
Db *DB
// TODO(roasbeef): just need to store local and remote HTLC's?
sync.RWMutex
}
@ -382,43 +384,171 @@ func (c *OpenChannel) FullSync() error {
return c.Db.Update(c.fullSync)
}
// fullSync is an internal versino of the FullSync method which allows callers
// to sync the contents of an OpenChannel while re-using an existing database
// transaction.
func (c *OpenChannel) fullSync(tx *bolt.Tx) error {
// TODO(roasbeef): add helper funcs to create scoped update
// updateChanBucket is a helper function that returns a writeable bucket that a
// channel's data resides in given: the public key for the node, the outpoint,
// and the chainhash that the channel resides on.
func updateChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey,
outPoint *wire.OutPoint, chainHash chainhash.Hash) (*bolt.Bucket, error) {
// First fetch the top level bucket which stores all data related to
// current, active channels.
chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
openChanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
return err
return nil, err
}
// Within this top level bucket, fetch the bucket dedicated to storing
// open channel data specific to the remote node.
nodePub := c.IdentityPub.SerializeCompressed()
nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(nodePub)
nodePub := nodeKey.SerializeCompressed()
nodeChanBucket, err := openChanBucket.CreateBucketIfNotExists(nodePub)
if err != nil {
return nil, err
}
// We'll then recurse down an additional layer in order to fetch the
// bucket for this particular chain.
chainBucket, err := nodeChanBucket.CreateBucketIfNotExists(chainHash[:])
if err != nil {
return nil, err
}
// With the bucket for the node fetched, we can now go down another
// level, creating the bucket (if it doesn't exist), for this channel
// itself.
var chanPointBuf bytes.Buffer
chanPointBuf.Grow(outPointSize)
if err := writeOutpoint(&chanPointBuf, outPoint); err != nil {
return nil, err
}
chanBucket, err := chainBucket.CreateBucketIfNotExists(
chanPointBuf.Bytes(),
)
if err != nil {
return nil, err
}
return chanBucket, nil
}
// readChanBucket is a helper function that returns a readable bucket that a
// channel's data resides in given: the public key for the node, the outpoint,
// and the chainhash that the channel resides on.
func readChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey,
outPoint *wire.OutPoint, chainHash chainhash.Hash) (*bolt.Bucket, error) {
// First fetch the top level bucket which stores all data related to
// current, active channels.
openChanBucket := tx.Bucket(openChannelBucket)
if openChanBucket == nil {
return nil, ErrNoChanDBExists
}
// Within this top level bucket, fetch the bucket dedicated to storing
// open channel data specific to the remote node.
nodePub := nodeKey.SerializeCompressed()
nodeChanBucket := openChanBucket.Bucket(nodePub)
if nodeChanBucket == nil {
return nil, ErrNoActiveChannels
}
// We'll then recurse down an additional layer in order to fetch the
// bucket for this particular chain.
chainBucket := nodeChanBucket.Bucket(chainHash[:])
if chainBucket == nil {
return nil, ErrNoActiveChannels
}
// With the bucket for the node fetched, we can now go down another
// level, for this channel iteslf.
var chanPointBuf bytes.Buffer
chanPointBuf.Grow(outPointSize)
if err := writeOutpoint(&chanPointBuf, outPoint); err != nil {
return nil, err
}
chanBucket := chainBucket.Bucket(chanPointBuf.Bytes())
if chanBucket == nil {
return nil, ErrNoActiveChannels
}
return chanBucket, nil
}
// fullSync is an internal version of the FullSync method which allows callers
// to sync the contents of an OpenChannel while re-using an existing database
// transaction.
func (c *OpenChannel) fullSync(tx *bolt.Tx) error {
chanBucket, err := updateChanBucket(tx, c.IdentityPub,
&c.FundingOutpoint, c.ChainHash)
if err != nil {
return err
}
// Add this channel ID to the node's active channel index if
// it doesn't already exist.
chanIndexBucket, err := nodeChanBucket.CreateBucketIfNotExists(chanIDBucket)
if err != nil {
return err
}
var b bytes.Buffer
if err := writeOutpoint(&b, &c.FundingOutpoint); err != nil {
return err
}
if chanIndexBucket.Get(b.Bytes()) == nil {
if err := chanIndexBucket.Put(b.Bytes(), nil); err != nil {
return putOpenChannel(chanBucket, c)
}
return err
}
channel.IsPending = false
channel.ShortChanID = openLoc
return putOpenChannel(chanBucket, channel)
})
}
// putChannel serializes, and stores the current state of the channel in its
// entirety.
func putOpenChannel(chanBucket *bolt.Bucket, channel *OpenChannel) error {
// First, we'll write out all the relatively static fields, that are
// decided upon initial channel creation.
if err := putChanInfo(chanBucket, channel); err != nil {
return fmt.Errorf("unable to store chan info: %v", err)
}
return putOpenChannel(chanBucket, nodeChanBucket, c)
// With the static channel info written out, we'll now write out the
// current commitment state for both parties.
if err := putChanCommitments(chanBucket, channel); err != nil {
return fmt.Errorf("unable to store chan commitments: %v", err)
}
// Finally, we'll write out the revocation state for both parties
// within a distinct key space.
if err := putChanRevocationState(chanBucket, channel); err != nil {
return fmt.Errorf("unable to store chan revocations: %v", err)
}
return nil
}
// fetchOpenChannel retrieves, and deserializes (including decrypting
// sensitive) the complete channel currently active with the passed nodeID.
func fetchOpenChannel(chanBucket *bolt.Bucket,
chanPoint *wire.OutPoint) (*OpenChannel, error) {
channel := &OpenChannel{
FundingOutpoint: *chanPoint,
}
// First, we'll read all the static information that changes less
// frequently from disk.
if err := fetchChanInfo(chanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to fetch chan info: %v", err)
}
// With the static information read, we'll now read the current
// commitment state for both sides of the channel.
if err := fetchChanCommitments(chanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to fetch chan commitments: %v", err)
}
// Finally, we'll retrieve the current revocation state so we can
// properly
if err := fetchChanRevocationState(chanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to fetch chan revocations: %v", err)
}
return channel, nil
}
// SyncPending writes the contents of the channel to the database while it's in
@ -460,9 +590,10 @@ func (c *OpenChannel) SyncPending(addr *net.TCPAddr, pendingHeight uint32) error
// relationship for this channel. The LinkNode metadata
// contains reachability, up-time, and service bits related
// information.
// TODO(roasbeef): net info should be in lnwire.NetAddress
linkNode := c.Db.NewLinkNode(wire.MainNet, c.IdentityPub, addr)
// TODO(roasbeef): do away with link node all together?
return putLinkNode(nodeInfoBucket, linkNode)
})
}
@ -538,32 +669,8 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx,
return nil
})
}
// UpdateHTLCs....
func (c *OpenChannel) UpdateHTLCs(htlcs []*HTLC) error {
c.Lock()
defer c.Unlock()
return c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
return err
}
id := c.IdentityPub.SerializeCompressed()
nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id)
if err != nil {
return err
}
if err := putCurrentHtlcs(nodeChanBucket, htlcs,
&c.FundingOutpoint); err != nil {
return err
}
return nil
})
}
// HTLC is the on-disk representation of a hash time-locked contract. HTLCs are
@ -614,11 +721,49 @@ type HTLC struct {
LogIndex uint64
}
// AddRemoteInclusionHeight...
AddRemoteInclusionHeight uint64
func serializeHtlcs(b io.Writer, htlcs []HTLC) error {
numHtlcs := uint16(len(htlcs))
if err := writeElement(b, numHtlcs); err != nil {
return err
}
// DescriptorIndex...
DescriptorIndex uint64
for _, htlc := range htlcs {
if err := writeElements(b,
htlc.Signature, htlc.RHash, htlc.Amt, htlc.RefundTimeout,
htlc.OutputIndex, htlc.Incoming, htlc.OnionBlob[:],
htlc.HtlcIndex, htlc.LogIndex,
); err != nil {
return err
}
}
return nil
}
func deserializeHtlcs(r io.Reader) ([]HTLC, error) {
var numHtlcs uint16
if err := readElement(r, &numHtlcs); err != nil {
return nil, err
}
var htlcs []HTLC
if numHtlcs == 0 {
return htlcs, nil
}
htlcs = make([]HTLC, numHtlcs)
for i := uint16(0); i < numHtlcs; i++ {
if err := readElements(r,
&htlcs[i].Signature, &htlcs[i].RHash, &htlcs[i].Amt,
&htlcs[i].RefundTimeout, &htlcs[i].OutputIndex,
&htlcs[i].Incoming, &htlcs[i].OnionBlob,
&htlcs[i].HtlcIndex, &htlcs[i].LogIndex,
); err != nil {
return htlcs, err
}
}
return htlcs, nil
}
// Copy returns a full copy of the target HTLC.
@ -635,44 +780,7 @@ func (h *HTLC) Copy() HTLC {
return clone
}
// ChannelDelta is a snapshot of the commitment state at a particular point in
// the commitment chain. With each state transition, a snapshot of the current
// state along with all non-settled HTLCs are recorded. These snapshots detail
// the state of the _remote_ party's commitment at a particular state number.
// For ourselves (the local node) we ONLY store our most recent (unrevoked)
// state for safety purposes.
type ChannelDelta struct {
// OurMessageIndex...
OurMessageIndex uint64
// TheirMessageIndex...
TheirMessageIndex uint64
// LocalBalance is our current balance at this particular update
// number.
LocalBalance lnwire.MilliSatoshi
// RemoteBalanceis the balance of the remote node at this particular
// update number.
RemoteBalance lnwire.MilliSatoshi
// CommitFee is the fee that has been subtracted from the channel
// initiator's balance at this point in the commitment chain.
CommitFee btcutil.Amount
// FeePerKw is the fee per kw used to calculate the commit fee at this point
// in the commit chain.
FeePerKw btcutil.Amount
// UpdateNum is the update number that this ChannelDelta represents the
// total number of commitment updates to this point. This can be viewed
// as sort of a "commitment height" as this number is monotonically
// increasing.
UpdateNum uint64
// Htlcs is the set of HTLC's that are pending at this particular
// commitment height.
Htlcs []*HTLC
}
// CommitDiff...
@ -803,21 +911,22 @@ func (c *OpenChannel) InsertNextRevocation(revKey *btcec.PublicKey) error {
c.Lock()
defer c.Unlock()
return c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
c.RemoteNextRevocation = revKey
err := c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := updateChanBucket(tx, c.IdentityPub,
&c.FundingOutpoint, c.ChainHash)
if err != nil {
return err
}
id := c.IdentityPub.SerializeCompressed()
nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id)
if err != nil {
return err
}
c.RemoteNextRevocation = revKey
return putChanRevocationState(nodeChanBucket, c)
return putChanRevocationState(chanBucket, c)
})
if err != nil {
return err
}
return nil
}
// AppendToRevocationLog records the new state transition within an on-disk
@ -935,26 +1044,29 @@ func (c *OpenChannel) RevocationLogTail() (*ChannelDelta, error) {
// order to allow multiple instances of a particular open channel to obtain a
// consistent view of the number of channel updates to data.
func (c *OpenChannel) CommitmentHeight() (uint64, error) {
// TODO(roasbeef): this is super hacky, remedy during refactor!!!
o := &OpenChannel{
FundingOutpoint: c.FundingOutpoint,
}
var height uint64
err := c.Db.View(func(tx *bolt.Tx) error {
// Get the bucket dedicated to storing the metadata for open
// channels.
openChanBucket := tx.Bucket(openChannelBucket)
if openChanBucket == nil {
return ErrNoActiveChannels
chanBucket, err := readChanBucket(tx, c.IdentityPub,
&c.FundingOutpoint, c.ChainHash)
if err != nil {
return err
}
return fetchChanNumUpdates(openChanBucket, o)
commit, err := fetchChanCommitment(chanBucket, true)
if err != nil {
return err
}
height = commit.CommitHeight
return nil
})
if err != nil {
return 0, nil
}
return o.NumUpdates, nil
return height, nil
}
// FindPreviousState scans through the append-only log in an attempt to recover
@ -1012,10 +1124,10 @@ const (
// _revoked_ channel state.
BreachClose
// FundingCanceled indicates that the channel never was fully opened before it
// was marked as closed in the database. This can happen if we or the remote
// fail at some point during the opening workflow, or we timeout waiting for
// the funding transaction to be confirmed.
// FundingCanceled indicates that the channel never was fully opened
// before it was marked as closed in the database. This can happen if
// we or the remote fail at some point during the opening workflow, or
// we timeout waiting for the funding transaction to be confirmed.
FundingCanceled
)
@ -1143,43 +1255,36 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary) error {
// ChannelSnapshot is a frozen snapshot of the current channel state. A
// snapshot is detached from the original channel that generated it, providing
// read-only access to the current or prior state of an active channel.
//
// TODO(roasbeef): remove all together? pretty much just commitment
type ChannelSnapshot struct {
// RemoteIdentity is the identity public key of the remote node that we
// are maintaining the open channel with.
RemoteIdentity btcec.PublicKey
// ChannelPoint is the channel point that uniquly identifies the
// channel whose delta this is.
// ChanPoint is the outpoint that created the channel. This output is
// found within the funding transaction and uniquely identified the
// channel on the resident chain.
ChannelPoint wire.OutPoint
// Capacity is the total capacity of the channel in satoshis.
// ChainHash is the genesis hash of the chain that the channel resides
// within.
ChainHash chainhash.Hash
// Capacity is the total capacity of the channel.
Capacity btcutil.Amount
// LocalBalance is the amount of mSAT allocated to the local party.
LocalBalance lnwire.MilliSatoshi
// TotalMSatSent is the total number of milli-satoshis we've sent
// within this channel.
TotalMSatSent lnwire.MilliSatoshi
// RemoteBalance is the amount of mSAT allocated to the remote party.
RemoteBalance lnwire.MilliSatoshi
// TotalMSatReceived is the total number of milli-satoshis we've
// received within this channel.
TotalMSatReceived lnwire.MilliSatoshi
// NumUpdates is the number of updates that have taken place within the
// commitment transaction itself.
NumUpdates uint64
// CommitFee is the total fee paid on the commitment transaction at
// this current commitment state.
CommitFee btcutil.Amount
// TotalMilliSatoshisSent is the total number of mSAT sent by the local
// party at this current commitment instance.
TotalMilliSatoshisSent lnwire.MilliSatoshi
// TotalMilliSatoshisReceived is the total number of mSAT received by
// the local party current commitment instance.
TotalMilliSatoshisReceived lnwire.MilliSatoshi
// Htlcs is the current set of outstanding HTLC's live on the
// commitment transaction at this instance.
Htlcs []HTLC
// ChannelCommitment is the current up-to-date commitment for the
// target channel.
ChannelCommitment
}
// Snapshot returns a read-only snapshot of the current channel state. This
@ -1189,22 +1294,26 @@ func (c *OpenChannel) Snapshot() *ChannelSnapshot {
c.RLock()
defer c.RUnlock()
localCommit := c.LocalCommitment
snapshot := &ChannelSnapshot{
RemoteIdentity: *c.IdentityPub,
ChannelPoint: c.FundingOutpoint,
Capacity: c.Capacity,
LocalBalance: c.LocalBalance,
RemoteBalance: c.RemoteBalance,
NumUpdates: c.NumUpdates,
CommitFee: c.CommitFee,
TotalMilliSatoshisSent: c.TotalMSatSent,
TotalMilliSatoshisReceived: c.TotalMSatReceived,
RemoteIdentity: *c.IdentityPub,
ChannelPoint: c.FundingOutpoint,
Capacity: c.Capacity,
TotalMSatSent: c.TotalMSatSent,
TotalMSatReceived: c.TotalMSatReceived,
ChainHash: c.ChainHash,
ChannelCommitment: ChannelCommitment{
LocalBalance: localCommit.LocalBalance,
RemoteBalance: localCommit.RemoteBalance,
CommitHeight: localCommit.CommitHeight,
CommitFee: localCommit.CommitFee,
},
}
// Copy over the current set of HTLCs to ensure the caller can't
// mutate our internal state.
snapshot.Htlcs = make([]HTLC, len(c.Htlcs))
for i, h := range c.Htlcs {
// Copy over the current set of HTLCs to ensure the caller can't mutate
// our internal state.
snapshot.Htlcs = make([]HTLC, len(localCommit.Htlcs))
for i, h := range localCommit.Htlcs {
snapshot.Htlcs[i] = h.Copy()
}
@ -1228,37 +1337,11 @@ func putChannelCloseSummary(tx *bolt.Tx, chanID []byte,
}
func serializeChannelCloseSummary(w io.Writer, cs *ChannelCloseSummary) error {
if err := binary.Write(w, byteOrder, cs.IsPending); err != nil {
return err
}
if err := writeOutpoint(w, &cs.ChanPoint); err != nil {
return err
}
if _, err := w.Write(cs.ClosingTXID[:]); err != nil {
return err
}
if err := binary.Write(w, byteOrder, cs.SettledBalance); err != nil {
return err
}
if err := binary.Write(w, byteOrder, cs.TimeLockedBalance); err != nil {
return err
}
if err := binary.Write(w, byteOrder, cs.Capacity); err != nil {
return err
}
if _, err := w.Write([]byte{byte(cs.CloseType)}); err != nil {
return err
}
pub := cs.RemotePub.SerializeCompressed()
if _, err := w.Write(pub); err != nil {
return err
}
return nil
return writeElements(w,
cs.ChanPoint, cs.ChainHash, cs.ClosingTXID, cs.RemotePub, cs.Capacity,
cs.SettledBalance, cs.TimeLockedBalance, cs.CloseType,
cs.IsPending,
)
}
func fetchChannelCloseSummary(tx *bolt.Tx,
@ -1281,40 +1364,11 @@ func fetchChannelCloseSummary(tx *bolt.Tx,
func deserializeCloseChannelSummary(r io.Reader) (*ChannelCloseSummary, error) {
c := &ChannelCloseSummary{}
var err error
if err := binary.Read(r, byteOrder, &c.IsPending); err != nil {
return nil, err
}
if err := readOutpoint(r, &c.ChanPoint); err != nil {
return nil, err
}
if _, err := io.ReadFull(r, c.ClosingTXID[:]); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &c.SettledBalance); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &c.TimeLockedBalance); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &c.Capacity); err != nil {
return nil, err
}
var closeType [1]byte
if err := binary.Read(r, byteOrder, closeType[:]); err != nil {
return nil, err
}
c.CloseType = ClosureType(closeType[0])
var pub [33]byte
if _, err := io.ReadFull(r, pub[:]); err != nil {
return nil, err
}
c.RemotePub, err = btcec.ParsePubKey(pub[:], btcec.S256())
err := readElements(r,
&c.ChanPoint, &c.ChainHash, &c.ClosingTXID, &c.RemotePub, &c.Capacity,
&c.SettledBalance, &c.TimeLockedBalance, &c.CloseType,
&c.IsPending,
)
if err != nil {
return nil, err
}
@ -1322,625 +1376,182 @@ func deserializeCloseChannelSummary(r io.Reader) (*ChannelCloseSummary, error) {
return c, nil
}
// putChannel serializes, and stores the current state of the channel in its
// entirety.
func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
channel *OpenChannel) error {
// First write out all the "common" fields using the field's prefix
// append with the channel's ID. These fields go into a top-level
// bucket to allow for ease of metric aggregation via efficient prefix
// scans.
if err := putChanCapacity(openChanBucket, channel); err != nil {
return err
}
if err := putChanFeePerKw(openChanBucket, channel); err != nil {
return err
}
if err := putChanNumUpdates(openChanBucket, channel); err != nil {
return err
}
if err := putOurMessageIndex(openChanBucket, channel); err != nil {
return err
}
if err := putTheirMessageIndex(openChanBucket, channel); err != nil {
return err
}
if err := putChanAmountsTransferred(openChanBucket, channel); err != nil {
return err
}
if err := putChanIsPending(openChanBucket, channel); err != nil {
return err
}
if err := putChanConfInfo(openChanBucket, channel); err != nil {
return err
}
if err := putChanCommitFee(openChanBucket, channel); err != nil {
func putChanInfo(chanBucket *bolt.Bucket, channel *OpenChannel) error {
var w bytes.Buffer
if err := writeElements(&w,
channel.ChanType, channel.ChainHash, channel.FundingOutpoint,
channel.ShortChanID, channel.IsPending, channel.IsInitiator,
channel.FundingBroadcastHeight, channel.NumConfsRequired,
channel.IdentityPub, channel.Capacity, channel.TotalMSatSent,
channel.TotalMSatReceived,
); err != nil {
return err
}
// Next, write out the fields of the channel update less frequently.
if err := putChannelIDs(nodeChanBucket, channel); err != nil {
writeChanConfig := func(b io.Writer, c *ChannelConfig) error {
return writeElements(b,
c.DustLimit, c.MaxPendingAmount, c.ChanReserve, c.MinHTLC,
c.MaxAcceptedHtlcs, c.CsvDelay, c.MultiSigKey,
c.RevocationBasePoint, c.PaymentBasePoint, c.DelayBasePoint,
)
}
if err := writeChanConfig(&w, &channel.LocalChanCfg); err != nil {
return err
}
if err := putChanConfigs(nodeChanBucket, channel); err != nil {
return err
}
if err := putChanCommitTxns(nodeChanBucket, channel); err != nil {
return err
}
if err := putChanFundingInfo(nodeChanBucket, channel); err != nil {
return err
}
if err := putChanRevocationState(nodeChanBucket, channel); err != nil {
return err
}
if err := putCurrentHtlcs(nodeChanBucket, channel.Htlcs,
&channel.FundingOutpoint); err != nil {
if err := writeChanConfig(&w, &channel.RemoteChanCfg); err != nil {
return err
}
return nil
return chanBucket.Put(chanInfoKey, w.Bytes())
}
// fetchOpenChannel retrieves, and deserializes (including decrypting
// sensitive) the complete channel currently active with the passed nodeID.
func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
chanID *wire.OutPoint) (*OpenChannel, error) {
var err error
channel := &OpenChannel{
FundingOutpoint: *chanID,
func serializeChanCommit(w io.Writer, c *ChannelCommitment) error {
if err := writeElements(w,
c.CommitHeight, c.LocalLogIndex, c.LocalHtlcIndex,
c.RemoteLogIndex, c.RemoteHtlcIndex, c.LocalBalance,
c.RemoteBalance, c.CommitFee, c.FeePerKw, c.CommitTx,
c.CommitSig,
); err != nil {
return err
}
// First, read out the fields of the channel update less frequently.
if err = fetchChannelIDs(nodeChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read chan ID's: %v", err)
}
if err = fetchChanConfigs(nodeChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read chan config: %v", err)
}
if err = fetchChanCommitTxns(nodeChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read commit txns: %v", err)
}
if err = fetchChanFundingInfo(nodeChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read funding info: %v", err)
}
if err = fetchChanRevocationState(nodeChanBucket, channel); err != nil {
return nil, err
}
channel.Htlcs, err = fetchCurrentHtlcs(nodeChanBucket, chanID)
if err != nil {
return nil, fmt.Errorf("unable to read current htlc's: %v", err)
}
// With the existence of an open channel bucket with this node verified,
// perform a full read of the entire struct. Starting with the prefixed
// fields residing in the parent bucket.
// TODO(roasbeef): combine the below into channel config like key
if err = fetchChanCapacity(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read chan capacity: %v", err)
}
if err = fetchChanMinFeePerKw(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read fee-per-kb: %v", err)
}
if err = fetchChanNumUpdates(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read num updates: %v", err)
}
if err = fetchOurMessageIndex(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read our message index: %v", err)
}
if err = fetchTheirMessageIndex(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read their message index: %v", err)
}
if err = fetchChanAmountsTransferred(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read sat transferred: %v", err)
}
if err = fetchChanIsPending(openChanBucket, channel); err != nil {
return nil, err
}
if err := fetchChanConfInfo(openChanBucket, channel); err != nil {
return nil, err
}
if err = fetchChanCommitFee(openChanBucket, channel); err != nil {
return nil, err
}
return channel, nil
return serializeHtlcs(w, c.Htlcs)
}
func deleteOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
channelID []byte, o *wire.OutPoint) error {
func putChanCommitment(chanBucket *bolt.Bucket, c *ChannelCommitment,
local bool) error {
// First we'll delete all the "common" top level items stored outside
// the node's channel bucket.
if err := deleteChanCapacity(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanMinFeePerKw(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanNumUpdates(openChanBucket, channelID); err != nil {
return err
}
if err := deleteOurMessageIndex(openChanBucket, channelID); err != nil {
return err
}
if err := deleteTheirMessageIndex(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanAmountsTransferred(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanIsPending(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanConfInfo(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanCommitFee(openChanBucket, channelID); err != nil {
return err
}
// Finally, delete all the fields directly within the node's channel
// bucket.
if err := deleteChannelIDs(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanConfigs(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanCommitTxns(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanFundingInfo(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanRevocationState(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteCurrentHtlcs(nodeChanBucket, o); err != nil {
return err
}
return nil
}
func putChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
// Some scratch bytes re-used for serializing each of the uint64's.
scratch1 := make([]byte, 8)
scratch2 := make([]byte, 8)
scratch3 := make([]byte, 8)
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix[3:], b.Bytes())
copy(keyPrefix[:3], chanCapacityPrefix)
byteOrder.PutUint64(scratch1, uint64(channel.Capacity))
if err := openChanBucket.Put(keyPrefix, scratch1); err != nil {
return err
}
copy(keyPrefix[:3], selfBalancePrefix)
byteOrder.PutUint64(scratch2, uint64(channel.LocalBalance))
if err := openChanBucket.Put(keyPrefix, scratch2); err != nil {
return err
}
copy(keyPrefix[:3], theirBalancePrefix)
byteOrder.PutUint64(scratch3, uint64(channel.RemoteBalance))
return openChanBucket.Put(keyPrefix, scratch3)
}
func deleteChanCapacity(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix[3:], chanID)
copy(keyPrefix[:3], chanCapacityPrefix)
if err := openChanBucket.Delete(keyPrefix); err != nil {
return err
}
copy(keyPrefix[:3], selfBalancePrefix)
if err := openChanBucket.Delete(keyPrefix); err != nil {
return err
}
copy(keyPrefix[:3], theirBalancePrefix)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanCapacity(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
// A byte slice re-used to compute each key prefix below.
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix[3:], b.Bytes())
copy(keyPrefix[:3], chanCapacityPrefix)
capacityBytes := openChanBucket.Get(keyPrefix)
channel.Capacity = btcutil.Amount(byteOrder.Uint64(capacityBytes))
copy(keyPrefix[:3], selfBalancePrefix)
selfBalanceBytes := openChanBucket.Get(keyPrefix)
channel.LocalBalance = lnwire.MilliSatoshi(byteOrder.Uint64(selfBalanceBytes))
copy(keyPrefix[:3], theirBalancePrefix)
theirBalanceBytes := openChanBucket.Get(keyPrefix)
channel.RemoteBalance = lnwire.MilliSatoshi(byteOrder.Uint64(theirBalanceBytes))
return nil
}
func putChanFeePerKw(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
scratch := make([]byte, 8)
byteOrder.PutUint64(scratch, uint64(channel.FeePerKw))
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, minFeePerKwPrefix)
copy(keyPrefix[3:], b.Bytes())
return openChanBucket.Put(keyPrefix, scratch)
}
func deleteChanMinFeePerKw(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix, minFeePerKwPrefix)
copy(keyPrefix[3:], chanID)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanMinFeePerKw(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, minFeePerKwPrefix)
copy(keyPrefix[3:], b.Bytes())
feeBytes := openChanBucket.Get(keyPrefix)
channel.FeePerKw = btcutil.Amount(byteOrder.Uint64(feeBytes))
return nil
}
func putChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
scratch := make([]byte, 8)
byteOrder.PutUint64(scratch, channel.NumUpdates)
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, updatePrefix)
copy(keyPrefix[3:], b.Bytes())
return openChanBucket.Put(keyPrefix, scratch)
}
func deleteChanNumUpdates(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix, updatePrefix)
copy(keyPrefix[3:], chanID)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanNumUpdates(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, updatePrefix)
copy(keyPrefix[3:], b.Bytes())
updateBytes := openChanBucket.Get(keyPrefix)
channel.NumUpdates = byteOrder.Uint64(updateBytes)
return nil
}
func putChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
scratch1 := make([]byte, 8)
scratch2 := make([]byte, 8)
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix[3:], b.Bytes())
copy(keyPrefix[:3], satSentPrefix)
byteOrder.PutUint64(scratch1, uint64(channel.TotalMSatSent))
if err := openChanBucket.Put(keyPrefix, scratch1); err != nil {
return err
}
copy(keyPrefix[:3], satReceivedPrefix)
byteOrder.PutUint64(scratch2, uint64(channel.TotalMSatReceived))
return openChanBucket.Put(keyPrefix, scratch2)
}
func deleteChanAmountsTransferred(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix[3:], chanID)
copy(keyPrefix[:3], satSentPrefix)
if err := openChanBucket.Delete(keyPrefix); err != nil {
return err
}
copy(keyPrefix[:3], satReceivedPrefix)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix[3:], b.Bytes())
copy(keyPrefix[:3], satSentPrefix)
totalSentBytes := openChanBucket.Get(keyPrefix)
channel.TotalMSatSent = lnwire.MilliSatoshi(byteOrder.Uint64(totalSentBytes))
copy(keyPrefix[:3], satReceivedPrefix)
totalReceivedBytes := openChanBucket.Get(keyPrefix)
channel.TotalMSatReceived = lnwire.MilliSatoshi(byteOrder.Uint64(totalReceivedBytes))
return nil
}
func putChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
scratch := make([]byte, 2)
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix[3:], b.Bytes())
copy(keyPrefix[:3], isPendingPrefix)
if channel.IsPending {
byteOrder.PutUint16(scratch, uint16(1))
return openChanBucket.Put(keyPrefix, scratch)
}
byteOrder.PutUint16(scratch, uint16(0))
return openChanBucket.Put(keyPrefix, scratch)
}
func deleteChanIsPending(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix[3:], chanID)
copy(keyPrefix[:3], isPendingPrefix)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix[3:], b.Bytes())
copy(keyPrefix[:3], isPendingPrefix)
isPending := byteOrder.Uint16(openChanBucket.Get(keyPrefix))
if isPending == 1 {
channel.IsPending = true
var key []byte
copy(key[:], chanCommitmentKey)
if local {
key = append(key, byte(0x00))
} else {
channel.IsPending = false
key = append(key, byte(0x01))
}
return nil
}
func putChanConfInfo(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
if err := serializeChanCommit(&b, c); err != nil {
return err
}
keyPrefix := make([]byte, len(confInfoPrefix)+b.Len())
copy(keyPrefix[:len(confInfoPrefix)], confInfoPrefix)
copy(keyPrefix[len(confInfoPrefix):], b.Bytes())
// We store the conf info in the following format: broadcast || open.
var scratch [12]byte
byteOrder.PutUint32(scratch[:], channel.FundingBroadcastHeight)
byteOrder.PutUint64(scratch[4:], channel.ShortChanID.ToUint64())
return openChanBucket.Put(keyPrefix, scratch[:])
return chanBucket.Put(key, b.Bytes())
}
func fetchChanConfInfo(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, len(confInfoPrefix)+b.Len())
copy(keyPrefix[:len(confInfoPrefix)], confInfoPrefix)
copy(keyPrefix[len(confInfoPrefix):], b.Bytes())
confInfoBytes := openChanBucket.Get(keyPrefix)
channel.FundingBroadcastHeight = byteOrder.Uint32(confInfoBytes[:4])
channel.ShortChanID = lnwire.NewShortChanIDFromInt(
byteOrder.Uint64(confInfoBytes[4:]),
)
return nil
}
func deleteChanConfInfo(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, len(confInfoPrefix)+len(chanID))
copy(keyPrefix[:len(confInfoPrefix)], confInfoPrefix)
copy(keyPrefix[len(confInfoPrefix):], chanID)
return openChanBucket.Delete(keyPrefix)
}
func putChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
// TODO(roasbeef): just pass in chanID everywhere for puts
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
// Construct the id key: cid || channelID.
// TODO(roasbeef): abstract out to func
idKey := make([]byte, len(chanIDKey)+b.Len())
copy(idKey[:3], chanIDKey)
copy(idKey[3:], b.Bytes())
idBytes := channel.IdentityPub.SerializeCompressed()
return nodeChanBucket.Put(idKey, idBytes)
}
func deleteChannelIDs(nodeChanBucket *bolt.Bucket, chanID []byte) error {
idKey := make([]byte, len(chanIDKey)+len(chanID))
copy(idKey[:3], chanIDKey)
copy(idKey[3:], chanID)
return nodeChanBucket.Delete(idKey)
}
func fetchChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var (
err error
b bytes.Buffer
)
if err = writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
// Construct the id key: cid || channelID.
idKey := make([]byte, len(chanIDKey)+b.Len())
copy(idKey[:3], chanIDKey)
copy(idKey[3:], b.Bytes())
idBytes := nodeChanBucket.Get(idKey)
channel.IdentityPub, err = btcec.ParsePubKey(idBytes, btcec.S256())
func putChanCommitments(chanBucket *bolt.Bucket, channel *OpenChannel) error {
err := putChanCommitment(chanBucket, &channel.LocalCommitment, true)
if err != nil {
return err
}
return nil
return putChanCommitment(chanBucket, &channel.RemoteCommitment, false)
}
func putChanCommitFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
scratch := make([]byte, 8)
byteOrder.PutUint64(scratch, uint64(channel.CommitFee))
func putChanRevocationState(chanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
err := writeElements(
&b, channel.RemoteCurrentRevocation, channel.RevocationProducer,
channel.RevocationStore,
)
if err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, commitFeePrefix)
copy(keyPrefix[3:], b.Bytes())
// TODO(roasbeef): don't keep producer on disk
return openChanBucket.Put(keyPrefix, scratch)
// If the next revocation is present, which is only the case after the
// FundingLocked message has been sent, then we'll write it to disk.
if channel.RemoteNextRevocation != nil {
err = writeElements(&b, channel.RemoteNextRevocation)
if err != nil {
return err
}
}
return chanBucket.Put(revocationStateKey, b.Bytes())
}
func fetchChanCommitFee(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
func fetchChanInfo(chanBucket *bolt.Bucket, channel *OpenChannel) error {
infoBytes := chanBucket.Get(chanInfoKey)
if infoBytes == nil {
return ErrNoChanInfoFound
}
r := bytes.NewReader(infoBytes)
if err := readElements(r,
&channel.ChanType, &channel.ChainHash, &channel.FundingOutpoint,
&channel.ShortChanID, &channel.IsPending, &channel.IsInitiator,
&channel.FundingBroadcastHeight, &channel.NumConfsRequired,
&channel.IdentityPub, &channel.Capacity, &channel.TotalMSatSent,
&channel.TotalMSatReceived,
); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, commitFeePrefix)
copy(keyPrefix[3:], b.Bytes())
commitFeeBytes := openChanBucket.Get(keyPrefix)
channel.CommitFee = btcutil.Amount(byteOrder.Uint64(commitFeeBytes))
readChanConfig := func(b io.Reader, c *ChannelConfig) error {
return readElements(b,
&c.DustLimit, &c.MaxPendingAmount, &c.ChanReserve,
&c.MinHTLC, &c.MaxAcceptedHtlcs, &c.CsvDelay,
&c.MultiSigKey, &c.RevocationBasePoint,
&c.PaymentBasePoint, &c.DelayBasePoint,
)
}
if err := readChanConfig(r, &channel.LocalChanCfg); err != nil {
return err
}
if err := readChanConfig(r, &channel.RemoteChanCfg); err != nil {
return err
}
return nil
}
func deleteChanCommitFee(openChanBucket *bolt.Bucket, chanID []byte) error {
commitFeeKey := make([]byte, 3+len(chanID))
copy(commitFeeKey, commitFeePrefix)
copy(commitFeeKey[3:], chanID)
func deserializeChanCommit(r io.Reader) (ChannelCommitment, error) {
var c ChannelCommitment
return openChanBucket.Delete(commitFeeKey)
}
func putChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var bc bytes.Buffer
if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil {
return err
}
txnsKey := make([]byte, len(commitTxnsKey)+bc.Len())
copy(txnsKey[:3], commitTxnsKey)
copy(txnsKey[3:], bc.Bytes())
var b bytes.Buffer
if err := channel.CommitTx.Serialize(&b); err != nil {
return err
err := readElements(r,
&c.CommitHeight, &c.LocalLogIndex, &c.LocalHtlcIndex, &c.RemoteLogIndex,
&c.RemoteHtlcIndex, &c.LocalBalance, &c.RemoteBalance,
&c.CommitFee, &c.FeePerKw, &c.CommitTx, &c.CommitSig,
)
if err != nil {
return c, err
}
if err := wire.WriteVarBytes(&b, 0, channel.CommitSig); err != nil {
return err
c.Htlcs, err = deserializeHtlcs(r)
if err != nil {
return c, err
}
return nodeChanBucket.Put(txnsKey, b.Bytes())
return c, nil
}
func deleteChanCommitTxns(nodeChanBucket *bolt.Bucket, chanID []byte) error {
txnsKey := make([]byte, len(commitTxnsKey)+len(chanID))
copy(txnsKey[:3], commitTxnsKey)
copy(txnsKey[3:], chanID)
return nodeChanBucket.Delete(txnsKey)
func fetchChanCommitment(chanBucket *bolt.Bucket, local bool) (ChannelCommitment, error) {
var key []byte
copy(key[:], chanCommitmentKey)
if local {
key = append(key, byte(0x00))
} else {
key = append(key, byte(0x01))
}
commitBytes := chanBucket.Get(key)
if commitBytes == nil {
return ChannelCommitment{}, ErrNoCommitmentsFound
}
r := bytes.NewReader(commitBytes)
return deserializeChanCommit(r)
}
func fetchChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var bc bytes.Buffer
func fetchChanCommitments(chanBucket *bolt.Bucket, channel *OpenChannel) error {
var err error
if err = writeOutpoint(&bc, &channel.FundingOutpoint); err != nil {
channel.LocalCommitment, err = fetchChanCommitment(chanBucket, true)
if err != nil {
return err
}
txnsKey := make([]byte, len(commitTxnsKey)+bc.Len())
copy(txnsKey[:3], commitTxnsKey)
copy(txnsKey[3:], bc.Bytes())
txnBytes := bytes.NewReader(nodeChanBucket.Get(txnsKey))
channel.CommitTx = *wire.NewMsgTx(2)
if err = channel.CommitTx.Deserialize(txnBytes); err != nil {
return err
}
channel.CommitSig, err = wire.ReadVarBytes(txnBytes, 0, 80, "")
channel.RemoteCommitment, err = fetchChanCommitment(chanBucket, false)
if err != nil {
return err
}
@ -1948,803 +1559,99 @@ func fetchChanCommitTxns(nodeChanBucket *bolt.Bucket, channel *OpenChannel) erro
return nil
}
func putChanConfigs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
func fetchChanRevocationState(chanBucket *bolt.Bucket, channel *OpenChannel) error {
revBytes := chanBucket.Get(revocationStateKey)
if revBytes == nil {
return ErrNoRevocationsFound
}
r := bytes.NewReader(revBytes)
putChanConfig := func(cfg *ChannelConfig) error {
err := binary.Write(&b, byteOrder, cfg.DustLimit)
if err != nil {
return err
}
err = binary.Write(&b, byteOrder, cfg.MaxPendingAmount)
if err != nil {
return err
}
err = binary.Write(&b, byteOrder, cfg.ChanReserve)
if err != nil {
return err
}
err = binary.Write(&b, byteOrder, cfg.MinHTLC)
if err != nil {
return err
}
err = binary.Write(&b, byteOrder, cfg.CsvDelay)
if err != nil {
return err
}
err = binary.Write(&b, byteOrder, cfg.MaxAcceptedHtlcs)
if err != nil {
return err
}
_, err = b.Write(cfg.MultiSigKey.SerializeCompressed())
if err != nil {
return err
}
_, err = b.Write(cfg.RevocationBasePoint.SerializeCompressed())
if err != nil {
return err
}
_, err = b.Write(cfg.PaymentBasePoint.SerializeCompressed())
if err != nil {
return err
}
_, err = b.Write(cfg.DelayBasePoint.SerializeCompressed())
if err != nil {
return err
}
err := readElements(
r, &channel.RemoteCurrentRevocation, &channel.RevocationProducer,
&channel.RevocationStore,
)
if err != nil {
return err
}
// If there aren't any bytes left in the buffer, then we don't yet have
// the next remote revocation, so we can exit early here.
if r.Len() == 0 {
return nil
}
putChanConfig(&channel.LocalChanCfg)
putChanConfig(&channel.RemoteChanCfg)
var bc bytes.Buffer
if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil {
return err
}
configKey := make([]byte, len(chanConfigPrefix)+len(bc.Bytes()))
copy(configKey, chanConfigPrefix)
copy(configKey, bc.Bytes())
return nodeChanBucket.Put(configKey, b.Bytes())
// Otherwise we'll read the next revocation for the remote party which
// is always the last item within the buffer.
return readElements(r, &channel.RemoteNextRevocation)
}
func fetchChanConfigs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var bc bytes.Buffer
if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil {
func deleteOpenChannel(chanBucket *bolt.Bucket, chanPointBytes []byte) error {
if err := chanBucket.Delete(chanInfoKey); err != nil {
return err
}
configKey := make([]byte, len(chanConfigPrefix)+len(bc.Bytes()))
copy(configKey, chanConfigPrefix)
copy(configKey, bc.Bytes())
configBytes := nodeChanBucket.Get(configKey)
if configBytes == nil {
return fmt.Errorf("unable to find channel config for %v: ",
channel.FundingOutpoint)
}
configReader := bytes.NewReader(configBytes)
fetchChanConfig := func() (*ChannelConfig, error) {
cfg := &ChannelConfig{}
err := binary.Read(configReader, byteOrder, &cfg.DustLimit)
if err != nil {
return nil, err
}
err = binary.Read(configReader, byteOrder, &cfg.MaxPendingAmount)
if err != nil {
return nil, err
}
err = binary.Read(configReader, byteOrder, &cfg.ChanReserve)
if err != nil {
return nil, err
}
err = binary.Read(configReader, byteOrder, &cfg.MinHTLC)
if err != nil {
return nil, err
}
err = binary.Read(configReader, byteOrder, &cfg.CsvDelay)
if err != nil {
return nil, err
}
err = binary.Read(configReader, byteOrder, &cfg.MaxAcceptedHtlcs)
if err != nil {
return nil, err
}
var pub [33]byte
readKey := func() (*btcec.PublicKey, error) {
if _, err := io.ReadFull(configReader, pub[:]); err != nil {
return nil, err
}
return btcec.ParsePubKey(pub[:], btcec.S256())
}
cfg.MultiSigKey, err = readKey()
if err != nil {
return nil, err
}
cfg.RevocationBasePoint, err = readKey()
if err != nil {
return nil, err
}
cfg.PaymentBasePoint, err = readKey()
if err != nil {
return nil, err
}
cfg.DelayBasePoint, err = readKey()
if err != nil {
return nil, err
}
return cfg, nil
}
var err error
cfg, err := fetchChanConfig()
err := chanBucket.Delete(append(chanCommitmentKey, byte(0x00)))
if err != nil {
return err
}
channel.LocalChanCfg = *cfg
cfg, err = fetchChanConfig()
if err != nil {
return err
}
channel.RemoteChanCfg = *cfg
return nil
}
func deleteChanConfigs(nodeChanBucket *bolt.Bucket, chanID []byte) error {
configKey := make([]byte, len(chanConfigPrefix)+len(chanID))
copy(configKey, chanConfigPrefix)
copy(configKey, chanID)
return nodeChanBucket.Delete(configKey)
}
func putChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var bc bytes.Buffer
if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil {
return err
}
fundTxnKey := make([]byte, len(fundingTxnKey)+bc.Len())
copy(fundTxnKey[:3], fundingTxnKey)
copy(fundTxnKey[3:], bc.Bytes())
var b bytes.Buffer
var boolByte [1]byte
if channel.IsInitiator {
boolByte[0] = 1
} else {
boolByte[0] = 0
}
if err := binary.Write(&b, byteOrder, boolByte[:]); err != nil {
return err
}
// TODO(roasbeef): make first field instead?
if _, err := b.Write([]byte{uint8(channel.ChanType)}); err != nil {
return err
}
if _, err := b.Write(channel.ChainHash[:]); err != nil {
return err
}
var scratch [2]byte
byteOrder.PutUint16(scratch[:], channel.NumConfsRequired)
if _, err := b.Write(scratch[:]); err != nil {
return err
}
return nodeChanBucket.Put(fundTxnKey, b.Bytes())
}
func deleteChanFundingInfo(nodeChanBucket *bolt.Bucket, chanID []byte) error {
fundTxnKey := make([]byte, len(fundingTxnKey)+len(chanID))
copy(fundTxnKey[:3], fundingTxnKey)
copy(fundTxnKey[3:], chanID)
return nodeChanBucket.Delete(fundTxnKey)
}
func fetchChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
fundTxnKey := make([]byte, len(fundingTxnKey)+b.Len())
copy(fundTxnKey[:3], fundingTxnKey)
copy(fundTxnKey[3:], b.Bytes())
infoBytes := bytes.NewReader(nodeChanBucket.Get(fundTxnKey))
var err error
var boolByte [1]byte
err = binary.Read(infoBytes, byteOrder, boolByte[:])
if err != nil {
return err
}
if boolByte[0] == 1 {
channel.IsInitiator = true
} else {
channel.IsInitiator = false
}
var chanType [1]byte
err = binary.Read(infoBytes, byteOrder, chanType[:])
if err != nil {
return err
}
channel.ChanType = ChannelType(chanType[0])
err = binary.Read(infoBytes, byteOrder, channel.ChainHash[:])
err = chanBucket.Delete(append(chanCommitmentKey, byte(0x01)))
if err != nil {
return err
}
var scratch [2]byte
if _, err := infoBytes.Read(scratch[:]); err != nil {
return err
}
channel.NumConfsRequired = byteOrder.Uint16(scratch[:])
return nil
}
func putChanRevocationState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
curRevKey := channel.RemoteCurrentRevocation.SerializeCompressed()
if err := wire.WriteVarBytes(&b, 0, curRevKey); err != nil {
if err := chanBucket.Delete(revocationStateKey); err != nil {
return err
}
// TODO(roasbeef): shouldn't be storing on disk, should re-derive as
// needed
if err := channel.RevocationProducer.Encode(&b); err != nil {
return err
}
if err := channel.RevocationStore.Encode(&b); err != nil {
return err
}
var bc bytes.Buffer
if err := writeOutpoint(&bc, &channel.FundingOutpoint); err != nil {
return err
}
// We place the next revocation key at the very end, as under certain
// circumstances (when a channel is initially funded), this value will
// not yet have been set.
//
// TODO(roasbeef): segment the storage?
if channel.RemoteNextRevocation != nil {
nextRevKey := channel.RemoteNextRevocation.SerializeCompressed()
if err := wire.WriteVarBytes(&b, 0, nextRevKey); err != nil {
return err
}
}
revocationKey := make([]byte, len(revocationStateKey)+bc.Len())
copy(revocationKey[:3], revocationStateKey)
copy(revocationKey[3:], bc.Bytes())
return nodeChanBucket.Put(revocationKey, b.Bytes())
}
func deleteChanRevocationState(nodeChanBucket *bolt.Bucket, chanID []byte) error {
revocationKey := make([]byte, len(revocationStateKey)+len(chanID))
copy(revocationKey[:3], revocationStateKey)
copy(revocationKey[3:], chanID)
return nodeChanBucket.Delete(revocationKey)
}
func fetchChanRevocationState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
preimageKey := make([]byte, len(revocationStateKey)+b.Len())
copy(preimageKey[:3], revocationStateKey)
copy(preimageKey[3:], b.Bytes())
reader := bytes.NewReader(nodeChanBucket.Get(preimageKey))
curRevKeyBytes, err := wire.ReadVarBytes(reader, 0, 1000, "")
if err != nil {
return err
}
channel.RemoteCurrentRevocation, err = btcec.ParsePubKey(curRevKeyBytes, btcec.S256())
if err != nil {
return err
}
// TODO(roasbeef): should be rederiving on fly, or encrypting on disk.
var root [32]byte
if _, err := io.ReadFull(reader, root[:]); err != nil {
return err
}
channel.RevocationProducer, err = shachain.NewRevocationProducerFromBytes(root[:])
if err != nil {
return err
}
channel.RevocationStore, err = shachain.NewRevocationStoreFromBytes(reader)
if err != nil {
return err
}
// We'll attempt to see if the remote party's next revocation key is
// currently set, if so then we'll read and deserialize it. Otherwise,
// we can exit early.
if reader.Len() != 0 {
nextRevKeyBytes, err := wire.ReadVarBytes(reader, 0, 1000, "")
if err != nil {
return err
}
channel.RemoteNextRevocation, err = btcec.ParsePubKey(
nextRevKeyBytes, btcec.S256(),
)
if err != nil {
return err
}
if diff := chanBucket.Get(commitDiffKey); diff != nil {
return chanBucket.Delete(commitDiffKey)
}
return nil
}
func putOurMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
scratch := make([]byte, 8)
byteOrder.PutUint64(scratch, channel.OurMessageIndex)
func makeLogKey(updateNum uint64) [8]byte {
var key [8]byte
byteOrder.PutUint64(key[:], updateNum)
return key
}
func appendChannelLogEntry(log *bolt.Bucket,
commit *ChannelCommitment) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
if err := serializeChanCommit(&b, commit); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, ourIndexPrefix)
copy(keyPrefix[3:], b.Bytes())
return openChanBucket.Put(keyPrefix, scratch)
}
func deleteOurMessageIndex(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix, ourIndexPrefix)
copy(keyPrefix[3:], chanID)
return openChanBucket.Delete(keyPrefix)
}
func fetchOurMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, ourIndexPrefix)
copy(keyPrefix[3:], b.Bytes())
updateBytes := openChanBucket.Get(keyPrefix)
channel.OurMessageIndex = byteOrder.Uint64(updateBytes)
return nil
}
func putTheirMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
scratch := make([]byte, 8)
byteOrder.PutUint64(scratch, channel.TheirMessageIndex)
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, theirIndexPrefix)
copy(keyPrefix[3:], b.Bytes())
return openChanBucket.Put(keyPrefix, scratch)
}
func deleteTheirMessageIndex(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix, theirIndexPrefix)
copy(keyPrefix[3:], chanID)
return openChanBucket.Delete(keyPrefix)
}
func fetchTheirMessageIndex(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, &channel.FundingOutpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix, theirIndexPrefix)
copy(keyPrefix[3:], b.Bytes())
updateBytes := openChanBucket.Get(keyPrefix)
channel.TheirMessageIndex = byteOrder.Uint64(updateBytes)
return nil
}
func serializeHTLC(w io.Writer, h *HTLC) error {
if err := wire.WriteVarBytes(w, 0, h.Signature); err != nil {
return err
}
if _, err := w.Write(h.RHash[:]); err != nil {
return err
}
if err := binary.Write(w, byteOrder, h.Amt); err != nil {
return err
}
if err := binary.Write(w, byteOrder, h.RefundTimeout); err != nil {
return err
}
if err := binary.Write(w, byteOrder, h.OutputIndex); err != nil {
return err
}
var boolByte [1]byte
if h.Incoming {
boolByte[0] = 1
} else {
boolByte[0] = 0
}
if err := binary.Write(w, byteOrder, boolByte[:]); err != nil {
return err
}
var onionLength [2]byte
byteOrder.PutUint16(onionLength[:], uint16(len(h.OnionBlob)))
if _, err := w.Write(onionLength[:]); err != nil {
return err
}
if _, err := w.Write(h.OnionBlob); err != nil {
return err
}
if err := binary.Write(w, byteOrder, h.AddLocalInclusionHeight); err != nil {
return err
}
if err := binary.Write(w, byteOrder, h.AddRemoteInclusionHeight); err != nil {
return err
}
if err := binary.Write(w, byteOrder, h.DescriptorIndex); err != nil {
return err
}
return nil
}
func deserializeHTLC(r io.Reader) (*HTLC, error) {
h := &HTLC{}
var err error
h.Signature, err = wire.ReadVarBytes(r, 0, 80, "")
if err != nil {
return nil, err
}
if _, err := io.ReadFull(r, h.RHash[:]); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &h.Amt); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &h.RefundTimeout); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &h.OutputIndex); err != nil {
return nil, err
}
var scratch [1]byte
if err := binary.Read(r, byteOrder, scratch[:]); err != nil {
return nil, err
}
if boolByte[0] == 1 {
h.Incoming = true
} else {
h.Incoming = false
}
var onionLength [2]byte
if _, err := r.Read(onionLength[:]); err != nil {
return nil, err
}
l := byteOrder.Uint16(onionLength[:])
if l != 0 {
h.OnionBlob = make([]byte, l)
if _, err := io.ReadFull(r, h.OnionBlob); err != nil {
return nil, err
}
}
if err := binary.Read(r, byteOrder, &h.AddLocalInclusionHeight); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &h.AddRemoteInclusionHeight); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &h.DescriptorIndex); err != nil {
return nil, err
}
return h, nil
}
func makeHtlcKey(o *wire.OutPoint) [39]byte {
var (
n int
k [39]byte
)
// chk || txid || index
n += copy(k[:], currentHtlcKey)
n += copy(k[n:], o.Hash[:])
var scratch [4]byte
byteOrder.PutUint32(scratch[:], o.Index)
copy(k[n:], scratch[:])
return k
}
func putCurrentHtlcs(nodeChanBucket *bolt.Bucket, htlcs []*HTLC,
o *wire.OutPoint) error {
var b bytes.Buffer
for _, htlc := range htlcs {
if err := serializeHTLC(&b, htlc); err != nil {
return err
}
}
htlcKey := makeHtlcKey(o)
return nodeChanBucket.Put(htlcKey[:], b.Bytes())
}
func fetchCurrentHtlcs(nodeChanBucket *bolt.Bucket,
o *wire.OutPoint) ([]*HTLC, error) {
htlcKey := makeHtlcKey(o)
htlcBytes := nodeChanBucket.Get(htlcKey[:])
if htlcBytes == nil {
return nil, nil
}
// TODO(roasbeef): can preallocate here
var htlcs []*HTLC
htlcReader := bytes.NewReader(htlcBytes)
for htlcReader.Len() != 0 {
htlc, err := deserializeHTLC(htlcReader)
if err != nil {
return nil, err
}
htlcs = append(htlcs, htlc)
}
return htlcs, nil
}
func deleteCurrentHtlcs(nodeChanBucket *bolt.Bucket, o *wire.OutPoint) error {
htlcKey := makeHtlcKey(o)
return nodeChanBucket.Delete(htlcKey[:])
}
func serializeChannelDelta(w io.Writer, delta *ChannelDelta) error {
// TODO(roasbeef): could use compression here to reduce on-disk space.
var scratch [8]byte
byteOrder.PutUint64(scratch[:], uint64(delta.LocalBalance))
if _, err := w.Write(scratch[:]); err != nil {
return err
}
byteOrder.PutUint64(scratch[:], uint64(delta.RemoteBalance))
if _, err := w.Write(scratch[:]); err != nil {
return err
}
byteOrder.PutUint64(scratch[:], delta.UpdateNum)
if _, err := w.Write(scratch[:]); err != nil {
return err
}
numHtlcs := uint64(len(delta.Htlcs))
if err := wire.WriteVarInt(w, 0, numHtlcs); err != nil {
return err
}
for _, htlc := range delta.Htlcs {
if err := serializeHTLC(w, htlc); err != nil {
return err
}
}
byteOrder.PutUint64(scratch[:], uint64(delta.CommitFee))
if _, err := w.Write(scratch[:]); err != nil {
return err
}
byteOrder.PutUint64(scratch[:], uint64(delta.FeePerKw))
if _, err := w.Write(scratch[:]); err != nil {
return err
}
return nil
}
func deserializeChannelDelta(r io.Reader) (*ChannelDelta, error) {
var (
err error
scratch [8]byte
)
delta := &ChannelDelta{}
if _, err := r.Read(scratch[:]); err != nil {
return nil, err
}
delta.LocalBalance = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
if _, err := r.Read(scratch[:]); err != nil {
return nil, err
}
delta.RemoteBalance = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
if _, err := r.Read(scratch[:]); err != nil {
return nil, err
}
delta.UpdateNum = byteOrder.Uint64(scratch[:])
numHtlcs, err := wire.ReadVarInt(r, 0)
if err != nil {
return nil, err
}
delta.Htlcs = make([]*HTLC, numHtlcs)
for i := uint64(0); i < numHtlcs; i++ {
htlc, err := deserializeHTLC(r)
if err != nil {
return nil, err
}
delta.Htlcs[i] = htlc
}
if _, err := r.Read(scratch[:]); err != nil {
return nil, err
}
delta.CommitFee = btcutil.Amount(byteOrder.Uint64(scratch[:]))
if _, err := r.Read(scratch[:]); err != nil {
return nil, err
}
delta.FeePerKw = btcutil.Amount(byteOrder.Uint64(scratch[:]))
return delta, nil
}
func makeLogKey(o *wire.OutPoint, updateNum uint64) [44]byte {
var (
scratch [8]byte
n int
// txid (32) || index (4) || update_num (8)
// 32 + 4 + 8 = 44
k [44]byte
)
n += copy(k[:], o.Hash[:])
byteOrder.PutUint32(scratch[:4], o.Index)
n += copy(k[n:], scratch[:4])
byteOrder.PutUint64(scratch[:], updateNum)
copy(k[n:], scratch[:])
return k
}
func appendChannelLogEntry(log *bolt.Bucket, delta *ChannelDelta,
chanPoint *wire.OutPoint) error {
var b bytes.Buffer
if err := serializeChannelDelta(&b, delta); err != nil {
return err
}
logEntrykey := makeLogKey(chanPoint, delta.UpdateNum)
logEntrykey := makeLogKey(commit.CommitHeight)
return log.Put(logEntrykey[:], b.Bytes())
}
func fetchChannelLogEntry(log *bolt.Bucket, chanPoint *wire.OutPoint,
updateNum uint64) (*ChannelDelta, error) {
func fetchChannelLogEntry(log *bolt.Bucket,
updateNum uint64) (ChannelCommitment, error) {
logEntrykey := makeLogKey(chanPoint, updateNum)
deltaBytes := log.Get(logEntrykey[:])
if deltaBytes == nil {
return nil, fmt.Errorf("log entry not found")
logEntrykey := makeLogKey(updateNum)
commitBytes := log.Get(logEntrykey[:])
if commitBytes == nil {
return ChannelCommitment{}, fmt.Errorf("log entry not found")
}
deltaReader := bytes.NewReader(deltaBytes)
return deserializeChannelDelta(deltaReader)
commitReader := bytes.NewReader(commitBytes)
return deserializeChanCommit(commitReader)
}
func wipeChannelLogEntries(log *bolt.Bucket, o *wire.OutPoint) error {
var (
n int
logPrefix [32 + 4]byte
scratch [4]byte
)
func wipeChannelLogEntries(log *bolt.Bucket) error {
// TODO(roasbeef): comment
// First we'll construct a key prefix that we'll use to scan through
// and delete all the log entries related to this channel. The format
// for log entries within the database is: txid || index || update_num.
// We'll construct a prefix key with the first two thirds of the full
// key to scan with and delete all entries.
n += copy(logPrefix[:], o.Hash[:])
byteOrder.PutUint32(scratch[:], o.Index)
copy(logPrefix[n:], scratch[:])
// With the prefix constructed, scan through the log bucket from the
// starting point of the log entries for this channel. We'll keep
// deleting keys until the prefix no longer matches.
logCursor := log.Cursor()
for logKey, _ := logCursor.Seek(logPrefix[:]); bytes.HasPrefix(logKey, logPrefix[:]); logKey, _ = logCursor.Next() {
if err := log.Delete(logKey); err != nil {
for k, _ := logCursor.First(); k != nil; k, _ = logCursor.Next() {
if err := logCursor.Delete(); err != nil {
return err
}
}
return nil
}
func writeOutpoint(w io.Writer, o *wire.OutPoint) error {
// TODO(roasbeef): make all scratch buffers on the stack
scratch := make([]byte, 4)
// TODO(roasbeef): write raw 32 bytes instead of wasting the extra
// byte.
if err := wire.WriteVarBytes(w, 0, o.Hash[:]); err != nil {
return err
}
byteOrder.PutUint32(scratch, o.Index)
_, err := w.Write(scratch)
return err
}
func readOutpoint(r io.Reader, o *wire.OutPoint) error {
scratch := make([]byte, 4)
txid, err := wire.ReadVarBytes(r, 0, 32, "prevout")
if err != nil {
return err
}
copy(o.Hash[:], txid)
if _, err := r.Read(scratch); err != nil {
return err
}
o.Index = byteOrder.Uint32(scratch)
return nil
}