From 9be9d6934943765a725cf4b5111d690a18ec0064 Mon Sep 17 00:00:00 2001 From: nsa Date: Mon, 29 Jun 2020 21:29:22 -0400 Subject: [PATCH] multi: initialize peer with peer.Config --- peer.go | 303 ++++++++++++++++++++++---------------------------- rpcserver.go | 12 +- server.go | 88 +++++++++++---- test_utils.go | 93 ++++++---------- 4 files changed, 243 insertions(+), 253 deletions(-) diff --git a/peer.go b/peer.go index 0bd3eff0..4852f56f 100644 --- a/peer.go +++ b/peer.go @@ -29,7 +29,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chancloser" "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/pool" + ppeer "github.com/lightningnetwork/lnd/peer" "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/ticker" ) @@ -131,13 +131,7 @@ type peer struct { // our last ping message. To be used atomically. pingLastSend int64 - cfg *Config - - connReq *connmgr.ConnReq - conn net.Conn - - addr *lnwire.NetAddress - pubKeyBytes [33]byte + cfg ppeer.Config // activeSignal when closed signals that the peer is now active and // ready to process messages. @@ -147,8 +141,6 @@ type peer struct { // It will be zero for peers that did not successfully call Start(). startTime time.Time - inbound bool - // sendQueue is the channel which is used to queue outgoing to be // written onto the wire. Note that this channel is unbuffered. sendQueue chan outgoingMsg @@ -206,28 +198,6 @@ type peer struct { // well as lnwire.ClosingSigned messages. chanCloseMsgs chan *closeMsg - // chanActiveTimeout specifies the duration the peer will wait to - // request a channel reenable, beginning from the time the peer was - // started. - chanActiveTimeout time.Duration - - server *server - - // features is the set of features that we advertised to the remote - // node. - features *lnwire.FeatureVector - - // legacyFeatures is the set of features that we advertised to the remote - // node for backwards compatibility. Nodes that have not implemented - // flat featurs will still be able to read our feature bits from the - // legacy global field, but we will also advertise everything in the - // default features field. - legacyFeatures *lnwire.FeatureVector - - // outgoingCltvRejectDelta defines the number of blocks before expiry of - // an htlc where we don't offer an htlc anymore. - outgoingCltvRejectDelta uint32 - // remoteFeatures is the feature vector received from the peer during // the connection handshake. remoteFeatures *lnwire.FeatureVector @@ -238,22 +208,6 @@ type peer struct { // peer's chansync message with its own over and over again. resentChanSyncMsg map[lnwire.ChannelID]struct{} - // errorBuffer stores a set of errors related to a peer. It contains - // error messages that our peer has recently sent us over the wire and - // records of unknown messages that were sent to us and, so that we can - // track a full record of the communication errors we have had with our - // peer. If we choose to disconnect from a peer, it also stores the - // reason we had for disconnecting. - errorBuffer *queue.CircularBuffer - - // writePool is the task pool to that manages reuse of write buffers. - // Write tasks are submitted to the pool in order to conserve the total - // number of write buffers allocated at any one time, and decouple write - // buffer allocation from the peer life cycle. - writePool *pool.Write - - readPool *pool.Read - queueQuit chan struct{} quit chan struct{} wg sync.WaitGroup @@ -262,41 +216,14 @@ type peer struct { // A compile-time check to ensure that peer satisfies the lnpeer.Peer interface. var _ lnpeer.Peer = (*peer)(nil) -// newPeer creates a new peer from an establish connection object, and a -// pointer to the main server. It takes an error buffer which may contain errors -// from a previous connection with the peer if we have been connected to them -// before. -func newPeer(cfg *Config, conn net.Conn, connReq *connmgr.ConnReq, server *server, - addr *lnwire.NetAddress, inbound bool, - features, legacyFeatures *lnwire.FeatureVector, - chanActiveTimeout time.Duration, - outgoingCltvRejectDelta uint32, - errBuffer *queue.CircularBuffer) ( - *peer, error) { - - nodePub := addr.IdentityKey +// newPeer creates a new peer from a peer.Config object. +func newPeer(cfg ppeer.Config) *peer { p := &peer{ - conn: conn, - addr: addr, - - cfg: cfg, - - activeSignal: make(chan struct{}), - - inbound: inbound, - connReq: connReq, - - server: server, - - features: features, - legacyFeatures: legacyFeatures, - - outgoingCltvRejectDelta: outgoingCltvRejectDelta, - - sendQueue: make(chan outgoingMsg), - outgoingQueue: make(chan outgoingMsg), - + cfg: cfg, + activeSignal: make(chan struct{}), + sendQueue: make(chan outgoingMsg), + outgoingQueue: make(chan outgoingMsg), addedChannels: make(map[lnwire.ChannelID]struct{}), activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), newChannels: make(chan *newChannelMsg, 1), @@ -308,20 +235,11 @@ func newPeer(cfg *Config, conn net.Conn, connReq *connmgr.ConnReq, server *serve linkFailures: make(chan linkFailureReport), chanCloseMsgs: make(chan *closeMsg), resentChanSyncMsg: make(map[lnwire.ChannelID]struct{}), - - chanActiveTimeout: chanActiveTimeout, - - errorBuffer: errBuffer, - - writePool: server.writePool, - readPool: server.readPool, - - queueQuit: make(chan struct{}), - quit: make(chan struct{}), + queueQuit: make(chan struct{}), + quit: make(chan struct{}), } - copy(p.pubKeyBytes[:], nodePub.SerializeCompressed()) - return p, nil + return p } // Start starts all helper goroutines the peer needs for normal operations. In @@ -385,7 +303,7 @@ func (p *peer) Start() error { // Fetch and then load all the active channels we have with this remote // peer from the database. - activeChans, err := p.server.chanDB.FetchOpenChannels(p.addr.IdentityKey) + activeChans, err := p.cfg.ChannelDB.FetchOpenChannels(p.cfg.Addr.IdentityKey) if err != nil { peerLog.Errorf("unable to fetch active chans "+ "for peer %v: %v", p, err) @@ -393,7 +311,7 @@ func (p *peer) Start() error { } if len(activeChans) == 0 { - p.server.prunePersistentPeerConnection(p.pubKeyBytes) + p.cfg.PrunePersistentPeerConnection(p.cfg.PubKeyBytes) } // Next, load all the active channels we have with this peer, @@ -454,7 +372,7 @@ func (p *peer) initGossipSync() { // we'll create a new gossipSyncer in the AuthenticatedGossiper for it. if p.remoteFeatures.HasFeature(lnwire.GossipQueriesOptional) { srvrLog.Infof("Negotiated chan series queries with %x", - p.pubKeyBytes[:]) + p.cfg.PubKeyBytes[:]) // Register the peer's gossip syncer with the gossiper. // This blocks synchronously to ensure the gossip syncer is @@ -465,7 +383,7 @@ func (p *peer) initGossipSync() { // requires an improved version of the current network // bootstrapper to ensure we can find and connect to non-channel // peers. - p.server.authGossiper.InitSyncState(p) + p.cfg.AuthGossiper.InitSyncState(p) } } @@ -493,7 +411,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( for _, dbChan := range chans { lnChan, err := lnwallet.NewLightningChannel( - p.server.cc.signer, dbChan, p.server.sigPool, + p.cfg.Signer, dbChan, p.cfg.SigPool, ) if err != nil { return nil, err @@ -535,7 +453,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( // Before we register this new link with the HTLC Switch, we'll // need to fetch its current link-layer forwarding policy from // the database. - graph := p.server.chanDB.ChannelGraph() + graph := p.cfg.ChannelDB.ChannelGraph() info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint) if err != nil && err != channeldb.ErrEdgeNotFound { return nil, err @@ -550,7 +468,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( // particular channel. var selfPolicy *channeldb.ChannelEdgePolicy if info != nil && bytes.Equal(info.NodeKey1Bytes[:], - p.server.identityECDH.PubKey().SerializeCompressed()) { + p.cfg.ServerPubKey[:]) { selfPolicy = p1 } else { @@ -573,7 +491,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( peerLog.Warnf("Unable to find our forwarding policy "+ "for channel %v, using default values", chanPoint) - forwardingPolicy = &p.server.cc.routingPolicy + forwardingPolicy = &p.cfg.RoutingPolicy } peerLog.Tracef("Using link policy of: %v", @@ -594,7 +512,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( } // Subscribe to the set of on-chain events for this channel. - chainEvents, err := p.server.chainArb.SubscribeChannelEvents( + chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents( *chanPoint, ) if err != nil { @@ -641,26 +559,26 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, select { case p.linkFailures <- failure: case <-p.quit: - case <-p.server.quit: + case <-p.cfg.Quit: } } linkCfg := htlcswitch.ChannelLinkConfig{ Peer: p, - DecodeHopIterators: p.server.sphinx.DecodeHopIterators, - ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter, - FetchLastChannelUpdate: p.server.fetchLastChanUpdate(), + DecodeHopIterators: p.cfg.Sphinx.DecodeHopIterators, + ExtractErrorEncrypter: p.cfg.Sphinx.ExtractErrorEncrypter, + FetchLastChannelUpdate: p.cfg.FetchLastChanUpdate, HodlMask: p.cfg.Hodl.Mask(), - Registry: p.server.invoices, - Switch: p.server.htlcSwitch, - Circuits: p.server.htlcSwitch.CircuitModifier(), - ForwardPackets: p.server.interceptableSwitch.ForwardPackets, + Registry: p.cfg.Invoices, + Switch: p.cfg.Switch, + Circuits: p.cfg.Switch.CircuitModifier(), + ForwardPackets: p.cfg.InterceptSwitch.ForwardPackets, FwrdingPolicy: *forwardingPolicy, - FeeEstimator: p.server.cc.feeEstimator, - PreimageCache: p.server.witnessBeacon, + FeeEstimator: p.cfg.FeeEstimator, + PreimageCache: p.cfg.WitnessBeacon, ChainEvents: chainEvents, UpdateContractSignals: func(signals *contractcourt.ContractSignals) error { - return p.server.chainArb.UpdateContractSignals( + return p.cfg.ChainArb.UpdateContractSignals( *chanPoint, signals, ) }, @@ -673,14 +591,14 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, UnsafeReplay: p.cfg.UnsafeReplay, MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout, MaxFeeUpdateTimeout: htlcswitch.DefaultMaxLinkFeeUpdateTimeout, - OutgoingCltvRejectDelta: p.outgoingCltvRejectDelta, - TowerClient: p.server.towerClient, + OutgoingCltvRejectDelta: p.cfg.OutgoingCltvRejectDelta, + TowerClient: p.cfg.TowerClient, MaxOutgoingCltvExpiry: p.cfg.MaxOutgoingCltvExpiry, MaxFeeAllocation: p.cfg.MaxChannelFeeAllocation, - NotifyActiveLink: p.server.channelNotifier.NotifyActiveLinkEvent, - NotifyActiveChannel: p.server.channelNotifier.NotifyActiveChannelEvent, - NotifyInactiveChannel: p.server.channelNotifier.NotifyInactiveChannelEvent, - HtlcNotifier: p.server.htlcNotifier, + NotifyActiveLink: p.cfg.ChannelNotifier.NotifyActiveLinkEvent, + NotifyActiveChannel: p.cfg.ChannelNotifier.NotifyActiveChannelEvent, + NotifyInactiveChannel: p.cfg.ChannelNotifier.NotifyInactiveChannelEvent, + HtlcNotifier: p.cfg.HtlcNotifier, } link := htlcswitch.NewChannelLink(linkCfg, lnChan) @@ -689,12 +607,12 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, // links going by the same channel id. If one is found, we'll shut it // down to ensure that the mailboxes are only ever under the control of // one link. - p.server.htlcSwitch.RemoveLink(link.ChanID()) + p.cfg.Switch.RemoveLink(link.ChanID()) // With the channel link created, we'll now notify the htlc switch so // this channel can be used to dispatch local payments and also // passively forward payments. - return p.server.htlcSwitch.AddLink(link) + return p.cfg.Switch.AddLink(link) } // maybeSendNodeAnn sends our node announcement to the remote peer if at least @@ -716,7 +634,7 @@ func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { return } - ourNodeAnn, err := p.server.genNodeAnnouncement(false) + ourNodeAnn, err := p.cfg.GenNodeAnnouncement(false) if err != nil { srvrLog.Debugf("Unable to retrieve node announcement: %v", err) return @@ -724,7 +642,7 @@ func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { if err := p.SendMessageLazy(false, &ourNodeAnn); err != nil { srvrLog.Debugf("Unable to resend node announcement to %x: %v", - p.pubKeyBytes, err) + p.cfg.PubKeyBytes, err) } } @@ -759,20 +677,20 @@ func (p *peer) Disconnect(reason error) { peerLog.Infof(err.Error()) // Ensure that the TCP connection is properly closed before continuing. - p.conn.Close() + p.cfg.Conn.Close() close(p.quit) } // String returns the string representation of this peer. func (p *peer) String() string { - return fmt.Sprintf("%x@%s", p.pubKeyBytes, p.conn.RemoteAddr()) + return fmt.Sprintf("%x@%s", p.cfg.PubKeyBytes, p.cfg.Conn.RemoteAddr()) } // readNextMessage reads, and returns the next message on the wire along with // any additional raw payload. func (p *peer) readNextMessage() (lnwire.Message, error) { - noiseConn, ok := p.conn.(*brontide.Conn) + noiseConn, ok := p.cfg.Conn.(*brontide.Conn) if !ok { return nil, fmt.Errorf("brontide.Conn required to read messages") } @@ -792,7 +710,7 @@ func (p *peer) readNextMessage() (lnwire.Message, error) { // is message oriented and allows nodes to pad on additional data to // the message stream. var rawMsg []byte - err = p.readPool.Submit(func(buf *buffer.Read) error { + err = p.cfg.ReadPool.Submit(func(buf *buffer.Read) error { // Before reading the body of the message, set the read timeout // accordingly to ensure we don't block other readers using the // pool. We do so only after the task has been scheduled to @@ -1000,7 +918,7 @@ func waitUntilLinkActive(p *peer, // we will get an ActiveLinkEvent notification and retrieve the link. If // the call to GetLink is before SubscribeChannelEvents, however, there // will be a race condition. - sub, err := p.server.channelNotifier.SubscribeChannelEvents() + sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents() if err != nil { // If we have a non-nil error, then the server is shutting down and we // can exit here and return nil. This means no message will be delivered @@ -1011,7 +929,7 @@ func waitUntilLinkActive(p *peer, // The link may already be active by this point, and we may have missed the // ActiveLinkEvent. Check if the link exists. - link, _ := p.server.htlcSwitch.GetLink(cid) + link, _ := p.cfg.Switch.GetLink(cid) if link != nil { return link } @@ -1041,7 +959,7 @@ func waitUntilLinkActive(p *peer, // The link shouldn't be nil as we received an // ActiveLinkEvent. If it is nil, we return nil and the // calling function should catch it. - link, _ = p.server.htlcSwitch.GetLink(cid) + link, _ = p.cfg.Switch.GetLink(cid) return link case <-p.quit: @@ -1103,7 +1021,7 @@ func newDiscMsgStream(p *peer) *msgStream { "Update stream for gossiper exited", 1000, func(msg lnwire.Message) { - p.server.authGossiper.ProcessRemoteAnnouncement(msg, p) + p.cfg.AuthGossiper.ProcessRemoteAnnouncement(msg, p) }, ) } @@ -1207,15 +1125,15 @@ out: p.queueMsg(lnwire.NewPong(pongBytes), nil) case *lnwire.OpenChannel: - p.server.fundingMgr.processFundingOpen(msg, p) + p.cfg.ProcessFundingOpen(msg, p) case *lnwire.AcceptChannel: - p.server.fundingMgr.processFundingAccept(msg, p) + p.cfg.ProcessFundingAccept(msg, p) case *lnwire.FundingCreated: - p.server.fundingMgr.processFundingCreated(msg, p) + p.cfg.ProcessFundingCreated(msg, p) case *lnwire.FundingSigned: - p.server.fundingMgr.processFundingSigned(msg, p) + p.cfg.ProcessFundingSigned(msg, p) case *lnwire.FundingLocked: - p.server.fundingMgr.processFundingLocked(msg, p) + p.cfg.ProcessFundingLocked(msg, p) case *lnwire.Shutdown: select { @@ -1341,7 +1259,7 @@ func (p *peer) storeError(err error) { return } - p.errorBuffer.Add( + p.cfg.ErrorBuffer.Add( &TimestampedError{Timestamp: time.Now(), Error: err}, ) } @@ -1353,7 +1271,7 @@ func (p *peer) storeError(err error) { // // NOTE: This method should only be called from within the readHandler. func (p *peer) handleError(msg *lnwire.Error) bool { - key := p.addr.IdentityKey + key := p.cfg.Addr.IdentityKey // Store the error we have received. p.storeError(msg) @@ -1370,8 +1288,8 @@ func (p *peer) handleError(msg *lnwire.Error) bool { // If the channel ID for the error message corresponds to a pending // channel, then the funding manager will handle the error. - case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key): - p.server.fundingMgr.processFundingError(msg, key) + case p.cfg.IsPendingChannel(msg.ChanID, key): + p.cfg.ProcessFundingError(msg, key) return false // If not we hand the error to the channel link for this channel. @@ -1593,7 +1511,7 @@ func (p *peer) writeMessage(msg lnwire.Message) error { p.logWireMessage(msg, false) } - noiseConn, ok := p.conn.(*brontide.Conn) + noiseConn, ok := p.cfg.Conn.(*brontide.Conn) if !ok { return fmt.Errorf("brontide.Conn required to write messages") } @@ -1629,7 +1547,7 @@ func (p *peer) writeMessage(msg lnwire.Message) error { // Otherwise, this is a new message. We'll acquire a write buffer to // serialize the message and buffer the ciphertext on the connection. - err := p.writePool.Submit(func(buf *bytes.Buffer) error { + err := p.cfg.WritePool.Submit(func(buf *bytes.Buffer) error { // Using a buffer allocated by the write pool, encode the // message directly into the buffer. _, writeErr := lnwire.WriteMessage(buf, msg, 0) @@ -1902,7 +1820,7 @@ func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot { // genDeliveryScript returns a new script to be used to send our funds to in // the case of a cooperative channel close negotiation. func (p *peer) genDeliveryScript() ([]byte, error) { - deliveryAddr, err := p.server.cc.wallet.NewAddress( + deliveryAddr, err := p.cfg.Wallet.NewAddress( lnwallet.WitnessPubKey, false, ) if err != nil { @@ -1925,7 +1843,7 @@ func (p *peer) channelManager() { // reenableTimeout will fire once after the configured channel status // interval has elapsed. This will trigger us to sign new channel // updates and broadcast them with the "disabled" flag unset. - reenableTimeout := time.After(p.chanActiveTimeout) + reenableTimeout := time.After(p.cfg.ChanActiveTimeout) out: for { @@ -1977,7 +1895,7 @@ out: // set of active channels, so we can look it up later // easily according to its channel ID. lnChan, err := lnwallet.NewLightningChannel( - p.server.cc.signer, newChan, p.server.sigPool, + p.cfg.Signer, newChan, p.cfg.SigPool, ) if err != nil { p.activeChanMtx.Unlock() @@ -2002,7 +1920,7 @@ out: // necessary items it needs to function. // // TODO(roasbeef): panic on below? - chainEvents, err := p.server.chainArb.SubscribeChannelEvents( + chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents( *chanPoint, ) if err != nil { @@ -2021,7 +1939,7 @@ out: // at initial channel creation. Note that the maximum HTLC value // defaults to the cap on the total value of outstanding HTLCs. fwdMinHtlc := lnChan.FwdMinHtlc() - defaultPolicy := p.server.cc.routingPolicy + defaultPolicy := p.cfg.RoutingPolicy forwardingPolicy := &htlcswitch.ForwardingPolicy{ MinHTLCOut: fwdMinHtlc, MaxHTLC: newChan.LocalChanCfg.MaxPendingAmount, @@ -2151,7 +2069,7 @@ func (p *peer) reenableActiveChannels() { // disabled bit to false and send out a new ChannelUpdate. If this // channel is already active, the update won't be sent. for _, chanPoint := range activePublicChans { - err := p.server.chanStatusMgr.RequestEnable(chanPoint) + err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint) if err != nil { srvrLog.Errorf("Unable to enable channel %v: %v", chanPoint, err) @@ -2208,14 +2126,14 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) ( // In order to begin fee negotiations, we'll first compute our // target ideal fee-per-kw. We'll set this to a lax value, as // we weren't the ones that initiated the channel closure. - feePerKw, err := p.server.cc.feeEstimator.EstimateFeePerKW(6) + feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(6) if err != nil { peerLog.Errorf("unable to query fee estimator: %v", err) return nil, fmt.Errorf("unable to estimate fee") } - _, startingHeight, err := p.server.cc.chainIO.GetBestBlock() + _, startingHeight, err := p.cfg.ChainIO.GetBestBlock() if err != nil { peerLog.Errorf("unable to obtain best block: %v", err) return nil, fmt.Errorf("cannot obtain best block") @@ -2224,11 +2142,11 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) ( chanCloser = chancloser.NewChanCloser( chancloser.ChanCloseCfg{ Channel: channel, - UnregisterChannel: p.server.htlcSwitch.RemoveLink, - BroadcastTx: p.server.cc.wallet.PublishTransaction, - DisableChannel: p.server.chanStatusMgr.RequestDisable, + UnregisterChannel: p.cfg.Switch.RemoveLink, + BroadcastTx: p.cfg.Wallet.PublishTransaction, + DisableChannel: p.cfg.ChanStatusMgr.RequestDisable, Disconnect: func() error { - return p.server.DisconnectPeer(p.IdentityKey()) + return p.cfg.DisconnectPeer(p.IdentityKey()) }, Quit: p.quit, }, @@ -2330,7 +2248,7 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) { // Next, we'll create a new channel closer state machine to // handle the close negotiation. - _, startingHeight, err := p.server.cc.chainIO.GetBestBlock() + _, startingHeight, err := p.cfg.ChainIO.GetBestBlock() if err != nil { peerLog.Errorf(err.Error()) req.Err <- err @@ -2340,11 +2258,11 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) { chanCloser := chancloser.NewChanCloser( chancloser.ChanCloseCfg{ Channel: channel, - UnregisterChannel: p.server.htlcSwitch.RemoveLink, - BroadcastTx: p.server.cc.wallet.PublishTransaction, - DisableChannel: p.server.chanStatusMgr.RequestDisable, + UnregisterChannel: p.cfg.Switch.RemoveLink, + BroadcastTx: p.cfg.Wallet.PublishTransaction, + DisableChannel: p.cfg.ChanStatusMgr.RequestDisable, Disconnect: func() error { - return p.server.DisconnectPeer(p.IdentityKey()) + return p.cfg.DisconnectPeer(p.IdentityKey()) }, Quit: p.quit, }, @@ -2414,7 +2332,7 @@ func (p *peer) handleLinkFailure(failure linkFailureReport) { peerLog.Warnf("Force closing link(%v)", failure.shortChanID) - closeTx, err := p.server.chainArb.ForceCloseContract( + closeTx, err := p.cfg.ChainArb.ForceCloseContract( failure.chanPoint, ) if err != nil { @@ -2463,7 +2381,7 @@ func (p *peer) finalizeChanClosure(chanCloser *chancloser.ChanCloser) { // Next, we'll launch a goroutine which will request to be notified by // the ChainNotifier once the closure transaction obtains a single // confirmation. - notifier := p.server.cc.chainNotifier + notifier := p.cfg.ChainNotifier // If any error happens during waitForChanToClose, forward it to // closeReq. If this channel closure is not locally initiated, closeReq @@ -2557,7 +2475,7 @@ func (p *peer) WipeChannel(chanPoint *wire.OutPoint) { // Instruct the HtlcSwitch to close this link as the channel is no // longer active. - p.server.htlcSwitch.RemoveLink(chanID) + p.cfg.Switch.RemoveLink(chanID) } // handleInitMsg handles the incoming init message which contains global and @@ -2607,7 +2525,7 @@ func (p *peer) handleInitMsg(msg *lnwire.Init) error { // // NOTE: Part of the lnpeer.Peer interface. func (p *peer) LocalFeatures() *lnwire.FeatureVector { - return p.features + return p.cfg.Features } // RemoteFeatures returns the set of global features that has been advertised by @@ -2623,8 +2541,8 @@ func (p *peer) RemoteFeatures() *lnwire.FeatureVector { // currently supported local and global features. func (p *peer) sendInitMsg() error { msg := lnwire.NewInitMessage( - p.legacyFeatures.RawFeatureVector, - p.features.RawFeatureVector, + p.cfg.LegacyFeatures.RawFeatureVector, + p.cfg.Features.RawFeatureVector, ) return p.writeMessage(msg) @@ -2640,7 +2558,7 @@ func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error { } // Check if we have any channel sync messages stored for this channel. - c, err := p.server.chanDB.FetchClosedChannelForID(cid) + c, err := p.cfg.ChannelDB.FetchClosedChannelForID(cid) if err != nil { return fmt.Errorf("unable to fetch channel sync messages for "+ "peer %v: %v", p, err) @@ -2730,7 +2648,7 @@ func (p *peer) sendMessage(sync, priority bool, msgs ...lnwire.Message) error { return err case <-p.quit: return lnpeer.ErrPeerExiting - case <-p.server.quit: + case <-p.cfg.Quit: return lnpeer.ErrPeerExiting } } @@ -2742,21 +2660,21 @@ func (p *peer) sendMessage(sync, priority bool, msgs ...lnwire.Message) error { // // NOTE: Part of the lnpeer.Peer interface. func (p *peer) PubKey() [33]byte { - return p.pubKeyBytes + return p.cfg.PubKeyBytes } // IdentityKey returns the public key of the remote peer. // // NOTE: Part of the lnpeer.Peer interface. func (p *peer) IdentityKey() *btcec.PublicKey { - return p.addr.IdentityKey + return p.cfg.Addr.IdentityKey } // Address returns the network address of the remote peer. // // NOTE: Part of the lnpeer.Peer interface. func (p *peer) Address() net.Addr { - return p.addr.Address + return p.cfg.Addr.Address } // AddNewChannel adds a new channel to the peer. The channel should fail to be @@ -2871,6 +2789,51 @@ func (p *peer) HandleLocalCloseChanReqs(req *htlcswitch.ChanClose) { } } +// NetAddress returns the network of the remote peer as an lnwire.NetAddress. +func (p *peer) NetAddress() *lnwire.NetAddress { + return p.cfg.Addr +} + +// Inbound returns cfg.Inbound. +func (p *peer) Inbound() bool { + return p.cfg.Inbound +} + +// ConnReq returns cfg.ConnReq. +func (p *peer) ConnReq() *connmgr.ConnReq { + return p.cfg.ConnReq +} + +// ErrorBuffer returns cfg.ErrorBuffer. +func (p *peer) ErrorBuffer() *queue.CircularBuffer { + return p.cfg.ErrorBuffer +} + +// SetAddress sets the remote peer's address given an address. +func (p *peer) SetAddress(address net.Addr) { + p.cfg.Addr.Address = address +} + +// ActiveSignal returns the peer's active signal. +func (p *peer) ActiveSignal() chan struct{} { + return p.activeSignal +} + +// Conn returns a pointer to the peer's connection struct. +func (p *peer) Conn() net.Conn { + return p.cfg.Conn +} + +// BytesReceived returns the number of bytes received from the peer. +func (p *peer) BytesReceived() uint64 { + return atomic.LoadUint64(&p.bytesReceived) +} + +// BytesSent returns the number of bytes sent to the peer. +func (p *peer) BytesSent() uint64 { + return atomic.LoadUint64(&p.bytesSent) +} + // LinkUpdater is an interface implemented by most messages in BOLT 2 that are // allowed to update the channel state. type LinkUpdater interface { diff --git a/rpcserver.go b/rpcserver.go index 8adef137..3517eb22 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2573,10 +2573,10 @@ func (r *rpcServer) ListPeers(ctx context.Context, peer := &lnrpc.Peer{ PubKey: hex.EncodeToString(nodePub[:]), - Address: serverPeer.conn.RemoteAddr().String(), - Inbound: serverPeer.inbound, - BytesRecv: atomic.LoadUint64(&serverPeer.bytesReceived), - BytesSent: atomic.LoadUint64(&serverPeer.bytesSent), + Address: serverPeer.Conn().RemoteAddr().String(), + Inbound: serverPeer.Inbound(), + BytesRecv: serverPeer.BytesReceived(), + BytesSent: serverPeer.BytesSent(), SatSent: satSent, SatRecv: satRecv, PingTime: serverPeer.PingTime(), @@ -2591,12 +2591,12 @@ func (r *rpcServer) ListPeers(ctx context.Context, // it is non-nil. If we want all the stored errors, simply // add the full list to our set of errors. if in.LatestError { - latestErr := serverPeer.errorBuffer.Latest() + latestErr := serverPeer.ErrorBuffer().Latest() if latestErr != nil { peerErrors = []interface{}{latestErr} } } else { - peerErrors = serverPeer.errorBuffer.List() + peerErrors = serverPeer.ErrorBuffer().List() } // Add the relevant peer errors to our response. diff --git a/server.go b/server.go index e0e1084b..28852df2 100644 --- a/server.go +++ b/server.go @@ -52,6 +52,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/nat" "github.com/lightningnetwork/lnd/netann" + ppeer "github.com/lightningnetwork/lnd/peer" "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/queue" @@ -2563,7 +2564,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) { // we'll close out the new connection s.t there's only a single // connection between us. localPub := s.identityECDH.PubKey() - if !connectedPeer.inbound && + if !connectedPeer.Inbound() && !shouldDropLocalConnection(localPub, nodePub) { srvrLog.Warnf("Received inbound connection from "+ @@ -2674,7 +2675,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) // we'll close out the new connection s.t there's only a single // connection between us. localPub := s.identityECDH.PubKey() - if connectedPeer.inbound && + if connectedPeer.Inbound() && shouldDropLocalConnection(localPub, nodePub) { srvrLog.Warnf("Established outbound connection to "+ @@ -2797,16 +2798,63 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, // offered that would trigger channel closure. In case of outgoing // htlcs, an extra block is added to prevent the channel from being // closed when the htlc is outstanding and a new block comes in. - p, err := newPeer( - s.cfg, conn, connReq, s, peerAddr, inbound, initFeatures, - legacyFeatures, s.cfg.ChanEnableTimeout, - lncfg.DefaultOutgoingCltvRejectDelta, errBuffer, - ) - if err != nil { - srvrLog.Errorf("unable to create peer %v", err) - return + pCfg := ppeer.Config{ + Conn: conn, + ConnReq: connReq, + Addr: peerAddr, + Inbound: inbound, + Features: initFeatures, + LegacyFeatures: legacyFeatures, + OutgoingCltvRejectDelta: lncfg.DefaultOutgoingCltvRejectDelta, + ChanActiveTimeout: s.cfg.ChanEnableTimeout, + ErrorBuffer: errBuffer, + WritePool: s.writePool, + ReadPool: s.readPool, + Switch: s.htlcSwitch, + InterceptSwitch: s.interceptableSwitch, + ChannelDB: s.chanDB, + ChainArb: s.chainArb, + AuthGossiper: s.authGossiper, + ChanStatusMgr: s.chanStatusMgr, + ChainIO: s.cc.chainIO, + FeeEstimator: s.cc.feeEstimator, + Signer: s.cc.wallet.Cfg.Signer, + SigPool: s.sigPool, + Wallet: s.cc.wallet, + ChainNotifier: s.cc.chainNotifier, + RoutingPolicy: s.cc.routingPolicy, + Sphinx: s.sphinx, + WitnessBeacon: s.witnessBeacon, + Invoices: s.invoices, + ChannelNotifier: s.channelNotifier, + HtlcNotifier: s.htlcNotifier, + TowerClient: s.towerClient, + DisconnectPeer: s.DisconnectPeer, + GenNodeAnnouncement: s.genNodeAnnouncement, + + PrunePersistentPeerConnection: s.prunePersistentPeerConnection, + + FetchLastChanUpdate: s.fetchLastChanUpdate(), + ProcessFundingOpen: s.fundingMgr.processFundingOpen, + ProcessFundingAccept: s.fundingMgr.processFundingAccept, + ProcessFundingCreated: s.fundingMgr.processFundingCreated, + ProcessFundingSigned: s.fundingMgr.processFundingSigned, + ProcessFundingLocked: s.fundingMgr.processFundingLocked, + ProcessFundingError: s.fundingMgr.processFundingError, + IsPendingChannel: s.fundingMgr.IsPendingChannel, + + Hodl: s.cfg.Hodl, + UnsafeReplay: s.cfg.UnsafeReplay, + MaxOutgoingCltvExpiry: s.cfg.MaxOutgoingCltvExpiry, + MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation, + Quit: s.quit, } + copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed()) + copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed()) + + p := newPeer(pCfg) + // TODO(roasbeef): update IP address for link-node // * also mark last-seen, do it one single transaction? @@ -2842,12 +2890,12 @@ func (s *server) addPeer(p *peer) { // TODO(roasbeef): pipe all requests through to the // queryHandler/peerManager - pubSer := p.addr.IdentityKey.SerializeCompressed() + pubSer := p.NetAddress().IdentityKey.SerializeCompressed() pubStr := string(pubSer) s.peersByPub[pubStr] = p - if p.inbound { + if p.Inbound() { s.inboundPeers[pubStr] = p } else { s.outboundPeers[pubStr] = p @@ -3020,12 +3068,12 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { // within the peer's address for reconnection purposes. // // TODO(roasbeef): use them all? - if p.inbound { + if p.Inbound() { advertisedAddr, err := s.fetchNodeAdvertisedAddr(pubKey) switch { // We found an advertised address, so use it. case err == nil: - p.addr.Address = advertisedAddr + p.SetAddress(advertisedAddr) // The peer doesn't have an advertised address. case err == errNoAdvertisedAddr: @@ -3058,7 +3106,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { // Otherwise, we'll launch a new connection request in order to // attempt to maintain a persistent connection with this peer. connReq := &connmgr.ConnReq{ - Addr: p.addr, + Addr: p.NetAddress(), Permanent: true, } s.persistentConnReqs[pubStr] = append( @@ -3113,8 +3161,8 @@ func (s *server) removePeer(p *peer) { p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p)) // If this peer had an active persistent connection request, remove it. - if p.connReq != nil { - s.connMgr.Remove(p.connReq.ID()) + if p.ConnReq() != nil { + s.connMgr.Remove(p.ConnReq().ID()) } // Ignore deleting peers if we're shutting down. @@ -3128,7 +3176,7 @@ func (s *server) removePeer(p *peer) { delete(s.peersByPub, pubStr) - if p.inbound { + if p.Inbound() { delete(s.inboundPeers, pubStr) } else { delete(s.outboundPeers, pubStr) @@ -3136,8 +3184,8 @@ func (s *server) removePeer(p *peer) { // Copy the peer's error buffer across to the server if it has any items // in it so that we can restore peer errors across connections. - if p.errorBuffer.Total() > 0 { - s.peerErrors[pubStr] = p.errorBuffer + if p.ErrorBuffer().Total() > 0 { + s.peerErrors[pubStr] = p.ErrorBuffer() } // Inform the peer notifier of a peer offline event so that it can be diff --git a/test_utils.go b/test_utils.go index d0f9a68d..c9d6f653 100644 --- a/test_utils.go +++ b/test_utils.go @@ -17,16 +17,15 @@ import ( "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/clock" - "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" - "github.com/lightningnetwork/lnd/lnwallet/chancloser" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/netann" + ppeer "github.com/lightningnetwork/lnd/peer" + "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/shachain" "github.com/lightningnetwork/lnd/ticker" ) @@ -352,37 +351,8 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, publTx chan *wire.MsgTx, publishedTransactions: publTx, }, } - cc := &chainControl{ - feeEstimator: estimator, - chainIO: chainIO, - chainNotifier: notifier, - wallet: wallet, - } - breachArbiter := &breachArbiter{} - - chainArb := contractcourt.NewChainArbitrator( - contractcourt.ChainArbitratorConfig{ - Notifier: notifier, - ChainIO: chainIO, - IsForwardedHTLC: func(chanID lnwire.ShortChannelID, - htlcIndex uint64) bool { - - return true - }, - Clock: clock.NewDefaultClock(), - }, dbAlice, - ) - chainArb.WatchNewChannel(aliceChannelState) - - s := &server{ - chanDB: dbAlice, - cc: cc, - breachArbiter: breachArbiter, - chainArb: chainArb, - } - - _, currentHeight, err := s.cc.chainIO.GetBestBlock() + _, currentHeight, err := chainIO.GetBestBlock() if err != nil { return nil, nil, nil, err } @@ -404,7 +374,6 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, publTx chan *wire.MsgTx, if err = htlcSwitch.Start(); err != nil { return nil, nil, nil, err } - s.htlcSwitch = htlcSwitch nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner) @@ -418,7 +387,7 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, publTx chan *wire.MsgTx, Graph: dbAlice.ChannelGraph(), MessageSigner: nodeSignerAlice, OurPubKey: aliceKeyPub, - IsChannelActive: s.htlcSwitch.HasActiveLink, + IsChannelActive: htlcSwitch.HasActiveLink, ApplyChannelUpdate: func(*lnwire.ChannelUpdate) error { return nil }, }) if err != nil { @@ -427,31 +396,41 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, publTx chan *wire.MsgTx, if err = chanStatusMgr.Start(); err != nil { return nil, nil, nil, err } - s.chanStatusMgr = chanStatusMgr - alicePeer := &peer{ - addr: &lnwire.NetAddress{ - IdentityKey: aliceKeyPub, - Address: aliceAddr, - }, - - server: s, - sendQueue: make(chan outgoingMsg, 1), - outgoingQueue: make(chan outgoingMsg, outgoingQueueLen), - - activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), - newChannels: make(chan *newChannelMsg, 1), - - activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser), - localCloseChanReqs: make(chan *htlcswitch.ChanClose), - chanCloseMsgs: make(chan *closeMsg), - - chanActiveTimeout: chanActiveTimeout, - - queueQuit: make(chan struct{}), - quit: make(chan struct{}), + errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize) + if err != nil { + return nil, nil, nil, err } + var pubKey [33]byte + copy(pubKey[:], aliceKeyPub.SerializeCompressed()) + + cfgAddr := &lnwire.NetAddress{ + IdentityKey: aliceKeyPub, + Address: aliceAddr, + ChainNet: wire.SimNet, + } + + pCfg := ppeer.Config{ + Addr: cfgAddr, + PubKeyBytes: pubKey, + ErrorBuffer: errBuffer, + ChainIO: chainIO, + Switch: htlcSwitch, + + ChanActiveTimeout: chanActiveTimeout, + InterceptSwitch: htlcswitch.NewInterceptableSwitch(htlcSwitch), + + ChannelDB: dbAlice, + FeeEstimator: estimator, + Wallet: wallet, + ChainNotifier: notifier, + ChanStatusMgr: chanStatusMgr, + DisconnectPeer: func(b *btcec.PublicKey) error { return nil }, + } + + alicePeer := newPeer(pCfg) + chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint()) alicePeer.activeChannels[chanID] = channelAlice