Merge pull request #1694 from wpaulino/autopilot-reachability-test

autopilot: add peer reachability test before tracking funding as pending
This commit is contained in:
Olaoluwa Osuntokun 2018-08-16 20:18:49 -07:00 committed by GitHub
commit 643618b4f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 223 additions and 93 deletions

@ -1,6 +1,7 @@
package autopilot package autopilot
import ( import (
"net"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -32,6 +33,15 @@ type Config struct {
// creation, closing and update of channels within the network. // creation, closing and update of channels within the network.
ChanController ChannelController ChanController ChannelController
// ConnectToPeer attempts to connect to the peer using one of its
// advertised addresses. The boolean returned signals whether the peer
// was already connected.
ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error)
// DisconnectPeer attempts to disconnect the peer with the given public
// key.
DisconnectPeer func(*btcec.PublicKey) error
// WalletBalance is a function closure that should return the current // WalletBalance is a function closure that should return the current
// available balance o the backing wallet. // available balance o the backing wallet.
WalletBalance func() (btcutil.Amount, error) WalletBalance func() (btcutil.Amount, error)
@ -448,20 +458,86 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
continue continue
} }
nID := NewNodeID(chanCandidate.PeerKey) go func(directive AttachmentDirective) {
pendingOpens[nID] = Channel{ // We'll start out by attempting to
Capacity: chanCandidate.ChanAmt, // connect to the peer in order to begin
Node: nID, // the funding workflow.
pub := directive.PeerKey
alreadyConnected, err := a.cfg.ConnectToPeer(
pub, directive.Addrs,
)
if err != nil {
log.Warnf("Unable to connect "+
"to %x: %v",
pub.SerializeCompressed(),
err)
// Since we failed to connect to
// them, we'll mark them as
// failed so that we don't
// attempt to connect to them
// again.
nodeID := NewNodeID(pub)
pendingMtx.Lock()
failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock()
// Finally, we'll trigger the
// agent to select new peers to
// connect to.
a.OnChannelOpenFailure()
return
} }
go func(directive AttachmentDirective) { // If we were succesful, we'll track
// this peer in our set of pending
// opens. We do this here to ensure we
// don't stall on selecting new peers if
// the connection attempt happens to
// take too long.
pendingMtx.Lock()
if uint16(len(pendingOpens))+1 >
a.cfg.MaxPendingOpens {
pub := directive.PeerKey pendingMtx.Unlock()
err := a.cfg.ChanController.OpenChannel(
directive.PeerKey, // Since we've reached our max
directive.ChanAmt, // number of pending opens,
directive.Addrs, // we'll disconnect this peer
// and exit. However, if we were
// previously connected to them,
// then we'll make sure to
// maintain the connection
// alive.
if alreadyConnected {
return
}
err = a.cfg.DisconnectPeer(
pub,
)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
return
}
nodeID := NewNodeID(directive.PeerKey)
pendingOpens[nodeID] = Channel{
Capacity: directive.ChanAmt,
Node: nodeID,
}
pendingMtx.Unlock()
// We can then begin the funding
// workflow with this peer.
err = a.cfg.ChanController.OpenChannel(
pub, directive.ChanAmt,
) )
if err != nil { if err != nil {
log.Warnf("Unable to open "+ log.Warnf("Unable to open "+
@ -470,23 +546,40 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
directive.ChanAmt, err) directive.ChanAmt, err)
// As the attempt failed, we'll // As the attempt failed, we'll
// clear it from the set of // clear the peer from the set of
// pending channels. // pending opens and mark them
// as failed so we don't attempt
// to open a channel to them
// again.
pendingMtx.Lock() pendingMtx.Lock()
nID := NewNodeID(directive.PeerKey) delete(pendingOpens, nodeID)
delete(pendingOpens, nID) failedNodes[nodeID] = struct{}{}
// Mark this node as failed so we don't
// attempt it again.
failedNodes[nID] = struct{}{}
pendingMtx.Unlock() pendingMtx.Unlock()
// Trigger the autopilot controller to // Trigger the agent to
// re-evaluate everything and possibly // re-evaluate everything and
// retry with a different node. // possibly retry with a
// different node.
a.OnChannelOpenFailure() a.OnChannelOpenFailure()
// Finally, we should also
// disconnect the peer if we
// weren't already connected to
// them beforehand by an
// external subsystem.
if alreadyConnected {
return
} }
err = a.cfg.DisconnectPeer(pub)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
}
}(chanCandidate) }(chanCandidate)
} }
pendingMtx.Unlock() pendingMtx.Unlock()

