multi: initialize peer with peer.Config

nsa 2020-06-29 21:29:22 -04:00
parent bf161c9835
commit 9be9d69349
4 changed files with 243 additions and 253 deletions
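At a high level, this change replaces the peer's direct *server pointer (and newPeer's long parameter list) with a single dependency-injection struct, ppeer.Config. The sketch below reconstructs a plausible subset of that struct purely from the fields the hunks reference; field names come from the diff, but the exact types and the full field list are assumptions rather than the authoritative definition in the new peer package.

package peer

import (
	"net"

	"github.com/btcsuite/btcd/btcec"
	"github.com/btcsuite/btcd/connmgr"
	"github.com/lightningnetwork/lnd/lnpeer"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/queue"
)

// Config is a sketch of the dependency struct this commit introduces.
// The real struct carries many more subsystem handles (Switch,
// ChainArb, Wallet, and so on), all visible in the server.go hunk below.
type Config struct {
	// Connection state, formerly loose newPeer parameters.
	Conn    net.Conn
	ConnReq *connmgr.ConnReq
	Addr    *lnwire.NetAddress
	Inbound bool

	// Compressed identity keys for the remote peer and our own node.
	PubKeyBytes  [33]byte
	ServerPubKey [33]byte

	// Feature vectors we advertise (current and legacy global).
	Features       *lnwire.FeatureVector
	LegacyFeatures *lnwire.FeatureVector

	// Errors exchanged with this peer, carried across reconnects.
	ErrorBuffer *queue.CircularBuffer

	// Function-valued hooks that replace direct *server access.
	// DisconnectPeer's signature is confirmed by the test hunk below;
	// ProcessFundingOpen's parameter types are assumed.
	DisconnectPeer     func(*btcec.PublicKey) error
	ProcessFundingOpen func(*lnwire.OpenChannel, lnpeer.Peer)

	// Quit is closed on server shutdown, replacing p.server.quit.
	Quit chan struct{}
}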

peer.go (303 changed lines)

@@ -29,7 +29,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool"
ppeer "github.com/lightningnetwork/lnd/peer"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/ticker"
)
@@ -131,13 +131,7 @@ type peer struct {
// our last ping message. To be used atomically.
pingLastSend int64
cfg *Config
connReq *connmgr.ConnReq
conn net.Conn
addr *lnwire.NetAddress
pubKeyBytes [33]byte
cfg ppeer.Config
// activeSignal when closed signals that the peer is now active and
// ready to process messages.
@@ -147,8 +141,6 @@ type peer struct {
// It will be zero for peers that did not successfully call Start().
startTime time.Time
inbound bool
// sendQueue is the channel which is used to queue outgoing messages to be
// written onto the wire. Note that this channel is unbuffered.
sendQueue chan outgoingMsg
@@ -206,28 +198,6 @@ type peer struct {
// well as lnwire.ClosingSigned messages.
chanCloseMsgs chan *closeMsg
// chanActiveTimeout specifies the duration the peer will wait to
// request a channel reenable, beginning from the time the peer was
// started.
chanActiveTimeout time.Duration
server *server
// features is the set of features that we advertised to the remote
// node.
features *lnwire.FeatureVector
// legacyFeatures is the set of features that we advertised to the remote
// node for backwards compatibility. Nodes that have not implemented
// flat features will still be able to read our feature bits from the
// legacy global field, but we will also advertise everything in the
// default features field.
legacyFeatures *lnwire.FeatureVector
// outgoingCltvRejectDelta defines the number of blocks before an htlc's
// expiry within which we will no longer offer that htlc.
outgoingCltvRejectDelta uint32
// remoteFeatures is the feature vector received from the peer during
// the connection handshake.
remoteFeatures *lnwire.FeatureVector
@@ -238,22 +208,6 @@ type peer struct {
// peer's chansync message with its own over and over again.
resentChanSyncMsg map[lnwire.ChannelID]struct{}
// errorBuffer stores a set of errors related to a peer. It contains
// error messages that our peer has recently sent us over the wire, as
// well as records of unknown messages that were sent to us, so that we
// can track a full record of the communication errors we have had with
// our peer. If we choose to disconnect from a peer, it also stores the
// reason we had for disconnecting.
errorBuffer *queue.CircularBuffer
// writePool is the task pool that manages reuse of write buffers.
// Write tasks are submitted to the pool in order to conserve the total
// number of write buffers allocated at any one time, and decouple write
// buffer allocation from the peer life cycle.
writePool *pool.Write
readPool *pool.Read
queueQuit chan struct{}
quit chan struct{}
wg sync.WaitGroup
@@ -262,41 +216,14 @@ type peer struct {
// A compile-time check to ensure that peer satisfies the lnpeer.Peer interface.
var _ lnpeer.Peer = (*peer)(nil)
// newPeer creates a new peer from an established connection object, and a
// pointer to the main server. It takes an error buffer which may contain errors
// from a previous connection with the peer if we have been connected to them
// before.
func newPeer(cfg *Config, conn net.Conn, connReq *connmgr.ConnReq, server *server,
addr *lnwire.NetAddress, inbound bool,
features, legacyFeatures *lnwire.FeatureVector,
chanActiveTimeout time.Duration,
outgoingCltvRejectDelta uint32,
errBuffer *queue.CircularBuffer) (
*peer, error) {
nodePub := addr.IdentityKey
// newPeer creates a new peer from a peer.Config object.
func newPeer(cfg ppeer.Config) *peer {
p := &peer{
conn: conn,
addr: addr,
cfg: cfg,
activeSignal: make(chan struct{}),
inbound: inbound,
connReq: connReq,
server: server,
features: features,
legacyFeatures: legacyFeatures,
outgoingCltvRejectDelta: outgoingCltvRejectDelta,
sendQueue: make(chan outgoingMsg),
outgoingQueue: make(chan outgoingMsg),
cfg: cfg,
activeSignal: make(chan struct{}),
sendQueue: make(chan outgoingMsg),
outgoingQueue: make(chan outgoingMsg),
addedChannels: make(map[lnwire.ChannelID]struct{}),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
newChannels: make(chan *newChannelMsg, 1),
@@ -308,20 +235,11 @@ func newPeer(cfg *Config, conn net.Conn, connReq *connmgr.ConnReq, server *server,
linkFailures: make(chan linkFailureReport),
chanCloseMsgs: make(chan *closeMsg),
resentChanSyncMsg: make(map[lnwire.ChannelID]struct{}),
chanActiveTimeout: chanActiveTimeout,
errorBuffer: errBuffer,
writePool: server.writePool,
readPool: server.readPool,
queueQuit: make(chan struct{}),
quit: make(chan struct{}),
queueQuit: make(chan struct{}),
quit: make(chan struct{}),
}
copy(p.pubKeyBytes[:], nodePub.SerializeCompressed())
return p, nil
return p
}
// Start starts all helper goroutines the peer needs for normal operations. In
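Before the Start hunk, it is worth noting the net effect at call sites: newPeer can no longer fail, and everything it needs arrives in one value. A minimal, hypothetical construction, showing only a handful of the Config fields (the full wiring appears in the server.go hunk further down):

// Hypothetical minimal call site; the real caller, peerConnected in
// server.go, populates dozens of Config fields.
pCfg := ppeer.Config{
	Conn:    conn,
	ConnReq: connReq,
	Addr:    peerAddr,
	Inbound: true,
	Quit:    quit,
}

// Serializing the identity key is now the caller's job rather than
// newPeer's; a compressed pubkey is exactly 33 bytes.
copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())

p := newPeer(pCfg) // no error return anymore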
@@ -385,7 +303,7 @@ func (p *peer) Start() error {
// Fetch and then load all the active channels we have with this remote
// peer from the database.
activeChans, err := p.server.chanDB.FetchOpenChannels(p.addr.IdentityKey)
activeChans, err := p.cfg.ChannelDB.FetchOpenChannels(p.cfg.Addr.IdentityKey)
if err != nil {
peerLog.Errorf("unable to fetch active chans "+
"for peer %v: %v", p, err)
@@ -393,7 +311,7 @@ func (p *peer) Start() error {
}
if len(activeChans) == 0 {
p.server.prunePersistentPeerConnection(p.pubKeyBytes)
p.cfg.PrunePersistentPeerConnection(p.cfg.PubKeyBytes)
}
// Next, load all the active channels we have with this peer,
@@ -454,7 +372,7 @@ func (p *peer) initGossipSync() {
// we'll create a new gossipSyncer in the AuthenticatedGossiper for it.
if p.remoteFeatures.HasFeature(lnwire.GossipQueriesOptional) {
srvrLog.Infof("Negotiated chan series queries with %x",
p.pubKeyBytes[:])
p.cfg.PubKeyBytes[:])
// Register the peer's gossip syncer with the gossiper.
// This blocks synchronously to ensure the gossip syncer is
@@ -465,7 +383,7 @@ func (p *peer) initGossipSync() {
// requires an improved version of the current network
// bootstrapper to ensure we can find and connect to non-channel
// peers.
p.server.authGossiper.InitSyncState(p)
p.cfg.AuthGossiper.InitSyncState(p)
}
}
@@ -493,7 +411,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
for _, dbChan := range chans {
lnChan, err := lnwallet.NewLightningChannel(
p.server.cc.signer, dbChan, p.server.sigPool,
p.cfg.Signer, dbChan, p.cfg.SigPool,
)
if err != nil {
return nil, err
@@ -535,7 +453,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
// Before we register this new link with the HTLC Switch, we'll
// need to fetch its current link-layer forwarding policy from
// the database.
graph := p.server.chanDB.ChannelGraph()
graph := p.cfg.ChannelDB.ChannelGraph()
info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint)
if err != nil && err != channeldb.ErrEdgeNotFound {
return nil, err
@@ -550,7 +468,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
// particular channel.
var selfPolicy *channeldb.ChannelEdgePolicy
if info != nil && bytes.Equal(info.NodeKey1Bytes[:],
p.server.identityECDH.PubKey().SerializeCompressed()) {
p.cfg.ServerPubKey[:]) {
selfPolicy = p1
} else {
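For readers outside the graph code: a channel edge stores two directed policies, and BOLT 7 orders node_id_1 as the lexicographically lesser of the two node keys, so the comparison above selects p1 as our own policy exactly when we are node1. A small illustrative helper (not lnd API) capturing that convention:

// selfEdgePolicy mirrors the node1/node2 selection above; bytes and
// channeldb are the same imports peer.go already uses.
func selfEdgePolicy(nodeKey1Bytes, ourKey []byte,
	p1, p2 *channeldb.ChannelEdgePolicy) *channeldb.ChannelEdgePolicy {

	if bytes.Equal(nodeKey1Bytes, ourKey) {
		return p1 // we are node1, so p1 is the policy we set
	}
	return p2 // otherwise our policy sits in the node2 slot
}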
@@ -573,7 +491,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
peerLog.Warnf("Unable to find our forwarding policy "+
"for channel %v, using default values",
chanPoint)
forwardingPolicy = &p.server.cc.routingPolicy
forwardingPolicy = &p.cfg.RoutingPolicy
}
peerLog.Tracef("Using link policy of: %v",
@@ -594,7 +512,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
}
// Subscribe to the set of on-chain events for this channel.
chainEvents, err := p.server.chainArb.SubscribeChannelEvents(
chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(
*chanPoint,
)
if err != nil {
@@ -641,26 +559,26 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
select {
case p.linkFailures <- failure:
case <-p.quit:
case <-p.server.quit:
case <-p.cfg.Quit:
}
}
linkCfg := htlcswitch.ChannelLinkConfig{
Peer: p,
DecodeHopIterators: p.server.sphinx.DecodeHopIterators,
ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: p.server.fetchLastChanUpdate(),
DecodeHopIterators: p.cfg.Sphinx.DecodeHopIterators,
ExtractErrorEncrypter: p.cfg.Sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: p.cfg.FetchLastChanUpdate,
HodlMask: p.cfg.Hodl.Mask(),
Registry: p.server.invoices,
Switch: p.server.htlcSwitch,
Circuits: p.server.htlcSwitch.CircuitModifier(),
ForwardPackets: p.server.interceptableSwitch.ForwardPackets,
Registry: p.cfg.Invoices,
Switch: p.cfg.Switch,
Circuits: p.cfg.Switch.CircuitModifier(),
ForwardPackets: p.cfg.InterceptSwitch.ForwardPackets,
FwrdingPolicy: *forwardingPolicy,
FeeEstimator: p.server.cc.feeEstimator,
PreimageCache: p.server.witnessBeacon,
FeeEstimator: p.cfg.FeeEstimator,
PreimageCache: p.cfg.WitnessBeacon,
ChainEvents: chainEvents,
UpdateContractSignals: func(signals *contractcourt.ContractSignals) error {
return p.server.chainArb.UpdateContractSignals(
return p.cfg.ChainArb.UpdateContractSignals(
*chanPoint, signals,
)
},
@@ -673,14 +591,14 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
UnsafeReplay: p.cfg.UnsafeReplay,
MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout,
MaxFeeUpdateTimeout: htlcswitch.DefaultMaxLinkFeeUpdateTimeout,
OutgoingCltvRejectDelta: p.outgoingCltvRejectDelta,
TowerClient: p.server.towerClient,
OutgoingCltvRejectDelta: p.cfg.OutgoingCltvRejectDelta,
TowerClient: p.cfg.TowerClient,
MaxOutgoingCltvExpiry: p.cfg.MaxOutgoingCltvExpiry,
MaxFeeAllocation: p.cfg.MaxChannelFeeAllocation,
NotifyActiveLink: p.server.channelNotifier.NotifyActiveLinkEvent,
NotifyActiveChannel: p.server.channelNotifier.NotifyActiveChannelEvent,
NotifyInactiveChannel: p.server.channelNotifier.NotifyInactiveChannelEvent,
HtlcNotifier: p.server.htlcNotifier,
NotifyActiveLink: p.cfg.ChannelNotifier.NotifyActiveLinkEvent,
NotifyActiveChannel: p.cfg.ChannelNotifier.NotifyActiveChannelEvent,
NotifyInactiveChannel: p.cfg.ChannelNotifier.NotifyInactiveChannelEvent,
HtlcNotifier: p.cfg.HtlcNotifier,
}
link := htlcswitch.NewChannelLink(linkCfg, lnChan)
@@ -689,12 +607,12 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
// links going by the same channel id. If one is found, we'll shut it
// down to ensure that the mailboxes are only ever under the control of
// one link.
p.server.htlcSwitch.RemoveLink(link.ChanID())
p.cfg.Switch.RemoveLink(link.ChanID())
// With the channel link created, we'll now notify the htlc switch so
// this channel can be used to dispatch local payments and also
// passively forward payments.
return p.server.htlcSwitch.AddLink(link)
return p.cfg.Switch.AddLink(link)
}
// maybeSendNodeAnn sends our node announcement to the remote peer if at least
@@ -716,7 +634,7 @@ func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
return
}
ourNodeAnn, err := p.server.genNodeAnnouncement(false)
ourNodeAnn, err := p.cfg.GenNodeAnnouncement(false)
if err != nil {
srvrLog.Debugf("Unable to retrieve node announcement: %v", err)
return
@@ -724,7 +642,7 @@ func (p *peer) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
if err := p.SendMessageLazy(false, &ourNodeAnn); err != nil {
srvrLog.Debugf("Unable to resend node announcement to %x: %v",
p.pubKeyBytes, err)
p.cfg.PubKeyBytes, err)
}
}
@@ -759,20 +677,20 @@ func (p *peer) Disconnect(reason error) {
peerLog.Infof(err.Error())
// Ensure that the TCP connection is properly closed before continuing.
p.conn.Close()
p.cfg.Conn.Close()
close(p.quit)
}
// String returns the string representation of this peer.
func (p *peer) String() string {
return fmt.Sprintf("%x@%s", p.pubKeyBytes, p.conn.RemoteAddr())
return fmt.Sprintf("%x@%s", p.cfg.PubKeyBytes, p.cfg.Conn.RemoteAddr())
}
// readNextMessage reads and returns the next message on the wire, along with
// any additional raw payload.
func (p *peer) readNextMessage() (lnwire.Message, error) {
noiseConn, ok := p.conn.(*brontide.Conn)
noiseConn, ok := p.cfg.Conn.(*brontide.Conn)
if !ok {
return nil, fmt.Errorf("brontide.Conn required to read messages")
}
@@ -792,7 +710,7 @@ func (p *peer) readNextMessage() (lnwire.Message, error) {
// is message oriented and allows nodes to pad on additional data to
// the message stream.
var rawMsg []byte
err = p.readPool.Submit(func(buf *buffer.Read) error {
err = p.cfg.ReadPool.Submit(func(buf *buffer.Read) error {
// Before reading the body of the message, set the read timeout
// accordingly to ensure we don't block other readers using the
// pool. We do so only after the task has been scheduled to
@@ -1000,7 +918,7 @@ func waitUntilLinkActive(p *peer,
// we will get an ActiveLinkEvent notification and retrieve the link. If
// the call to GetLink is before SubscribeChannelEvents, however, there
// will be a race condition.
sub, err := p.server.channelNotifier.SubscribeChannelEvents()
sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
if err != nil {
// If we have a non-nil error, then the server is shutting down and we
// can exit here and return nil. This means no message will be delivered
@@ -1011,7 +929,7 @@ func waitUntilLinkActive(p *peer,
// The link may already be active by this point, and we may have missed the
// ActiveLinkEvent. Check if the link exists.
link, _ := p.server.htlcSwitch.GetLink(cid)
link, _ := p.cfg.Switch.GetLink(cid)
if link != nil {
return link
}
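The ordering above is what closes the race described in the comment: because the subscription exists before the GetLink check, an activation that fires in between is never lost; it is simply delivered as an event afterwards. A generic sketch of the pattern, with illustrative stand-in types rather than lnd's event API:

// waitForActiveLink is a toy version of waitUntilLinkActive.
type toyLink struct{}

func waitForActiveLink(subscribe func() <-chan struct{},
	getLink func() *toyLink, quit <-chan struct{}) *toyLink {

	// 1) Subscribe first: anything that activates from here on is
	//    guaranteed to emit an event.
	events := subscribe()

	// 2) Only then check current state, which catches activations
	//    that happened before the subscription existed.
	if l := getLink(); l != nil {
		return l
	}

	// 3) Block for an event (or shutdown) and re-check.
	for {
		select {
		case <-events:
			if l := getLink(); l != nil {
				return l
			}
		case <-quit:
			return nil
		}
	}
}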
@@ -1041,7 +959,7 @@ func waitUntilLinkActive(p *peer,
// The link shouldn't be nil as we received an
// ActiveLinkEvent. If it is nil, we return nil and the
// calling function should catch it.
link, _ = p.server.htlcSwitch.GetLink(cid)
link, _ = p.cfg.Switch.GetLink(cid)
return link
case <-p.quit:
@@ -1103,7 +1021,7 @@ func newDiscMsgStream(p *peer) *msgStream {
"Update stream for gossiper exited",
1000,
func(msg lnwire.Message) {
p.server.authGossiper.ProcessRemoteAnnouncement(msg, p)
p.cfg.AuthGossiper.ProcessRemoteAnnouncement(msg, p)
},
)
}
@@ -1207,15 +1125,15 @@ out:
p.queueMsg(lnwire.NewPong(pongBytes), nil)
case *lnwire.OpenChannel:
p.server.fundingMgr.processFundingOpen(msg, p)
p.cfg.ProcessFundingOpen(msg, p)
case *lnwire.AcceptChannel:
p.server.fundingMgr.processFundingAccept(msg, p)
p.cfg.ProcessFundingAccept(msg, p)
case *lnwire.FundingCreated:
p.server.fundingMgr.processFundingCreated(msg, p)
p.cfg.ProcessFundingCreated(msg, p)
case *lnwire.FundingSigned:
p.server.fundingMgr.processFundingSigned(msg, p)
p.cfg.ProcessFundingSigned(msg, p)
case *lnwire.FundingLocked:
p.server.fundingMgr.processFundingLocked(msg, p)
p.cfg.ProcessFundingLocked(msg, p)
case *lnwire.Shutdown:
select {
@@ -1341,7 +1259,7 @@ func (p *peer) storeError(err error) {
return
}
p.errorBuffer.Add(
p.cfg.ErrorBuffer.Add(
&TimestampedError{Timestamp: time.Now(), Error: err},
)
}
@@ -1353,7 +1271,7 @@ func (p *peer) storeError(err error) {
//
// NOTE: This method should only be called from within the readHandler.
func (p *peer) handleError(msg *lnwire.Error) bool {
key := p.addr.IdentityKey
key := p.cfg.Addr.IdentityKey
// Store the error we have received.
p.storeError(msg)
@@ -1370,8 +1288,8 @@ func (p *peer) handleError(msg *lnwire.Error) bool {
// If the channel ID for the error message corresponds to a pending
// channel, then the funding manager will handle the error.
case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key):
p.server.fundingMgr.processFundingError(msg, key)
case p.cfg.IsPendingChannel(msg.ChanID, key):
p.cfg.ProcessFundingError(msg, key)
return false
// If not we hand the error to the channel link for this channel.
@@ -1593,7 +1511,7 @@ func (p *peer) writeMessage(msg lnwire.Message) error {
p.logWireMessage(msg, false)
}
noiseConn, ok := p.conn.(*brontide.Conn)
noiseConn, ok := p.cfg.Conn.(*brontide.Conn)
if !ok {
return fmt.Errorf("brontide.Conn required to write messages")
}
@@ -1629,7 +1547,7 @@ func (p *peer) writeMessage(msg lnwire.Message) error {
// Otherwise, this is a new message. We'll acquire a write buffer to
// serialize the message and buffer the ciphertext on the connection.
err := p.writePool.Submit(func(buf *bytes.Buffer) error {
err := p.cfg.WritePool.Submit(func(buf *bytes.Buffer) error {
// Using a buffer allocated by the write pool, encode the
// message directly into the buffer.
_, writeErr := lnwire.WriteMessage(buf, msg, 0)
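Both directions rent fixed-size buffers from shared pools (cfg.ReadPool earlier, cfg.WritePool here) instead of allocating per peer. A generic sketch of a submit-style buffer pool of this shape, assuming nothing about lnd's pool package beyond the Submit signature visible in the diff:

// bufPool is a toy stand-in for pool.Write/pool.Read, which layer
// worker timeouts and gc on top of this basic rent-use-return shape.
// Requires the standard library "bytes" import.
type bufPool struct {
	bufs chan *bytes.Buffer
}

func newBufPool(n int) *bufPool {
	p := &bufPool{bufs: make(chan *bytes.Buffer, n)}
	for i := 0; i < n; i++ {
		p.bufs <- new(bytes.Buffer)
	}
	return p
}

// Submit rents a buffer, runs the task with it, and returns the buffer
// for reuse, bounding total buffer memory across all peers.
func (p *bufPool) Submit(task func(*bytes.Buffer) error) error {
	buf := <-p.bufs // blocks if every buffer is in use
	defer func() {
		buf.Reset()
		p.bufs <- buf
	}()
	return task(buf)
}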
@@ -1902,7 +1820,7 @@ func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot {
// genDeliveryScript returns a new script to which our funds will be sent in
// the case of a cooperative channel close negotiation.
func (p *peer) genDeliveryScript() ([]byte, error) {
deliveryAddr, err := p.server.cc.wallet.NewAddress(
deliveryAddr, err := p.cfg.Wallet.NewAddress(
lnwallet.WitnessPubKey, false,
)
if err != nil {
@@ -1925,7 +1843,7 @@ func (p *peer) channelManager() {
// reenableTimeout will fire once after the configured channel status
// interval has elapsed. This will trigger us to sign new channel
// updates and broadcast them with the "disabled" flag unset.
reenableTimeout := time.After(p.chanActiveTimeout)
reenableTimeout := time.After(p.cfg.ChanActiveTimeout)
out:
for {
@@ -1977,7 +1895,7 @@ out:
// set of active channels, so we can look it up later
// easily according to its channel ID.
lnChan, err := lnwallet.NewLightningChannel(
p.server.cc.signer, newChan, p.server.sigPool,
p.cfg.Signer, newChan, p.cfg.SigPool,
)
if err != nil {
p.activeChanMtx.Unlock()
@@ -2002,7 +1920,7 @@ out:
// necessary items it needs to function.
//
// TODO(roasbeef): panic on below?
chainEvents, err := p.server.chainArb.SubscribeChannelEvents(
chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(
*chanPoint,
)
if err != nil {
@@ -2021,7 +1939,7 @@ out:
// at initial channel creation. Note that the maximum HTLC value
// defaults to the cap on the total value of outstanding HTLCs.
fwdMinHtlc := lnChan.FwdMinHtlc()
defaultPolicy := p.server.cc.routingPolicy
defaultPolicy := p.cfg.RoutingPolicy
forwardingPolicy := &htlcswitch.ForwardingPolicy{
MinHTLCOut: fwdMinHtlc,
MaxHTLC: newChan.LocalChanCfg.MaxPendingAmount,
@@ -2151,7 +2069,7 @@ func (p *peer) reenableActiveChannels() {
// disabled bit to false and send out a new ChannelUpdate. If this
// channel is already active, the update won't be sent.
for _, chanPoint := range activePublicChans {
err := p.server.chanStatusMgr.RequestEnable(chanPoint)
err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint)
if err != nil {
srvrLog.Errorf("Unable to enable channel %v: %v",
chanPoint, err)
@@ -2208,14 +2126,14 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (
// In order to begin fee negotiations, we'll first compute our
// target ideal fee-per-kw. We'll set this to a lax value, as
// we weren't the ones that initiated the channel closure.
feePerKw, err := p.server.cc.feeEstimator.EstimateFeePerKW(6)
feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(6)
if err != nil {
peerLog.Errorf("unable to query fee estimator: %v", err)
return nil, fmt.Errorf("unable to estimate fee")
}
_, startingHeight, err := p.server.cc.chainIO.GetBestBlock()
_, startingHeight, err := p.cfg.ChainIO.GetBestBlock()
if err != nil {
peerLog.Errorf("unable to obtain best block: %v", err)
return nil, fmt.Errorf("cannot obtain best block")
@@ -2224,11 +2142,11 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (
chanCloser = chancloser.NewChanCloser(
chancloser.ChanCloseCfg{
Channel: channel,
UnregisterChannel: p.server.htlcSwitch.RemoveLink,
BroadcastTx: p.server.cc.wallet.PublishTransaction,
DisableChannel: p.server.chanStatusMgr.RequestDisable,
UnregisterChannel: p.cfg.Switch.RemoveLink,
BroadcastTx: p.cfg.Wallet.PublishTransaction,
DisableChannel: p.cfg.ChanStatusMgr.RequestDisable,
Disconnect: func() error {
return p.server.DisconnectPeer(p.IdentityKey())
return p.cfg.DisconnectPeer(p.IdentityKey())
},
Quit: p.quit,
},
@@ -2330,7 +2248,7 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
// Next, we'll create a new channel closer state machine to
// handle the close negotiation.
_, startingHeight, err := p.server.cc.chainIO.GetBestBlock()
_, startingHeight, err := p.cfg.ChainIO.GetBestBlock()
if err != nil {
peerLog.Errorf(err.Error())
req.Err <- err
@@ -2340,11 +2258,11 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
chanCloser := chancloser.NewChanCloser(
chancloser.ChanCloseCfg{
Channel: channel,
UnregisterChannel: p.server.htlcSwitch.RemoveLink,
BroadcastTx: p.server.cc.wallet.PublishTransaction,
DisableChannel: p.server.chanStatusMgr.RequestDisable,
UnregisterChannel: p.cfg.Switch.RemoveLink,
BroadcastTx: p.cfg.Wallet.PublishTransaction,
DisableChannel: p.cfg.ChanStatusMgr.RequestDisable,
Disconnect: func() error {
return p.server.DisconnectPeer(p.IdentityKey())
return p.cfg.DisconnectPeer(p.IdentityKey())
},
Quit: p.quit,
},
@@ -2414,7 +2332,7 @@ func (p *peer) handleLinkFailure(failure linkFailureReport) {
peerLog.Warnf("Force closing link(%v)",
failure.shortChanID)
closeTx, err := p.server.chainArb.ForceCloseContract(
closeTx, err := p.cfg.ChainArb.ForceCloseContract(
failure.chanPoint,
)
if err != nil {
@@ -2463,7 +2381,7 @@ func (p *peer) finalizeChanClosure(chanCloser *chancloser.ChanCloser) {
// Next, we'll launch a goroutine which will request to be notified by
// the ChainNotifier once the closure transaction obtains a single
// confirmation.
notifier := p.server.cc.chainNotifier
notifier := p.cfg.ChainNotifier
// If any error happens during waitForChanToClose, forward it to
// closeReq. If this channel closure is not locally initiated, closeReq
@@ -2557,7 +2475,7 @@ func (p *peer) WipeChannel(chanPoint *wire.OutPoint) {
// Instruct the HtlcSwitch to close this link as the channel is no
// longer active.
p.server.htlcSwitch.RemoveLink(chanID)
p.cfg.Switch.RemoveLink(chanID)
}
// handleInitMsg handles the incoming init message which contains global and
@@ -2607,7 +2525,7 @@ func (p *peer) handleInitMsg(msg *lnwire.Init) error {
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *peer) LocalFeatures() *lnwire.FeatureVector {
return p.features
return p.cfg.Features
}
// RemoteFeatures returns the set of global features that has been advertised by
@@ -2623,8 +2541,8 @@ func (p *peer) RemoteFeatures() *lnwire.FeatureVector {
// currently supported local and global features.
func (p *peer) sendInitMsg() error {
msg := lnwire.NewInitMessage(
p.legacyFeatures.RawFeatureVector,
p.features.RawFeatureVector,
p.cfg.LegacyFeatures.RawFeatureVector,
p.cfg.Features.RawFeatureVector,
)
return p.writeMessage(msg)
@@ -2640,7 +2558,7 @@ func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error {
}
// Check if we have any channel sync messages stored for this channel.
c, err := p.server.chanDB.FetchClosedChannelForID(cid)
c, err := p.cfg.ChannelDB.FetchClosedChannelForID(cid)
if err != nil {
return fmt.Errorf("unable to fetch channel sync messages for "+
"peer %v: %v", p, err)
@@ -2730,7 +2648,7 @@ func (p *peer) sendMessage(sync, priority bool, msgs ...lnwire.Message) error {
return err
case <-p.quit:
return lnpeer.ErrPeerExiting
case <-p.server.quit:
case <-p.cfg.Quit:
return lnpeer.ErrPeerExiting
}
}
@@ -2742,21 +2660,21 @@ func (p *peer) sendMessage(sync, priority bool, msgs ...lnwire.Message) error {
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *peer) PubKey() [33]byte {
return p.pubKeyBytes
return p.cfg.PubKeyBytes
}
// IdentityKey returns the public key of the remote peer.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *peer) IdentityKey() *btcec.PublicKey {
return p.addr.IdentityKey
return p.cfg.Addr.IdentityKey
}
// Address returns the network address of the remote peer.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *peer) Address() net.Addr {
return p.addr.Address
return p.cfg.Addr.Address
}
// AddNewChannel adds a new channel to the peer. The channel should fail to be
@@ -2871,6 +2789,51 @@ func (p *peer) HandleLocalCloseChanReqs(req *htlcswitch.ChanClose) {
}
}
// NetAddress returns the network address of the remote peer as an lnwire.NetAddress.
func (p *peer) NetAddress() *lnwire.NetAddress {
return p.cfg.Addr
}
// Inbound returns cfg.Inbound.
func (p *peer) Inbound() bool {
return p.cfg.Inbound
}
// ConnReq returns cfg.ConnReq.
func (p *peer) ConnReq() *connmgr.ConnReq {
return p.cfg.ConnReq
}
// ErrorBuffer returns cfg.ErrorBuffer.
func (p *peer) ErrorBuffer() *queue.CircularBuffer {
return p.cfg.ErrorBuffer
}
// SetAddress sets the remote peer's address.
func (p *peer) SetAddress(address net.Addr) {
p.cfg.Addr.Address = address
}
// ActiveSignal returns the peer's active signal.
func (p *peer) ActiveSignal() chan struct{} {
return p.activeSignal
}
// Conn returns the peer's underlying net.Conn.
func (p *peer) Conn() net.Conn {
return p.cfg.Conn
}
// BytesReceived returns the number of bytes received from the peer.
func (p *peer) BytesReceived() uint64 {
return atomic.LoadUint64(&p.bytesReceived)
}
// BytesSent returns the number of bytes sent to the peer.
func (p *peer) BytesSent() uint64 {
return atomic.LoadUint64(&p.bytesSent)
}
// LinkUpdater is an interface implemented by most messages in BOLT 2 that are
// allowed to update the channel state.
type LinkUpdater interface {

rpcserver.go
@@ -2573,10 +2573,10 @@ func (r *rpcServer) ListPeers(ctx context.Context,
peer := &lnrpc.Peer{
PubKey: hex.EncodeToString(nodePub[:]),
Address: serverPeer.conn.RemoteAddr().String(),
Inbound: serverPeer.inbound,
BytesRecv: atomic.LoadUint64(&serverPeer.bytesReceived),
BytesSent: atomic.LoadUint64(&serverPeer.bytesSent),
Address: serverPeer.Conn().RemoteAddr().String(),
Inbound: serverPeer.Inbound(),
BytesRecv: serverPeer.BytesReceived(),
BytesSent: serverPeer.BytesSent(),
SatSent: satSent,
SatRecv: satRecv,
PingTime: serverPeer.PingTime(),
@@ -2591,12 +2591,12 @@ func (r *rpcServer) ListPeers(ctx context.Context,
// it is non-nil. If we want all the stored errors, simply
// add the full list to our set of errors.
if in.LatestError {
latestErr := serverPeer.errorBuffer.Latest()
latestErr := serverPeer.ErrorBuffer().Latest()
if latestErr != nil {
peerErrors = []interface{}{latestErr}
}
} else {
peerErrors = serverPeer.errorBuffer.List()
peerErrors = serverPeer.ErrorBuffer().List()
}
// Add the relevant peer errors to our response.

server.go
@@ -52,6 +52,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/nat"
"github.com/lightningnetwork/lnd/netann"
ppeer "github.com/lightningnetwork/lnd/peer"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
@@ -2563,7 +2564,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
// we'll close out the new connection s.t there's only a single
// connection between us.
localPub := s.identityECDH.PubKey()
if !connectedPeer.inbound &&
if !connectedPeer.Inbound() &&
!shouldDropLocalConnection(localPub, nodePub) {
srvrLog.Warnf("Received inbound connection from "+
@@ -2674,7 +2675,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
// we'll close out the new connection s.t there's only a single
// connection between us.
localPub := s.identityECDH.PubKey()
if connectedPeer.inbound &&
if connectedPeer.Inbound() &&
shouldDropLocalConnection(localPub, nodePub) {
srvrLog.Warnf("Established outbound connection to "+
@@ -2797,16 +2798,63 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
// offered that would trigger channel closure. In case of outgoing
// htlcs, an extra block is added to prevent the channel from being
// closed when the htlc is outstanding and a new block comes in.
p, err := newPeer(
s.cfg, conn, connReq, s, peerAddr, inbound, initFeatures,
legacyFeatures, s.cfg.ChanEnableTimeout,
lncfg.DefaultOutgoingCltvRejectDelta, errBuffer,
)
if err != nil {
srvrLog.Errorf("unable to create peer %v", err)
return
pCfg := ppeer.Config{
Conn: conn,
ConnReq: connReq,
Addr: peerAddr,
Inbound: inbound,
Features: initFeatures,
LegacyFeatures: legacyFeatures,
OutgoingCltvRejectDelta: lncfg.DefaultOutgoingCltvRejectDelta,
ChanActiveTimeout: s.cfg.ChanEnableTimeout,
ErrorBuffer: errBuffer,
WritePool: s.writePool,
ReadPool: s.readPool,
Switch: s.htlcSwitch,
InterceptSwitch: s.interceptableSwitch,
ChannelDB: s.chanDB,
ChainArb: s.chainArb,
AuthGossiper: s.authGossiper,
ChanStatusMgr: s.chanStatusMgr,
ChainIO: s.cc.chainIO,
FeeEstimator: s.cc.feeEstimator,
Signer: s.cc.wallet.Cfg.Signer,
SigPool: s.sigPool,
Wallet: s.cc.wallet,
ChainNotifier: s.cc.chainNotifier,
RoutingPolicy: s.cc.routingPolicy,
Sphinx: s.sphinx,
WitnessBeacon: s.witnessBeacon,
Invoices: s.invoices,
ChannelNotifier: s.channelNotifier,
HtlcNotifier: s.htlcNotifier,
TowerClient: s.towerClient,
DisconnectPeer: s.DisconnectPeer,
GenNodeAnnouncement: s.genNodeAnnouncement,
PrunePersistentPeerConnection: s.prunePersistentPeerConnection,
FetchLastChanUpdate: s.fetchLastChanUpdate(),
ProcessFundingOpen: s.fundingMgr.processFundingOpen,
ProcessFundingAccept: s.fundingMgr.processFundingAccept,
ProcessFundingCreated: s.fundingMgr.processFundingCreated,
ProcessFundingSigned: s.fundingMgr.processFundingSigned,
ProcessFundingLocked: s.fundingMgr.processFundingLocked,
ProcessFundingError: s.fundingMgr.processFundingError,
IsPendingChannel: s.fundingMgr.IsPendingChannel,
Hodl: s.cfg.Hodl,
UnsafeReplay: s.cfg.UnsafeReplay,
MaxOutgoingCltvExpiry: s.cfg.MaxOutgoingCltvExpiry,
MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation,
Quit: s.quit,
}
copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())
copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed())
p := newPeer(pCfg)
// TODO(roasbeef): update IP address for link-node
// * also mark last-seen, do it one single transaction?
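One subtlety in the wiring above: most hooks are passed as method values (s.DisconnectPeer, s.fundingMgr.processFundingOpen), which bind their receiver at assignment time, while FetchLastChanUpdate is a factory call, s.fetchLastChanUpdate(), whose returned closure is what gets stored. A toy illustration of the difference, with made-up names:

package main

import "fmt"

type mgr struct{ name string }

// process plays the role of fundingMgr.processFundingOpen.
func (m *mgr) process(msg string) { fmt.Println(m.name, "handles", msg) }

// factory plays the role of fetchLastChanUpdate: called once at wiring
// time, it returns the closure that is actually stored in the config.
func factory() func(string) {
	return func(msg string) { fmt.Println("closure got", msg) }
}

func main() {
	m := &mgr{name: "funding"}

	hook := m.process // method value: receiver m is captured here
	hook("open_channel")

	fetch := factory() // factory invoked now, its result stored
	fetch("chan_update")
}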
@@ -2842,12 +2890,12 @@ func (s *server) addPeer(p *peer) {
// TODO(roasbeef): pipe all requests through to the
// queryHandler/peerManager
pubSer := p.addr.IdentityKey.SerializeCompressed()
pubSer := p.NetAddress().IdentityKey.SerializeCompressed()
pubStr := string(pubSer)
s.peersByPub[pubStr] = p
if p.inbound {
if p.Inbound() {
s.inboundPeers[pubStr] = p
} else {
s.outboundPeers[pubStr] = p
@@ -3020,12 +3068,12 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
// within the peer's address for reconnection purposes.
//
// TODO(roasbeef): use them all?
if p.inbound {
if p.Inbound() {
advertisedAddr, err := s.fetchNodeAdvertisedAddr(pubKey)
switch {
// We found an advertised address, so use it.
case err == nil:
p.addr.Address = advertisedAddr
p.SetAddress(advertisedAddr)
// The peer doesn't have an advertised address.
case err == errNoAdvertisedAddr:
@@ -3058,7 +3106,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
// Otherwise, we'll launch a new connection request in order to
// attempt to maintain a persistent connection with this peer.
connReq := &connmgr.ConnReq{
Addr: p.addr,
Addr: p.NetAddress(),
Permanent: true,
}
s.persistentConnReqs[pubStr] = append(
@@ -3113,8 +3161,8 @@ func (s *server) removePeer(p *peer) {
p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p))
// If this peer had an active persistent connection request, remove it.
if p.connReq != nil {
s.connMgr.Remove(p.connReq.ID())
if p.ConnReq() != nil {
s.connMgr.Remove(p.ConnReq().ID())
}
// Ignore deleting peers if we're shutting down.
@@ -3128,7 +3176,7 @@ func (s *server) removePeer(p *peer) {
delete(s.peersByPub, pubStr)
if p.inbound {
if p.Inbound() {
delete(s.inboundPeers, pubStr)
} else {
delete(s.outboundPeers, pubStr)
@@ -3136,8 +3184,8 @@ func (s *server) removePeer(p *peer) {
// Copy the peer's error buffer across to the server if it has any items
// in it so that we can restore peer errors across connections.
if p.errorBuffer.Total() > 0 {
s.peerErrors[pubStr] = p.errorBuffer
if p.ErrorBuffer().Total() > 0 {
s.peerErrors[pubStr] = p.ErrorBuffer()
}
// Inform the peer notifier of a peer offline event so that it can be

test_utils.go
@@ -17,16 +17,15 @@ import (
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
ppeer "github.com/lightningnetwork/lnd/peer"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/shachain"
"github.com/lightningnetwork/lnd/ticker"
)
@@ -352,37 +351,8 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, publTx chan *wire.MsgTx,
publishedTransactions: publTx,
},
}
cc := &chainControl{
feeEstimator: estimator,
chainIO: chainIO,
chainNotifier: notifier,
wallet: wallet,
}
breachArbiter := &breachArbiter{}
chainArb := contractcourt.NewChainArbitrator(
contractcourt.ChainArbitratorConfig{
Notifier: notifier,
ChainIO: chainIO,
IsForwardedHTLC: func(chanID lnwire.ShortChannelID,
htlcIndex uint64) bool {
return true
},
Clock: clock.NewDefaultClock(),
}, dbAlice,
)
chainArb.WatchNewChannel(aliceChannelState)
s := &server{
chanDB: dbAlice,
cc: cc,
breachArbiter: breachArbiter,
chainArb: chainArb,
}
_, currentHeight, err := s.cc.chainIO.GetBestBlock()
_, currentHeight, err := chainIO.GetBestBlock()
if err != nil {
return nil, nil, nil, err
}
@@ -404,7 +374,6 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, publTx chan *wire.MsgTx,
if err = htlcSwitch.Start(); err != nil {
return nil, nil, nil, err
}
s.htlcSwitch = htlcSwitch
nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
@@ -418,7 +387,7 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, publTx chan *wire.MsgTx,
Graph: dbAlice.ChannelGraph(),
MessageSigner: nodeSignerAlice,
OurPubKey: aliceKeyPub,
IsChannelActive: s.htlcSwitch.HasActiveLink,
IsChannelActive: htlcSwitch.HasActiveLink,
ApplyChannelUpdate: func(*lnwire.ChannelUpdate) error { return nil },
})
if err != nil {
@@ -427,31 +396,41 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, publTx chan *wire.MsgTx,
if err = chanStatusMgr.Start(); err != nil {
return nil, nil, nil, err
}
s.chanStatusMgr = chanStatusMgr
alicePeer := &peer{
addr: &lnwire.NetAddress{
IdentityKey: aliceKeyPub,
Address: aliceAddr,
},
server: s,
sendQueue: make(chan outgoingMsg, 1),
outgoingQueue: make(chan outgoingMsg, outgoingQueueLen),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
newChannels: make(chan *newChannelMsg, 1),
activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser),
localCloseChanReqs: make(chan *htlcswitch.ChanClose),
chanCloseMsgs: make(chan *closeMsg),
chanActiveTimeout: chanActiveTimeout,
queueQuit: make(chan struct{}),
quit: make(chan struct{}),
errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize)
if err != nil {
return nil, nil, nil, err
}
var pubKey [33]byte
copy(pubKey[:], aliceKeyPub.SerializeCompressed())
cfgAddr := &lnwire.NetAddress{
IdentityKey: aliceKeyPub,
Address: aliceAddr,
ChainNet: wire.SimNet,
}
pCfg := ppeer.Config{
Addr: cfgAddr,
PubKeyBytes: pubKey,
ErrorBuffer: errBuffer,
ChainIO: chainIO,
Switch: htlcSwitch,
ChanActiveTimeout: chanActiveTimeout,
InterceptSwitch: htlcswitch.NewInterceptableSwitch(htlcSwitch),
ChannelDB: dbAlice,
FeeEstimator: estimator,
Wallet: wallet,
ChainNotifier: notifier,
ChanStatusMgr: chanStatusMgr,
DisconnectPeer: func(b *btcec.PublicKey) error { return nil },
}
alicePeer := newPeer(pCfg)
chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
alicePeer.activeChannels[chanID] = channelAlice