autopilot+pilot: refactor connection logic out of OpenChannel

In this commit, we refactor the existing connection logic outside of the
ChanController's OpenChannel method. We do this as previously it was
possible for peers to stall us while attempting to connect to them. In
order to remedy this, we now attempt to connect the peer before tracking
them in our set of pending opens.
This commit is contained in:
Wilmer Paulino 2018-08-06 18:58:36 -07:00
parent 2fbe95ebba
commit 454b549c7e
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 222 additions and 91 deletions

@ -1,6 +1,7 @@
package autopilot package autopilot
import ( import (
"net"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -32,6 +33,15 @@ type Config struct {
// creation, closing and update of channels within the network. // creation, closing and update of channels within the network.
ChanController ChannelController ChanController ChannelController
// ConnectToPeer attempts to connect to the peer using one of its
// advertised addresses. The boolean returned signals whether the peer
// was already connected.
ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error)
// DisconnectPeer attempts to disconnect the peer with the given public
// key.
DisconnectPeer func(*btcec.PublicKey) error
// WalletBalance is a function closure that should return the current // WalletBalance is a function closure that should return the current
// available balance o the backing wallet. // available balance o the backing wallet.
WalletBalance func() (btcutil.Amount, error) WalletBalance func() (btcutil.Amount, error)
@ -448,20 +458,86 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
continue continue
} }
nID := NewNodeID(chanCandidate.PeerKey) go func(directive AttachmentDirective) {
pendingOpens[nID] = Channel{ // We'll start out by attempting to
Capacity: chanCandidate.ChanAmt, // connect to the peer in order to begin
Node: nID, // the funding workflow.
pub := directive.PeerKey
alreadyConnected, err := a.cfg.ConnectToPeer(
pub, directive.Addrs,
)
if err != nil {
log.Warnf("Unable to connect "+
"to %x: %v",
pub.SerializeCompressed(),
err)
// Since we failed to connect to
// them, we'll mark them as
// failed so that we don't
// attempt to connect to them
// again.
nodeID := NewNodeID(pub)
pendingMtx.Lock()
failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock()
// Finally, we'll trigger the
// agent to select new peers to
// connect to.
a.OnChannelOpenFailure()
return
} }
go func(directive AttachmentDirective) { // If we were succesful, we'll track
// this peer in our set of pending
// opens. We do this here to ensure we
// don't stall on selecting new peers if
// the connection attempt happens to
// take too long.
pendingMtx.Lock()
if uint16(len(pendingOpens))+1 >
a.cfg.MaxPendingOpens {
pub := directive.PeerKey pendingMtx.Unlock()
err := a.cfg.ChanController.OpenChannel(
directive.PeerKey, // Since we've reached our max
directive.ChanAmt, // number of pending opens,
directive.Addrs, // we'll disconnect this peer
// and exit. However, if we were
// previously connected to them,
// then we'll make sure to
// maintain the connection
// alive.
if alreadyConnected {
return
}
err = a.cfg.DisconnectPeer(
pub,
)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
return
}
nodeID := NewNodeID(directive.PeerKey)
pendingOpens[nodeID] = Channel{
Capacity: directive.ChanAmt,
Node: nodeID,
}
pendingMtx.Unlock()
// We can then begin the funding
// workflow with this peer.
err = a.cfg.ChanController.OpenChannel(
pub, directive.ChanAmt,
) )
if err != nil { if err != nil {
log.Warnf("Unable to open "+ log.Warnf("Unable to open "+
@ -470,23 +546,40 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
directive.ChanAmt, err) directive.ChanAmt, err)
// As the attempt failed, we'll // As the attempt failed, we'll
// clear it from the set of // clear the peer from the set of
// pending channels. // pending opens and mark them
// as failed so we don't attempt
// to open a channel to them
// again.
pendingMtx.Lock() pendingMtx.Lock()
nID := NewNodeID(directive.PeerKey) delete(pendingOpens, nodeID)
delete(pendingOpens, nID) failedNodes[nodeID] = struct{}{}
// Mark this node as failed so we don't
// attempt it again.
failedNodes[nID] = struct{}{}
pendingMtx.Unlock() pendingMtx.Unlock()
// Trigger the autopilot controller to // Trigger the agent to
// re-evaluate everything and possibly // re-evaluate everything and
// retry with a different node. // possibly retry with a
// different node.
a.OnChannelOpenFailure() a.OnChannelOpenFailure()
// Finally, we should also
// disconnect the peer if we
// weren't already connected to
// them beforehand by an
// external subsystem.
if alreadyConnected {
return
} }
err = a.cfg.DisconnectPeer(pub)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
}
}(chanCandidate) }(chanCandidate)
} }
pendingMtx.Unlock() pendingMtx.Unlock()

