chanfitness: add test context for better testing

As we add more elements to the chanfitness subsystem, we will require
more complex testing. The current tests are limited by our inability
to mock subscriptions, which is remedied by the addition of our own
mock. The new test context allows us to run the full store in a test,
rather than having to manually spin up the main goroutine. Mocking our
subscriptions is required so that we can block our subscribe updates
on consumption; the real subscribe package gives us no guarantee that
the client receives an update before shutdown, which produces test
flakes.

This change also makes a move towards separating out the testing of our
event store from testing the underlying event logs to prepare for
further refactoring.
carla 2020-09-08 13:47:14 +02:00
parent 3aa008ab04
commit 7afd113b9f
2 changed files with 350 additions and 311 deletions
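For context on the flakes described above, the fix relies on delivering
updates over an unbuffered channel, so the sender blocks until the event has
actually been consumed. Below is a minimal, self-contained sketch of that
idea; the names and the one-second timeout are illustrative only, and the
real mock is added at the bottom of this diff.

package main

import (
	"fmt"
	"time"
)

func main() {
	// An unbuffered channel makes delivery synchronous: the send only
	// returns once the consumer has received the event, so a test can
	// safely shut the consumer down afterwards without racing.
	updates := make(chan interface{})
	done := make(chan struct{})

	// Stand-in for the store's consume goroutine.
	go func() {
		defer close(done)
		for u := range updates {
			fmt.Println("consumed:", u)
		}
	}()

	select {
	case updates <- "peer online":
		// The event was handed off; it is now safe to shut down.
	case <-time.After(time.Second):
		// Not consumed in time; a test would fail here rather than
		// race with shutdown.
		fmt.Println("timeout waiting for consumer")
	}

	close(updates)
	<-done
}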

@@ -2,17 +2,16 @@ package chanfitness
import (
"errors"
"math/big"
"testing"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/subscribe"
"github.com/stretchr/testify/require"
)
// TestStartStoreError tests the starting of the store in cases where the setup
@@ -21,20 +20,18 @@ import (
func TestStartStoreError(t *testing.T) {
// Ok and erroring subscribe functions are defined here to de-clutter
// tests.
okSubscribeFunc := func() (*subscribe.Client, error) {
return &subscribe.Client{
Cancel: func() {},
}, nil
okSubscribeFunc := func() (subscribe.Subscription, error) {
return newMockSubscription(t), nil
}
errSubscribeFunc := func() (*subscribe.Client, error) {
errSubscribeFunc := func() (subscribe.Subscription, error) {
return nil, errors.New("intentional test err")
}
tests := []struct {
name string
ChannelEvents func() (*subscribe.Client, error)
PeerEvents func() (*subscribe.Client, error)
ChannelEvents func() (subscribe.Subscription, error)
PeerEvents func() (subscribe.Subscription, error)
GetChannels func() ([]*channeldb.OpenChannel, error)
}{
{
@@ -76,29 +73,6 @@ func TestStartStoreError(t *testing.T) {
}
}
// getTestChannel returns a non-zero peer pubKey, serialized pubKey and channel
// outpoint for testing.
func getTestChannel(t *testing.T) (*btcec.PublicKey, route.Vertex,
wire.OutPoint) {
privKey, err := btcec.NewPrivateKey(btcec.S256())
if err != nil {
t.Fatalf("Error getting pubkey: %v", err)
}
pubKey, err := route.NewVertexFromBytes(
privKey.PubKey().SerializeCompressed(),
)
if err != nil {
t.Fatalf("Could not create vertex: %v", err)
}
return privKey.PubKey(), pubKey, wire.OutPoint{
Hash: [chainhash.HashSize]byte{1, 2, 3},
Index: 0,
}
}
// TestMonitorChannelEvents tests the store's handling of channel and peer
// events. It tests for the unexpected cases where we receive a channel open for
// an already known channel, but does not test for closing an unknown channel
@@ -106,187 +80,103 @@ func getTestChannel(t *testing.T) (*btcec.PublicKey, route.Vertex,
// through an eventLog which does not exist. This test does not test handling
// of uptime and lifespan requests, as they are tested in their own tests.
func TestMonitorChannelEvents(t *testing.T) {
pubKey, vertex, chanPoint := getTestChannel(t)
tests := []struct {
name string
// generateEvents takes channels which represent the updates
// channels for subscription clients and passes events in the
// desired order. This function is intended to be blocking so
// that the test does not have a data race with event
// consumption, so the channels should not be buffered.
generateEvents func(channelEvents,
peerEvents chan<- interface{})
// expectedEvents is the expected set of event types in the store.
expectedEvents []eventType
}{
{
name: "Channel opened, peer comes online",
generateEvents: func(channelEvents,
peerEvents chan<- interface{}) {
// Add an open channel event
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
var (
pubKey = &btcec.PublicKey{
X: big.NewInt(0),
Y: big.NewInt(1),
Curve: btcec.S256(),
}
// Add a peer online event.
peerEvents <- peernotifier.PeerOnlineEvent{
PubKey: vertex,
}
},
expectedEvents: []eventType{peerOnlineEvent},
},
{
name: "Duplicate channel open events",
generateEvents: func(channelEvents,
peerEvents chan<- interface{}) {
chan1 = wire.OutPoint{Index: 1}
chan2 = wire.OutPoint{Index: 2}
)
// Add an open channel event
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
peer1, err := route.NewVertexFromBytes(pubKey.SerializeCompressed())
require.NoError(t, err)
t.Run("peer comes online after channel open", func(t *testing.T) {
gen := func(ctx *chanEventStoreTestCtx) {
ctx.sendChannelOpenedUpdate(pubKey, chan1)
ctx.peerEvent(peer1, true)
}
// Add a peer online event.
peerEvents <- peernotifier.PeerOnlineEvent{
PubKey: vertex,
}
// Add a duplicate channel open event.
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
}
},
expectedEvents: []eventType{peerOnlineEvent},
},
{
name: "Channel opened, peer already online",
generateEvents: func(channelEvents,
peerEvents chan<- interface{}) {
// Add a peer online event.
peerEvents <- peernotifier.PeerOnlineEvent{
PubKey: vertex,
}
// Add an open channel event
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
}
},
expectedEvents: []eventType{peerOnlineEvent},
},
{
name: "Channel opened, peer offline, closed",
generateEvents: func(channelEvents,
peerEvents chan<- interface{}) {
// Add an open channel event
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
}
// Add a peer online event.
peerEvents <- peernotifier.PeerOfflineEvent{
PubKey: vertex,
}
// Add a close channel event.
channelEvents <- channelnotifier.ClosedChannelEvent{
CloseSummary: &channeldb.ChannelCloseSummary{
ChanPoint: chanPoint,
},
}
},
expectedEvents: []eventType{peerOfflineEvent},
},
{
name: "Event after channel close not recorded",
generateEvents: func(channelEvents,
peerEvents chan<- interface{}) {
// Add an open channel event
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
}
// Add a close channel event.
channelEvents <- channelnotifier.ClosedChannelEvent{
CloseSummary: &channeldb.ChannelCloseSummary{
ChanPoint: chanPoint,
},
}
// Add a peer online event.
peerEvents <- peernotifier.PeerOfflineEvent{
PubKey: vertex,
}
},
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
// Create a store with the channels and online peers
// specified by the test.
store := NewChannelEventStore(&Config{})
// Create channels which represent the subscriptions
// we have to peer and client events.
channelEvents := make(chan interface{})
peerEvents := make(chan interface{})
store.wg.Add(1)
go store.consume(&subscriptions{
channelUpdates: channelEvents,
peerUpdates: peerEvents,
cancel: func() {},
testEventStore(t, gen, 1, map[route.Vertex]bool{
peer1: true,
})
})
// Add events to the store then kill the goroutine using
// store.Stop.
test.generateEvents(channelEvents, peerEvents)
store.Stop()
// Retrieve the eventLog for the channel and check that
// its contents are as expected.
eventLog, ok := store.channels[chanPoint]
if !ok {
t.Fatalf("Expected to find event store")
t.Run("duplicate channel open events", func(t *testing.T) {
gen := func(ctx *chanEventStoreTestCtx) {
ctx.sendChannelOpenedUpdate(pubKey, chan1)
ctx.sendChannelOpenedUpdate(pubKey, chan1)
ctx.peerEvent(peer1, true)
}
for i, e := range eventLog.events {
expectedType := test.expectedEvents[i]
if expectedType != e.eventType {
t.Fatalf("Expected type: %v, got: %v",
expectedType, e.eventType)
}
}
testEventStore(t, gen, 1, map[route.Vertex]bool{
peer1: true,
})
})
t.Run("peer online before channel created", func(t *testing.T) {
gen := func(ctx *chanEventStoreTestCtx) {
ctx.peerEvent(peer1, true)
ctx.sendChannelOpenedUpdate(pubKey, chan1)
}
testEventStore(t, gen, 1, map[route.Vertex]bool{
peer1: true,
})
})
t.Run("multiple channels for peer", func(t *testing.T) {
gen := func(ctx *chanEventStoreTestCtx) {
ctx.peerEvent(peer1, true)
ctx.sendChannelOpenedUpdate(pubKey, chan1)
ctx.peerEvent(peer1, false)
ctx.sendChannelOpenedUpdate(pubKey, chan2)
}
testEventStore(t, gen, 2, map[route.Vertex]bool{
peer1: false,
})
})
t.Run("multiple channels for peer, one closed", func(t *testing.T) {
gen := func(ctx *chanEventStoreTestCtx) {
ctx.peerEvent(peer1, true)
ctx.sendChannelOpenedUpdate(pubKey, chan1)
ctx.peerEvent(peer1, false)
ctx.sendChannelOpenedUpdate(pubKey, chan2)
ctx.closeChannel(chan1, pubKey)
ctx.peerEvent(peer1, true)
}
testEventStore(t, gen, 2, map[route.Vertex]bool{
peer1: true,
})
})
}
// testEventStore creates a new test context, generates a set of events for it
// and tests that it has the number of channels and online state for peers that
// we expect.
func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx),
expectedChannels int, expectedPeers map[route.Vertex]bool) {
testCtx := newChanEventStoreTestCtx(t)
testCtx.start()
generateEvents(testCtx)
// Shutdown the store so that we can safely access the maps in our event
// store.
testCtx.stop()
require.Len(t, testCtx.store.channels, expectedChannels)
require.Equal(t, expectedPeers, testCtx.store.peers)
}
// TestGetLifetime tests the GetLifetime function for the cases where a channel
@@ -319,45 +209,27 @@ func TestGetLifetime(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
// Create an empty event store for testing.
store := NewChannelEventStore(&Config{})
// Start goroutine which consumes GetLifespan requests.
store.wg.Add(1)
go store.consume(&subscriptions{
channelUpdates: make(chan interface{}),
peerUpdates: make(chan interface{}),
cancel: func() {},
})
// Stop the store's go routine.
defer store.Stop()
ctx := newChanEventStoreTestCtx(t)
// Add channel to eventStore if the test indicates that
// it should be present.
if test.channelFound {
store.channels[test.channelPoint] =
ctx.store.channels[test.channelPoint] =
&chanEventLog{
openedAt: test.opened,
closedAt: test.closed,
}
}
open, close, err := store.GetLifespan(test.channelPoint)
if test.expectedError != err {
t.Fatalf("Expected: %v, got: %v",
test.expectedError, err)
}
ctx.start()
if open != test.opened {
t.Errorf("Expected: %v, got %v",
test.opened, open)
}
open, close, err := ctx.store.GetLifespan(test.channelPoint)
require.Equal(t, test.expectedError, err)
if close != test.closed {
t.Errorf("Expected: %v, got %v",
test.closed, close)
}
require.Equal(t, test.opened, open)
require.Equal(t, test.closed, close)
ctx.stop()
})
}
}
@@ -460,44 +332,29 @@ func TestGetUptime(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
// Set up event store with the events specified for the
// test and mocked time.
store := NewChannelEventStore(&Config{})
ctx := newChanEventStoreTestCtx(t)
// Start goroutine which consumes GetUptime requests.
store.wg.Add(1)
go store.consume(&subscriptions{
channelUpdates: make(chan interface{}),
peerUpdates: make(chan interface{}),
cancel: func() {},
})
// Stop the store's goroutine.
defer store.Stop()
// Add the channel to the store if it is intended to be
// found.
// If we're supposed to find the channel for this test,
// add events for it to the store.
if test.channelFound {
store.channels[test.channelPoint] = &chanEventLog{
eventLog := &chanEventLog{
events: test.events,
now: func() time.Time { return now },
openedAt: test.openedAt,
closedAt: test.closedAt,
}
ctx.store.channels[test.channelPoint] = eventLog
}
uptime, err := store.GetUptime(
ctx.start()
uptime, err := ctx.store.GetUptime(
test.channelPoint, test.startTime, test.endTime,
)
if test.expectedError != err {
t.Fatalf("Expected: %v, got: %v",
test.expectedError, err)
}
require.Equal(t, test.expectedError, err)
require.Equal(t, test.expectedUptime, uptime)
if uptime != test.expectedUptime {
t.Fatalf("Expected uptime percentage: %v, "+
"got %v", test.expectedUptime, uptime)
}
ctx.stop()
})
}
}
@@ -507,58 +364,25 @@ func TestGetUptime(t *testing.T) {
// did not have an opened time set, and checks that an online event is set for
// peers that are online at the time that a channel is opened.
func TestAddChannel(t *testing.T) {
_, vertex, chanPoint := getTestChannel(t)
ctx := newChanEventStoreTestCtx(t)
ctx.start()
tests := []struct {
name string
// Create a channel for a peer that is not online yet.
_, _, channel1 := ctx.createChannel()
// peers maps peers to an online state.
peers map[route.Vertex]bool
// Get a set of values for another channel, but do not create it yet.
//
peer2, pubkey2, channel2 := ctx.newChannel()
ctx.peerEvent(peer2, true)
ctx.sendChannelOpenedUpdate(pubkey2, channel2)
expectedEvents []eventType
}{
{
name: "peer offline",
peers: make(map[route.Vertex]bool),
expectedEvents: []eventType{},
},
{
name: "peer online",
peers: map[route.Vertex]bool{
vertex: true,
},
expectedEvents: []eventType{peerOnlineEvent},
},
}
ctx.stop()
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
store := NewChannelEventStore(&Config{})
store.peers = test.peers
// Assert that our peer that was offline on connection has no events
// and our peer that was online on connection has one.
require.Len(t, ctx.store.channels[channel1].events, 0)
// Add channel to the store.
store.addChannel(chanPoint, vertex)
// Check that the eventLog is successfully added.
eventLog, ok := store.channels[chanPoint]
if !ok {
t.Fatalf("channel should be in store")
}
// Check that the eventLog contains the events we
// expect.
for i, e := range test.expectedEvents {
if e != eventLog.events[i].eventType {
t.Fatalf("expected: %v, got: %v",
e, eventLog.events[i].eventType)
}
}
// Ensure that open time is always set.
if eventLog.openedAt.IsZero() {
t.Fatalf("channel should have opened at set")
}
})
}
chan2Events := ctx.store.channels[channel2].events
require.Len(t, chan2Events, 1)
require.Equal(t, peerOnlineEvent, chan2Events[0].eventType)
}

@@ -0,0 +1,215 @@
package chanfitness
import (
"math/big"
"testing"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/subscribe"
"github.com/stretchr/testify/require"
)
// timeout is the amount of time we allow for our blocking test calls.
var timeout = time.Second
// chanEventStoreTestCtx is a helper struct which can be used to test the
// channel event store.
type chanEventStoreTestCtx struct {
t *testing.T
store *ChannelEventStore
channelSubscription *mockSubscription
peerSubscription *mockSubscription
// testVarIdx is an index which will be used to deterministically add
// channels and public keys to our test context. We use a single value
// for a single pubkey + channel combination because its actual value
// does not matter.
testVarIdx int
}
// newChanEventStoreTestCtx creates a test context which can be used to test
// the event store.
func newChanEventStoreTestCtx(t *testing.T) *chanEventStoreTestCtx {
testCtx := &chanEventStoreTestCtx{
t: t,
channelSubscription: newMockSubscription(t),
peerSubscription: newMockSubscription(t),
}
cfg := &Config{
SubscribeChannelEvents: func() (subscribe.Subscription, error) {
return testCtx.channelSubscription, nil
},
SubscribePeerEvents: func() (subscribe.Subscription, error) {
return testCtx.peerSubscription, nil
},
GetOpenChannels: func() ([]*channeldb.OpenChannel, error) {
return nil, nil
},
}
testCtx.store = NewChannelEventStore(cfg)
return testCtx
}
// start starts the test context's event store.
func (c *chanEventStoreTestCtx) start() {
require.NoError(c.t, c.store.Start())
}
// stop stops the channel event store's subscribe servers and the store itself.
func (c *chanEventStoreTestCtx) stop() {
c.store.Stop()
// Make sure that the cancel function was called for both of our
// subscription mocks.
c.channelSubscription.assertCancelled()
c.peerSubscription.assertCancelled()
}
// newChannel creates a new, unique test channel. Note that this function
// does not add it to the test event store; it just creates mocked values.
func (c *chanEventStoreTestCtx) newChannel() (route.Vertex, *btcec.PublicKey,
wire.OutPoint) {
// Create a pubkey for our channel peer.
pubKey := &btcec.PublicKey{
X: big.NewInt(int64(c.testVarIdx)),
Y: big.NewInt(int64(c.testVarIdx)),
Curve: btcec.S256(),
}
// Create vertex from our pubkey.
vertex, err := route.NewVertexFromBytes(pubKey.SerializeCompressed())
require.NoError(c.t, err)
// Create a channel point using our channel index, then increment it.
chanPoint := wire.OutPoint{
Hash: [chainhash.HashSize]byte{1, 2, 3},
Index: uint32(c.testVarIdx),
}
// Increment the index we use so that the next channel and pubkey we
// create will be unique.
c.testVarIdx++
return vertex, pubKey, chanPoint
}
// createChannel creates a new channel, notifies the event store that it has
// been created and returns the peer vertex, pubkey and channel point.
func (c *chanEventStoreTestCtx) createChannel() (route.Vertex, *btcec.PublicKey,
wire.OutPoint) {
vertex, pubKey, chanPoint := c.newChannel()
c.sendChannelOpenedUpdate(pubKey, chanPoint)
return vertex, pubKey, chanPoint
}
// closeChannel sends a close channel event to our subscribe server.
func (c *chanEventStoreTestCtx) closeChannel(channel wire.OutPoint,
peer *btcec.PublicKey) {
update := channelnotifier.ClosedChannelEvent{
CloseSummary: &channeldb.ChannelCloseSummary{
ChanPoint: channel,
RemotePub: peer,
},
}
c.channelSubscription.sendUpdate(update)
}
// peerEvent sends a peer online or offline event to the store for the peer
// provided.
func (c *chanEventStoreTestCtx) peerEvent(peer route.Vertex, online bool) {
var update interface{}
if online {
update = peernotifier.PeerOnlineEvent{PubKey: peer}
} else {
update = peernotifier.PeerOfflineEvent{PubKey: peer}
}
c.peerSubscription.sendUpdate(update)
}
// sendChannelOpenedUpdate notifies the test event store that a channel has
// been opened.
func (c *chanEventStoreTestCtx) sendChannelOpenedUpdate(pubkey *btcec.PublicKey,
channel wire.OutPoint) {
update := channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
FundingOutpoint: channel,
IdentityPub: pubkey,
},
}
c.channelSubscription.sendUpdate(update)
}
// mockSubscription is a mock subscription client that blocks on sends into the
// updates channel. We use this mock rather than an actual subscribe client
// because the real client does not block, which makes tests race (we have no
// way to guarantee that the test client consumes the update before shutdown).
type mockSubscription struct {
t *testing.T
updates chan interface{}
// Embed the subscription interface in this mock so that we satisfy it.
subscribe.Subscription
}
// newMockSubscription creates a mock subscription.
func newMockSubscription(t *testing.T) *mockSubscription {
return &mockSubscription{
t: t,
updates: make(chan interface{}),
}
}
// sendUpdate sends an update into our updates channel, mocking the dispatch of
// an update from a subscription server. This call will fail the test if the
// update is not consumed within our timeout.
func (m *mockSubscription) sendUpdate(update interface{}) {
select {
case m.updates <- update:
case <-time.After(timeout):
m.t.Fatalf("update: %v timeout", update)
}
}
// Updates returns the updates channel for the mock.
func (m *mockSubscription) Updates() <-chan interface{} {
return m.updates
}
// Cancel should be called in case the client no longer wants to subscribe for
// updates from the server.
func (m *mockSubscription) Cancel() {
close(m.updates)
}
// assertCancelled asserts that the cancel function has been called for this
// mock.
func (m *mockSubscription) assertCancelled() {
select {
case _, open := <-m.updates:
require.False(m.t, open, "subscription not cancelled")
case <-time.After(timeout):
m.t.Fatalf("assert cancelled timeout")
}
}
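
For reference, a test written against this context might look roughly like
the following. This is an illustrative sketch only: it assumes it lives in
the chanfitness package alongside the helpers above, and the test name and
event sequence are not part of this commit.

// TestPeerOnlineRecorded is a hypothetical example (not part of this
// commit) showing how the test context is intended to be driven.
func TestPeerOnlineRecorded(t *testing.T) {
	ctx := newChanEventStoreTestCtx(t)
	ctx.start()

	// Open a channel, then bring its peer online. Both helpers block
	// until the store has consumed the corresponding update, so there
	// is no race with the shutdown below.
	peer, _, channel := ctx.createChannel()
	ctx.peerEvent(peer, true)

	// Stop the store; this also asserts that both mocked subscriptions
	// were cancelled.
	ctx.stop()

	// With the store stopped, its maps can be read directly: the channel
	// should have recorded a single peer online event and the peer
	// should be tracked as online.
	require.Len(t, ctx.store.channels[channel].events, 1)
	require.Equal(
		t, peerOnlineEvent,
		ctx.store.channels[channel].events[0].eventType,
	)
	require.True(t, ctx.store.peers[peer])
}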