@ -77,7 +77,6 @@ var _ AttachmentHeuristic = (*mockHeuristic)(nil)
type openChanIntent struct { type openChanIntent struct {
target *btcec.PublicKey target *btcec.PublicKey
amt btcutil.Amount amt btcutil.Amount
addrs []net.Addr
private bool private bool
} }
@ -86,13 +85,12 @@ type mockChanController struct {
private bool private bool
} }
func (m *mockChanController) OpenChannel(target *btcec.PublicKey, amt btcutil.Amount, func (m *mockChanController) OpenChannel(target *btcec.PublicKey,
addrs []net.Addr) error { amt btcutil.Amount) error {
m.openChanSignals <- openChanIntent{ m.openChanSignals <- openChanIntent{
target: target, target: target,
amt: amt, amt: amt,
addrs: addrs,
private: m.private, private: m.private,
} }
@ -142,6 +140,12 @@ func TestAgentChannelOpenSignal(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return 0, nil return 0, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -230,8 +234,8 @@ func TestAgentChannelOpenSignal(t *testing.T) {
type mockFailingChanController struct { type mockFailingChanController struct {
} }
func (m *mockFailingChanController) OpenChannel(target *btcec.PublicKey, amt btcutil.Amount, func (m *mockFailingChanController) OpenChannel(target *btcec.PublicKey,
addrs []net.Addr) error { amt btcutil.Amount) error {
return errors.New("failure") return errors.New("failure")
} }
@ -276,6 +280,12 @@ func TestAgentChannelFailureSignal(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return 0, nil return 0, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -366,6 +376,12 @@ func TestAgentChannelCloseSignal(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return 0, nil return 0, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -490,6 +506,12 @@ func TestAgentBalanceUpdate(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -606,6 +628,12 @@ func TestAgentImmediateAttach(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -698,10 +726,6 @@ func TestAgentImmediateAttach(t *testing.T) {
self.SerializeCompressed(), self.SerializeCompressed(),
openChan.target.SerializeCompressed()) openChan.target.SerializeCompressed())
} }
if len(openChan.addrs) != 1 {
t.Fatalf("should have single addr, instead have: %v",
len(openChan.addrs))
}
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("channel not opened in time") t.Fatalf("channel not opened in time")
} }
@ -743,6 +767,12 @@ func TestAgentPrivateChannels(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -869,6 +899,12 @@ func TestAgentPendingChannelState(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -949,10 +985,6 @@ func TestAgentPendingChannelState(t *testing.T) {
nodeKey.SerializeCompressed(), nodeKey.SerializeCompressed(),
openChan.target.SerializeCompressed()) openChan.target.SerializeCompressed())
} }
if len(openChan.addrs) != 1 {
t.Fatalf("should have single addr, instead have: %v",
len(openChan.addrs))
}
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("channel wasn't opened in time") t.Fatalf("channel wasn't opened in time")
} }

