Merge pull request #3721 from Roasbeef/complete-abandon-channel

rpc+cnct: update AbandonChannel to also remove cnct and channel graph state
This commit is contained in:
Olaoluwa Osuntokun 2019-11-20 20:00:41 -08:00 committed by GitHub
commit 1ad681782e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 375 additions and 54 deletions

@ -1094,6 +1094,54 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
return dedupedAddrs, nil return dedupedAddrs, nil
} }
// AbandonChannel attempts to remove the target channel from the open channel
// database. If the channel was already removed (has a closed channel entry),
// then we'll return a nil error. Otherwise, we'll insert a new close summary
// into the database.
func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
// With the chanPoint constructed, we'll attempt to find the target
// channel in the database. If we can't find the channel, then we'll
// return the error back to the caller.
dbChan, err := d.FetchChannel(*chanPoint)
switch {
// If the channel wasn't found, then it's possible that it was already
// abandoned from the database.
case err == ErrChannelNotFound:
_, closedErr := d.FetchClosedChannel(chanPoint)
if closedErr != nil {
return closedErr
}
// If the channel was already closed, then we don't return an
// error as we'd like fro this step to be repeatable.
return nil
case err != nil:
return err
}
// Now that we've found the channel, we'll populate a close summary for
// the channel, so we can store as much information for this abounded
// channel as possible. We also ensure that we set Pending to false, to
// indicate that this channel has been "fully" closed.
summary := &ChannelCloseSummary{
CloseType: Abandoned,
ChanPoint: *chanPoint,
ChainHash: dbChan.ChainHash,
CloseHeight: bestHeight,
RemotePub: dbChan.IdentityPub,
Capacity: dbChan.Capacity,
SettledBalance: dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
ShortChanID: dbChan.ShortChanID(),
RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
RemoteNextRevocation: dbChan.RemoteNextRevocation,
LocalChanConfig: dbChan.LocalChanCfg,
}
// Finally, we'll close the channel in the DB, and return back to the
// caller.
return dbChan.CloseChannel(summary)
}
// syncVersions function is used for safe db version synchronization. It // syncVersions function is used for safe db version synchronization. It
// applies migration functions to the current database and recovers the // applies migration functions to the current database and recovers the
// previous state of db if at least one error/panic appeared during migration. // previous state of db if at least one error/panic appeared during migration.

@ -469,3 +469,67 @@ func TestRestoreChannelShells(t *testing.T) {
t.Fatalf("only a single edge should be inserted: %v", err) t.Fatalf("only a single edge should be inserted: %v", err)
} }
} }
// TestAbandonChannel tests that the AbandonChannel method is able to properly
// remove a channel from the database and add a close channel summary. If
// called after a channel has already been removed, the method shouldn't return
// an error.
func TestAbandonChannel(t *testing.T) {
t.Parallel()
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
// If we attempt to abandon the state of a channel that doesn't exist
// in the open or closed channel bucket, then we should receive an
// error.
err = cdb.AbandonChannel(&wire.OutPoint{}, 0)
if err == nil {
t.Fatalf("removing non-existent channel should have failed")
}
// We'll now create a new channel to abandon shortly.
chanState, err := createTestChannelState(cdb)
if err != nil {
t.Fatalf("unable to create channel state: %v", err)
}
addr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}
err = chanState.SyncPending(addr, 10)
if err != nil {
t.Fatalf("unable to sync pending channel: %v", err)
}
// We should now be able to abandon the channel without any errors.
closeHeight := uint32(11)
err = cdb.AbandonChannel(&chanState.FundingOutpoint, closeHeight)
if err != nil {
t.Fatalf("unable to abandon channel: %v", err)
}
// At this point, the channel should no longer be found in the set of
// open channels.
_, err = cdb.FetchChannel(chanState.FundingOutpoint)
if err != ErrChannelNotFound {
t.Fatalf("channel should not have been found: %v", err)
}
// However we should be able to retrieve a close channel summary for
// the channel.
_, err = cdb.FetchClosedChannel(&chanState.FundingOutpoint)
if err != nil {
t.Fatalf("unable to fetch closed channel: %v", err)
}
// Finally, if we attempt to abandon the channel again, we should get a
// nil error as the channel has already been abandoned.
err = cdb.AbandonChannel(&chanState.FundingOutpoint, closeHeight)
if err != nil {
t.Fatalf("unable to abandon channel: %v", err)
}
}