@ -77,7 +77,6 @@ var _ AttachmentHeuristic = (*mockHeuristic)(nil)
type openChanIntent struct { type openChanIntent struct {
target *btcec.PublicKey target *btcec.PublicKey
amt btcutil.Amount amt btcutil.Amount
addrs []net.Addr
private bool private bool
} }
@ -86,13 +85,12 @@ type mockChanController struct {
private bool private bool
} }
func (m *mockChanController) OpenChannel(target *btcec.PublicKey, amt btcutil.Amount, func (m *mockChanController) OpenChannel(target *btcec.PublicKey,
addrs []net.Addr) error { amt btcutil.Amount) error {
m.openChanSignals <- openChanIntent{ m.openChanSignals <- openChanIntent{
target: target, target: target,
amt: amt, amt: amt,
addrs: addrs,
private: m.private, private: m.private,
} }
@ -142,6 +140,12 @@ func TestAgentChannelOpenSignal(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return 0, nil return 0, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -230,8 +234,8 @@ func TestAgentChannelOpenSignal(t *testing.T) {
type mockFailingChanController struct { type mockFailingChanController struct {
} }
func (m *mockFailingChanController) OpenChannel(target *btcec.PublicKey, amt btcutil.Amount, func (m *mockFailingChanController) OpenChannel(target *btcec.PublicKey,
addrs []net.Addr) error { amt btcutil.Amount) error {
return errors.New("failure") return errors.New("failure")
} }
@ -276,6 +280,12 @@ func TestAgentChannelFailureSignal(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return 0, nil return 0, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -366,6 +376,12 @@ func TestAgentChannelCloseSignal(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return 0, nil return 0, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -490,6 +506,12 @@ func TestAgentBalanceUpdate(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -606,6 +628,12 @@ func TestAgentImmediateAttach(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -698,10 +726,6 @@ func TestAgentImmediateAttach(t *testing.T) {
self.SerializeCompressed(), self.SerializeCompressed(),
openChan.target.SerializeCompressed()) openChan.target.SerializeCompressed())
} }
if len(openChan.addrs) != 1 {
t.Fatalf("should have single addr, instead have: %v",
len(openChan.addrs))
}
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("channel not opened in time") t.Fatalf("channel not opened in time")
} }
@ -743,6 +767,12 @@ func TestAgentPrivateChannels(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -869,6 +899,12 @@ func TestAgentPendingChannelState(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
return false, nil
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph, Graph: memGraph,
MaxPendingOpens: 10, MaxPendingOpens: 10,
} }
@ -949,10 +985,6 @@ func TestAgentPendingChannelState(t *testing.T) {
nodeKey.SerializeCompressed(), nodeKey.SerializeCompressed(),
openChan.target.SerializeCompressed()) openChan.target.SerializeCompressed())
} }
if len(openChan.addrs) != 1 {
t.Fatalf("should have single addr, instead have: %v",
len(openChan.addrs))
}
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("channel wasn't opened in time") t.Fatalf("channel wasn't opened in time")
} }

114
pilot.go

