Merge pull request #2329 from Roasbeef/global-sig-pool

multi: replace per channel sigPool with global daemon level sigPool
This commit is contained in:
Olaoluwa Osuntokun 2018-12-17 16:33:56 -08:00 committed by GitHub
commit 152fc8b1f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 239 additions and 286 deletions

@ -1123,18 +1123,6 @@ func TestBreachHandoffFail(t *testing.T) {
}
defer cleanUpArb()
// Instantiate a second lightning channel for alice, using the state of
// her last channel.
aliceKeyPriv, _ := btcec.PrivKeyFromBytes(btcec.S256(),
alicesPrivKey)
aliceSigner := &mockSigner{aliceKeyPriv}
alice2, err := lnwallet.NewLightningChannel(aliceSigner, nil, alice.State())
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer alice2.Stop()
// Signal a spend of the funding transaction and wait for the close
// observer to exit. This time we are allowing the handoff to succeed.
breach = &ContractBreachEvent{
@ -1567,18 +1555,23 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa
aliceSigner := &mockSigner{aliceKeyPriv}
bobSigner := &mockSigner{bobKeyPriv}
alicePool := lnwallet.NewSigPool(1, aliceSigner)
channelAlice, err := lnwallet.NewLightningChannel(
aliceSigner, pCache, aliceChannelState,
aliceSigner, pCache, aliceChannelState, alicePool,
)
if err != nil {
return nil, nil, nil, err
}
alicePool.Start()
bobPool := lnwallet.NewSigPool(1, bobSigner)
channelBob, err := lnwallet.NewLightningChannel(
bobSigner, pCache, bobChannelState,
bobSigner, pCache, bobChannelState, bobPool,
)
if err != nil {
return nil, nil, nil, err
}
bobPool.Start()
addr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),