@ -296,7 +296,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
} }
arbCfg.MarkChannelResolved = func() error { arbCfg.MarkChannelResolved = func() error {
return c.resolveContract(chanPoint, chanLog) return c.ResolveContract(chanPoint)
} }
// Finally, we'll need to construct a series of htlc Sets based on all // Finally, we'll need to construct a series of htlc Sets based on all
@ -321,11 +321,10 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
), nil ), nil
} }
// resolveContract marks a contract as fully resolved within the database. // ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel // This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved. // before hitting the chain have been resolved.
func (c *ChainArbitrator) resolveContract(chanPoint wire.OutPoint, func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
arbLog ArbitratorLog) error {
log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint) log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)
@ -338,27 +337,44 @@ func (c *ChainArbitrator) resolveContract(chanPoint wire.OutPoint,
return err return err
} }
// Now that the channel has been marked as fully closed, we'll stop
// both the channel arbitrator and chain watcher for this channel if
// they're still active.
var arbLog ArbitratorLog
c.Lock()
chainArb := c.activeChannels[chanPoint]
delete(c.activeChannels, chanPoint)
chainWatcher := c.activeWatchers[chanPoint]
delete(c.activeWatchers, chanPoint)
c.Unlock()
if chainArb != nil {
arbLog = chainArb.log
if err := chainArb.Stop(); err != nil {
log.Warnf("unable to stop ChannelArbitrator(%v): %v",
chanPoint, err)
}
}
if chainWatcher != nil {
if err := chainWatcher.Stop(); err != nil {
log.Warnf("unable to stop ChainWatcher(%v): %v",
chanPoint, err)
}
}
// Once this has been marked as resolved, we'll wipe the log that the
// channel arbitrator was using to store its persistent state. We do
// this after marking the channel resolved, as otherwise, the
// arbitrator would be re-created, and think it was starting from the
// default state.
if arbLog != nil { if arbLog != nil {
// Once this has been marked as resolved, we'll wipe the log
// that the channel arbitrator was using to store its
// persistent state. We do this after marking the channel
// resolved, as otherwise, the arbitrator would be re-created,
// and think it was starting from the default state.
if err := arbLog.WipeHistory(); err != nil { if err := arbLog.WipeHistory(); err != nil {
return err return err
} }
} }
c.Lock()
delete(c.activeChannels, chanPoint)
chainWatcher, ok := c.activeWatchers[chanPoint]
if ok {
chainWatcher.Stop()
}
delete(c.activeWatchers, chanPoint)
c.Unlock()
return nil return nil
} }
@ -491,7 +507,7 @@ func (c *ChainArbitrator) Start() error {
return err return err
} }
arbCfg.MarkChannelResolved = func() error { arbCfg.MarkChannelResolved = func() error {
return c.resolveContract(chanPoint, chanLog) return c.ResolveContract(chanPoint)
} }
// We can also leave off the set of HTLC's here as since the // We can also leave off the set of HTLC's here as since the

@ -115,3 +115,105 @@ func TestChainArbitratorRepublishCommitment(t *testing.T) {
t.Fatalf("unexpected tx published") t.Fatalf("unexpected tx published")
} }
} }
// TestResolveContract tests that if we have an active channel being watched by
// the chain arb, then a call to ResolveContract will mark the channel as fully
// closed in the database, and also clean up all arbitrator state.
func TestResolveContract(t *testing.T) {
t.Parallel()
// To start with, we'll create a new temp DB for the duration of this
// test.
tempPath, err := ioutil.TempDir("", "testdb")
if err != nil {
t.Fatalf("unable to make temp dir: %v", err)
}
defer os.RemoveAll(tempPath)
db, err := channeldb.Open(tempPath)
if err != nil {
t.Fatalf("unable to open db: %v", err)
}
defer db.Close()
// With the DB created, we'll make a new channel, and mark it as
// pending open within the database.
newChannel, _, cleanup, err := lnwallet.CreateTestChannels(true)
if err != nil {
t.Fatalf("unable to make new test channel: %v", err)
}
defer cleanup()
channel := newChannel.State()
channel.Db = db
addr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18556,
}
if err := channel.SyncPending(addr, 101); err != nil {
t.Fatalf("unable to write channel to db: %v", err)
}
// With the channel inserted into the database, we'll now create a new
// chain arbitrator that should pick up these new channels and launch
// resolver for them.
chainArbCfg := ChainArbitratorConfig{
ChainIO: &mockChainIO{},
Notifier: &mockNotifier{},
PublishTx: func(tx *wire.MsgTx) error {
return nil
},
}
chainArb := NewChainArbitrator(
chainArbCfg, db,
)
if err := chainArb.Start(); err != nil {
t.Fatal(err)
}
defer func() {
if err := chainArb.Stop(); err != nil {
t.Fatal(err)
}
}()
channelArb := chainArb.activeChannels[channel.FundingOutpoint]
// While the resolver are active, we'll now remove the channel from the
// database (mark is as closed).
err = db.AbandonChannel(&channel.FundingOutpoint, 4)
if err != nil {
t.Fatalf("unable to remove channel: %v", err)
}
// With the channel removed, we'll now manually call ResolveContract.
// This stimulates needing to remove a channel from the chain arb due
// to any possible external consistency issues.
err = chainArb.ResolveContract(channel.FundingOutpoint)
if err != nil {
t.Fatalf("unable to resolve contract: %v", err)
}
// The shouldn't be an active chain watcher or channel arb for this
// channel.
if len(chainArb.activeChannels) != 0 {
t.Fatalf("expected zero active channels, instead have %v",
len(chainArb.activeChannels))
}
if len(chainArb.activeWatchers) != 0 {
t.Fatalf("expected zero active watchers, instead have %v",
len(chainArb.activeWatchers))
}
// At this point, the channel's arbitrator log should also be empty as
// well.
_, err = channelArb.log.FetchContractResolutions()
if err != errScopeBucketNoExist {
t.Fatalf("channel arb log state should have been "+
"removed: %v", err)
}
// If we attempt to call this method again, then we should get a nil
// error, as there is no more state to be cleaned up.
err = chainArb.ResolveContract(channel.FundingOutpoint)
if err != nil {
t.Fatalf("second resolve call shouldn't fail: %v", err)
}
}