@ -1,6 +1,7 @@
package main package main
import ( import (
"errors"
"fmt" "fmt"
"net" "net"
@ -24,60 +25,7 @@ type chanController struct {
// specified amount. This function should un-block immediately after the // specified amount. This function should un-block immediately after the
// funding transaction that marks the channel open has been broadcast. // funding transaction that marks the channel open has been broadcast.
func (c *chanController) OpenChannel(target *btcec.PublicKey, func (c *chanController) OpenChannel(target *btcec.PublicKey,
amt btcutil.Amount, addrs []net.Addr) error { amt btcutil.Amount) error {
// We can't establish a channel if no addresses were provided for the
// peer.
if len(addrs) == 0 {
return fmt.Errorf("Unable to create channel w/o an active " +
"address")
}
// First, we'll check if we're already connected to the target peer. If
// not, then we'll need to establish a connection.
if _, err := c.server.FindPeer(target); err != nil {
// TODO(roasbeef): try teach addr
atplLog.Tracef("Connecting to %x to auto-create channel: ",
target.SerializeCompressed())
lnAddr := &lnwire.NetAddress{
IdentityKey: target,
ChainNet: activeNetParams.Net,
}
// We'll attempt to successively connect to each of the
// advertised IP addresses until we've either exhausted the
// advertised IP addresses, or have made a connection.
var connected bool
for _, addr := range addrs {
switch addr.(type) {
case *net.TCPAddr, *tor.OnionAddr:
lnAddr.Address = addr
default:
return fmt.Errorf("unknown address type %T", addr)
}
// TODO(roasbeef): make perm connection in server after
// chan open?
err := c.server.ConnectToPeer(lnAddr, false)
if err != nil {
// If we weren't able to connect to the peer,
// then we'll move onto the next.
continue
}
connected = true
break
}
// If we weren't able to establish a connection at all, then
// we'll error out.
if !connected {
return fmt.Errorf("Unable to connect to %x",
target.SerializeCompressed())
}
}
// With the connection established, we'll now establish our connection // With the connection established, we'll now establish our connection
// to the target peer, waiting for the first update before we exit. // to the target peer, waiting for the first update before we exit.
@ -160,6 +108,64 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error)
}, },
Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()), Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()),
MaxPendingOpens: 10, MaxPendingOpens: 10,
ConnectToPeer: func(target *btcec.PublicKey, addrs []net.Addr) (bool, error) {
// We can't establish a channel if no addresses were
// provided for the peer.
if len(addrs) == 0 {
return false, errors.New("no addresses specified")
}
// First, we'll check if we're already connected to the
// target peer. If we are, we can exit early. Otherwise,
// we'll need to establish a connection.
if _, err := svr.FindPeer(target); err == nil {
return true, nil
}
atplLog.Tracef("Attempting to connect to %x",
target.SerializeCompressed())
lnAddr := &lnwire.NetAddress{
IdentityKey: target,
ChainNet: activeNetParams.Net,
}
// We'll attempt to successively connect to each of the
// advertised IP addresses until we've either exhausted
// the advertised IP addresses, or have made a
// connection.
var connected bool
for _, addr := range addrs {
switch addr.(type) {
case *net.TCPAddr, *tor.OnionAddr:
lnAddr.Address = addr
default:
return false, fmt.Errorf("unknown "+
"address type %T", addr)
}
err := svr.ConnectToPeer(lnAddr, false)
if err != nil {
// If we weren't able to connect to the
// peer at this address, then we'll move
// onto the next.
continue
}
connected = true
break
}
// If we weren't able to establish a connection at all,
// then we'll error out.
if !connected {
return false, fmt.Errorf("unable to connect "+
"to %x", target.SerializeCompressed())
}
return false, nil
},
DisconnectPeer: svr.DisconnectPeer,
} }
// Next, we'll fetch the current state of open channels from the // Next, we'll fetch the current state of open channels from the