peer+server: store errors for peers with open channels

Add an error buffer to the peer struct which will store errors for
peers that we have active channels with. We do not store these errors
with peers that we do not have channels open with to prevent peers from
connecting and costlessly spamming us with error messages. When the peer
disconnects, the error buffer is offloaded to the server so that we can
track errors across connections. When peers reconnect, they are created
with their historic error buffer.
This commit is contained in:
carla 2020-03-17 08:22:35 +02:00
parent a223e4eedb
commit 54089febd6
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
2 changed files with 107 additions and 8 deletions

81
peer.go

@ -28,6 +28,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/ticker"
) )
@ -52,6 +53,9 @@ const (
// messages to be sent across the wire, requested by objects outside // messages to be sent across the wire, requested by objects outside
// this struct. // this struct.
outgoingQueueLen = 50 outgoingQueueLen = 50
// errorBufferSize is the number of historic peer errors that we store.
errorBufferSize = 10
) )
// outgoingMsg packages an lnwire.Message to be sent out on the wire, along with // outgoingMsg packages an lnwire.Message to be sent out on the wire, along with
@ -91,6 +95,13 @@ type channelCloseUpdate struct {
Success bool Success bool
} }
// timestampedError is a timestamped error that is used to store the most recent
// errors we have experienced with our peers.
type timestampedError struct {
// error is the error that was recorded for the peer.
error error
// timestamp is the time at which the error was stored.
timestamp time.Time
}
// peer is an active peer on the Lightning Network. This struct is responsible // peer is an active peer on the Lightning Network. This struct is responsible
// for managing any channel state related to this peer. To do so, it has // for managing any channel state related to this peer. To do so, it has
// several helper goroutines to handle events such as HTLC timeouts, new // several helper goroutines to handle events such as HTLC timeouts, new
@ -216,6 +227,14 @@ type peer struct {
// peer's chansync message with its own over and over again. // peer's chansync message with its own over and over again.
resentChanSyncMsg map[lnwire.ChannelID]struct{} resentChanSyncMsg map[lnwire.ChannelID]struct{}
// errorBuffer stores a set of errors related to a peer. It contains
// error messages that our peer has recently sent us over the wire and
// records of unknown messages that were sent to us, so that we can
// track a full record of the communication errors we have had with our
// peer. If we choose to disconnect from a peer, it also stores the
// reason we had for disconnecting.
errorBuffer *queue.CircularBuffer
// writePool is the task pool that manages reuse of write buffers. // writePool is the task pool that manages reuse of write buffers.
// Write tasks are submitted to the pool in order to conserve the total // Write tasks are submitted to the pool in order to conserve the total
// number of write buffers allocated at any one time, and decouple write // number of write buffers allocated at any one time, and decouple write
@ -233,12 +252,15 @@ type peer struct {
var _ lnpeer.Peer = (*peer)(nil) var _ lnpeer.Peer = (*peer)(nil)
// newPeer creates a new peer from an established connection object, and a // newPeer creates a new peer from an established connection object, and a
// pointer to the main server. // pointer to the main server. It takes an error buffer which may contain errors
// from a previous connection with the peer if we have been connected to them
// before.
func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
addr *lnwire.NetAddress, inbound bool, addr *lnwire.NetAddress, inbound bool,
features, legacyFeatures *lnwire.FeatureVector, features, legacyFeatures *lnwire.FeatureVector,
chanActiveTimeout time.Duration, chanActiveTimeout time.Duration,
outgoingCltvRejectDelta uint32) ( outgoingCltvRejectDelta uint32,
errBuffer *queue.CircularBuffer) (
*peer, error) { *peer, error) {
nodePub := addr.IdentityKey nodePub := addr.IdentityKey
@ -276,6 +298,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
chanActiveTimeout: chanActiveTimeout, chanActiveTimeout: chanActiveTimeout,
errorBuffer: errBuffer,
writePool: server.writePool, writePool: server.writePool,
readPool: server.readPool, readPool: server.readPool,
@ -338,6 +362,7 @@ func (p *peer) Start() error {
msg := <-msgChan msg := <-msgChan
if msg, ok := msg.(*lnwire.Init); ok { if msg, ok := msg.(*lnwire.Init); ok {
if err := p.handleInitMsg(msg); err != nil { if err := p.handleInitMsg(msg); err != nil {
p.storeError(err)
return err return err
} }
} else { } else {
@ -668,7 +693,10 @@ func (p *peer) Disconnect(reason error) {
return return
} }
peerLog.Infof("Disconnecting %s, reason: %v", p, reason) err := fmt.Errorf("disconnecting %s, reason: %v", p, reason)
p.storeError(err)
peerLog.Infof(err.Error())
// Ensure that the TCP connection is properly closed before continuing. // Ensure that the TCP connection is properly closed before continuing.
p.conn.Close() p.conn.Close()
@ -1026,12 +1054,17 @@ out:
peerLog.Infof("unable to read message from %v: %v", peerLog.Infof("unable to read message from %v: %v",
p, err) p, err)
switch err.(type) { // If we could not read our peer's message due to an
// unknown type or invalid alias, we continue processing
// as normal. We store unknown message and address
// types, as they may provide debugging insight.
switch e := err.(type) {
// If this is just a message we don't yet recognize, // If this is just a message we don't yet recognize,
// we'll continue processing as normal as this allows // we'll continue processing as normal as this allows
// us to introduce new messages in a forwards // us to introduce new messages in a forwards
// compatible manner. // compatible manner.
case *lnwire.UnknownMessage: case *lnwire.UnknownMessage:
p.storeError(e)
idleTimer.Reset(idleTimeout) idleTimer.Reset(idleTimeout)
continue continue
@ -1040,12 +1073,15 @@ out:
// simply continue parsing the remainder of their // simply continue parsing the remainder of their
// messages. // messages.
case *lnwire.ErrUnknownAddrType: case *lnwire.ErrUnknownAddrType:
p.storeError(e)
idleTimer.Reset(idleTimeout) idleTimer.Reset(idleTimeout)
continue continue
// If the NodeAnnouncement has an invalid alias, then // If the NodeAnnouncement has an invalid alias, then
// we'll log that error above and continue so we can // we'll log that error above and continue so we can
// continue to read messges from the peer. // continue to read messages from the peer. We do not
// store this error because it is of little debugging
// value.
case *lnwire.ErrInvalidNodeAlias: case *lnwire.ErrInvalidNodeAlias:
idleTimer.Reset(idleTimeout) idleTimer.Reset(idleTimeout)
continue continue
@ -1141,8 +1177,13 @@ out:
discStream.AddMsg(msg) discStream.AddMsg(msg)
default: default:
peerLog.Errorf("unknown message %v received from peer "+ // If the message we received is unknown to us, store
"%v", uint16(msg.MsgType()), p) // the type to track the failure.
err := fmt.Errorf("unknown message type %v received",
uint16(msg.MsgType()))
p.storeError(err)
peerLog.Errorf("peer: %v, %v", p, err)
} }
if isLinkUpdate { if isLinkUpdate {
@ -1181,13 +1222,39 @@ func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool {
return ok return ok
} }
// storeError records err, stamped with the current time, in the peer's buffer
// of recent errors. The error is dropped unless the peer has at least one
// entry in its active channel set; this mitigates a DoS vector where a peer
// costlessly connects to us and spams us with errors.
func (p *peer) storeError(err error) {
	// Take a snapshot of whether any channels exist while holding the read
	// lock, and release it before logging or touching the buffer.
	p.activeChanMtx.RLock()
	hasChannels := len(p.activeChannels) != 0
	p.activeChanMtx.RUnlock()

	// Without any active channels we refuse to store the error, as a DoS
	// mitigation.
	if !hasChannels {
		peerLog.Tracef("no channels with peer: %v, not storing err", p)
		return
	}

	entry := &timestampedError{
		error:     err,
		timestamp: time.Now(),
	}
	p.errorBuffer.Add(entry)
}
// handleError processes an error message read from the remote peer. The boolean // handleError processes an error message read from the remote peer. The boolean
// returns indicates whether the message should be delivered to a targeted peer. // returns indicates whether the message should be delivered to a targeted peer.
// It stores the error we received from the peer in memory if we have a channel
// open with the peer.
// //
// NOTE: This method should only be called from within the readHandler. // NOTE: This method should only be called from within the readHandler.
func (p *peer) handleError(msg *lnwire.Error) bool { func (p *peer) handleError(msg *lnwire.Error) bool {
key := p.addr.IdentityKey key := p.addr.IdentityKey
// Store the error we have received.
p.storeError(msg)
switch { switch {
// In the case of an all-zero channel ID we want to forward the error to // In the case of an all-zero channel ID we want to forward the error to

@ -53,6 +53,7 @@ import (
"github.com/lightningnetwork/lnd/netann" "github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/localchans" "github.com/lightningnetwork/lnd/routing/localchans"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
@ -176,6 +177,12 @@ type server struct {
persistentConnReqs map[string][]*connmgr.ConnReq persistentConnReqs map[string][]*connmgr.ConnReq
persistentRetryCancels map[string]chan struct{} persistentRetryCancels map[string]chan struct{}
// peerErrors keeps a set of peer error buffers for peers that have
// disconnected from us. This allows us to track historic peer errors
// over connections. The string of the peer's compressed pubkey is used
// as a key for this map.
peerErrors map[string]*queue.CircularBuffer
// ignorePeerTermination tracks peers for which the server has initiated // ignorePeerTermination tracks peers for which the server has initiated
// a disconnect. Adding a peer to this map causes the peer termination // a disconnect. Adding a peer to this map causes the peer termination
// watcher to short circuit in the event that peers are purposefully // watcher to short circuit in the event that peers are purposefully
@ -425,6 +432,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
persistentPeersBackoff: make(map[string]time.Duration), persistentPeersBackoff: make(map[string]time.Duration),
persistentConnReqs: make(map[string][]*connmgr.ConnReq), persistentConnReqs: make(map[string][]*connmgr.ConnReq),
persistentRetryCancels: make(map[string]chan struct{}), persistentRetryCancels: make(map[string]chan struct{}),
peerErrors: make(map[string]*queue.CircularBuffer),
ignorePeerTermination: make(map[*peer]struct{}), ignorePeerTermination: make(map[*peer]struct{}),
scheduledPeerConnection: make(map[string]func()), scheduledPeerConnection: make(map[string]func()),
@ -2782,6 +2790,19 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
initFeatures := s.featureMgr.Get(feature.SetInit) initFeatures := s.featureMgr.Get(feature.SetInit)
legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal) legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal)
// Lookup past error caches for the peer in the server. If no buffer is
// found, create a fresh buffer.
pkStr := string(peerAddr.IdentityKey.SerializeCompressed())
errBuffer, ok := s.peerErrors[pkStr]
if !ok {
var err error
errBuffer, err = queue.NewCircularBuffer(errorBufferSize)
if err != nil {
srvrLog.Errorf("unable to create peer %v", err)
return
}
}
// Now that we've established a connection, create a peer, and add it to the // Now that we've established a connection, create a peer, and add it to the
// set of currently active peers. Configure the peer with the incoming // set of currently active peers. Configure the peer with the incoming
// and outgoing broadcast deltas to prevent htlcs from being accepted or // and outgoing broadcast deltas to prevent htlcs from being accepted or
@ -2791,7 +2812,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
p, err := newPeer( p, err := newPeer(
conn, connReq, s, peerAddr, inbound, initFeatures, conn, connReq, s, peerAddr, inbound, initFeatures,
legacyFeatures, cfg.ChanEnableTimeout, legacyFeatures, cfg.ChanEnableTimeout,
defaultOutgoingCltvRejectDelta, defaultOutgoingCltvRejectDelta, errBuffer,
) )
if err != nil { if err != nil {
srvrLog.Errorf("unable to create peer %v", err) srvrLog.Errorf("unable to create peer %v", err)
@ -2803,6 +2824,11 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
s.addPeer(p) s.addPeer(p)
// Once we have successfully added the peer to the server, we can
// delete the previous error buffer from the server's map of error
// buffers.
delete(s.peerErrors, pkStr)
// Dispatch a goroutine to asynchronously start the peer. This process // Dispatch a goroutine to asynchronously start the peer. This process
// includes sending and receiving Init messages, which would be a DOS // includes sending and receiving Init messages, which would be a DOS
// vector if we held the server's mutex throughout the procedure. // vector if we held the server's mutex throughout the procedure.
@ -3097,6 +3123,12 @@ func (s *server) removePeer(p *peer) {
delete(s.outboundPeers, pubStr) delete(s.outboundPeers, pubStr)
} }
// Copy the peer's error buffer across to the server if it has any items
// in it so that we can restore peer errors across connections.
if p.errorBuffer.Total() > 0 {
s.peerErrors[pubStr] = p.errorBuffer
}
// Inform the peer notifier of a peer offline event so that it can be // Inform the peer notifier of a peer offline event so that it can be
// reported to clients listening for peer events. // reported to clients listening for peer events.
var pubKey [33]byte var pubKey [33]byte