diff --git a/peer.go b/peer.go
index 5aef5f59..2236e66c 100644
--- a/peer.go
+++ b/peer.go
@@ -28,6 +28,7 @@ import (
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/pool"
+	"github.com/lightningnetwork/lnd/queue"
 	"github.com/lightningnetwork/lnd/ticker"
 )
 
@@ -52,6 +53,9 @@ const (
 	// messages to be sent across the wire, requested by objects outside
 	// this struct.
 	outgoingQueueLen = 50
+
+	// errorBufferSize is the number of historic peer errors that we store.
+	errorBufferSize = 10
 )
 
 // outgoingMsg packages an lnwire.Message to be sent out on the wire, along with
@@ -91,6 +95,13 @@ type channelCloseUpdate struct {
 	Success bool
 }
 
+// timestampedError is a timestamped error that is used to store the most recent
+// errors we have experienced with our peers.
+type timestampedError struct {
+	error     error
+	timestamp time.Time
+}
+
 // peer is an active peer on the Lightning Network. This struct is responsible
 // for managing any channel state related to this peer. To do so, it has
 // several helper goroutines to handle events such as HTLC timeouts, new
@@ -216,6 +227,14 @@ type peer struct {
 	// peer's chansync message with its own over and over again.
 	resentChanSyncMsg map[lnwire.ChannelID]struct{}
 
+	// errorBuffer stores a set of errors related to a peer. It contains
+	// error messages that our peer has recently sent us over the wire and
+	// records of unknown messages that were sent to us, so that we can
+	// track a full record of the communication errors we have had with our
+	// peer. If we choose to disconnect from a peer, it also stores the
+	// reason we had for disconnecting.
+	errorBuffer *queue.CircularBuffer
+
 	// writePool is the task pool to that manages reuse of write buffers.
 	// Write tasks are submitted to the pool in order to conserve the total
 	// number of write buffers allocated at any one time, and decouple write
@@ -233,12 +252,15 @@ type peer struct {
 var _ lnpeer.Peer = (*peer)(nil)
 
 // newPeer creates a new peer from an establish connection object, and a
-// pointer to the main server.
+// pointer to the main server. It takes an error buffer which may contain errors
+// from a previous connection with the peer if we have been connected to them
+// before.
 func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
 	addr *lnwire.NetAddress, inbound bool,
 	features, legacyFeatures *lnwire.FeatureVector,
 	chanActiveTimeout time.Duration,
-	outgoingCltvRejectDelta uint32) (
+	outgoingCltvRejectDelta uint32,
+	errBuffer *queue.CircularBuffer) (
 	*peer, error) {
 
 	nodePub := addr.IdentityKey
@@ -276,6 +298,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
 
 		chanActiveTimeout: chanActiveTimeout,
 
+		errorBuffer: errBuffer,
+
 		writePool: server.writePool,
 		readPool:  server.readPool,
 
@@ -338,6 +362,7 @@ func (p *peer) Start() error {
 	msg := <-msgChan
 	if msg, ok := msg.(*lnwire.Init); ok {
 		if err := p.handleInitMsg(msg); err != nil {
+			p.storeError(err)
 			return err
 		}
 	} else {
@@ -668,7 +693,10 @@ func (p *peer) Disconnect(reason error) {
 		return
 	}
 
-	peerLog.Infof("Disconnecting %s, reason: %v", p, reason)
+	err := fmt.Errorf("disconnecting %s, reason: %v", p, reason)
+	p.storeError(err)
+
+	peerLog.Infof(err.Error())
 
 	// Ensure that the TCP connection is properly closed before continuing.
 	p.conn.Close()
@@ -1026,12 +1054,17 @@ out:
 			peerLog.Infof("unable to read message from %v: %v",
 				p, err)
 
-			switch err.(type) {
+			// If we could not read our peer's message due to an
+			// unknown type or invalid alias, we continue processing
+			// as normal. We store unknown message and address
+			// types, as they may provide debugging insight.
+			switch e := err.(type) {
 
 			// If this is just a message we don't yet recognize,
 			// we'll continue processing as normal as this allows
 			// us to introduce new messages in a forwards
 			// compatible manner.
 			case *lnwire.UnknownMessage:
+				p.storeError(e)
 				idleTimer.Reset(idleTimeout)
 				continue
 
@@ -1040,12 +1073,15 @@ out:
 			// simply continue parsing the remainder of their
 			// messages.
 			case *lnwire.ErrUnknownAddrType:
+				p.storeError(e)
 				idleTimer.Reset(idleTimeout)
 				continue
 
 			// If the NodeAnnouncement has an invalid alias, then
 			// we'll log that error above and continue so we can
-			// continue to read messges from the peer.
+			// continue to read messages from the peer. We do not
+			// store this error because it is of little debugging
+			// value.
 			case *lnwire.ErrInvalidNodeAlias:
 				idleTimer.Reset(idleTimeout)
 				continue
@@ -1141,8 +1177,13 @@ out:
 			discStream.AddMsg(msg)
 
 		default:
-			peerLog.Errorf("unknown message %v received from peer "+
-				"%v", uint16(msg.MsgType()), p)
+			// If the message we received is unknown to us, store
+			// the type to track the failure.
+			err := fmt.Errorf("unknown message type %v received",
+				uint16(msg.MsgType()))
+			p.storeError(err)
+
+			peerLog.Errorf("peer: %v, %v", p, err)
 		}
 
 		if isLinkUpdate {
@@ -1181,13 +1222,39 @@ func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool {
 	return ok
 }
 
+// storeError stores an error in our peer's buffer of recent errors with the
+// current timestamp. Errors are only stored if we have at least one active
+// channel with the peer to mitigate dos attack vectors where a peer costlessly
+// connects to us and spams us with errors.
+func (p *peer) storeError(err error) {
+	p.activeChanMtx.RLock()
+	channelCount := len(p.activeChannels)
+	p.activeChanMtx.RUnlock()
+
+	// If we do not have any active channels with the peer, we do not store
+	// errors as a dos mitigation.
+	if channelCount == 0 {
+		peerLog.Tracef("no channels with peer: %v, not storing err", p)
+		return
+	}
+
+	p.errorBuffer.Add(
+		&timestampedError{timestamp: time.Now(), error: err},
+	)
+}
+
 // handleError processes an error message read from the remote peer. The boolean
 // returns indicates whether the message should be delivered to a targeted peer.
+// It stores the error we received from the peer in memory if we have a channel
+// open with the peer.
 //
 // NOTE: This method should only be called from within the readHandler.
 func (p *peer) handleError(msg *lnwire.Error) bool {
 	key := p.addr.IdentityKey
 
+	// Store the error we have received.
+	p.storeError(msg)
+
 	switch {
 
 	// In the case of an all-zero channel ID we want to forward the error to
diff --git a/server.go b/server.go
index 0864d99e..98f446d7 100644
--- a/server.go
+++ b/server.go
@@ -53,6 +53,7 @@ import (
 	"github.com/lightningnetwork/lnd/netann"
 	"github.com/lightningnetwork/lnd/peernotifier"
 	"github.com/lightningnetwork/lnd/pool"
+	"github.com/lightningnetwork/lnd/queue"
 	"github.com/lightningnetwork/lnd/routing"
 	"github.com/lightningnetwork/lnd/routing/localchans"
 	"github.com/lightningnetwork/lnd/routing/route"
@@ -176,6 +177,12 @@ type server struct {
 	persistentConnReqs     map[string][]*connmgr.ConnReq
 	persistentRetryCancels map[string]chan struct{}
 
+	// peerErrors keeps a set of peer error buffers for peers that have
+	// disconnected from us. This allows us to track historic peer errors
+	// over connections. The string of the peer's compressed pubkey is used
+	// as a key for this map.
+	peerErrors map[string]*queue.CircularBuffer
+
 	// ignorePeerTermination tracks peers for which the server has initiated
 	// a disconnect. Adding a peer to this map causes the peer termination
 	// watcher to short circuit in the event that peers are purposefully
@@ -425,6 +432,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
 		persistentPeersBackoff:  make(map[string]time.Duration),
 		persistentConnReqs:      make(map[string][]*connmgr.ConnReq),
 		persistentRetryCancels:  make(map[string]chan struct{}),
+		peerErrors:              make(map[string]*queue.CircularBuffer),
 		ignorePeerTermination:   make(map[*peer]struct{}),
 		scheduledPeerConnection: make(map[string]func()),
 
@@ -2782,6 +2790,19 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 	initFeatures := s.featureMgr.Get(feature.SetInit)
 	legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal)
 
+	// Lookup past error caches for the peer in the server. If no buffer is
+	// found, create a fresh buffer.
+	pkStr := string(peerAddr.IdentityKey.SerializeCompressed())
+	errBuffer, ok := s.peerErrors[pkStr]
+	if !ok {
+		var err error
+		errBuffer, err = queue.NewCircularBuffer(errorBufferSize)
+		if err != nil {
+			srvrLog.Errorf("unable to create peer %v", err)
+			return
+		}
+	}
+
 	// Now that we've established a connection, create a peer, and it to the
 	// set of currently active peers. Configure the peer with the incoming
 	// and outgoing broadcast deltas to prevent htlcs from being accepted or
@@ -2791,7 +2812,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 	p, err := newPeer(
 		conn, connReq, s, peerAddr, inbound, initFeatures,
 		legacyFeatures, cfg.ChanEnableTimeout,
-		defaultOutgoingCltvRejectDelta,
+		defaultOutgoingCltvRejectDelta, errBuffer,
 	)
 	if err != nil {
 		srvrLog.Errorf("unable to create peer %v", err)
@@ -2803,6 +2824,11 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 
 	s.addPeer(p)
 
+	// Once we have successfully added the peer to the server, we can
+	// delete the previous error buffer from the server's map of error
+	// buffers.
+	delete(s.peerErrors, pkStr)
+
 	// Dispatch a goroutine to asynchronously start the peer. This process
 	// includes sending and receiving Init messages, which would be a DOS
 	// vector if we held the server's mutex throughout the procedure.
@@ -3097,6 +3123,12 @@ func (s *server) removePeer(p *peer) {
 		delete(s.outboundPeers, pubStr)
 	}
 
+	// Copy the peer's error buffer across to the server if it has any items
+	// in it so that we can restore peer errors across connections.
+	if p.errorBuffer.Total() > 0 {
+		s.peerErrors[pubStr] = p.errorBuffer
+	}
+
 	// Inform the peer notifier of a peer offline event so that it can be
 	// reported to clients listening for peer events.
 	var pubKey [33]byte
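
For context, here is a minimal, self-contained sketch of the buffering pattern this diff introduces: a fixed-size ring of timestamped errors that is only written to when we have at least one channel with the peer. The ring-buffer type, helper names, and the activeChannels parameter below are illustrative stand-ins, not lnd's actual queue.CircularBuffer implementation; only NewCircularBuffer, Add, and Total are shown by the diff itself.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// timestampedError mirrors the struct added in peer.go: an error paired with
// the time at which it was recorded.
type timestampedError struct {
	err       error
	timestamp time.Time
}

// errorBuffer is a minimal fixed-size ring buffer of timestamped errors. It
// is a stand-in for queue.CircularBuffer, used purely for illustration.
type errorBuffer struct {
	items []timestampedError
	next  int
	total int
}

func newErrorBuffer(size int) *errorBuffer {
	return &errorBuffer{items: make([]timestampedError, size)}
}

// add records an error with the current timestamp, overwriting the oldest
// entry once the buffer is full.
func (b *errorBuffer) add(err error) {
	b.items[b.next] = timestampedError{err: err, timestamp: time.Now()}
	b.next = (b.next + 1) % len(b.items)
	if b.total < len(b.items) {
		b.total++
	}
}

// storeError mimics the gating in peer.storeError: errors are only recorded
// when we have at least one active channel with the peer, so a peer cannot
// fill the buffer simply by connecting and spamming error messages.
func storeError(buf *errorBuffer, activeChannels int, err error) {
	if activeChannels == 0 {
		fmt.Println("no channels with peer, not storing err")
		return
	}
	buf.add(err)
}

func main() {
	// errorBufferSize matches the constant introduced in peer.go.
	const errorBufferSize = 10
	buf := newErrorBuffer(errorBufferSize)

	// An error from a peer we have no channels with is dropped.
	storeError(buf, 0, errors.New("remote error: spam"))

	// Errors from a peer we do have channels with are retained.
	storeError(buf, 1, errors.New("unknown message type 999 received"))
	storeError(buf, 1, errors.New("disconnecting peer, reason: ping timeout"))

	fmt.Printf("stored %d error(s)\n", buf.total)
}
```

In the real change, the server additionally keeps a disconnected peer's buffer in its peerErrors map and hands it back to newPeer on reconnect, which is what lets the error history survive across connections.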