Merge pull request #982 from cfromknecht/server-conns
server: cleanup persistent connection retries
This commit is contained in:
commit
ffb1b65eca
@ -299,8 +299,9 @@ type fundingConfig struct {
|
|||||||
// WatchNewChannel is to be called once a new channel enters the final
|
// WatchNewChannel is to be called once a new channel enters the final
|
||||||
// funding stage: waiting for on-chain confirmation. This method sends
|
// funding stage: waiting for on-chain confirmation. This method sends
|
||||||
// the channel to the ChainArbitrator so it can watch for any on-chain
|
// the channel to the ChainArbitrator so it can watch for any on-chain
|
||||||
// events related to the channel.
|
// events related to the channel. We also provide the address of the
|
||||||
WatchNewChannel func(*channeldb.OpenChannel) error
|
// node we're establishing a channel with for reconnection purposes.
|
||||||
|
WatchNewChannel func(*channeldb.OpenChannel, *lnwire.NetAddress) error
|
||||||
|
|
||||||
// ReportShortChanID allows the funding manager to report the newly
|
// ReportShortChanID allows the funding manager to report the newly
|
||||||
// discovered short channel ID of a formerly pending channel to outside
|
// discovered short channel ID of a formerly pending channel to outside
|
||||||
@ -942,7 +943,6 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(roasbeef): error if funding flow already ongoing
|
|
||||||
fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+
|
fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+
|
||||||
"pendingId=%x) from peer(%x)", amt, msg.PushAmount,
|
"pendingId=%x) from peer(%x)", amt, msg.PushAmount,
|
||||||
msg.CsvDelay, msg.PendingChannelID,
|
msg.CsvDelay, msg.PendingChannelID,
|
||||||
@ -953,9 +953,6 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
|
|||||||
// reservation attempt may be rejected. Note that since we're on the
|
// reservation attempt may be rejected. Note that since we're on the
|
||||||
// responding side of a single funder workflow, we don't commit any
|
// responding side of a single funder workflow, we don't commit any
|
||||||
// funds to the channel ourselves.
|
// funds to the channel ourselves.
|
||||||
//
|
|
||||||
// TODO(roasbeef): assuming this was an inbound connection, replace
|
|
||||||
// port with default advertised port
|
|
||||||
chainHash := chainhash.Hash(msg.ChainHash)
|
chainHash := chainhash.Hash(msg.ChainHash)
|
||||||
reservation, err := f.cfg.Wallet.InitChannelReservation(
|
reservation, err := f.cfg.Wallet.InitChannelReservation(
|
||||||
amt, 0, msg.PushAmount,
|
amt, 0, msg.PushAmount,
|
||||||
@ -1356,7 +1353,8 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
|
|||||||
// Now that we've sent over our final signature for this channel, we'll
|
// Now that we've sent over our final signature for this channel, we'll
|
||||||
// send it to the ChainArbitrator so it can watch for any on-chain
|
// send it to the ChainArbitrator so it can watch for any on-chain
|
||||||
// actions during this final confirmation stage.
|
// actions during this final confirmation stage.
|
||||||
if err := f.cfg.WatchNewChannel(completeChan); err != nil {
|
peerAddr := resCtx.peerAddress
|
||||||
|
if err := f.cfg.WatchNewChannel(completeChan, peerAddr); err != nil {
|
||||||
fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+
|
fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+
|
||||||
"arbitration: %v", fundingOut, err)
|
"arbitration: %v", fundingOut, err)
|
||||||
}
|
}
|
||||||
@ -1504,7 +1502,8 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
|
|||||||
// we'll send the to be active channel to the ChainArbitrator so it can
|
// we'll send the to be active channel to the ChainArbitrator so it can
|
||||||
// watch for any on-chin actions before the channel has fully
|
// watch for any on-chin actions before the channel has fully
|
||||||
// confirmed.
|
// confirmed.
|
||||||
if err := f.cfg.WatchNewChannel(completeChan); err != nil {
|
peerAddr := resCtx.peerAddress
|
||||||
|
if err := f.cfg.WatchNewChannel(completeChan, peerAddr); err != nil {
|
||||||
fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+
|
fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+
|
||||||
"arbitration: %v", fundingPoint, err)
|
"arbitration: %v", fundingPoint, err)
|
||||||
}
|
}
|
||||||
|
@ -300,7 +300,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
|
|||||||
return uint16(lnwallet.MaxHTLCNumber / 2)
|
return uint16(lnwallet.MaxHTLCNumber / 2)
|
||||||
},
|
},
|
||||||
ArbiterChan: arbiterChan,
|
ArbiterChan: arbiterChan,
|
||||||
WatchNewChannel: func(*channeldb.OpenChannel) error {
|
WatchNewChannel: func(*channeldb.OpenChannel, *lnwire.NetAddress) error {
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
ReportShortChanID: func(wire.OutPoint, lnwire.ShortChannelID) error {
|
ReportShortChanID: func(wire.OutPoint, lnwire.ShortChannelID) error {
|
||||||
|
15
lnd.go
15
lnd.go
@ -430,7 +430,20 @@ func lndMain() error {
|
|||||||
}
|
}
|
||||||
return delay
|
return delay
|
||||||
},
|
},
|
||||||
WatchNewChannel: server.chainArb.WatchNewChannel,
|
WatchNewChannel: func(channel *channeldb.OpenChannel,
|
||||||
|
addr *lnwire.NetAddress) error {
|
||||||
|
|
||||||
|
// First, we'll mark this new peer as a persistent peer
|
||||||
|
// for re-connection purposes.
|
||||||
|
server.mu.Lock()
|
||||||
|
pubStr := string(addr.IdentityKey.SerializeCompressed())
|
||||||
|
server.persistentPeers[pubStr] = struct{}{}
|
||||||
|
server.mu.Unlock()
|
||||||
|
|
||||||
|
// With that taken care of, we'll send this channel to
|
||||||
|
// the chain arb so it can react to on-chain events.
|
||||||
|
return server.chainArb.WatchNewChannel(channel)
|
||||||
|
},
|
||||||
ReportShortChanID: func(chanPoint wire.OutPoint,
|
ReportShortChanID: func(chanPoint wire.OutPoint,
|
||||||
sid lnwire.ShortChannelID) error {
|
sid lnwire.ShortChannelID) error {
|
||||||
|
|
||||||
|
49
lnd_test.go
49
lnd_test.go
@ -1147,24 +1147,8 @@ func testChannelFundingPersistence(net *lntest.NetworkHarness, t *harnessTest) {
|
|||||||
|
|
||||||
// The following block ensures that after both nodes have restarted,
|
// The following block ensures that after both nodes have restarted,
|
||||||
// they have reconnected before the execution of the next test.
|
// they have reconnected before the execution of the next test.
|
||||||
peersTimeout := time.After(15 * time.Second)
|
if err := net.EnsureConnected(ctxb, net.Alice, carol); err != nil {
|
||||||
checkPeersTick := time.NewTicker(100 * time.Millisecond)
|
t.Fatalf("peers unable to reconnect after restart: %v", err)
|
||||||
defer checkPeersTick.Stop()
|
|
||||||
peersPoll:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-peersTimeout:
|
|
||||||
t.Fatalf("peers unable to reconnect after restart")
|
|
||||||
case <-checkPeersTick.C:
|
|
||||||
peers, err := carol.ListPeers(ctxb,
|
|
||||||
&lnrpc.ListPeersRequest{})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("ListPeers error: %v\n", err)
|
|
||||||
}
|
|
||||||
if len(peers.Peers) > 0 {
|
|
||||||
break peersPoll
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next, mine enough blocks s.t the channel will open with a single
|
// Next, mine enough blocks s.t the channel will open with a single
|
||||||
@ -1251,6 +1235,11 @@ func testChannelBalance(net *lntest.NetworkHarness, t *harnessTest) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Before beginning, make sure alice and bob are connected.
|
||||||
|
if err := net.EnsureConnected(ctx, net.Alice, net.Bob); err != nil {
|
||||||
|
t.Fatalf("unable to connect alice and bob: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
chanPoint := openChannelAndAssert(ctx, t, net, net.Alice, net.Bob,
|
chanPoint := openChannelAndAssert(ctx, t, net, net.Alice, net.Bob,
|
||||||
amount, 0)
|
amount, 0)
|
||||||
|
|
||||||
@ -4610,7 +4599,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest)
|
|||||||
// and Carol. Note that we will also receive a node announcement from
|
// and Carol. Note that we will also receive a node announcement from
|
||||||
// Bob, since a node will update its node announcement after a new
|
// Bob, since a node will update its node announcement after a new
|
||||||
// channel is opened.
|
// channel is opened.
|
||||||
if err := net.ConnectNodes(ctxb, net.Alice, net.Bob); err != nil {
|
if err := net.EnsureConnected(ctxb, net.Alice, net.Bob); err != nil {
|
||||||
t.Fatalf("unable to connect alice to bob: %v", err)
|
t.Fatalf("unable to connect alice to bob: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -7172,7 +7161,7 @@ func testSwitchOfflineDelivery(net *lntest.NetworkHarness, t *harnessTest) {
|
|||||||
// Now that the settles have reached Dave, reconnect him with Alice,
|
// Now that the settles have reached Dave, reconnect him with Alice,
|
||||||
// allowing the settles to return to the sender.
|
// allowing the settles to return to the sender.
|
||||||
ctxt, _ = context.WithTimeout(ctxb, timeout)
|
ctxt, _ = context.WithTimeout(ctxb, timeout)
|
||||||
if err := net.ConnectNodes(ctxt, dave, net.Alice); err != nil {
|
if err := net.EnsureConnected(ctxt, dave, net.Alice); err != nil {
|
||||||
t.Fatalf("unable to reconnect alice to dave: %v", err)
|
t.Fatalf("unable to reconnect alice to dave: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -7481,8 +7470,16 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
|
|||||||
t.Fatalf("unable to reconnect alice to dave: %v", err)
|
t.Fatalf("unable to reconnect alice to dave: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// After Dave reconnects, the settles should be propagated all the way
|
// Force Dave and Alice to reconnect before waiting for the htlcs to
|
||||||
// back to the sender. All nodes should report no active htlcs.
|
// clear.
|
||||||
|
ctxt, _ = context.WithTimeout(ctxb, timeout)
|
||||||
|
err = net.EnsureConnected(ctxt, dave, net.Alice)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to reconnect dave and carol: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// After reconnection succeeds, the settles should be propagated all the
|
||||||
|
// way back to the sender. All nodes should report no active htlcs.
|
||||||
err = lntest.WaitPredicate(func() bool {
|
err = lntest.WaitPredicate(func() bool {
|
||||||
return assertNumActiveHtlcs(nodes, 0)
|
return assertNumActiveHtlcs(nodes, 0)
|
||||||
}, time.Second*15)
|
}, time.Second*15)
|
||||||
@ -7528,6 +7525,14 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
|
|||||||
|
|
||||||
payReqs = []string{resp.PaymentRequest}
|
payReqs = []string{resp.PaymentRequest}
|
||||||
|
|
||||||
|
// Before completing the final payment request, ensure that the
|
||||||
|
// connection between Dave and Carol has been healed.
|
||||||
|
ctxt, _ = context.WithTimeout(ctxb, timeout)
|
||||||
|
err = net.EnsureConnected(ctxt, dave, carol)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to reconnect dave and carol: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Using Carol as the source, pay to the 5 invoices from Bob created
|
// Using Carol as the source, pay to the 5 invoices from Bob created
|
||||||
// above.
|
// above.
|
||||||
ctxt, _ = context.WithTimeout(ctxb, timeout)
|
ctxt, _ = context.WithTimeout(ctxb, timeout)
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package lntest
|
package lntest
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"strings"
|
"strings"
|
||||||
@ -271,53 +272,86 @@ func (n *NetworkHarness) NewNode(extraArgs []string) (*HarnessNode, error) {
|
|||||||
// been made, the method will block until the two nodes appear in each other's
|
// been made, the method will block until the two nodes appear in each other's
|
||||||
// peers list, or until the 15s timeout expires.
|
// peers list, or until the 15s timeout expires.
|
||||||
func (n *NetworkHarness) EnsureConnected(ctx context.Context, a, b *HarnessNode) error {
|
func (n *NetworkHarness) EnsureConnected(ctx context.Context, a, b *HarnessNode) error {
|
||||||
bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{})
|
// errConnectionRequested is used to signal that a connection was
|
||||||
|
// requested successfully, which is distinct from already being
|
||||||
|
// connected to the peer.
|
||||||
|
errConnectionRequested := errors.New("connection request in progress")
|
||||||
|
|
||||||
|
tryConnect := func(a, b *HarnessNode) error {
|
||||||
|
ctxt, _ := context.WithTimeout(ctx, 15*time.Second)
|
||||||
|
bInfo, err := b.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
req := &lnrpc.ConnectPeerRequest{
|
req := &lnrpc.ConnectPeerRequest{
|
||||||
Addr: &lnrpc.LightningAddress{
|
Addr: &lnrpc.LightningAddress{
|
||||||
Pubkey: bobInfo.IdentityPubkey,
|
Pubkey: bInfo.IdentityPubkey,
|
||||||
Host: b.cfg.P2PAddr(),
|
Host: b.cfg.P2PAddr(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = a.ConnectPeer(ctx, req)
|
ctxt, _ = context.WithTimeout(ctx, 15*time.Second)
|
||||||
|
_, err = a.ConnectPeer(ctxt, req)
|
||||||
switch {
|
switch {
|
||||||
|
|
||||||
// Request was successful, wait for both to display the connection.
|
// Request was successful, wait for both to display the
|
||||||
|
// connection.
|
||||||
case err == nil:
|
case err == nil:
|
||||||
|
return errConnectionRequested
|
||||||
|
|
||||||
// If we already have pending connection, we will wait until bob appears
|
// If the two are already connected, we return early with no
|
||||||
// in alice's peer list.
|
// error.
|
||||||
case strings.Contains(err.Error(), "connection attempt to ") &&
|
|
||||||
strings.Contains(err.Error(), " is pending"):
|
|
||||||
|
|
||||||
// If the two are already connected, we return early with no error.
|
|
||||||
case strings.Contains(err.Error(), "already connected to peer"):
|
case strings.Contains(err.Error(), "already connected to peer"):
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = WaitPredicate(func() bool {
|
aErr := tryConnect(a, b)
|
||||||
|
bErr := tryConnect(b, a)
|
||||||
|
switch {
|
||||||
|
case aErr == nil && bErr == nil:
|
||||||
|
// If both reported already being connected to each other, we
|
||||||
|
// can exit early.
|
||||||
|
return nil
|
||||||
|
|
||||||
|
case aErr != errConnectionRequested:
|
||||||
|
// Return any critical errors returned by either alice.
|
||||||
|
return aErr
|
||||||
|
|
||||||
|
case bErr != errConnectionRequested:
|
||||||
|
// Return any critical errors returned by either bob.
|
||||||
|
return bErr
|
||||||
|
|
||||||
|
default:
|
||||||
|
// Otherwise one or both requested a connection, so we wait for
|
||||||
|
// the peers lists to reflect the connection.
|
||||||
|
}
|
||||||
|
|
||||||
|
findSelfInPeerList := func(a, b *HarnessNode) bool {
|
||||||
// If node B is seen in the ListPeers response from node A,
|
// If node B is seen in the ListPeers response from node A,
|
||||||
// then we can exit early as the connection has been fully
|
// then we can exit early as the connection has been fully
|
||||||
// established.
|
// established.
|
||||||
resp, err := a.ListPeers(ctx, &lnrpc.ListPeersRequest{})
|
ctxt, _ := context.WithTimeout(ctx, 15*time.Second)
|
||||||
|
resp, err := b.ListPeers(ctxt, &lnrpc.ListPeersRequest{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, peer := range resp.Peers {
|
for _, peer := range resp.Peers {
|
||||||
if peer.PubKey == b.PubKeyStr {
|
if peer.PubKey == a.PubKeyStr {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
err := WaitPredicate(func() bool {
|
||||||
|
return findSelfInPeerList(a, b) && findSelfInPeerList(b, a)
|
||||||
}, time.Second*15)
|
}, time.Second*15)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("peers not connected within 15 seconds")
|
return fmt.Errorf("peers not connected within 15 seconds")
|
||||||
|
@ -147,6 +147,7 @@ func (cfg nodeConfig) genArgs() []string {
|
|||||||
args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr()))
|
args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr()))
|
||||||
args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr()))
|
args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr()))
|
||||||
args = append(args, fmt.Sprintf("--listen=%v", cfg.P2PAddr()))
|
args = append(args, fmt.Sprintf("--listen=%v", cfg.P2PAddr()))
|
||||||
|
args = append(args, fmt.Sprintf("--externalip=%v", cfg.P2PAddr()))
|
||||||
args = append(args, fmt.Sprintf("--logdir=%v", cfg.LogDir))
|
args = append(args, fmt.Sprintf("--logdir=%v", cfg.LogDir))
|
||||||
args = append(args, fmt.Sprintf("--datadir=%v", cfg.DataDir))
|
args = append(args, fmt.Sprintf("--datadir=%v", cfg.DataDir))
|
||||||
args = append(args, fmt.Sprintf("--tlscertpath=%v", cfg.TLSCertPath))
|
args = append(args, fmt.Sprintf("--tlscertpath=%v", cfg.TLSCertPath))
|
||||||
|
216
server.go
216
server.go
@ -51,7 +51,7 @@ var (
|
|||||||
|
|
||||||
// maximumBackoff is the largest backoff we will permit when
|
// maximumBackoff is the largest backoff we will permit when
|
||||||
// reattempting connections to persistent peers.
|
// reattempting connections to persistent peers.
|
||||||
maximumBackoff = time.Minute
|
maximumBackoff = time.Hour
|
||||||
)
|
)
|
||||||
|
|
||||||
// server is the main server of the Lightning Network Daemon. The server houses
|
// server is the main server of the Lightning Network Daemon. The server houses
|
||||||
@ -85,6 +85,7 @@ type server struct {
|
|||||||
persistentPeers map[string]struct{}
|
persistentPeers map[string]struct{}
|
||||||
persistentPeersBackoff map[string]time.Duration
|
persistentPeersBackoff map[string]time.Duration
|
||||||
persistentConnReqs map[string][]*connmgr.ConnReq
|
persistentConnReqs map[string][]*connmgr.ConnReq
|
||||||
|
persistentRetryCancels map[string]chan struct{}
|
||||||
|
|
||||||
// ignorePeerTermination tracks peers for which the server has initiated
|
// ignorePeerTermination tracks peers for which the server has initiated
|
||||||
// a disconnect. Adding a peer to this map causes the peer termination
|
// a disconnect. Adding a peer to this map causes the peer termination
|
||||||
@ -179,6 +180,7 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
|
|||||||
persistentPeers: make(map[string]struct{}),
|
persistentPeers: make(map[string]struct{}),
|
||||||
persistentPeersBackoff: make(map[string]time.Duration),
|
persistentPeersBackoff: make(map[string]time.Duration),
|
||||||
persistentConnReqs: make(map[string][]*connmgr.ConnReq),
|
persistentConnReqs: make(map[string][]*connmgr.ConnReq),
|
||||||
|
persistentRetryCancels: make(map[string]chan struct{}),
|
||||||
ignorePeerTermination: make(map[*peer]struct{}),
|
ignorePeerTermination: make(map[*peer]struct{}),
|
||||||
|
|
||||||
peersByPub: make(map[string]*peer),
|
peersByPub: make(map[string]*peer),
|
||||||
@ -690,16 +692,6 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add bootstrapped peer as persistent to maintain
|
|
||||||
// connectivity even if we have no open channels.
|
|
||||||
targetPub := string(conn.RemotePub().SerializeCompressed())
|
|
||||||
s.mu.Lock()
|
|
||||||
s.persistentPeers[targetPub] = struct{}{}
|
|
||||||
if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
|
|
||||||
s.persistentPeersBackoff[targetPub] = defaultBackoff
|
|
||||||
}
|
|
||||||
s.mu.Unlock()
|
|
||||||
|
|
||||||
s.OutboundPeerConnected(nil, conn)
|
s.OutboundPeerConnected(nil, conn)
|
||||||
}(addr)
|
}(addr)
|
||||||
}
|
}
|
||||||
@ -805,16 +797,6 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add bootstrapped peer as persistent to maintain
|
|
||||||
// connectivity even if we have no open channels.
|
|
||||||
targetPub := string(conn.RemotePub().SerializeCompressed())
|
|
||||||
s.mu.Lock()
|
|
||||||
s.persistentPeers[targetPub] = struct{}{}
|
|
||||||
if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
|
|
||||||
s.persistentPeersBackoff[targetPub] = defaultBackoff
|
|
||||||
}
|
|
||||||
s.mu.Unlock()
|
|
||||||
|
|
||||||
s.OutboundPeerConnected(nil, conn)
|
s.OutboundPeerConnected(nil, conn)
|
||||||
}(addr)
|
}(addr)
|
||||||
}
|
}
|
||||||
@ -1305,7 +1287,6 @@ func (s *server) peerTerminationWatcher(p *peer) {
|
|||||||
|
|
||||||
// Otherwise, we'll launch a new connection request in order to
|
// Otherwise, we'll launch a new connection request in order to
|
||||||
// attempt to maintain a persistent connection with this peer.
|
// attempt to maintain a persistent connection with this peer.
|
||||||
// TODO(roasbeef): look up latest info for peer in database
|
|
||||||
connReq := &connmgr.ConnReq{
|
connReq := &connmgr.ConnReq{
|
||||||
Addr: p.addr,
|
Addr: p.addr,
|
||||||
Permanent: true,
|
Permanent: true,
|
||||||
@ -1313,20 +1294,29 @@ func (s *server) peerTerminationWatcher(p *peer) {
|
|||||||
s.persistentConnReqs[pubStr] = append(
|
s.persistentConnReqs[pubStr] = append(
|
||||||
s.persistentConnReqs[pubStr], connReq)
|
s.persistentConnReqs[pubStr], connReq)
|
||||||
|
|
||||||
// Compute the subsequent backoff duration.
|
// Record the computed backoff in the backoff map.
|
||||||
currBackoff := s.persistentPeersBackoff[pubStr]
|
backoff := s.nextPeerBackoff(pubStr)
|
||||||
nextBackoff := computeNextBackoff(currBackoff)
|
s.persistentPeersBackoff[pubStr] = backoff
|
||||||
s.persistentPeersBackoff[pubStr] = nextBackoff
|
|
||||||
|
// Initialize a retry canceller for this peer if one does not
|
||||||
|
// exist.
|
||||||
|
cancelChan, ok := s.persistentRetryCancels[pubStr]
|
||||||
|
if !ok {
|
||||||
|
cancelChan = make(chan struct{})
|
||||||
|
s.persistentRetryCancels[pubStr] = cancelChan
|
||||||
|
}
|
||||||
|
|
||||||
// We choose not to wait group this go routine since the Connect
|
// We choose not to wait group this go routine since the Connect
|
||||||
// call can stall for arbitrarily long if we shutdown while an
|
// call can stall for arbitrarily long if we shutdown while an
|
||||||
// outbound connection attempt is being made.
|
// outbound connection attempt is being made.
|
||||||
go func() {
|
go func() {
|
||||||
srvrLog.Debugf("Scheduling connection re-establishment to "+
|
srvrLog.Debugf("Scheduling connection re-establishment to "+
|
||||||
"persistent peer %v in %s", p, nextBackoff)
|
"persistent peer %v in %s", p, backoff)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(nextBackoff):
|
case <-time.After(backoff):
|
||||||
|
case <-cancelChan:
|
||||||
|
return
|
||||||
case <-s.quit:
|
case <-s.quit:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -1339,6 +1329,22 @@ func (s *server) peerTerminationWatcher(p *peer) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nextPeerBackoff computes the next backoff duration for a peer's pubkey using
|
||||||
|
// exponential backoff. If no previous backoff was known, the default is
|
||||||
|
// returned.
|
||||||
|
func (s *server) nextPeerBackoff(pubStr string) time.Duration {
|
||||||
|
// Now, determine the appropriate backoff to use for the retry.
|
||||||
|
backoff, ok := s.persistentPeersBackoff[pubStr]
|
||||||
|
if !ok {
|
||||||
|
// If an existing backoff was unknown, use the default.
|
||||||
|
return defaultBackoff
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, use a previous backoff to compute the
|
||||||
|
// subsequent randomized exponential backoff duration.
|
||||||
|
return computeNextBackoff(backoff)
|
||||||
|
}
|
||||||
|
|
||||||
// shouldRequestGraphSync returns true if the servers deems it necessary that
|
// shouldRequestGraphSync returns true if the servers deems it necessary that
|
||||||
// we sync channel graph state with the remote peer. This method is used to
|
// we sync channel graph state with the remote peer. This method is used to
|
||||||
// avoid _always_ syncing channel graph state with each peer that connects.
|
// avoid _always_ syncing channel graph state with each peer that connects.
|
||||||
@ -1357,9 +1363,19 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
|
|||||||
inbound bool) {
|
inbound bool) {
|
||||||
|
|
||||||
brontideConn := conn.(*brontide.Conn)
|
brontideConn := conn.(*brontide.Conn)
|
||||||
|
addr := conn.RemoteAddr()
|
||||||
|
pubKey := brontideConn.RemotePub()
|
||||||
|
|
||||||
|
// We'll ensure that we locate the proper port to use within the peer's
|
||||||
|
// address for reconnecting purposes.
|
||||||
|
if tcpAddr, ok := addr.(*net.TCPAddr); ok {
|
||||||
|
targetPort := s.fetchNodeAdvertisedPort(pubKey, tcpAddr)
|
||||||
|
tcpAddr.Port = targetPort
|
||||||
|
}
|
||||||
|
|
||||||
peerAddr := &lnwire.NetAddress{
|
peerAddr := &lnwire.NetAddress{
|
||||||
IdentityKey: brontideConn.RemotePub(),
|
IdentityKey: pubKey,
|
||||||
Address: conn.RemoteAddr(),
|
Address: addr,
|
||||||
ChainNet: activeNetParams.Net,
|
ChainNet: activeNetParams.Net,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1437,8 +1453,6 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
|
|||||||
|
|
||||||
srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
|
srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
|
||||||
|
|
||||||
localPub := s.identityPriv.PubKey()
|
|
||||||
|
|
||||||
// Check to see if we already have a connection with this peer. If so,
|
// Check to see if we already have a connection with this peer. If so,
|
||||||
// we may need to drop our existing connection. This prevents us from
|
// we may need to drop our existing connection. This prevents us from
|
||||||
// having duplicate connections to the same peer. We forgo adding a
|
// having duplicate connections to the same peer. We forgo adding a
|
||||||
@ -1455,6 +1469,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
|
|||||||
// connection we've already established should be kept, then
|
// connection we've already established should be kept, then
|
||||||
// we'll close out this connection s.t there's only a single
|
// we'll close out this connection s.t there's only a single
|
||||||
// connection between us.
|
// connection between us.
|
||||||
|
localPub := s.identityPriv.PubKey()
|
||||||
if !shouldDropLocalConnection(localPub, nodePub) {
|
if !shouldDropLocalConnection(localPub, nodePub) {
|
||||||
srvrLog.Warnf("Received inbound connection from "+
|
srvrLog.Warnf("Received inbound connection from "+
|
||||||
"peer %x, but already connected, dropping conn",
|
"peer %x, but already connected, dropping conn",
|
||||||
@ -1475,15 +1490,9 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
|
|||||||
s.ignorePeerTermination[connectedPeer] = struct{}{}
|
s.ignorePeerTermination[connectedPeer] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next, check to see if we have any outstanding persistent connection
|
// Lastly, cancel all pending requests. The incoming connection will not
|
||||||
// requests to this peer. If so, then we'll remove all of these
|
// have an associated connection request.
|
||||||
// connection requests, and also delete the entry from the map.
|
s.cancelConnReqs(pubStr, nil)
|
||||||
if connReqs, ok := s.persistentConnReqs[pubStr]; ok {
|
|
||||||
for _, connReq := range connReqs {
|
|
||||||
s.connMgr.Remove(connReq.ID())
|
|
||||||
}
|
|
||||||
delete(s.persistentConnReqs, pubStr)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.peerConnected(conn, nil, false)
|
s.peerConnected(conn, nil, false)
|
||||||
}
|
}
|
||||||
@ -1498,7 +1507,6 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
localPub := s.identityPriv.PubKey()
|
|
||||||
nodePub := conn.(*brontide.Conn).RemotePub()
|
nodePub := conn.(*brontide.Conn).RemotePub()
|
||||||
pubStr := string(nodePub.SerializeCompressed())
|
pubStr := string(nodePub.SerializeCompressed())
|
||||||
|
|
||||||
@ -1509,29 +1517,31 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
|||||||
// this new connection.
|
// this new connection.
|
||||||
if _, ok := s.outboundPeers[pubStr]; ok {
|
if _, ok := s.outboundPeers[pubStr]; ok {
|
||||||
srvrLog.Debugf("Ignoring duplicate outbound connection")
|
srvrLog.Debugf("Ignoring duplicate outbound connection")
|
||||||
|
if connReq != nil {
|
||||||
|
s.connMgr.Remove(connReq.ID())
|
||||||
|
}
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
|
if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
|
||||||
srvrLog.Debugf("Ignoring cancelled outbound connection")
|
srvrLog.Debugf("Ignoring cancelled outbound connection")
|
||||||
|
s.connMgr.Remove(connReq.ID())
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
srvrLog.Infof("Established connection to: %v", conn.RemoteAddr())
|
srvrLog.Infof("Established connection to: %v", conn.RemoteAddr())
|
||||||
|
|
||||||
// As we've just established an outbound connection to this peer, we'll
|
if connReq != nil {
|
||||||
// cancel all other persistent connection requests and eliminate the
|
// A successful connection was returned by the connmgr.
|
||||||
// entry for this peer from the map.
|
// Immediately cancel all pending requests, excluding the
|
||||||
if connReqs, ok := s.persistentConnReqs[pubStr]; ok {
|
// outbound connection we just established.
|
||||||
for _, pConnReq := range connReqs {
|
ignore := connReq.ID()
|
||||||
if connReq != nil &&
|
s.cancelConnReqs(pubStr, &ignore)
|
||||||
pConnReq.ID() != connReq.ID() {
|
} else {
|
||||||
|
// This was a successful connection made by some other
|
||||||
s.connMgr.Remove(pConnReq.ID())
|
// subsystem. Remove all requests being managed by the connmgr.
|
||||||
}
|
s.cancelConnReqs(pubStr, nil)
|
||||||
}
|
|
||||||
delete(s.persistentConnReqs, pubStr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we already have a connection with this peer, decide whether or not
|
// If we already have a connection with this peer, decide whether or not
|
||||||
@ -1549,6 +1559,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
|||||||
// If our (this) connection should be dropped, then we'll do
|
// If our (this) connection should be dropped, then we'll do
|
||||||
// so, in order to ensure we don't have any duplicate
|
// so, in order to ensure we don't have any duplicate
|
||||||
// connections.
|
// connections.
|
||||||
|
localPub := s.identityPriv.PubKey()
|
||||||
if shouldDropLocalConnection(localPub, nodePub) {
|
if shouldDropLocalConnection(localPub, nodePub) {
|
||||||
srvrLog.Warnf("Established outbound connection to "+
|
srvrLog.Warnf("Established outbound connection to "+
|
||||||
"peer %x, but already connected, dropping conn",
|
"peer %x, but already connected, dropping conn",
|
||||||
@ -1576,6 +1587,55 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
|||||||
s.peerConnected(conn, connReq, true)
|
s.peerConnected(conn, connReq, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UnassignedConnID is the default connection ID that a request can have before
|
||||||
|
// it actually is submitted to the connmgr.
|
||||||
|
// TODO(conner): move into connmgr package, or better, add connmgr method for
|
||||||
|
// generating atomic IDs
|
||||||
|
const UnassignedConnID uint64 = 0
|
||||||
|
|
||||||
|
// cancelConnReqs stops all persistent connection requests for a given pubkey.
|
||||||
|
// Any attempts initiated by the peerTerminationWatcher are canceled first.
|
||||||
|
// Afterwards, each connection request removed from the connmgr. The caller can
|
||||||
|
// optionally specify a connection ID to ignore, which prevents us from
|
||||||
|
// canceling a successful request. All persistent connreqs for the provided
|
||||||
|
// pubkey are discarded after the operationjw.
|
||||||
|
func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
|
||||||
|
// First, cancel any lingering persistent retry attempts, which will
|
||||||
|
// prevent retries for any with backoffs that are still maturing.
|
||||||
|
if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok {
|
||||||
|
close(cancelChan)
|
||||||
|
delete(s.persistentRetryCancels, pubStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next, check to see if we have any outstanding persistent connection
|
||||||
|
// requests to this peer. If so, then we'll remove all of these
|
||||||
|
// connection requests, and also delete the entry from the map.
|
||||||
|
connReqs, ok := s.persistentConnReqs[pubStr]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, connReq := range connReqs {
|
||||||
|
// Atomically capture the current request identifier.
|
||||||
|
connID := connReq.ID()
|
||||||
|
|
||||||
|
// Skip any zero IDs, this indicates the request has not
|
||||||
|
// yet been schedule.
|
||||||
|
if connID == UnassignedConnID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip a particular connection ID if instructed.
|
||||||
|
if skip != nil && connID == *skip {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
s.connMgr.Remove(connID)
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(s.persistentConnReqs, pubStr)
|
||||||
|
}
|
||||||
|
|
||||||
// addPeer adds the passed peer to the server's global state of all active
|
// addPeer adds the passed peer to the server's global state of all active
|
||||||
// peers.
|
// peers.
|
||||||
func (s *server) addPeer(p *peer) {
|
func (s *server) addPeer(p *peer) {
|
||||||
@ -1713,9 +1773,9 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
|
|||||||
// If there's already a pending connection request for this pubkey,
|
// If there's already a pending connection request for this pubkey,
|
||||||
// then we ignore this request to ensure we don't create a redundant
|
// then we ignore this request to ensure we don't create a redundant
|
||||||
// connection.
|
// connection.
|
||||||
if _, ok := s.persistentConnReqs[targetPub]; ok {
|
if reqs, ok := s.persistentConnReqs[targetPub]; ok {
|
||||||
s.mu.Unlock()
|
srvrLog.Warnf("Already have %d persistent connection "+
|
||||||
return fmt.Errorf("connection attempt to %v is pending", addr)
|
"requests for %v, connecting anyway.", len(reqs), addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there's not already a pending or active connection to this node,
|
// If there's not already a pending or active connection to this node,
|
||||||
@ -1780,6 +1840,8 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
|
|||||||
|
|
||||||
srvrLog.Infof("Disconnecting from %v", peer)
|
srvrLog.Infof("Disconnecting from %v", peer)
|
||||||
|
|
||||||
|
//s.cancelConnReqs(pubStr, nil)
|
||||||
|
|
||||||
// If this peer was formerly a persistent connection, then we'll remove
|
// If this peer was formerly a persistent connection, then we'll remove
|
||||||
// them from this map so we don't attempt to re-connect after we
|
// them from this map so we don't attempt to re-connect after we
|
||||||
// disconnect.
|
// disconnect.
|
||||||
@ -1928,3 +1990,43 @@ func computeNextBackoff(currBackoff time.Duration) time.Duration {
|
|||||||
// that the backoff can tweaked by 1/20 in either direction.
|
// that the backoff can tweaked by 1/20 in either direction.
|
||||||
return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
|
return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fetchNodeAdvertisedPort attempts to fetch the advertised port of the target
|
||||||
|
// node. If a port isn't found, then the default port will be used.
|
||||||
|
func (s *server) fetchNodeAdvertisedPort(pub *btcec.PublicKey,
|
||||||
|
targetAddr *net.TCPAddr) int {
|
||||||
|
|
||||||
|
// If the target port is already the default peer port, then we'll
|
||||||
|
// return that.
|
||||||
|
if targetAddr.Port == defaultPeerPort {
|
||||||
|
return defaultPeerPort
|
||||||
|
}
|
||||||
|
|
||||||
|
node, err := s.chanDB.ChannelGraph().FetchLightningNode(pub)
|
||||||
|
|
||||||
|
// If the node wasn't found, then we'll just return the current default
|
||||||
|
// port.
|
||||||
|
if err != nil {
|
||||||
|
return defaultPeerPort
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, we'll attempt to find a matching advertised IP, and will
|
||||||
|
// then use the port for that.
|
||||||
|
for _, addr := range node.Addresses {
|
||||||
|
// We'll only examine an address if it's a TCP address.
|
||||||
|
tcpAddr, ok := addr.(*net.TCPAddr)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this is the matching IP, then we'll return the port that
|
||||||
|
// it has been advertised with.
|
||||||
|
if tcpAddr.IP.Equal(targetAddr.IP) {
|
||||||
|
return tcpAddr.Port
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we couldn't find a matching IP, then we'll just return the
|
||||||
|
// default port.
|
||||||
|
return defaultPeerPort
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user