diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 51a5146e..ef00ff38 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -111,8 +111,10 @@ type ChannelLink interface { // Peer is an interface which represents the remote lightning node inside our // system. type Peer interface { - // SendMessage sends message to remote peer. - SendMessage(lnwire.Message) error + // SendMessage sends message to remote peer. The second argument + // denotes if the method should block until the message has been sent + // to the remote peer. + SendMessage(msg lnwire.Message, sync bool) error // WipeChannel removes the channel uniquely identified by its channel // point from all indexes associated with the peer. diff --git a/htlcswitch/link.go b/htlcswitch/link.go index d58f2276..2a7feae2 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -483,7 +483,7 @@ func (l *channelLink) syncChanStates() error { return fmt.Errorf("unable to generate chan sync message for "+ "ChannelPoint(%v)", l.channel.ChannelPoint()) } - if err := l.cfg.Peer.SendMessage(localChanSyncMsg); err != nil { + if err := l.cfg.Peer.SendMessage(localChanSyncMsg, false); err != nil { return fmt.Errorf("Unable to send chan sync message for "+ "ChannelPoint(%v)", l.channel.ChannelPoint()) } @@ -525,7 +525,7 @@ func (l *channelLink) syncChanStates() error { fundingLockedMsg := lnwire.NewFundingLocked( l.ChanID(), nextRevocation, ) - err = l.cfg.Peer.SendMessage(fundingLockedMsg) + err = l.cfg.Peer.SendMessage(fundingLockedMsg, false) if err != nil { return fmt.Errorf("unable to re-send "+ "FundingLocked: %v", err) @@ -575,7 +575,7 @@ func (l *channelLink) syncChanStates() error { // immediately so we return to a synchronized state as soon as // possible. for _, msg := range msgsToReSend { - l.cfg.Peer.SendMessage(msg) + l.cfg.Peer.SendMessage(msg, false) } case <-l.quit: @@ -1058,7 +1058,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { l.openedCircuits = append(l.openedCircuits, pkt.inKey()) l.keystoneBatch = append(l.keystoneBatch, pkt.keystone()) - l.cfg.Peer.SendMessage(htlc) + l.cfg.Peer.SendMessage(htlc, false) case *lnwire.UpdateFulfillHTLC: // An HTLC we forward to the switch has just settled somewhere @@ -1090,7 +1090,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // Then we send the HTLC settle message to the connected peer // so we can continue the propagation of the settle message. - l.cfg.Peer.SendMessage(htlc) + l.cfg.Peer.SendMessage(htlc, false) isSettle = true case *lnwire.UpdateFailHTLC: @@ -1122,7 +1122,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // Finally, we send the HTLC message to the peer which // initially created the HTLC. - l.cfg.Peer.SendMessage(htlc) + l.cfg.Peer.SendMessage(htlc, false) isSettle = true } @@ -1240,11 +1240,22 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // direct error. // // TODO(roasbeef): force close chan - if _, ok := err.(*lnwallet.InvalidCommitSigError); ok { - l.cfg.Peer.SendMessage(&lnwire.Error{ + var sendErr bool + switch err.(type) { + case *lnwallet.InvalidCommitSigError: + sendErr = true + case *lnwallet.InvalidHtlcSigError: + sendErr = true + } + if sendErr { + err := l.cfg.Peer.SendMessage(&lnwire.Error{ ChanID: l.ChanID(), Data: []byte(err.Error()), - }) + }, true) + if err != nil { + l.errorf("unable to send msg to "+ + "remote peer: %v", err) + } } l.fail("ChannelPoint(%v): unable to accept new "+ @@ -1260,7 +1271,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { log.Errorf("unable to revoke commitment: %v", err) return } - l.cfg.Peer.SendMessage(nextRevocation) + l.cfg.Peer.SendMessage(nextRevocation, false) // Since we just revoked our commitment, we may have a new set // of HTLC's on our commitment, so we'll send them over our @@ -1288,7 +1299,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // If both commitment chains are fully synced from our PoV, // then we don't need to reply with a signature as both sides - // already have a commitment with the latest accepted l. + // already have a commitment with the latest accepted. if l.channel.FullySynced() { return } @@ -1457,7 +1468,7 @@ func (l *channelLink) updateCommitTx() error { CommitSig: theirCommitSig, HtlcSigs: htlcSigs, } - l.cfg.Peer.SendMessage(commitSig) + l.cfg.Peer.SendMessage(commitSig, false) // We've just initiated a state transition, attempt to stop the // logCommitTimer. If the timer already ticked, then we'll consume the @@ -1665,7 +1676,7 @@ func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error { // We'll then attempt to send a new UpdateFee message, and also lock it // in immediately by triggering a commitment update. msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw)) - if err := l.cfg.Peer.SendMessage(msg); err != nil { + if err := l.cfg.Peer.SendMessage(msg, false); err != nil { return err } return l.updateCommitTx() @@ -2043,7 +2054,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, ChanID: l.ChanID(), ID: pd.HtlcIndex, PaymentPreimage: preimage, - }) + }, false) needUpdate = true // There are additional channels left within this route. So @@ -2364,7 +2375,7 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64, failure lnwire.FailureMess ChanID: l.ChanID(), ID: htlcIndex, Reason: reason, - }) + }, false) } // sendMalformedHTLCError helper function which sends the malformed HTLC update @@ -2384,7 +2395,7 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64, ID: htlcIndex, ShaOnionBlob: shaOnionBlob, FailureCode: code, - }) + }, false) } // fail helper function which is used to encapsulate the action necessary for diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 84f20953..95f11d0f 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1392,7 +1392,7 @@ type mockPeer struct { quit chan struct{} } -func (m *mockPeer) SendMessage(msg lnwire.Message) error { +func (m *mockPeer) SendMessage(msg lnwire.Message, sync bool) error { select { case m.sentMsgs <- msg: case <-m.quit: diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 658b636e..42c1843c 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -447,7 +447,7 @@ func (s *mockServer) intersect(f messageInterceptor) { s.interceptorFuncs = append(s.interceptorFuncs, f) } -func (s *mockServer) SendMessage(message lnwire.Message) error { +func (s *mockServer) SendMessage(message lnwire.Message, sync bool) error { select { case s.messages <- message: diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 22717eda..c7804c0f 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -1881,8 +1881,9 @@ func NewBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, if err != nil { return nil, err } - commitmentSecret, commitmentPoint := btcec.PrivKeyFromBytes(btcec.S256(), - revocationPreimage[:]) + commitmentSecret, commitmentPoint := btcec.PrivKeyFromBytes( + btcec.S256(), revocationPreimage[:], + ) // With the commitment point generated, we can now generate the four // keys we'll need to reconstruct the commitment state, @@ -1893,8 +1894,9 @@ func NewBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, // number so we can have the proper witness script to sign and include // within the final witness. remoteDelay := uint32(chanState.RemoteChanCfg.CsvDelay) - remotePkScript, err := commitScriptToSelf(remoteDelay, keyRing.DelayKey, - keyRing.RevocationKey) + remotePkScript, err := commitScriptToSelf( + remoteDelay, keyRing.DelayKey, keyRing.RevocationKey, + ) if err != nil { return nil, err } @@ -1999,10 +2001,10 @@ func NewBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, return nil, err } - // Otherwise, is this was an outgoing HTLC that we sent, then - // from the PoV of the remote commitment state, they're the - // receiver of this HTLC. } else { + // Otherwise, is this was an outgoing HTLC that we + // sent, then from the PoV of the remote commitment + // state, they're the receiver of this HTLC. htlcScript, err = receiverHTLCScript( htlc.RefundTimeout, keyRing.LocalHtlcKey, keyRing.RemoteHtlcKey, keyRing.RevocationKey, @@ -2582,9 +2584,11 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Hash: txHash, Index: uint32(htlc.remoteOutputIndex), } - sigJob.tx, err = createHtlcTimeoutTx(op, outputAmt, - htlc.Timeout, uint32(remoteChanCfg.CsvDelay), - keyRing.RevocationKey, keyRing.DelayKey) + sigJob.tx, err = createHtlcTimeoutTx( + op, outputAmt, htlc.Timeout, + uint32(remoteChanCfg.CsvDelay), + keyRing.RevocationKey, keyRing.DelayKey, + ) if err != nil { return nil, nil, err } @@ -2632,9 +2636,10 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Hash: txHash, Index: uint32(htlc.remoteOutputIndex), } - sigJob.tx, err = createHtlcSuccessTx(op, outputAmt, - uint32(remoteChanCfg.CsvDelay), keyRing.RevocationKey, - keyRing.DelayKey) + sigJob.tx, err = createHtlcSuccessTx( + op, outputAmt, uint32(remoteChanCfg.CsvDelay), + keyRing.RevocationKey, keyRing.DelayKey, + ) if err != nil { return nil, nil, err } @@ -3483,20 +3488,23 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, i := 0 for index := range localCommitmentView.txn.TxOut { var ( - sigHash func() ([]byte, error) - sig *btcec.Signature - err error + htlcIndex uint64 + sigHash func() ([]byte, error) + sig *btcec.Signature + err error ) outputIndex := int32(index) switch { - // If this output index is found within the incoming HTLC index, - // then this means that we need to generate an HTLC success - // transaction in order to validate the signature. + // If this output index is found within the incoming HTLC + // index, then this means that we need to generate an HTLC + // success transaction in order to validate the signature. case localCommitmentView.incomingHTLCIndex[outputIndex] != nil: htlc := localCommitmentView.incomingHTLCIndex[outputIndex] + htlcIndex = htlc.HtlcIndex + sigHash = func() ([]byte, error) { op := wire.OutPoint{ Hash: txHash, @@ -3547,6 +3555,8 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, case localCommitmentView.outgoingHTLCIndex[outputIndex] != nil: htlc := localCommitmentView.outgoingHTLCIndex[outputIndex] + htlcIndex = htlc.HtlcIndex + sigHash = func() ([]byte, error) { op := wire.OutPoint{ Hash: txHash, @@ -3598,9 +3608,10 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, } verifyJobs = append(verifyJobs, verifyJob{ - pubKey: keyRing.RemoteHtlcKey, - sig: sig, - sigHash: sigHash, + htlcIndex: htlcIndex, + pubKey: keyRing.RemoteHtlcKey, + sig: sig, + sigHash: sigHash, }) i++ @@ -3617,7 +3628,7 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, } // InvalidCommitSigError is a struct that implements the error interface to -// report a failure to validation a commitment signature for a remote peer. +// report a failure to validate a commitment signature for a remote peer. // We'll use the items in this struct to generate a rich error message for the // remote peer when we receive an invalid signature from it. Doing so can // greatly aide in debugging cross implementation issues. @@ -3635,7 +3646,7 @@ type InvalidCommitSigError struct { // caused an invalid commitment signature. func (i *InvalidCommitSigError) Error() string { return fmt.Sprintf("rejected commitment: commit_height=%v, "+ - "invalid_sig=%x, commit_tx=%x, sig_hash=%x", i.commitHeight, + "invalid_commit_sig=%x, commit_tx=%x, sig_hash=%x", i.commitHeight, i.commitSig[:], i.commitTx, i.sigHash[:]) } @@ -3643,6 +3654,35 @@ func (i *InvalidCommitSigError) Error() string { // error interface. var _ error = (*InvalidCommitSigError)(nil) +// InvalidCommitSigError is a struc that implements the error interface to +// report a failure to validate an htlc signature from a remote peer. We'll use +// the items in this struct to generate a rich error message for the remote +// peer when we receive an invalid signature from it. Doing so can greatly aide +// in debugging across implementation issues. +type InvalidHtlcSigError struct { + commitHeight uint64 + + htlcSig []byte + + htlcIndex uint64 + + sigHash []byte + + commitTx []byte +} + +// Error returns a detailed error string including the exact transaction that +// caused an invalid htlc signature. +func (i *InvalidHtlcSigError) Error() string { + return fmt.Sprintf("rejected commitment: commit_height=%v, "+ + "invalid_htlc_sig=%x, commit_tx=%x, sig_hash=%x", i.commitHeight, + i.htlcSig, i.commitTx, i.sigHash[:]) +} + +// A compile time flag to ensure that InvalidCommitSigError implements the +// error interface. +var _ error = (*InvalidCommitSigError)(nil) + // ReceiveNewCommitment process a signature for a new commitment state sent by // the remote party. This method should be called in response to the // remote party initiating a new change, or when the remote party sends a @@ -3772,10 +3812,30 @@ func (lc *LightningChannel) ReceiveNewCommitment(commitSig lnwire.Sig, // In the case that a single signature is invalid, we'll exit // early and cancel all the outstanding verification jobs. select { - case err := <-verifyResps: - if err != nil { + case htlcErr := <-verifyResps: + if htlcErr != nil { close(cancelChan) - return fmt.Errorf("invalid htlc signature: %v", err) + + sig, err := lnwire.NewSigFromSignature( + htlcErr.sig, + ) + if err != nil { + return err + } + sigHash, err := htlcErr.sigHash() + if err != nil { + return err + } + + var txBytes bytes.Buffer + localCommitTx.Serialize(&txBytes) + return &InvalidHtlcSigError{ + commitHeight: nextHeight, + htlcSig: sig.ToSignatureBytes(), + htlcIndex: htlcErr.htlcIndex, + sigHash: sigHash, + commitTx: txBytes.Bytes(), + } } case <-lc.quit: return fmt.Errorf("channel shutting down") diff --git a/lnwallet/sigpool.go b/lnwallet/sigpool.go index a93c9630..defbf8ba 100644 --- a/lnwallet/sigpool.go +++ b/lnwallet/sigpool.go @@ -41,6 +41,10 @@ type verifyJob struct { // passed signature is known to have signed. sigHash func() ([]byte, error) + // htlcIndex is the index of the HTLC from the PoV of the remote + // party's update log. + htlcIndex uint64 + // cancel is a channel that should be closed if the caller wishes to // cancel all pending verification jobs part of a single batch. This // channel is to be closed in the case that a single signature in a @@ -52,7 +56,16 @@ type verifyJob struct { // is to be sent over. In the see that the signature is valid, a nil // error will be passed. Otherwise, a concrete error detailing the // issue will be passed. - errResp chan error + errResp chan *htlcIndexErr +} + +// verifyJobErr is a special type of error that also includes a pointer to the +// original validation job. Ths error message allows us to craft more detailed +// errors at upper layers. +type htlcIndexErr struct { + error + + *verifyJob } // signJob is a job sent to the sigPool to generate a valid signature according @@ -69,7 +82,8 @@ type signJob struct { // proper sighash for the input to be signed. tx *wire.MsgTx - // outputIndex... + // outputIndex is the output index of the HTLC on the commitment + // transaction being signed. outputIndex int32 // cancel is a channel that should be closed if the caller wishes to @@ -216,7 +230,10 @@ func (s *sigPool) poolWorker() { sigHash, err := verifyMsg.sigHash() if err != nil { select { - case verifyMsg.errResp <- err: + case verifyMsg.errResp <- &htlcIndexErr{ + error: err, + verifyJob: &verifyMsg, + }: continue case <-verifyMsg.cancel: continue @@ -229,7 +246,10 @@ func (s *sigPool) poolWorker() { err := fmt.Errorf("invalid signature "+ "sighash: %x, sig: %x", sigHash, rawSig.Serialize()) select { - case verifyMsg.errResp <- err: + case verifyMsg.errResp <- &htlcIndexErr{ + error: err, + verifyJob: &verifyMsg, + }: case <-verifyMsg.cancel: case <-s.quit: return @@ -272,9 +292,9 @@ func (s *sigPool) SubmitSignBatch(signJobs []signJob) { // allows the caller to cancel all pending jobs in the case that they wish to // bail early. func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob, - cancelChan chan struct{}) <-chan error { + cancelChan chan struct{}) <-chan *htlcIndexErr { - errChan := make(chan error, len(verifyJobs)) + errChan := make(chan *htlcIndexErr, len(verifyJobs)) for _, job := range verifyJobs { job.cancel = cancelChan diff --git a/peer.go b/peer.go index c45db9df..b76c099d 100644 --- a/peer.go +++ b/peer.go @@ -1874,10 +1874,23 @@ func (p *peer) sendInitMsg() error { return p.writeMessage(msg) } -// SendMessage queues a message for sending to the target peer. -func (p *peer) SendMessage(msg lnwire.Message) error { - p.queueMsg(msg, nil) - return nil +// SendMessage sends message to remote peer. The second argument denotes if the +// method should block until the message has been sent to the remote peer. +func (p *peer) SendMessage(msg lnwire.Message, sync bool) error { + if !sync { + p.queueMsg(msg, nil) + return nil + } + + errChan := make(chan error, 1) + p.queueMsg(msg, errChan) + + select { + case err := <-errChan: + return err + case <-p.quit: + return fmt.Errorf("peer shutting down") + } } // PubKey returns the pubkey of the peer in compressed serialized format.