autopilot: add tracking of pending channel state

This commit adds tracking of the pending channel state within the
autopilot.Agent. This fixes a class of bugs, discovered during the
latest testnet block storm, wherein the Agent would repeatedly attempt
to attach to the same node due to the rapid closure of other channels.

In this commit we fix this issue by ensuring that we always factor in
the pending channel state when querying the heuristic, both to decide
whether we need more channels and, if so, to select the nodes we should
attach to.
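
The shape of the fix, condensed (a sketch only; the helpers and names
are the ones introduced in the diff below):

    // Factor the unconfirmed (pending) channels into the "do we
    // need more channels?" query...
    totalChans := mergeChanState(pendingOpens, a.chanState)
    availableFunds, needMore := a.cfg.Heuristic.NeedMoreChans(
        totalChans, a.totalBalance,
    )

    // ...and skip any node we already have a confirmed *or* pending
    // channel to when selecting new attachment targets.
    nodesToSkip := mergeNodeMaps(a.chanState.ConnectedNodes(), pendingOpens)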
Olaoluwa Osuntokun 2017-08-15 18:23:52 -07:00
parent 8034780ca4
commit 711b695a2f
2 changed files with 284 additions and 11 deletions

autopilot/agent.go

@@ -50,7 +50,7 @@ type Config struct {
// helper utility methods.
type channelState map[lnwire.ShortChannelID]Channel
-// CHannels returns a slice of all the active channels.
// Channels returns a slice of all the active channels.
func (c channelState) Channels() []Channel {
chans := make([]Channel, 0, len(c))
for _, channel := range c {
@@ -209,6 +209,43 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
}()
}
// mergeNodeMaps merges the Agent's set of nodes that it already has active
// channels open to, with the set of nodes that are pending new channels. This
// ensures that the Agent doesn't attempt to open any "duplicate" channels to
// the same node.
func mergeNodeMaps(a map[NodeID]struct{},
b map[NodeID]Channel) map[NodeID]struct{} {
c := make(map[NodeID]struct{}, len(a)+len(b))
for nodeID := range a {
c[nodeID] = struct{}{}
}
for nodeID := range b {
c[nodeID] = struct{}{}
}
return c
}
// mergeChanState merges the Agent's set of active channels, with the set of
// channels awaiting confirmation. This ensures that the agent doesn't go over
// the prescribed channel limit or fund allocation limit.
func mergeChanState(pendingChans map[NodeID]Channel,
activeChans channelState) []Channel {
numChans := len(pendingChans) + len(activeChans)
totalChans := make([]Channel, 0, numChans)
for _, activeChan := range activeChans.Channels() {
totalChans = append(totalChans, activeChan)
}
for _, pendingChan := range pendingChans {
totalChans = append(totalChans, pendingChan)
}
return totalChans
}
// controller implements the closed-loop control system of the Agent. The
// controller will make a decision w.r.t. channel placement within the graph
// based on: its current internal state of the set of active channels open,
@@ -218,8 +255,6 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
func (a *Agent) controller(startingBalance btcutil.Amount) {
defer a.wg.Done()
// TODO(roasbeef): add queries for internal state?
// We'll start off by assigning our starting balance, and injecting
// that amount as an initial wake up to the main controller goroutine.
a.OnBalanceChange(startingBalance)
@@ -227,6 +262,13 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
// TODO(roasbeef): do we in fact need to maintain order?
// * use sync.Cond if so
// pendingOpens tracks the channels that we've requested to be
// initiated, but haven't yet been confirmed as being fully opened.
// This state is required as otherwise, we may go over our allotted
// channel limit, or open multiple channels to the same node.
pendingOpens := make(map[NodeID]Channel)
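// pendingMtx guards pendingOpens, as the map is written both by
// this goroutine and by the channel-opening goroutines spawned
// below.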
var pendingMtx sync.Mutex
// TODO(roasbeef): add 10-minute wake up timer
for {
select {
@@ -258,6 +300,10 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
newChan := update.newChan
a.chanState[newChan.ChanID] = newChan
pendingMtx.Lock()
delete(pendingOpens, newChan.Node)
pendingMtx.Unlock()
// A channel has been closed, this may free up an
// available slot, triggering a new channel update.
case *chanCloseUpdate:
@@ -271,15 +317,17 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
}
// With all the updates applied, we'll obtain a set of
-// the current active channels.
-chans := a.chanState.Channels()
// the current active channels (confirmed channels),
// and also factor in our set of unconfirmed channels.
confirmedChans := a.chanState
totalChans := mergeChanState(pendingOpens, confirmedChans)
// Now that we've updated our internal state, we'll
// consult our channel attachment heuristic to
// determine if we should open up any additional
// channels or modify existing channels.
availableFunds, needMore := a.cfg.Heuristic.NeedMoreChans(
-chans, a.totalBalance,
totalChans, a.totalBalance,
)
if !needMore {
continue
@@ -290,7 +338,10 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
// We're to attempt an attachment so we'll obtain the
// set of nodes that we currently have channels with so
// we avoid duplicate edges.
-nodesToSkip := a.chanState.ConnectedNodes()
connectedNodes := a.chanState.ConnectedNodes()
pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(connectedNodes, pendingOpens)
pendingMtx.Unlock()
// If we reach this point, then according to our
// heuristic we should modify our channel state to tend
@@ -321,7 +372,14 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
// directive. If any of these succeed, then we'll
// receive a new state update, taking us back to the
// top of our controller loop.
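// Before firing off each attempt, we'll record the
// candidate as pending, so that concurrent heuristic
// queries factor it in.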
pendingMtx.Lock()
for _, chanCandidate := range chanCandidates {
nID := NewNodeID(chanCandidate.PeerKey)
pendingOpens[nID] = Channel{
Capacity: chanCandidate.ChanAmt,
Node: nID,
}
go func(directive AttachmentDirective) {
pub := directive.PeerKey
err := a.cfg.ChanController.OpenChannel(
@@ -334,14 +392,20 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
"channel to %x of %v: %v",
pub.SerializeCompressed(),
directive.ChanAmt, err)
-return
// As the attempt failed, we'll
// clear it from the set of
// pending channels.
pendingMtx.Lock()
nID := NewNodeID(directive.PeerKey)
delete(pendingOpens, nID)
pendingMtx.Unlock()
}
// TODO(roasbeef): on err signal
// failure so attempt to allocate
// again?
}(chanCandidate)
}
pendingMtx.Unlock()
// The agent has been signalled to exit, so we'll bail out
// immediately.

autopilot/agent_test.go

@@ -17,21 +17,54 @@ type moreChansResp struct {
amt btcutil.Amount
}
type moreChanArg struct {
chans []Channel
balance btcutil.Amount
}
type mockHeuristic struct {
moreChansResps chan moreChansResp
moreChanArgs chan moreChanArg
directiveResps chan []AttachmentDirective
directiveArgs chan directiveArg
}
func (m *mockHeuristic) NeedMoreChans(chans []Channel,
balance btcutil.Amount) (btcutil.Amount, bool) {
if m.moreChanArgs != nil {
m.moreChanArgs <- moreChanArg{
chans: chans,
balance: balance,
}
}
resp := <-m.moreChansResps
return resp.amt, resp.needMore
}
type directiveArg struct {
self *btcec.PublicKey
graph ChannelGraph
amt btcutil.Amount
skip map[NodeID]struct{}
}
func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph,
amtToUse btcutil.Amount, skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) {
if m.directiveArgs != nil {
m.directiveArgs <- directiveArg{
self: self,
graph: graph,
amt: amtToUse,
skip: skipChans,
}
}
resp := <-m.directiveResps
return resp, nil
}
@@ -552,3 +585,179 @@ func TestAgentImmediateAttach(t *testing.T) {
}
}
}
// TestAgentPendingChannelState ensures that the agent properly factors in its
// pending channel state when making decisions w.r.t. whether it needs more
// channels, and if so, which nodes are eligible for new channels.
func TestAgentPendingChannelState(t *testing.T) {
t.Parallel()
// First, we'll create all the dependencies that we'll need in order to
// create the autopilot agent.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 6
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
Graph: memGraph,
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// With the autopilot agent and all its dependencies we'll start the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
var wg sync.WaitGroup
// Once again, we'll start by telling the agent, as part of its first
// query, that it needs more channels and has 1 BTC available for
// attachment.
wg.Add(1)
go func() {
select {
// We'll send over a response indicating that it should
// establish more channels, and give it a budget of 1 BTC to do
// so.
case heuristic.moreChansResps <- moreChansResp{true, btcutil.SatoshiPerBitcoin}:
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait for the first query to be consumed. If this doesn't
// happen, then the above goroutine will time out and fail the test.
wg.Wait()
heuristic.moreChanArgs = make(chan moreChanArg)
// Next, the agent should deliver a query to the Select method of the
// heuristic. We'll only return a single directive for a pre-chosen
// node.
nodeKey, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(nodeKey)
nodeDirective := AttachmentDirective{
PeerKey: nodeKey,
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
&net.TCPAddr{
IP: bytes.Repeat([]byte("a"), 16),
},
},
}
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
heuristic.directiveArgs = make(chan directiveArg)
// A request to open the channel should've also been sent.
select {
case openChan := <-chanController.openChanSignals:
if openChan.amt != nodeDirective.ChanAmt {
t.Fatalf("invalid chan amt: expected %v, got %v",
nodeDirective.ChanAmt, openChan.amt)
}
if !openChan.target.IsEqual(nodeKey) {
t.Fatalf("unexpected key: expected %x, got %x",
nodeKey.SerializeCompressed(),
openChan.target.SerializeCompressed())
}
if len(openChan.addrs) != 1 {
t.Fatalf("should have single addr, instead have: %v",
len(openChan.addrs))
}
case <-time.After(time.Second * 10):
t.Fatalf("channel wasn't opened in time")
}
// Now, in order to test that the pending state was properly updated,
// we'll send a balance update in order to trigger a query to the
// heuristic.
agent.OnBalanceChange(0.4 * btcutil.SatoshiPerBitcoin)
// The heuristic should be queried, and the argument for the set of
// channels passed in should include the pending channels that
// should've been created above.
select {
// The request that we get should include a pending channel for the
// one that we just created, otherwise the agent isn't properly
// updating its internal state.
case req := <-heuristic.moreChanArgs:
if len(req.chans) != 1 {
t.Fatalf("should include pending chan in current "+
"state, instead have %v chans", len(req.chans))
}
if req.chans[0].Capacity != nodeDirective.ChanAmt {
t.Fatalf("wrong chan capacity: expected %v, got %v",
req.chans[0].Capacity, nodeDirective.ChanAmt)
}
if req.chans[0].Node != nodeID {
t.Fatalf("wrong node ID: expected %x, got %x",
req.chans[0].Node[:], nodeID)
}
case <-time.After(time.Second * 10):
t.Fatalf("need more chans wasn't queried in time")
}
// We'll send across a response indicating that it *does* need more
// channels.
select {
case heuristic.moreChansResps <- moreChansResp{true, btcutil.SatoshiPerBitcoin}:
case <-time.After(time.Second * 10):
t.Fatalf("need more chans wasn't queried in time")
}
// The response above should prompt the agent to make a query to the
// Select method. The arguments passed should reflect the fact that the
node we have a pending channel to should be ignored.
select {
case req := <-heuristic.directiveArgs:
if len(req.skip) != 1 {
t.Fatalf("expected to skip %v nodes, instead "+
"skipping %v", 1, len(req.skip))
}
if _, ok := req.skip[nodeID]; !ok {
t.Fatalf("pending node not included in skip arguments")
}
case <-time.After(time.Second * 10):
t.Fatalf("select wasn't queried in time")
}
}