From 3e02ea11efdabae43c216bd45baadd6ac92230b1 Mon Sep 17 00:00:00 2001 From: bryanvu Date: Sun, 22 Jan 2017 23:31:01 -0800 Subject: [PATCH] channeldb: added isPending flag and queries In order to facilitate persistence during the funding process, added the isPending flag to channels so that when the daemon restarts, we can properly re-initialize the chain notifier and update the state of channels that were going through the funding process. --- channeldb/channel.go | 78 +++++++++++++++++++++++++++++++++++---- channeldb/channel_test.go | 49 ++++++++++++++++++++++++ channeldb/db.go | 61 ++++++++++++++++++++++++++++-- fundingmanager.go | 58 +++++++++++++++++++++++++---- lnwallet/reservation.go | 1 + lnwallet/wallet.go | 4 +- rpcserver.go | 11 +++++- 7 files changed, 239 insertions(+), 23 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index b513c48a..6c8664a8 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -67,6 +67,7 @@ var ( satSentPrefix = []byte("ssp") satReceivedPrefix = []byte("srp") netFeesPrefix = []byte("ntp") + isPendingPrefix = []byte("pdg") // chanIDKey stores the node, and channelID for an active channel. chanIDKey = []byte("cik") @@ -194,6 +195,10 @@ type OpenChannel struct { // negotiate fees, or close the channel. IsInitiator bool + // IsPending indicates whether a channel's funding transaction has been + // confirmed. + IsPending bool + // FundingOutpoint is the outpoint of the final funding transaction. FundingOutpoint *wire.OutPoint @@ -318,16 +323,18 @@ func (c *OpenChannel) fullSync(tx *bolt.Tx) error { return putOpenChannel(chanBucket, nodeChanBucket, c) } -// FullSyncWithAddr is identical to the FullSync function in that it writes the -// full channel state to disk. Additionally, this function also creates a -// LinkNode relationship between this newly created channel and an existing of -// new LinkNode instance. Syncing with this method rather than FullSync is -// required in order to allow listing all channels in the database globally, or -// according to the LinkNode they were created with. +// SyncPending writes the contents of the channel to the database while it's in +// the pending (waiting for funding confirmation) state. The IsPending flag +// will be set to true. When the channel's funding transaction is confirmed, +// the channel should be marked as "open" and the IsPending flag set to false. +// Note that this function also creates a LinkNode relationship between this +// newly created channel and a new LinkNode instance. This allows listing all +// channels in the database globally, or according to the LinkNode they were +// created with. // // TODO(roasbeef): addr param should eventually be a lnwire.NetAddress type // that includes service bits. -func (c *OpenChannel) FullSyncWithAddr(addr *net.TCPAddr) error { +func (c *OpenChannel) SyncPending(addr *net.TCPAddr) error { c.Lock() defer c.Unlock() @@ -732,6 +739,9 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, if err := putChanAmountsTransferred(openChanBucket, channel); err != nil { return err } + if err := putChanIsPending(openChanBucket, channel); err != nil { + return err + } // Next, write out the fields of the channel update less frequently. if err := putChannelIDs(nodeChanBucket, channel); err != nil { @@ -816,6 +826,9 @@ func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, if err = fetchChanAmountsTransferred(openChanBucket, channel); err != nil { return nil, fmt.Errorf("unable to read sat transferred: %v", err) } + if err = fetchChanIsPending(openChanBucket, channel); err != nil { + return nil, err + } return channel, nil } @@ -843,6 +856,9 @@ func deleteOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, if err := deleteChanOurDustLimit(openChanBucket, channelID); err != nil { return err } + if err := deleteChanIsPending(openChanBucket, channelID); err != nil { + return err + } // Finally, delete all the fields directly within the node's channel // bucket. @@ -1159,6 +1175,54 @@ func fetchChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChann return nil } +func putChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error { + scratch := make([]byte, 2) + + var b bytes.Buffer + if err := writeOutpoint(&b, channel.ChanID); err != nil { + return err + } + + keyPrefix := make([]byte, 3+b.Len()) + copy(keyPrefix[3:], b.Bytes()) + copy(keyPrefix[:3], isPendingPrefix) + + if channel.IsPending { + byteOrder.PutUint16(scratch, uint16(1)) + return openChanBucket.Put(keyPrefix, scratch) + } + + byteOrder.PutUint16(scratch, uint16(0)) + return openChanBucket.Put(keyPrefix, scratch) +} + +func deleteChanIsPending(openChanBucket *bolt.Bucket, chanID []byte) error { + keyPrefix := make([]byte, 3+len(chanID)) + copy(keyPrefix[3:], chanID) + copy(keyPrefix[:3], isPendingPrefix) + return openChanBucket.Delete(keyPrefix) +} + +func fetchChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error { + var b bytes.Buffer + if err := writeOutpoint(&b, channel.ChanID); err != nil { + return err + } + + keyPrefix := make([]byte, 3+b.Len()) + copy(keyPrefix[3:], b.Bytes()) + copy(keyPrefix[:3], isPendingPrefix) + + isPending := byteOrder.Uint16(openChanBucket.Get(keyPrefix)) + if isPending == 1 { + channel.IsPending = true + } else { + channel.IsPending = false + } + + return nil +} + func putChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { // TODO(roasbeef): just pass in chanID everywhere for puts var b bytes.Buffer diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 166ca7e5..fe937647 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -3,6 +3,7 @@ package channeldb import ( "bytes" "io/ioutil" + "net" "os" "reflect" "testing" @@ -136,6 +137,7 @@ func createTestChannelState(cdb *DB) (*OpenChannel, error) { return &OpenChannel{ IsInitiator: true, + IsPending: true, ChanType: SingleFunder, IdentityPub: pubKey, ChanID: id, @@ -583,3 +585,50 @@ func TestChannelStateTransition(t *testing.T) { t.Fatal("revocation log search should've failed") } } + +func TestFetchPendingChannels(t *testing.T) { + cdb, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("uanble to make test database: %v", err) + } + defer cleanUp() + + // Create first test channel state + state, err := createTestChannelState(cdb) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + + addr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18555, + } + + if err := state.SyncPending(addr); err != nil { + t.Fatalf("unable to save and serialize channel state: %v", err) + } + + pendingChannels, err := cdb.FetchPendingChannels() + if err != nil { + t.Fatalf("unable to list pending channels: %v", err) + } + + if len(pendingChannels) != 1 { + t.Fatalf("incorrect number of pending channels: expecting %v,"+ + "got %v", 1, len(pendingChannels)) + } + + if err := cdb.MarkChannelAsOpen(pendingChannels[0].ChanID); err != nil { + t.Fatalf("unable to mark channel as open: %v", err) + } + + pendingChannels, err = cdb.FetchPendingChannels() + if err != nil { + t.Fatalf("unable to list pending channels: %v", err) + } + + if len(pendingChannels) != 0 { + t.Fatalf("incorrect number of pending channels: expecting %v,"+ + "got %v", 0, len(pendingChannels)) + } +} diff --git a/channeldb/db.go b/channeldb/db.go index 95b95f67..2682a2c6 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -295,9 +295,23 @@ func (d *DB) fetchNodeChannels(openChanBucket, } // FetchAllChannels attempts to retrieve all open channels currently stored -// within the database. If no active channels exist within the network, then -// ErrNoActiveChannels is returned. +// within the database. func (d *DB) FetchAllChannels() ([]*OpenChannel, error) { + return fetchChannels(d, false) +} + +// FetchPendingChannels will return channels that have completed the process +// of generating and broadcasting funding transactions, but whose funding +// transactions have yet to be confirmed on the blockchain. +func (d *DB) FetchPendingChannels() ([]*OpenChannel, error) { + return fetchChannels(d, true) +} + +// fetchChannels attempts to retrieve channels currently stored in the +// database. The pendingOnly parameter determines whether only pending +// channels will be returned. If no active channels exist within the network, +// then ErrNoActiveChannels is returned. +func fetchChannels(d *DB, pendingOnly bool) ([]*OpenChannel, error) { var channels []*OpenChannel err := d.View(func(tx *bolt.Tx) error { @@ -330,8 +344,15 @@ func (d *DB) FetchAllChannels() ([]*OpenChannel, error) { return fmt.Errorf("unable to read channel for "+ "node_key=%x: %v", k, err) } - - channels = append(channels, nodeChannels...) + if pendingOnly { + for _, channel := range nodeChannels { + if channel.IsPending == true { + channels = append(channels, channel) + } + } + } else { + channels = append(channels, nodeChannels...) + } return nil }) }) @@ -339,6 +360,38 @@ func (d *DB) FetchAllChannels() ([]*OpenChannel, error) { return channels, err } +// MarkChannelAsOpen records the finalization of the funding process and marks +// a channel as available for use. +func (d *DB) MarkChannelAsOpen(outpoint *wire.OutPoint) error { + err := d.Update(func(tx *bolt.Tx) error { + openChanBucket := tx.Bucket(openChannelBucket) + if openChanBucket == nil { + return ErrNoActiveChannels + } + + // Generate the database key, which will consist of the IsPending + // prefix followed by the channel's outpoint. + var b bytes.Buffer + if err := writeOutpoint(&b, outpoint); err != nil { + return err + } + keyPrefix := make([]byte, 3+b.Len()) + copy(keyPrefix[3:], b.Bytes()) + copy(keyPrefix[:3], isPendingPrefix) + + // For the database value, store a zero, since the channel is no + // longer pending. + scratch := make([]byte, 2) + byteOrder.PutUint16(scratch, uint16(0)) + return openChanBucket.Put(keyPrefix, scratch) + }) + if err != nil { + return err + } + + return nil +} + // syncVersions function is used for safe db version synchronization. It applies // migration functions to the current database and recovers the previous // state of db if at least one error/panic appeared during migration. diff --git a/fundingmanager.go b/fundingmanager.go index a47e71d2..bf0da77b 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -241,17 +241,22 @@ func (f *fundingManager) Stop() error { type numPendingReq struct { resp chan uint32 + err chan error } // NumPendingChannels returns the number of pending channels currently // progressing through the reservation workflow. -func (f *fundingManager) NumPendingChannels() uint32 { - resp := make(chan uint32, 1) +func (f *fundingManager) NumPendingChannels() (uint32, error) { + respChan := make(chan uint32, 1) + errChan := make(chan error) - req := &numPendingReq{resp} + req := &numPendingReq{ + resp: respChan, + err: errChan, + } f.queries <- req - return <-resp + return <-respChan, <-errChan } type pendingChannel struct { @@ -264,17 +269,22 @@ type pendingChannel struct { type pendingChansReq struct { resp chan []*pendingChannel + err chan error } // PendingChannels returns a slice describing all the channels which are // currently pending at the last state of the funding workflow. -func (f *fundingManager) PendingChannels() []*pendingChannel { - resp := make(chan []*pendingChannel, 1) +func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) { + respChan := make(chan []*pendingChannel, 1) + errChan := make(chan error) - req := &pendingChansReq{resp} + req := &pendingChansReq{ + resp: respChan, + err: errChan, + } f.queries <- req - return <-resp + return <-respChan, <-errChan } // reservationCoordinator is the primary goroutine tasked with progressing the @@ -322,7 +332,18 @@ func (f *fundingManager) handleNumPending(msg *numPendingReq) { for _, peerChannels := range f.activeReservations { numPending += uint32(len(peerChannels)) } + + dbPendingChannels, err := f.cfg.Wallet.ChannelDB.FetchPendingChannels() + if err != nil { + close(msg.resp) + msg.err <- err + return + } + + numPending = numPending + uint32(len(dbPendingChannels)) + msg.resp <- numPending + msg.err <- nil } // handlePendingChannels responds to a request for details concerning all @@ -346,7 +367,28 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { pendingChannels = append(pendingChannels, pendingChan) } } + + dbPendingChannels, err := f.cfg.Wallet.ChannelDB.FetchPendingChannels() + if err != nil { + msg.resp <- nil + msg.err <- err + return + } + + for _, dbPendingChan := range dbPendingChannels { + pendingChan := &pendingChannel{ + identityPub: dbPendingChan.IdentityPub, + channelPoint: dbPendingChan.ChanID, + capacity: dbPendingChan.Capacity, + localBalance: dbPendingChan.OurBalance, + remoteBalance: dbPendingChan.TheirBalance, + } + + pendingChannels = append(pendingChannels, pendingChan) + } + msg.resp <- pendingChannels + msg.err <- nil } // processFundingRequest sends a message to the fundingManager allowing it to diff --git a/lnwallet/reservation.go b/lnwallet/reservation.go index 0a9801d8..ff322499 100644 --- a/lnwallet/reservation.go +++ b/lnwallet/reservation.go @@ -204,6 +204,7 @@ func NewChannelReservation(capacity, fundingAmt btcutil.Amount, minFeeRate btcut partialState: &channeldb.OpenChannel{ Capacity: capacity, IsInitiator: initiator, + IsPending: true, ChanType: chanType, OurBalance: ourBalance, TheirBalance: theirBalance, diff --git a/lnwallet/wallet.go b/lnwallet/wallet.go index b3f9fe36..892503d5 100644 --- a/lnwallet/wallet.go +++ b/lnwallet/wallet.go @@ -1066,7 +1066,7 @@ func (l *LightningWallet) handleFundingCounterPartySigs(msg *addCounterPartySigs // which will be used for the lifetime of this channel. // TODO(roasbeef): revisit faul-tolerance of this flow nodeAddr := res.nodeAddr - if err := res.partialState.FullSyncWithAddr(nodeAddr); err != nil { + if err := res.partialState.SyncPending(nodeAddr); err != nil { msg.err <- err return } @@ -1220,7 +1220,7 @@ func (l *LightningWallet) handleChannelOpen(req *channelOpenMsg) { // Add the complete funding transaction to the DB, in it's open bucket // which will be used for the lifetime of this channel. - if err := res.partialState.FullSyncWithAddr(res.nodeAddr); err != nil { + if err := res.partialState.SyncPending(res.nodeAddr); err != nil { req.err <- err res.chanOpen <- nil return diff --git a/rpcserver.go b/rpcserver.go index 5001cec3..adb7387f 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -631,7 +631,11 @@ func (r *rpcServer) GetInfo(ctx context.Context, activeChannels += uint32(len(serverPeer.ChannelSnapshots())) } - pendingChannels := r.server.fundingMgr.NumPendingChannels() + pendingChannels, err := r.server.fundingMgr.NumPendingChannels() + if err != nil { + return nil, err + } + idPub := r.server.identityPriv.PubKey().SerializeCompressed() bestHash, bestHeight, err := r.server.bio.GetBestBlock() @@ -760,7 +764,10 @@ func (r *rpcServer) PendingChannels(ctx context.Context, var pendingChannels []*lnrpc.PendingChannelResponse_PendingChannel if includeOpen { - pendingOpenChans := r.server.fundingMgr.PendingChannels() + pendingOpenChans, err := r.server.fundingMgr.PendingChannels() + if err != nil { + return nil, err + } for _, pendingOpen := range pendingOpenChans { channelPointStr := "" if pendingOpen.channelPoint != nil {