package lnwallet

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/roasbeef/btcd/btcec"
	"github.com/roasbeef/btcd/wire"
)

const (
	// jobBuffer is the capacity of each of the two main job queues (the
	// sign queue and the verify queue). Buffering the queues allows
	// clients to avoid blocking when submitting jobs, as long as fewer
	// than jobBuffer jobs are pending in the queue.
	jobBuffer = 100

	// TODO(roasbeef): job buffer pool?
)

// verifyJob is a job sent to the sigPool to verify a signature on a
// transaction. The items contained in the struct are necessary and sufficient
// to verify the full signature. The passed sigHash closure function should be
// set to a function that generates the relevant sighash.
//
// TODO(roasbeef): when we move to ecschnorr, make into batch signature
// verification using bos-coster (or pip?).
type verifyJob struct {
	// pubKey is the public key that was used to generate the purported
	// valid signature. Note that with the current channel construction,
	// this public key will likely have been tweaked using the current per
	// commitment point for a particular commitment transaction.
	pubKey *btcec.PublicKey

	// sig is the raw signature generated using the above public key. This
	// is the signature to be verified.
	sig *btcec.Signature

	// sigHash is a function closure that generates the sighash that the
	// passed signature is known to have signed. If it returns an error,
	// that error is reported as the result of the job.
	sigHash func() ([]byte, error)

	// cancel is a channel that should be closed if the caller wishes to
	// cancel all pending verification jobs that are part of a single
	// batch. This channel is to be closed in the case that a single
	// signature in a batch has been returned as invalid, as there is no
	// need to verify the remainder of the signatures. SubmitVerifyBatch
	// overwrites this field with the cancel channel passed to it.
	cancel chan struct{}

	// errResp is the channel that the result of the signature verification
	// is to be sent over. In the case that the signature is valid, a nil
	// error will be passed. Otherwise, a concrete error detailing the
	// issue will be passed. SubmitVerifyBatch overwrites this field with
	// the channel it returns to the caller.
	errResp chan error
}

// signJob is a job sent to the sigPool to generate a valid signature according
// to the passed SignDescriptor for the passed transaction. Jobs are intended
// to be sent in batches in order to parallelize the job of generating
// signatures for a new commitment transaction.
type signJob struct {
	// signDesc is intended to be a fully populated SignDescriptor which
	// encodes the necessary material (keys, witness script, etc) required
	// to generate a valid signature for the specified input.
	signDesc SignDescriptor

	// tx is the transaction to be signed. This is required to generate the
	// proper sighash for the input to be signed.
	tx *wire.MsgTx

	// outputIndex is the key that sortableSignBatch orders jobs by.
	// NOTE(review): presumably the index of the corresponding output on
	// the commitment transaction -- confirm against the callers.
	outputIndex int32

	// cancel is a channel that should be closed if the caller wishes to
	// abandon all pending sign jobs that are part of a single batch.
	cancel chan struct{}

	// resp is the channel that the response to this particular signJob
	// will be sent over.
	//
	// TODO(roasbeef): actually need to allow caller to set, need to retain
	// order mark commit sig as special
	resp chan signJobResp
}

// sortableSignBatch is a type wrapper around a slice of signJobs which is able
// to sort each job according to its output index. Such sorting is necessary
// as when creating a new commitment state, we need to send over all the HTLC
// signatures (if any) in the exact order they appear on the commitment
// transaction after BIP 69 sorting.
type sortableSignBatch []signJob

// Len returns the number of items in the sortable batch of sign jobs. It is
// part of the sort.Interface implementation.
func (s sortableSignBatch) Len() int {
	return len(s)
}

// Less returns whether the item in the batch with index i should sort before
// the item with index j, which is the case when its outputIndex is strictly
// smaller. It is part of the sort.Interface implementation.
func (s sortableSignBatch) Less(i, j int) bool {
	return s[i].outputIndex < s[j].outputIndex
}

