From fa160f559cbd98c10f35d2cd5aead37bc25853ec Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 14 Dec 2018 16:35:07 -0800 Subject: [PATCH] multi: replace per channel sigPool with global daemon level sigPool In this commit, we remove the per channel `sigPool` within the `lnwallet.LightningChannel` struct. With this change, we ensure that as the number of channels grows, the number of gouroutines idling in the sigPool stays constant. It's the case that currently on the daemon, most channels are likely inactive, with only a hand full actually consistently carrying out channel updates. As a result, this change should reduce the amount of idle CPU usage, as we have less active goroutines in select loops. In order to make this change, the `SigPool` itself has been publicly exported such that outside callers can make a `SigPool` and pass it into newly created channels. Since the sig pool now lives outside the channel, we were also able to do away with the Stop() method on the channel all together. Finally, the server is the sub-system that is currently responsible for managing the `SigPool` within lnd. --- breacharbiter_test.go | 21 +-- contractcourt/chain_arbitrator.go | 7 +- fundingmanager.go | 6 +- htlcswitch/link.go | 1 - htlcswitch/link_test.go | 2 - htlcswitch/test_utils.go | 23 +++- lnwallet/channel.go | 156 ++++++++-------------- lnwallet/channel_test.go | 38 ++---- lnwallet/sigpool.go | 212 +++++++++++++++--------------- lnwallet/test_utils.go | 14 +- peer.go | 19 +-- rpcserver.go | 5 +- server.go | 12 +- test_utils.go | 9 +- 14 files changed, 239 insertions(+), 286 deletions(-) diff --git a/breacharbiter_test.go b/breacharbiter_test.go index 21d62658..65a5bc5b 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -1123,18 +1123,6 @@ func TestBreachHandoffFail(t *testing.T) { } defer cleanUpArb() - // Instantiate a second lightning channel for alice, using the state of - // her last channel. - aliceKeyPriv, _ := btcec.PrivKeyFromBytes(btcec.S256(), - alicesPrivKey) - aliceSigner := &mockSigner{aliceKeyPriv} - - alice2, err := lnwallet.NewLightningChannel(aliceSigner, nil, alice.State()) - if err != nil { - t.Fatalf("unable to create test channels: %v", err) - } - defer alice2.Stop() - // Signal a spend of the funding transaction and wait for the close // observer to exit. This time we are allowing the handoff to succeed. breach = &ContractBreachEvent{ @@ -1567,18 +1555,23 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa aliceSigner := &mockSigner{aliceKeyPriv} bobSigner := &mockSigner{bobKeyPriv} + alicePool := lnwallet.NewSigPool(1, aliceSigner) channelAlice, err := lnwallet.NewLightningChannel( - aliceSigner, pCache, aliceChannelState, + aliceSigner, pCache, aliceChannelState, alicePool, ) if err != nil { return nil, nil, nil, err } + alicePool.Start() + + bobPool := lnwallet.NewSigPool(1, bobSigner) channelBob, err := lnwallet.NewLightningChannel( - bobSigner, pCache, bobChannelState, + bobSigner, pCache, bobChannelState, bobPool, ) if err != nil { return nil, nil, nil, err } + bobPool.Start() addr := &net.TCPAddr{ IP: net.ParseIP("127.0.0.1"), diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 5f8b161d..07eab9e1 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -3,10 +3,11 @@ package contractcourt import ( "errors" "fmt" - "github.com/lightningnetwork/lnd/sweep" "sync" "sync/atomic" + "github.com/lightningnetwork/lnd/sweep" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" @@ -238,11 +239,11 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, } chanMachine, err := lnwallet.NewLightningChannel( - c.cfg.Signer, c.cfg.PreimageDB, channel) + c.cfg.Signer, c.cfg.PreimageDB, channel, nil, + ) if err != nil { return nil, err } - chanMachine.Stop() if err := c.cfg.MarkLinkInactive(chanPoint); err != nil { log.Errorf("unable to mark link inactive: %v", err) diff --git a/fundingmanager.go b/fundingmanager.go index 2bf15789..c799139c 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -1681,13 +1681,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { // Go on adding the channel to the channel graph, and crafting // channel announcements. lnChannel, err := lnwallet.NewLightningChannel( - nil, nil, completeChan, + nil, nil, completeChan, nil, ) if err != nil { fndgLog.Errorf("failed creating lnChannel: %v", err) return } - defer lnChannel.Stop() err = f.sendFundingLocked( fmsg.peer, completeChan, lnChannel, shortChanID, @@ -1971,12 +1970,11 @@ func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer, // We create the state-machine object which wraps the database state. lnChannel, err := lnwallet.NewLightningChannel( - nil, nil, completeChan, + nil, nil, completeChan, nil, ) if err != nil { return err } - defer lnChannel.Stop() chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 9bda1e65..1de54471 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -443,7 +443,6 @@ func (l *channelLink) Stop() { } l.updateFeeTimer.Stop() - l.channel.Stop() l.overflowQueue.Stop() close(l.quit) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index e3c07a03..517196db 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1592,8 +1592,6 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( cleanUp := func() { close(alicePeer.quit) defer fCleanUp() - defer aliceLink.Stop() - defer bobChannel.Stop() } return aliceLink, bobChannel, bticker.Force, start, cleanUp, restore, nil diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 358a586d..c29619f5 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -10,6 +10,7 @@ import ( "math/big" "net" "os" + "runtime" "sync/atomic" "testing" "time" @@ -369,18 +370,23 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, preimageMap: make(map[[32]byte][]byte), } + alicePool := lnwallet.NewSigPool(runtime.NumCPU(), aliceSigner) channelAlice, err := lnwallet.NewLightningChannel( - aliceSigner, pCache, aliceChannelState, + aliceSigner, pCache, aliceChannelState, alicePool, ) if err != nil { return nil, nil, nil, nil, err } + alicePool.Start() + + bobPool := lnwallet.NewSigPool(runtime.NumCPU(), bobSigner) channelBob, err := lnwallet.NewLightningChannel( - bobSigner, pCache, bobChannelState, + bobSigner, pCache, bobChannelState, bobPool, ) if err != nil { return nil, nil, nil, nil, err } + bobPool.Start() // Now that the channel are open, simulate the start of a session by // having Alice and Bob extend their revocation windows to each other. @@ -402,6 +408,7 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, restore := func() (*lnwallet.LightningChannel, *lnwallet.LightningChannel, error) { + aliceStoredChannels, err := dbAlice.FetchOpenChannels(aliceKeyPub) switch err { case nil: @@ -434,8 +441,9 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, return nil, nil, errors.New("unable to find stored alice channel") } - newAliceChannel, err := lnwallet.NewLightningChannel(aliceSigner, - nil, aliceStoredChannel) + newAliceChannel, err := lnwallet.NewLightningChannel( + aliceSigner, nil, aliceStoredChannel, alicePool, + ) if err != nil { return nil, nil, errors.Errorf("unable to create new channel: %v", err) @@ -473,8 +481,9 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, return nil, nil, errors.New("unable to find stored bob channel") } - newBobChannel, err := lnwallet.NewLightningChannel(bobSigner, - nil, bobStoredChannel) + newBobChannel, err := lnwallet.NewLightningChannel( + bobSigner, nil, bobStoredChannel, bobPool, + ) if err != nil { return nil, nil, errors.Errorf("unable to create new channel: %v", err) @@ -485,7 +494,7 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, return channelAlice, channelBob, cleanUpFunc, restore, nil } -// getChanID retrieves the channel point from nwire message. +// getChanID retrieves the channel point from an lnnwire message. func getChanID(msg lnwire.Message) (lnwire.ChannelID, error) { var chanID lnwire.ChannelID switch msg := msg.(type) { diff --git a/lnwallet/channel.go b/lnwallet/channel.go index cdd2baac..4cd21cf0 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -5,10 +5,8 @@ import ( "container/list" "crypto/sha256" "fmt" - "runtime" "sort" "sync" - "sync/atomic" "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -1260,8 +1258,6 @@ func compactLogs(ourLog, theirLog *updateLog, // // See the individual comments within the above methods for further details. type LightningChannel struct { - shutdown int32 // To be used atomically. - // Signer is the main signer instances that will be responsible for // signing any HTLC and commitment transaction generated by the state // machine. @@ -1282,7 +1278,7 @@ type LightningChannel struct { // validating signatures in parallel. This is utilized as an // optimization to void serially signing or validating the HTLC // signatures, of which there may be hundreds. - sigPool *sigPool + sigPool *SigPool // pCache is the global preimage cache shared across all other // LightningChannel instance. We'll use this cache either when we force @@ -1347,11 +1343,6 @@ type LightningChannel struct { RemoteFundingKey *btcec.PublicKey sync.RWMutex - - cowg sync.WaitGroup - wg sync.WaitGroup - - quit chan struct{} } // NewLightningChannel creates a new, active payment channel given an @@ -1360,7 +1351,8 @@ type LightningChannel struct { // automatically persist pertinent state to the database in an efficient // manner. func NewLightningChannel(signer Signer, pCache PreimageCache, - state *channeldb.OpenChannel) (*LightningChannel, error) { + state *channeldb.OpenChannel, + sigPool *SigPool) (*LightningChannel, error) { localCommit := state.LocalCommitment remoteCommit := state.RemoteCommitment @@ -1375,9 +1367,8 @@ func NewLightningChannel(signer Signer, pCache PreimageCache, ) lc := &LightningChannel{ - // TODO(roasbeef): tune num sig workers? - sigPool: newSigPool(runtime.NumCPU(), signer), Signer: signer, + sigPool: sigPool, pCache: pCache, currentHeight: localCommit.CommitHeight, remoteCommitChain: newCommitmentChain(), @@ -1391,7 +1382,6 @@ func NewLightningChannel(signer Signer, pCache PreimageCache, Capacity: state.Capacity, LocalFundingKey: state.LocalChanCfg.MultiSigKey.PubKey, RemoteFundingKey: state.RemoteChanCfg.MultiSigKey.PubKey, - quit: make(chan struct{}), } // With the main channel struct reconstructed, we'll now restore the @@ -1411,12 +1401,6 @@ func NewLightningChannel(signer Signer, pCache PreimageCache, lc.createStateHintObfuscator() - // Finally, we'll kick of the signature job pool to handle any upcoming - // commitment state generation and validation. - if err := lc.sigPool.Start(); err != nil { - return nil, err - } - return lc, nil } @@ -1467,23 +1451,6 @@ func (lc *LightningChannel) createStateHintObfuscator() { } } -// Stop gracefully shuts down any active goroutines spawned by the -// LightningChannel during regular duties. -func (lc *LightningChannel) Stop() { - if !atomic.CompareAndSwapInt32(&lc.shutdown, 0, 1) { - return - } - - lc.sigPool.Stop() - - close(lc.quit) -} - -// WaitForClose blocks until the channel's close observer has terminated. -func (lc *LightningChannel) WaitForClose() { - lc.cowg.Wait() -} - // ResetState resets the state of the channel back to the default state. This // ensures that any active goroutines which need to act based on on-chain // events do so properly. @@ -2673,7 +2640,7 @@ func processRemoveEntry(htlc *PaymentDescriptor, ourBalance, // asynchronously and in parallel. func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, localChanCfg, remoteChanCfg *channeldb.ChannelConfig, - remoteCommitView *commitment) ([]signJob, chan struct{}, error) { + remoteCommitView *commitment) ([]SignJob, chan struct{}, error) { txHash := remoteCommitView.txn.TxHash() dustLimit := remoteChanCfg.DustLimit @@ -2684,7 +2651,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, // smaller (than its total capacity) and some HTLCs may be dust. numSigs := (len(remoteCommitView.incomingHTLCs) + len(remoteCommitView.outgoingHTLCs)) - sigBatch := make([]signJob, 0, numSigs) + sigBatch := make([]SignJob, 0, numSigs) var err error cancelChan := make(chan struct{}) @@ -2700,9 +2667,9 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, // If the HTLC isn't dust, then we'll create an empty sign job // to add to the batch momentarily. - sigJob := signJob{} - sigJob.cancel = cancelChan - sigJob.resp = make(chan signJobResp, 1) + sigJob := SignJob{} + sigJob.Cancel = cancelChan + sigJob.Resp = make(chan SignJobResp, 1) // As this is an incoming HTLC and we're sinning the commitment // transaction of the remote node, we'll need to generate an @@ -2718,7 +2685,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Hash: txHash, Index: uint32(htlc.remoteOutputIndex), } - sigJob.tx, err = createHtlcTimeoutTx( + sigJob.Tx, err = createHtlcTimeoutTx( op, outputAmt, htlc.Timeout, uint32(remoteChanCfg.CsvDelay), keyRing.RevocationKey, keyRing.DelayKey, @@ -2730,7 +2697,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, // Finally, we'll generate a sign descriptor to generate a // signature to give to the remote party for this commitment // transaction. Note we use the raw HTLC amount. - sigJob.signDesc = SignDescriptor{ + sigJob.SignDesc = SignDescriptor{ KeyDesc: localChanCfg.HtlcBasePoint, SingleTweak: keyRing.LocalHtlcKeyTweak, WitnessScript: htlc.theirWitnessScript, @@ -2738,10 +2705,10 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Value: int64(htlc.Amount.ToSatoshis()), }, HashType: txscript.SigHashAll, - SigHashes: txscript.NewTxSigHashes(sigJob.tx), + SigHashes: txscript.NewTxSigHashes(sigJob.Tx), InputIndex: 0, } - sigJob.outputIndex = htlc.remoteOutputIndex + sigJob.OutputIndex = htlc.remoteOutputIndex sigBatch = append(sigBatch, sigJob) } @@ -2751,9 +2718,9 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, continue } - sigJob := signJob{} - sigJob.cancel = cancelChan - sigJob.resp = make(chan signJobResp, 1) + sigJob := SignJob{} + sigJob.Cancel = cancelChan + sigJob.Resp = make(chan SignJobResp, 1) // As this is an outgoing HTLC and we're signing the commitment // transaction of the remote node, we'll need to generate an @@ -2770,7 +2737,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Hash: txHash, Index: uint32(htlc.remoteOutputIndex), } - sigJob.tx, err = createHtlcSuccessTx( + sigJob.Tx, err = createHtlcSuccessTx( op, outputAmt, uint32(remoteChanCfg.CsvDelay), keyRing.RevocationKey, keyRing.DelayKey, ) @@ -2781,7 +2748,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, // Finally, we'll generate a sign descriptor to generate a // signature to give to the remote party for this commitment // transaction. Note we use the raw HTLC amount. - sigJob.signDesc = SignDescriptor{ + sigJob.SignDesc = SignDescriptor{ KeyDesc: localChanCfg.HtlcBasePoint, SingleTweak: keyRing.LocalHtlcKeyTweak, WitnessScript: htlc.theirWitnessScript, @@ -2789,10 +2756,10 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Value: int64(htlc.Amount.ToSatoshis()), }, HashType: txscript.SigHashAll, - SigHashes: txscript.NewTxSigHashes(sigJob.tx), + SigHashes: txscript.NewTxSigHashes(sigJob.Tx), InputIndex: 0, } - sigJob.outputIndex = htlc.remoteOutputIndex + sigJob.OutputIndex = htlc.remoteOutputIndex sigBatch = append(sigBatch, sigJob) } @@ -3059,26 +3026,23 @@ func (lc *LightningChannel) SignNextCommitment() (lnwire.Sig, []lnwire.Sig, erro // order as they appear on the commitment transaction after BIP 69 // sorting. sort.Slice(sigBatch, func(i, j int) bool { - return sigBatch[i].outputIndex < sigBatch[j].outputIndex + return sigBatch[i].OutputIndex < sigBatch[j].OutputIndex }) // With the jobs sorted, we'll now iterate through all the responses to // gather each of the signatures in order. htlcSigs = make([]lnwire.Sig, 0, len(sigBatch)) for _, htlcSigJob := range sigBatch { - select { - case jobResp := <-htlcSigJob.resp: - // If an error occurred, then we'll cancel any other - // active jobs. - if jobResp.err != nil { - close(cancelChan) - return sig, htlcSigs, err - } + jobResp := <-htlcSigJob.Resp - htlcSigs = append(htlcSigs, jobResp.sig) - case <-lc.quit: - return sig, htlcSigs, fmt.Errorf("channel shutting down") + // If an error occurred, then we'll cancel any other active + // jobs. + if jobResp.Err != nil { + close(cancelChan) + return sig, htlcSigs, err } + + htlcSigs = append(htlcSigs, jobResp.Sig) } // As we're about to proposer a new commitment state for the remote @@ -3740,7 +3704,7 @@ func (lc *LightningChannel) validateCommitmentSanity(theirLogCounter, // directly into the pool of workers. func genHtlcSigValidationJobs(localCommitmentView *commitment, keyRing *CommitmentKeyRing, htlcSigs []lnwire.Sig, - localChanCfg, remoteChanCfg *channeldb.ChannelConfig) ([]verifyJob, error) { + localChanCfg, remoteChanCfg *channeldb.ChannelConfig) ([]VerifyJob, error) { txHash := localCommitmentView.txn.TxHash() feePerKw := localCommitmentView.feePerKw @@ -3751,7 +3715,7 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, // length will be smaller than the total capacity. numHtlcs := (len(localCommitmentView.incomingHTLCs) + len(localCommitmentView.outgoingHTLCs)) - verifyJobs := make([]verifyJob, 0, numHtlcs) + verifyJobs := make([]VerifyJob, 0, numHtlcs) // We'll iterate through each output in the commitment transaction, // populating the sigHash closure function if it's detected to be an @@ -3879,11 +3843,11 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, continue } - verifyJobs = append(verifyJobs, verifyJob{ - htlcIndex: htlcIndex, - pubKey: keyRing.RemoteHtlcKey, - sig: sig, - sigHash: sigHash, + verifyJobs = append(verifyJobs, VerifyJob{ + HtlcIndex: htlcIndex, + PubKey: keyRing.RemoteHtlcKey, + Sig: sig, + SigHash: sigHash, }) i++ @@ -4083,34 +4047,30 @@ func (lc *LightningChannel) ReceiveNewCommitment(commitSig lnwire.Sig, for i := 0; i < len(verifyJobs); i++ { // In the case that a single signature is invalid, we'll exit // early and cancel all the outstanding verification jobs. - select { - case htlcErr := <-verifyResps: - if htlcErr != nil { - close(cancelChan) + htlcErr := <-verifyResps + if htlcErr != nil { + close(cancelChan) - sig, err := lnwire.NewSigFromSignature( - htlcErr.sig, - ) - if err != nil { - return err - } - sigHash, err := htlcErr.sigHash() - if err != nil { - return err - } - - var txBytes bytes.Buffer - localCommitTx.Serialize(&txBytes) - return &InvalidHtlcSigError{ - commitHeight: nextHeight, - htlcSig: sig.ToSignatureBytes(), - htlcIndex: htlcErr.htlcIndex, - sigHash: sigHash, - commitTx: txBytes.Bytes(), - } + sig, err := lnwire.NewSigFromSignature( + htlcErr.Sig, + ) + if err != nil { + return err + } + sigHash, err := htlcErr.SigHash() + if err != nil { + return err + } + + var txBytes bytes.Buffer + localCommitTx.Serialize(&txBytes) + return &InvalidHtlcSigError{ + commitHeight: nextHeight, + htlcSig: sig.ToSignatureBytes(), + htlcIndex: htlcErr.HtlcIndex, + sigHash: sigHash, + commitTx: txBytes.Bytes(), } - case <-lc.quit: - return fmt.Errorf("channel shutting down") } } diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 24701f28..e3cd3e15 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -1444,14 +1444,18 @@ func TestStateUpdatePersistence(t *testing.T) { if err != nil { t.Fatalf("unable to fetch channel: %v", err) } + aliceChannelNew, err := NewLightningChannel( aliceChannel.Signer, nil, aliceChannels[0], + aliceChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } + bobChannelNew, err := NewLightningChannel( bobChannel.Signer, nil, bobChannels[0], + bobChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) @@ -2544,20 +2548,19 @@ func TestChanSyncFullySynced(t *testing.T) { if err != nil { t.Fatalf("unable to fetch channel: %v", err) } + aliceChannelNew, err := NewLightningChannel( - aliceChannel.Signer, nil, aliceChannels[0], + aliceChannel.Signer, nil, aliceChannels[0], aliceChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } - defer aliceChannelNew.Stop() bobChannelNew, err := NewLightningChannel( - bobChannel.Signer, nil, bobChannels[0], + bobChannel.Signer, nil, bobChannels[0], bobChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } - defer bobChannelNew.Stop() assertNoChanSyncNeeded(t, aliceChannelNew, bobChannelNew) } @@ -2576,6 +2579,7 @@ func restartChannel(channelOld *LightningChannel) (*LightningChannel, error) { channelNew, err := NewLightningChannel( channelOld.Signer, channelOld.pCache, nodeChannels[0], + channelOld.sigPool, ) if err != nil { return nil, err @@ -2786,7 +2790,6 @@ func TestChanSyncOweCommitment(t *testing.T) { if err != nil { t.Fatalf("unable to restart alice: %v", err) } - defer aliceChannel.Stop() assertAliceCommitRetransmit() // TODO(roasbeef): restart bob as well??? @@ -3053,7 +3056,6 @@ func TestChanSyncOweRevocation(t *testing.T) { if err != nil { t.Fatalf("unable to restart alice: %v", err) } - defer aliceChannel.Stop() assertAliceOwesRevoke() // TODO(roasbeef): restart bob too??? @@ -3233,7 +3235,6 @@ func TestChanSyncOweRevocationAndCommit(t *testing.T) { if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer bobChannel.Stop() assertBobSendsRevokeAndCommit() // We'll now finish the state transition by having Alice process both @@ -3428,7 +3429,6 @@ func TestChanSyncOweRevocationAndCommitForceTransition(t *testing.T) { if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer bobChannel.Stop() if len(bobMsgsToSend) != 2 { t.Fatalf("expected bob to send %v messages, instead "+ "sends: %v", 2, spew.Sdump(bobMsgsToSend)) @@ -3775,12 +3775,10 @@ func TestChannelRetransmissionFeeUpdate(t *testing.T) { if err != nil { t.Fatalf("unable to restart alice: %v", err) } - defer aliceChannel.Stop() bobChannel, err = restartChannel(bobChannel) if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer bobChannel.Stop() // Bob doesn't get this message so upon reconnection, they need to // synchronize. Alice should conclude that she owes Bob a commitment, @@ -4293,12 +4291,10 @@ func TestLockedInHtlcForwardingSkipAfterRestart(t *testing.T) { if err != nil { t.Fatalf("unable to restart alice: %v", err) } - defer aliceChannel.Stop() bobChannel, err = restartChannel(bobChannel) if err != nil { t.Fatalf("unable to restart bob: %v", err) } - defer bobChannel.Stop() // With both nodes restarted, Bob will now attempt to cancel one of // Alice's HTLC's. @@ -4399,12 +4395,10 @@ func TestLockedInHtlcForwardingSkipAfterRestart(t *testing.T) { if err != nil { t.Fatalf("unable to restart alice: %v", err) } - defer aliceChannel.Stop() bobChannel, err = restartChannel(bobChannel) if err != nil { t.Fatalf("unable to restart bob: %v", err) } - defer bobChannel.Stop() // Readd the Fail to both Alice and Bob's channels, as the non-committed // update will not have survived the restart. @@ -5590,19 +5584,19 @@ func TestChannelRestoreUpdateLogs(t *testing.T) { // the update logs up to the correct state set up above. newAliceChannel, err := NewLightningChannel( aliceChannel.Signer, nil, aliceChannel.channelState, + aliceChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } - defer newAliceChannel.Stop() newBobChannel, err := NewLightningChannel( bobChannel.Signer, nil, bobChannel.channelState, + bobChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } - defer newBobChannel.Stop() // compare all the logs between the old and new channels, to make sure // they all got restored properly. @@ -5669,13 +5663,14 @@ func assertInLogs(t *testing.T, channel *LightningChannel, numAddsLocal, // expected state. func restoreAndAssert(t *testing.T, channel *LightningChannel, numAddsLocal, numFailsLocal, numAddsRemote, numFailsRemote int) { + newChannel, err := NewLightningChannel( channel.Signer, nil, channel.channelState, + channel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } - defer newChannel.Stop() assertInLog(t, newChannel.localUpdateLog, numAddsLocal, numFailsLocal) assertInLog(t, newChannel.remoteUpdateLog, numAddsRemote, numFailsRemote) @@ -5863,12 +5858,10 @@ func TestDuplicateFailRejection(t *testing.T) { if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer bobChannel.Stop() aliceChannel, err = restartChannel(aliceChannel) if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer aliceChannel.Stop() // If we try to fail the same HTLC again, then we should get an error. err = bobChannel.FailHTLC(0, []byte("failreason"), nil, nil, nil) @@ -5945,12 +5938,10 @@ func TestDuplicateSettleRejection(t *testing.T) { if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer bobChannel.Stop() aliceChannel, err = restartChannel(aliceChannel) if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer aliceChannel.Stop() // If we try to fail the same HTLC again, then we should get an error. err = bobChannel.SettleHTLC(alicePreimage, uint64(0), nil, nil, nil) @@ -5983,9 +5974,9 @@ func TestChannelRestoreCommitHeight(t *testing.T) { channel *LightningChannel, remoteLog bool, htlcIndex uint64, expLocal, expRemote uint64) *LightningChannel { - channel.Stop() newChannel, err := NewLightningChannel( channel.Signer, nil, channel.channelState, + channel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) @@ -6152,7 +6143,4 @@ func TestChannelRestoreCommitHeight(t *testing.T) { // HTLC an add height. bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 0, 2, 1) bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 1, 2, 2) - - aliceChannel.Stop() - bobChannel.Stop() } diff --git a/lnwallet/sigpool.go b/lnwallet/sigpool.go index 7e8015fc..c30688a7 100644 --- a/lnwallet/sigpool.go +++ b/lnwallet/sigpool.go @@ -19,119 +19,118 @@ const ( // TODO(roasbeef): job buffer pool? ) -// verifyJob is a job sent to the sigPool to verify a signature on a -// transaction. The items contained in the struct are necessary and sufficient -// to verify the full signature. The passed sigHash closure function should be -// set to a function that generates the relevant sighash. +// VerifyJob is a job sent to the sigPool sig pool to verify a signature +// on a transaction. The items contained in the struct are necessary and +// sufficient to verify the full signature. The passed sigHash closure function +// should be set to a function that generates the relevant sighash. // // TODO(roasbeef): when we move to ecschnorr, make into batch signature // verification using bos-coster (or pip?). -type verifyJob struct { - // pubKey is the public key that was used to generate the purported +type VerifyJob struct { + // PubKey is the public key that was used to generate the purported // valid signature. Note that with the current channel construction, // this public key will likely have been tweaked using the current per // commitment point for a particular commitment transactions. - pubKey *btcec.PublicKey + PubKey *btcec.PublicKey - // sig is the raw signature generated using the above public key. This + // Sig is the raw signature generated using the above public key. This // is the signature to be verified. - sig *btcec.Signature + Sig *btcec.Signature - // sigHash is a function closure generates the sighashes that the + // SigHash is a function closure generates the sighashes that the // passed signature is known to have signed. - sigHash func() ([]byte, error) + SigHash func() ([]byte, error) - // htlcIndex is the index of the HTLC from the PoV of the remote + // HtlcIndex is the index of the HTLC from the PoV of the remote // party's update log. - htlcIndex uint64 + HtlcIndex uint64 - // cancel is a channel that should be closed if the caller wishes to + // Cancel is a channel that should be closed if the caller wishes to // cancel all pending verification jobs part of a single batch. This // channel is to be closed in the case that a single signature in a // batch has been returned as invalid, as there is no need to verify // the remainder of the signatures. - cancel chan struct{} + Cancel chan struct{} - // errResp is the channel that the result of the signature verification + // ErrResp is the channel that the result of the signature verification // is to be sent over. In the see that the signature is valid, a nil // error will be passed. Otherwise, a concrete error detailing the // issue will be passed. - errResp chan *htlcIndexErr + ErrResp chan *HtlcIndexErr } -// verifyJobErr is a special type of error that also includes a pointer to the -// original validation job. Ths error message allows us to craft more detailed +// HtlcIndexErr is a special type of error that also includes a pointer to the +// original validation job. This error message allows us to craft more detailed // errors at upper layers. -type htlcIndexErr struct { +type HtlcIndexErr struct { error - *verifyJob + *VerifyJob } -// signJob is a job sent to the sigPool to generate a valid signature according -// to the passed SignDescriptor for the passed transaction. Jobs are intended -// to be sent in batches in order to parallelize the job of generating -// signatures for a new commitment transaction. -type signJob struct { - // signDesc is intended to be a full populated SignDescriptor which +// SignJob is a job sent to the sigPool sig pool to generate a valid +// signature according to the passed SignDescriptor for the passed transaction. +// Jobs are intended to be sent in batches in order to parallelize the job of +// generating signatures for a new commitment transaction. +type SignJob struct { + // SignDesc is intended to be a full populated SignDescriptor which // encodes the necessary material (keys, witness script, etc) required // to generate a valid signature for the specified input. - signDesc SignDescriptor + SignDesc SignDescriptor - // tx is the transaction to be signed. This is required to generate the + // Tx is the transaction to be signed. This is required to generate the // proper sighash for the input to be signed. - tx *wire.MsgTx + Tx *wire.MsgTx - // outputIndex is the output index of the HTLC on the commitment + // OutputIndex is the output index of the HTLC on the commitment // transaction being signed. - outputIndex int32 + OutputIndex int32 - // cancel is a channel that should be closed if the caller wishes to + // Cancel is a channel that should be closed if the caller wishes to // abandon all pending sign jobs part of a single batch. - cancel chan struct{} + Cancel chan struct{} - // resp is the channel that the response to this particular signJob + // Resp is the channel that the response to this particular SignJob // will be sent over. // // TODO(roasbeef): actually need to allow caller to set, need to retain // order mark commit sig as special - resp chan signJobResp + Resp chan SignJobResp } -// signJobResp is the response to a sign job. Both channels are to be read in +// SignJobResp is the response to a sign job. Both channels are to be read in // order to ensure no unnecessary goroutine blocking occurs. Additionally, both // channels should be buffered. -type signJobResp struct { - // sig is the generated signature for a particular signJob In the case +type SignJobResp struct { + // Sig is the generated signature for a particular SignJob In the case // of an error during signature generation, then this value sent will // be nil. - sig lnwire.Sig + Sig lnwire.Sig - // err is the error that occurred when executing the specified + // Err is the error that occurred when executing the specified // signature job. In the case that no error occurred, this value will // be nil. - err error + Err error } -// sigPool is a struct that is meant to allow the current channel state machine -// to parallelize all signature generation and verification. This struct is -// needed as _each_ HTLC when creating a commitment transaction requires a -// signature, and similarly a receiver of a new commitment must verify all the -// HTLC signatures included within the CommitSig message. A pool of workers -// will be maintained by the sigPool. Batches of jobs (either to sign or -// verify) can be sent to the pool of workers which will asynchronously perform -// the specified job. -// -// TODO(roasbeef): rename? -// * ecdsaPool? -type sigPool struct { +// TODO(roasbeef); fix description + +// SigPool is a struct that is meant to allow the current channel state +// machine to parallelize all signature generation and verification. This +// struct is needed as _each_ HTLC when creating a commitment transaction +// requires a signature, and similarly a receiver of a new commitment must +// verify all the HTLC signatures included within the CommitSig message. A pool +// of workers will be maintained by the sigPool. Batches of jobs (either +// to sign or verify) can be sent to the pool of workers which will +// asynchronously perform the specified job. +type SigPool struct { started uint32 // To be used atomically. stopped uint32 // To be used atomically. signer Signer - verifyJobs chan verifyJob - signJobs chan signJob + verifyJobs chan VerifyJob + signJobs chan SignJob wg sync.WaitGroup quit chan struct{} @@ -139,22 +138,22 @@ type sigPool struct { numWorkers int } -// newSigPool creates a new signature pool with the specified number of +// NewSigPool creates a new signature pool with the specified number of // workers. The recommended parameter for the number of works is the number of // physical CPU cores available on the target machine. -func newSigPool(numWorkers int, signer Signer) *sigPool { - return &sigPool{ +func NewSigPool(numWorkers int, signer Signer) *SigPool { + return &SigPool{ signer: signer, numWorkers: numWorkers, - verifyJobs: make(chan verifyJob, jobBuffer), - signJobs: make(chan signJob, jobBuffer), + verifyJobs: make(chan VerifyJob, jobBuffer), + signJobs: make(chan SignJob, jobBuffer), quit: make(chan struct{}), } } -// Start starts of all goroutines that the sigPool needs to carry out its -// duties. -func (s *sigPool) Start() error { +// Start starts of all goroutines that the sigPool sig pool needs to +// carry out its duties. +func (s *SigPool) Start() error { if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { return nil } @@ -169,7 +168,7 @@ func (s *sigPool) Start() error { // Stop signals any active workers carrying out jobs to exit so the sigPool can // gracefully shutdown. -func (s *sigPool) Stop() error { +func (s *SigPool) Stop() error { if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { return nil } @@ -180,11 +179,11 @@ func (s *sigPool) Stop() error { return nil } -// poolWorker is the main worker goroutine within the sigPool. Individual -// batches are distributed amongst each of the active workers. The workers then -// execute the task based on the type of job, and return the result back to -// caller. -func (s *sigPool) poolWorker() { +// poolWorker is the main worker goroutine within the sigPool sig pool. +// Individual batches are distributed amongst each of the active workers. The +// workers then execute the task based on the type of job, and return the +// result back to caller. +func (s *SigPool) poolWorker() { defer s.wg.Done() for { @@ -195,16 +194,17 @@ func (s *sigPool) poolWorker() { // send the result along with a possible error back to the // caller. case sigMsg := <-s.signJobs: - rawSig, err := s.signer.SignOutputRaw(sigMsg.tx, - &sigMsg.signDesc) + rawSig, err := s.signer.SignOutputRaw( + sigMsg.Tx, &sigMsg.SignDesc, + ) if err != nil { select { - case sigMsg.resp <- signJobResp{ - sig: lnwire.Sig{}, - err: err, + case sigMsg.Resp <- SignJobResp{ + Sig: lnwire.Sig{}, + Err: err, }: continue - case <-sigMsg.cancel: + case <-sigMsg.Cancel: continue case <-s.quit: return @@ -213,11 +213,11 @@ func (s *sigPool) poolWorker() { sig, err := lnwire.NewSigFromRawSignature(rawSig) select { - case sigMsg.resp <- signJobResp{ - sig: sig, - err: err, + case sigMsg.Resp <- SignJobResp{ + Sig: sig, + Err: err, }: - case <-sigMsg.cancel: + case <-sigMsg.Cancel: continue case <-s.quit: return @@ -227,58 +227,60 @@ func (s *sigPool) poolWorker() { // world. We'll attempt to construct the sighash, parse the // signature, and finally verify the signature. case verifyMsg := <-s.verifyJobs: - sigHash, err := verifyMsg.sigHash() + sigHash, err := verifyMsg.SigHash() if err != nil { select { - case verifyMsg.errResp <- &htlcIndexErr{ + case verifyMsg.ErrResp <- &HtlcIndexErr{ error: err, - verifyJob: &verifyMsg, + VerifyJob: &verifyMsg, }: continue - case <-verifyMsg.cancel: + case <-verifyMsg.Cancel: continue } } - rawSig := verifyMsg.sig + rawSig := verifyMsg.Sig - if !rawSig.Verify(sigHash, verifyMsg.pubKey) { + if !rawSig.Verify(sigHash, verifyMsg.PubKey) { err := fmt.Errorf("invalid signature "+ - "sighash: %x, sig: %x", sigHash, rawSig.Serialize()) + "sighash: %x, sig: %x", sigHash, + rawSig.Serialize()) + select { - case verifyMsg.errResp <- &htlcIndexErr{ + case verifyMsg.ErrResp <- &HtlcIndexErr{ error: err, - verifyJob: &verifyMsg, + VerifyJob: &verifyMsg, }: - case <-verifyMsg.cancel: + case <-verifyMsg.Cancel: case <-s.quit: return } } else { select { - case verifyMsg.errResp <- nil: - case <-verifyMsg.cancel: + case verifyMsg.ErrResp <- nil: + case <-verifyMsg.Cancel: case <-s.quit: return } } - // The sigPool is exiting, so we will as well. + // The sigPool sig pool is exiting, so we will as well. case <-s.quit: return } } } -// SubmitSignBatch submits a batch of signature jobs to the sigPool. The -// response and cancel channels for each of the signJob's are expected to be -// fully populated, as the response for each job will be sent over the response -// channel within the job itself. -func (s *sigPool) SubmitSignBatch(signJobs []signJob) { +// SubmitSignBatch submits a batch of signature jobs to the sigPool. The +// response and cancel channels for each of the SignJob's are expected to be +// fully populated, as the response for each job will be sent over the +// response channel within the job itself. +func (s *SigPool) SubmitSignBatch(signJobs []SignJob) { for _, job := range signJobs { select { case s.signJobs <- job: - case <-job.cancel: + case <-job.Cancel: // TODO(roasbeef): return error? case <-s.quit: return @@ -291,18 +293,18 @@ func (s *sigPool) SubmitSignBatch(signJobs []signJob) { // denoting if signature verification was valid or not. The passed cancelChan // allows the caller to cancel all pending jobs in the case that they wish to // bail early. -func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob, - cancelChan chan struct{}) <-chan *htlcIndexErr { +func (s *SigPool) SubmitVerifyBatch(verifyJobs []VerifyJob, + cancelChan chan struct{}) <-chan *HtlcIndexErr { - errChan := make(chan *htlcIndexErr, len(verifyJobs)) + errChan := make(chan *HtlcIndexErr, len(verifyJobs)) for _, job := range verifyJobs { - job.cancel = cancelChan - job.errResp = errChan + job.Cancel = cancelChan + job.ErrResp = errChan select { case s.verifyJobs <- job: - case <-job.cancel: + case <-job.Cancel: return errChan } } diff --git a/lnwallet/test_utils.go b/lnwallet/test_utils.go index c11aa69d..60e98d3b 100644 --- a/lnwallet/test_utils.go +++ b/lnwallet/test_utils.go @@ -309,18 +309,24 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error) } // TODO(roasbeef): make mock version of pre-image store + + alicePool := NewSigPool(1, aliceSigner) channelAlice, err := NewLightningChannel( - aliceSigner, pCache, aliceChannelState, + aliceSigner, pCache, aliceChannelState, alicePool, ) if err != nil { return nil, nil, nil, err } + alicePool.Start() + + bobPool := NewSigPool(1, bobSigner) channelBob, err := NewLightningChannel( - bobSigner, pCache, bobChannelState, + bobSigner, pCache, bobChannelState, bobPool, ) if err != nil { return nil, nil, nil, err } + bobPool.Start() err = SetStateNumHint( aliceCommitTx, 0, channelAlice.stateHintObfuscator, @@ -346,8 +352,8 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error) os.RemoveAll(bobPath) os.RemoveAll(alicePath) - channelAlice.Stop() - channelBob.Stop() + alicePool.Stop() + bobPool.Stop() } // Now that the channel are open, simulate the start of a session by diff --git a/peer.go b/peer.go index 90ecec10..a4963de9 100644 --- a/peer.go +++ b/peer.go @@ -383,6 +383,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { for _, dbChan := range chans { lnChan, err := lnwallet.NewLightningChannel( p.server.cc.signer, p.server.witnessBeacon, dbChan, + p.server.sigPool, ) if err != nil { return err @@ -400,7 +401,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { if dbChan.ChanStatus() != channeldb.Default { peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+ "start.", chanPoint, dbChan.ChanStatus()) - lnChan.Stop() continue } @@ -409,13 +409,11 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { if _, ok := p.failedChannels[chanID]; ok { peerLog.Warnf("ChannelPoint(%v) is failed, won't "+ "start.", chanPoint) - lnChan.Stop() continue } _, currentHeight, err := p.server.cc.chainIO.GetBestBlock() if err != nil { - lnChan.Stop() return err } @@ -425,7 +423,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { graph := p.server.chanDB.ChannelGraph() info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint) if err != nil && err != channeldb.ErrEdgeNotFound { - lnChan.Stop() return err } @@ -473,7 +470,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { *chanPoint, ) if err != nil { - lnChan.Stop() return err } @@ -483,7 +479,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { currentHeight, true, ) if err != nil { - lnChan.Stop() return fmt.Errorf("unable to add link %v to switch: %v", chanPoint, err) } @@ -1596,7 +1591,7 @@ out: // easily according to its channel ID. lnChan, err := lnwallet.NewLightningChannel( p.server.cc.signer, p.server.witnessBeacon, - newChan, + newChan, p.server.sigPool, ) if err != nil { p.activeChanMtx.Unlock() @@ -1624,7 +1619,6 @@ out: "block: %v", err) peerLog.Errorf(err.Error()) - lnChan.Stop() newChanReq.err <- err continue } @@ -1636,7 +1630,6 @@ out: "chain events: %v", err) peerLog.Errorf(err.Error()) - lnChan.Stop() newChanReq.err <- err continue } @@ -1666,7 +1659,6 @@ out: p.PubKey()) peerLog.Errorf(err.Error()) - lnChan.Stop() newChanReq.err <- err continue } @@ -2019,8 +2011,6 @@ func (p *peer) finalizeChanClosure(chanCloser *channelCloser) { } } - chanCloser.cfg.channel.Stop() - // Next, we'll launch a goroutine which will request to be notified by // the ChainNotifier once the closure transaction obtains a single // confirmation. @@ -2121,10 +2111,7 @@ func (p *peer) WipeChannel(chanPoint *wire.OutPoint) error { chanID := lnwire.NewChanIDFromOutPoint(chanPoint) p.activeChanMtx.Lock() - if channel, ok := p.activeChannels[chanID]; ok { - channel.Stop() - delete(p.activeChannels, chanID) - } + delete(p.activeChannels, chanID) p.activeChanMtx.Unlock() // Instruct the HtlcSwitch to close this link as the channel is no diff --git a/rpcserver.go b/rpcserver.go index d5397569..c7e760d7 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1431,7 +1431,6 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, if err != nil { return err } - channel.Stop() // If a force closure was requested, then we'll handle all the details // around the creation and broadcast of the unilateral closure @@ -1667,7 +1666,7 @@ func (r *rpcServer) fetchOpenDbChannel(chanPoint wire.OutPoint) ( // fetchActiveChannel attempts to locate a channel identified by its channel // point from the database's set of all currently opened channels and -// return it as a fully popuplated state machine +// return it as a fully populated state machine func (r *rpcServer) fetchActiveChannel(chanPoint wire.OutPoint) ( *lnwallet.LightningChannel, error) { @@ -1680,7 +1679,7 @@ func (r *rpcServer) fetchActiveChannel(chanPoint wire.OutPoint) ( // we create a fully populated channel state machine which // uses the db channel as backing storage. return lnwallet.NewLightningChannel( - r.server.cc.wallet.Cfg.Signer, nil, dbChan, + r.server.cc.wallet.Cfg.Signer, nil, dbChan, nil, ) } diff --git a/server.go b/server.go index fbc6c02c..c792122b 100644 --- a/server.go +++ b/server.go @@ -11,6 +11,7 @@ import ( "net" "path/filepath" "regexp" + "runtime" "strconv" "sync" "sync/atomic" @@ -161,6 +162,8 @@ type server struct { connMgr *connmgr.ConnManager + sigPool *lnwallet.SigPool + // globalFeatures feature vector which affects HTLCs and thus are also // advertised to other nodes. globalFeatures *lnwire.FeatureVector @@ -258,8 +261,9 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, sphinxRouter := sphinx.NewRouter(privKey, activeNetParams.Params, replayLog) s := &server{ - chanDB: chanDB, - cc: cc, + chanDB: chanDB, + cc: cc, + sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer), invoices: newInvoiceRegistry(chanDB), @@ -947,6 +951,9 @@ func (s *server) Start() error { // sufficient number of confirmations, or when the input for the // funding transaction is spent in an attempt at an uncooperative close // by the counterparty. + if err := s.sigPool.Start(); err != nil { + return err + } if err := s.cc.chainNotifier.Start(); err != nil { return err } @@ -1034,6 +1041,7 @@ func (s *server) Stop() error { } // Shutdown the wallet, funding manager, and the rpc server. + s.sigPool.Stop() s.cc.chainNotifier.Stop() s.chanRouter.Stop() s.htlcSwitch.Stop() diff --git a/test_utils.go b/test_utils.go index 2f954ff2..283efabc 100644 --- a/test_utils.go +++ b/test_utils.go @@ -298,18 +298,23 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, aliceSigner := &mockSigner{aliceKeyPriv} bobSigner := &mockSigner{bobKeyPriv} + alicePool := lnwallet.NewSigPool(1, aliceSigner) channelAlice, err := lnwallet.NewLightningChannel( - aliceSigner, nil, aliceChannelState, + aliceSigner, nil, aliceChannelState, alicePool, ) if err != nil { return nil, nil, nil, nil, err } + alicePool.Start() + + bobPool := lnwallet.NewSigPool(1, bobSigner) channelBob, err := lnwallet.NewLightningChannel( - bobSigner, nil, bobChannelState, + bobSigner, nil, bobChannelState, bobPool, ) if err != nil { return nil, nil, nil, nil, err } + bobPool.Start() chainIO := &mockChainIO{} wallet := &lnwallet.LightningWallet{