diff --git a/autopilot/agent.go b/autopilot/agent.go index b3509116..da69a935 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -1,6 +1,7 @@ package autopilot import ( + "net" "sync" "sync/atomic" @@ -32,6 +33,15 @@ type Config struct { // creation, closing and update of channels within the network. ChanController ChannelController + // ConnectToPeer attempts to connect to the peer using one of its + // advertised addresses. The boolean returned signals whether the peer + // was already connected. + ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error) + + // DisconnectPeer attempts to disconnect the peer with the given public + // key. + DisconnectPeer func(*btcec.PublicKey) error + // WalletBalance is a function closure that should return the current // available balance o the backing wallet. WalletBalance func() (btcutil.Amount, error) @@ -448,20 +458,86 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { continue } - nID := NewNodeID(chanCandidate.PeerKey) - pendingOpens[nID] = Channel{ - Capacity: chanCandidate.ChanAmt, - Node: nID, - } - go func(directive AttachmentDirective) { - + // We'll start out by attempting to + // connect to the peer in order to begin + // the funding workflow. pub := directive.PeerKey - err := a.cfg.ChanController.OpenChannel( + alreadyConnected, err := a.cfg.ConnectToPeer( + pub, directive.Addrs, + ) + if err != nil { + log.Warnf("Unable to connect "+ + "to %x: %v", + pub.SerializeCompressed(), + err) - directive.PeerKey, - directive.ChanAmt, - directive.Addrs, + // Since we failed to connect to + // them, we'll mark them as + // failed so that we don't + // attempt to connect to them + // again. + nodeID := NewNodeID(pub) + pendingMtx.Lock() + failedNodes[nodeID] = struct{}{} + pendingMtx.Unlock() + + // Finally, we'll trigger the + // agent to select new peers to + // connect to. + a.OnChannelOpenFailure() + + return + } + + // If we were succesful, we'll track + // this peer in our set of pending + // opens. We do this here to ensure we + // don't stall on selecting new peers if + // the connection attempt happens to + // take too long. + pendingMtx.Lock() + if uint16(len(pendingOpens))+1 > + a.cfg.MaxPendingOpens { + + pendingMtx.Unlock() + + // Since we've reached our max + // number of pending opens, + // we'll disconnect this peer + // and exit. However, if we were + // previously connected to them, + // then we'll make sure to + // maintain the connection + // alive. + if alreadyConnected { + return + } + + err = a.cfg.DisconnectPeer( + pub, + ) + if err != nil { + log.Warnf("Unable to "+ + "disconnect peer "+ + "%x: %v", + pub.SerializeCompressed(), + err) + } + return + } + + nodeID := NewNodeID(directive.PeerKey) + pendingOpens[nodeID] = Channel{ + Capacity: directive.ChanAmt, + Node: nodeID, + } + pendingMtx.Unlock() + + // We can then begin the funding + // workflow with this peer. + err = a.cfg.ChanController.OpenChannel( + pub, directive.ChanAmt, ) if err != nil { log.Warnf("Unable to open "+ @@ -470,23 +546,40 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { directive.ChanAmt, err) // As the attempt failed, we'll - // clear it from the set of - // pending channels. + // clear the peer from the set of + // pending opens and mark them + // as failed so we don't attempt + // to open a channel to them + // again. pendingMtx.Lock() - nID := NewNodeID(directive.PeerKey) - delete(pendingOpens, nID) - - // Mark this node as failed so we don't - // attempt it again. - failedNodes[nID] = struct{}{} + delete(pendingOpens, nodeID) + failedNodes[nodeID] = struct{}{} pendingMtx.Unlock() - // Trigger the autopilot controller to - // re-evaluate everything and possibly - // retry with a different node. + // Trigger the agent to + // re-evaluate everything and + // possibly retry with a + // different node. a.OnChannelOpenFailure() - } + // Finally, we should also + // disconnect the peer if we + // weren't already connected to + // them beforehand by an + // external subsystem. + if alreadyConnected { + return + } + + err = a.cfg.DisconnectPeer(pub) + if err != nil { + log.Warnf("Unable to "+ + "disconnect peer "+ + "%x: %v", + pub.SerializeCompressed(), + err) + } + } }(chanCandidate) } pendingMtx.Unlock() diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 73c94bd8..9baf96eb 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -77,7 +77,6 @@ var _ AttachmentHeuristic = (*mockHeuristic)(nil) type openChanIntent struct { target *btcec.PublicKey amt btcutil.Amount - addrs []net.Addr private bool } @@ -86,13 +85,12 @@ type mockChanController struct { private bool } -func (m *mockChanController) OpenChannel(target *btcec.PublicKey, amt btcutil.Amount, - addrs []net.Addr) error { +func (m *mockChanController) OpenChannel(target *btcec.PublicKey, + amt btcutil.Amount) error { m.openChanSignals <- openChanIntent{ target: target, amt: amt, - addrs: addrs, private: m.private, } @@ -142,6 +140,12 @@ func TestAgentChannelOpenSignal(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return 0, nil }, + ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { + return false, nil + }, + DisconnectPeer: func(*btcec.PublicKey) error { + return nil + }, Graph: memGraph, MaxPendingOpens: 10, } @@ -230,8 +234,8 @@ func TestAgentChannelOpenSignal(t *testing.T) { type mockFailingChanController struct { } -func (m *mockFailingChanController) OpenChannel(target *btcec.PublicKey, amt btcutil.Amount, - addrs []net.Addr) error { +func (m *mockFailingChanController) OpenChannel(target *btcec.PublicKey, + amt btcutil.Amount) error { return errors.New("failure") } @@ -276,6 +280,12 @@ func TestAgentChannelFailureSignal(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return 0, nil }, + ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { + return false, nil + }, + DisconnectPeer: func(*btcec.PublicKey) error { + return nil + }, Graph: memGraph, MaxPendingOpens: 10, } @@ -366,6 +376,12 @@ func TestAgentChannelCloseSignal(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return 0, nil }, + ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { + return false, nil + }, + DisconnectPeer: func(*btcec.PublicKey) error { + return nil + }, Graph: memGraph, MaxPendingOpens: 10, } @@ -490,6 +506,12 @@ func TestAgentBalanceUpdate(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, + ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { + return false, nil + }, + DisconnectPeer: func(*btcec.PublicKey) error { + return nil + }, Graph: memGraph, MaxPendingOpens: 10, } @@ -606,6 +628,12 @@ func TestAgentImmediateAttach(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, + ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { + return false, nil + }, + DisconnectPeer: func(*btcec.PublicKey) error { + return nil + }, Graph: memGraph, MaxPendingOpens: 10, } @@ -698,10 +726,6 @@ func TestAgentImmediateAttach(t *testing.T) { self.SerializeCompressed(), openChan.target.SerializeCompressed()) } - if len(openChan.addrs) != 1 { - t.Fatalf("should have single addr, instead have: %v", - len(openChan.addrs)) - } case <-time.After(time.Second * 10): t.Fatalf("channel not opened in time") } @@ -743,6 +767,12 @@ func TestAgentPrivateChannels(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, + ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { + return false, nil + }, + DisconnectPeer: func(*btcec.PublicKey) error { + return nil + }, Graph: memGraph, MaxPendingOpens: 10, } @@ -869,6 +899,12 @@ func TestAgentPendingChannelState(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, + ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { + return false, nil + }, + DisconnectPeer: func(*btcec.PublicKey) error { + return nil + }, Graph: memGraph, MaxPendingOpens: 10, } @@ -949,10 +985,6 @@ func TestAgentPendingChannelState(t *testing.T) { nodeKey.SerializeCompressed(), openChan.target.SerializeCompressed()) } - if len(openChan.addrs) != 1 { - t.Fatalf("should have single addr, instead have: %v", - len(openChan.addrs)) - } case <-time.After(time.Second * 10): t.Fatalf("channel wasn't opened in time") } diff --git a/pilot.go b/pilot.go index 64b029da..86093a6f 100644 --- a/pilot.go +++ b/pilot.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "net" @@ -24,60 +25,7 @@ type chanController struct { // specified amount. This function should un-block immediately after the // funding transaction that marks the channel open has been broadcast. func (c *chanController) OpenChannel(target *btcec.PublicKey, - amt btcutil.Amount, addrs []net.Addr) error { - - // We can't establish a channel if no addresses were provided for the - // peer. - if len(addrs) == 0 { - return fmt.Errorf("Unable to create channel w/o an active " + - "address") - } - - // First, we'll check if we're already connected to the target peer. If - // not, then we'll need to establish a connection. - if _, err := c.server.FindPeer(target); err != nil { - // TODO(roasbeef): try teach addr - - atplLog.Tracef("Connecting to %x to auto-create channel: ", - target.SerializeCompressed()) - - lnAddr := &lnwire.NetAddress{ - IdentityKey: target, - ChainNet: activeNetParams.Net, - } - - // We'll attempt to successively connect to each of the - // advertised IP addresses until we've either exhausted the - // advertised IP addresses, or have made a connection. - var connected bool - for _, addr := range addrs { - switch addr.(type) { - case *net.TCPAddr, *tor.OnionAddr: - lnAddr.Address = addr - default: - return fmt.Errorf("unknown address type %T", addr) - } - - // TODO(roasbeef): make perm connection in server after - // chan open? - err := c.server.ConnectToPeer(lnAddr, false) - if err != nil { - // If we weren't able to connect to the peer, - // then we'll move onto the next. - continue - } - - connected = true - break - } - - // If we weren't able to establish a connection at all, then - // we'll error out. - if !connected { - return fmt.Errorf("Unable to connect to %x", - target.SerializeCompressed()) - } - } + amt btcutil.Amount) error { // With the connection established, we'll now establish our connection // to the target peer, waiting for the first update before we exit. @@ -160,6 +108,64 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) }, Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()), MaxPendingOpens: 10, + ConnectToPeer: func(target *btcec.PublicKey, addrs []net.Addr) (bool, error) { + // We can't establish a channel if no addresses were + // provided for the peer. + if len(addrs) == 0 { + return false, errors.New("no addresses specified") + } + + // First, we'll check if we're already connected to the + // target peer. If we are, we can exit early. Otherwise, + // we'll need to establish a connection. + if _, err := svr.FindPeer(target); err == nil { + return true, nil + } + + atplLog.Tracef("Attempting to connect to %x", + target.SerializeCompressed()) + + lnAddr := &lnwire.NetAddress{ + IdentityKey: target, + ChainNet: activeNetParams.Net, + } + + // We'll attempt to successively connect to each of the + // advertised IP addresses until we've either exhausted + // the advertised IP addresses, or have made a + // connection. + var connected bool + for _, addr := range addrs { + switch addr.(type) { + case *net.TCPAddr, *tor.OnionAddr: + lnAddr.Address = addr + default: + return false, fmt.Errorf("unknown "+ + "address type %T", addr) + } + + err := svr.ConnectToPeer(lnAddr, false) + if err != nil { + // If we weren't able to connect to the + // peer at this address, then we'll move + // onto the next. + continue + } + + connected = true + break + } + + // If we weren't able to establish a connection at all, + // then we'll error out. + if !connected { + return false, fmt.Errorf("unable to connect "+ + "to %x", target.SerializeCompressed()) + } + + return false, nil + }, + DisconnectPeer: svr.DisconnectPeer, } // Next, we'll fetch the current state of open channels from the