From f6614dd4350c5a04fc7ada68a60a75ccd5bf62ce Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Tue, 19 Nov 2019 19:46:32 -0800
Subject: [PATCH 1/3] channeldb: add new AbandonChannel method

---
 channeldb/db.go      | 48 +++++++++++++++++++++++++++++
 channeldb/db_test.go | 64 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 112 insertions(+)

diff --git a/channeldb/db.go b/channeldb/db.go
index a67ccd2f..e8a8e5af 100644
--- a/channeldb/db.go
+++ b/channeldb/db.go
@@ -1094,6 +1094,54 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
 	return dedupedAddrs, nil
 }
 
+// AbandonChannel attempts to remove the target channel from the open channel
+// database. If the channel was already removed (has a closed channel entry),
+// then we'll return a nil error. Otherwise, we'll insert a new close summary
+// into the database.
+func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
+	// With the chanPoint constructed, we'll attempt to find the target
+	// channel in the database. If we can't find the channel, then we'll
+	// return the error back to the caller.
+	dbChan, err := d.FetchChannel(*chanPoint)
+	switch {
+	// If the channel wasn't found, then it's possible that it was already
+	// abandoned from the database.
+	case err == ErrChannelNotFound:
+		_, closedErr := d.FetchClosedChannel(chanPoint)
+		if closedErr != nil {
+			return closedErr
+		}
+
+		// If the channel was already closed, then we don't return an
+		// error, as we'd like for this step to be repeatable.
+		return nil
+	case err != nil:
+		return err
+	}
+
+	// Now that we've found the channel, we'll populate a close summary for
+	// the channel, so we can store as much information for this abandoned
+	// channel as possible. We also ensure that we set Pending to false, to
+	// indicate that this channel has been "fully" closed.
+	summary := &ChannelCloseSummary{
+		CloseType:               Abandoned,
+		ChanPoint:               *chanPoint,
+		ChainHash:               dbChan.ChainHash,
+		CloseHeight:             bestHeight,
+		RemotePub:               dbChan.IdentityPub,
+		Capacity:                dbChan.Capacity,
+		SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
+		ShortChanID:             dbChan.ShortChanID(),
+		RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
+		RemoteNextRevocation:    dbChan.RemoteNextRevocation,
+		LocalChanConfig:         dbChan.LocalChanCfg,
+	}
+
+	// Finally, we'll close the channel in the DB, and return back to the
+	// caller.
+	return dbChan.CloseChannel(summary)
+}
+
 // syncVersions function is used for safe db version synchronization. It
 // applies migration functions to the current database and recovers the
 // previous state of db if at least one error/panic appeared during migration.
diff --git a/channeldb/db_test.go b/channeldb/db_test.go
index 198f6e2b..4deffe98 100644
--- a/channeldb/db_test.go
+++ b/channeldb/db_test.go
@@ -469,3 +469,67 @@ func TestRestoreChannelShells(t *testing.T) {
 		t.Fatalf("only a single edge should be inserted: %v", err)
 	}
 }
+
+// TestAbandonChannel tests that the AbandonChannel method is able to properly
+// remove a channel from the database and add a close channel summary. If
+// called after a channel has already been removed, the method shouldn't return
+// an error.
+func TestAbandonChannel(t *testing.T) {
+	t.Parallel()
+
+	cdb, cleanUp, err := makeTestDB()
+	if err != nil {
+		t.Fatalf("unable to make test database: %v", err)
+	}
+	defer cleanUp()
+
+	// If we attempt to abandon the state of a channel that doesn't exist
+	// in the open or closed channel bucket, then we should receive an
+	// error.
+	err = cdb.AbandonChannel(&wire.OutPoint{}, 0)
+	if err == nil {
+		t.Fatalf("removing non-existent channel should have failed")
+	}
+
+	// We'll now create a new channel to abandon shortly.
+	chanState, err := createTestChannelState(cdb)
+	if err != nil {
+		t.Fatalf("unable to create channel state: %v", err)
+	}
+	addr := &net.TCPAddr{
+		IP:   net.ParseIP("127.0.0.1"),
+		Port: 18555,
+	}
+	err = chanState.SyncPending(addr, 10)
+	if err != nil {
+		t.Fatalf("unable to sync pending channel: %v", err)
+	}
+
+	// We should now be able to abandon the channel without any errors.
+	closeHeight := uint32(11)
+	err = cdb.AbandonChannel(&chanState.FundingOutpoint, closeHeight)
+	if err != nil {
+		t.Fatalf("unable to abandon channel: %v", err)
+	}
+
+	// At this point, the channel should no longer be found in the set of
+	// open channels.
+	_, err = cdb.FetchChannel(chanState.FundingOutpoint)
+	if err != ErrChannelNotFound {
+		t.Fatalf("channel should not have been found: %v", err)
+	}
+
+	// However, we should be able to retrieve a close channel summary for
+	// the channel.
+	_, err = cdb.FetchClosedChannel(&chanState.FundingOutpoint)
+	if err != nil {
+		t.Fatalf("unable to fetch closed channel: %v", err)
+	}
+
+	// Finally, if we attempt to abandon the channel again, we should get a
+	// nil error as the channel has already been abandoned.
+	err = cdb.AbandonChannel(&chanState.FundingOutpoint, closeHeight)
+	if err != nil {
+		t.Fatalf("unable to abandon channel: %v", err)
+	}
+}

From faa5f340fd03bef33a5408f103b7940f957982db Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Wed, 13 Nov 2019 16:05:49 -0800
Subject: [PATCH 2/3] contractcourt: export ResolveContract, update to stop
 channel arb

In this commit, we export the `ResolveContract` method as it's useful
as a way to manually remove active contracts from the chain and channel
arbitrator. Along the way, we also update the method to attempt to stop
the channel arb if it exists. This allows an external party to remove
all state with a single call.

Before this commit, it was assumed that this method was only called by
the channel arb itself, when it was already on the way to exiting after
all contracts were fully resolved.

We also add a set of unit tests to exercise the intended behavior as
this method is now public.
---
 contractcourt/chain_arbitrator.go      |  56 +++++++++-----
 contractcourt/chain_arbitrator_test.go | 102 +++++++++++++++++++++++++
 2 files changed, 138 insertions(+), 20 deletions(-)

diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go
index 6d7c674a..1c18253e 100644
--- a/contractcourt/chain_arbitrator.go
+++ b/contractcourt/chain_arbitrator.go
@@ -296,7 +296,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
 	}
 
 	arbCfg.MarkChannelResolved = func() error {
-		return c.resolveContract(chanPoint, chanLog)
+		return c.ResolveContract(chanPoint)
 	}
 
 	// Finally, we'll need to construct a series of htlc Sets based on all
@@ -321,11 +321,10 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
 	), nil
 }
 
-// resolveContract marks a contract as fully resolved within the database.
+// ResolveContract marks a contract as fully resolved within the database. // This is only to be done once all contracts which were live on the channel // before hitting the chain have been resolved. -func (c *ChainArbitrator) resolveContract(chanPoint wire.OutPoint, - arbLog ArbitratorLog) error { +func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error { log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint) @@ -338,27 +337,44 @@ func (c *ChainArbitrator) resolveContract(chanPoint wire.OutPoint, return err } + // Now that the channel has been marked as fully closed, we'll stop + // both the channel arbitrator and chain watcher for this channel if + // they're still active. + var arbLog ArbitratorLog + c.Lock() + chainArb := c.activeChannels[chanPoint] + delete(c.activeChannels, chanPoint) + + chainWatcher := c.activeWatchers[chanPoint] + delete(c.activeWatchers, chanPoint) + c.Unlock() + + if chainArb != nil { + arbLog = chainArb.log + + if err := chainArb.Stop(); err != nil { + log.Warnf("unable to stop ChannelArbitrator(%v): %v", + chanPoint, err) + } + } + if chainWatcher != nil { + if err := chainWatcher.Stop(); err != nil { + log.Warnf("unable to stop ChainWatcher(%v): %v", + chanPoint, err) + } + } + + // Once this has been marked as resolved, we'll wipe the log that the + // channel arbitrator was using to store its persistent state. We do + // this after marking the channel resolved, as otherwise, the + // arbitrator would be re-created, and think it was starting from the + // default state. if arbLog != nil { - // Once this has been marked as resolved, we'll wipe the log - // that the channel arbitrator was using to store its - // persistent state. We do this after marking the channel - // resolved, as otherwise, the arbitrator would be re-created, - // and think it was starting from the default state. if err := arbLog.WipeHistory(); err != nil { return err } } - c.Lock() - delete(c.activeChannels, chanPoint) - - chainWatcher, ok := c.activeWatchers[chanPoint] - if ok { - chainWatcher.Stop() - } - delete(c.activeWatchers, chanPoint) - c.Unlock() - return nil } @@ -491,7 +507,7 @@ func (c *ChainArbitrator) Start() error { return err } arbCfg.MarkChannelResolved = func() error { - return c.resolveContract(chanPoint, chanLog) + return c.ResolveContract(chanPoint) } // We can also leave off the set of HTLC's here as since the diff --git a/contractcourt/chain_arbitrator_test.go b/contractcourt/chain_arbitrator_test.go index 38ea2a35..28682c92 100644 --- a/contractcourt/chain_arbitrator_test.go +++ b/contractcourt/chain_arbitrator_test.go @@ -115,3 +115,105 @@ func TestChainArbitratorRepublishCommitment(t *testing.T) { t.Fatalf("unexpected tx published") } } + +// TestResolveContract tests that if we have an active channel being watched by +// the chain arb, then a call to ResolveContract will mark the channel as fully +// closed in the database, and also clean up all arbitrator state. +func TestResolveContract(t *testing.T) { + t.Parallel() + + // To start with, we'll create a new temp DB for the duration of this + // test. + tempPath, err := ioutil.TempDir("", "testdb") + if err != nil { + t.Fatalf("unable to make temp dir: %v", err) + } + defer os.RemoveAll(tempPath) + db, err := channeldb.Open(tempPath) + if err != nil { + t.Fatalf("unable to open db: %v", err) + } + defer db.Close() + + // With the DB created, we'll make a new channel, and mark it as + // pending open within the database. 
+	newChannel, _, cleanup, err := lnwallet.CreateTestChannels(true)
+	if err != nil {
+		t.Fatalf("unable to make new test channel: %v", err)
+	}
+	defer cleanup()
+	channel := newChannel.State()
+	channel.Db = db
+	addr := &net.TCPAddr{
+		IP:   net.ParseIP("127.0.0.1"),
+		Port: 18556,
+	}
+	if err := channel.SyncPending(addr, 101); err != nil {
+		t.Fatalf("unable to write channel to db: %v", err)
+	}
+
+	// With the channel inserted into the database, we'll now create a new
+	// chain arbitrator that should pick up these new channels and launch
+	// resolvers for them.
+	chainArbCfg := ChainArbitratorConfig{
+		ChainIO:  &mockChainIO{},
+		Notifier: &mockNotifier{},
+		PublishTx: func(tx *wire.MsgTx) error {
+			return nil
+		},
+	}
+	chainArb := NewChainArbitrator(
+		chainArbCfg, db,
+	)
+	if err := chainArb.Start(); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := chainArb.Stop(); err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	channelArb := chainArb.activeChannels[channel.FundingOutpoint]
+
+	// While the resolvers are active, we'll now remove the channel from
+	// the database (mark it as closed).
+	err = db.AbandonChannel(&channel.FundingOutpoint, 4)
+	if err != nil {
+		t.Fatalf("unable to remove channel: %v", err)
+	}
+
+	// With the channel removed, we'll now manually call ResolveContract.
+	// This simulates needing to remove a channel from the chain arb due
+	// to any possible external consistency issues.
+	err = chainArb.ResolveContract(channel.FundingOutpoint)
+	if err != nil {
+		t.Fatalf("unable to resolve contract: %v", err)
+	}
+
+	// There shouldn't be an active chain watcher or channel arb for this
+	// channel.
+	if len(chainArb.activeChannels) != 0 {
+		t.Fatalf("expected zero active channels, instead have %v",
+			len(chainArb.activeChannels))
+	}
+	if len(chainArb.activeWatchers) != 0 {
+		t.Fatalf("expected zero active watchers, instead have %v",
+			len(chainArb.activeWatchers))
+	}
+
+	// At this point, the channel's arbitrator log should be empty as
+	// well.
+	_, err = channelArb.log.FetchContractResolutions()
+	if err != errScopeBucketNoExist {
+		t.Fatalf("channel arb log state should have been "+
+			"removed: %v", err)
+	}
+
+	// If we attempt to call this method again, then we should get a nil
+	// error, as there is no more state to be cleaned up.
+	err = chainArb.ResolveContract(channel.FundingOutpoint)
+	if err != nil {
+		t.Fatalf("second resolve call shouldn't fail: %v", err)
+	}
+}

From 32965fd4bef388c3cbff1e35d0fa287fdd4ca4bd Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Wed, 13 Nov 2019 16:09:20 -0800
Subject: [PATCH 3/3] rpc: update AbandonChannel to remove state from cnct,
 nursery and graph

In this commit, we update the `AbandonChannel` method to also remove
the state from the contract court as well as the channel graph.
Abandoning a channel is now a three-step process: remove from the open
channel state, remove from the graph, remove from the contract court.
Between any step it's possible that the user restarts the process all
over again. As a result, each of the steps is intended to be
idempotent.

We also update the integration test to assert that no channel is found
in the graph any longer. Before this commit, this test would fail as
the channel was still found in the graph, which can cause other issues
for an operational daemon.

Fixes #3716.
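For reference, the flow this patch implements in rpcserver.go can be
sketched as a single helper. This is a minimal, hypothetical sketch and
not part of the diff below: it assumes the usual channeldb,
contractcourt, and wire imports, reuses the abandonChanFromGraph helper
added below, and elides the nursery cleanup step:

    // abandonChannel sketches the three-step idempotent flow. Each step
    // is safe to repeat if the process is interrupted and restarted.
    func abandonChannel(db *channeldb.DB,
            chainArb *contractcourt.ChainArbitrator,
            chanPoint *wire.OutPoint, bestHeight uint32) error {

            // Step 1: remove the channel from the open channel state,
            // writing an Abandoned close summary. A repeated call finds
            // the existing close summary and returns nil.
            if err := db.AbandonChannel(chanPoint, bestHeight); err != nil {
                    return err
            }

            // Step 2: remove the channel's edges from the graph. A
            // missing edge is treated as already removed.
            err := abandonChanFromGraph(db.ChannelGraph(), chanPoint)
            if err != nil {
                    return err
            }

            // Step 3: mark the contract resolved, stopping any remaining
            // channel arbitrator and chain watcher for this channel.
            return chainArb.ResolveContract(*chanPoint)
    }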
---
 lntest/itest/lnd_test.go |  55 ++++++++++++++++++---
 rpcserver.go             | 104 +++++++++++++++++++++++++++----------
 2 files changed, 125 insertions(+), 34 deletions(-)

diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go
index 5817e85d..a6a76099 100644
--- a/lntest/itest/lnd_test.go
+++ b/lntest/itest/lnd_test.go
@@ -13155,11 +13155,17 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
 
 	ctxt, _ := context.WithTimeout(ctxb, channelOpenTimeout)
 	chanPoint := openChannelAndAssert(
-		ctxt, t, net, net.Alice, net.Bob, channelParam)
+		ctxt, t, net, net.Alice, net.Bob, channelParam,
+	)
+	txid, err := lnd.GetChanPointFundingTxid(chanPoint)
+	if err != nil {
+		t.Fatalf("unable to get txid: %v", err)
+	}
+	chanPointStr := fmt.Sprintf("%v:%v", txid, chanPoint.OutputIndex)
 
 	// Wait for channel to be confirmed open.
 	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
-	err := net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
+	err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
 	if err != nil {
 		t.Fatalf("alice didn't report channel: %v", err)
 	}
@@ -13168,6 +13174,25 @@
 		t.Fatalf("bob didn't report channel: %v", err)
 	}
 
+	// Now that the channel is open, we'll obtain its channel ID so we can
+	// use it to query the graph below.
+	listReq := &lnrpc.ListChannelsRequest{}
+	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
+	aliceChannelList, err := net.Alice.ListChannels(ctxt, listReq)
+	if err != nil {
+		t.Fatalf("unable to fetch alice's channels: %v", err)
+	}
+	var chanID uint64
+	for _, channel := range aliceChannelList.Channels {
+		if channel.ChannelPoint == chanPointStr {
+			chanID = channel.ChanId
+		}
+	}
+
+	if chanID == 0 {
+		t.Fatalf("unable to find channel")
+	}
+
 	// Send request to abandon channel.
 	abandonChannelRequest := &lnrpc.AbandonChannelRequest{
 		ChannelPoint: chanPoint,
@@ -13180,9 +13205,8 @@
 	}
 
 	// Assert that the channel is no longer open.
-	listReq := &lnrpc.ListChannelsRequest{}
 	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
-	aliceChannelList, err := net.Alice.ListChannels(ctxt, listReq)
+	aliceChannelList, err = net.Alice.ListChannels(ctxt, listReq)
 	if err != nil {
 		t.Fatalf("unable to list channels: %v", err)
 	}
@@ -13230,9 +13254,26 @@
 			len(aliceClosedList.Channels))
 	}
 
-	// Now that we're done with the test, the channel can be closed. This is
-	// necessary to avoid unexpected outcomes of other tests that use Bob's
-	// lnd instance.
+	// Ensure that the channel can no longer be found in the channel graph.
+	_, err = net.Alice.GetChanInfo(ctxb, &lnrpc.ChanInfoRequest{
+		ChanId: chanID,
+	})
+	if !strings.Contains(err.Error(), "marked as zombie") {
+		t.Fatalf("channel shouldn't be found in the channel " +
+			"graph!")
+	}
+
+	// Calling AbandonChannel again should result in no new errors, as the
+	// channel has already been removed.
+	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
+	_, err = net.Alice.AbandonChannel(ctxt, abandonChannelRequest)
+	if err != nil {
+		t.Fatalf("unable to abandon channel a second time: %v", err)
+	}
+
+	// Now that we're done with the test, the channel can be closed. This
+	// is necessary to avoid unexpected outcomes of other tests that use
+	// Bob's lnd instance.
 	ctxt, _ = context.WithTimeout(ctxb, channelCloseTimeout)
 	closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, true)
 
diff --git a/rpcserver.go b/rpcserver.go
index 828ce7cf..837c2cae 100644
--- a/rpcserver.go
+++ b/rpcserver.go
@@ -1947,6 +1947,28 @@ func createRPCCloseUpdate(update interface{}) (
 	return nil, errors.New("unknown close status update")
 }
 
+// abandonChanFromGraph attempts to remove a channel from the channel graph. If
+// we can't find the chanID in the graph, then we assume it has already been
+// removed, and the call becomes a no-op.
+func abandonChanFromGraph(chanGraph *channeldb.ChannelGraph,
+	chanPoint *wire.OutPoint) error {
+
+	// First, we'll obtain the channel ID. If we can't locate this, then
+	// it's the case that the channel may have already been removed from
+	// the graph, so we'll return a nil error.
+	chanID, err := chanGraph.ChannelID(chanPoint)
+	switch {
+	case err == channeldb.ErrEdgeNotFound:
+		return nil
+	case err != nil:
+		return err
+	}
+
+	// If the channel ID is still in the graph, then that means the channel
+	// is still open, so we'll now move to purge it from the graph.
+	return chanGraph.DeleteChannelEdges(chanID)
+}
+
 // AbandonChannel removes all channel state from the database except for a
 // close summary. This method can be used to get rid of permanently unusable
 // channels due to bugs fixed in newer versions of lnd.
@@ -1970,42 +1992,70 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
 	index := in.ChannelPoint.OutputIndex
 	chanPoint := wire.NewOutPoint(txid, index)
 
-	// With the chanPoint constructed, we'll attempt to find the target
-	// channel in the database. If we can't find the channel, then we'll
-	// return the error back to the caller.
-	dbChan, err := r.server.chanDB.FetchChannel(*chanPoint)
-	if err != nil {
-		return nil, err
-	}
-
-	// Now that we've found the channel, we'll populate a close summary for
-	// the channel, so we can store as much information for this abounded
-	// channel as possible. We also ensure that we set Pending to false, to
-	// indicate that this channel has been "fully" closed.
+	// When we remove the channel from the database, we need to set a close
+	// height, so we'll just use the current best known height.
 	_, bestHeight, err := r.server.cc.chainIO.GetBestBlock()
 	if err != nil {
 		return nil, err
 	}
-	summary := &channeldb.ChannelCloseSummary{
-		CloseType:               channeldb.Abandoned,
-		ChanPoint:               *chanPoint,
-		ChainHash:               dbChan.ChainHash,
-		CloseHeight:             uint32(bestHeight),
-		RemotePub:               dbChan.IdentityPub,
-		Capacity:                dbChan.Capacity,
-		SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
-		ShortChanID:             dbChan.ShortChanID(),
-		RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
-		RemoteNextRevocation:    dbChan.RemoteNextRevocation,
-		LocalChanConfig:         dbChan.LocalChanCfg,
+
+	dbChan, err := r.server.chanDB.FetchChannel(*chanPoint)
+	switch {
+	// If the channel isn't found in the set of open channels, then we can
+	// continue on as it can't be loaded into the link/peer.
+	case err == channeldb.ErrChannelNotFound:
+		break
+
+	// If the channel is still known to be open, then before we modify any
+	// on-disk state, we'll remove the channel from the switch and peer
+	// state if it's been loaded in.
+	case err == nil:
+		// We'll mark the channel as borked before we remove the state
+		// from the switch/peer so it won't be loaded back in if the
+		// peer reconnects.
+		if err := dbChan.MarkBorked(); err != nil {
+			return nil, err
+		}
+		remotePub := dbChan.IdentityPub
+		if peer, err := r.server.FindPeer(remotePub); err == nil {
+			if err := peer.WipeChannel(chanPoint); err != nil {
+				return nil, fmt.Errorf("unable to wipe "+
+					"channel state: %v", err)
+			}
+		}
+
+	default:
+		return nil, err
 	}
 
-	// Finally, we'll close the channel in the DB, and return back to the
-	// caller.
-	err = dbChan.CloseChannel(summary)
+	// Abandoning a channel is a three-step process: remove from the open
+	// channel state, remove from the graph, remove from the contract
+	// court. Between any step it's possible that the user restarts the
+	// process all over again. As a result, each of the steps below is
+	// intended to be idempotent.
+	err = r.server.chanDB.AbandonChannel(chanPoint, uint32(bestHeight))
 	if err != nil {
 		return nil, err
 	}
+	err = abandonChanFromGraph(
+		r.server.chanDB.ChannelGraph(), chanPoint,
+	)
+	if err != nil {
+		return nil, err
+	}
+	err = r.server.chainArb.ResolveContract(*chanPoint)
+	if err != nil {
+		return nil, err
+	}
+
+	// If this channel was in the process of being closed, but didn't fully
+	// close, then it's possible that the nursery is hanging on to some
+	// state. To err on the side of caution, we'll now attempt to wipe any
+	// state for this channel from the nursery.
+	err = r.server.utxoNursery.cfg.Store.RemoveChannel(chanPoint)
+	if err != nil && err != ErrContractNotFound {
+		return nil, err
+	}
 
 	return &lnrpc.AbandonChannelResponse{}, nil
 }
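
As a closing note, the idempotency above means an RPC client can safely
retry the call. A usage sketch follows; this is hypothetical client
code, not part of this series, and assumes an already-connected
lnrpc.LightningClient named client plus the channel's *lnrpc.ChannelPoint:

    // abandonTwice abandons a channel, then calls AbandonChannel a
    // second time to show the repeated call is a no-op, not an error.
    func abandonTwice(ctx context.Context, client lnrpc.LightningClient,
            chanPoint *lnrpc.ChannelPoint) error {

            req := &lnrpc.AbandonChannelRequest{ChannelPoint: chanPoint}

            // First call: removes the channel from the open channel
            // state, the graph, the contract court, and any nursery
            // state.
            if _, err := client.AbandonChannel(ctx, req); err != nil {
                    return err
            }

            // Second call: returns nil, as every step is idempotent.
            _, err := client.AbandonChannel(ctx, req)
            return err
    }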