diff --git a/channeldb/db.go b/channeldb/db.go
index a67ccd2f..e8a8e5af 100644
--- a/channeldb/db.go
+++ b/channeldb/db.go
@@ -1094,6 +1094,54 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
 	return dedupedAddrs, nil
 }
 
+// AbandonChannel attempts to remove the target channel from the open channel
+// database. If the channel was already removed (has a closed channel entry),
+// then we'll return a nil error. Otherwise, we'll insert a new close summary
+// into the database.
+func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
+	// With the chanPoint constructed, we'll attempt to find the target
+	// channel in the database. If we can't find the channel, then we'll
+	// return the error back to the caller.
+	dbChan, err := d.FetchChannel(*chanPoint)
+	switch {
+	// If the channel wasn't found, then it's possible that it was already
+	// abandoned from the database.
+	case err == ErrChannelNotFound:
+		_, closedErr := d.FetchClosedChannel(chanPoint)
+		if closedErr != nil {
+			return closedErr
+		}
+
+		// If the channel was already closed, then we don't return an
+		// error as we'd like for this step to be repeatable.
+		return nil
+	case err != nil:
+		return err
+	}
+
+	// Now that we've found the channel, we'll populate a close summary for
+	// the channel, so we can store as much information for this abandoned
+	// channel as possible. We also ensure that we set Pending to false, to
+	// indicate that this channel has been "fully" closed.
+	summary := &ChannelCloseSummary{
+		CloseType:               Abandoned,
+		ChanPoint:               *chanPoint,
+		ChainHash:               dbChan.ChainHash,
+		CloseHeight:             bestHeight,
+		RemotePub:               dbChan.IdentityPub,
+		Capacity:                dbChan.Capacity,
+		SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
+		ShortChanID:             dbChan.ShortChanID(),
+		RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
+		RemoteNextRevocation:    dbChan.RemoteNextRevocation,
+		LocalChanConfig:         dbChan.LocalChanCfg,
+	}
+
+	// Finally, we'll close the channel in the DB, and return back to the
+	// caller.
+	return dbChan.CloseChannel(summary)
+}
+
 // syncVersions function is used for safe db version synchronization. It
 // applies migration functions to the current database and recovers the
 // previous state of db if at least one error/panic appeared during migration.
diff --git a/channeldb/db_test.go b/channeldb/db_test.go
index 198f6e2b..4deffe98 100644
--- a/channeldb/db_test.go
+++ b/channeldb/db_test.go
@@ -469,3 +469,67 @@ func TestRestoreChannelShells(t *testing.T) {
 		t.Fatalf("only a single edge should be inserted: %v", err)
 	}
 }
+
+// TestAbandonChannel tests that the AbandonChannel method is able to properly
+// remove a channel from the database and add a close channel summary. If
+// called after a channel has already been removed, the method shouldn't return
+// an error.
+func TestAbandonChannel(t *testing.T) {
+	t.Parallel()
+
+	cdb, cleanUp, err := makeTestDB()
+	if err != nil {
+		t.Fatalf("unable to make test database: %v", err)
+	}
+	defer cleanUp()
+
+	// If we attempt to abandon the state of a channel that doesn't exist
+	// in the open or closed channel bucket, then we should receive an
+	// error.
+	err = cdb.AbandonChannel(&wire.OutPoint{}, 0)
+	if err == nil {
+		t.Fatalf("removing non-existent channel should have failed")
+	}
+
+	// We'll now create a new channel to abandon shortly.
+	chanState, err := createTestChannelState(cdb)
+	if err != nil {
+		t.Fatalf("unable to create channel state: %v", err)
+	}
+	addr := &net.TCPAddr{
+		IP:   net.ParseIP("127.0.0.1"),
+		Port: 18555,
+	}
+	err = chanState.SyncPending(addr, 10)
+	if err != nil {
+		t.Fatalf("unable to sync pending channel: %v", err)
+	}
+
+	// We should now be able to abandon the channel without any errors.
+	closeHeight := uint32(11)
+	err = cdb.AbandonChannel(&chanState.FundingOutpoint, closeHeight)
+	if err != nil {
+		t.Fatalf("unable to abandon channel: %v", err)
+	}
+
+	// At this point, the channel should no longer be found in the set of
+	// open channels.
+	_, err = cdb.FetchChannel(chanState.FundingOutpoint)
+	if err != ErrChannelNotFound {
+		t.Fatalf("channel should not have been found: %v", err)
+	}
+
+	// However, we should be able to retrieve a close channel summary for
+	// the channel.
+	_, err = cdb.FetchClosedChannel(&chanState.FundingOutpoint)
+	if err != nil {
+		t.Fatalf("unable to fetch closed channel: %v", err)
+	}
+
+	// Finally, if we attempt to abandon the channel again, we should get a
+	// nil error as the channel has already been abandoned.
+	err = cdb.AbandonChannel(&chanState.FundingOutpoint, closeHeight)
+	if err != nil {
+		t.Fatalf("unable to abandon channel: %v", err)
+	}
+}
diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go
index 6d7c674a..1c18253e 100644
--- a/contractcourt/chain_arbitrator.go
+++ b/contractcourt/chain_arbitrator.go
@@ -296,7 +296,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
 	}
 
 	arbCfg.MarkChannelResolved = func() error {
-		return c.resolveContract(chanPoint, chanLog)
+		return c.ResolveContract(chanPoint)
 	}
 
 	// Finally, we'll need to construct a series of htlc Sets based on all
@@ -321,11 +321,10 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
 	), nil
 }
 
-// resolveContract marks a contract as fully resolved within the database.
+// ResolveContract marks a contract as fully resolved within the database.
 // This is only to be done once all contracts which were live on the channel
 // before hitting the chain have been resolved.
-func (c *ChainArbitrator) resolveContract(chanPoint wire.OutPoint,
-	arbLog ArbitratorLog) error {
+func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
 
 	log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)
 
@@ -338,27 +337,44 @@ func (c *ChainArbitrator) resolveContract(chanPoint wire.OutPoint,
 		return err
 	}
 
+	// Now that the channel has been marked as fully closed, we'll stop
+	// both the channel arbitrator and chain watcher for this channel if
+	// they're still active.
+	var arbLog ArbitratorLog
+	c.Lock()
+	chainArb := c.activeChannels[chanPoint]
+	delete(c.activeChannels, chanPoint)
+
+	chainWatcher := c.activeWatchers[chanPoint]
+	delete(c.activeWatchers, chanPoint)
+	c.Unlock()
+
+	if chainArb != nil {
+		arbLog = chainArb.log
+
+		if err := chainArb.Stop(); err != nil {
+			log.Warnf("unable to stop ChannelArbitrator(%v): %v",
+				chanPoint, err)
+		}
+	}
+	if chainWatcher != nil {
+		if err := chainWatcher.Stop(); err != nil {
+			log.Warnf("unable to stop ChainWatcher(%v): %v",
+				chanPoint, err)
+		}
+	}
+
+	// Once this has been marked as resolved, we'll wipe the log that the
+	// channel arbitrator was using to store its persistent state. We do
+	// this after marking the channel resolved, as otherwise, the
+	// arbitrator would be re-created, and think it was starting from the
+	// default state.
 	if arbLog != nil {
-		// Once this has been marked as resolved, we'll wipe the log
-		// that the channel arbitrator was using to store its
-		// persistent state. We do this after marking the channel
-		// resolved, as otherwise, the arbitrator would be re-created,
-		// and think it was starting from the default state.
 		if err := arbLog.WipeHistory(); err != nil {
 			return err
 		}
 	}
 
-	c.Lock()
-	delete(c.activeChannels, chanPoint)
-
-	chainWatcher, ok := c.activeWatchers[chanPoint]
-	if ok {
-		chainWatcher.Stop()
-	}
-	delete(c.activeWatchers, chanPoint)
-	c.Unlock()
-
 	return nil
 }
 
@@ -491,7 +507,7 @@ func (c *ChainArbitrator) Start() error {
 			return err
 		}
 		arbCfg.MarkChannelResolved = func() error {
-			return c.resolveContract(chanPoint, chanLog)
+			return c.ResolveContract(chanPoint)
 		}
 
 		// We can also leave off the set of HTLC's here as since the
diff --git a/contractcourt/chain_arbitrator_test.go b/contractcourt/chain_arbitrator_test.go
index 38ea2a35..28682c92 100644
--- a/contractcourt/chain_arbitrator_test.go
+++ b/contractcourt/chain_arbitrator_test.go
@@ -115,3 +115,105 @@ func TestChainArbitratorRepublishCommitment(t *testing.T) {
 		t.Fatalf("unexpected tx published")
 	}
 }
+
+// TestResolveContract tests that if we have an active channel being watched by
+// the chain arb, then a call to ResolveContract will mark the channel as fully
+// closed in the database, and also clean up all arbitrator state.
+func TestResolveContract(t *testing.T) {
+	t.Parallel()
+
+	// To start with, we'll create a new temp DB for the duration of this
+	// test.
+	tempPath, err := ioutil.TempDir("", "testdb")
+	if err != nil {
+		t.Fatalf("unable to make temp dir: %v", err)
+	}
+	defer os.RemoveAll(tempPath)
+	db, err := channeldb.Open(tempPath)
+	if err != nil {
+		t.Fatalf("unable to open db: %v", err)
+	}
+	defer db.Close()
+
+	// With the DB created, we'll make a new channel, and mark it as
+	// pending open within the database.
+	newChannel, _, cleanup, err := lnwallet.CreateTestChannels(true)
+	if err != nil {
+		t.Fatalf("unable to make new test channel: %v", err)
+	}
+	defer cleanup()
+	channel := newChannel.State()
+	channel.Db = db
+	addr := &net.TCPAddr{
+		IP:   net.ParseIP("127.0.0.1"),
+		Port: 18556,
+	}
+	if err := channel.SyncPending(addr, 101); err != nil {
+		t.Fatalf("unable to write channel to db: %v", err)
+	}
+
+	// With the channel inserted into the database, we'll now create a new
+	// chain arbitrator that should pick up these new channels and launch
+	// resolvers for them.
+	chainArbCfg := ChainArbitratorConfig{
+		ChainIO:  &mockChainIO{},
+		Notifier: &mockNotifier{},
+		PublishTx: func(tx *wire.MsgTx) error {
+			return nil
+		},
+	}
+	chainArb := NewChainArbitrator(
+		chainArbCfg, db,
+	)
+	if err := chainArb.Start(); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := chainArb.Stop(); err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	channelArb := chainArb.activeChannels[channel.FundingOutpoint]
+
+	// While the resolvers are active, we'll now remove the channel from
+	// the database (mark it as closed).
+	err = db.AbandonChannel(&channel.FundingOutpoint, 4)
+	if err != nil {
+		t.Fatalf("unable to remove channel: %v", err)
+	}
+
+	// With the channel removed, we'll now manually call ResolveContract.
+	// This simulates needing to remove a channel from the chain arb due
+	// to any possible external consistency issues.
+	err = chainArb.ResolveContract(channel.FundingOutpoint)
+	if err != nil {
+		t.Fatalf("unable to resolve contract: %v", err)
+	}
+
+	// There shouldn't be an active chain watcher or channel arb for this
+	// channel.
+	if len(chainArb.activeChannels) != 0 {
+		t.Fatalf("expected zero active channels, instead have %v",
+			len(chainArb.activeChannels))
+	}
+	if len(chainArb.activeWatchers) != 0 {
+		t.Fatalf("expected zero active watchers, instead have %v",
+			len(chainArb.activeWatchers))
+	}
+
+	// At this point, the channel's arbitrator log should be empty as
+	// well.
+	_, err = channelArb.log.FetchContractResolutions()
+	if err != errScopeBucketNoExist {
+		t.Fatalf("channel arb log state should have been "+
+			"removed: %v", err)
+	}
+
+	// If we attempt to call this method again, then we should get a nil
+	// error, as there is no more state to be cleaned up.
+	err = chainArb.ResolveContract(channel.FundingOutpoint)
+	if err != nil {
+		t.Fatalf("second resolve call shouldn't fail: %v", err)
+	}
+}
diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go
index 5817e85d..a6a76099 100644
--- a/lntest/itest/lnd_test.go
+++ b/lntest/itest/lnd_test.go
@@ -13155,11 +13155,17 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
 
 	ctxt, _ := context.WithTimeout(ctxb, channelOpenTimeout)
 	chanPoint := openChannelAndAssert(
-		ctxt, t, net, net.Alice, net.Bob, channelParam)
+		ctxt, t, net, net.Alice, net.Bob, channelParam,
+	)
+	txid, err := lnd.GetChanPointFundingTxid(chanPoint)
+	if err != nil {
+		t.Fatalf("unable to get txid: %v", err)
+	}
+	chanPointStr := fmt.Sprintf("%v:%v", txid, chanPoint.OutputIndex)
 
 	// Wait for channel to be confirmed open.
 	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
-	err := net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
+	err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
 	if err != nil {
 		t.Fatalf("alice didn't report channel: %v", err)
 	}
@@ -13168,6 +13174,25 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
 		t.Fatalf("bob didn't report channel: %v", err)
 	}
 
+	// Now that the channel is open, we'll obtain its channel ID real quick
+	// so we can use it to query the graph below.
+	listReq := &lnrpc.ListChannelsRequest{}
+	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
+	aliceChannelList, err := net.Alice.ListChannels(ctxt, listReq)
+	if err != nil {
+		t.Fatalf("unable to fetch alice's channels: %v", err)
+	}
+	var chanID uint64
+	for _, channel := range aliceChannelList.Channels {
+		if channel.ChannelPoint == chanPointStr {
+			chanID = channel.ChanId
+		}
+	}
+
+	if chanID == 0 {
+		t.Fatalf("unable to find channel")
+	}
+
 	// Send request to abandon channel.
 	abandonChannelRequest := &lnrpc.AbandonChannelRequest{
 		ChannelPoint: chanPoint,
@@ -13180,9 +13205,8 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
 	}
 
 	// Assert that channel in no longer open.
-	listReq := &lnrpc.ListChannelsRequest{}
 	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
-	aliceChannelList, err := net.Alice.ListChannels(ctxt, listReq)
+	aliceChannelList, err = net.Alice.ListChannels(ctxt, listReq)
 	if err != nil {
 		t.Fatalf("unable to list channels: %v", err)
 	}
@@ -13230,9 +13254,26 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
 		len(aliceClosedList.Channels))
 	}
 
-	// Now that we're done with the test, the channel can be closed. This is
-	// necessary to avoid unexpected outcomes of other tests that use Bob's
-	// lnd instance.
+	// Ensure that the channel can no longer be found in the channel graph.
+	_, err = net.Alice.GetChanInfo(ctxb, &lnrpc.ChanInfoRequest{
+		ChanId: chanID,
+	})
+	if !strings.Contains(err.Error(), "marked as zombie") {
+		t.Fatalf("channel shouldn't be found in the channel " +
+			"graph!")
+	}
+
+	// Calling AbandonChannel again should result in no new errors, as the
+	// channel has already been removed.
+	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
+	_, err = net.Alice.AbandonChannel(ctxt, abandonChannelRequest)
+	if err != nil {
+		t.Fatalf("unable to abandon channel a second time: %v", err)
+	}
+
+	// Now that we're done with the test, the channel can be closed. This
+	// is necessary to avoid unexpected outcomes of other tests that use
+	// Bob's lnd instance.
 	ctxt, _ = context.WithTimeout(ctxb, channelCloseTimeout)
 	closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, true)
diff --git a/rpcserver.go b/rpcserver.go
index 828ce7cf..837c2cae 100644
--- a/rpcserver.go
+++ b/rpcserver.go
@@ -1947,6 +1947,28 @@ func createRPCCloseUpdate(update interface{}) (
 	return nil, errors.New("unknown close status update")
 }
 
+// abandonChanFromGraph attempts to remove a channel from the channel graph.
+// If we can't find the chanID in the graph, then we assume it has already
+// been removed, and treat the call as a no-op.
+func abandonChanFromGraph(chanGraph *channeldb.ChannelGraph,
+	chanPoint *wire.OutPoint) error {
+
+	// First, we'll obtain the channel ID. If we can't locate this, then
+	// it's the case that the channel may have already been removed from
+	// the graph, so we'll return a nil error.
+	chanID, err := chanGraph.ChannelID(chanPoint)
+	switch {
+	case err == channeldb.ErrEdgeNotFound:
+		return nil
+	case err != nil:
+		return err
+	}
+
+	// If the channel ID is still in the graph, then that means the channel
+	// is still open, so we'll now move to purge it from the graph.
+	return chanGraph.DeleteChannelEdges(chanID)
+}
+
 // AbandonChannel removes all channel state from the database except for a
 // close summary. This method can be used to get rid of permanently unusable
 // channels due to bugs fixed in newer versions of lnd.
@@ -1970,42 +1992,70 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
 	index := in.ChannelPoint.OutputIndex
 	chanPoint := wire.NewOutPoint(txid, index)
 
-	// With the chanPoint constructed, we'll attempt to find the target
-	// channel in the database. If we can't find the channel, then we'll
-	// return the error back to the caller.
-	dbChan, err := r.server.chanDB.FetchChannel(*chanPoint)
-	if err != nil {
-		return nil, err
-	}
-
-	// Now that we've found the channel, we'll populate a close summary for
-	// the channel, so we can store as much information for this abounded
-	// channel as possible. We also ensure that we set Pending to false, to
-	// indicate that this channel has been "fully" closed.
+	// When we remove the channel from the database, we need to set a close
+	// height, so we'll just use the current best known height.
 	_, bestHeight, err := r.server.cc.chainIO.GetBestBlock()
 	if err != nil {
 		return nil, err
 	}
-	summary := &channeldb.ChannelCloseSummary{
-		CloseType:               channeldb.Abandoned,
-		ChanPoint:               *chanPoint,
-		ChainHash:               dbChan.ChainHash,
-		CloseHeight:             uint32(bestHeight),
-		RemotePub:               dbChan.IdentityPub,
-		Capacity:                dbChan.Capacity,
-		SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
-		ShortChanID:             dbChan.ShortChanID(),
-		RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
-		RemoteNextRevocation:    dbChan.RemoteNextRevocation,
-		LocalChanConfig:         dbChan.LocalChanCfg,
+
+	dbChan, err := r.server.chanDB.FetchChannel(*chanPoint)
+	switch {
+	// If the channel isn't found in the set of open channels, then we can
+	// continue on as it can't be loaded into the link/peer.
+	case err == channeldb.ErrChannelNotFound:
+		break
+
+	// If the channel is still known to be open, then before we modify any
+	// on-disk state, we'll remove the channel from the switch and peer
+	// state if it's been loaded in.
+	case err == nil:
+		// We'll mark the channel as borked before we remove the state
+		// from the switch/peer so it won't be loaded back in if the
+		// peer reconnects.
+		if err := dbChan.MarkBorked(); err != nil {
+			return nil, err
+		}
+		remotePub := dbChan.IdentityPub
+		if peer, err := r.server.FindPeer(remotePub); err == nil {
+			if err := peer.WipeChannel(chanPoint); err != nil {
+				return nil, fmt.Errorf("unable to wipe "+
+					"channel state: %v", err)
+			}
+		}
+
+	default:
+		return nil, err
 	}
 
-	// Finally, we'll close the channel in the DB, and return back to the
-	// caller.
-	err = dbChan.CloseChannel(summary)
+	// Abandoning a channel is a three-step process: remove from the open
+	// channel state, remove from the graph, remove from the contract
+	// court. Between any two steps, it's possible that the user restarts
+	// the process all over again. As a result, each of the steps below is
+	// intended to be idempotent.
+	err = r.server.chanDB.AbandonChannel(chanPoint, uint32(bestHeight))
 	if err != nil {
 		return nil, err
 	}
+	err = abandonChanFromGraph(
+		r.server.chanDB.ChannelGraph(), chanPoint,
+	)
+	if err != nil {
+		return nil, err
+	}
+	err = r.server.chainArb.ResolveContract(*chanPoint)
+	if err != nil {
+		return nil, err
+	}
+
+	// If this channel was in the process of being closed, but didn't fully
+	// close, then it's possible that the nursery is hanging on to some
+	// state. To err on the side of caution, we'll now attempt to wipe any
+	// state for this channel from the nursery.
+	err = r.server.utxoNursery.cfg.Store.RemoveChannel(chanPoint)
+	if err != nil && err != ErrContractNotFound {
+		return nil, err
+	}
 
 	return &lnrpc.AbandonChannelResponse{}, nil
 }
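
Reviewer note: the ordering in `AbandonChannel` above is deliberate. The channel is removed from the open-channel state first, then from the graph, then from the contract court, and each step treats "already gone" as success, so the RPC can safely be re-run if the process restarts between steps. The sketch below is not part of the patch; it's a minimal, self-contained illustration of that idempotent teardown pattern, with a hypothetical sentinel error and step names standing in for `channeldb.ErrChannelNotFound`, `channeldb.ErrEdgeNotFound`, and the real stores.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical stand-in for sentinel errors such as
// channeldb.ErrChannelNotFound or channeldb.ErrEdgeNotFound.
var errNotFound = errors.New("not found")

// runStep executes one stage of the teardown, treating "already
// removed" as success. This is what makes the whole sequence safe to
// re-run after a crash or restart between steps.
func runStep(name string, remove func() error) error {
	switch err := remove(); {
	case errors.Is(err, errNotFound):
		// Cleaned up on a prior attempt: a no-op, not a failure.
		return nil
	case err != nil:
		return fmt.Errorf("%s: %w", name, err)
	default:
		return nil
	}
}

// abandon mirrors the patch's ordering: open-channel state first, then
// the channel graph, then the arbitrator (contract court) state.
func abandon(openDB, graph, contractCourt func() error) error {
	steps := []struct {
		name string
		fn   func() error
	}{
		{"open channel db", openDB},
		{"channel graph", graph},
		{"contract court", contractCourt},
	}
	for _, s := range steps {
		if err := runStep(s.name, s.fn); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Simulate a store that is emptied on the first pass.
	removed := false
	removeOnce := func() error {
		if removed {
			return errNotFound
		}
		removed = true
		return nil
	}

	// Both passes succeed: the second finds nothing left to do.
	fmt.Println(abandon(removeOnce, removeOnce, removeOnce)) // <nil>
	fmt.Println(abandon(removeOnce, removeOnce, removeOnce)) // <nil>
}
```

One consequence of this ordering, as I read the patch: since the authoritative channeldb record is removed first, a crash partway through leaves only stale graph or arbitrator state behind, never a half-closed channel record, and the stale state is swept up on the next (idempotent) invocation.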