// Swap swaps the items at the passed indices in the batch. It is part of the
// sort.Interface implementation.
func (s sortableSignBatch) Swap(i, j int) { s[i], s[j] = s[j], s[i] } // signJobResp is the response to a sign job. Both channels are to be read in // order to ensure no unnecessary goroutine blocking occurs. Additionally, both // channels should be buffered. type signJobResp struct { // sig is the generated signature for a particular signJob In the case // of an error during signature generation, then this value sent will // be nil. sig *btcec.Signature // err is the error that occurred when executing the specified // signature job. In the case that no error occurred, this value will // be nil. err error } // sigPool is a struct that is meant to allow the current channel state machine // to parallelize all signature generation and verification. This struct is // needed as _each_ HTLC when creating a commitment transaction requires a // signature, and similarly a receiver of a new commitment must verify all the // HTLC signatures included within the CommitSig message. A pool of workers // will be maintained by the sigPool. Batches of jobs (either to sign or // verify) can be sent to the pool of workers which will asynchronously perform // the specified job. // // TODO(roasbeef): rename? // * ecdsaPool? type sigPool struct { started uint32 stopped uint32 signer Signer verifyJobs chan verifyJob signJobs chan signJob wg sync.WaitGroup quit chan struct{} numWorkers int } // newSigPool creates a new signature pool with the specified number of // workers. The recommended parameter for the number of works is the number of // physical CPU cores available on the target machine. func newSigPool(numWorkers int, signer Signer) *sigPool { return &sigPool{ signer: signer, numWorkers: numWorkers, verifyJobs: make(chan verifyJob, jobBuffer), signJobs: make(chan signJob, jobBuffer), quit: make(chan struct{}), } } // Start starts of all goroutines that the sigPool needs to carry out its // duties. 
func (s *sigPool) Start() error { if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { return nil } for i := 0; i < s.numWorkers; i++ { s.wg.Add(1) go s.poolWorker() } return nil } // Stop signals any active workers carrying out jobs to exit so the sigPool can // gracefully shutdown. func (s *sigPool) Stop() error { if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { return nil } close(s.quit) s.wg.Wait() return nil } // poolWorker is the main worker goroutine wtihin the sigPool. Individual // batches are distributed amongst each of the active workers. The workers then // execute the task based on the type of job, and return the result back to // caller. func (s *sigPool) poolWorker() { defer s.wg.Done() for { select { // We've just received a new signature job. Given the items // contained within the message, we'll craft a signature and // send the result along with a possible error back to the // caller. case sigMsg := <-s.signJobs: rawSig, err := s.signer.SignOutputRaw(sigMsg.tx, &sigMsg.signDesc) if err != nil { select { case sigMsg.resp <- signJobResp{ sig: nil, err: err, }: continue case <-sigMsg.cancel: continue } } sig, err := btcec.ParseSignature(rawSig, btcec.S256()) select { case sigMsg.resp <- signJobResp{ sig: sig, err: err, }: case <-sigMsg.cancel: continue } // We've just received a new verification job from the outside // world. We'll attempt to construct the sighash, parse the // signature, and finally verify the signature. case verifyMsg := <-s.verifyJobs: sigHash, err := verifyMsg.sigHash() if err != nil { select { case verifyMsg.errResp <- err: continue case <-verifyMsg.cancel: continue } } rawSig := verifyMsg.sig if !rawSig.Verify(sigHash, verifyMsg.pubKey) { err := fmt.Errorf("invalid signature "+ "sighash: %x, sig: %x", sigHash, rawSig.Serialize()) select { case verifyMsg.errResp <- err: case <-verifyMsg.cancel: } } else { select { case verifyMsg.errResp <- nil: case <-verifyMsg.cancel: } } // The sigPool is exiting, so we will as well. 
case <-s.quit: return } } } // SubmitSignBatch submits a batch of signature jobs to the sigPool. The // response and cancel channels for each of the signJob's are expected to be // fully populated, as the response for each job will be sent over the response // channel within the job itself. func (s *sigPool) SubmitSignBatch(signJobs []signJob) { for _, job := range signJobs { select { case s.signJobs <- job: case <-job.cancel: // TODO(roasbeef): return error? } } } // SubmitVerifyBatch submits a batch of verification jobs to the sigPool. For // each job submitted, an error will be passed into the returned channel // denoting if signature verification was valid or not. The passed cancelChan // allows the caller to cancel all pending jobs in the case that they wish to // bail early. func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob, cancelChan chan struct{}) <-chan error { errChan := make(chan error, len(verifyJobs)) for _, job := range verifyJobs { job.cancel = cancelChan job.errResp = errChan select { case s.verifyJobs <- job: case <-job.cancel: return errChan } } return errChan }