diff --git a/lnwallet/sigpool.go b/lnwallet/sigpool.go new file mode 100644 index 00000000..2c7ce0d6 --- /dev/null +++ b/lnwallet/sigpool.go @@ -0,0 +1,303 @@ +package lnwallet + +import ( + "fmt" + "sync" + "sync/atomic" + + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcd/wire" +) + +const ( + // jobBuffer is a constant the represents the buffer of jobs in the two + // main queues. This allows clients avoid necessarily blocking when + // submitting jobs into the queue. + jobBuffer = 100 + + // TODO(roasbeef): job buffer pool? +) + +// verifyJob is a job sent to the sigPool to verify a signature on a +// transaction. The items contained in the struct are necessary and sufficient +// to verify the full signature. The passed sigHash closure function should be +// set to a function that generates the relevant sighash. +// +// TODO(roasbeef): when we move to ecschnorr, make into batch signature +// verification using bos-coster (or pip?). +type verifyJob struct { + // pubKey is the public key that was used to generate the purported + // valid signature. Note that with the current channel construction, + // this public key will likely have been tweaked using the current per + // commitment point for a particular commitment transactions. + pubKey *btcec.PublicKey + + // sig is the raw signature generated using the above public key. This + // is the signature to be verified. + sig *btcec.Signature + + // sigHash is a function closure generates the sighashes that the + // passed signature is known to have signed. + sigHash func() ([]byte, error) + + // cancel is a channel that should be closed if the caller wishes to + // cancel all pending verification jobs part of a single batch. This + // channel is to be closed in the case that a single signature in a + // batch has been returned as invalid, as there is no need to verify + // the remainder of the signatures. + cancel chan struct{} + + // errResp is the channel that the result of the signature verification + // is to be sent over. In the see that the signature is valid, a nil + // error will be passed. Otherwise, a concrete error detailing the + // issue will be passed. + errResp chan error +} + +// signJob is a job sent to the sigPool to generate a valid signature according +// to the passed SignDescriptor for the passed transaction. Jobs are intended +// to be sent in batches in order to parallelize the job of generating +// signatures for a new commitment transaction. +type signJob struct { + // signDesc is intended to be a full populated SignDescriptor which + // encodes the necessary material (keys, witness script, etc) required + // to generate a valid signature for the specified input. + signDesc SignDescriptor + + // tx is the transaction to be signed. This is required to generate the + // proper sighash for the input to be signed. + tx *wire.MsgTx + + // outputIndex... + outputIndex int32 + + // cancel is a channel that should be closed if the caller wishes to + // abandon all pending sign jobs part of a single batch. + cancel chan struct{} + + // resp is the channel that the response to this particular signJob + // will be sent over. + // + // TODO(roasbeef): actually need to allow caller to set, need to retain + // order mark commit sig as special + resp chan signJobResp +} + +// sortableSignBatch is a type wrapper around a slice of signJobs which is able +// to sort each job according to tis outputs index. Such sorting is necessary +// as when creating a new commitment state, we need to send over all the HTLC +// signatures (if any) in the exact order the appears on the commitment +// transaction after BIP 69 sorting. +type sortableSignBatch []signJob + +// Len returns the number of items sortable batch of sign jobs. It is part of +// the sort.Interface implementation. +func (s sortableSignBatch) Len() int { return len(s) } + +// Less returns whether the item in the batch with index i should sort before +// the item with index j. It is part of the sort.Interface implementation. +func (s sortableSignBatch) Less(i, j int) bool { + return s[i].outputIndex < s[j].outputIndex +} + +// Swap swaps the items at the passed indices in the priority queue. It is part +// of the sort.Interface implementation. +func (s sortableSignBatch) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// signJobResp is the response to a sign job. Both channels are to be read in +// order to ensure no unnecessary goroutine blocking occurs. Additionally, both +// channels should be buffered. +type signJobResp struct { + // sig is the generated signature for a particular signJob In the case + // of an error during signature generation, then this value sent will + // be nil. + sig *btcec.Signature + + // err is the error that occurred when executing the specified + // signature job. In the case that no error occurred, this value will + // be nil. + err error +} + +// sigPool is a struct that is meant to allow the current channel state machine +// to parallelize all signature generation and verification. This struct is +// needed as _each_ HTLC when creating a commitment transaction requires a +// signature, and similarly a receiver of a new commitment must verify all the +// HTLC signatures included within the CommitSig message. A pool of workers +// will be maintained by the sigPool. Batches of jobs (either to sign or +// verify) can be sent to the pool of workers which will asynchronously perform +// the specified job. +// +// TODO(roasbeef): rename? +// * ecdsaPool? +type sigPool struct { + started uint32 + stopped uint32 + + signer Signer + + verifyJobs chan verifyJob + signJobs chan signJob + + wg sync.WaitGroup + quit chan struct{} + + numWorkers int +} + +// newSigPool creates a new signature pool with the specified number of +// workers. The recommended parameter for the number of works is the number of +// physical CPU cores available on the target machine. +func newSigPool(numWorkers int, signer Signer) *sigPool { + return &sigPool{ + signer: signer, + numWorkers: numWorkers, + verifyJobs: make(chan verifyJob, jobBuffer), + signJobs: make(chan signJob, jobBuffer), + quit: make(chan struct{}), + } +} + +// Start starts of all goroutines that the sigPool needs to carry out its +// duties. +func (s *sigPool) Start() error { + if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { + return nil + } + + for i := 0; i < s.numWorkers; i++ { + s.wg.Add(1) + go s.poolWorker() + } + + return nil +} + +// Stop signals any active workers carrying out jobs to exit so the sigPool can +// gracefully shutdown. +func (s *sigPool) Stop() error { + if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { + return nil + } + + close(s.quit) + s.wg.Wait() + + return nil +} + +// poolWorker is the main worker goroutine wtihin the sigPool. Individual +// batches are distributed amongst each of the active workers. The workers then +// execute the task based on the type of job, and return the result back to +// caller. +func (s *sigPool) poolWorker() { + defer s.wg.Done() + + for { + select { + + // We've just received a new signature job. Given the items + // contained within the message, we'll craft a signature and + // send the result along with a possible error back to the + // caller. + case sigMsg := <-s.signJobs: + rawSig, err := s.signer.SignOutputRaw(sigMsg.tx, + &sigMsg.signDesc) + if err != nil { + select { + case sigMsg.resp <- signJobResp{ + sig: nil, + err: err, + }: + continue + case <-sigMsg.cancel: + continue + } + } + + sig, err := btcec.ParseSignature(rawSig, btcec.S256()) + select { + case sigMsg.resp <- signJobResp{ + sig: sig, + err: err, + }: + case <-sigMsg.cancel: + continue + } + + // We've just received a new verification job from the outside + // world. We'll attempt to construct the sighash, parse the + // signature, and finally verify the signature. + case verifyMsg := <-s.verifyJobs: + sigHash, err := verifyMsg.sigHash() + if err != nil { + select { + case verifyMsg.errResp <- err: + continue + case <-verifyMsg.cancel: + continue + } + } + + rawSig := verifyMsg.sig + + if !rawSig.Verify(sigHash, verifyMsg.pubKey) { + err := fmt.Errorf("invalid signature "+ + "sighash: %x, sig: %x", sigHash, rawSig.Serialize()) + select { + case verifyMsg.errResp <- err: + case <-verifyMsg.cancel: + } + } else { + select { + case verifyMsg.errResp <- nil: + case <-verifyMsg.cancel: + } + } + + // The sigPool is exiting, so we will as well. + case <-s.quit: + return + } + } +} + +// SubmitSignBatch submits a batch of signature jobs to the sigPool. The +// response and cancel channels for each of the signJob's are expected to be +// fully populated, as the response for each job will be sent over the response +// channel within the job itself. +func (s *sigPool) SubmitSignBatch(signJobs []signJob) { + for _, job := range signJobs { + select { + case s.signJobs <- job: + case <-job.cancel: + // TODO(roasbeef): return error? + } + } +} + +// SubmitVerifyBatch submits a batch of verification jobs to the sigPool. For +// each job submitted, an error will be passed into the returned channel +// denoting if signature verification was valid or not. The passed cancelChan +// allows the caller to cancel all pending jobs in the case that they wish to +// bail early. +func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob, + cancelChan chan struct{}) <-chan error { + + errChan := make(chan error, len(verifyJobs)) + + for _, job := range verifyJobs { + job.cancel = cancelChan + job.errResp = errChan + + select { + case s.verifyJobs <- job: + case <-job.cancel: + return errChan + } + } + + return errChan +}