281 lines
8.1 KiB
Go
281 lines
8.1 KiB
Go
|
package lnwallet
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
|
||
|
"github.com/roasbeef/btcd/btcec"
|
||
|
"github.com/roasbeef/btcd/wire"
|
||
|
)
|
||
|
|
||
|
// jobBuffer is the capacity of each of the two main job queues. Buffering the
// queues allows clients to avoid necessarily blocking when submitting jobs
// into the queue.
//
// TODO(roasbeef): job buffer pool?
const jobBuffer = 100
|
||
|
|
||
|
// verifyJob is a job sent to the sigPool to verify a signature on a
|
||
|
// transaction. The items contained in the struct are necessary and sufficient
|
||
|
// to verify the full signature. The passed sigHash closure function should be
|
||
|
// set to a function that generates the relevant sighash.
|
||
|
//
|
||
|
// TODO(roasbeef): when we move to ecschnorr, make into batch signature
|
||
|
// verification using bos-coster (or pip?).
|
||
|
type verifyJob struct {
|
||
|
// pubKey is the public key that was used to generate the purported
|
||
|
// valid signature. Note that with the current channel construction,
|
||
|
// this public key will likely have been tweaked using the current per
|
||
|
// commitment point for a particular commitment transactions.
|
||
|
pubKey *btcec.PublicKey
|
||
|
|
||
|
// sig is the raw signature generated using the above public key. This
|
||
|
// is the signature to be verified.
|
||
|
sig *btcec.Signature
|
||
|
|
||
|
// sigHash is a function closure generates the sighashes that the
|
||
|
// passed signature is known to have signed.
|
||
|
sigHash func() ([]byte, error)
|
||
|
|
||
|
// cancel is a channel that should be closed if the caller wishes to
|
||
|
// cancel all pending verification jobs part of a single batch. This
|
||
|
// channel is to be closed in the case that a single signature in a
|
||
|
// batch has been returned as invalid, as there is no need to verify
|
||
|
// the remainder of the signatures.
|
||
|
cancel chan struct{}
|
||
|
|
||
|
// errResp is the channel that the result of the signature verification
|
||
|
// is to be sent over. In the see that the signature is valid, a nil
|
||
|
// error will be passed. Otherwise, a concrete error detailing the
|
||
|
// issue will be passed.
|
||
|
errResp chan error
|
||
|
}
|
||
|
|
||
|
// signJob is a job sent to the sigPool to generate a valid signature according
|
||
|
// to the passed SignDescriptor for the passed transaction. Jobs are intended
|
||
|
// to be sent in batches in order to parallelize the job of generating
|
||
|
// signatures for a new commitment transaction.
|
||
|
type signJob struct {
|
||
|
// signDesc is intended to be a full populated SignDescriptor which
|
||
|
// encodes the necessary material (keys, witness script, etc) required
|
||
|
// to generate a valid signature for the specified input.
|
||
|
signDesc SignDescriptor
|
||
|
|
||
|
// tx is the transaction to be signed. This is required to generate the
|
||
|
// proper sighash for the input to be signed.
|
||
|
tx *wire.MsgTx
|
||
|
|
||
|
// outputIndex...
|
||
|
outputIndex int32
|
||
|
|
||
|
// cancel is a channel that should be closed if the caller wishes to
|
||
|
// abandon all pending sign jobs part of a single batch.
|
||
|
cancel chan struct{}
|
||
|
|
||
|
// resp is the channel that the response to this particular signJob
|
||
|
// will be sent over.
|
||
|
//
|
||
|
// TODO(roasbeef): actually need to allow caller to set, need to retain
|
||
|
// order mark commit sig as special
|
||
|
resp chan signJobResp
|
||
|
}
|
||
|
|
||
|
// signJobResp is the response to a sign job. Both channels are to be read in
|
||
|
// order to ensure no unnecessary goroutine blocking occurs. Additionally, both
|
||
|
// channels should be buffered.
|
||
|
type signJobResp struct {
|
||
|
// sig is the generated signature for a particular signJob In the case
|
||
|
// of an error during signature generation, then this value sent will
|
||
|
// be nil.
|
||
|
sig *btcec.Signature
|
||
|
|
||
|
// err is the error that occurred when executing the specified
|
||
|
// signature job. In the case that no error occurred, this value will
|
||
|
// be nil.
|
||
|
err error
|
||
|
}
|
||
|
|
||
|
// sigPool is a struct that is meant to allow the current channel state machine
|
||
|
// to parallelize all signature generation and verification. This struct is
|
||
|
// needed as _each_ HTLC when creating a commitment transaction requires a
|
||
|
// signature, and similarly a receiver of a new commitment must verify all the
|
||
|
// HTLC signatures included within the CommitSig message. A pool of workers
|
||
|
// will be maintained by the sigPool. Batches of jobs (either to sign or
|
||
|
// verify) can be sent to the pool of workers which will asynchronously perform
|
||
|
// the specified job.
|
||
|
//
|
||
|
// TODO(roasbeef): rename?
|
||
|
// * ecdsaPool?
|
||
|
type sigPool struct {
|
||
|
started uint32
|
||
|
stopped uint32
|
||
|
|
||
|
signer Signer
|
||
|
|
||
|
verifyJobs chan verifyJob
|
||
|
signJobs chan signJob
|
||
|
|
||
|
wg sync.WaitGroup
|
||
|
quit chan struct{}
|
||
|
|
||
|
numWorkers int
|
||
|
}
|
||
|
|
||
|
// newSigPool creates a new signature pool with the specified number of
|
||
|
// workers. The recommended parameter for the number of works is the number of
|
||
|
// physical CPU cores available on the target machine.
|
||
|
func newSigPool(numWorkers int, signer Signer) *sigPool {
|
||
|
return &sigPool{
|
||
|
signer: signer,
|
||
|
numWorkers: numWorkers,
|
||
|
verifyJobs: make(chan verifyJob, jobBuffer),
|
||
|
signJobs: make(chan signJob, jobBuffer),
|
||
|
quit: make(chan struct{}),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Start starts of all goroutines that the sigPool needs to carry out its
|
||
|
// duties.
|
||
|
func (s *sigPool) Start() error {
|
||
|
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
for i := 0; i < s.numWorkers; i++ {
|
||
|
s.wg.Add(1)
|
||
|
go s.poolWorker()
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Stop signals any active workers carrying out jobs to exit so the sigPool can
|
||
|
// gracefully shutdown.
|
||
|
func (s *sigPool) Stop() error {
|
||
|
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
close(s.quit)
|
||
|
s.wg.Wait()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// poolWorker is the main worker goroutine wtihin the sigPool. Individual
|
||
|
// batches are distributed amongst each of the active workers. The workers then
|
||
|
// execute the task based on the type of job, and return the result back to
|
||
|
// caller.
|
||
|
func (s *sigPool) poolWorker() {
|
||
|
defer s.wg.Done()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
|
||
|
// We've just received a new signature job. Given the items
|
||
|
// contained within the message, we'll craft a signature and
|
||
|
// send the result along with a possible error back to the
|
||
|
// caller.
|
||
|
case sigMsg := <-s.signJobs:
|
||
|
rawSig, err := s.signer.SignOutputRaw(sigMsg.tx,
|
||
|
&sigMsg.signDesc)
|
||
|
if err != nil {
|
||
|
select {
|
||
|
case sigMsg.resp <- signJobResp{
|
||
|
sig: nil,
|
||
|
err: err,
|
||
|
}:
|
||
|
continue
|
||
|
case <-sigMsg.cancel:
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
|
||
|
sig, err := btcec.ParseSignature(rawSig, btcec.S256())
|
||
|
select {
|
||
|
case sigMsg.resp <- signJobResp{
|
||
|
sig: sig,
|
||
|
err: err,
|
||
|
}:
|
||
|
case <-sigMsg.cancel:
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// We've just received a new verification job from the outside
|
||
|
// world. We'll attempt to construct the sighash, parse the
|
||
|
// signature, and finally verify the signature.
|
||
|
case verifyMsg := <-s.verifyJobs:
|
||
|
sigHash, err := verifyMsg.sigHash()
|
||
|
if err != nil {
|
||
|
select {
|
||
|
case verifyMsg.errResp <- err:
|
||
|
continue
|
||
|
case <-verifyMsg.cancel:
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
|
||
|
rawSig := verifyMsg.sig
|
||
|
|
||
|
if !rawSig.Verify(sigHash, verifyMsg.pubKey) {
|
||
|
err := fmt.Errorf("invalid signature "+
|
||
|
"sighash: %x, sig: %x", sigHash, rawSig.Serialize())
|
||
|
select {
|
||
|
case verifyMsg.errResp <- err:
|
||
|
case <-verifyMsg.cancel:
|
||
|
}
|
||
|
} else {
|
||
|
select {
|
||
|
case verifyMsg.errResp <- nil:
|
||
|
case <-verifyMsg.cancel:
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// The sigPool is exiting, so we will as well.
|
||
|
case <-s.quit:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// SubmitSignBatch submits a batch of signature jobs to the sigPool. The
|
||
|
// response and cancel channels for each of the signJob's are expected to be
|
||
|
// fully populated, as the response for each job will be sent over the response
|
||
|
// channel within the job itself.
|
||
|
func (s *sigPool) SubmitSignBatch(signJobs []signJob) {
|
||
|
for _, job := range signJobs {
|
||
|
select {
|
||
|
case s.signJobs <- job:
|
||
|
case <-job.cancel:
|
||
|
// TODO(roasbeef): return error?
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// SubmitVerifyBatch submits a batch of verification jobs to the sigPool. For
|
||
|
// each job submitted, an error will be passed into the returned channel
|
||
|
// denoting if signature verification was valid or not. The passed cancelChan
|
||
|
// allows the caller to cancel all pending jobs in the case that they wish to
|
||
|
// bail early.
|
||
|
func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob,
|
||
|
cancelChan chan struct{}) <-chan error {
|
||
|
|
||
|
errChan := make(chan error, len(verifyJobs))
|
||
|
|
||
|
for _, job := range verifyJobs {
|
||
|
job.cancel = cancelChan
|
||
|
job.errResp = errChan
|
||
|
|
||
|
select {
|
||
|
case s.verifyJobs <- job:
|
||
|
case <-job.cancel:
|
||
|
return errChan
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return errChan
|
||
|
}
|