@ -136,8 +136,7 @@ type ChannelController interface {
// specified amount. This function should un-block immediately after // specified amount. This function should un-block immediately after
// the funding transaction that marks the channel open has been // the funding transaction that marks the channel open has been
// broadcast. // broadcast.
OpenChannel(target *btcec.PublicKey, amt btcutil.Amount, OpenChannel(target *btcec.PublicKey, amt btcutil.Amount) error
addrs []net.Addr) error
// CloseChannel attempts to close out the target channel. // CloseChannel attempts to close out the target channel.
// //

114
pilot.go

@ -1,6 +1,7 @@
package main package main
import ( import (
"errors"
"fmt" "fmt"
"net" "net"
@ -24,60 +25,7 @@ type chanController struct {
// specified amount. This function should un-block immediately after the // specified amount. This function should un-block immediately after the
// funding transaction that marks the channel open has been broadcast. // funding transaction that marks the channel open has been broadcast.
func (c *chanController) OpenChannel(target *btcec.PublicKey, func (c *chanController) OpenChannel(target *btcec.PublicKey,
amt btcutil.Amount, addrs []net.Addr) error { amt btcutil.Amount) error {
// We can't establish a channel if no addresses were provided for the
// peer.
if len(addrs) == 0 {
return fmt.Errorf("Unable to create channel w/o an active " +
"address")
}
// First, we'll check if we're already connected to the target peer. If
// not, then we'll need to establish a connection.
if _, err := c.server.FindPeer(target); err != nil {
// TODO(roasbeef): try teach addr
atplLog.Tracef("Connecting to %x to auto-create channel: ",
target.SerializeCompressed())
lnAddr := &lnwire.NetAddress{
IdentityKey: target,
ChainNet: activeNetParams.Net,
}
// We'll attempt to successively connect to each of the
// advertised IP addresses until we've either exhausted the
// advertised IP addresses, or have made a connection.
var connected bool
for _, addr := range addrs {
switch addr.(type) {
case *net.TCPAddr, *tor.OnionAddr:
lnAddr.Address = addr
default:
return fmt.Errorf("unknown address type %T", addr)
}
// TODO(roasbeef): make perm connection in server after
// chan open?
err := c.server.ConnectToPeer(lnAddr, false)
if err != nil {
// If we weren't able to connect to the peer,
// then we'll move onto the next.
continue
}
connected = true
break
}
// If we weren't able to establish a connection at all, then
// we'll error out.
if !connected {
return fmt.Errorf("Unable to connect to %x",
target.SerializeCompressed())
}
}
// With the connection established, we'll now establish our connection // With the connection established, we'll now establish our connection
// to the target peer, waiting for the first update before we exit. // to the target peer, waiting for the first update before we exit.
@ -160,6 +108,64 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error)
}, },
Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()), Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()),
MaxPendingOpens: 10, MaxPendingOpens: 10,
ConnectToPeer: func(target *btcec.PublicKey, addrs []net.Addr) (bool, error) {
// We can't establish a channel if no addresses were
// provided for the peer.
if len(addrs) == 0 {
return false, errors.New("no addresses specified")
}
// First, we'll check if we're already connected to the
// target peer. If we are, we can exit early. Otherwise,
// we'll need to establish a connection.
if _, err := svr.FindPeer(target); err == nil {
return true, nil
}
atplLog.Tracef("Attempting to connect to %x",
target.SerializeCompressed())
lnAddr := &lnwire.NetAddress{
IdentityKey: target,
ChainNet: activeNetParams.Net,
}
// We'll attempt to successively connect to each of the
// advertised IP addresses until we've either exhausted
// the advertised IP addresses, or have made a
// connection.
var connected bool
for _, addr := range addrs {
switch addr.(type) {
case *net.TCPAddr, *tor.OnionAddr:
lnAddr.Address = addr
default:
return false, fmt.Errorf("unknown "+
"address type %T", addr)
}
err := svr.ConnectToPeer(lnAddr, false)
if err != nil {
// If we weren't able to connect to the
// peer at this address, then we'll move
// onto the next.
continue
}
connected = true
break
}
// If we weren't able to establish a connection at all,
// then we'll error out.
if !connected {
return false, fmt.Errorf("unable to connect "+
"to %x", target.SerializeCompressed())
}
return false, nil
},
DisconnectPeer: svr.DisconnectPeer,
} }
// Next, we'll fetch the current state of open channels from the // Next, we'll fetch the current state of open channels from the