Merge pull request #1026 from Roasbeef/link-sig-errors

htlcswitch+lnwallet: ensure the Error is sent to remote peer before d/c, add detailed err for htlc sig rejection
This commit is contained in:
Olaoluwa Osuntokun 2018-04-05 16:28:25 -07:00 committed by GitHub
commit f07b1cc267
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 163 additions and 57 deletions

@ -111,8 +111,10 @@ type ChannelLink interface {
// Peer is an interface which represents the remote lightning node inside our // Peer is an interface which represents the remote lightning node inside our
// system. // system.
type Peer interface { type Peer interface {
// SendMessage sends message to remote peer. // SendMessage sends message to remote peer. The second argument
SendMessage(lnwire.Message) error // denotes if the method should block until the message has been sent
// to the remote peer.
SendMessage(msg lnwire.Message, sync bool) error
// WipeChannel removes the channel uniquely identified by its channel // WipeChannel removes the channel uniquely identified by its channel
// point from all indexes associated with the peer. // point from all indexes associated with the peer.

@ -483,7 +483,7 @@ func (l *channelLink) syncChanStates() error {
return fmt.Errorf("unable to generate chan sync message for "+ return fmt.Errorf("unable to generate chan sync message for "+
"ChannelPoint(%v)", l.channel.ChannelPoint()) "ChannelPoint(%v)", l.channel.ChannelPoint())
} }
if err := l.cfg.Peer.SendMessage(localChanSyncMsg); err != nil { if err := l.cfg.Peer.SendMessage(localChanSyncMsg, false); err != nil {
return fmt.Errorf("Unable to send chan sync message for "+ return fmt.Errorf("Unable to send chan sync message for "+
"ChannelPoint(%v)", l.channel.ChannelPoint()) "ChannelPoint(%v)", l.channel.ChannelPoint())
} }
@ -525,7 +525,7 @@ func (l *channelLink) syncChanStates() error {
fundingLockedMsg := lnwire.NewFundingLocked( fundingLockedMsg := lnwire.NewFundingLocked(
l.ChanID(), nextRevocation, l.ChanID(), nextRevocation,
) )
err = l.cfg.Peer.SendMessage(fundingLockedMsg) err = l.cfg.Peer.SendMessage(fundingLockedMsg, false)
if err != nil { if err != nil {
return fmt.Errorf("unable to re-send "+ return fmt.Errorf("unable to re-send "+
"FundingLocked: %v", err) "FundingLocked: %v", err)
@ -575,7 +575,7 @@ func (l *channelLink) syncChanStates() error {
// immediately so we return to a synchronized state as soon as // immediately so we return to a synchronized state as soon as
// possible. // possible.
for _, msg := range msgsToReSend { for _, msg := range msgsToReSend {
l.cfg.Peer.SendMessage(msg) l.cfg.Peer.SendMessage(msg, false)
} }
case <-l.quit: case <-l.quit:
@ -1058,7 +1058,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
l.openedCircuits = append(l.openedCircuits, pkt.inKey()) l.openedCircuits = append(l.openedCircuits, pkt.inKey())
l.keystoneBatch = append(l.keystoneBatch, pkt.keystone()) l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
l.cfg.Peer.SendMessage(htlc) l.cfg.Peer.SendMessage(htlc, false)
case *lnwire.UpdateFulfillHTLC: case *lnwire.UpdateFulfillHTLC:
// An HTLC we forward to the switch has just settled somewhere // An HTLC we forward to the switch has just settled somewhere
@ -1090,7 +1090,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
// Then we send the HTLC settle message to the connected peer // Then we send the HTLC settle message to the connected peer
// so we can continue the propagation of the settle message. // so we can continue the propagation of the settle message.
l.cfg.Peer.SendMessage(htlc) l.cfg.Peer.SendMessage(htlc, false)
isSettle = true isSettle = true
case *lnwire.UpdateFailHTLC: case *lnwire.UpdateFailHTLC:
@ -1122,7 +1122,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
// Finally, we send the HTLC message to the peer which // Finally, we send the HTLC message to the peer which
// initially created the HTLC. // initially created the HTLC.
l.cfg.Peer.SendMessage(htlc) l.cfg.Peer.SendMessage(htlc, false)
isSettle = true isSettle = true
} }
@ -1240,11 +1240,22 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// direct error. // direct error.
// //
// TODO(roasbeef): force close chan // TODO(roasbeef): force close chan
if _, ok := err.(*lnwallet.InvalidCommitSigError); ok { var sendErr bool
l.cfg.Peer.SendMessage(&lnwire.Error{ switch err.(type) {
case *lnwallet.InvalidCommitSigError:
sendErr = true
case *lnwallet.InvalidHtlcSigError:
sendErr = true
}
if sendErr {
err := l.cfg.Peer.SendMessage(&lnwire.Error{
ChanID: l.ChanID(), ChanID: l.ChanID(),
Data: []byte(err.Error()), Data: []byte(err.Error()),
}) }, true)
if err != nil {
l.errorf("unable to send msg to "+
"remote peer: %v", err)
}
} }
l.fail("ChannelPoint(%v): unable to accept new "+ l.fail("ChannelPoint(%v): unable to accept new "+
@ -1260,7 +1271,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
log.Errorf("unable to revoke commitment: %v", err) log.Errorf("unable to revoke commitment: %v", err)
return return
} }
l.cfg.Peer.SendMessage(nextRevocation) l.cfg.Peer.SendMessage(nextRevocation, false)
// Since we just revoked our commitment, we may have a new set // Since we just revoked our commitment, we may have a new set
// of HTLC's on our commitment, so we'll send them over our // of HTLC's on our commitment, so we'll send them over our
@ -1288,7 +1299,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// If both commitment chains are fully synced from our PoV, // If both commitment chains are fully synced from our PoV,
// then we don't need to reply with a signature as both sides // then we don't need to reply with a signature as both sides
// already have a commitment with the latest accepted l. // already have a commitment with the latest accepted.
if l.channel.FullySynced() { if l.channel.FullySynced() {
return return
} }
@ -1457,7 +1468,7 @@ func (l *channelLink) updateCommitTx() error {
CommitSig: theirCommitSig, CommitSig: theirCommitSig,
HtlcSigs: htlcSigs, HtlcSigs: htlcSigs,
} }
l.cfg.Peer.SendMessage(commitSig) l.cfg.Peer.SendMessage(commitSig, false)
// We've just initiated a state transition, attempt to stop the // We've just initiated a state transition, attempt to stop the
// logCommitTimer. If the timer already ticked, then we'll consume the // logCommitTimer. If the timer already ticked, then we'll consume the
@ -1665,7 +1676,7 @@ func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error {
// We'll then attempt to send a new UpdateFee message, and also lock it // We'll then attempt to send a new UpdateFee message, and also lock it
// in immediately by triggering a commitment update. // in immediately by triggering a commitment update.
msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw)) msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
if err := l.cfg.Peer.SendMessage(msg); err != nil { if err := l.cfg.Peer.SendMessage(msg, false); err != nil {
return err return err
} }
return l.updateCommitTx() return l.updateCommitTx()
@ -2043,7 +2054,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
ChanID: l.ChanID(), ChanID: l.ChanID(),
ID: pd.HtlcIndex, ID: pd.HtlcIndex,
PaymentPreimage: preimage, PaymentPreimage: preimage,
}) }, false)
needUpdate = true needUpdate = true
// There are additional channels left within this route. So // There are additional channels left within this route. So
@ -2364,7 +2375,7 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64, failure lnwire.FailureMess
ChanID: l.ChanID(), ChanID: l.ChanID(),
ID: htlcIndex, ID: htlcIndex,
Reason: reason, Reason: reason,
}) }, false)
} }
// sendMalformedHTLCError helper function which sends the malformed HTLC update // sendMalformedHTLCError helper function which sends the malformed HTLC update
@ -2384,7 +2395,7 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
ID: htlcIndex, ID: htlcIndex,
ShaOnionBlob: shaOnionBlob, ShaOnionBlob: shaOnionBlob,
FailureCode: code, FailureCode: code,
}) }, false)
} }
// fail helper function which is used to encapsulate the action necessary for // fail helper function which is used to encapsulate the action necessary for

