channeldb: added isPending flag and queries

In order to facilitate persistence during the funding process, added
the isPending flag to channels so that when the daemon restarts, we can
properly re-initialize the chain notifier and update the state of
channels that were going through the funding process.
This commit is contained in:
bryanvu 2017-01-22 23:31:01 -08:00 committed by Olaoluwa Osuntokun
parent 9034f85b88
commit 3e02ea11ef
7 changed files with 239 additions and 23 deletions

@ -67,6 +67,7 @@ var (
satSentPrefix = []byte("ssp")
satReceivedPrefix = []byte("srp")
netFeesPrefix = []byte("ntp")
isPendingPrefix = []byte("pdg")
// chanIDKey stores the node, and channelID for an active channel.
chanIDKey = []byte("cik")
@ -194,6 +195,10 @@ type OpenChannel struct {
// negotiate fees, or close the channel.
IsInitiator bool
// IsPending indicates whether a channel's funding transaction has been
// confirmed.
IsPending bool
// FundingOutpoint is the outpoint of the final funding transaction.
FundingOutpoint *wire.OutPoint
@ -318,16 +323,18 @@ func (c *OpenChannel) fullSync(tx *bolt.Tx) error {
return putOpenChannel(chanBucket, nodeChanBucket, c)
}
// FullSyncWithAddr is identical to the FullSync function in that it writes the
// full channel state to disk. Additionally, this function also creates a
// LinkNode relationship between this newly created channel and an existing of
// new LinkNode instance. Syncing with this method rather than FullSync is
// required in order to allow listing all channels in the database globally, or
// according to the LinkNode they were created with.
// SyncPending writes the contents of the channel to the database while it's in
// the pending (waiting for funding confirmation) state. The IsPending flag
// will be set to true. When the channel's funding transaction is confirmed,
// the channel should be marked as "open" and the IsPending flag set to false.
// Note that this function also creates a LinkNode relationship between this
// newly created channel and a new LinkNode instance. This allows listing all
// channels in the database globally, or according to the LinkNode they were
// created with.
//
// TODO(roasbeef): addr param should eventually be a lnwire.NetAddress type
// that includes service bits.
func (c *OpenChannel) FullSyncWithAddr(addr *net.TCPAddr) error {
func (c *OpenChannel) SyncPending(addr *net.TCPAddr) error {
c.Lock()
defer c.Unlock()
@ -732,6 +739,9 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
if err := putChanAmountsTransferred(openChanBucket, channel); err != nil {
return err
}
if err := putChanIsPending(openChanBucket, channel); err != nil {
return err
}
// Next, write out the fields of the channel update less frequently.
if err := putChannelIDs(nodeChanBucket, channel); err != nil {
@ -816,6 +826,9 @@ func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
if err = fetchChanAmountsTransferred(openChanBucket, channel); err != nil {
return nil, fmt.Errorf("unable to read sat transferred: %v", err)
}
if err = fetchChanIsPending(openChanBucket, channel); err != nil {
return nil, err
}
return channel, nil
}
@ -843,6 +856,9 @@ func deleteOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
if err := deleteChanOurDustLimit(openChanBucket, channelID); err != nil {
return err
}
if err := deleteChanIsPending(openChanBucket, channelID); err != nil {
return err
}
// Finally, delete all the fields directly within the node's channel
// bucket.
@ -1159,6 +1175,54 @@ func fetchChanAmountsTransferred(openChanBucket *bolt.Bucket, channel *OpenChann
return nil
}
func putChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
scratch := make([]byte, 2)
var b bytes.Buffer
if err := writeOutpoint(&b, channel.ChanID); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix[3:], b.Bytes())
copy(keyPrefix[:3], isPendingPrefix)
if channel.IsPending {
byteOrder.PutUint16(scratch, uint16(1))
return openChanBucket.Put(keyPrefix, scratch)
}
byteOrder.PutUint16(scratch, uint16(0))
return openChanBucket.Put(keyPrefix, scratch)
}
func deleteChanIsPending(openChanBucket *bolt.Bucket, chanID []byte) error {
keyPrefix := make([]byte, 3+len(chanID))
copy(keyPrefix[3:], chanID)
copy(keyPrefix[:3], isPendingPrefix)
return openChanBucket.Delete(keyPrefix)
}
func fetchChanIsPending(openChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, channel.ChanID); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix[3:], b.Bytes())
copy(keyPrefix[:3], isPendingPrefix)
isPending := byteOrder.Uint16(openChanBucket.Get(keyPrefix))
if isPending == 1 {
channel.IsPending = true
} else {
channel.IsPending = false
}
return nil
}
func putChannelIDs(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
// TODO(roasbeef): just pass in chanID everywhere for puts
var b bytes.Buffer

@ -3,6 +3,7 @@ package channeldb
import (
"bytes"
"io/ioutil"
"net"
"os"
"reflect"
"testing"
@ -136,6 +137,7 @@ func createTestChannelState(cdb *DB) (*OpenChannel, error) {
return &OpenChannel{
IsInitiator: true,
IsPending: true,
ChanType: SingleFunder,
IdentityPub: pubKey,
ChanID: id,
@ -583,3 +585,50 @@ func TestChannelStateTransition(t *testing.T) {
t.Fatal("revocation log search should've failed")
}
}
func TestFetchPendingChannels(t *testing.T) {
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("uanble to make test database: %v", err)
}
defer cleanUp()
// Create first test channel state
state, err := createTestChannelState(cdb)
if err != nil {
t.Fatalf("unable to create channel state: %v", err)
}
addr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}
if err := state.SyncPending(addr); err != nil {
t.Fatalf("unable to save and serialize channel state: %v", err)
}
pendingChannels, err := cdb.FetchPendingChannels()
if err != nil {
t.Fatalf("unable to list pending channels: %v", err)
}
if len(pendingChannels) != 1 {
t.Fatalf("incorrect number of pending channels: expecting %v,"+
"got %v", 1, len(pendingChannels))
}
if err := cdb.MarkChannelAsOpen(pendingChannels[0].ChanID); err != nil {
t.Fatalf("unable to mark channel as open: %v", err)
}
pendingChannels, err = cdb.FetchPendingChannels()
if err != nil {
t.Fatalf("unable to list pending channels: %v", err)
}
if len(pendingChannels) != 0 {
t.Fatalf("incorrect number of pending channels: expecting %v,"+
"got %v", 0, len(pendingChannels))
}
}

@ -295,9 +295,23 @@ func (d *DB) fetchNodeChannels(openChanBucket,
}
// FetchAllChannels attempts to retrieve all open channels currently stored
// within the database. If no active channels exist within the network, then
// ErrNoActiveChannels is returned.
// within the database.
func (d *DB) FetchAllChannels() ([]*OpenChannel, error) {
return fetchChannels(d, false)
}
// FetchPendingChannels will return channels that have completed the process
// of generating and broadcasting funding transactions, but whose funding
// transactions have yet to be confirmed on the blockchain.
func (d *DB) FetchPendingChannels() ([]*OpenChannel, error) {
return fetchChannels(d, true)
}
// fetchChannels attempts to retrieve channels currently stored in the
// database. The pendingOnly parameter determines whether only pending
// channels will be returned. If no active channels exist within the network,
// then ErrNoActiveChannels is returned.
func fetchChannels(d *DB, pendingOnly bool) ([]*OpenChannel, error) {
var channels []*OpenChannel
err := d.View(func(tx *bolt.Tx) error {
@ -330,8 +344,15 @@ func (d *DB) FetchAllChannels() ([]*OpenChannel, error) {
return fmt.Errorf("unable to read channel for "+
"node_key=%x: %v", k, err)
}
channels = append(channels, nodeChannels...)
if pendingOnly {
for _, channel := range nodeChannels {
if channel.IsPending == true {
channels = append(channels, channel)
}
}
} else {
channels = append(channels, nodeChannels...)
}
return nil
})
})
@ -339,6 +360,38 @@ func (d *DB) FetchAllChannels() ([]*OpenChannel, error) {
return channels, err
}
// MarkChannelAsOpen records the finalization of the funding process and marks
// a channel as available for use.
func (d *DB) MarkChannelAsOpen(outpoint *wire.OutPoint) error {
err := d.Update(func(tx *bolt.Tx) error {
openChanBucket := tx.Bucket(openChannelBucket)
if openChanBucket == nil {
return ErrNoActiveChannels
}
// Generate the database key, which will consist of the IsPending
// prefix followed by the channel's outpoint.
var b bytes.Buffer
if err := writeOutpoint(&b, outpoint); err != nil {
return err
}
keyPrefix := make([]byte, 3+b.Len())
copy(keyPrefix[3:], b.Bytes())
copy(keyPrefix[:3], isPendingPrefix)
// For the database value, store a zero, since the channel is no
// longer pending.
scratch := make([]byte, 2)
byteOrder.PutUint16(scratch, uint16(0))
return openChanBucket.Put(keyPrefix, scratch)
})
if err != nil {
return err
}
return nil
}
// syncVersions function is used for safe db version synchronization. It applies
// migration functions to the current database and recovers the previous
// state of db if at least one error/panic appeared during migration.

