lnwallet: introduce the sigPool struct to optimize state creation+verification
This commit introduce a new struct which acts as a companion struct to the channel state machine struct. With the new commitment state machine, we require a signature to be signed and validated for each outstanding non-trimmed HTLC within the commitment state. As it’s possible to have hundreds of HTLC’s on a given commitment transaction, rather than perform all ECDSA operations in serial, we instead aim to parallelize the computation with a worker pool of sign+verify workers. The two primary methods of the sigPool are SubmitVerifyBatch and SubmitSignBatch which allow a caller to trigger an asynchronous job execution when creating or validating a new commitment state.
This commit is contained in:
parent
30b1cbc1fd
commit
b5044e9217
303
lnwallet/sigpool.go
Normal file
303
lnwallet/sigpool.go
Normal file
@ -0,0 +1,303 @@
|
||||
package lnwallet
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/roasbeef/btcd/btcec"
|
||||
"github.com/roasbeef/btcd/wire"
|
||||
)
|
||||
|
||||
const (
|
||||
// jobBuffer is a constant the represents the buffer of jobs in the two
|
||||
// main queues. This allows clients avoid necessarily blocking when
|
||||
// submitting jobs into the queue.
|
||||
jobBuffer = 100
|
||||
|
||||
// TODO(roasbeef): job buffer pool?
|
||||
)
|
||||
|
||||
// verifyJob is a job sent to the sigPool to verify a signature on a
|
||||
// transaction. The items contained in the struct are necessary and sufficient
|
||||
// to verify the full signature. The passed sigHash closure function should be
|
||||
// set to a function that generates the relevant sighash.
|
||||
//
|
||||
// TODO(roasbeef): when we move to ecschnorr, make into batch signature
|
||||
// verification using bos-coster (or pip?).
|
||||
type verifyJob struct {
|
||||
// pubKey is the public key that was used to generate the purported
|
||||
// valid signature. Note that with the current channel construction,
|
||||
// this public key will likely have been tweaked using the current per
|
||||
// commitment point for a particular commitment transactions.
|
||||
pubKey *btcec.PublicKey
|
||||
|
||||
// sig is the raw signature generated using the above public key. This
|
||||
// is the signature to be verified.
|
||||
sig *btcec.Signature
|
||||
|
||||
// sigHash is a function closure generates the sighashes that the
|
||||
// passed signature is known to have signed.
|
||||
sigHash func() ([]byte, error)
|
||||
|
||||
// cancel is a channel that should be closed if the caller wishes to
|
||||
// cancel all pending verification jobs part of a single batch. This
|
||||
// channel is to be closed in the case that a single signature in a
|
||||
// batch has been returned as invalid, as there is no need to verify
|
||||
// the remainder of the signatures.
|
||||
cancel chan struct{}
|
||||
|
||||
// errResp is the channel that the result of the signature verification
|
||||
// is to be sent over. In the see that the signature is valid, a nil
|
||||
// error will be passed. Otherwise, a concrete error detailing the
|
||||
// issue will be passed.
|
||||
errResp chan error
|
||||
}
|
||||
|
||||
// signJob is a job sent to the sigPool to generate a valid signature according
|
||||
// to the passed SignDescriptor for the passed transaction. Jobs are intended
|
||||
// to be sent in batches in order to parallelize the job of generating
|
||||
// signatures for a new commitment transaction.
|
||||
type signJob struct {
|
||||
// signDesc is intended to be a full populated SignDescriptor which
|
||||
// encodes the necessary material (keys, witness script, etc) required
|
||||
// to generate a valid signature for the specified input.
|
||||
signDesc SignDescriptor
|
||||
|
||||
// tx is the transaction to be signed. This is required to generate the
|
||||
// proper sighash for the input to be signed.
|
||||
tx *wire.MsgTx
|
||||
|
||||
// outputIndex...
|
||||
outputIndex int32
|
||||
|
||||
// cancel is a channel that should be closed if the caller wishes to
|
||||
// abandon all pending sign jobs part of a single batch.
|
||||
cancel chan struct{}
|
||||
|
||||
// resp is the channel that the response to this particular signJob
|
||||
// will be sent over.
|
||||
//
|
||||
// TODO(roasbeef): actually need to allow caller to set, need to retain
|
||||
// order mark commit sig as special
|
||||
resp chan signJobResp
|
||||
}
|
||||
|
||||
// sortableSignBatch is a type wrapper around a slice of signJobs which is able
|
||||
// to sort each job according to tis outputs index. Such sorting is necessary
|
||||
// as when creating a new commitment state, we need to send over all the HTLC
|
||||
// signatures (if any) in the exact order the appears on the commitment
|
||||
// transaction after BIP 69 sorting.
|
||||
type sortableSignBatch []signJob
|
||||
|
||||
// Len returns the number of items sortable batch of sign jobs. It is part of
|
||||
// the sort.Interface implementation.
|
||||
func (s sortableSignBatch) Len() int { return len(s) }
|
||||
|
||||
// Less returns whether the item in the batch with index i should sort before
|
||||
// the item with index j. It is part of the sort.Interface implementation.
|
||||
func (s sortableSignBatch) Less(i, j int) bool {
|
||||
return s[i].outputIndex < s[j].outputIndex
|
||||
}
|
||||
|
||||
// Swap swaps the items at the passed indices in the priority queue. It is part
|
||||
// of the sort.Interface implementation.
|
||||
func (s sortableSignBatch) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
||||
// signJobResp is the response to a sign job. Both channels are to be read in
|
||||
// order to ensure no unnecessary goroutine blocking occurs. Additionally, both
|
||||
// channels should be buffered.
|
||||
type signJobResp struct {
|
||||
// sig is the generated signature for a particular signJob In the case
|
||||
// of an error during signature generation, then this value sent will
|
||||
// be nil.
|
||||
sig *btcec.Signature
|
||||
|
||||
// err is the error that occurred when executing the specified
|
||||
// signature job. In the case that no error occurred, this value will
|
||||
// be nil.
|
||||
err error
|
||||
}
|
||||
|
||||
// sigPool is a struct that is meant to allow the current channel state machine
|
||||
// to parallelize all signature generation and verification. This struct is
|
||||
// needed as _each_ HTLC when creating a commitment transaction requires a
|
||||
// signature, and similarly a receiver of a new commitment must verify all the
|
||||
// HTLC signatures included within the CommitSig message. A pool of workers
|
||||
// will be maintained by the sigPool. Batches of jobs (either to sign or
|
||||
// verify) can be sent to the pool of workers which will asynchronously perform
|
||||
// the specified job.
|
||||
//
|
||||
// TODO(roasbeef): rename?
|
||||
// * ecdsaPool?
|
||||
type sigPool struct {
|
||||
started uint32
|
||||
stopped uint32
|
||||
|
||||
signer Signer
|
||||
|
||||
verifyJobs chan verifyJob
|
||||
signJobs chan signJob
|
||||
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
|
||||
numWorkers int
|
||||
}
|
||||
|
||||
// newSigPool creates a new signature pool with the specified number of
|
||||
// workers. The recommended parameter for the number of works is the number of
|
||||
// physical CPU cores available on the target machine.
|
||||
func newSigPool(numWorkers int, signer Signer) *sigPool {
|
||||
return &sigPool{
|
||||
signer: signer,
|
||||
numWorkers: numWorkers,
|
||||
verifyJobs: make(chan verifyJob, jobBuffer),
|
||||
signJobs: make(chan signJob, jobBuffer),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts of all goroutines that the sigPool needs to carry out its
|
||||
// duties.
|
||||
func (s *sigPool) Start() error {
|
||||
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < s.numWorkers; i++ {
|
||||
s.wg.Add(1)
|
||||
go s.poolWorker()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop signals any active workers carrying out jobs to exit so the sigPool can
|
||||
// gracefully shutdown.
|
||||
func (s *sigPool) Stop() error {
|
||||
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
|
||||
return nil
|
||||
}
|
||||
|
||||
close(s.quit)
|
||||
s.wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// poolWorker is the main worker goroutine wtihin the sigPool. Individual
|
||||
// batches are distributed amongst each of the active workers. The workers then
|
||||
// execute the task based on the type of job, and return the result back to
|
||||
// caller.
|
||||
func (s *sigPool) poolWorker() {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
|
||||
// We've just received a new signature job. Given the items
|
||||
// contained within the message, we'll craft a signature and
|
||||
// send the result along with a possible error back to the
|
||||
// caller.
|
||||
case sigMsg := <-s.signJobs:
|
||||
rawSig, err := s.signer.SignOutputRaw(sigMsg.tx,
|
||||
&sigMsg.signDesc)
|
||||
if err != nil {
|
||||
select {
|
||||
case sigMsg.resp <- signJobResp{
|
||||
sig: nil,
|
||||
err: err,
|
||||
}:
|
||||
continue
|
||||
case <-sigMsg.cancel:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
sig, err := btcec.ParseSignature(rawSig, btcec.S256())
|
||||
select {
|
||||
case sigMsg.resp <- signJobResp{
|
||||
sig: sig,
|
||||
err: err,
|
||||
}:
|
||||
case <-sigMsg.cancel:
|
||||
continue
|
||||
}
|
||||
|
||||
// We've just received a new verification job from the outside
|
||||
// world. We'll attempt to construct the sighash, parse the
|
||||
// signature, and finally verify the signature.
|
||||
case verifyMsg := <-s.verifyJobs:
|
||||
sigHash, err := verifyMsg.sigHash()
|
||||
if err != nil {
|
||||
select {
|
||||
case verifyMsg.errResp <- err:
|
||||
continue
|
||||
case <-verifyMsg.cancel:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
rawSig := verifyMsg.sig
|
||||
|
||||
if !rawSig.Verify(sigHash, verifyMsg.pubKey) {
|
||||
err := fmt.Errorf("invalid signature "+
|
||||
"sighash: %x, sig: %x", sigHash, rawSig.Serialize())
|
||||
select {
|
||||
case verifyMsg.errResp <- err:
|
||||
case <-verifyMsg.cancel:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case verifyMsg.errResp <- nil:
|
||||
case <-verifyMsg.cancel:
|
||||
}
|
||||
}
|
||||
|
||||
// The sigPool is exiting, so we will as well.
|
||||
case <-s.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SubmitSignBatch submits a batch of signature jobs to the sigPool. The
|
||||
// response and cancel channels for each of the signJob's are expected to be
|
||||
// fully populated, as the response for each job will be sent over the response
|
||||
// channel within the job itself.
|
||||
func (s *sigPool) SubmitSignBatch(signJobs []signJob) {
|
||||
for _, job := range signJobs {
|
||||
select {
|
||||
case s.signJobs <- job:
|
||||
case <-job.cancel:
|
||||
// TODO(roasbeef): return error?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SubmitVerifyBatch submits a batch of verification jobs to the sigPool. For
|
||||
// each job submitted, an error will be passed into the returned channel
|
||||
// denoting if signature verification was valid or not. The passed cancelChan
|
||||
// allows the caller to cancel all pending jobs in the case that they wish to
|
||||
// bail early.
|
||||
func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob,
|
||||
cancelChan chan struct{}) <-chan error {
|
||||
|
||||
errChan := make(chan error, len(verifyJobs))
|
||||
|
||||
for _, job := range verifyJobs {
|
||||
job.cancel = cancelChan
|
||||
job.errResp = errChan
|
||||
|
||||
select {
|
||||
case s.verifyJobs <- job:
|
||||
case <-job.cancel:
|
||||
return errChan
|
||||
}
|
||||
}
|
||||
|
||||
return errChan
|
||||
}
|
Loading…
Reference in New Issue
Block a user