@ -3,10 +3,11 @@ package contractcourt
import (
"errors"
"fmt"
"github.com/lightningnetwork/lnd/sweep"
"sync"
"sync/atomic"
"github.com/lightningnetwork/lnd/sweep"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
@ -238,11 +239,11 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
}
chanMachine, err := lnwallet.NewLightningChannel(
c.cfg.Signer, c.cfg.PreimageDB, channel)
c.cfg.Signer, c.cfg.PreimageDB, channel, nil,
)
if err != nil {
return nil, err
}
chanMachine.Stop()
if err := c.cfg.MarkLinkInactive(chanPoint); err != nil {
log.Errorf("unable to mark link inactive: %v", err)

@ -1681,13 +1681,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
// Go on adding the channel to the channel graph, and crafting
// channel announcements.
lnChannel, err := lnwallet.NewLightningChannel(
nil, nil, completeChan,
nil, nil, completeChan, nil,
)
if err != nil {
fndgLog.Errorf("failed creating lnChannel: %v", err)
return
}
defer lnChannel.Stop()
err = f.sendFundingLocked(
fmsg.peer, completeChan, lnChannel, shortChanID,
@ -1971,12 +1970,11 @@ func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer,
// We create the state-machine object which wraps the database state.
lnChannel, err := lnwallet.NewLightningChannel(
nil, nil, completeChan,
nil, nil, completeChan, nil,
)
if err != nil {
return err
}
defer lnChannel.Stop()
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)

@ -443,7 +443,6 @@ func (l *channelLink) Stop() {
}
l.updateFeeTimer.Stop()
l.channel.Stop()
l.overflowQueue.Stop()
close(l.quit)

@ -1592,8 +1592,6 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
cleanUp := func() {
close(alicePeer.quit)
defer fCleanUp()
defer aliceLink.Stop()
defer bobChannel.Stop()
}
return aliceLink, bobChannel, bticker.Force, start, cleanUp, restore, nil

@ -10,6 +10,7 @@ import (
"math/big"
"net"
"os"
"runtime"
"sync/atomic"
"testing"
"time"
@ -369,18 +370,23 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte,
preimageMap: make(map[[32]byte][]byte),
}
alicePool := lnwallet.NewSigPool(runtime.NumCPU(), aliceSigner)
channelAlice, err := lnwallet.NewLightningChannel(
aliceSigner, pCache, aliceChannelState,
aliceSigner, pCache, aliceChannelState, alicePool,
)
if err != nil {
return nil, nil, nil, nil, err
}
alicePool.Start()
bobPool := lnwallet.NewSigPool(runtime.NumCPU(), bobSigner)
channelBob, err := lnwallet.NewLightningChannel(
bobSigner, pCache, bobChannelState,
bobSigner, pCache, bobChannelState, bobPool,
)
if err != nil {
return nil, nil, nil, nil, err
}
bobPool.Start()
// Now that the channel are open, simulate the start of a session by
// having Alice and Bob extend their revocation windows to each other.
@ -402,6 +408,7 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte,
restore := func() (*lnwallet.LightningChannel, *lnwallet.LightningChannel,
error) {
aliceStoredChannels, err := dbAlice.FetchOpenChannels(aliceKeyPub)
switch err {
case nil:
@ -434,8 +441,9 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte,
return nil, nil, errors.New("unable to find stored alice channel")
}
newAliceChannel, err := lnwallet.NewLightningChannel(aliceSigner,
nil, aliceStoredChannel)
newAliceChannel, err := lnwallet.NewLightningChannel(
aliceSigner, nil, aliceStoredChannel, alicePool,
)
if err != nil {
return nil, nil, errors.Errorf("unable to create new channel: %v",
err)
@ -473,8 +481,9 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte,
return nil, nil, errors.New("unable to find stored bob channel")
}
newBobChannel, err := lnwallet.NewLightningChannel(bobSigner,
nil, bobStoredChannel)
newBobChannel, err := lnwallet.NewLightningChannel(
bobSigner, nil, bobStoredChannel, bobPool,
)
if err != nil {
return nil, nil, errors.Errorf("unable to create new channel: %v",
err)
@ -485,7 +494,7 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte,
return channelAlice, channelBob, cleanUpFunc, restore, nil
}
// getChanID retrieves the channel point from nwire message.
// getChanID retrieves the channel point from an lnnwire message.
func getChanID(msg lnwire.Message) (lnwire.ChannelID, error) {
var chanID lnwire.ChannelID
switch msg := msg.(type) {

@ -5,10 +5,8 @@ import (
"container/list"
"crypto/sha256"
"fmt"
"runtime"
"sort"
"sync"
"sync/atomic"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@ -1260,8 +1258,6 @@ func compactLogs(ourLog, theirLog *updateLog,
//
// See the individual comments within the above methods for further details.
type LightningChannel struct {
shutdown int32 // To be used atomically.
// Signer is the main signer instances that will be responsible for
// signing any HTLC and commitment transaction generated by the state
// machine.
@ -1282,7 +1278,7 @@ type LightningChannel struct {
// validating signatures in parallel. This is utilized as an
// optimization to void serially signing or validating the HTLC
// signatures, of which there may be hundreds.
sigPool *sigPool
sigPool *SigPool
// pCache is the global preimage cache shared across all other
// LightningChannel instance. We'll use this cache either when we force
@ -1347,11 +1343,6 @@ type LightningChannel struct {
RemoteFundingKey *btcec.PublicKey
sync.RWMutex
cowg sync.WaitGroup
wg sync.WaitGroup
quit chan struct{}
}
// NewLightningChannel creates a new, active payment channel given an
@ -1360,7 +1351,8 @@ type LightningChannel struct {
// automatically persist pertinent state to the database in an efficient
// manner.
func NewLightningChannel(signer Signer, pCache PreimageCache,
state *channeldb.OpenChannel) (*LightningChannel, error) {
state *channeldb.OpenChannel,
sigPool *SigPool) (*LightningChannel, error) {
localCommit := state.LocalCommitment
remoteCommit := state.RemoteCommitment
@ -1375,9 +1367,8 @@ func NewLightningChannel(signer Signer, pCache PreimageCache,
)
lc := &LightningChannel{
// TODO(roasbeef): tune num sig workers?
sigPool: newSigPool(runtime.NumCPU(), signer),
Signer: signer,
sigPool: sigPool,
pCache: pCache,
currentHeight: localCommit.CommitHeight,
remoteCommitChain: newCommitmentChain(),
@ -1391,7 +1382,6 @@ func NewLightningChannel(signer Signer, pCache PreimageCache,
Capacity: state.Capacity,
LocalFundingKey: state.LocalChanCfg.MultiSigKey.PubKey,
RemoteFundingKey: state.RemoteChanCfg.MultiSigKey.PubKey,
quit: make(chan struct{}),
}
// With the main channel struct reconstructed, we'll now restore the
@ -1411,12 +1401,6 @@ func NewLightningChannel(signer Signer, pCache PreimageCache,
lc.createStateHintObfuscator()
// Finally, we'll kick of the signature job pool to handle any upcoming
// commitment state generation and validation.
if err := lc.sigPool.Start(); err != nil {
return nil, err
}
return lc, nil
}
@ -1467,23 +1451,6 @@ func (lc *LightningChannel) createStateHintObfuscator() {
}
}
// Stop gracefully shuts down any active goroutines spawned by the
// LightningChannel during regular duties.
func (lc *LightningChannel) Stop() {
if !atomic.CompareAndSwapInt32(&lc.shutdown, 0, 1) {
return
}
lc.sigPool.Stop()
close(lc.quit)
}
// WaitForClose blocks until the channel's close observer has terminated.
func (lc *LightningChannel) WaitForClose() {
lc.cowg.Wait()
}
// ResetState resets the state of the channel back to the default state. This
// ensures that any active goroutines which need to act based on on-chain
// events do so properly.
@ -2673,7 +2640,7 @@ func processRemoveEntry(htlc *PaymentDescriptor, ourBalance,
// asynchronously and in parallel.
func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
localChanCfg, remoteChanCfg *channeldb.ChannelConfig,
remoteCommitView *commitment) ([]signJob, chan struct{}, error) {
remoteCommitView *commitment) ([]SignJob, chan struct{}, error) {
txHash := remoteCommitView.txn.TxHash()
dustLimit := remoteChanCfg.DustLimit
@ -2684,7 +2651,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
// smaller (than its total capacity) and some HTLCs may be dust.
numSigs := (len(remoteCommitView.incomingHTLCs) +
len(remoteCommitView.outgoingHTLCs))
sigBatch := make([]signJob, 0, numSigs)
sigBatch := make([]SignJob, 0, numSigs)
var err error
cancelChan := make(chan struct{})
@ -2700,9 +2667,9 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
// If the HTLC isn't dust, then we'll create an empty sign job
// to add to the batch momentarily.
sigJob := signJob{}
sigJob.cancel = cancelChan
sigJob.resp = make(chan signJobResp, 1)
sigJob := SignJob{}
sigJob.Cancel = cancelChan
sigJob.Resp = make(chan SignJobResp, 1)
// As this is an incoming HTLC and we're sinning the commitment
// transaction of the remote node, we'll need to generate an
@ -2718,7 +2685,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
Hash: txHash,
Index: uint32(htlc.remoteOutputIndex),
}
sigJob.tx, err = createHtlcTimeoutTx(
sigJob.Tx, err = createHtlcTimeoutTx(
op, outputAmt, htlc.Timeout,
uint32(remoteChanCfg.CsvDelay),
keyRing.RevocationKey, keyRing.DelayKey,
@ -2730,7 +2697,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
// Finally, we'll generate a sign descriptor to generate a
// signature to give to the remote party for this commitment
// transaction. Note we use the raw HTLC amount.
sigJob.signDesc = SignDescriptor{
sigJob.SignDesc = SignDescriptor{
KeyDesc: localChanCfg.HtlcBasePoint,
SingleTweak: keyRing.LocalHtlcKeyTweak,
WitnessScript: htlc.theirWitnessScript,
@ -2738,10 +2705,10 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
Value: int64(htlc.Amount.ToSatoshis()),
},
HashType: txscript.SigHashAll,
SigHashes: txscript.NewTxSigHashes(sigJob.tx),
SigHashes: txscript.NewTxSigHashes(sigJob.Tx),
InputIndex: 0,
}
sigJob.outputIndex = htlc.remoteOutputIndex
sigJob.OutputIndex = htlc.remoteOutputIndex
sigBatch = append(sigBatch, sigJob)
}
@ -2751,9 +2718,9 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
continue
}
sigJob := signJob{}
sigJob.cancel = cancelChan
sigJob.resp = make(chan signJobResp, 1)
sigJob := SignJob{}
sigJob.Cancel = cancelChan
sigJob.Resp = make(chan SignJobResp, 1)
// As this is an outgoing HTLC and we're signing the commitment
// transaction of the remote node, we'll need to generate an
@ -2770,7 +2737,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
Hash: txHash,
Index: uint32(htlc.remoteOutputIndex),
}
sigJob.tx, err = createHtlcSuccessTx(
sigJob.Tx, err = createHtlcSuccessTx(
op, outputAmt, uint32(remoteChanCfg.CsvDelay),
keyRing.RevocationKey, keyRing.DelayKey,
)
@ -2781,7 +2748,7 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
// Finally, we'll generate a sign descriptor to generate a
// signature to give to the remote party for this commitment
// transaction. Note we use the raw HTLC amount.
sigJob.signDesc = SignDescriptor{
sigJob.SignDesc = SignDescriptor{
KeyDesc: localChanCfg.HtlcBasePoint,
SingleTweak: keyRing.LocalHtlcKeyTweak,
WitnessScript: htlc.theirWitnessScript,
@ -2789,10 +2756,10 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
Value: int64(htlc.Amount.ToSatoshis()),
},
HashType: txscript.SigHashAll,
SigHashes: txscript.NewTxSigHashes(sigJob.tx),
SigHashes: txscript.NewTxSigHashes(sigJob.Tx),
InputIndex: 0,
}
sigJob.outputIndex = htlc.remoteOutputIndex
sigJob.OutputIndex = htlc.remoteOutputIndex
sigBatch = append(sigBatch, sigJob)
}
@ -3059,26 +3026,23 @@ func (lc *LightningChannel) SignNextCommitment() (lnwire.Sig, []lnwire.Sig, erro
// order as they appear on the commitment transaction after BIP 69
// sorting.
sort.Slice(sigBatch, func(i, j int) bool {
return sigBatch[i].outputIndex < sigBatch[j].outputIndex
return sigBatch[i].OutputIndex < sigBatch[j].OutputIndex
})
// With the jobs sorted, we'll now iterate through all the responses to
// gather each of the signatures in order.
htlcSigs = make([]lnwire.Sig, 0, len(sigBatch))
for _, htlcSigJob := range sigBatch {
select {
case jobResp := <-htlcSigJob.resp:
// If an error occurred, then we'll cancel any other
// active jobs.
if jobResp.err != nil {
close(cancelChan)
return sig, htlcSigs, err
}
jobResp := <-htlcSigJob.Resp
htlcSigs = append(htlcSigs, jobResp.sig)
case <-lc.quit:
return sig, htlcSigs, fmt.Errorf("channel shutting down")
// If an error occurred, then we'll cancel any other active
// jobs.
if jobResp.Err != nil {
close(cancelChan)
return sig, htlcSigs, err
}
htlcSigs = append(htlcSigs, jobResp.Sig)
}
// As we're about to proposer a new commitment state for the remote
@ -3740,7 +3704,7 @@ func (lc *LightningChannel) validateCommitmentSanity(theirLogCounter,
// directly into the pool of workers.
func genHtlcSigValidationJobs(localCommitmentView *commitment,
keyRing *CommitmentKeyRing, htlcSigs []lnwire.Sig,
localChanCfg, remoteChanCfg *channeldb.ChannelConfig) ([]verifyJob, error) {
localChanCfg, remoteChanCfg *channeldb.ChannelConfig) ([]VerifyJob, error) {
txHash := localCommitmentView.txn.TxHash()
feePerKw := localCommitmentView.feePerKw
@ -3751,7 +3715,7 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment,
// length will be smaller than the total capacity.
numHtlcs := (len(localCommitmentView.incomingHTLCs) +
len(localCommitmentView.outgoingHTLCs))
verifyJobs := make([]verifyJob, 0, numHtlcs)
verifyJobs := make([]VerifyJob, 0, numHtlcs)
// We'll iterate through each output in the commitment transaction,
// populating the sigHash closure function if it's detected to be an
@ -3879,11 +3843,11 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment,
continue
}
verifyJobs = append(verifyJobs, verifyJob{
htlcIndex: htlcIndex,
pubKey: keyRing.RemoteHtlcKey,
sig: sig,
sigHash: sigHash,
verifyJobs = append(verifyJobs, VerifyJob{
HtlcIndex: htlcIndex,
PubKey: keyRing.RemoteHtlcKey,
Sig: sig,
SigHash: sigHash,
})
i++
@ -4083,34 +4047,30 @@ func (lc *LightningChannel) ReceiveNewCommitment(commitSig lnwire.Sig,
for i := 0; i < len(verifyJobs); i++ {
// In the case that a single signature is invalid, we'll exit
// early and cancel all the outstanding verification jobs.
select {
case htlcErr := <-verifyResps:
if htlcErr != nil {
close(cancelChan)
htlcErr := <-verifyResps
if htlcErr != nil {
close(cancelChan)
sig, err := lnwire.NewSigFromSignature(
htlcErr.sig,
)
if err != nil {
return err
}
sigHash, err := htlcErr.sigHash()
if err != nil {
return err
}
var txBytes bytes.Buffer
localCommitTx.Serialize(&txBytes)
return &InvalidHtlcSigError{
commitHeight: nextHeight,
htlcSig: sig.ToSignatureBytes(),
htlcIndex: htlcErr.htlcIndex,
sigHash: sigHash,
commitTx: txBytes.Bytes(),
}
sig, err := lnwire.NewSigFromSignature(
htlcErr.Sig,
)
if err != nil {
return err
}
sigHash, err := htlcErr.SigHash()
if err != nil {
return err
}
var txBytes bytes.Buffer
localCommitTx.Serialize(&txBytes)
return &InvalidHtlcSigError{
commitHeight: nextHeight,
htlcSig: sig.ToSignatureBytes(),
htlcIndex: htlcErr.HtlcIndex,
sigHash: sigHash,
commitTx: txBytes.Bytes(),
}
case <-lc.quit:
return fmt.Errorf("channel shutting down")
}
}

@ -1444,14 +1444,18 @@ func TestStateUpdatePersistence(t *testing.T) {
if err != nil {
t.Fatalf("unable to fetch channel: %v", err)
}
aliceChannelNew, err := NewLightningChannel(
aliceChannel.Signer, nil, aliceChannels[0],
aliceChannel.sigPool,
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
}
bobChannelNew, err := NewLightningChannel(
bobChannel.Signer, nil, bobChannels[0],
bobChannel.sigPool,
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
@ -2544,20 +2548,19 @@ func TestChanSyncFullySynced(t *testing.T) {
if err != nil {
t.Fatalf("unable to fetch channel: %v", err)
}
aliceChannelNew, err := NewLightningChannel(
aliceChannel.Signer, nil, aliceChannels[0],
aliceChannel.Signer, nil, aliceChannels[0], aliceChannel.sigPool,
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
}
defer aliceChannelNew.Stop()
bobChannelNew, err := NewLightningChannel(
bobChannel.Signer, nil, bobChannels[0],
bobChannel.Signer, nil, bobChannels[0], bobChannel.sigPool,
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
}
defer bobChannelNew.Stop()
assertNoChanSyncNeeded(t, aliceChannelNew, bobChannelNew)
}
@ -2576,6 +2579,7 @@ func restartChannel(channelOld *LightningChannel) (*LightningChannel, error) {
channelNew, err := NewLightningChannel(
channelOld.Signer, channelOld.pCache, nodeChannels[0],
channelOld.sigPool,
)
if err != nil {
return nil, err
@ -2786,7 +2790,6 @@ func TestChanSyncOweCommitment(t *testing.T) {
if err != nil {
t.Fatalf("unable to restart alice: %v", err)
}
defer aliceChannel.Stop()
assertAliceCommitRetransmit()
// TODO(roasbeef): restart bob as well???
@ -3053,7 +3056,6 @@ func TestChanSyncOweRevocation(t *testing.T) {
if err != nil {
t.Fatalf("unable to restart alice: %v", err)
}
defer aliceChannel.Stop()
assertAliceOwesRevoke()
// TODO(roasbeef): restart bob too???
@ -3233,7 +3235,6 @@ func TestChanSyncOweRevocationAndCommit(t *testing.T) {
if err != nil {
t.Fatalf("unable to restart channel: %v", err)
}
defer bobChannel.Stop()
assertBobSendsRevokeAndCommit()
// We'll now finish the state transition by having Alice process both
@ -3428,7 +3429,6 @@ func TestChanSyncOweRevocationAndCommitForceTransition(t *testing.T) {
if err != nil {
t.Fatalf("unable to restart channel: %v", err)
}
defer bobChannel.Stop()
if len(bobMsgsToSend) != 2 {
t.Fatalf("expected bob to send %v messages, instead "+
"sends: %v", 2, spew.Sdump(bobMsgsToSend))
@ -3775,12 +3775,10 @@ func TestChannelRetransmissionFeeUpdate(t *testing.T) {
if err != nil {
t.Fatalf("unable to restart alice: %v", err)
}
defer aliceChannel.Stop()
bobChannel, err = restartChannel(bobChannel)
if err != nil {
t.Fatalf("unable to restart channel: %v", err)
}
defer bobChannel.Stop()
// Bob doesn't get this message so upon reconnection, they need to
// synchronize. Alice should conclude that she owes Bob a commitment,
@ -4293,12 +4291,10 @@ func TestLockedInHtlcForwardingSkipAfterRestart(t *testing.T) {
if err != nil {
t.Fatalf("unable to restart alice: %v", err)
}
defer aliceChannel.Stop()
bobChannel, err = restartChannel(bobChannel)
if err != nil {
t.Fatalf("unable to restart bob: %v", err)
}
defer bobChannel.Stop()
// With both nodes restarted, Bob will now attempt to cancel one of
// Alice's HTLC's.
@ -4399,12 +4395,10 @@ func TestLockedInHtlcForwardingSkipAfterRestart(t *testing.T) {
if err != nil {
t.Fatalf("unable to restart alice: %v", err)
}
defer aliceChannel.Stop()
bobChannel, err = restartChannel(bobChannel)
if err != nil {
t.Fatalf("unable to restart bob: %v", err)
}
defer bobChannel.Stop()
// Readd the Fail to both Alice and Bob's channels, as the non-committed
// update will not have survived the restart.
@ -5590,19 +5584,19 @@ func TestChannelRestoreUpdateLogs(t *testing.T) {
// the update logs up to the correct state set up above.
newAliceChannel, err := NewLightningChannel(
aliceChannel.Signer, nil, aliceChannel.channelState,
aliceChannel.sigPool,
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
}
defer newAliceChannel.Stop()
newBobChannel, err := NewLightningChannel(
bobChannel.Signer, nil, bobChannel.channelState,
bobChannel.sigPool,
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
}
defer newBobChannel.Stop()
// compare all the logs between the old and new channels, to make sure
// they all got restored properly.
@ -5669,13 +5663,14 @@ func assertInLogs(t *testing.T, channel *LightningChannel, numAddsLocal,
// expected state.
func restoreAndAssert(t *testing.T, channel *LightningChannel, numAddsLocal,
numFailsLocal, numAddsRemote, numFailsRemote int) {
newChannel, err := NewLightningChannel(
channel.Signer, nil, channel.channelState,
channel.sigPool,
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
}
defer newChannel.Stop()
assertInLog(t, newChannel.localUpdateLog, numAddsLocal, numFailsLocal)
assertInLog(t, newChannel.remoteUpdateLog, numAddsRemote, numFailsRemote)
@ -5863,12 +5858,10 @@ func TestDuplicateFailRejection(t *testing.T) {
if err != nil {
t.Fatalf("unable to restart channel: %v", err)
}
defer bobChannel.Stop()
aliceChannel, err = restartChannel(aliceChannel)
if err != nil {
t.Fatalf("unable to restart channel: %v", err)
}
defer aliceChannel.Stop()
// If we try to fail the same HTLC again, then we should get an error.
err = bobChannel.FailHTLC(0, []byte("failreason"), nil, nil, nil)
@ -5945,12 +5938,10 @@ func TestDuplicateSettleRejection(t *testing.T) {
if err != nil {
t.Fatalf("unable to restart channel: %v", err)
}
defer bobChannel.Stop()
aliceChannel, err = restartChannel(aliceChannel)
if err != nil {
t.Fatalf("unable to restart channel: %v", err)
}
defer aliceChannel.Stop()
// If we try to fail the same HTLC again, then we should get an error.
err = bobChannel.SettleHTLC(alicePreimage, uint64(0), nil, nil, nil)
@ -5983,9 +5974,9 @@ func TestChannelRestoreCommitHeight(t *testing.T) {
channel *LightningChannel, remoteLog bool, htlcIndex uint64,
expLocal, expRemote uint64) *LightningChannel {
channel.Stop()
newChannel, err := NewLightningChannel(
channel.Signer, nil, channel.channelState,
channel.sigPool,
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
@ -6152,7 +6143,4 @@ func TestChannelRestoreCommitHeight(t *testing.T) {
// HTLC an add height.
bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 0, 2, 1)
bobChannel = restoreAndAssertCommitHeights(t, bobChannel, true, 1, 2, 2)
aliceChannel.Stop()
bobChannel.Stop()
}

@ -19,119 +19,118 @@ const (
// TODO(roasbeef): job buffer pool?
)
// verifyJob is a job sent to the sigPool to verify a signature on a
// transaction. The items contained in the struct are necessary and sufficient
// to verify the full signature. The passed sigHash closure function should be
// set to a function that generates the relevant sighash.
// VerifyJob is a job sent to the sigPool sig pool to verify a signature
// on a transaction. The items contained in the struct are necessary and
// sufficient to verify the full signature. The passed sigHash closure function
// should be set to a function that generates the relevant sighash.
//
// TODO(roasbeef): when we move to ecschnorr, make into batch signature
// verification using bos-coster (or pip?).
type verifyJob struct {
// pubKey is the public key that was used to generate the purported
type VerifyJob struct {
// PubKey is the public key that was used to generate the purported
// valid signature. Note that with the current channel construction,
// this public key will likely have been tweaked using the current per
// commitment point for a particular commitment transactions.
pubKey *btcec.PublicKey
PubKey *btcec.PublicKey
// sig is the raw signature generated using the above public key. This
// Sig is the raw signature generated using the above public key. This
// is the signature to be verified.
sig *btcec.Signature
Sig *btcec.Signature
// sigHash is a function closure generates the sighashes that the
// SigHash is a function closure generates the sighashes that the
// passed signature is known to have signed.
sigHash func() ([]byte, error)
SigHash func() ([]byte, error)
// htlcIndex is the index of the HTLC from the PoV of the remote
// HtlcIndex is the index of the HTLC from the PoV of the remote
// party's update log.
htlcIndex uint64
HtlcIndex uint64
// cancel is a channel that should be closed if the caller wishes to
// Cancel is a channel that should be closed if the caller wishes to
// cancel all pending verification jobs part of a single batch. This
// channel is to be closed in the case that a single signature in a
// batch has been returned as invalid, as there is no need to verify
// the remainder of the signatures.
cancel chan struct{}
Cancel chan struct{}
// errResp is the channel that the result of the signature verification
// ErrResp is the channel that the result of the signature verification
// is to be sent over. In the see that the signature is valid, a nil
// error will be passed. Otherwise, a concrete error detailing the
// issue will be passed.
errResp chan *htlcIndexErr
ErrResp chan *HtlcIndexErr
}
// verifyJobErr is a special type of error that also includes a pointer to the
// original validation job. Ths error message allows us to craft more detailed
// HtlcIndexErr is a special type of error that also includes a pointer to the
// original validation job. This error message allows us to craft more detailed
// errors at upper layers.
type htlcIndexErr struct {
type HtlcIndexErr struct {
error
*verifyJob
*VerifyJob
}
// signJob is a job sent to the sigPool to generate a valid signature according
// to the passed SignDescriptor for the passed transaction. Jobs are intended
// to be sent in batches in order to parallelize the job of generating
// signatures for a new commitment transaction.
type signJob struct {
// signDesc is intended to be a full populated SignDescriptor which
// SignJob is a job sent to the sigPool sig pool to generate a valid
// signature according to the passed SignDescriptor for the passed transaction.
// Jobs are intended to be sent in batches in order to parallelize the job of
// generating signatures for a new commitment transaction.
type SignJob struct {
// SignDesc is intended to be a full populated SignDescriptor which
// encodes the necessary material (keys, witness script, etc) required
// to generate a valid signature for the specified input.
signDesc SignDescriptor
SignDesc SignDescriptor
// tx is the transaction to be signed. This is required to generate the
// Tx is the transaction to be signed. This is required to generate the
// proper sighash for the input to be signed.
tx *wire.MsgTx
Tx *wire.MsgTx
// outputIndex is the output index of the HTLC on the commitment
// OutputIndex is the output index of the HTLC on the commitment
// transaction being signed.
outputIndex int32
OutputIndex int32
// cancel is a channel that should be closed if the caller wishes to
// Cancel is a channel that should be closed if the caller wishes to
// abandon all pending sign jobs part of a single batch.
cancel chan struct{}
Cancel chan struct{}
// resp is the channel that the response to this particular signJob
// Resp is the channel that the response to this particular SignJob
// will be sent over.
//
// TODO(roasbeef): actually need to allow caller to set, need to retain
// order mark commit sig as special
resp chan signJobResp
Resp chan SignJobResp
}
// signJobResp is the response to a sign job. Both channels are to be read in
// SignJobResp is the response to a sign job. Both channels are to be read in
// order to ensure no unnecessary goroutine blocking occurs. Additionally, both
// channels should be buffered.
type signJobResp struct {
// sig is the generated signature for a particular signJob In the case
type SignJobResp struct {
// Sig is the generated signature for a particular SignJob In the case
// of an error during signature generation, then this value sent will
// be nil.
sig lnwire.Sig
Sig lnwire.Sig
// err is the error that occurred when executing the specified
// Err is the error that occurred when executing the specified
// signature job. In the case that no error occurred, this value will
// be nil.
err error
Err error
}
// sigPool is a struct that is meant to allow the current channel state machine
// to parallelize all signature generation and verification. This struct is
// needed as _each_ HTLC when creating a commitment transaction requires a
// signature, and similarly a receiver of a new commitment must verify all the
// HTLC signatures included within the CommitSig message. A pool of workers
// will be maintained by the sigPool. Batches of jobs (either to sign or
// verify) can be sent to the pool of workers which will asynchronously perform
// the specified job.
//
// TODO(roasbeef): rename?
// * ecdsaPool?
type sigPool struct {
// TODO(roasbeef); fix description
// SigPool is a struct that is meant to allow the current channel state
// machine to parallelize all signature generation and verification. This
// struct is needed as _each_ HTLC when creating a commitment transaction
// requires a signature, and similarly a receiver of a new commitment must
// verify all the HTLC signatures included within the CommitSig message. A pool
// of workers will be maintained by the sigPool. Batches of jobs (either
// to sign or verify) can be sent to the pool of workers which will
// asynchronously perform the specified job.
type SigPool struct {
started uint32 // To be used atomically.
stopped uint32 // To be used atomically.
signer Signer
verifyJobs chan verifyJob
signJobs chan signJob
verifyJobs chan VerifyJob
signJobs chan SignJob
wg sync.WaitGroup
quit chan struct{}
@ -139,22 +138,22 @@ type sigPool struct {
numWorkers int
}
// newSigPool creates a new signature pool with the specified number of
// NewSigPool creates a new signature pool with the specified number of
// workers. The recommended parameter for the number of works is the number of
// physical CPU cores available on the target machine.
func newSigPool(numWorkers int, signer Signer) *sigPool {
return &sigPool{
func NewSigPool(numWorkers int, signer Signer) *SigPool {
return &SigPool{
signer: signer,
numWorkers: numWorkers,
verifyJobs: make(chan verifyJob, jobBuffer),
signJobs: make(chan signJob, jobBuffer),
verifyJobs: make(chan VerifyJob, jobBuffer),
signJobs: make(chan SignJob, jobBuffer),
quit: make(chan struct{}),
}
}
// Start starts of all goroutines that the sigPool needs to carry out its
// duties.
func (s *sigPool) Start() error {
// Start starts of all goroutines that the sigPool sig pool needs to
// carry out its duties.
func (s *SigPool) Start() error {
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
return nil
}
@ -169,7 +168,7 @@ func (s *sigPool) Start() error {
// Stop signals any active workers carrying out jobs to exit so the sigPool can
// gracefully shutdown.
func (s *sigPool) Stop() error {
func (s *SigPool) Stop() error {
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
return nil
}
@ -180,11 +179,11 @@ func (s *sigPool) Stop() error {
return nil
}
// poolWorker is the main worker goroutine within the sigPool. Individual
// batches are distributed amongst each of the active workers. The workers then
// execute the task based on the type of job, and return the result back to
// caller.
func (s *sigPool) poolWorker() {
// poolWorker is the main worker goroutine within the sigPool sig pool.
// Individual batches are distributed amongst each of the active workers. The
// workers then execute the task based on the type of job, and return the
// result back to caller.
func (s *SigPool) poolWorker() {
defer s.wg.Done()
for {
@ -195,16 +194,17 @@ func (s *sigPool) poolWorker() {
// send the result along with a possible error back to the
// caller.
case sigMsg := <-s.signJobs:
rawSig, err := s.signer.SignOutputRaw(sigMsg.tx,
&sigMsg.signDesc)
rawSig, err := s.signer.SignOutputRaw(
sigMsg.Tx, &sigMsg.SignDesc,
)
if err != nil {
select {
case sigMsg.resp <- signJobResp{
sig: lnwire.Sig{},
err: err,
case sigMsg.Resp <- SignJobResp{
Sig: lnwire.Sig{},
Err: err,
}:
continue
case <-sigMsg.cancel:
case <-sigMsg.Cancel:
continue
case <-s.quit:
return
@ -213,11 +213,11 @@ func (s *sigPool) poolWorker() {
sig, err := lnwire.NewSigFromRawSignature(rawSig)
select {
case sigMsg.resp <- signJobResp{
sig: sig,
err: err,
case sigMsg.Resp <- SignJobResp{
Sig: sig,
Err: err,
}:
case <-sigMsg.cancel:
case <-sigMsg.Cancel:
continue
case <-s.quit:
return
@ -227,58 +227,60 @@ func (s *sigPool) poolWorker() {
// world. We'll attempt to construct the sighash, parse the
// signature, and finally verify the signature.
case verifyMsg := <-s.verifyJobs:
sigHash, err := verifyMsg.sigHash()
sigHash, err := verifyMsg.SigHash()
if err != nil {
select {
case verifyMsg.errResp <- &htlcIndexErr{
case verifyMsg.ErrResp <- &HtlcIndexErr{
error: err,
verifyJob: &verifyMsg,
VerifyJob: &verifyMsg,
}:
continue
case <-verifyMsg.cancel:
case <-verifyMsg.Cancel:
continue
}
}
rawSig := verifyMsg.sig
rawSig := verifyMsg.Sig
if !rawSig.Verify(sigHash, verifyMsg.pubKey) {
if !rawSig.Verify(sigHash, verifyMsg.PubKey) {
err := fmt.Errorf("invalid signature "+
"sighash: %x, sig: %x", sigHash, rawSig.Serialize())
"sighash: %x, sig: %x", sigHash,
rawSig.Serialize())
select {
case verifyMsg.errResp <- &htlcIndexErr{
case verifyMsg.ErrResp <- &HtlcIndexErr{
error: err,
verifyJob: &verifyMsg,
VerifyJob: &verifyMsg,
}:
case <-verifyMsg.cancel:
case <-verifyMsg.Cancel:
case <-s.quit:
return
}
} else {
select {
case verifyMsg.errResp <- nil:
case <-verifyMsg.cancel:
case verifyMsg.ErrResp <- nil:
case <-verifyMsg.Cancel:
case <-s.quit:
return
}
}
// The sigPool is exiting, so we will as well.
// The sigPool sig pool is exiting, so we will as well.
case <-s.quit:
return
}
}
}
// SubmitSignBatch submits a batch of signature jobs to the sigPool. The
// response and cancel channels for each of the signJob's are expected to be
// fully populated, as the response for each job will be sent over the response
// channel within the job itself.
func (s *sigPool) SubmitSignBatch(signJobs []signJob) {
// SubmitSignBatch submits a batch of signature jobs to the sigPool. The
// response and cancel channels for each of the SignJob's are expected to be
// fully populated, as the response for each job will be sent over the
// response channel within the job itself.
func (s *SigPool) SubmitSignBatch(signJobs []SignJob) {
for _, job := range signJobs {
select {
case s.signJobs <- job:
case <-job.cancel:
case <-job.Cancel:
// TODO(roasbeef): return error?
case <-s.quit:
return
@ -291,18 +293,18 @@ func (s *sigPool) SubmitSignBatch(signJobs []signJob) {
// denoting if signature verification was valid or not. The passed cancelChan
// allows the caller to cancel all pending jobs in the case that they wish to
// bail early.
func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob,
cancelChan chan struct{}) <-chan *htlcIndexErr {
func (s *SigPool) SubmitVerifyBatch(verifyJobs []VerifyJob,
cancelChan chan struct{}) <-chan *HtlcIndexErr {
errChan := make(chan *htlcIndexErr, len(verifyJobs))
errChan := make(chan *HtlcIndexErr, len(verifyJobs))
for _, job := range verifyJobs {
job.cancel = cancelChan
job.errResp = errChan
job.Cancel = cancelChan
job.ErrResp = errChan
select {
case s.verifyJobs <- job:
case <-job.cancel:
case <-job.Cancel:
return errChan
}
}

@ -309,18 +309,24 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error)
}
// TODO(roasbeef): make mock version of pre-image store
alicePool := NewSigPool(1, aliceSigner)
channelAlice, err := NewLightningChannel(
aliceSigner, pCache, aliceChannelState,
aliceSigner, pCache, aliceChannelState, alicePool,
)
if err != nil {
return nil, nil, nil, err
}
alicePool.Start()
bobPool := NewSigPool(1, bobSigner)
channelBob, err := NewLightningChannel(
bobSigner, pCache, bobChannelState,
bobSigner, pCache, bobChannelState, bobPool,
)
if err != nil {
return nil, nil, nil, err
}
bobPool.Start()
err = SetStateNumHint(
aliceCommitTx, 0, channelAlice.stateHintObfuscator,
@ -346,8 +352,8 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error)
os.RemoveAll(bobPath)
os.RemoveAll(alicePath)
channelAlice.Stop()
channelBob.Stop()
alicePool.Stop()
bobPool.Stop()
}
// Now that the channel are open, simulate the start of a session by

19
peer.go

@ -383,6 +383,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
for _, dbChan := range chans {
lnChan, err := lnwallet.NewLightningChannel(
p.server.cc.signer, p.server.witnessBeacon, dbChan,
p.server.sigPool,
)
if err != nil {
return err
@ -400,7 +401,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
if dbChan.ChanStatus() != channeldb.Default {
peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+
"start.", chanPoint, dbChan.ChanStatus())
lnChan.Stop()
continue
}
@ -409,13 +409,11 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
if _, ok := p.failedChannels[chanID]; ok {
peerLog.Warnf("ChannelPoint(%v) is failed, won't "+
"start.", chanPoint)
lnChan.Stop()
continue
}
_, currentHeight, err := p.server.cc.chainIO.GetBestBlock()
if err != nil {
lnChan.Stop()
return err
}
@ -425,7 +423,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
graph := p.server.chanDB.ChannelGraph()
info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint)
if err != nil && err != channeldb.ErrEdgeNotFound {
lnChan.Stop()
return err
}
@ -473,7 +470,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
*chanPoint,
)
if err != nil {
lnChan.Stop()
return err
}
@ -483,7 +479,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
currentHeight, true,
)
if err != nil {
lnChan.Stop()
return fmt.Errorf("unable to add link %v to switch: %v",
chanPoint, err)
}
@ -1596,7 +1591,7 @@ out:
// easily according to its channel ID.
lnChan, err := lnwallet.NewLightningChannel(
p.server.cc.signer, p.server.witnessBeacon,
newChan,
newChan, p.server.sigPool,
)
if err != nil {
p.activeChanMtx.Unlock()
@ -1624,7 +1619,6 @@ out:
"block: %v", err)
peerLog.Errorf(err.Error())
lnChan.Stop()
newChanReq.err <- err
continue
}
@ -1636,7 +1630,6 @@ out:
"chain events: %v", err)
peerLog.Errorf(err.Error())
lnChan.Stop()
newChanReq.err <- err
continue
}
@ -1666,7 +1659,6 @@ out:
p.PubKey())
peerLog.Errorf(err.Error())
lnChan.Stop()
newChanReq.err <- err
continue
}
@ -2019,8 +2011,6 @@ func (p *peer) finalizeChanClosure(chanCloser *channelCloser) {
}
}
chanCloser.cfg.channel.Stop()
// Next, we'll launch a goroutine which will request to be notified by
// the ChainNotifier once the closure transaction obtains a single
// confirmation.
@ -2121,10 +2111,7 @@ func (p *peer) WipeChannel(chanPoint *wire.OutPoint) error {
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
p.activeChanMtx.Lock()
if channel, ok := p.activeChannels[chanID]; ok {
channel.Stop()
delete(p.activeChannels, chanID)
}
delete(p.activeChannels, chanID)
p.activeChanMtx.Unlock()
// Instruct the HtlcSwitch to close this link as the channel is no

@ -1431,7 +1431,6 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
if err != nil {
return err
}
channel.Stop()
// If a force closure was requested, then we'll handle all the details
// around the creation and broadcast of the unilateral closure
@ -1667,7 +1666,7 @@ func (r *rpcServer) fetchOpenDbChannel(chanPoint wire.OutPoint) (
// fetchActiveChannel attempts to locate a channel identified by its channel
// point from the database's set of all currently opened channels and
// return it as a fully popuplated state machine
// return it as a fully populated state machine
func (r *rpcServer) fetchActiveChannel(chanPoint wire.OutPoint) (
*lnwallet.LightningChannel, error) {
@ -1680,7 +1679,7 @@ func (r *rpcServer) fetchActiveChannel(chanPoint wire.OutPoint) (
// we create a fully populated channel state machine which
// uses the db channel as backing storage.
return lnwallet.NewLightningChannel(
r.server.cc.wallet.Cfg.Signer, nil, dbChan,
r.server.cc.wallet.Cfg.Signer, nil, dbChan, nil,
)
}

@ -11,6 +11,7 @@ import (
"net"
"path/filepath"
"regexp"
"runtime"
"strconv"
"sync"
"sync/atomic"
@ -161,6 +162,8 @@ type server struct {
connMgr *connmgr.ConnManager
sigPool *lnwallet.SigPool
// globalFeatures feature vector which affects HTLCs and thus are also
// advertised to other nodes.
globalFeatures *lnwire.FeatureVector
@ -258,8 +261,9 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
sphinxRouter := sphinx.NewRouter(privKey, activeNetParams.Params, replayLog)
s := &server{
chanDB: chanDB,
cc: cc,
chanDB: chanDB,
cc: cc,
sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer),
invoices: newInvoiceRegistry(chanDB),
@ -947,6 +951,9 @@ func (s *server) Start() error {
// sufficient number of confirmations, or when the input for the
// funding transaction is spent in an attempt at an uncooperative close
// by the counterparty.
if err := s.sigPool.Start(); err != nil {
return err
}
if err := s.cc.chainNotifier.Start(); err != nil {
return err
}
@ -1034,6 +1041,7 @@ func (s *server) Stop() error {
}
// Shutdown the wallet, funding manager, and the rpc server.
s.sigPool.Stop()
s.cc.chainNotifier.Stop()
s.chanRouter.Stop()
s.htlcSwitch.Stop()

@ -298,18 +298,23 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
aliceSigner := &mockSigner{aliceKeyPriv}
bobSigner := &mockSigner{bobKeyPriv}
alicePool := lnwallet.NewSigPool(1, aliceSigner)
channelAlice, err := lnwallet.NewLightningChannel(
aliceSigner, nil, aliceChannelState,
aliceSigner, nil, aliceChannelState, alicePool,
)
if err != nil {
return nil, nil, nil, nil, err
}
alicePool.Start()
bobPool := lnwallet.NewSigPool(1, bobSigner)
channelBob, err := lnwallet.NewLightningChannel(
bobSigner, nil, bobChannelState,
bobSigner, nil, bobChannelState, bobPool,
)
if err != nil {
return nil, nil, nil, nil, err
}
bobPool.Start()
chainIO := &mockChainIO{}
wallet := &lnwallet.LightningWallet{