htlcswitch: use prefix logger for remaining log statements in link

This commit is contained in:
Joost Jager 2019-10-01 11:16:24 +02:00
parent 03ed1b0aa3
commit 654b3cc718
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
2 changed files with 45 additions and 54 deletions

@ -425,11 +425,11 @@ var _ ChannelLink = (*channelLink)(nil)
func (l *channelLink) Start() error { func (l *channelLink) Start() error {
if !atomic.CompareAndSwapInt32(&l.started, 0, 1) { if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
err := errors.Errorf("channel link(%v): already started", l) err := errors.Errorf("channel link(%v): already started", l)
log.Warn(err) l.log.Warn("already started")
return err return err
} }
log.Infof("ChannelLink(%v) is starting", l) l.log.Info("starting")
// If the config supplied watchtower client, ensure the channel is // If the config supplied watchtower client, ensure the channel is
// registered before trying to use it during operation. // registered before trying to use it during operation.
@ -482,8 +482,7 @@ func (l *channelLink) Start() error {
err := l.cfg.UpdateContractSignals(signals) err := l.cfg.UpdateContractSignals(signals)
if err != nil { if err != nil {
log.Errorf("Unable to update signals for "+ l.log.Errorf("Unable to update signals")
"ChannelLink(%v)", l)
} }
}() }()
} }
@ -502,11 +501,11 @@ func (l *channelLink) Start() error {
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() { func (l *channelLink) Stop() {
if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) { if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
log.Warnf("channel link(%v): already stopped", l) l.log.Warn("already stopped")
return return
} }
log.Infof("ChannelLink(%v) is stopping", l) l.log.Info("stopping")
// As the link is stopping, we are no longer interested in hodl events // As the link is stopping, we are no longer interested in hodl events
// coming from the invoice registry. // coming from the invoice registry.
@ -531,7 +530,7 @@ func (l *channelLink) Stop() {
// we had learned them at some point. // we had learned them at some point.
err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...) err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
if err != nil { if err != nil {
log.Errorf("Unable to add preimages=%v to cache: %v", l.log.Errorf("Unable to add preimages=%v to cache: %v",
l.uncommittedPreimages, err) l.uncommittedPreimages, err)
} }
} }
@ -578,8 +577,8 @@ func (l *channelLink) sampleNetworkFee() (lnwallet.SatPerKWeight, error) {
return 0, err return 0, err
} }
log.Debugf("ChannelLink(%v): sampled fee rate for 3 block conf: %v "+ l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
"sat/kw", l, int64(feePerKw)) int64(feePerKw))
return feePerKw, nil return feePerKw, nil
} }
@ -610,8 +609,7 @@ func shouldAdjustCommitFee(netFee, chanFee lnwallet.SatPerKWeight) bool {
// flow. We'll compare out commitment chains with the remote party, and re-send // flow. We'll compare out commitment chains with the remote party, and re-send
// either a danging commit signature, a revocation, or both. // either a danging commit signature, a revocation, or both.
func (l *channelLink) syncChanStates() error { func (l *channelLink) syncChanStates() error {
log.Infof("Attempting to re-resynchronize ChannelPoint(%v)", l.log.Info("Attempting to re-resynchronize")
l.channel.ChannelPoint())
// First, we'll generate our ChanSync message to send to the other // First, we'll generate our ChanSync message to send to the other
// side. Based on this message, the remote party will decide if they // side. Based on this message, the remote party will decide if they
@ -650,9 +648,7 @@ func (l *channelLink) syncChanStates() error {
localChanSyncMsg.NextLocalCommitHeight == 1 && localChanSyncMsg.NextLocalCommitHeight == 1 &&
!l.channel.IsPending() { !l.channel.IsPending() {
log.Infof("ChannelPoint(%v): resending "+ l.log.Infof("resending FundingLocked message to peer")
"FundingLocked message to peer",
l.channel.ChannelPoint())
nextRevocation, err := l.channel.NextRevocationKey() nextRevocation, err := l.channel.NextRevocationKey()
if err != nil { if err != nil {
@ -671,8 +667,7 @@ func (l *channelLink) syncChanStates() error {
} }
// In any case, we'll then process their ChanSync message. // In any case, we'll then process their ChanSync message.
log.Infof("Received re-establishment message from remote side "+ l.log.Info("Received re-establishment message from remote side")
"for channel(%v)", l.channel.ChannelPoint())
var ( var (
openedCircuits []CircuitKey openedCircuits []CircuitKey
@ -701,9 +696,8 @@ func (l *channelLink) syncChanStates() error {
} }
if len(msgsToReSend) > 0 { if len(msgsToReSend) > 0 {
log.Infof("Sending %v updates to synchronize the "+ l.log.Infof("Sending %v updates to synchronize the "+
"state for ChannelPoint(%v)", len(msgsToReSend), "state", len(msgsToReSend))
l.channel.ChannelPoint())
} }
// If we have any messages to retransmit, we'll do so // If we have any messages to retransmit, we'll do so
@ -872,11 +866,10 @@ func (l *channelLink) htlcManager() {
defer func() { defer func() {
l.cfg.BatchTicker.Stop() l.cfg.BatchTicker.Stop()
l.wg.Done() l.wg.Done()
log.Infof("ChannelLink(%v) has exited", l) l.log.Infof("exited")
}() }()
log.Infof("HTLC manager for ChannelPoint(%v) started, "+ l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
"bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth())
// TODO(roasbeef): need to call wipe chan whenever D/C? // TODO(roasbeef): need to call wipe chan whenever D/C?
@ -887,14 +880,14 @@ func (l *channelLink) htlcManager() {
if l.cfg.SyncStates { if l.cfg.SyncStates {
err := l.syncChanStates() err := l.syncChanStates()
if err != nil { if err != nil {
log.Warnf("Error when syncing channel states: %v", err) l.log.Warnf("Error when syncing channel states: %v", err)
errDataLoss, localDataLoss := errDataLoss, localDataLoss :=
err.(*lnwallet.ErrCommitSyncLocalDataLoss) err.(*lnwallet.ErrCommitSyncLocalDataLoss)
switch { switch {
case err == ErrLinkShuttingDown: case err == ErrLinkShuttingDown:
log.Debugf("unable to sync channel states, " + l.log.Debugf("unable to sync channel states, " +
"link is shutting down") "link is shutting down")
return return
@ -942,7 +935,7 @@ func (l *channelLink) htlcManager() {
errDataLoss.CommitPoint, errDataLoss.CommitPoint,
) )
if err != nil { if err != nil {
log.Errorf("Unable to mark channel "+ l.log.Errorf("Unable to mark channel "+
"data loss: %v", err) "data loss: %v", err)
} }
@ -953,7 +946,7 @@ func (l *channelLink) htlcManager() {
// cases where this error is returned? // cases where this error is returned?
case err == lnwallet.ErrCannotSyncCommitChains: case err == lnwallet.ErrCannotSyncCommitChains:
if err := l.channel.MarkBorked(); err != nil { if err := l.channel.MarkBorked(); err != nil {
log.Errorf("Unable to mark channel "+ l.log.Errorf("Unable to mark channel "+
"borked: %v", err) "borked: %v", err)
} }
@ -1044,7 +1037,8 @@ out:
// blocks. // blocks.
netFee, err := l.sampleNetworkFee() netFee, err := l.sampleNetworkFee()
if err != nil { if err != nil {
log.Errorf("unable to sample network fee: %v", err) l.log.Errorf("unable to sample network fee: %v",
err)
continue continue
} }
@ -1063,7 +1057,8 @@ out:
// If we do, then we'll send a new UpdateFee message to // If we do, then we'll send a new UpdateFee message to
// the remote party, to be locked in with a new update. // the remote party, to be locked in with a new update.
if err := l.updateChannelFee(newCommitFee); err != nil { if err := l.updateChannelFee(newCommitFee); err != nil {
log.Errorf("unable to update fee rate: %v", err) l.log.Errorf("unable to update fee rate: %v",
err)
continue continue
} }
@ -1074,15 +1069,15 @@ out:
// //
// TODO(roasbeef): add force closure? also breach? // TODO(roasbeef): add force closure? also breach?
case <-l.cfg.ChainEvents.RemoteUnilateralClosure: case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
log.Warnf("Remote peer has closed ChannelPoint(%v) on-chain", l.log.Warnf("Remote peer has closed on-chain")
l.channel.ChannelPoint())
// TODO(roasbeef): remove all together // TODO(roasbeef): remove all together
go func() { go func() {
chanPoint := l.channel.ChannelPoint() chanPoint := l.channel.ChannelPoint()
err := l.cfg.Peer.WipeChannel(chanPoint) err := l.cfg.Peer.WipeChannel(chanPoint)
if err != nil { if err != nil {
log.Errorf("unable to wipe channel %v", err) l.log.Errorf("unable to wipe channel "+
"%v", err)
} }
}() }()
@ -1129,7 +1124,7 @@ out:
// to continue propagating within the network. // to continue propagating within the network.
case packet := <-l.overflowQueue.outgoingPkts: case packet := <-l.overflowQueue.outgoingPkts:
msg := packet.htlc.(*lnwire.UpdateAddHTLC) msg := packet.htlc.(*lnwire.UpdateAddHTLC)
log.Tracef("Reprocessing downstream add update "+ l.log.Tracef("Reprocessing downstream add update "+
"with payment hash(%x)", msg.PaymentHash[:]) "with payment hash(%x)", msg.PaymentHash[:])
l.handleDownStreamPkt(packet, true) l.handleDownStreamPkt(packet, true)
@ -1144,7 +1139,7 @@ out:
// failed, then we'll free up a new slot. // failed, then we'll free up a new slot.
htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
if ok && l.overflowQueue.Length() != 0 { if ok && l.overflowQueue.Length() != 0 {
log.Infof("Downstream htlc add update with "+ l.log.Infof("Downstream htlc add update with "+
"payment hash(%x) have been added to "+ "payment hash(%x) have been added to "+
"reprocessing queue, batch_size=%v", "reprocessing queue, batch_size=%v",
htlc.PaymentHash[:], htlc.PaymentHash[:],
@ -1659,7 +1654,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
OnionSHA256: msg.ShaOnionBlob, OnionSHA256: msg.ShaOnionBlob,
} }
default: default:
log.Warnf("Unexpected failure code received in "+ l.log.Warnf("Unexpected failure code received in "+
"UpdateFailMailformedHTLC: %v", msg.FailureCode) "UpdateFailMailformedHTLC: %v", msg.FailureCode)
// We don't just pass back the error we received from // We don't just pass back the error we received from
@ -1767,7 +1762,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// state. // state.
nextRevocation, currentHtlcs, err := l.channel.RevokeCurrentCommitment() nextRevocation, currentHtlcs, err := l.channel.RevokeCurrentCommitment()
if err != nil { if err != nil {
log.Errorf("unable to revoke commitment: %v", err) l.log.Errorf("unable to revoke commitment: %v", err)
return return
} }
l.cfg.Peer.SendMessage(false, nextRevocation) l.cfg.Peer.SendMessage(false, nextRevocation)
@ -1903,8 +1898,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
"ChannelPoint(%v): received error from peer: %v", "ChannelPoint(%v): received error from peer: %v",
l.channel.ChannelPoint(), msg.Error()) l.channel.ChannelPoint(), msg.Error())
default: default:
log.Warnf("ChannelPoint(%v): received unknown message of type %T", l.log.Warnf("received unknown message of type %T", msg)
l.channel.ChannelPoint(), msg)
} }
} }
@ -2106,8 +2100,7 @@ func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
ShortChanID: sid, ShortChanID: sid,
}) })
if err != nil { if err != nil {
log.Errorf("Unable to update signals for "+ l.log.Errorf("Unable to update signals")
"ChannelLink(%v)", l)
} }
}() }()
@ -2407,14 +2400,12 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
// committing to an update_fee message. // committing to an update_fee message.
func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error { func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error {
log.Infof("ChannelPoint(%v): updating commit fee to %v sat/kw", l, l.log.Infof("updating commit fee to %v sat/kw", feePerKw)
feePerKw)
// We skip sending the UpdateFee message if the channel is not // We skip sending the UpdateFee message if the channel is not
// currently eligible to forward messages. // currently eligible to forward messages.
if !l.EligibleToForward() { if !l.EligibleToForward() {
log.Debugf("ChannelPoint(%v): skipping fee update for "+ l.log.Debugf("skipping fee update for inactive channel")
"inactive channel", l.ChanID())
return nil return nil
} }
@ -2444,8 +2435,7 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
return return
} }
log.Debugf("ChannelLink(%v): settle-fail-filter %v", l.log.Debugf("settle-fail-filter %v", fwdPkg.SettleFailFilter)
l.ShortChanID(), fwdPkg.SettleFailFilter)
var switchPackets []*htlcPacket var switchPackets []*htlcPacket
for i, pd := range settleFails { for i, pd := range settleFails {
@ -2631,7 +2621,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
onionBlob[:], pd.SourceRef) onionBlob[:], pd.SourceRef)
needUpdate = true needUpdate = true
log.Errorf("unable to decode onion hop "+ l.log.Errorf("unable to decode onion hop "+
"iterator: %v", failureCode) "iterator: %v", failureCode)
continue continue
} }
@ -2650,7 +2640,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
) )
needUpdate = true needUpdate = true
log.Errorf("unable to decode onion "+ l.log.Errorf("unable to decode onion "+
"obfuscator: %v", failureCode) "obfuscator: %v", failureCode)
continue continue
} }
@ -2670,7 +2660,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
) )
needUpdate = true needUpdate = true
log.Errorf("Unable to decode forwarding "+ l.log.Errorf("Unable to decode forwarding "+
"instructions: %v", err) "instructions: %v", err)
continue continue
} }
@ -2770,7 +2760,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
buf := bytes.NewBuffer(addMsg.OnionBlob[0:0]) buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
err := chanIterator.EncodeNextHop(buf) err := chanIterator.EncodeNextHop(buf)
if err != nil { if err != nil {
log.Errorf("unable to encode the "+ l.log.Errorf("unable to encode the "+
"remaining route %v", err) "remaining route %v", err)
var failure lnwire.FailureMessage var failure lnwire.FailureMessage
@ -2868,7 +2858,7 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
// matches the HTLC we were extended. // matches the HTLC we were extended.
if pd.Amount != fwdInfo.AmountToForward { if pd.Amount != fwdInfo.AmountToForward {
log.Errorf("Onion payload of incoming htlc(%x) has incorrect "+ l.log.Errorf("Onion payload of incoming htlc(%x) has incorrect "+
"value: expected %v, got %v", pd.RHash, "value: expected %v, got %v", pd.RHash,
pd.Amount, fwdInfo.AmountToForward) pd.Amount, fwdInfo.AmountToForward)
@ -2881,7 +2871,7 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
// We'll also ensure that our time-lock value has been computed // We'll also ensure that our time-lock value has been computed
// correctly. // correctly.
if pd.Timeout != fwdInfo.OutgoingCTLV { if pd.Timeout != fwdInfo.OutgoingCTLV {
log.Errorf("Onion payload of incoming htlc(%x) has incorrect "+ l.log.Errorf("Onion payload of incoming htlc(%x) has incorrect "+
"time-lock: expected %v, got %v", "time-lock: expected %v, got %v",
pd.RHash[:], pd.Timeout, fwdInfo.OutgoingCTLV) pd.RHash[:], pd.Timeout, fwdInfo.OutgoingCTLV)
@ -3025,13 +3015,13 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64, failure lnwire.FailureMess
reason, err := e.EncryptFirstHop(failure) reason, err := e.EncryptFirstHop(failure)
if err != nil { if err != nil {
log.Errorf("unable to obfuscate error: %v", err) l.log.Errorf("unable to obfuscate error: %v", err)
return return
} }
err = l.channel.FailHTLC(htlcIndex, reason, sourceRef, nil, nil) err = l.channel.FailHTLC(htlcIndex, reason, sourceRef, nil, nil)
if err != nil { if err != nil {
log.Errorf("unable cancel htlc: %v", err) l.log.Errorf("unable cancel htlc: %v", err)
return return
} }
@ -3050,7 +3040,7 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
shaOnionBlob := sha256.Sum256(onionBlob) shaOnionBlob := sha256.Sum256(onionBlob)
err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef) err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
if err != nil { if err != nil {
log.Errorf("unable cancel htlc: %v", err) l.log.Errorf("unable cancel htlc: %v", err)
return return
} }

@ -5417,6 +5417,7 @@ func TestHtlcSatisfyPolicy(t *testing.T) {
FetchLastChannelUpdate: fetchLastChannelUpdate, FetchLastChannelUpdate: fetchLastChannelUpdate,
MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry, MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry,
}, },
log: log,
} }
var hash [32]byte var hash [32]byte