@ -13155,11 +13155,17 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
ctxt, _ := context.WithTimeout(ctxb, channelOpenTimeout) ctxt, _ := context.WithTimeout(ctxb, channelOpenTimeout)
chanPoint := openChannelAndAssert( chanPoint := openChannelAndAssert(
ctxt, t, net, net.Alice, net.Bob, channelParam) ctxt, t, net, net.Alice, net.Bob, channelParam,
)
txid, err := lnd.GetChanPointFundingTxid(chanPoint)
if err != nil {
t.Fatalf("unable to get txid: %v", err)
}
chanPointStr := fmt.Sprintf("%v:%v", txid, chanPoint.OutputIndex)
// Wait for channel to be confirmed open. // Wait for channel to be confirmed open.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
err := net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint) err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
if err != nil { if err != nil {
t.Fatalf("alice didn't report channel: %v", err) t.Fatalf("alice didn't report channel: %v", err)
} }
@ -13168,6 +13174,25 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("bob didn't report channel: %v", err) t.Fatalf("bob didn't report channel: %v", err)
} }
// Now that the channel is open, we'll obtain its channel ID real quick
// so we can use it to query the graph below.
listReq := &lnrpc.ListChannelsRequest{}
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
aliceChannelList, err := net.Alice.ListChannels(ctxt, listReq)
if err != nil {
t.Fatalf("unable to fetch alice's channels: %v", err)
}
var chanID uint64
for _, channel := range aliceChannelList.Channels {
if channel.ChannelPoint == chanPointStr {
chanID = channel.ChanId
}
}
if chanID == 0 {
t.Fatalf("unable to find channel")
}
// Send request to abandon channel. // Send request to abandon channel.
abandonChannelRequest := &lnrpc.AbandonChannelRequest{ abandonChannelRequest := &lnrpc.AbandonChannelRequest{
ChannelPoint: chanPoint, ChannelPoint: chanPoint,
@ -13180,9 +13205,8 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
} }
// Assert that channel in no longer open. // Assert that channel in no longer open.
listReq := &lnrpc.ListChannelsRequest{}
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
aliceChannelList, err := net.Alice.ListChannels(ctxt, listReq) aliceChannelList, err = net.Alice.ListChannels(ctxt, listReq)
if err != nil { if err != nil {
t.Fatalf("unable to list channels: %v", err) t.Fatalf("unable to list channels: %v", err)
} }
@ -13230,9 +13254,26 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
len(aliceClosedList.Channels)) len(aliceClosedList.Channels))
} }
// Now that we're done with the test, the channel can be closed. This is // Ensure that the channel can no longer be found in the channel graph.
// necessary to avoid unexpected outcomes of other tests that use Bob's _, err = net.Alice.GetChanInfo(ctxb, &lnrpc.ChanInfoRequest{
// lnd instance. ChanId: chanID,
})
if !strings.Contains(err.Error(), "marked as zombie") {
t.Fatalf("channel shouldn't be found in the channel " +
"graph!")
}
// Calling AbandonChannel again, should result in no new errors, as the
// channel has already been removed.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
_, err = net.Alice.AbandonChannel(ctxt, abandonChannelRequest)
if err != nil {
t.Fatalf("unable to abandon channel a second time: %v", err)
}
// Now that we're done with the test, the channel can be closed. This
// is necessary to avoid unexpected outcomes of other tests that use
// Bob's lnd instance.
ctxt, _ = context.WithTimeout(ctxb, channelCloseTimeout) ctxt, _ = context.WithTimeout(ctxb, channelCloseTimeout)
closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, true) closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, true)

