peer.go: comment clarifications, improved formatting

This commit is contained in:
nsa 2020-06-26 17:55:40 -04:00
parent 01415f4a5f
commit d23b30d889

104
peer.go

@ -41,14 +41,16 @@ const (
// idleTimeout is the duration of inactivity before we time out a peer. // idleTimeout is the duration of inactivity before we time out a peer.
idleTimeout = 5 * time.Minute idleTimeout = 5 * time.Minute
// writeMessageTimeout is the timeout used when writing a message to peer. // writeMessageTimeout is the timeout used when writing a message to the
// peer.
writeMessageTimeout = 5 * time.Second writeMessageTimeout = 5 * time.Second
// readMessageTimeout is the timeout used when reading a message from a // readMessageTimeout is the timeout used when reading a message from a
// peer. // peer.
readMessageTimeout = 5 * time.Second readMessageTimeout = 5 * time.Second
// handshakeTimeout is the timeout used when waiting for peer init message. // handshakeTimeout is the timeout used when waiting for the peer's init
// message.
handshakeTimeout = 15 * time.Second handshakeTimeout = 15 * time.Second
// outgoingQueueLen is the buffer size of the channel which houses // outgoingQueueLen is the buffer size of the channel which houses
@ -70,14 +72,14 @@ type outgoingMsg struct {
} }
// newChannelMsg packages a channeldb.OpenChannel with a channel that allows // newChannelMsg packages a channeldb.OpenChannel with a channel that allows
// the receiver of the request to report when the funding transaction has been // the receiver of the request to report when the channel creation process has
// confirmed and the channel creation process completed. // completed.
type newChannelMsg struct { type newChannelMsg struct {
channel *channeldb.OpenChannel channel *channeldb.OpenChannel
err chan error err chan error
} }
// closeMsgs is a wrapper struct around any wire messages that deal with the // closeMsg is a wrapper struct around any wire messages that deal with the
// cooperative channel closure negotiation process. This struct includes the // cooperative channel closure negotiation process. This struct includes the
// raw channel ID targeted along with the original message. // raw channel ID targeted along with the original message.
type closeMsg struct { type closeMsg struct {
@ -115,18 +117,18 @@ type peer struct {
started int32 started int32
disconnect int32 disconnect int32
// The following fields are only meant to be used *atomically* // MUST be used atomically.
bytesReceived uint64 bytesReceived uint64
bytesSent uint64 bytesSent uint64
// pingTime is a rough estimate of the RTT (round-trip-time) between us // pingTime is a rough estimate of the RTT (round-trip-time) between us
// and the connected peer. This time is expressed in micro seconds. // and the connected peer. This time is expressed in microseconds.
// To be used atomically. // To be used atomically.
// TODO(roasbeef): also use a WMA or EMA? // TODO(roasbeef): also use a WMA or EMA?
pingTime int64 pingTime int64
// pingLastSend is the Unix time expressed in nanoseconds when we sent // pingLastSend is the Unix time expressed in nanoseconds when we sent
// our last ping message. To be used atomically. // our last ping message. To be used atomically.
pingLastSend int64 pingLastSend int64
cfg *Config cfg *Config
@ -141,9 +143,8 @@ type peer struct {
// ready to process messages. // ready to process messages.
activeSignal chan struct{} activeSignal chan struct{}
// startTime is the time this peer connection was successfully // startTime is the time this peer connection was successfully established.
// established. It will be zero for peers that did not successfully // It will be zero for peers that did not successfully call Start().
// Start().
startTime time.Time startTime time.Time
inbound bool inbound bool
@ -157,7 +158,7 @@ type peer struct {
outgoingQueue chan outgoingMsg outgoingQueue chan outgoingMsg
// activeChanMtx protects access to the activeChannels and // activeChanMtx protects access to the activeChannels and
// addeddChannels maps. // addedChannels maps.
activeChanMtx sync.RWMutex activeChanMtx sync.RWMutex
// activeChannels is a map which stores the state machines of all // activeChannels is a map which stores the state machines of all
@ -186,11 +187,10 @@ type peer struct {
// proxy messages to individual, active links. // proxy messages to individual, active links.
activeMsgStreams map[lnwire.ChannelID]*msgStream activeMsgStreams map[lnwire.ChannelID]*msgStream
// activeChanCloses is a map that keep track of all the active // activeChanCloses is a map that keeps track of all the active
// cooperative channel closures that are active. Any channel closing // cooperative channel closures. Any channel closing messages are directed
// messages are directed to one of these active state machines. Once // to one of these active state machines. Once the channel has been closed,
// the channel has been closed, the state machine will be delete from // the state machine will be deleted from the map.
// the map.
activeChanCloses map[lnwire.ChannelID]*chancloser.ChanCloser activeChanCloses map[lnwire.ChannelID]*chancloser.ChanCloser
// localCloseChanReqs is a channel in which any local requests to close // localCloseChanReqs is a channel in which any local requests to close
@ -457,8 +457,8 @@ func (p *peer) initGossipSync() {
srvrLog.Infof("Negotiated chan series queries with %x", srvrLog.Infof("Negotiated chan series queries with %x",
p.pubKeyBytes[:]) p.pubKeyBytes[:])
// Register the this peer's for gossip syncer with the gossiper. // Register the peer's gossip syncer with the gossiper.
// This is blocks synchronously to ensure the gossip syncer is // This blocks synchronously to ensure the gossip syncer is
// registered with the gossiper before attempting to read // registered with the gossiper before attempting to read
// messages from the remote peer. // messages from the remote peer.
// //
@ -468,6 +468,7 @@ func (p *peer) initGossipSync() {
// peers. // peers.
p.server.authGossiper.InitSyncState(p) p.server.authGossiper.InitSyncState(p)
} }
} }
// QuitSignal is a method that should return a channel which will be sent upon // QuitSignal is a method that should return a channel which will be sent upon
@ -624,7 +625,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
return msgs, nil return msgs, nil
} }
// addLink creates and adds a new link from the specified channel. // addLink creates and adds a new ChannelLink from the specified channel.
func (p *peer) addLink(chanPoint *wire.OutPoint, func (p *peer) addLink(chanPoint *wire.OutPoint,
lnChan *lnwallet.LightningChannel, lnChan *lnwallet.LightningChannel,
forwardingPolicy *htlcswitch.ForwardingPolicy, forwardingPolicy *htlcswitch.ForwardingPolicy,
@ -704,7 +705,7 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
} }
// maybeSendNodeAnn sends our node announcement to the remote peer if at least // maybeSendNodeAnn sends our node announcement to the remote peer if at least
// one confirmed advertised channel exists with them. // one confirmed public channel exists with them.
func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
hasConfirmedPublicChan := false hasConfirmedPublicChan := false
for _, channel := range channels { for _, channel := range channels {
@ -735,7 +736,7 @@ func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
} }
// WaitForDisconnect waits until the peer has disconnected. A peer may be // WaitForDisconnect waits until the peer has disconnected. A peer may be
// disconnected if the local or remote side terminating the connection, or an // disconnected if the local or remote side terminates the connection, or an
// irrecoverable protocol error has been encountered. This method will only // irrecoverable protocol error has been encountered. This method will only
// begin watching the peer's waitgroup after the ready channel or the peer's // begin watching the peer's waitgroup after the ready channel or the peer's
// quit channel are signaled. The ready channel should only be signaled if a // quit channel are signaled. The ready channel should only be signaled if a
@ -994,7 +995,9 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) {
// waitUntilLinkActive waits until the target link is active and returns a // waitUntilLinkActive waits until the target link is active and returns a
// ChannelLink to pass messages to. It accomplishes this by subscribing to // ChannelLink to pass messages to. It accomplishes this by subscribing to
// an ActiveLinkEvent which is emitted by the link when it first starts up. // an ActiveLinkEvent which is emitted by the link when it first starts up.
func waitUntilLinkActive(p *peer, cid lnwire.ChannelID) htlcswitch.ChannelLink { func waitUntilLinkActive(p *peer,
cid lnwire.ChannelID) htlcswitch.ChannelLink {
// Subscribe to receive channel events. // Subscribe to receive channel events.
// //
// NOTE: If the link is already active by SubscribeChannelEvents, then // NOTE: If the link is already active by SubscribeChannelEvents, then
@ -1319,7 +1322,7 @@ func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool {
// storeError stores an error in our peer's buffer of recent errors with the // storeError stores an error in our peer's buffer of recent errors with the
// current timestamp. Errors are only stored if we have at least one active // current timestamp. Errors are only stored if we have at least one active
// channel with the peer to mitigate dos attack vectors where a peer costlessly // channel with the peer to mitigate a dos vector where a peer costlessly
// connects to us and spams us with errors. // connects to us and spams us with errors.
func (p *peer) storeError(err error) { func (p *peer) storeError(err error) {
var haveChannels bool var haveChannels bool
@ -1581,9 +1584,9 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) {
// writeMessage writes and flushes the target lnwire.Message to the remote peer. // writeMessage writes and flushes the target lnwire.Message to the remote peer.
// If the passed message is nil, this method will only try to flush an existing // If the passed message is nil, this method will only try to flush an existing
// message buffered on the connection. It is safe to recall this method with a // message buffered on the connection. It is safe to call this method again
// nil message iff a timeout error is returned. This will continue to flush the // with a nil message iff a timeout error is returned. This will continue to
// pending message to the wire. // flush the pending message to the wire.
func (p *peer) writeMessage(msg lnwire.Message) error { func (p *peer) writeMessage(msg lnwire.Message) error {
// Simply exit if we're shutting down. // Simply exit if we're shutting down.
if atomic.LoadInt32(&p.disconnect) != 0 { if atomic.LoadInt32(&p.disconnect) != 0 {
@ -1861,11 +1864,14 @@ func (p *peer) queueMsgLazy(msg lnwire.Message, errChan chan error) {
// queue sends a given message to the queueHandler using the passed priority. If // queue sends a given message to the queueHandler using the passed priority. If
// the errChan is non-nil, an error is sent back if the msg failed to queue or // the errChan is non-nil, an error is sent back if the msg failed to queue or
// failed to write, and nil otherwise. // failed to write, and nil otherwise.
func (p *peer) queue(priority bool, msg lnwire.Message, errChan chan error) { func (p *peer) queue(priority bool, msg lnwire.Message,
errChan chan error) {
select { select {
case p.outgoingQueue <- outgoingMsg{priority, msg, errChan}: case p.outgoingQueue <- outgoingMsg{priority, msg, errChan}:
case <-p.quit: case <-p.quit:
peerLog.Tracef("Peer shutting down, could not enqueue msg.") peerLog.Tracef("Peer shutting down, could not enqueue msg: %v.",
spew.Sdump(msg))
if errChan != nil { if errChan != nil {
errChan <- lnpeer.ErrPeerExiting errChan <- lnpeer.ErrPeerExiting
} }
@ -2453,10 +2459,10 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
} }
} }
// linkFailureReport is sent to the channelManager whenever a link that was // linkFailureReport is sent to the channelManager whenever a link reports a
// added to the switch reports a link failure, and is forced to exit. The report // link failure, and is forced to exit. The report houses the necessary
// houses the necessary information to cleanup the channel state, send back the // information to clean up the channel state, send back the error message, and
// error message, and force close if necessary. // force close if necessary.
type linkFailureReport struct { type linkFailureReport struct {
chanPoint wire.OutPoint chanPoint wire.OutPoint
chanID lnwire.ChannelID chanID lnwire.ChannelID
@ -2465,7 +2471,7 @@ type linkFailureReport struct {
} }
// handleLinkFailure processes a link failure report when a link in the switch // handleLinkFailure processes a link failure report when a link in the switch
// fails. It handles facilitates removal of all channel state within the peer, // fails. It facilitates the removal of all channel state within the peer,
// force closing the channel depending on severity, and sending the error // force closing the channel depending on severity, and sending the error
// message back to the remote party. // message back to the remote party.
func (p *peer) handleLinkFailure(failure linkFailureReport) { func (p *peer) handleLinkFailure(failure linkFailureReport) {
@ -2617,7 +2623,7 @@ func waitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
} }
// WipeChannel removes the passed channel point from all indexes associated with // WipeChannel removes the passed channel point from all indexes associated with
// the peer, and the switch. // the peer and the switch.
func (p *peer) WipeChannel(chanPoint *wire.OutPoint) { func (p *peer) WipeChannel(chanPoint *wire.OutPoint) {
chanID := lnwire.NewChanIDFromOutPoint(chanPoint) chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -2631,7 +2637,7 @@ func (p *peer) WipeChannel(chanPoint *wire.OutPoint) {
} }
// handleInitMsg handles the incoming init message which contains global and // handleInitMsg handles the incoming init message which contains global and
// local features vectors. If feature vectors are incompatible then disconnect. // local feature vectors. If feature vectors are incompatible then disconnect.
func (p *peer) handleInitMsg(msg *lnwire.Init) error { func (p *peer) handleInitMsg(msg *lnwire.Init) error {
// First, merge any features from the legacy global features field into // First, merge any features from the legacy global features field into
// those presented in the local features fields. // those presented in the local features fields.
@ -2641,7 +2647,7 @@ func (p *peer) handleInitMsg(msg *lnwire.Init) error {
err) err)
} }
// Then, finalize the remote feature vector providing the flatteneed // Then, finalize the remote feature vector providing the flattened
// feature bit namespace. // feature bit namespace.
p.remoteFeatures = lnwire.NewFeatureVector( p.remoteFeatures = lnwire.NewFeatureVector(
msg.Features, lnwire.Features, msg.Features, lnwire.Features,
@ -2654,8 +2660,8 @@ func (p *peer) handleInitMsg(msg *lnwire.Init) error {
return fmt.Errorf("invalid remote features: %v", err) return fmt.Errorf("invalid remote features: %v", err)
} }
// Ensure the remote party's feature vector contains all transistive // Ensure the remote party's feature vector contains all transitive
// dependencies. We know ours are are correct since they are validated // dependencies. We know ours are correct since they are validated
// during the feature manager's instantiation. // during the feature manager's instantiation.
err = feature.ValidateDeps(p.remoteFeatures) err = feature.ValidateDeps(p.remoteFeatures)
if err != nil { if err != nil {
@ -2690,8 +2696,8 @@ func (p *peer) RemoteFeatures() *lnwire.FeatureVector {
return p.remoteFeatures return p.remoteFeatures
} }
// sendInitMsg sends init message to remote peer which contains our currently // sendInitMsg sends the Init message to the remote peer. This message contains our
// supported local and global features. // currently supported local and global features.
func (p *peer) sendInitMsg() error { func (p *peer) sendInitMsg() error {
msg := lnwire.NewInitMessage( msg := lnwire.NewInitMessage(
p.legacyFeatures.RawFeatureVector, p.legacyFeatures.RawFeatureVector,
@ -2745,20 +2751,20 @@ func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error {
return nil return nil
} }
// SendMessage sends a variadic number of high-priority message to remote peer. // SendMessage sends a variadic number of high-priority messages to the remote
// The first argument denotes if the method should block until the messages have // peer. The first argument denotes if the method should block until the
// been sent to the remote peer or an error is returned, otherwise it returns // messages have been sent to the remote peer or an error is returned,
// immediately after queuing. // otherwise it returns immediately after queuing.
// //
// NOTE: Part of the lnpeer.Peer interface. // NOTE: Part of the lnpeer.Peer interface.
func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error { func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error {
return p.sendMessage(sync, true, msgs...) return p.sendMessage(sync, true, msgs...)
} }
// SendMessageLazy sends a variadic number of low-priority message to remote // SendMessageLazy sends a variadic number of low-priority messages to the
// peer. The first argument denotes if the method should block until the // remote peer. The first argument denotes if the method should block until
// messages have been sent to the remote peer or an error is returned, otherwise // the messages have been sent to the remote peer or an error is returned,
// it returns immediately after queueing. // otherwise it returns immediately after queueing.
// //
// NOTE: Part of the lnpeer.Peer interface. // NOTE: Part of the lnpeer.Peer interface.
func (p *peer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error { func (p *peer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {