diff --git a/breacharbiter_test.go b/breacharbiter_test.go index 21d62658..65a5bc5b 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -1123,18 +1123,6 @@ func TestBreachHandoffFail(t *testing.T) { } defer cleanUpArb() - // Instantiate a second lightning channel for alice, using the state of - // her last channel. - aliceKeyPriv, _ := btcec.PrivKeyFromBytes(btcec.S256(), - alicesPrivKey) - aliceSigner := &mockSigner{aliceKeyPriv} - - alice2, err := lnwallet.NewLightningChannel(aliceSigner, nil, alice.State()) - if err != nil { - t.Fatalf("unable to create test channels: %v", err) - } - defer alice2.Stop() - // Signal a spend of the funding transaction and wait for the close // observer to exit. This time we are allowing the handoff to succeed. breach = &ContractBreachEvent{ @@ -1567,18 +1555,23 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa aliceSigner := &mockSigner{aliceKeyPriv} bobSigner := &mockSigner{bobKeyPriv} + alicePool := lnwallet.NewSigPool(1, aliceSigner) channelAlice, err := lnwallet.NewLightningChannel( - aliceSigner, pCache, aliceChannelState, + aliceSigner, pCache, aliceChannelState, alicePool, ) if err != nil { return nil, nil, nil, err } + alicePool.Start() + + bobPool := lnwallet.NewSigPool(1, bobSigner) channelBob, err := lnwallet.NewLightningChannel( - bobSigner, pCache, bobChannelState, + bobSigner, pCache, bobChannelState, bobPool, ) if err != nil { return nil, nil, nil, err } + bobPool.Start() addr := &net.TCPAddr{ IP: net.ParseIP("127.0.0.1"), diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 5f8b161d..07eab9e1 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -3,10 +3,11 @@ package contractcourt import ( "errors" "fmt" - "github.com/lightningnetwork/lnd/sweep" "sync" "sync/atomic" + "github.com/lightningnetwork/lnd/sweep" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" @@ -238,11 +239,11 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, } chanMachine, err := lnwallet.NewLightningChannel( - c.cfg.Signer, c.cfg.PreimageDB, channel) + c.cfg.Signer, c.cfg.PreimageDB, channel, nil, + ) if err != nil { return nil, err } - chanMachine.Stop() if err := c.cfg.MarkLinkInactive(chanPoint); err != nil { log.Errorf("unable to mark link inactive: %v", err) diff --git a/fundingmanager.go b/fundingmanager.go index 2bf15789..c799139c 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -1681,13 +1681,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { // Go on adding the channel to the channel graph, and crafting // channel announcements. lnChannel, err := lnwallet.NewLightningChannel( - nil, nil, completeChan, + nil, nil, completeChan, nil, ) if err != nil { fndgLog.Errorf("failed creating lnChannel: %v", err) return } - defer lnChannel.Stop() err = f.sendFundingLocked( fmsg.peer, completeChan, lnChannel, shortChanID, @@ -1971,12 +1970,11 @@ func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer, // We create the state-machine object which wraps the database state. lnChannel, err := lnwallet.NewLightningChannel( - nil, nil, completeChan, + nil, nil, completeChan, nil, ) if err != nil { return err } - defer lnChannel.Stop() chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index ba62cc0f..fe2e71a5 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -443,7 +443,6 @@ func (l *channelLink) Stop() { } l.updateFeeTimer.Stop() - l.channel.Stop() l.overflowQueue.Stop() close(l.quit) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index e3c07a03..517196db 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1592,8 +1592,6 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( cleanUp := func() { close(alicePeer.quit) defer fCleanUp() - defer aliceLink.Stop() - defer bobChannel.Stop() } return aliceLink, bobChannel, bticker.Force, start, cleanUp, restore, nil diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 358a586d..c29619f5 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -10,6 +10,7 @@ import ( "math/big" "net" "os" + "runtime" "sync/atomic" "testing" "time" @@ -369,18 +370,23 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, preimageMap: make(map[[32]byte][]byte), } + alicePool := lnwallet.NewSigPool(runtime.NumCPU(), aliceSigner) channelAlice, err := lnwallet.NewLightningChannel( - aliceSigner, pCache, aliceChannelState, + aliceSigner, pCache, aliceChannelState, alicePool, ) if err != nil { return nil, nil, nil, nil, err } + alicePool.Start() + + bobPool := lnwallet.NewSigPool(runtime.NumCPU(), bobSigner) channelBob, err := lnwallet.NewLightningChannel( - bobSigner, pCache, bobChannelState, + bobSigner, pCache, bobChannelState, bobPool, ) if err != nil { return nil, nil, nil, nil, err } + bobPool.Start() // Now that the channel are open, simulate the start of a session by // having Alice and Bob extend their revocation windows to each other. @@ -402,6 +408,7 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, restore := func() (*lnwallet.LightningChannel, *lnwallet.LightningChannel, error) { + aliceStoredChannels, err := dbAlice.FetchOpenChannels(aliceKeyPub) switch err { case nil: @@ -434,8 +441,9 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, return nil, nil, errors.New("unable to find stored alice channel") } - newAliceChannel, err := lnwallet.NewLightningChannel(aliceSigner, - nil, aliceStoredChannel) + newAliceChannel, err := lnwallet.NewLightningChannel( + aliceSigner, nil, aliceStoredChannel, alicePool, + ) if err != nil { return nil, nil, errors.Errorf("unable to create new channel: %v", err) @@ -473,8 +481,9 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, return nil, nil, errors.New("unable to find stored bob channel") } - newBobChannel, err := lnwallet.NewLightningChannel(bobSigner, - nil, bobStoredChannel) + newBobChannel, err := lnwallet.NewLightningChannel( + bobSigner, nil, bobStoredChannel, bobPool, + ) if err != nil { return nil, nil, errors.Errorf("unable to create new channel: %v", err) @@ -485,7 +494,7 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, return channelAlice, channelBob, cleanUpFunc, restore, nil } -// getChanID retrieves the channel point from nwire message. +// getChanID retrieves the channel point from an lnnwire message. func getChanID(msg lnwire.Message) (lnwire.ChannelID, error) { var chanID lnwire.ChannelID switch msg := msg.(type) { diff --git a/lnwallet/channel.go b/lnwallet/channel.go index cdd2baac..4cd21cf0 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -5,10 +5,8 @@ import ( "container/list" "crypto/sha256" "fmt" - "runtime" "sort" "sync" - "sync/atomic" "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -1260,8 +1258,6 @@ func compactLogs(ourLog, theirLog *updateLog, // // See the individual comments within the above methods for further details. type LightningChannel struct { - shutdown int32 // To be used atomically. - // Signer is the main signer instances that will be responsible for // signing any HTLC and commitment transaction generated by the state // machine. @@ -1282,7 +1278,7 @@ type LightningChannel struct { // validating signatures in parallel. This is utilized as an // optimization to void serially signing or validating the HTLC // signatures, of which there may be hundreds. - sigPool *sigPool + sigPool *SigPool // pCache is the global preimage cache shared across all other // LightningChannel instance. We'll use this cache either when we force @@ -1347,11 +1343,6 @@ type LightningChannel struct { RemoteFundingKey *btcec.PublicKey sync.RWMutex - - cowg sync.WaitGroup - wg sync.WaitGroup - - quit chan struct{} } // NewLightningChannel creates a new, active payment channel given an @@ -1360,7 +1351,8 @@ type LightningChannel struct { // automatically persist pertinent state to the database in an efficient // manner. func NewLightningChannel(signer Signer, pCache PreimageCache, - state *channeldb.OpenChannel) (*LightningChannel, error) { + state *channeldb.OpenChannel, + sigPool *SigPool) (*LightningChannel, error) { localCommit := state.LocalCommitment remoteCommit := state.RemoteCommitment @@ -1375,9 +1367,8 @@ func NewLightningChannel(signer Signer, pCache PreimageCache, ) lc := &LightningChannel{ - // TODO(roasbeef): tune num sig workers? - sigPool: newSigPool(runtime.NumCPU(), signer), Signer: signer, + sigPool: sigPool, pCache: pCache, currentHeight: localCommit.CommitHeight, remoteCommitChain: newCommitmentChain(), @@ -1391,7 +1382,6 @@ func NewLightningChannel(signer Signer, pCache PreimageCache, Capacity: state.Capacity, LocalFundingKey: state.LocalChanCfg.MultiSigKey.PubKey, RemoteFundingKey: state.RemoteChanCfg.MultiSigKey.PubKey, - quit: make(chan struct{}), } // With the main channel struct reconstructed, we'll now restore the @@ -1411,12 +1401,6 @@ func NewLightningChannel(signer Signer, pCache PreimageCache, lc.createStateHintObfuscator() - // Finally, we'll kick of the signature job pool to handle any upcoming - // commitment state generation and validation. - if err := lc.sigPool.Start(); err != nil { - return nil, err - } - return lc, nil } @@ -1467,23 +1451,6 @@ func (lc *LightningChannel) createStateHintObfuscator() { } } -// Stop gracefully shuts down any active goroutines spawned by the -// LightningChannel during regular duties. -func (lc *LightningChannel) Stop() { - if !atomic.CompareAndSwapInt32(&lc.shutdown, 0, 1) { - return - } - - lc.sigPool.Stop() - - close(lc.quit) -} - -// WaitForClose blocks until the channel's close observer has terminated. -func (lc *LightningChannel) WaitForClose() { - lc.cowg.Wait() -} - // ResetState resets the state of the channel back to the default state. This // ensures that any active goroutines which need to act based on on-chain // events do so properly. @@ -2673,7 +2640,7 @@ func processRemoveEntry(htlc *PaymentDescriptor, ourBalance, // asynchronously and in parallel. func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, localChanCfg, remoteChanCfg *channeldb.ChannelConfig, - remoteCommitView *commitment) ([]signJob, chan struct{}, error) { + remoteCommitView *commitment) ([]SignJob, chan struct{}, error) { txHash := remoteCommitView.txn.TxHash() dustLimit := remoteChanCfg.DustLimit @@ -2684,7 +2651,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, // smaller (than its total capacity) and some HTLCs may be dust. numSigs := (len(remoteCommitView.incomingHTLCs) + len(remoteCommitView.outgoingHTLCs)) - sigBatch := make([]signJob, 0, numSigs) + sigBatch := make([]SignJob, 0, numSigs) var err error cancelChan := make(chan struct{}) @@ -2700,9 +2667,9 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, // If the HTLC isn't dust, then we'll create an empty sign job // to add to the batch momentarily. - sigJob := signJob{} - sigJob.cancel = cancelChan - sigJob.resp = make(chan signJobResp, 1) + sigJob := SignJob{} + sigJob.Cancel = cancelChan + sigJob.Resp = make(chan SignJobResp, 1) // As this is an incoming HTLC and we're sinning the commitment // transaction of the remote node, we'll need to generate an @@ -2718,7 +2685,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Hash: txHash, Index: uint32(htlc.remoteOutputIndex), } - sigJob.tx, err = createHtlcTimeoutTx( + sigJob.Tx, err = createHtlcTimeoutTx( op, outputAmt, htlc.Timeout, uint32(remoteChanCfg.CsvDelay), keyRing.RevocationKey, keyRing.DelayKey, @@ -2730,7 +2697,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, // Finally, we'll generate a sign descriptor to generate a // signature to give to the remote party for this commitment // transaction. Note we use the raw HTLC amount. - sigJob.signDesc = SignDescriptor{ + sigJob.SignDesc = SignDescriptor{ KeyDesc: localChanCfg.HtlcBasePoint, SingleTweak: keyRing.LocalHtlcKeyTweak, WitnessScript: htlc.theirWitnessScript, @@ -2738,10 +2705,10 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Value: int64(htlc.Amount.ToSatoshis()), }, HashType: txscript.SigHashAll, - SigHashes: txscript.NewTxSigHashes(sigJob.tx), + SigHashes: txscript.NewTxSigHashes(sigJob.Tx), InputIndex: 0, } - sigJob.outputIndex = htlc.remoteOutputIndex + sigJob.OutputIndex = htlc.remoteOutputIndex sigBatch = append(sigBatch, sigJob) } @@ -2751,9 +2718,9 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, continue } - sigJob := signJob{} - sigJob.cancel = cancelChan - sigJob.resp = make(chan signJobResp, 1) + sigJob := SignJob{} + sigJob.Cancel = cancelChan + sigJob.Resp = make(chan SignJobResp, 1) // As this is an outgoing HTLC and we're signing the commitment // transaction of the remote node, we'll need to generate an @@ -2770,7 +2737,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Hash: txHash, Index: uint32(htlc.remoteOutputIndex), } - sigJob.tx, err = createHtlcSuccessTx( + sigJob.Tx, err = createHtlcSuccessTx( op, outputAmt, uint32(remoteChanCfg.CsvDelay), keyRing.RevocationKey, keyRing.DelayKey, ) @@ -2781,7 +2748,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, // Finally, we'll generate a sign descriptor to generate a // signature to give to the remote party for this commitment // transaction. Note we use the raw HTLC amount. - sigJob.signDesc = SignDescriptor{ + sigJob.SignDesc = SignDescriptor{ KeyDesc: localChanCfg.HtlcBasePoint, SingleTweak: keyRing.LocalHtlcKeyTweak, WitnessScript: htlc.theirWitnessScript, @@ -2789,10 +2756,10 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Value: int64(htlc.Amount.ToSatoshis()), }, HashType: txscript.SigHashAll, - SigHashes: txscript.NewTxSigHashes(sigJob.tx), + SigHashes: txscript.NewTxSigHashes(sigJob.Tx), InputIndex: 0, } - sigJob.outputIndex = htlc.remoteOutputIndex + sigJob.OutputIndex = htlc.remoteOutputIndex sigBatch = append(sigBatch, sigJob) } @@ -3059,26 +3026,23 @@ func (lc *LightningChannel) SignNextCommitment() (lnwire.Sig, []lnwire.Sig, erro // order as they appear on the commitment transaction after BIP 69 // sorting. sort.Slice(sigBatch, func(i, j int) bool { - return sigBatch[i].outputIndex < sigBatch[j].outputIndex + return sigBatch[i].OutputIndex < sigBatch[j].OutputIndex }) // With the jobs sorted, we'll now iterate through all the responses to // gather each of the signatures in order. htlcSigs = make([]lnwire.Sig, 0, len(sigBatch)) for _, htlcSigJob := range sigBatch { - select { - case jobResp := <-htlcSigJob.resp: - // If an error occurred, then we'll cancel any other - // active jobs. - if jobResp.err != nil { - close(cancelChan) - return sig, htlcSigs, err - } + jobResp := <-htlcSigJob.Resp - htlcSigs = append(htlcSigs, jobResp.sig) - case <-lc.quit: - return sig, htlcSigs, fmt.Errorf("channel shutting down") + // If an error occurred, then we'll cancel any other active + // jobs. + if jobResp.Err != nil { + close(cancelChan) + return sig, htlcSigs, err } + + htlcSigs = append(htlcSigs, jobResp.Sig) } // As we're about to proposer a new commitment state for the remote @@ -3740,7 +3704,7 @@ func (lc *LightningChannel) validateCommitmentSanity(theirLogCounter, // directly into the pool of workers. func genHtlcSigValidationJobs(localCommitmentView *commitment, keyRing *CommitmentKeyRing, htlcSigs []lnwire.Sig, - localChanCfg, remoteChanCfg *channeldb.ChannelConfig) ([]verifyJob, error) { + localChanCfg, remoteChanCfg *channeldb.ChannelConfig) ([]VerifyJob, error) { txHash := localCommitmentView.txn.TxHash() feePerKw := localCommitmentView.feePerKw @@ -3751,7 +3715,7 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, // length will be smaller than the total capacity. numHtlcs := (len(localCommitmentView.incomingHTLCs) + len(localCommitmentView.outgoingHTLCs)) - verifyJobs := make([]verifyJob, 0, numHtlcs) + verifyJobs := make([]VerifyJob, 0, numHtlcs) // We'll iterate through each output in the commitment transaction, // populating the sigHash closure function if it's detected to be an @@ -3879,11 +3843,11 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, continue } - verifyJobs = append(verifyJobs, verifyJob{ - htlcIndex: htlcIndex, - pubKey: keyRing.RemoteHtlcKey, - sig: sig, - sigHash: sigHash, + verifyJobs = append(verifyJobs, VerifyJob{ + HtlcIndex: htlcIndex, + PubKey: keyRing.RemoteHtlcKey, + Sig: sig, + SigHash: sigHash, }) i++ @@ -4083,34 +4047,30 @@ func (lc *LightningChannel) ReceiveNewCommitment(commitSig lnwire.Sig, for i := 0; i < len(verifyJobs); i++ { // In the case that a single signature is invalid, we'll exit // early and cancel all the outstanding verification jobs. - select { - case htlcErr := <-verifyResps: - if htlcErr != nil { - close(cancelChan) + htlcErr := <-verifyResps + if htlcErr != nil { + close(cancelChan) - sig, err := lnwire.NewSigFromSignature( - htlcErr.sig, - ) - if err != nil { - return err - } - sigHash, err := htlcErr.sigHash() - if err != nil { - return err - } - - var txBytes bytes.Buffer - localCommitTx.Serialize(&txBytes) - return &InvalidHtlcSigError{ - commitHeight: nextHeight, - htlcSig: sig.ToSignatureBytes(), - htlcIndex: htlcErr.htlcIndex, - sigHash: sigHash, - commitTx: txBytes.Bytes(), - } + sig, err := lnwire.NewSigFromSignature( + htlcErr.Sig, + ) + if err != nil { + return err + } + sigHash, err := htlcErr.SigHash() + if err != nil { + return err + } + + var txBytes bytes.Buffer + localCommitTx.Serialize(&txBytes) + return &InvalidHtlcSigError{ + commitHeight: nextHeight, + htlcSig: sig.ToSignatureBytes(), + htlcIndex: htlcErr.HtlcIndex, + sigHash: sigHash, + commitTx: txBytes.Bytes(), } - case <-lc.quit: - return fmt.Errorf("channel shutting down") } } diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 24701f28..e3cd3e15 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -1444,14 +1444,18 @@ func TestStateUpdatePersistence(t *testing.T) { if err != nil { t.Fatalf("unable to fetch channel: %v", err) } + aliceChannelNew, err := NewLightningChannel( aliceChannel.Signer, nil, aliceChannels[0], + aliceChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } + bobChannelNew, err := NewLightningChannel( bobChannel.Signer, nil, bobChannels[0], + bobChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) @@ -2544,20 +2548,19 @@ func TestChanSyncFullySynced(t *testing.T) { if err != nil { t.Fatalf("unable to fetch channel: %v", err) } + aliceChannelNew, err := NewLightningChannel( - aliceChannel.Signer, nil, aliceChannels[0], + aliceChannel.Signer, nil, aliceChannels[0], aliceChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } - defer aliceChannelNew.Stop() bobChannelNew, err := NewLightningChannel( - bobChannel.Signer, nil, bobChannels[0], + bobChannel.Signer, nil, bobChannels[0], bobChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } - defer bobChannelNew.Stop() assertNoChanSyncNeeded(t, aliceChannelNew, bobChannelNew) } @@ -2576,6 +2579,7 @@ func restartChannel(channelOld *LightningChannel) (*LightningChannel, error) { channelNew, err := NewLightningChannel( channelOld.Signer, channelOld.pCache, nodeChannels[0], + channelOld.sigPool, ) if err != nil { return nil, err @@ -2786,7 +2790,6 @@ func TestChanSyncOweCommitment(t *testing.T) { if err != nil { t.Fatalf("unable to restart alice: %v", err) } - defer aliceChannel.Stop() assertAliceCommitRetransmit() // TODO(roasbeef): restart bob as well??? @@ -3053,7 +3056,6 @@ func TestChanSyncOweRevocation(t *testing.T) { if err != nil { t.Fatalf("unable to restart alice: %v", err) } - defer aliceChannel.Stop() assertAliceOwesRevoke() // TODO(roasbeef): restart bob too??? @@ -3233,7 +3235,6 @@ func TestChanSyncOweRevocationAndCommit(t *testing.T) { if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer bobChannel.Stop() assertBobSendsRevokeAndCommit() // We'll now finish the state transition by having Alice process both @@ -3428,7 +3429,6 @@ func TestChanSyncOweRevocationAndCommitForceTransition(t *testing.T) { if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer bobChannel.Stop() if len(bobMsgsToSend) != 2 { t.Fatalf("expected bob to send %v messages, instead "+ "sends: %v", 2, spew.Sdump(bobMsgsToSend)) @@ -3775,12 +3775,10 @@ func TestChannelRetransmissionFeeUpdate(t *testing.T) { if err != nil { t.Fatalf("unable to restart alice: %v", err) } - defer aliceChannel.Stop() bobChannel, err = restartChannel(bobChannel) if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer bobChannel.Stop() // Bob doesn't get this message so upon reconnection, they need to // synchronize. Alice should conclude that she owes Bob a commitment, @@ -4293,12 +4291,10 @@ func TestLockedInHtlcForwardingSkipAfterRestart(t *testing.T) { if err != nil { t.Fatalf("unable to restart alice: %v", err) } - defer aliceChannel.Stop() bobChannel, err = restartChannel(bobChannel) if err != nil { t.Fatalf("unable to restart bob: %v", err) } - defer bobChannel.Stop() // With both nodes restarted, Bob will now attempt to cancel one of // Alice's HTLC's. @@ -4399,12 +4395,10 @@ func TestLockedInHtlcForwardingSkipAfterRestart(t *testing.T) { if err != nil { t.Fatalf("unable to restart alice: %v", err) } - defer aliceChannel.Stop() bobChannel, err = restartChannel(bobChannel) if err != nil { t.Fatalf("unable to restart bob: %v", err) } - defer bobChannel.Stop() // Readd the Fail to both Alice and Bob's channels, as the non-committed // update will not have survived the restart. @@ -5590,19 +5584,19 @@ func TestChannelRestoreUpdateLogs(t *testing.T) { // the update logs up to the correct state set up above. newAliceChannel, err := NewLightningChannel( aliceChannel.Signer, nil, aliceChannel.channelState, + aliceChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } - defer newAliceChannel.Stop() newBobChannel, err := NewLightningChannel( bobChannel.Signer, nil, bobChannel.channelState, + bobChannel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } - defer newBobChannel.Stop() // compare all the logs between the old and new channels, to make sure // they all got restored properly. @@ -5669,13 +5663,14 @@ func assertInLogs(t *testing.T, channel *LightningChannel, numAddsLocal, // expected state. func restoreAndAssert(t *testing.T, channel *LightningChannel, numAddsLocal, numFailsLocal, numAddsRemote, numFailsRemote int) { + newChannel, err := NewLightningChannel( channel.Signer, nil, channel.channelState, + channel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) } - defer newChannel.Stop() assertInLog(t, newChannel.localUpdateLog, numAddsLocal, numFailsLocal) assertInLog(t, newChannel.remoteUpdateLog, numAddsRemote, numFailsRemote) @@ -5863,12 +5858,10 @@ func TestDuplicateFailRejection(t *testing.T) { if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer bobChannel.Stop() aliceChannel, err = restartChannel(aliceChannel) if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer aliceChannel.Stop() // If we try to fail the same HTLC again, then we should get an error. err = bobChannel.FailHTLC(0, []byte("failreason"), nil, nil, nil) @@ -5945,12 +5938,10 @@ func TestDuplicateSettleRejection(t *testing.T) { if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer bobChannel.Stop() aliceChannel, err = restartChannel(aliceChannel) if err != nil { t.Fatalf("unable to restart channel: %v", err) } - defer aliceChannel.Stop() // If we try to fail the same HTLC again, then we should get an error. err = bobChannel.SettleHTLC(alicePreimage, uint64(0), nil, nil, nil) @@ -5983,9 +5974,9 @@ func TestChannelRestoreCommitHeight(t *testing.T) { channel *LightningChannel, remoteLog bool, htlcIndex uint64, expLocal, expRemote uint64) *LightningChannel { - channel.Stop() newChannel, err := NewLightningChannel( channel.Signer, nil, channel.channelState, + channel.sigPool, ) if err != nil { t.Fatalf("unable to create new channel: %v", err) @@ -6152,7 +6143,4 @@ func TestChannelRestoreCommitHeight(t *testing.T) { // HTLC an add height. bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 0, 2, 1) bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 1, 2, 2) - - aliceChannel.Stop() - bobChannel.Stop() } diff --git a/lnwallet/sigpool.go b/lnwallet/sigpool.go index 7e8015fc..c30688a7 100644 --- a/lnwallet/sigpool.go +++ b/lnwallet/sigpool.go @@ -19,119 +19,118 @@ const ( // TODO(roasbeef): job buffer pool? ) -// verifyJob is a job sent to the sigPool to verify a signature on a -// transaction. The items contained in the struct are necessary and sufficient -// to verify the full signature. The passed sigHash closure function should be -// set to a function that generates the relevant sighash. +// VerifyJob is a job sent to the sigPool sig pool to verify a signature +// on a transaction. The items contained in the struct are necessary and +// sufficient to verify the full signature. The passed sigHash closure function +// should be set to a function that generates the relevant sighash. // // TODO(roasbeef): when we move to ecschnorr, make into batch signature // verification using bos-coster (or pip?). -type verifyJob struct { - // pubKey is the public key that was used to generate the purported +type VerifyJob struct { + // PubKey is the public key that was used to generate the purported // valid signature. Note that with the current channel construction, // this public key will likely have been tweaked using the current per // commitment point for a particular commitment transactions. - pubKey *btcec.PublicKey + PubKey *btcec.PublicKey - // sig is the raw signature generated using the above public key. This + // Sig is the raw signature generated using the above public key. This // is the signature to be verified. - sig *btcec.Signature + Sig *btcec.Signature - // sigHash is a function closure generates the sighashes that the + // SigHash is a function closure generates the sighashes that the // passed signature is known to have signed. - sigHash func() ([]byte, error) + SigHash func() ([]byte, error) - // htlcIndex is the index of the HTLC from the PoV of the remote + // HtlcIndex is the index of the HTLC from the PoV of the remote // party's update log. - htlcIndex uint64 + HtlcIndex uint64 - // cancel is a channel that should be closed if the caller wishes to + // Cancel is a channel that should be closed if the caller wishes to // cancel all pending verification jobs part of a single batch. This // channel is to be closed in the case that a single signature in a // batch has been returned as invalid, as there is no need to verify // the remainder of the signatures. - cancel chan struct{} + Cancel chan struct{} - // errResp is the channel that the result of the signature verification + // ErrResp is the channel that the result of the signature verification // is to be sent over. In the see that the signature is valid, a nil // error will be passed. Otherwise, a concrete error detailing the // issue will be passed. - errResp chan *htlcIndexErr + ErrResp chan *HtlcIndexErr } -// verifyJobErr is a special type of error that also includes a pointer to the -// original validation job. Ths error message allows us to craft more detailed +// HtlcIndexErr is a special type of error that also includes a pointer to the +// original validation job. This error message allows us to craft more detailed // errors at upper layers. -type htlcIndexErr struct { +type HtlcIndexErr struct { error - *verifyJob + *VerifyJob } -// signJob is a job sent to the sigPool to generate a valid signature according -// to the passed SignDescriptor for the passed transaction. Jobs are intended -// to be sent in batches in order to parallelize the job of generating -// signatures for a new commitment transaction. -type signJob struct { - // signDesc is intended to be a full populated SignDescriptor which +// SignJob is a job sent to the sigPool sig pool to generate a valid +// signature according to the passed SignDescriptor for the passed transaction. +// Jobs are intended to be sent in batches in order to parallelize the job of +// generating signatures for a new commitment transaction. +type SignJob struct { + // SignDesc is intended to be a full populated SignDescriptor which // encodes the necessary material (keys, witness script, etc) required // to generate a valid signature for the specified input. - signDesc SignDescriptor + SignDesc SignDescriptor - // tx is the transaction to be signed. This is required to generate the + // Tx is the transaction to be signed. This is required to generate the // proper sighash for the input to be signed. - tx *wire.MsgTx + Tx *wire.MsgTx - // outputIndex is the output index of the HTLC on the commitment + // OutputIndex is the output index of the HTLC on the commitment // transaction being signed. - outputIndex int32 + OutputIndex int32 - // cancel is a channel that should be closed if the caller wishes to + // Cancel is a channel that should be closed if the caller wishes to // abandon all pending sign jobs part of a single batch. - cancel chan struct{} + Cancel chan struct{} - // resp is the channel that the response to this particular signJob + // Resp is the channel that the response to this particular SignJob // will be sent over. // // TODO(roasbeef): actually need to allow caller to set, need to retain // order mark commit sig as special - resp chan signJobResp + Resp chan SignJobResp } -// signJobResp is the response to a sign job. Both channels are to be read in +// SignJobResp is the response to a sign job. Both channels are to be read in // order to ensure no unnecessary goroutine blocking occurs. Additionally, both // channels should be buffered. -type signJobResp struct { - // sig is the generated signature for a particular signJob In the case +type SignJobResp struct { + // Sig is the generated signature for a particular SignJob In the case // of an error during signature generation, then this value sent will // be nil. - sig lnwire.Sig + Sig lnwire.Sig - // err is the error that occurred when executing the specified + // Err is the error that occurred when executing the specified // signature job. In the case that no error occurred, this value will // be nil. - err error + Err error } -// sigPool is a struct that is meant to allow the current channel state machine -// to parallelize all signature generation and verification. This struct is -// needed as _each_ HTLC when creating a commitment transaction requires a -// signature, and similarly a receiver of a new commitment must verify all the -// HTLC signatures included within the CommitSig message. A pool of workers -// will be maintained by the sigPool. Batches of jobs (either to sign or -// verify) can be sent to the pool of workers which will asynchronously perform -// the specified job. -// -// TODO(roasbeef): rename? -// * ecdsaPool? -type sigPool struct { +// TODO(roasbeef); fix description + +// SigPool is a struct that is meant to allow the current channel state +// machine to parallelize all signature generation and verification. This +// struct is needed as _each_ HTLC when creating a commitment transaction +// requires a signature, and similarly a receiver of a new commitment must +// verify all the HTLC signatures included within the CommitSig message. A pool +// of workers will be maintained by the sigPool. Batches of jobs (either +// to sign or verify) can be sent to the pool of workers which will +// asynchronously perform the specified job. +type SigPool struct { started uint32 // To be used atomically. stopped uint32 // To be used atomically. signer Signer - verifyJobs chan verifyJob - signJobs chan signJob + verifyJobs chan VerifyJob + signJobs chan SignJob wg sync.WaitGroup quit chan struct{} @@ -139,22 +138,22 @@ type sigPool struct { numWorkers int } -// newSigPool creates a new signature pool with the specified number of +// NewSigPool creates a new signature pool with the specified number of // workers. The recommended parameter for the number of works is the number of // physical CPU cores available on the target machine. -func newSigPool(numWorkers int, signer Signer) *sigPool { - return &sigPool{ +func NewSigPool(numWorkers int, signer Signer) *SigPool { + return &SigPool{ signer: signer, numWorkers: numWorkers, - verifyJobs: make(chan verifyJob, jobBuffer), - signJobs: make(chan signJob, jobBuffer), + verifyJobs: make(chan VerifyJob, jobBuffer), + signJobs: make(chan SignJob, jobBuffer), quit: make(chan struct{}), } } -// Start starts of all goroutines that the sigPool needs to carry out its -// duties. -func (s *sigPool) Start() error { +// Start starts of all goroutines that the sigPool sig pool needs to +// carry out its duties. +func (s *SigPool) Start() error { if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { return nil } @@ -169,7 +168,7 @@ func (s *sigPool) Start() error { // Stop signals any active workers carrying out jobs to exit so the sigPool can // gracefully shutdown. -func (s *sigPool) Stop() error { +func (s *SigPool) Stop() error { if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { return nil } @@ -180,11 +179,11 @@ func (s *sigPool) Stop() error { return nil } -// poolWorker is the main worker goroutine within the sigPool. Individual -// batches are distributed amongst each of the active workers. The workers then -// execute the task based on the type of job, and return the result back to -// caller. -func (s *sigPool) poolWorker() { +// poolWorker is the main worker goroutine within the sigPool sig pool. +// Individual batches are distributed amongst each of the active workers. The +// workers then execute the task based on the type of job, and return the +// result back to caller. +func (s *SigPool) poolWorker() { defer s.wg.Done() for { @@ -195,16 +194,17 @@ func (s *sigPool) poolWorker() { // send the result along with a possible error back to the // caller. case sigMsg := <-s.signJobs: - rawSig, err := s.signer.SignOutputRaw(sigMsg.tx, - &sigMsg.signDesc) + rawSig, err := s.signer.SignOutputRaw( + sigMsg.Tx, &sigMsg.SignDesc, + ) if err != nil { select { - case sigMsg.resp <- signJobResp{ - sig: lnwire.Sig{}, - err: err, + case sigMsg.Resp <- SignJobResp{ + Sig: lnwire.Sig{}, + Err: err, }: continue - case <-sigMsg.cancel: + case <-sigMsg.Cancel: continue case <-s.quit: return @@ -213,11 +213,11 @@ func (s *sigPool) poolWorker() { sig, err := lnwire.NewSigFromRawSignature(rawSig) select { - case sigMsg.resp <- signJobResp{ - sig: sig, - err: err, + case sigMsg.Resp <- SignJobResp{ + Sig: sig, + Err: err, }: - case <-sigMsg.cancel: + case <-sigMsg.Cancel: continue case <-s.quit: return @@ -227,58 +227,60 @@ func (s *sigPool) poolWorker() { // world. We'll attempt to construct the sighash, parse the // signature, and finally verify the signature. case verifyMsg := <-s.verifyJobs: - sigHash, err := verifyMsg.sigHash() + sigHash, err := verifyMsg.SigHash() if err != nil { select { - case verifyMsg.errResp <- &htlcIndexErr{ + case verifyMsg.ErrResp <- &HtlcIndexErr{ error: err, - verifyJob: &verifyMsg, + VerifyJob: &verifyMsg, }: continue - case <-verifyMsg.cancel: + case <-verifyMsg.Cancel: continue } } - rawSig := verifyMsg.sig + rawSig := verifyMsg.Sig - if !rawSig.Verify(sigHash, verifyMsg.pubKey) { + if !rawSig.Verify(sigHash, verifyMsg.PubKey) { err := fmt.Errorf("invalid signature "+ - "sighash: %x, sig: %x", sigHash, rawSig.Serialize()) + "sighash: %x, sig: %x", sigHash, + rawSig.Serialize()) + select { - case verifyMsg.errResp <- &htlcIndexErr{ + case verifyMsg.ErrResp <- &HtlcIndexErr{ error: err, - verifyJob: &verifyMsg, + VerifyJob: &verifyMsg, }: - case <-verifyMsg.cancel: + case <-verifyMsg.Cancel: case <-s.quit: return } } else { select { - case verifyMsg.errResp <- nil: - case <-verifyMsg.cancel: + case verifyMsg.ErrResp <- nil: + case <-verifyMsg.Cancel: case <-s.quit: return } } - // The sigPool is exiting, so we will as well. + // The sigPool sig pool is exiting, so we will as well. case <-s.quit: return } } } -// SubmitSignBatch submits a batch of signature jobs to the sigPool. The -// response and cancel channels for each of the signJob's are expected to be -// fully populated, as the response for each job will be sent over the response -// channel within the job itself. -func (s *sigPool) SubmitSignBatch(signJobs []signJob) { +// SubmitSignBatch submits a batch of signature jobs to the sigPool. The +// response and cancel channels for each of the SignJob's are expected to be +// fully populated, as the response for each job will be sent over the +// response channel within the job itself. +func (s *SigPool) SubmitSignBatch(signJobs []SignJob) { for _, job := range signJobs { select { case s.signJobs <- job: - case <-job.cancel: + case <-job.Cancel: // TODO(roasbeef): return error? case <-s.quit: return @@ -291,18 +293,18 @@ func (s *sigPool) SubmitSignBatch(signJobs []signJob) { // denoting if signature verification was valid or not. The passed cancelChan // allows the caller to cancel all pending jobs in the case that they wish to // bail early. -func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob, - cancelChan chan struct{}) <-chan *htlcIndexErr { +func (s *SigPool) SubmitVerifyBatch(verifyJobs []VerifyJob, + cancelChan chan struct{}) <-chan *HtlcIndexErr { - errChan := make(chan *htlcIndexErr, len(verifyJobs)) + errChan := make(chan *HtlcIndexErr, len(verifyJobs)) for _, job := range verifyJobs { - job.cancel = cancelChan - job.errResp = errChan + job.Cancel = cancelChan + job.ErrResp = errChan select { case s.verifyJobs <- job: - case <-job.cancel: + case <-job.Cancel: return errChan } } diff --git a/lnwallet/test_utils.go b/lnwallet/test_utils.go index c11aa69d..60e98d3b 100644 --- a/lnwallet/test_utils.go +++ b/lnwallet/test_utils.go @@ -309,18 +309,24 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error) } // TODO(roasbeef): make mock version of pre-image store + + alicePool := NewSigPool(1, aliceSigner) channelAlice, err := NewLightningChannel( - aliceSigner, pCache, aliceChannelState, + aliceSigner, pCache, aliceChannelState, alicePool, ) if err != nil { return nil, nil, nil, err } + alicePool.Start() + + bobPool := NewSigPool(1, bobSigner) channelBob, err := NewLightningChannel( - bobSigner, pCache, bobChannelState, + bobSigner, pCache, bobChannelState, bobPool, ) if err != nil { return nil, nil, nil, err } + bobPool.Start() err = SetStateNumHint( aliceCommitTx, 0, channelAlice.stateHintObfuscator, @@ -346,8 +352,8 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error) os.RemoveAll(bobPath) os.RemoveAll(alicePath) - channelAlice.Stop() - channelBob.Stop() + alicePool.Stop() + bobPool.Stop() } // Now that the channel are open, simulate the start of a session by diff --git a/peer.go b/peer.go index 90ecec10..a4963de9 100644 --- a/peer.go +++ b/peer.go @@ -383,6 +383,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { for _, dbChan := range chans { lnChan, err := lnwallet.NewLightningChannel( p.server.cc.signer, p.server.witnessBeacon, dbChan, + p.server.sigPool, ) if err != nil { return err @@ -400,7 +401,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { if dbChan.ChanStatus() != channeldb.Default { peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+ "start.", chanPoint, dbChan.ChanStatus()) - lnChan.Stop() continue } @@ -409,13 +409,11 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { if _, ok := p.failedChannels[chanID]; ok { peerLog.Warnf("ChannelPoint(%v) is failed, won't "+ "start.", chanPoint) - lnChan.Stop() continue } _, currentHeight, err := p.server.cc.chainIO.GetBestBlock() if err != nil { - lnChan.Stop() return err } @@ -425,7 +423,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { graph := p.server.chanDB.ChannelGraph() info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint) if err != nil && err != channeldb.ErrEdgeNotFound { - lnChan.Stop() return err } @@ -473,7 +470,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { *chanPoint, ) if err != nil { - lnChan.Stop() return err } @@ -483,7 +479,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { currentHeight, true, ) if err != nil { - lnChan.Stop() return fmt.Errorf("unable to add link %v to switch: %v", chanPoint, err) } @@ -1596,7 +1591,7 @@ out: // easily according to its channel ID. lnChan, err := lnwallet.NewLightningChannel( p.server.cc.signer, p.server.witnessBeacon, - newChan, + newChan, p.server.sigPool, ) if err != nil { p.activeChanMtx.Unlock() @@ -1624,7 +1619,6 @@ out: "block: %v", err) peerLog.Errorf(err.Error()) - lnChan.Stop() newChanReq.err <- err continue } @@ -1636,7 +1630,6 @@ out: "chain events: %v", err) peerLog.Errorf(err.Error()) - lnChan.Stop() newChanReq.err <- err continue } @@ -1666,7 +1659,6 @@ out: p.PubKey()) peerLog.Errorf(err.Error()) - lnChan.Stop() newChanReq.err <- err continue } @@ -2019,8 +2011,6 @@ func (p *peer) finalizeChanClosure(chanCloser *channelCloser) { } } - chanCloser.cfg.channel.Stop() - // Next, we'll launch a goroutine which will request to be notified by // the ChainNotifier once the closure transaction obtains a single // confirmation. @@ -2121,10 +2111,7 @@ func (p *peer) WipeChannel(chanPoint *wire.OutPoint) error { chanID := lnwire.NewChanIDFromOutPoint(chanPoint) p.activeChanMtx.Lock() - if channel, ok := p.activeChannels[chanID]; ok { - channel.Stop() - delete(p.activeChannels, chanID) - } + delete(p.activeChannels, chanID) p.activeChanMtx.Unlock() // Instruct the HtlcSwitch to close this link as the channel is no diff --git a/rpcserver.go b/rpcserver.go index d5397569..c7e760d7 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1431,7 +1431,6 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, if err != nil { return err } - channel.Stop() // If a force closure was requested, then we'll handle all the details // around the creation and broadcast of the unilateral closure @@ -1667,7 +1666,7 @@ func (r *rpcServer) fetchOpenDbChannel(chanPoint wire.OutPoint) ( // fetchActiveChannel attempts to locate a channel identified by its channel // point from the database's set of all currently opened channels and -// return it as a fully popuplated state machine +// return it as a fully populated state machine func (r *rpcServer) fetchActiveChannel(chanPoint wire.OutPoint) ( *lnwallet.LightningChannel, error) { @@ -1680,7 +1679,7 @@ func (r *rpcServer) fetchActiveChannel(chanPoint wire.OutPoint) ( // we create a fully populated channel state machine which // uses the db channel as backing storage. return lnwallet.NewLightningChannel( - r.server.cc.wallet.Cfg.Signer, nil, dbChan, + r.server.cc.wallet.Cfg.Signer, nil, dbChan, nil, ) } diff --git a/server.go b/server.go index fbc6c02c..c792122b 100644 --- a/server.go +++ b/server.go @@ -11,6 +11,7 @@ import ( "net" "path/filepath" "regexp" + "runtime" "strconv" "sync" "sync/atomic" @@ -161,6 +162,8 @@ type server struct { connMgr *connmgr.ConnManager + sigPool *lnwallet.SigPool + // globalFeatures feature vector which affects HTLCs and thus are also // advertised to other nodes. globalFeatures *lnwire.FeatureVector @@ -258,8 +261,9 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, sphinxRouter := sphinx.NewRouter(privKey, activeNetParams.Params, replayLog) s := &server{ - chanDB: chanDB, - cc: cc, + chanDB: chanDB, + cc: cc, + sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer), invoices: newInvoiceRegistry(chanDB), @@ -947,6 +951,9 @@ func (s *server) Start() error { // sufficient number of confirmations, or when the input for the // funding transaction is spent in an attempt at an uncooperative close // by the counterparty. + if err := s.sigPool.Start(); err != nil { + return err + } if err := s.cc.chainNotifier.Start(); err != nil { return err } @@ -1034,6 +1041,7 @@ func (s *server) Stop() error { } // Shutdown the wallet, funding manager, and the rpc server. + s.sigPool.Stop() s.cc.chainNotifier.Stop() s.chanRouter.Stop() s.htlcSwitch.Stop() diff --git a/test_utils.go b/test_utils.go index 2f954ff2..283efabc 100644 --- a/test_utils.go +++ b/test_utils.go @@ -298,18 +298,23 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, aliceSigner := &mockSigner{aliceKeyPriv} bobSigner := &mockSigner{bobKeyPriv} + alicePool := lnwallet.NewSigPool(1, aliceSigner) channelAlice, err := lnwallet.NewLightningChannel( - aliceSigner, nil, aliceChannelState, + aliceSigner, nil, aliceChannelState, alicePool, ) if err != nil { return nil, nil, nil, nil, err } + alicePool.Start() + + bobPool := lnwallet.NewSigPool(1, bobSigner) channelBob, err := lnwallet.NewLightningChannel( - bobSigner, nil, bobChannelState, + bobSigner, nil, bobChannelState, bobPool, ) if err != nil { return nil, nil, nil, nil, err } + bobPool.Start() chainIO := &mockChainIO{} wallet := &lnwallet.LightningWallet{