@ -1947,6 +1947,28 @@ func createRPCCloseUpdate(update interface{}) (
return nil, errors.New("unknown close status update") return nil, errors.New("unknown close status update")
} }
// abandonChanFromGraph attempts to remove a channel from the channel graph. If
// we can't find the chanID in the graph, then we assume it has already been
// removed, and will return a nop.
func abandonChanFromGraph(chanGraph *channeldb.ChannelGraph,
chanPoint *wire.OutPoint) error {
// First, we'll obtain the channel ID. If we can't locate this, then
// it's the case that the channel may have already been removed from
// the graph, so we'll return a nil error.
chanID, err := chanGraph.ChannelID(chanPoint)
switch {
case err == channeldb.ErrEdgeNotFound:
return nil
case err != nil:
return err
}
// If the channel ID is still in the graph, then that means the channel
// is still open, so we'll now move to purge it from the graph.
return chanGraph.DeleteChannelEdges(chanID)
}
// AbandonChannel removes all channel state from the database except for a // AbandonChannel removes all channel state from the database except for a
// close summary. This method can be used to get rid of permanently unusable // close summary. This method can be used to get rid of permanently unusable
// channels due to bugs fixed in newer versions of lnd. // channels due to bugs fixed in newer versions of lnd.
@ -1970,42 +1992,70 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
index := in.ChannelPoint.OutputIndex index := in.ChannelPoint.OutputIndex
chanPoint := wire.NewOutPoint(txid, index) chanPoint := wire.NewOutPoint(txid, index)
// With the chanPoint constructed, we'll attempt to find the target // When we remove the channel from the database, we need to set a close
// channel in the database. If we can't find the channel, then we'll // height, so we'll just use the current best known height.
// return the error back to the caller.
dbChan, err := r.server.chanDB.FetchChannel(*chanPoint)
if err != nil {
return nil, err
}
// Now that we've found the channel, we'll populate a close summary for
// the channel, so we can store as much information for this abounded
// channel as possible. We also ensure that we set Pending to false, to
// indicate that this channel has been "fully" closed.
_, bestHeight, err := r.server.cc.chainIO.GetBestBlock() _, bestHeight, err := r.server.cc.chainIO.GetBestBlock()
if err != nil { if err != nil {
return nil, err return nil, err
} }
summary := &channeldb.ChannelCloseSummary{
CloseType: channeldb.Abandoned, dbChan, err := r.server.chanDB.FetchChannel(*chanPoint)
ChanPoint: *chanPoint, switch {
ChainHash: dbChan.ChainHash, // If the channel isn't found in the set of open channels, then we can
CloseHeight: uint32(bestHeight), // continue on as it can't be loaded into the link/peer.
RemotePub: dbChan.IdentityPub, case err == channeldb.ErrChannelNotFound:
Capacity: dbChan.Capacity, break
SettledBalance: dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
ShortChanID: dbChan.ShortChanID(), // If the channel is still known to be open, then before we modify any
RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation, // on-disk state, we'll remove the channel from the switch and peer
RemoteNextRevocation: dbChan.RemoteNextRevocation, // state if it's been loaded in.
LocalChanConfig: dbChan.LocalChanCfg, case err == nil:
// We'll mark the channel as borked before we remove the state
// from the switch/peer so it won't be loaded back in if the
// peer reconnects.
if err := dbChan.MarkBorked(); err != nil {
return nil, err
}
remotePub := dbChan.IdentityPub
if peer, err := r.server.FindPeer(remotePub); err == nil {
if err := peer.WipeChannel(chanPoint); err != nil {
return nil, fmt.Errorf("unable to wipe "+
"channel state: %v", err)
}
}
default:
return nil, err
} }
// Finally, we'll close the channel in the DB, and return back to the // Abandoning a channel is a three step process: remove from the open
// caller. // channel state, remove from the graph, remove from the contract
err = dbChan.CloseChannel(summary) // court. Between any step it's possible that the users restarts the
// process all over again. As a result, each of the steps below are
// intended to be idempotent.
err = r.server.chanDB.AbandonChannel(chanPoint, uint32(bestHeight))
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = abandonChanFromGraph(
r.server.chanDB.ChannelGraph(), chanPoint,
)
if err != nil {
return nil, err
}
err = r.server.chainArb.ResolveContract(*chanPoint)
if err != nil {
return nil, err
}
// If this channel was in the process of being closed, but didn't fully
// close, then it's possible that the nursery is hanging on to some
// state. To err on the side of caution, we'll now attempt to wipe any
// state for this channel from the nursery.
err = r.server.utxoNursery.cfg.Store.RemoveChannel(chanPoint)
if err != nil && err != ErrContractNotFound {
return nil, err
}
return &lnrpc.AbandonChannelResponse{}, nil return &lnrpc.AbandonChannelResponse{}, nil
} }