@ -1392,7 +1392,7 @@ type mockPeer struct {
quit chan struct{} quit chan struct{}
} }
func (m *mockPeer) SendMessage(msg lnwire.Message) error { func (m *mockPeer) SendMessage(msg lnwire.Message, sync bool) error {
select { select {
case m.sentMsgs <- msg: case m.sentMsgs <- msg:
case <-m.quit: case <-m.quit:

@ -447,7 +447,7 @@ func (s *mockServer) intersect(f messageInterceptor) {
s.interceptorFuncs = append(s.interceptorFuncs, f) s.interceptorFuncs = append(s.interceptorFuncs, f)
} }
func (s *mockServer) SendMessage(message lnwire.Message) error { func (s *mockServer) SendMessage(message lnwire.Message, sync bool) error {
select { select {
case s.messages <- message: case s.messages <- message:

@ -1881,8 +1881,9 @@ func NewBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64,
if err != nil { if err != nil {
return nil, err return nil, err
} }
commitmentSecret, commitmentPoint := btcec.PrivKeyFromBytes(btcec.S256(), commitmentSecret, commitmentPoint := btcec.PrivKeyFromBytes(
revocationPreimage[:]) btcec.S256(), revocationPreimage[:],
)
// With the commitment point generated, we can now generate the four // With the commitment point generated, we can now generate the four
// keys we'll need to reconstruct the commitment state, // keys we'll need to reconstruct the commitment state,
@ -1893,8 +1894,9 @@ func NewBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64,
// number so we can have the proper witness script to sign and include // number so we can have the proper witness script to sign and include
// within the final witness. // within the final witness.
remoteDelay := uint32(chanState.RemoteChanCfg.CsvDelay) remoteDelay := uint32(chanState.RemoteChanCfg.CsvDelay)
remotePkScript, err := commitScriptToSelf(remoteDelay, keyRing.DelayKey, remotePkScript, err := commitScriptToSelf(
keyRing.RevocationKey) remoteDelay, keyRing.DelayKey, keyRing.RevocationKey,
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1999,10 +2001,10 @@ func NewBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64,
return nil, err return nil, err
} }
// Otherwise, is this was an outgoing HTLC that we sent, then
// from the PoV of the remote commitment state, they're the
// receiver of this HTLC.
} else { } else {
// Otherwise, is this was an outgoing HTLC that we
// sent, then from the PoV of the remote commitment
// state, they're the receiver of this HTLC.
htlcScript, err = receiverHTLCScript( htlcScript, err = receiverHTLCScript(
htlc.RefundTimeout, keyRing.LocalHtlcKey, htlc.RefundTimeout, keyRing.LocalHtlcKey,
keyRing.RemoteHtlcKey, keyRing.RevocationKey, keyRing.RemoteHtlcKey, keyRing.RevocationKey,
@ -2582,9 +2584,11 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
Hash: txHash, Hash: txHash,
Index: uint32(htlc.remoteOutputIndex), Index: uint32(htlc.remoteOutputIndex),
} }
sigJob.tx, err = createHtlcTimeoutTx(op, outputAmt, sigJob.tx, err = createHtlcTimeoutTx(
htlc.Timeout, uint32(remoteChanCfg.CsvDelay), op, outputAmt, htlc.Timeout,
keyRing.RevocationKey, keyRing.DelayKey) uint32(remoteChanCfg.CsvDelay),
keyRing.RevocationKey, keyRing.DelayKey,
)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -2632,9 +2636,10 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing,
Hash: txHash, Hash: txHash,
Index: uint32(htlc.remoteOutputIndex), Index: uint32(htlc.remoteOutputIndex),
} }
sigJob.tx, err = createHtlcSuccessTx(op, outputAmt, sigJob.tx, err = createHtlcSuccessTx(
uint32(remoteChanCfg.CsvDelay), keyRing.RevocationKey, op, outputAmt, uint32(remoteChanCfg.CsvDelay),
keyRing.DelayKey) keyRing.RevocationKey, keyRing.DelayKey,
)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -3483,20 +3488,23 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment,
i := 0 i := 0
for index := range localCommitmentView.txn.TxOut { for index := range localCommitmentView.txn.TxOut {
var ( var (
sigHash func() ([]byte, error) htlcIndex uint64
sig *btcec.Signature sigHash func() ([]byte, error)
err error sig *btcec.Signature
err error
) )
outputIndex := int32(index) outputIndex := int32(index)
switch { switch {
// If this output index is found within the incoming HTLC index, // If this output index is found within the incoming HTLC
// then this means that we need to generate an HTLC success // index, then this means that we need to generate an HTLC
// transaction in order to validate the signature. // success transaction in order to validate the signature.
case localCommitmentView.incomingHTLCIndex[outputIndex] != nil: case localCommitmentView.incomingHTLCIndex[outputIndex] != nil:
htlc := localCommitmentView.incomingHTLCIndex[outputIndex] htlc := localCommitmentView.incomingHTLCIndex[outputIndex]
htlcIndex = htlc.HtlcIndex
sigHash = func() ([]byte, error) { sigHash = func() ([]byte, error) {
op := wire.OutPoint{ op := wire.OutPoint{
Hash: txHash, Hash: txHash,
@ -3547,6 +3555,8 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment,
case localCommitmentView.outgoingHTLCIndex[outputIndex] != nil: case localCommitmentView.outgoingHTLCIndex[outputIndex] != nil:
htlc := localCommitmentView.outgoingHTLCIndex[outputIndex] htlc := localCommitmentView.outgoingHTLCIndex[outputIndex]
htlcIndex = htlc.HtlcIndex
sigHash = func() ([]byte, error) { sigHash = func() ([]byte, error) {
op := wire.OutPoint{ op := wire.OutPoint{
Hash: txHash, Hash: txHash,
@ -3598,9 +3608,10 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment,
} }
verifyJobs = append(verifyJobs, verifyJob{ verifyJobs = append(verifyJobs, verifyJob{
pubKey: keyRing.RemoteHtlcKey, htlcIndex: htlcIndex,
sig: sig, pubKey: keyRing.RemoteHtlcKey,
sigHash: sigHash, sig: sig,
sigHash: sigHash,
}) })
i++ i++
@ -3617,7 +3628,7 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment,
} }
// InvalidCommitSigError is a struct that implements the error interface to // InvalidCommitSigError is a struct that implements the error interface to
// report a failure to validation a commitment signature for a remote peer. // report a failure to validate a commitment signature for a remote peer.
// We'll use the items in this struct to generate a rich error message for the // We'll use the items in this struct to generate a rich error message for the
// remote peer when we receive an invalid signature from it. Doing so can // remote peer when we receive an invalid signature from it. Doing so can
// greatly aide in debugging cross implementation issues. // greatly aide in debugging cross implementation issues.
@ -3635,7 +3646,7 @@ type InvalidCommitSigError struct {
// caused an invalid commitment signature. // caused an invalid commitment signature.
func (i *InvalidCommitSigError) Error() string { func (i *InvalidCommitSigError) Error() string {
return fmt.Sprintf("rejected commitment: commit_height=%v, "+ return fmt.Sprintf("rejected commitment: commit_height=%v, "+
"invalid_sig=%x, commit_tx=%x, sig_hash=%x", i.commitHeight, "invalid_commit_sig=%x, commit_tx=%x, sig_hash=%x", i.commitHeight,
i.commitSig[:], i.commitTx, i.sigHash[:]) i.commitSig[:], i.commitTx, i.sigHash[:])
} }
@ -3643,6 +3654,35 @@ func (i *InvalidCommitSigError) Error() string {
// error interface. // error interface.
var _ error = (*InvalidCommitSigError)(nil) var _ error = (*InvalidCommitSigError)(nil)
// InvalidCommitSigError is a struc that implements the error interface to
// report a failure to validate an htlc signature from a remote peer. We'll use
// the items in this struct to generate a rich error message for the remote
// peer when we receive an invalid signature from it. Doing so can greatly aide
// in debugging across implementation issues.
type InvalidHtlcSigError struct {
commitHeight uint64
htlcSig []byte
htlcIndex uint64
sigHash []byte
commitTx []byte
}
// Error returns a detailed error string including the exact transaction that
// caused an invalid htlc signature.
func (i *InvalidHtlcSigError) Error() string {
return fmt.Sprintf("rejected commitment: commit_height=%v, "+
"invalid_htlc_sig=%x, commit_tx=%x, sig_hash=%x", i.commitHeight,
i.htlcSig, i.commitTx, i.sigHash[:])
}
// A compile time flag to ensure that InvalidCommitSigError implements the
// error interface.
var _ error = (*InvalidCommitSigError)(nil)
// ReceiveNewCommitment process a signature for a new commitment state sent by // ReceiveNewCommitment process a signature for a new commitment state sent by
// the remote party. This method should be called in response to the // the remote party. This method should be called in response to the
// remote party initiating a new change, or when the remote party sends a // remote party initiating a new change, or when the remote party sends a
@ -3772,10 +3812,30 @@ func (lc *LightningChannel) ReceiveNewCommitment(commitSig lnwire.Sig,
// In the case that a single signature is invalid, we'll exit // In the case that a single signature is invalid, we'll exit
// early and cancel all the outstanding verification jobs. // early and cancel all the outstanding verification jobs.
select { select {
case err := <-verifyResps: case htlcErr := <-verifyResps:
if err != nil { if htlcErr != nil {
close(cancelChan) close(cancelChan)
return fmt.Errorf("invalid htlc signature: %v", err)
sig, err := lnwire.NewSigFromSignature(
htlcErr.sig,
)
if err != nil {
return err
}
sigHash, err := htlcErr.sigHash()
if err != nil {
return err
}
var txBytes bytes.Buffer
localCommitTx.Serialize(&txBytes)
return &InvalidHtlcSigError{
commitHeight: nextHeight,
htlcSig: sig.ToSignatureBytes(),
htlcIndex: htlcErr.htlcIndex,
sigHash: sigHash,
commitTx: txBytes.Bytes(),
}
} }
case <-lc.quit: case <-lc.quit:
return fmt.Errorf("channel shutting down") return fmt.Errorf("channel shutting down")

@ -41,6 +41,10 @@ type verifyJob struct {
// passed signature is known to have signed. // passed signature is known to have signed.
sigHash func() ([]byte, error) sigHash func() ([]byte, error)
// htlcIndex is the index of the HTLC from the PoV of the remote
// party's update log.
htlcIndex uint64
// cancel is a channel that should be closed if the caller wishes to // cancel is a channel that should be closed if the caller wishes to
// cancel all pending verification jobs part of a single batch. This // cancel all pending verification jobs part of a single batch. This
// channel is to be closed in the case that a single signature in a // channel is to be closed in the case that a single signature in a
@ -52,7 +56,16 @@ type verifyJob struct {
// is to be sent over. In the see that the signature is valid, a nil // is to be sent over. In the see that the signature is valid, a nil
// error will be passed. Otherwise, a concrete error detailing the // error will be passed. Otherwise, a concrete error detailing the
// issue will be passed. // issue will be passed.
errResp chan error errResp chan *htlcIndexErr
}
// verifyJobErr is a special type of error that also includes a pointer to the
// original validation job. Ths error message allows us to craft more detailed
// errors at upper layers.
type htlcIndexErr struct {
error
*verifyJob
} }
// signJob is a job sent to the sigPool to generate a valid signature according // signJob is a job sent to the sigPool to generate a valid signature according
@ -69,7 +82,8 @@ type signJob struct {
// proper sighash for the input to be signed. // proper sighash for the input to be signed.
tx *wire.MsgTx tx *wire.MsgTx
// outputIndex... // outputIndex is the output index of the HTLC on the commitment
// transaction being signed.
outputIndex int32 outputIndex int32
// cancel is a channel that should be closed if the caller wishes to // cancel is a channel that should be closed if the caller wishes to
@ -216,7 +230,10 @@ func (s *sigPool) poolWorker() {
sigHash, err := verifyMsg.sigHash() sigHash, err := verifyMsg.sigHash()
if err != nil { if err != nil {
select { select {
case verifyMsg.errResp <- err: case verifyMsg.errResp <- &htlcIndexErr{
error: err,
verifyJob: &verifyMsg,
}:
continue continue
case <-verifyMsg.cancel: case <-verifyMsg.cancel:
continue continue
@ -229,7 +246,10 @@ func (s *sigPool) poolWorker() {
err := fmt.Errorf("invalid signature "+ err := fmt.Errorf("invalid signature "+
"sighash: %x, sig: %x", sigHash, rawSig.Serialize()) "sighash: %x, sig: %x", sigHash, rawSig.Serialize())
select { select {
case verifyMsg.errResp <- err: case verifyMsg.errResp <- &htlcIndexErr{
error: err,
verifyJob: &verifyMsg,
}:
case <-verifyMsg.cancel: case <-verifyMsg.cancel:
case <-s.quit: case <-s.quit:
return return
@ -272,9 +292,9 @@ func (s *sigPool) SubmitSignBatch(signJobs []signJob) {
// allows the caller to cancel all pending jobs in the case that they wish to // allows the caller to cancel all pending jobs in the case that they wish to
// bail early. // bail early.
func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob, func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob,
cancelChan chan struct{}) <-chan error { cancelChan chan struct{}) <-chan *htlcIndexErr {
errChan := make(chan error, len(verifyJobs)) errChan := make(chan *htlcIndexErr, len(verifyJobs))
for _, job := range verifyJobs { for _, job := range verifyJobs {
job.cancel = cancelChan job.cancel = cancelChan

21
peer.go

@ -1874,10 +1874,23 @@ func (p *peer) sendInitMsg() error {
return p.writeMessage(msg) return p.writeMessage(msg)
} }
// SendMessage queues a message for sending to the target peer. // SendMessage sends message to remote peer. The second argument denotes if the
func (p *peer) SendMessage(msg lnwire.Message) error { // method should block until the message has been sent to the remote peer.
p.queueMsg(msg, nil) func (p *peer) SendMessage(msg lnwire.Message, sync bool) error {
return nil if !sync {
p.queueMsg(msg, nil)
return nil
}
errChan := make(chan error, 1)
p.queueMsg(msg, errChan)
select {
case err := <-errChan:
return err
case <-p.quit:
return fmt.Errorf("peer shutting down")
}
} }
// PubKey returns the pubkey of the peer in compressed serialized format. // PubKey returns the pubkey of the peer in compressed serialized format.