From 80040d9d961db9c2d40d3bd10d70792141228e75 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 15 Mar 2019 02:33:47 -0700 Subject: [PATCH] watchtower/wtclient/client_test: adds client-server upload test --- watchtower/wtclient/client_test.go | 1118 ++++++++++++++++++++++++++++ 1 file changed, 1118 insertions(+) create mode 100644 watchtower/wtclient/client_test.go diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go new file mode 100644 index 00000000..dba4275e --- /dev/null +++ b/watchtower/wtclient/client_test.go @@ -0,0 +1,1118 @@ +// +build dev + +package wtclient_test + +import ( + "encoding/binary" + "net" + "sync" + "testing" + "time" + + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/watchtower/blob" + "github.com/lightningnetwork/lnd/watchtower/wtclient" + "github.com/lightningnetwork/lnd/watchtower/wtdb" + "github.com/lightningnetwork/lnd/watchtower/wtmock" + "github.com/lightningnetwork/lnd/watchtower/wtpolicy" + "github.com/lightningnetwork/lnd/watchtower/wtserver" +) + +const csvDelay uint32 = 144 + +var ( + revPrivBytes = []byte{ + 0x8f, 0x4b, 0x51, 0x83, 0xa9, 0x34, 0xbd, 0x5f, + 0x74, 0x6c, 0x9d, 0x5c, 0xae, 0x88, 0x2d, 0x31, + 0x06, 0x90, 0xdd, 0x8c, 0x9b, 0x31, 0xbc, 0xd1, + 0x78, 0x91, 0x88, 0x2a, 0xf9, 0x74, 0xa0, 0xef, + } + + toLocalPrivBytes = []byte{ + 0xde, 0x17, 0xc1, 0x2f, 0xdc, 0x1b, 0xc0, 0xc6, + 0x59, 0x5d, 0xf9, 0xc1, 0x3e, 0x89, 0xbc, 0x6f, + 0x01, 0x85, 0x45, 0x76, 0x26, 0xce, 0x9c, 0x55, + 0x3b, 0xc9, 0xec, 0x3d, 0xd8, 0x8b, 0xac, 0xa8, + } + + toRemotePrivBytes = []byte{ + 0x28, 0x59, 0x6f, 0x36, 0xb8, 0x9f, 0x19, 0x5d, + 0xcb, 0x07, 0x48, 0x8a, 0xe5, 0x89, 0x71, 0x74, + 0x70, 0x4c, 0xff, 0x1e, 0x9c, 0x00, 0x93, 0xbe, + 0xe2, 0x2e, 0x68, 0x08, 0x4c, 0xb4, 0x0f, 0x4f, + } + + // addr is the server's reward address given to watchtower clients. + addr, _ = btcutil.DecodeAddress( + "mrX9vMRYLfVy1BnZbc5gZjuyaqH3ZW2ZHz", &chaincfg.TestNet3Params, + ) + + addrScript, _ = txscript.PayToAddrScript(addr) +) + +// randPrivKey generates a new secp keypair, and returns the public key. +func randPrivKey(t *testing.T) *btcec.PrivateKey { + t.Helper() + + sk, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("unable to generate pubkey: %v", err) + } + + return sk +} + +type mockNet struct { + mu sync.RWMutex + connCallback func(wtserver.Peer) +} + +func newMockNet(cb func(wtserver.Peer)) *mockNet { + return &mockNet{ + connCallback: cb, + } +} + +func (m *mockNet) Dial(network string, address string) (net.Conn, error) { + return nil, nil +} + +func (m *mockNet) LookupHost(host string) ([]string, error) { + panic("not implemented") +} + +func (m *mockNet) LookupSRV(service string, proto string, name string) (string, []*net.SRV, error) { + panic("not implemented") +} + +func (m *mockNet) ResolveTCPAddr(network string, address string) (*net.TCPAddr, error) { + panic("not implemented") +} + +func (m *mockNet) AuthDial(localPriv *btcec.PrivateKey, netAddr *lnwire.NetAddress, + dialer func(string, string) (net.Conn, error)) (wtserver.Peer, error) { + + localPk := localPriv.PubKey() + localAddr := &net.TCPAddr{ + IP: net.IP{0x32, 0x31, 0x30, 0x29}, + Port: 36723, + } + + localPeer, remotePeer := wtmock.NewMockConn( + localPk, netAddr.IdentityKey, localAddr, netAddr.Address, 0, + ) + + m.mu.RLock() + m.connCallback(remotePeer) + m.mu.RUnlock() + + return localPeer, nil +} + +func (m *mockNet) setConnCallback(cb func(wtserver.Peer)) { + m.mu.Lock() + defer m.mu.Unlock() + m.connCallback = cb +} + +type mockChannel struct { + mu sync.Mutex + commitHeight uint64 + retributions map[uint64]*lnwallet.BreachRetribution + localBalance lnwire.MilliSatoshi + remoteBalance lnwire.MilliSatoshi + + revSK *btcec.PrivateKey + revPK *btcec.PublicKey + revKeyLoc keychain.KeyLocator + + toRemoteSK *btcec.PrivateKey + toRemotePK *btcec.PublicKey + toRemoteKeyLoc keychain.KeyLocator + + toLocalPK *btcec.PublicKey // only need to generate to-local script + + dustLimit lnwire.MilliSatoshi + csvDelay uint32 +} + +func newMockChannel(t *testing.T, signer *wtmock.MockSigner, + localAmt, remoteAmt lnwire.MilliSatoshi) *mockChannel { + + // Generate the revocation, to-local, and to-remote keypairs. + revSK := randPrivKey(t) + revPK := revSK.PubKey() + + toLocalSK := randPrivKey(t) + toLocalPK := toLocalSK.PubKey() + + toRemoteSK := randPrivKey(t) + toRemotePK := toRemoteSK.PubKey() + + // Register the revocation secret key and the to-remote secret key with + // the signer. We will not need to sign with the to-local key, as this + // is to be known only by the counterparty. + revKeyLoc := signer.AddPrivKey(revSK) + toRemoteKeyLoc := signer.AddPrivKey(toRemoteSK) + + c := &mockChannel{ + retributions: make(map[uint64]*lnwallet.BreachRetribution), + localBalance: localAmt, + remoteBalance: remoteAmt, + revSK: revSK, + revPK: revPK, + revKeyLoc: revKeyLoc, + toLocalPK: toLocalPK, + toRemoteSK: toRemoteSK, + toRemotePK: toRemotePK, + toRemoteKeyLoc: toRemoteKeyLoc, + dustLimit: 546000, + csvDelay: 144, + } + + // Create the initial remote commitment with the initial balances. + c.createRemoteCommitTx(t) + + return c +} + +func (c *mockChannel) createRemoteCommitTx(t *testing.T) { + t.Helper() + + // Construct the to-local witness script. + toLocalScript, err := input.CommitScriptToSelf( + c.csvDelay, c.toLocalPK, c.revPK, + ) + if err != nil { + t.Fatalf("unable to create to-local script: %v", err) + } + + // Compute the to-local witness script hash. + toLocalScriptHash, err := input.WitnessScriptHash(toLocalScript) + if err != nil { + t.Fatalf("unable to create to-local witness script hash: %v", err) + } + + // Compute the to-remote witness script hash. + toRemoteScriptHash, err := input.CommitScriptUnencumbered(c.toRemotePK) + if err != nil { + t.Fatalf("unable to create to-remote script: %v", err) + } + + // Construct the remote commitment txn, containing the to-local and + // to-remote outputs. The balances are flipped since the transaction is + // from the PoV of the remote party. We don't need any inputs for this + // test. We increment the version with the commit height to ensure that + // all commitment transactions are unique even if the same distribution + // of funds is used more than once. + commitTxn := &wire.MsgTx{ + Version: int32(c.commitHeight + 1), + } + + var ( + toLocalSignDesc *input.SignDescriptor + toRemoteSignDesc *input.SignDescriptor + ) + + var outputIndex int + if c.remoteBalance >= c.dustLimit { + commitTxn.TxOut = append(commitTxn.TxOut, &wire.TxOut{ + Value: int64(c.remoteBalance.ToSatoshis()), + PkScript: toLocalScriptHash, + }) + + // Create the sign descriptor used to sign for the to-local + // input. + toLocalSignDesc = &input.SignDescriptor{ + KeyDesc: keychain.KeyDescriptor{ + KeyLocator: c.revKeyLoc, + PubKey: c.revPK, + }, + WitnessScript: toLocalScript, + Output: commitTxn.TxOut[outputIndex], + HashType: txscript.SigHashAll, + } + outputIndex++ + } + if c.localBalance >= c.dustLimit { + commitTxn.TxOut = append(commitTxn.TxOut, &wire.TxOut{ + Value: int64(c.localBalance.ToSatoshis()), + PkScript: toRemoteScriptHash, + }) + + // Create the sign descriptor used to sign for the to-remote + // input. + toRemoteSignDesc = &input.SignDescriptor{ + KeyDesc: keychain.KeyDescriptor{ + KeyLocator: c.toRemoteKeyLoc, + PubKey: c.toRemotePK, + }, + WitnessScript: toRemoteScriptHash, + Output: commitTxn.TxOut[outputIndex], + HashType: txscript.SigHashAll, + } + outputIndex++ + } + + txid := commitTxn.TxHash() + + var ( + toLocalOutPoint wire.OutPoint + toRemoteOutPoint wire.OutPoint + ) + + outputIndex = 0 + if toLocalSignDesc != nil { + toLocalOutPoint = wire.OutPoint{ + Hash: txid, + Index: uint32(outputIndex), + } + outputIndex++ + } + if toRemoteSignDesc != nil { + toRemoteOutPoint = wire.OutPoint{ + Hash: txid, + Index: uint32(outputIndex), + } + outputIndex++ + } + + commitKeyRing := &lnwallet.CommitmentKeyRing{ + RevocationKey: c.revPK, + NoDelayKey: c.toLocalPK, + DelayKey: c.toRemotePK, + } + + retribution := &lnwallet.BreachRetribution{ + BreachTransaction: commitTxn, + RevokedStateNum: c.commitHeight, + KeyRing: commitKeyRing, + RemoteDelay: c.csvDelay, + LocalOutpoint: toRemoteOutPoint, + LocalOutputSignDesc: toRemoteSignDesc, + RemoteOutpoint: toLocalOutPoint, + RemoteOutputSignDesc: toLocalSignDesc, + } + + c.retributions[c.commitHeight] = retribution + c.commitHeight++ +} + +// advanceState creates the next channel state and retribution without altering +// channel balances. +func (c *mockChannel) advanceState(t *testing.T) { + c.mu.Lock() + defer c.mu.Unlock() + + c.createRemoteCommitTx(t) +} + +// sendPayment creates the next channel state and retribution after transferring +// amt to the remote party. +func (c *mockChannel) sendPayment(t *testing.T, amt lnwire.MilliSatoshi) { + t.Helper() + + c.mu.Lock() + defer c.mu.Unlock() + + if c.localBalance < amt { + t.Fatalf("insufficient funds to send, need: %v, have: %v", + amt, c.localBalance) + } + + c.localBalance -= amt + c.remoteBalance += amt + c.createRemoteCommitTx(t) +} + +// receivePayment creates the next channel state and retribution after +// transferring amt to the local party. +func (c *mockChannel) receivePayment(t *testing.T, amt lnwire.MilliSatoshi) { + t.Helper() + + c.mu.Lock() + defer c.mu.Unlock() + + if c.remoteBalance < amt { + t.Fatalf("insufficient funds to recv, need: %v, have: %v", + amt, c.remoteBalance) + } + + c.localBalance += amt + c.remoteBalance -= amt + c.createRemoteCommitTx(t) +} + +// getState retrieves the channel's commitment and retribution at state i. +func (c *mockChannel) getState(i uint64) (*wire.MsgTx, *lnwallet.BreachRetribution) { + c.mu.Lock() + defer c.mu.Unlock() + + retribution := c.retributions[i] + + return retribution.BreachTransaction, retribution +} + +type testHarness struct { + t *testing.T + cfg harnessCfg + signer *wtmock.MockSigner + capacity lnwire.MilliSatoshi + clientDB *wtmock.ClientDB + clientCfg *wtclient.Config + client wtclient.Client + serverDB *wtdb.MockDB + serverCfg *wtserver.Config + server *wtserver.Server + net *mockNet + + mu sync.Mutex + channels map[lnwire.ChannelID]*mockChannel +} + +type harnessCfg struct { + localBalance lnwire.MilliSatoshi + remoteBalance lnwire.MilliSatoshi + policy wtpolicy.Policy + noRegisterChan0 bool +} + +func newHarness(t *testing.T, cfg harnessCfg) *testHarness { + towerAddrStr := "18.28.243.2:9911" + towerTCPAddr, err := net.ResolveTCPAddr("tcp", towerAddrStr) + if err != nil { + t.Fatalf("Unable to resolve tower TCP addr: %v", err) + } + + privKey, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("Unable to generate tower private key: %v", err) + } + + towerPubKey := privKey.PubKey() + + towerAddr := &lnwire.NetAddress{ + IdentityKey: towerPubKey, + Address: towerTCPAddr, + } + + const timeout = 200 * time.Millisecond + serverDB := wtdb.NewMockDB() + + serverCfg := &wtserver.Config{ + DB: serverDB, + ReadTimeout: timeout, + WriteTimeout: timeout, + NewAddress: func() (btcutil.Address, error) { + return addr, nil + }, + } + + server, err := wtserver.New(serverCfg) + if err != nil { + t.Fatalf("unable to create wtserver: %v", err) + } + + signer := wtmock.NewMockSigner() + mockNet := newMockNet(server.InboundPeerConnected) + clientDB := wtmock.NewClientDB() + + clientCfg := &wtclient.Config{ + Signer: signer, + Dial: func(string, string) (net.Conn, error) { + return nil, nil + }, + DB: clientDB, + AuthDial: mockNet.AuthDial, + PrivateTower: towerAddr, + Policy: cfg.policy, + NewAddress: func() ([]byte, error) { + return addrScript, nil + }, + ReadTimeout: timeout, + WriteTimeout: timeout, + MinBackoff: time.Millisecond, + MaxBackoff: 10 * time.Millisecond, + } + client, err := wtclient.New(clientCfg) + if err != nil { + t.Fatalf("Unable to create wtclient: %v", err) + } + + if err := server.Start(); err != nil { + t.Fatalf("Unable to start wtserver: %v", err) + } + + if err = client.Start(); err != nil { + server.Stop() + t.Fatalf("Unable to start wtclient: %v", err) + } + + h := &testHarness{ + t: t, + cfg: cfg, + signer: signer, + capacity: cfg.localBalance + cfg.remoteBalance, + clientDB: clientDB, + clientCfg: clientCfg, + client: client, + serverDB: serverDB, + serverCfg: serverCfg, + server: server, + net: mockNet, + channels: make(map[lnwire.ChannelID]*mockChannel), + } + + h.makeChannel(0, h.cfg.localBalance, h.cfg.remoteBalance) + if !cfg.noRegisterChan0 { + h.registerChannel(0) + } + + return h +} + +// startServer creates a new server using the harness's current serverCfg and +// starts it after pointing the mockNet's callback to the new server. +func (h *testHarness) startServer() { + h.t.Helper() + + var err error + h.server, err = wtserver.New(h.serverCfg) + if err != nil { + h.t.Fatalf("unable to create wtserver: %v", err) + } + + h.net.setConnCallback(h.server.InboundPeerConnected) + + if err := h.server.Start(); err != nil { + h.t.Fatalf("unable to start wtserver: %v", err) + } +} + +// startClient creates a new server using the harness's current clientCf and +// starts it. +func (h *testHarness) startClient() { + h.t.Helper() + + var err error + h.client, err = wtclient.New(h.clientCfg) + if err != nil { + h.t.Fatalf("unable to create wtclient: %v", err) + } + if err := h.client.Start(); err != nil { + h.t.Fatalf("unable to start wtclient: %v", err) + } +} + +// chanIDFromInt creates a unique channel id given a unique integral id. +func chanIDFromInt(id uint64) lnwire.ChannelID { + var chanID lnwire.ChannelID + binary.BigEndian.PutUint64(chanID[:8], id) + return chanID +} + +// makeChannel creates new channel with id, using the localAmt and remoteAmt as +// the starting balances. The channel will be available by using h.channel(id). +// +// NOTE: The method fails if channel for id already exists. +func (h *testHarness) makeChannel(id uint64, + localAmt, remoteAmt lnwire.MilliSatoshi) { + + h.t.Helper() + + chanID := chanIDFromInt(id) + c := newMockChannel(h.t, h.signer, localAmt, remoteAmt) + + c.mu.Lock() + _, ok := h.channels[chanID] + if !ok { + h.channels[chanID] = c + } + c.mu.Unlock() + + if ok { + h.t.Fatalf("channel %d already created", id) + } +} + +// channel retrieves the channel corresponding to id. +// +// NOTE: The method fails if a channel for id does not exist. +func (h *testHarness) channel(id uint64) *mockChannel { + h.t.Helper() + + h.mu.Lock() + c, ok := h.channels[chanIDFromInt(id)] + h.mu.Unlock() + if !ok { + h.t.Fatalf("unable to fetch channel %d", id) + } + + return c +} + +// registerChannel registers the channel identified by id with the client. +func (h *testHarness) registerChannel(id uint64) { + h.t.Helper() + + chanID := chanIDFromInt(id) + err := h.client.RegisterChannel(chanID) + if err != nil { + h.t.Fatalf("unable to register channel %d: %v", id, err) + } +} + +// advanceChannelN calls advanceState on the channel identified by id the number +// of provided times and returns the breach hints corresponding to the new +// states. +func (h *testHarness) advanceChannelN(id uint64, n int) []wtdb.BreachHint { + h.t.Helper() + + channel := h.channel(id) + + var hints []wtdb.BreachHint + for i := uint64(0); i < uint64(n); i++ { + channel.advanceState(h.t) + commitTx, _ := h.channel(id).getState(i) + breachTxID := commitTx.TxHash() + hints = append(hints, wtdb.NewBreachHintFromHash(&breachTxID)) + } + + return hints +} + +// backupStates instructs the channel identified by id to send backups to the +// client for states in the range [to, from). +func (h *testHarness) backupStates(id, from, to uint64, expErr error) { + h.t.Helper() + + for i := from; i < to; i++ { + h.backupState(id, i, expErr) + } +} + +// backupStates instructs the channel identified by id to send a backup for +// state i. +func (h *testHarness) backupState(id, i uint64, expErr error) { + _, retribution := h.channel(id).getState(i) + + chanID := chanIDFromInt(id) + err := h.client.BackupState(&chanID, retribution) + if err != expErr { + h.t.Fatalf("back error mismatch, want: %v, got: %v", + expErr, err) + } +} + +// sendPayments instructs the channel identified by id to send amt to the remote +// party for each state in from-to times and returns the breach hints for states +// [from, to). +func (h *testHarness) sendPayments(id, from, to uint64, + amt lnwire.MilliSatoshi) []wtdb.BreachHint { + + h.t.Helper() + + channel := h.channel(id) + + var hints []wtdb.BreachHint + for i := from; i < to; i++ { + h.channel(id).sendPayment(h.t, amt) + commitTx, _ := channel.getState(i) + breachTxID := commitTx.TxHash() + hints = append(hints, wtdb.NewBreachHintFromHash(&breachTxID)) + } + + return hints +} + +// receivePayment instructs the channel identified by id to recv amt from the +// remote party for each state in from-to times and returns the breach hints for +// states [from, to). +func (h *testHarness) recvPayments(id, from, to uint64, + amt lnwire.MilliSatoshi) []wtdb.BreachHint { + + h.t.Helper() + + channel := h.channel(id) + + var hints []wtdb.BreachHint + for i := from; i < to; i++ { + channel.receivePayment(h.t, amt) + commitTx, _ := channel.getState(i) + breachTxID := commitTx.TxHash() + hints = append(hints, wtdb.NewBreachHintFromHash(&breachTxID)) + } + + return hints +} + +// waitServerUpdates blocks until the breach hints provided all appear in the +// watchtower's database or the timeout expires. This is used to test that the +// client in fact sends the updates to the server, even if it is offline. +func (h *testHarness) waitServerUpdates(hints []wtdb.BreachHint, + timeout time.Duration) { + + h.t.Helper() + + // If no breach hints are provided, we will wait out the full timeout to + // assert that no updates appear. + wantUpdates := len(hints) > 0 + + hintSet := make(map[wtdb.BreachHint]struct{}) + for _, hint := range hints { + hintSet[hint] = struct{}{} + } + + if len(hints) != len(hintSet) { + h.t.Fatalf("breach hints are not unique, list-len: %d "+ + "set-len: %d", len(hints), len(hintSet)) + } + + // Closure to assert the server's matches are consistent with the hint + // set. + serverHasHints := func(matches []wtdb.Match) bool { + if len(hintSet) != len(matches) { + return false + } + + for _, match := range matches { + if _, ok := hintSet[match.Hint]; ok { + continue + } + + h.t.Fatalf("match %v in db is not in hint set", + match.Hint) + } + + return true + } + + failTimeout := time.After(timeout) + for { + select { + case <-time.After(time.Second): + matches, err := h.serverDB.QueryMatches(hints) + switch { + case err != nil: + h.t.Fatalf("unable to query for hints: %v", err) + + case wantUpdates && serverHasHints(matches): + return + + case wantUpdates: + h.t.Logf("Received %d/%d\n", len(matches), + len(hints)) + } + + case <-failTimeout: + matches, err := h.serverDB.QueryMatches(hints) + switch { + case err != nil: + h.t.Fatalf("unable to query for hints: %v", err) + + case serverHasHints(matches): + return + + default: + h.t.Fatalf("breach hints not received, only "+ + "got %d/%d", len(matches), len(hints)) + } + } + } +} + +const ( + localBalance = lnwire.MilliSatoshi(100000000) + remoteBalance = lnwire.MilliSatoshi(200000000) +) + +type clientTest struct { + name string + cfg harnessCfg + fn func(*testHarness) +} + +var clientTests = []clientTest{ + { + // Asserts that client will return the ErrUnregisteredChannel + // error when trying to backup states for a channel that has not + // been registered (and received it's pkscript). + name: "backup unregistered channel", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + BlobType: blob.TypeDefault, + MaxUpdates: 20000, + SweepFeeRate: 1, + }, + noRegisterChan0: true, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + chanID = 0 + ) + + // Advance the channel and backup the retributions. We + // expect ErrUnregisteredChannel to be returned since + // the channel was not registered during harness + // creation. + h.advanceChannelN(chanID, numUpdates) + h.backupStates( + chanID, 0, numUpdates, + wtclient.ErrUnregisteredChannel, + ) + }, + }, + { + // Asserts that the client returns an ErrClientExiting when + // trying to backup channels after the Stop method has been + // called. + name: "backup after stop", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + BlobType: blob.TypeDefault, + MaxUpdates: 20000, + SweepFeeRate: 1, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + chanID = 0 + ) + + // Stop the client, subsequent backups should fail. + h.client.Stop() + + // Advance the channel and try to back up the states. We + // expect ErrClientExiting to be returned from + // BackupState. + h.advanceChannelN(chanID, numUpdates) + h.backupStates( + chanID, 0, numUpdates, + wtclient.ErrClientExiting, + ) + }, + }, + { + // Asserts that the client will continue to back up all states + // that have previously been enqueued before it finishes + // exiting. + name: "backup reliable flush", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + BlobType: blob.TypeDefault, + MaxUpdates: 5, + SweepFeeRate: 1, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + chanID = 0 + ) + + // Generate numUpdates retributions and back them up to + // the tower. + hints := h.advanceChannelN(chanID, numUpdates) + h.backupStates(chanID, 0, numUpdates, nil) + + // Stop the client in the background, to assert the + // pipeline is always flushed before it exits. + go h.client.Stop() + + // Wait for all of the updates to be populated in the + // server's database. + h.waitServerUpdates(hints, time.Second) + }, + }, + { + // Assert that the client will not send out backups for states + // whose justice transactions are ineligible for backup, e.g. + // creating dust outputs. + name: "backup dust ineligible", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + BlobType: blob.TypeDefault, + MaxUpdates: 20000, + SweepFeeRate: 1000000, // high sweep fee creates dust + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + chanID = 0 + ) + + // Create the retributions and queue them for backup. + h.advanceChannelN(chanID, numUpdates) + h.backupStates(chanID, 0, numUpdates, nil) + + // Ensure that no updates are received by the server, + // since they should all be marked as ineligible. + h.waitServerUpdates(nil, time.Second) + }, + }, + { + // Verifies that the client will properly retransmit a committed + // state update to the watchtower after a restart if the update + // was not acked while the client was active last. + name: "committed update restart", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + BlobType: blob.TypeDefault, + MaxUpdates: 20000, + SweepFeeRate: 1, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + chanID = 0 + ) + + hints := h.advanceChannelN(0, numUpdates) + + var numSent uint64 + + // Add the first two states to the client's pipeline. + h.backupStates(chanID, 0, 2, nil) + numSent = 2 + + // Wait for both to be reflected in the server's + // database. + h.waitServerUpdates(hints[:numSent], time.Second) + + // Now, restart the server and prevent it from acking + // state updates. + h.server.Stop() + h.serverCfg.NoAckUpdates = true + h.startServer() + defer h.server.Stop() + + // Send the next state update to the tower. Since the + // tower isn't acking state updates, we expect this + // update to be committed and sent by the session queue, + // but it will never receive an ack. + h.backupState(chanID, numSent, nil) + numSent++ + + // Force quit the client to abort the state updates it + // has queued. The sleep ensures that the session queues + // have enough time to commit the state updates before + // the client is killed. + time.Sleep(time.Second) + h.client.ForceQuit() + + // Restart the server and allow it to ack the updates + // after the client retransmits the unacked update. + h.server.Stop() + h.serverCfg.NoAckUpdates = false + h.startServer() + defer h.server.Stop() + + // Restart the client and allow it to process the + // committed update. + h.startClient() + defer h.client.ForceQuit() + + // Wait for the committed update to be accepted by the + // tower. + h.waitServerUpdates(hints[:numSent], time.Second) + + // Finally, send the rest of the updates and wait for + // the tower to receive the remaining states. + h.backupStates(chanID, numSent, numUpdates, nil) + + // Wait for all of the updates to be populated in the + // server's database. + h.waitServerUpdates(hints, time.Second) + + }, + }, + { + // Asserts that the client will continue to retry sending state + // updates if it doesn't receive an ack from the server. The + // client is expected to flush everything in its in-memory + // pipeline once the server begins sending acks again. + name: "no ack from server", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + BlobType: blob.TypeDefault, + MaxUpdates: 5, + SweepFeeRate: 1, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 100 + chanID = 0 + ) + + // Generate the retributions that will be backed up. + hints := h.advanceChannelN(chanID, numUpdates) + + // Restart the server and prevent it from acking state + // updates. + h.server.Stop() + h.serverCfg.NoAckUpdates = true + h.startServer() + defer h.server.Stop() + + // Now, queue the retributions for backup. + h.backupStates(chanID, 0, numUpdates, nil) + + // Stop the client in the background, to assert the + // pipeline is always flushed before it exits. + go h.client.Stop() + + // Give the client time to saturate a large number of + // session queues for which the server has not acked the + // state updates that it has received. + time.Sleep(time.Second) + + // Restart the server and allow it to ack the updates + // after the client retransmits the unacked updates. + h.server.Stop() + h.serverCfg.NoAckUpdates = false + h.startServer() + defer h.server.Stop() + + // Wait for all of the updates to be populated in the + // server's database. + h.waitServerUpdates(hints, 5*time.Second) + }, + }, + { + // Asserts that the client is able to send state updates to the + // tower for a full range of channel values, assuming the sweep + // fee rates permit it. We expect all of these to be successful + // since a sweep transactions spending only from one output is + // less expensive than one that sweeps both. + name: "send and recv", + cfg: harnessCfg{ + localBalance: 10000001, // ensure (% amt != 0) + remoteBalance: 20000001, // ensure (% amt != 0) + policy: wtpolicy.Policy{ + BlobType: blob.TypeDefault, + MaxUpdates: 1000, + SweepFeeRate: 1, + }, + }, + fn: func(h *testHarness) { + var ( + capacity = h.cfg.localBalance + h.cfg.remoteBalance + paymentAmt = lnwire.MilliSatoshi(200000) + numSends = uint64(h.cfg.localBalance / paymentAmt) + numRecvs = uint64(capacity / paymentAmt) + numUpdates = numSends + numRecvs // 200 updates + chanID = uint64(0) + ) + + // Send money to the remote party until all funds are + // depleted. + sendHints := h.sendPayments(chanID, 0, numSends, paymentAmt) + + // Now, sequentially receive the entire channel balance + // from the remote party. + recvHints := h.recvPayments(chanID, numSends, numUpdates, paymentAmt) + + // Collect the hints generated by both sending and + // receiving. + hints := append(sendHints, recvHints...) + + // Backup the channel's states the client. + h.backupStates(chanID, 0, numUpdates, nil) + + // Wait for all of the updates to be populated in the + // server's database. + h.waitServerUpdates(hints, 3*time.Second) + }, + }, + { + // Asserts that the client is able to support multiple links. + name: "multiple link backup", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + BlobType: blob.TypeDefault, + MaxUpdates: 5, + SweepFeeRate: 1, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + numChans = 10 + ) + + // Initialize and register an additional 9 channels. + for id := uint64(1); id < 10; id++ { + h.makeChannel( + id, h.cfg.localBalance, + h.cfg.remoteBalance, + ) + h.registerChannel(id) + } + + // Generate the retributions for all 10 channels and + // collect the breach hints. + var hints []wtdb.BreachHint + for id := uint64(0); id < 10; id++ { + chanHints := h.advanceChannelN(id, numUpdates) + hints = append(hints, chanHints...) + } + + // Provided all retributions to the client from all + // channels. + for id := uint64(0); id < 10; id++ { + h.backupStates(id, 0, numUpdates, nil) + } + + // Test reliable flush under multi-client scenario. + go h.client.Stop() + + // Wait for all of the updates to be populated in the + // server's database. + h.waitServerUpdates(hints, 10*time.Second) + }, + }, +} + +// TestClient executes the client test suite, asserting the ability to backup +// states in a number of failure cases and it's reliability during shutdown. +func TestClient(t *testing.T) { + for _, test := range clientTests { + tc := test + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + h := newHarness(t, tc.cfg) + defer h.server.Stop() + defer h.client.ForceQuit() + + tc.fn(h) + }) + } +}