@ -241,17 +241,22 @@ func (f *fundingManager) Stop() error {
type numPendingReq struct {
resp chan uint32
err chan error
}
// NumPendingChannels returns the number of pending channels currently
// progressing through the reservation workflow.
func (f *fundingManager) NumPendingChannels() uint32 {
resp := make(chan uint32, 1)
func (f *fundingManager) NumPendingChannels() (uint32, error) {
respChan := make(chan uint32, 1)
errChan := make(chan error)
req := &numPendingReq{resp}
req := &numPendingReq{
resp: respChan,
err: errChan,
}
f.queries <- req
return <-resp
return <-respChan, <-errChan
}
type pendingChannel struct {
@ -264,17 +269,22 @@ type pendingChannel struct {
type pendingChansReq struct {
resp chan []*pendingChannel
err chan error
}
// PendingChannels returns a slice describing all the channels which are
// currently pending at the last state of the funding workflow.
func (f *fundingManager) PendingChannels() []*pendingChannel {
resp := make(chan []*pendingChannel, 1)
func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) {
respChan := make(chan []*pendingChannel, 1)
errChan := make(chan error)
req := &pendingChansReq{resp}
req := &pendingChansReq{
resp: respChan,
err: errChan,
}
f.queries <- req
return <-resp
return <-respChan, <-errChan
}
// reservationCoordinator is the primary goroutine tasked with progressing the
@ -322,7 +332,18 @@ func (f *fundingManager) handleNumPending(msg *numPendingReq) {
for _, peerChannels := range f.activeReservations {
numPending += uint32(len(peerChannels))
}
dbPendingChannels, err := f.cfg.Wallet.ChannelDB.FetchPendingChannels()
if err != nil {
close(msg.resp)
msg.err <- err
return
}
numPending = numPending + uint32(len(dbPendingChannels))
msg.resp <- numPending
msg.err <- nil
}
// handlePendingChannels responds to a request for details concerning all
@ -346,7 +367,28 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) {
pendingChannels = append(pendingChannels, pendingChan)
}
}
dbPendingChannels, err := f.cfg.Wallet.ChannelDB.FetchPendingChannels()
if err != nil {
msg.resp <- nil
msg.err <- err
return
}
for _, dbPendingChan := range dbPendingChannels {
pendingChan := &pendingChannel{
identityPub: dbPendingChan.IdentityPub,
channelPoint: dbPendingChan.ChanID,
capacity: dbPendingChan.Capacity,
localBalance: dbPendingChan.OurBalance,
remoteBalance: dbPendingChan.TheirBalance,
}
pendingChannels = append(pendingChannels, pendingChan)
}
msg.resp <- pendingChannels
msg.err <- nil
}
// processFundingRequest sends a message to the fundingManager allowing it to

@ -204,6 +204,7 @@ func NewChannelReservation(capacity, fundingAmt btcutil.Amount, minFeeRate btcut
partialState: &channeldb.OpenChannel{
Capacity: capacity,
IsInitiator: initiator,
IsPending: true,
ChanType: chanType,
OurBalance: ourBalance,
TheirBalance: theirBalance,

@ -1066,7 +1066,7 @@ func (l *LightningWallet) handleFundingCounterPartySigs(msg *addCounterPartySigs
// which will be used for the lifetime of this channel.
// TODO(roasbeef): revisit faul-tolerance of this flow
nodeAddr := res.nodeAddr
if err := res.partialState.FullSyncWithAddr(nodeAddr); err != nil {
if err := res.partialState.SyncPending(nodeAddr); err != nil {
msg.err <- err
return
}
@ -1220,7 +1220,7 @@ func (l *LightningWallet) handleChannelOpen(req *channelOpenMsg) {
// Add the complete funding transaction to the DB, in it's open bucket
// which will be used for the lifetime of this channel.
if err := res.partialState.FullSyncWithAddr(res.nodeAddr); err != nil {
if err := res.partialState.SyncPending(res.nodeAddr); err != nil {
req.err <- err
res.chanOpen <- nil
return

@ -631,7 +631,11 @@ func (r *rpcServer) GetInfo(ctx context.Context,
activeChannels += uint32(len(serverPeer.ChannelSnapshots()))
}
pendingChannels := r.server.fundingMgr.NumPendingChannels()
pendingChannels, err := r.server.fundingMgr.NumPendingChannels()
if err != nil {
return nil, err
}
idPub := r.server.identityPriv.PubKey().SerializeCompressed()
bestHash, bestHeight, err := r.server.bio.GetBestBlock()
@ -760,7 +764,10 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
var pendingChannels []*lnrpc.PendingChannelResponse_PendingChannel
if includeOpen {
pendingOpenChans := r.server.fundingMgr.PendingChannels()
pendingOpenChans, err := r.server.fundingMgr.PendingChannels()
if err != nil {
return nil, err
}
for _, pendingOpen := range pendingOpenChans {
channelPointStr := "<non initialized yet>"
if pendingOpen.channelPoint != nil {