diff --git a/peer.go b/peer.go index 6667fe95..efe58fcd 100644 --- a/peer.go +++ b/peer.go @@ -41,14 +41,16 @@ const ( // idleTimeout is the duration of inactivity before we time out a peer. idleTimeout = 5 * time.Minute - // writeMessageTimeout is the timeout used when writing a message to peer. + // writeMessageTimeout is the timeout used when writing a message to the + // peer. writeMessageTimeout = 5 * time.Second // readMessageTimeout is the timeout used when reading a message from a // peer. readMessageTimeout = 5 * time.Second - // handshakeTimeout is the timeout used when waiting for peer init message. + // handshakeTimeout is the timeout used when waiting for the peer's init + // message. handshakeTimeout = 15 * time.Second // outgoingQueueLen is the buffer size of the channel which houses @@ -70,14 +72,14 @@ type outgoingMsg struct { } // newChannelMsg packages a channeldb.OpenChannel with a channel that allows -// the receiver of the request to report when the funding transaction has been -// confirmed and the channel creation process completed. +// the receiver of the request to report when the channel creation process has +// completed. type newChannelMsg struct { channel *channeldb.OpenChannel err chan error } -// closeMsgs is a wrapper struct around any wire messages that deal with the +// closeMsg is a wrapper struct around any wire messages that deal with the // cooperative channel closure negotiation process. This struct includes the // raw channel ID targeted along with the original message. type closeMsg struct { @@ -115,18 +117,18 @@ type peer struct { started int32 disconnect int32 - // The following fields are only meant to be used *atomically* + // MUST be used atomically. bytesReceived uint64 bytesSent uint64 // pingTime is a rough estimate of the RTT (round-trip-time) between us - // and the connected peer. This time is expressed in micro seconds. + // and the connected peer. This time is expressed in microseconds. // To be used atomically. // TODO(roasbeef): also use a WMA or EMA? pingTime int64 // pingLastSend is the Unix time expressed in nanoseconds when we sent - // our last ping message. To be used atomically. + // our last ping message. To be used atomically. pingLastSend int64 cfg *Config @@ -141,9 +143,8 @@ type peer struct { // ready to process messages. activeSignal chan struct{} - // startTime is the time this peer connection was successfully - // established. It will be zero for peers that did not successfully - // Start(). + // startTime is the time this peer connection was successfully established. + // It will be zero for peers that did not successfully call Start(). startTime time.Time inbound bool @@ -157,7 +158,7 @@ type peer struct { outgoingQueue chan outgoingMsg // activeChanMtx protects access to the activeChannels and - // addeddChannels maps. + // addedChannels maps. activeChanMtx sync.RWMutex // activeChannels is a map which stores the state machines of all @@ -186,11 +187,10 @@ type peer struct { // proxy messages to individual, active links. activeMsgStreams map[lnwire.ChannelID]*msgStream - // activeChanCloses is a map that keep track of all the active - // cooperative channel closures that are active. Any channel closing - // messages are directed to one of these active state machines. Once - // the channel has been closed, the state machine will be delete from - // the map. + // activeChanCloses is a map that keeps track of all the active + // cooperative channel closures. Any channel closing messages are directed + // to one of these active state machines. Once the channel has been closed, + // the state machine will be deleted from the map. activeChanCloses map[lnwire.ChannelID]*chancloser.ChanCloser // localCloseChanReqs is a channel in which any local requests to close @@ -457,8 +457,8 @@ func (p *peer) initGossipSync() { srvrLog.Infof("Negotiated chan series queries with %x", p.pubKeyBytes[:]) - // Register the this peer's for gossip syncer with the gossiper. - // This is blocks synchronously to ensure the gossip syncer is + // Register the peer's gossip syncer with the gossiper. + // This blocks synchronously to ensure the gossip syncer is // registered with the gossiper before attempting to read // messages from the remote peer. // @@ -468,6 +468,7 @@ func (p *peer) initGossipSync() { // peers. p.server.authGossiper.InitSyncState(p) } + } // QuitSignal is a method that should return a channel which will be sent upon @@ -624,7 +625,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( return msgs, nil } -// addLink creates and adds a new link from the specified channel. +// addLink creates and adds a new ChannelLink from the specified channel. func (p *peer) addLink(chanPoint *wire.OutPoint, lnChan *lnwallet.LightningChannel, forwardingPolicy *htlcswitch.ForwardingPolicy, @@ -704,7 +705,7 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, } // maybeSendNodeAnn sends our node announcement to the remote peer if at least -// one confirmed advertised channel exists with them. +// one confirmed public channel exists with them. func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { hasConfirmedPublicChan := false for _, channel := range channels { @@ -735,7 +736,7 @@ func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { } // WaitForDisconnect waits until the peer has disconnected. A peer may be -// disconnected if the local or remote side terminating the connection, or an +// disconnected if the local or remote side terminates the connection, or an // irrecoverable protocol error has been encountered. This method will only // begin watching the peer's waitgroup after the ready channel or the peer's // quit channel are signaled. The ready channel should only be signaled if a @@ -994,7 +995,9 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) { // waitUntilLinkActive waits until the target link is active and returns a // ChannelLink to pass messages to. It accomplishes this by subscribing to // an ActiveLinkEvent which is emitted by the link when it first starts up. -func waitUntilLinkActive(p *peer, cid lnwire.ChannelID) htlcswitch.ChannelLink { +func waitUntilLinkActive(p *peer, + cid lnwire.ChannelID) htlcswitch.ChannelLink { + // Subscribe to receive channel events. // // NOTE: If the link is already active by SubscribeChannelEvents, then @@ -1319,7 +1322,7 @@ func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool { // storeError stores an error in our peer's buffer of recent errors with the // current timestamp. Errors are only stored if we have at least one active -// channel with the peer to mitigate dos attack vectors where a peer costlessly +// channel with the peer to mitigate a dos vector where a peer costlessly // connects to us and spams us with errors. func (p *peer) storeError(err error) { var haveChannels bool @@ -1581,9 +1584,9 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) { // writeMessage writes and flushes the target lnwire.Message to the remote peer. // If the passed message is nil, this method will only try to flush an existing -// message buffered on the connection. It is safe to recall this method with a -// nil message iff a timeout error is returned. This will continue to flush the -// pending message to the wire. +// message buffered on the connection. It is safe to call this method again +// with a nil message iff a timeout error is returned. This will continue to +// flush the pending message to the wire. func (p *peer) writeMessage(msg lnwire.Message) error { // Simply exit if we're shutting down. if atomic.LoadInt32(&p.disconnect) != 0 { @@ -1861,11 +1864,14 @@ func (p *peer) queueMsgLazy(msg lnwire.Message, errChan chan error) { // queue sends a given message to the queueHandler using the passed priority. If // the errChan is non-nil, an error is sent back if the msg failed to queue or // failed to write, and nil otherwise. -func (p *peer) queue(priority bool, msg lnwire.Message, errChan chan error) { +func (p *peer) queue(priority bool, msg lnwire.Message, + errChan chan error) { + select { case p.outgoingQueue <- outgoingMsg{priority, msg, errChan}: case <-p.quit: - peerLog.Tracef("Peer shutting down, could not enqueue msg.") + peerLog.Tracef("Peer shutting down, could not enqueue msg: %v.", + spew.Sdump(msg)) if errChan != nil { errChan <- lnpeer.ErrPeerExiting } @@ -2453,10 +2459,10 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) { } } -// linkFailureReport is sent to the channelManager whenever a link that was -// added to the switch reports a link failure, and is forced to exit. The report -// houses the necessary information to cleanup the channel state, send back the -// error message, and force close if necessary. +// linkFailureReport is sent to the channelManager whenever a link reports a +// link failure, and is forced to exit. The report houses the necessary +// information to clean up the channel state, send back the error message, and +// force close if necessary. type linkFailureReport struct { chanPoint wire.OutPoint chanID lnwire.ChannelID @@ -2465,7 +2471,7 @@ type linkFailureReport struct { } // handleLinkFailure processes a link failure report when a link in the switch -// fails. It handles facilitates removal of all channel state within the peer, +// fails. It facilitates the removal of all channel state within the peer, // force closing the channel depending on severity, and sending the error // message back to the remote party. func (p *peer) handleLinkFailure(failure linkFailureReport) { @@ -2617,7 +2623,7 @@ func waitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier, } // WipeChannel removes the passed channel point from all indexes associated with -// the peer, and the switch. +// the peer and the switch. func (p *peer) WipeChannel(chanPoint *wire.OutPoint) { chanID := lnwire.NewChanIDFromOutPoint(chanPoint) @@ -2631,7 +2637,7 @@ func (p *peer) WipeChannel(chanPoint *wire.OutPoint) { } // handleInitMsg handles the incoming init message which contains global and -// local features vectors. If feature vectors are incompatible then disconnect. +// local feature vectors. If feature vectors are incompatible then disconnect. func (p *peer) handleInitMsg(msg *lnwire.Init) error { // First, merge any features from the legacy global features field into // those presented in the local features fields. @@ -2641,7 +2647,7 @@ func (p *peer) handleInitMsg(msg *lnwire.Init) error { err) } - // Then, finalize the remote feature vector providing the flatteneed + // Then, finalize the remote feature vector providing the flattened // feature bit namespace. p.remoteFeatures = lnwire.NewFeatureVector( msg.Features, lnwire.Features, @@ -2654,8 +2660,8 @@ func (p *peer) handleInitMsg(msg *lnwire.Init) error { return fmt.Errorf("invalid remote features: %v", err) } - // Ensure the remote party's feature vector contains all transistive - // dependencies. We know ours are are correct since they are validated + // Ensure the remote party's feature vector contains all transitive + // dependencies. We know ours are correct since they are validated // during the feature manager's instantiation. err = feature.ValidateDeps(p.remoteFeatures) if err != nil { @@ -2690,8 +2696,8 @@ func (p *peer) RemoteFeatures() *lnwire.FeatureVector { return p.remoteFeatures } -// sendInitMsg sends init message to remote peer which contains our currently -// supported local and global features. +// sendInitMsg sends the Init message to the remote peer. This message contains our +// currently supported local and global features. func (p *peer) sendInitMsg() error { msg := lnwire.NewInitMessage( p.legacyFeatures.RawFeatureVector, @@ -2745,20 +2751,20 @@ func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error { return nil } -// SendMessage sends a variadic number of high-priority message to remote peer. -// The first argument denotes if the method should block until the messages have -// been sent to the remote peer or an error is returned, otherwise it returns -// immediately after queuing. +// SendMessage sends a variadic number of high-priority messages to the remote +// peer. The first argument denotes if the method should block until the +// messages have been sent to the remote peer or an error is returned, +// otherwise it returns immediately after queuing. // // NOTE: Part of the lnpeer.Peer interface. func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error { return p.sendMessage(sync, true, msgs...) } -// SendMessageLazy sends a variadic number of low-priority message to remote -// peer. The first argument denotes if the method should block until the -// messages have been sent to the remote peer or an error is returned, otherwise -// it returns immediately after queueing. +// SendMessageLazy sends a variadic number of low-priority messages to the +// remote peer. The first argument denotes if the method should block until +// the messages have been sent to the remote peer or an error is returned, +// otherwise it returns immediately after queueing. // // NOTE: Part of the lnpeer.Peer interface. func (p *peer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {