diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go index 793fbb66..df7b34d3 100644 --- a/chanfitness/chaneventstore_test.go +++ b/chanfitness/chaneventstore_test.go @@ -2,17 +2,16 @@ package chanfitness import ( "errors" + "math/big" "testing" "time" "github.com/btcsuite/btcd/btcec" - "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/channelnotifier" - "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/subscribe" + "github.com/stretchr/testify/require" ) // TestStartStoreError tests the starting of the store in cases where the setup @@ -21,20 +20,18 @@ import ( func TestStartStoreError(t *testing.T) { // Ok and erroring subscribe functions are defined here to de-clutter // tests. - okSubscribeFunc := func() (*subscribe.Client, error) { - return &subscribe.Client{ - Cancel: func() {}, - }, nil + okSubscribeFunc := func() (subscribe.Subscription, error) { + return newMockSubscription(t), nil } - errSubscribeFunc := func() (*subscribe.Client, error) { + errSubscribeFunc := func() (subscribe.Subscription, error) { return nil, errors.New("intentional test err") } tests := []struct { name string - ChannelEvents func() (*subscribe.Client, error) - PeerEvents func() (*subscribe.Client, error) + ChannelEvents func() (subscribe.Subscription, error) + PeerEvents func() (subscribe.Subscription, error) GetChannels func() ([]*channeldb.OpenChannel, error) }{ { @@ -76,29 +73,6 @@ func TestStartStoreError(t *testing.T) { } } -// getTestChannel returns a non-zero peer pubKey, serialized pubKey and channel -// outpoint for testing. -func getTestChannel(t *testing.T) (*btcec.PublicKey, route.Vertex, - wire.OutPoint) { - - privKey, err := btcec.NewPrivateKey(btcec.S256()) - if err != nil { - t.Fatalf("Error getting pubkey: %v", err) - } - - pubKey, err := route.NewVertexFromBytes( - privKey.PubKey().SerializeCompressed(), - ) - if err != nil { - t.Fatalf("Could not create vertex: %v", err) - } - - return privKey.PubKey(), pubKey, wire.OutPoint{ - Hash: [chainhash.HashSize]byte{1, 2, 3}, - Index: 0, - } -} - // TestMonitorChannelEvents tests the store's handling of channel and peer // events. It tests for the unexpected cases where we receive a channel open for // an already known channel and but does not test for closing an unknown channel @@ -106,187 +80,103 @@ func getTestChannel(t *testing.T) (*btcec.PublicKey, route.Vertex, // through an eventLog which does not exist. This test does not test handling // of uptime and lifespan requests, as they are tested in their own tests. func TestMonitorChannelEvents(t *testing.T) { - pubKey, vertex, chanPoint := getTestChannel(t) + var ( + pubKey = &btcec.PublicKey{ + X: big.NewInt(0), + Y: big.NewInt(1), + Curve: btcec.S256(), + } - tests := []struct { - name string + chan1 = wire.OutPoint{Index: 1} + chan2 = wire.OutPoint{Index: 2} + ) - // generateEvents takes channels which represent the updates - // channels for subscription clients and passes events in the - // desired order. This function is intended to be blocking so - // that the test does not have a data race with event - // consumption, so the channels should not be buffered. - generateEvents func(channelEvents, - peerEvents chan<- interface{}) + peer1, err := route.NewVertexFromBytes(pubKey.SerializeCompressed()) + require.NoError(t, err) - // expectedEvents is the expected set of event types in the store. - expectedEvents []eventType - }{ - { - name: "Channel opened, peer comes online", - generateEvents: func(channelEvents, - peerEvents chan<- interface{}) { + t.Run("peer comes online after channel open", func(t *testing.T) { + gen := func(ctx *chanEventStoreTestCtx) { + ctx.sendChannelOpenedUpdate(pubKey, chan1) + ctx.peerEvent(peer1, true) + } - // Add an open channel event - channelEvents <- channelnotifier.OpenChannelEvent{ - Channel: &channeldb.OpenChannel{ - FundingOutpoint: chanPoint, - IdentityPub: pubKey, - }, - } - - // Add a peer online event. - peerEvents <- peernotifier.PeerOnlineEvent{ - PubKey: vertex, - } - }, - expectedEvents: []eventType{peerOnlineEvent}, - }, - { - name: "Duplicate channel open events", - generateEvents: func(channelEvents, - peerEvents chan<- interface{}) { - - // Add an open channel event - channelEvents <- channelnotifier.OpenChannelEvent{ - Channel: &channeldb.OpenChannel{ - FundingOutpoint: chanPoint, - IdentityPub: pubKey, - }, - } - - // Add a peer online event. - peerEvents <- peernotifier.PeerOnlineEvent{ - PubKey: vertex, - } - - // Add a duplicate channel open event. - channelEvents <- channelnotifier.OpenChannelEvent{ - Channel: &channeldb.OpenChannel{ - FundingOutpoint: chanPoint, - IdentityPub: pubKey, - }, - } - }, - expectedEvents: []eventType{peerOnlineEvent}, - }, - { - name: "Channel opened, peer already online", - generateEvents: func(channelEvents, - peerEvents chan<- interface{}) { - - // Add a peer online event. - peerEvents <- peernotifier.PeerOnlineEvent{ - PubKey: vertex, - } - - // Add an open channel event - channelEvents <- channelnotifier.OpenChannelEvent{ - Channel: &channeldb.OpenChannel{ - FundingOutpoint: chanPoint, - IdentityPub: pubKey, - }, - } - }, - expectedEvents: []eventType{peerOnlineEvent}, - }, - - { - name: "Channel opened, peer offline, closed", - generateEvents: func(channelEvents, - peerEvents chan<- interface{}) { - - // Add an open channel event - channelEvents <- channelnotifier.OpenChannelEvent{ - Channel: &channeldb.OpenChannel{ - FundingOutpoint: chanPoint, - IdentityPub: pubKey, - }, - } - - // Add a peer online event. - peerEvents <- peernotifier.PeerOfflineEvent{ - PubKey: vertex, - } - - // Add a close channel event. - channelEvents <- channelnotifier.ClosedChannelEvent{ - CloseSummary: &channeldb.ChannelCloseSummary{ - ChanPoint: chanPoint, - }, - } - }, - expectedEvents: []eventType{peerOfflineEvent}, - }, - { - name: "Event after channel close not recorded", - generateEvents: func(channelEvents, - peerEvents chan<- interface{}) { - - // Add an open channel event - channelEvents <- channelnotifier.OpenChannelEvent{ - Channel: &channeldb.OpenChannel{ - FundingOutpoint: chanPoint, - IdentityPub: pubKey, - }, - } - - // Add a close channel event. - channelEvents <- channelnotifier.ClosedChannelEvent{ - CloseSummary: &channeldb.ChannelCloseSummary{ - ChanPoint: chanPoint, - }, - } - - // Add a peer online event. - peerEvents <- peernotifier.PeerOfflineEvent{ - PubKey: vertex, - } - }, - }, - } - - for _, test := range tests { - test := test - - t.Run(test.name, func(t *testing.T) { - // Create a store with the channels and online peers - // specified by the test. - store := NewChannelEventStore(&Config{}) - - // Create channels which represent the subscriptions - // we have to peer and client events. - channelEvents := make(chan interface{}) - peerEvents := make(chan interface{}) - - store.wg.Add(1) - go store.consume(&subscriptions{ - channelUpdates: channelEvents, - peerUpdates: peerEvents, - cancel: func() {}, - }) - - // Add events to the store then kill the goroutine using - // store.Stop. - test.generateEvents(channelEvents, peerEvents) - store.Stop() - - // Retrieve the eventLog for the channel and check that - // its contents are as expected. - eventLog, ok := store.channels[chanPoint] - if !ok { - t.Fatalf("Expected to find event store") - } - - for i, e := range eventLog.events { - expectedType := test.expectedEvents[i] - if expectedType != e.eventType { - t.Fatalf("Expected type: %v, got: %v", - expectedType, e.eventType) - } - } + testEventStore(t, gen, 1, map[route.Vertex]bool{ + peer1: true, }) - } + }) + + t.Run("duplicate channel open events", func(t *testing.T) { + gen := func(ctx *chanEventStoreTestCtx) { + ctx.sendChannelOpenedUpdate(pubKey, chan1) + ctx.sendChannelOpenedUpdate(pubKey, chan1) + ctx.peerEvent(peer1, true) + } + + testEventStore(t, gen, 1, map[route.Vertex]bool{ + peer1: true, + }) + }) + + t.Run("peer online before channel created", func(t *testing.T) { + gen := func(ctx *chanEventStoreTestCtx) { + ctx.peerEvent(peer1, true) + ctx.sendChannelOpenedUpdate(pubKey, chan1) + } + + testEventStore(t, gen, 1, map[route.Vertex]bool{ + peer1: true, + }) + }) + + t.Run("multiple channels for peer", func(t *testing.T) { + gen := func(ctx *chanEventStoreTestCtx) { + ctx.peerEvent(peer1, true) + ctx.sendChannelOpenedUpdate(pubKey, chan1) + + ctx.peerEvent(peer1, false) + ctx.sendChannelOpenedUpdate(pubKey, chan2) + } + + testEventStore(t, gen, 2, map[route.Vertex]bool{ + peer1: false, + }) + }) + + t.Run("multiple channels for peer, one closed", func(t *testing.T) { + gen := func(ctx *chanEventStoreTestCtx) { + ctx.peerEvent(peer1, true) + ctx.sendChannelOpenedUpdate(pubKey, chan1) + + ctx.peerEvent(peer1, false) + ctx.sendChannelOpenedUpdate(pubKey, chan2) + + ctx.closeChannel(chan1, pubKey) + ctx.peerEvent(peer1, true) + } + + testEventStore(t, gen, 2, map[route.Vertex]bool{ + peer1: true, + }) + }) + +} + +// testEventStore creates a new test contexts, generates a set of events for it +// and tests that it has the number of channels and online state for peers that +// we expect. +func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx), + expectedChannels int, expectedPeers map[route.Vertex]bool) { + + testCtx := newChanEventStoreTestCtx(t) + testCtx.start() + + generateEvents(testCtx) + + // Shutdown the store so that we can safely access the maps in our event + // store. + testCtx.stop() + require.Len(t, testCtx.store.channels, expectedChannels) + require.Equal(t, expectedPeers, testCtx.store.peers) } // TestGetLifetime tests the GetLifetime function for the cases where a channel @@ -319,45 +209,27 @@ func TestGetLifetime(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { - // Create and empty events store for testing. - store := NewChannelEventStore(&Config{}) - - // Start goroutine which consumes GetLifespan requests. - store.wg.Add(1) - go store.consume(&subscriptions{ - channelUpdates: make(chan interface{}), - peerUpdates: make(chan interface{}), - cancel: func() {}, - }) - - // Stop the store's go routine. - defer store.Stop() + ctx := newChanEventStoreTestCtx(t) // Add channel to eventStore if the test indicates that // it should be present. if test.channelFound { - store.channels[test.channelPoint] = + ctx.store.channels[test.channelPoint] = &chanEventLog{ openedAt: test.opened, closedAt: test.closed, } } - open, close, err := store.GetLifespan(test.channelPoint) - if test.expectedError != err { - t.Fatalf("Expected: %v, got: %v", - test.expectedError, err) - } + ctx.start() - if open != test.opened { - t.Errorf("Expected: %v, got %v", - test.opened, open) - } + open, close, err := ctx.store.GetLifespan(test.channelPoint) + require.Equal(t, test.expectedError, err) - if close != test.closed { - t.Errorf("Expected: %v, got %v", - test.closed, close) - } + require.Equal(t, test.opened, open) + require.Equal(t, test.closed, close) + + ctx.stop() }) } } @@ -460,44 +332,29 @@ func TestGetUptime(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { - // Set up event store with the events specified for the - // test and mocked time. - store := NewChannelEventStore(&Config{}) + ctx := newChanEventStoreTestCtx(t) - // Start goroutine which consumes GetUptime requests. - store.wg.Add(1) - go store.consume(&subscriptions{ - channelUpdates: make(chan interface{}), - peerUpdates: make(chan interface{}), - cancel: func() {}, - }) - - // Stop the store's goroutine. - defer store.Stop() - - // Add the channel to the store if it is intended to be - // found. + // If we're supposed to find the channel for this test, + // add events for it to the store. if test.channelFound { - store.channels[test.channelPoint] = &chanEventLog{ + eventLog := &chanEventLog{ events: test.events, now: func() time.Time { return now }, openedAt: test.openedAt, closedAt: test.closedAt, } + ctx.store.channels[test.channelPoint] = eventLog } - uptime, err := store.GetUptime( + ctx.start() + + uptime, err := ctx.store.GetUptime( test.channelPoint, test.startTime, test.endTime, ) - if test.expectedError != err { - t.Fatalf("Expected: %v, got: %v", - test.expectedError, err) - } + require.Equal(t, test.expectedError, err) + require.Equal(t, test.expectedUptime, uptime) - if uptime != test.expectedUptime { - t.Fatalf("Expected uptime percentage: %v, "+ - "got %v", test.expectedUptime, uptime) - } + ctx.stop() }) } } @@ -507,58 +364,25 @@ func TestGetUptime(t *testing.T) { // did not have an opened time set, and checks that an online event is set for // peers that are online at the time that a channel is opened. func TestAddChannel(t *testing.T) { - _, vertex, chanPoint := getTestChannel(t) + ctx := newChanEventStoreTestCtx(t) + ctx.start() - tests := []struct { - name string + // Create a channel for a peer that is not online yet. + _, _, channel1 := ctx.createChannel() - // peers maps peers to an online state. - peers map[route.Vertex]bool + // Get a set of values for another channel, but do not create it yet. + // + peer2, pubkey2, channel2 := ctx.newChannel() + ctx.peerEvent(peer2, true) + ctx.sendChannelOpenedUpdate(pubkey2, channel2) - expectedEvents []eventType - }{ - { - name: "peer offline", - peers: make(map[route.Vertex]bool), - expectedEvents: []eventType{}, - }, - { - name: "peer online", - peers: map[route.Vertex]bool{ - vertex: true, - }, - expectedEvents: []eventType{peerOnlineEvent}, - }, - } + ctx.stop() - for _, test := range tests { - test := test - t.Run(test.name, func(t *testing.T) { - store := NewChannelEventStore(&Config{}) - store.peers = test.peers + // Assert that our peer that was offline on connection has no events + // and our peer that was online on connection has one. + require.Len(t, ctx.store.channels[channel1].events, 0) - // Add channel to the store. - store.addChannel(chanPoint, vertex) - - // Check that the eventLog is successfully added. - eventLog, ok := store.channels[chanPoint] - if !ok { - t.Fatalf("channel should be in store") - } - - // Check that the eventLog contains the events we - // expect. - for i, e := range test.expectedEvents { - if e != eventLog.events[i].eventType { - t.Fatalf("expected: %v, got: %v", - e, eventLog.events[i].eventType) - } - } - - // Ensure that open time is always set. - if eventLog.openedAt.IsZero() { - t.Fatalf("channel should have opened at set") - } - }) - } + chan2Events := ctx.store.channels[channel2].events + require.Len(t, chan2Events, 1) + require.Equal(t, peerOnlineEvent, chan2Events[0].eventType) } diff --git a/chanfitness/chaneventstore_testctx_test.go b/chanfitness/chaneventstore_testctx_test.go new file mode 100644 index 00000000..86ff43a2 --- /dev/null +++ b/chanfitness/chaneventstore_testctx_test.go @@ -0,0 +1,215 @@ +package chanfitness + +import ( + "math/big" + "testing" + "time" + + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" + "github.com/lightningnetwork/lnd/peernotifier" + "github.com/lightningnetwork/lnd/routing/route" + "github.com/lightningnetwork/lnd/subscribe" + "github.com/stretchr/testify/require" +) + +// timeout is the amount of time we allow our blocking test calls. +var timeout = time.Second + +// chanEventStoreTestCtx is a helper struct which can be used to test the +// channel event store. +type chanEventStoreTestCtx struct { + t *testing.T + + store *ChannelEventStore + + channelSubscription *mockSubscription + peerSubscription *mockSubscription + + // testVarIdx is an index which will be used to deterministically add + // channels and public keys to our test context. We use a single value + // for a single pubkey + channel combination because its actual value + // does not matter. + testVarIdx int +} + +// newChanEventStoreTestCtx creates a test context which can be used to test +// the event store. +func newChanEventStoreTestCtx(t *testing.T) *chanEventStoreTestCtx { + testCtx := &chanEventStoreTestCtx{ + t: t, + channelSubscription: newMockSubscription(t), + peerSubscription: newMockSubscription(t), + } + + cfg := &Config{ + SubscribeChannelEvents: func() (subscribe.Subscription, error) { + return testCtx.channelSubscription, nil + }, + SubscribePeerEvents: func() (subscribe.Subscription, error) { + return testCtx.peerSubscription, nil + }, + GetOpenChannels: func() ([]*channeldb.OpenChannel, error) { + return nil, nil + }, + } + + testCtx.store = NewChannelEventStore(cfg) + + return testCtx +} + +// start starts the test context's event store. +func (c *chanEventStoreTestCtx) start() { + require.NoError(c.t, c.store.Start()) +} + +// stop stops the channel event store's subscribe servers and the store itself. +func (c *chanEventStoreTestCtx) stop() { + c.store.Stop() + + // Make sure that the cancel function was called for both of our + // subscription mocks. + c.channelSubscription.assertCancelled() + c.peerSubscription.assertCancelled() +} + +// newChannel creates a new, unique test channel. Note that this function +// does not add it to the test event store, it just creates mocked values. +func (c *chanEventStoreTestCtx) newChannel() (route.Vertex, *btcec.PublicKey, + wire.OutPoint) { + + // Create a pubkey for our channel peer. + pubKey := &btcec.PublicKey{ + X: big.NewInt(int64(c.testVarIdx)), + Y: big.NewInt(int64(c.testVarIdx)), + Curve: btcec.S256(), + } + + // Create vertex from our pubkey. + vertex, err := route.NewVertexFromBytes(pubKey.SerializeCompressed()) + require.NoError(c.t, err) + + // Create a channel point using our channel index, then increment it. + chanPoint := wire.OutPoint{ + Hash: [chainhash.HashSize]byte{1, 2, 3}, + Index: uint32(c.testVarIdx), + } + + // Increment the index we use so that the next channel and pubkey we + // create will be unique. + c.testVarIdx++ + + return vertex, pubKey, chanPoint +} + +// createChannel creates a new channel, notifies the event store that it has +// been created and returns the peer vertex, pubkey and channel point. +func (c *chanEventStoreTestCtx) createChannel() (route.Vertex, *btcec.PublicKey, + wire.OutPoint) { + + vertex, pubKey, chanPoint := c.newChannel() + c.sendChannelOpenedUpdate(pubKey, chanPoint) + + return vertex, pubKey, chanPoint +} + +// closeChannel sends a close channel event to our subscribe server. +func (c *chanEventStoreTestCtx) closeChannel(channel wire.OutPoint, + peer *btcec.PublicKey) { + + update := channelnotifier.ClosedChannelEvent{ + CloseSummary: &channeldb.ChannelCloseSummary{ + ChanPoint: channel, + RemotePub: peer, + }, + } + + c.channelSubscription.sendUpdate(update) +} + +// peerEvent sends a peer online or offline event to the store for the peer +// provided. +func (c *chanEventStoreTestCtx) peerEvent(peer route.Vertex, online bool) { + var update interface{} + if online { + update = peernotifier.PeerOnlineEvent{PubKey: peer} + } else { + update = peernotifier.PeerOfflineEvent{PubKey: peer} + } + + c.peerSubscription.sendUpdate(update) +} + +// sendChannelOpenedUpdate notifies the test event store that a channel has +// been opened. +func (c *chanEventStoreTestCtx) sendChannelOpenedUpdate(pubkey *btcec.PublicKey, + channel wire.OutPoint) { + + update := channelnotifier.OpenChannelEvent{ + Channel: &channeldb.OpenChannel{ + FundingOutpoint: channel, + IdentityPub: pubkey, + }, + } + + c.channelSubscription.sendUpdate(update) +} + +// mockSubscription is a mock subscription client that blocks on sends into the +// updates channel. We use this mock rather than an actual subscribe client +// because they do not block, which makes tests race (because we have no way +// to guarantee that the test client consumes the update before shutdown). +type mockSubscription struct { + t *testing.T + updates chan interface{} + + // Embed the subscription interface in this mock so that we satisfy it. + subscribe.Subscription +} + +// newMockSubscription creates a mock subscription. +func newMockSubscription(t *testing.T) *mockSubscription { + return &mockSubscription{ + t: t, + updates: make(chan interface{}), + } +} + +// sendUpdate sends an update into our updates channel, mocking the dispatch of +// an update from a subscription server. This call will fail the test if the +// update is not consumed within our timeout. +func (m *mockSubscription) sendUpdate(update interface{}) { + select { + case m.updates <- update: + + case <-time.After(timeout): + m.t.Fatalf("update: %v timeout", update) + } +} + +// Updates returns the updates channel for the mock. +func (m *mockSubscription) Updates() <-chan interface{} { + return m.updates +} + +// Cancel should be called in case the client no longer wants to subscribe for +// updates from the server. +func (m *mockSubscription) Cancel() { + close(m.updates) +} + +// assertCancelled asserts that the cancel function has been called for this +// mock. +func (m *mockSubscription) assertCancelled() { + select { + case _, open := <-m.updates: + require.False(m.t, open, "subscription not cancelled") + + case <-time.After(timeout): + m.t.Fatalf("assert cancelled timeout") + } +}