package wtclient_test import ( "encoding/binary" "net" "sync" "testing" "time" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/tor" "github.com/lightningnetwork/lnd/watchtower/blob" "github.com/lightningnetwork/lnd/watchtower/wtclient" "github.com/lightningnetwork/lnd/watchtower/wtdb" "github.com/lightningnetwork/lnd/watchtower/wtmock" "github.com/lightningnetwork/lnd/watchtower/wtpolicy" "github.com/lightningnetwork/lnd/watchtower/wtserver" "github.com/stretchr/testify/require" ) const ( csvDelay uint32 = 144 towerAddrStr = "18.28.243.2:9911" ) var ( revPrivBytes = []byte{ 0x8f, 0x4b, 0x51, 0x83, 0xa9, 0x34, 0xbd, 0x5f, 0x74, 0x6c, 0x9d, 0x5c, 0xae, 0x88, 0x2d, 0x31, 0x06, 0x90, 0xdd, 0x8c, 0x9b, 0x31, 0xbc, 0xd1, 0x78, 0x91, 0x88, 0x2a, 0xf9, 0x74, 0xa0, 0xef, } toLocalPrivBytes = []byte{ 0xde, 0x17, 0xc1, 0x2f, 0xdc, 0x1b, 0xc0, 0xc6, 0x59, 0x5d, 0xf9, 0xc1, 0x3e, 0x89, 0xbc, 0x6f, 0x01, 0x85, 0x45, 0x76, 0x26, 0xce, 0x9c, 0x55, 0x3b, 0xc9, 0xec, 0x3d, 0xd8, 0x8b, 0xac, 0xa8, } toRemotePrivBytes = []byte{ 0x28, 0x59, 0x6f, 0x36, 0xb8, 0x9f, 0x19, 0x5d, 0xcb, 0x07, 0x48, 0x8a, 0xe5, 0x89, 0x71, 0x74, 0x70, 0x4c, 0xff, 0x1e, 0x9c, 0x00, 0x93, 0xbe, 0xe2, 0x2e, 0x68, 0x08, 0x4c, 0xb4, 0x0f, 0x4f, } // addr is the server's reward address given to watchtower clients. addr, _ = btcutil.DecodeAddress( "mrX9vMRYLfVy1BnZbc5gZjuyaqH3ZW2ZHz", &chaincfg.TestNet3Params, ) addrScript, _ = txscript.PayToAddrScript(addr) ) // randPrivKey generates a new secp keypair, and returns the public key. func randPrivKey(t *testing.T) *btcec.PrivateKey { t.Helper() sk, err := btcec.NewPrivateKey(btcec.S256()) if err != nil { t.Fatalf("unable to generate pubkey: %v", err) } return sk } type mockNet struct { mu sync.RWMutex connCallback func(wtserver.Peer) } func newMockNet(cb func(wtserver.Peer)) *mockNet { return &mockNet{ connCallback: cb, } } func (m *mockNet) Dial(network string, address string, timeout time.Duration) (net.Conn, error) { return nil, nil } func (m *mockNet) LookupHost(host string) ([]string, error) { panic("not implemented") } func (m *mockNet) LookupSRV(service string, proto string, name string) (string, []*net.SRV, error) { panic("not implemented") } func (m *mockNet) ResolveTCPAddr(network string, address string) (*net.TCPAddr, error) { panic("not implemented") } func (m *mockNet) AuthDial(local keychain.SingleKeyECDH, netAddr *lnwire.NetAddress, dialer tor.DialFunc) (wtserver.Peer, error) { localPk := local.PubKey() localAddr := &net.TCPAddr{ IP: net.IP{0x32, 0x31, 0x30, 0x29}, Port: 36723, } localPeer, remotePeer := wtmock.NewMockConn( localPk, netAddr.IdentityKey, localAddr, netAddr.Address, 0, ) m.mu.RLock() m.connCallback(remotePeer) m.mu.RUnlock() return localPeer, nil } func (m *mockNet) setConnCallback(cb func(wtserver.Peer)) { m.mu.Lock() defer m.mu.Unlock() m.connCallback = cb } type mockChannel struct { mu sync.Mutex commitHeight uint64 retributions map[uint64]*lnwallet.BreachRetribution localBalance lnwire.MilliSatoshi remoteBalance lnwire.MilliSatoshi revSK *btcec.PrivateKey revPK *btcec.PublicKey revKeyLoc keychain.KeyLocator toRemoteSK *btcec.PrivateKey toRemotePK *btcec.PublicKey toRemoteKeyLoc keychain.KeyLocator toLocalPK *btcec.PublicKey // only need to generate to-local script dustLimit lnwire.MilliSatoshi csvDelay uint32 } func newMockChannel(t *testing.T, signer *wtmock.MockSigner, localAmt, remoteAmt lnwire.MilliSatoshi) *mockChannel { // Generate the revocation, to-local, and to-remote keypairs. revSK := randPrivKey(t) revPK := revSK.PubKey() toLocalSK := randPrivKey(t) toLocalPK := toLocalSK.PubKey() toRemoteSK := randPrivKey(t) toRemotePK := toRemoteSK.PubKey() // Register the revocation secret key and the to-remote secret key with // the signer. We will not need to sign with the to-local key, as this // is to be known only by the counterparty. revKeyLoc := signer.AddPrivKey(revSK) toRemoteKeyLoc := signer.AddPrivKey(toRemoteSK) c := &mockChannel{ retributions: make(map[uint64]*lnwallet.BreachRetribution), localBalance: localAmt, remoteBalance: remoteAmt, revSK: revSK, revPK: revPK, revKeyLoc: revKeyLoc, toLocalPK: toLocalPK, toRemoteSK: toRemoteSK, toRemotePK: toRemotePK, toRemoteKeyLoc: toRemoteKeyLoc, dustLimit: 546000, csvDelay: 144, } // Create the initial remote commitment with the initial balances. c.createRemoteCommitTx(t) return c } func (c *mockChannel) createRemoteCommitTx(t *testing.T) { t.Helper() // Construct the to-local witness script. toLocalScript, err := input.CommitScriptToSelf( c.csvDelay, c.toLocalPK, c.revPK, ) if err != nil { t.Fatalf("unable to create to-local script: %v", err) } // Compute the to-local witness script hash. toLocalScriptHash, err := input.WitnessScriptHash(toLocalScript) if err != nil { t.Fatalf("unable to create to-local witness script hash: %v", err) } // Compute the to-remote witness script hash. toRemoteScriptHash, err := input.CommitScriptUnencumbered(c.toRemotePK) if err != nil { t.Fatalf("unable to create to-remote script: %v", err) } // Construct the remote commitment txn, containing the to-local and // to-remote outputs. The balances are flipped since the transaction is // from the PoV of the remote party. We don't need any inputs for this // test. We increment the version with the commit height to ensure that // all commitment transactions are unique even if the same distribution // of funds is used more than once. commitTxn := &wire.MsgTx{ Version: int32(c.commitHeight + 1), } var ( toLocalSignDesc *input.SignDescriptor toRemoteSignDesc *input.SignDescriptor ) var outputIndex int if c.remoteBalance >= c.dustLimit { commitTxn.TxOut = append(commitTxn.TxOut, &wire.TxOut{ Value: int64(c.remoteBalance.ToSatoshis()), PkScript: toLocalScriptHash, }) // Create the sign descriptor used to sign for the to-local // input. toLocalSignDesc = &input.SignDescriptor{ KeyDesc: keychain.KeyDescriptor{ KeyLocator: c.revKeyLoc, PubKey: c.revPK, }, WitnessScript: toLocalScript, Output: commitTxn.TxOut[outputIndex], HashType: txscript.SigHashAll, } outputIndex++ } if c.localBalance >= c.dustLimit { commitTxn.TxOut = append(commitTxn.TxOut, &wire.TxOut{ Value: int64(c.localBalance.ToSatoshis()), PkScript: toRemoteScriptHash, }) // Create the sign descriptor used to sign for the to-remote // input. toRemoteSignDesc = &input.SignDescriptor{ KeyDesc: keychain.KeyDescriptor{ KeyLocator: c.toRemoteKeyLoc, PubKey: c.toRemotePK, }, WitnessScript: toRemoteScriptHash, Output: commitTxn.TxOut[outputIndex], HashType: txscript.SigHashAll, } outputIndex++ } txid := commitTxn.TxHash() var ( toLocalOutPoint wire.OutPoint toRemoteOutPoint wire.OutPoint ) outputIndex = 0 if toLocalSignDesc != nil { toLocalOutPoint = wire.OutPoint{ Hash: txid, Index: uint32(outputIndex), } outputIndex++ } if toRemoteSignDesc != nil { toRemoteOutPoint = wire.OutPoint{ Hash: txid, Index: uint32(outputIndex), } outputIndex++ } commitKeyRing := &lnwallet.CommitmentKeyRing{ RevocationKey: c.revPK, ToRemoteKey: c.toLocalPK, ToLocalKey: c.toRemotePK, } retribution := &lnwallet.BreachRetribution{ BreachTransaction: commitTxn, RevokedStateNum: c.commitHeight, KeyRing: commitKeyRing, RemoteDelay: c.csvDelay, LocalOutpoint: toRemoteOutPoint, LocalOutputSignDesc: toRemoteSignDesc, RemoteOutpoint: toLocalOutPoint, RemoteOutputSignDesc: toLocalSignDesc, } c.retributions[c.commitHeight] = retribution c.commitHeight++ } // advanceState creates the next channel state and retribution without altering // channel balances. func (c *mockChannel) advanceState(t *testing.T) { c.mu.Lock() defer c.mu.Unlock() c.createRemoteCommitTx(t) } // sendPayment creates the next channel state and retribution after transferring // amt to the remote party. func (c *mockChannel) sendPayment(t *testing.T, amt lnwire.MilliSatoshi) { t.Helper() c.mu.Lock() defer c.mu.Unlock() if c.localBalance < amt { t.Fatalf("insufficient funds to send, need: %v, have: %v", amt, c.localBalance) } c.localBalance -= amt c.remoteBalance += amt c.createRemoteCommitTx(t) } // receivePayment creates the next channel state and retribution after // transferring amt to the local party. func (c *mockChannel) receivePayment(t *testing.T, amt lnwire.MilliSatoshi) { t.Helper() c.mu.Lock() defer c.mu.Unlock() if c.remoteBalance < amt { t.Fatalf("insufficient funds to recv, need: %v, have: %v", amt, c.remoteBalance) } c.localBalance += amt c.remoteBalance -= amt c.createRemoteCommitTx(t) } // getState retrieves the channel's commitment and retribution at state i. func (c *mockChannel) getState(i uint64) (*wire.MsgTx, *lnwallet.BreachRetribution) { c.mu.Lock() defer c.mu.Unlock() retribution := c.retributions[i] return retribution.BreachTransaction, retribution } type testHarness struct { t *testing.T cfg harnessCfg signer *wtmock.MockSigner capacity lnwire.MilliSatoshi clientDB *wtmock.ClientDB clientCfg *wtclient.Config client wtclient.Client serverAddr *lnwire.NetAddress serverDB *wtmock.TowerDB serverCfg *wtserver.Config server *wtserver.Server net *mockNet mu sync.Mutex channels map[lnwire.ChannelID]*mockChannel } type harnessCfg struct { localBalance lnwire.MilliSatoshi remoteBalance lnwire.MilliSatoshi policy wtpolicy.Policy noRegisterChan0 bool noAckCreateSession bool } func newHarness(t *testing.T, cfg harnessCfg) *testHarness { towerTCPAddr, err := net.ResolveTCPAddr("tcp", towerAddrStr) if err != nil { t.Fatalf("Unable to resolve tower TCP addr: %v", err) } privKey, err := btcec.NewPrivateKey(btcec.S256()) if err != nil { t.Fatalf("Unable to generate tower private key: %v", err) } privKeyECDH := &keychain.PrivKeyECDH{PrivKey: privKey} towerPubKey := privKey.PubKey() towerAddr := &lnwire.NetAddress{ IdentityKey: towerPubKey, Address: towerTCPAddr, } const timeout = 200 * time.Millisecond serverDB := wtmock.NewTowerDB() serverCfg := &wtserver.Config{ DB: serverDB, ReadTimeout: timeout, WriteTimeout: timeout, NodeKeyECDH: privKeyECDH, NewAddress: func() (btcutil.Address, error) { return addr, nil }, NoAckCreateSession: cfg.noAckCreateSession, } server, err := wtserver.New(serverCfg) if err != nil { t.Fatalf("unable to create wtserver: %v", err) } signer := wtmock.NewMockSigner() mockNet := newMockNet(server.InboundPeerConnected) clientDB := wtmock.NewClientDB() clientCfg := &wtclient.Config{ Signer: signer, Dial: mockNet.Dial, DB: clientDB, AuthDial: mockNet.AuthDial, SecretKeyRing: wtmock.NewSecretKeyRing(), Policy: cfg.policy, NewAddress: func() ([]byte, error) { return addrScript, nil }, ReadTimeout: timeout, WriteTimeout: timeout, MinBackoff: time.Millisecond, MaxBackoff: time.Second, ForceQuitDelay: 10 * time.Second, } client, err := wtclient.New(clientCfg) if err != nil { t.Fatalf("Unable to create wtclient: %v", err) } if err := server.Start(); err != nil { t.Fatalf("Unable to start wtserver: %v", err) } if err = client.Start(); err != nil { server.Stop() t.Fatalf("Unable to start wtclient: %v", err) } if err := client.AddTower(towerAddr); err != nil { server.Stop() t.Fatalf("Unable to add tower to wtclient: %v", err) } h := &testHarness{ t: t, cfg: cfg, signer: signer, capacity: cfg.localBalance + cfg.remoteBalance, clientDB: clientDB, clientCfg: clientCfg, client: client, serverAddr: towerAddr, serverDB: serverDB, serverCfg: serverCfg, server: server, net: mockNet, channels: make(map[lnwire.ChannelID]*mockChannel), } h.makeChannel(0, h.cfg.localBalance, h.cfg.remoteBalance) if !cfg.noRegisterChan0 { h.registerChannel(0) } return h } // startServer creates a new server using the harness's current serverCfg and // starts it after pointing the mockNet's callback to the new server. func (h *testHarness) startServer() { h.t.Helper() var err error h.server, err = wtserver.New(h.serverCfg) if err != nil { h.t.Fatalf("unable to create wtserver: %v", err) } h.net.setConnCallback(h.server.InboundPeerConnected) if err := h.server.Start(); err != nil { h.t.Fatalf("unable to start wtserver: %v", err) } } // startClient creates a new server using the harness's current clientCf and // starts it. func (h *testHarness) startClient() { h.t.Helper() towerTCPAddr, err := net.ResolveTCPAddr("tcp", towerAddrStr) if err != nil { h.t.Fatalf("Unable to resolve tower TCP addr: %v", err) } towerAddr := &lnwire.NetAddress{ IdentityKey: h.serverCfg.NodeKeyECDH.PubKey(), Address: towerTCPAddr, } h.client, err = wtclient.New(h.clientCfg) if err != nil { h.t.Fatalf("unable to create wtclient: %v", err) } if err := h.client.Start(); err != nil { h.t.Fatalf("unable to start wtclient: %v", err) } if err := h.client.AddTower(towerAddr); err != nil { h.t.Fatalf("unable to add tower to wtclient: %v", err) } } // chanIDFromInt creates a unique channel id given a unique integral id. func chanIDFromInt(id uint64) lnwire.ChannelID { var chanID lnwire.ChannelID binary.BigEndian.PutUint64(chanID[:8], id) return chanID } // makeChannel creates new channel with id, using the localAmt and remoteAmt as // the starting balances. The channel will be available by using h.channel(id). // // NOTE: The method fails if channel for id already exists. func (h *testHarness) makeChannel(id uint64, localAmt, remoteAmt lnwire.MilliSatoshi) { h.t.Helper() chanID := chanIDFromInt(id) c := newMockChannel(h.t, h.signer, localAmt, remoteAmt) c.mu.Lock() _, ok := h.channels[chanID] if !ok { h.channels[chanID] = c } c.mu.Unlock() if ok { h.t.Fatalf("channel %d already created", id) } } // channel retrieves the channel corresponding to id. // // NOTE: The method fails if a channel for id does not exist. func (h *testHarness) channel(id uint64) *mockChannel { h.t.Helper() h.mu.Lock() c, ok := h.channels[chanIDFromInt(id)] h.mu.Unlock() if !ok { h.t.Fatalf("unable to fetch channel %d", id) } return c } // registerChannel registers the channel identified by id with the client. func (h *testHarness) registerChannel(id uint64) { h.t.Helper() chanID := chanIDFromInt(id) err := h.client.RegisterChannel(chanID) if err != nil { h.t.Fatalf("unable to register channel %d: %v", id, err) } } // advanceChannelN calls advanceState on the channel identified by id the number // of provided times and returns the breach hints corresponding to the new // states. func (h *testHarness) advanceChannelN(id uint64, n int) []blob.BreachHint { h.t.Helper() channel := h.channel(id) var hints []blob.BreachHint for i := uint64(0); i < uint64(n); i++ { channel.advanceState(h.t) commitTx, _ := h.channel(id).getState(i) breachTxID := commitTx.TxHash() hints = append(hints, blob.NewBreachHintFromHash(&breachTxID)) } return hints } // backupStates instructs the channel identified by id to send backups to the // client for states in the range [to, from). func (h *testHarness) backupStates(id, from, to uint64, expErr error) { h.t.Helper() for i := from; i < to; i++ { h.backupState(id, i, expErr) } } // backupStates instructs the channel identified by id to send a backup for // state i. func (h *testHarness) backupState(id, i uint64, expErr error) { h.t.Helper() _, retribution := h.channel(id).getState(i) chanID := chanIDFromInt(id) err := h.client.BackupState(&chanID, retribution, channeldb.SingleFunderBit) if err != expErr { h.t.Fatalf("back error mismatch, want: %v, got: %v", expErr, err) } } // sendPayments instructs the channel identified by id to send amt to the remote // party for each state in from-to times and returns the breach hints for states // [from, to). func (h *testHarness) sendPayments(id, from, to uint64, amt lnwire.MilliSatoshi) []blob.BreachHint { h.t.Helper() channel := h.channel(id) var hints []blob.BreachHint for i := from; i < to; i++ { h.channel(id).sendPayment(h.t, amt) commitTx, _ := channel.getState(i) breachTxID := commitTx.TxHash() hints = append(hints, blob.NewBreachHintFromHash(&breachTxID)) } return hints } // receivePayment instructs the channel identified by id to recv amt from the // remote party for each state in from-to times and returns the breach hints for // states [from, to). func (h *testHarness) recvPayments(id, from, to uint64, amt lnwire.MilliSatoshi) []blob.BreachHint { h.t.Helper() channel := h.channel(id) var hints []blob.BreachHint for i := from; i < to; i++ { channel.receivePayment(h.t, amt) commitTx, _ := channel.getState(i) breachTxID := commitTx.TxHash() hints = append(hints, blob.NewBreachHintFromHash(&breachTxID)) } return hints } // waitServerUpdates blocks until the breach hints provided all appear in the // watchtower's database or the timeout expires. This is used to test that the // client in fact sends the updates to the server, even if it is offline. func (h *testHarness) waitServerUpdates(hints []blob.BreachHint, timeout time.Duration) { h.t.Helper() // If no breach hints are provided, we will wait out the full timeout to // assert that no updates appear. wantUpdates := len(hints) > 0 hintSet := make(map[blob.BreachHint]struct{}) for _, hint := range hints { hintSet[hint] = struct{}{} } if len(hints) != len(hintSet) { h.t.Fatalf("breach hints are not unique, list-len: %d "+ "set-len: %d", len(hints), len(hintSet)) } // Closure to assert the server's matches are consistent with the hint // set. serverHasHints := func(matches []wtdb.Match) bool { if len(hintSet) != len(matches) { return false } for _, match := range matches { if _, ok := hintSet[match.Hint]; ok { continue } h.t.Fatalf("match %v in db is not in hint set", match.Hint) } return true } failTimeout := time.After(timeout) for { select { case <-time.After(time.Second): matches, err := h.serverDB.QueryMatches(hints) switch { case err != nil: h.t.Fatalf("unable to query for hints: %v", err) case wantUpdates && serverHasHints(matches): return case wantUpdates: h.t.Logf("Received %d/%d\n", len(matches), len(hints)) } case <-failTimeout: matches, err := h.serverDB.QueryMatches(hints) switch { case err != nil: h.t.Fatalf("unable to query for hints: %v", err) case serverHasHints(matches): return default: h.t.Fatalf("breach hints not received, only "+ "got %d/%d", len(matches), len(hints)) } } } } // assertUpdatesForPolicy queries the server db for matches using the provided // breach hints, then asserts that each match has a session with the expected // policy. func (h *testHarness) assertUpdatesForPolicy(hints []blob.BreachHint, expPolicy wtpolicy.Policy) { // Query for matches on the provided hints. matches, err := h.serverDB.QueryMatches(hints) if err != nil { h.t.Fatalf("unable to query for matches: %v", err) } // Assert that the number of matches is exactly the number of provided // hints. if len(matches) != len(hints) { h.t.Fatalf("expected: %d matches, got: %d", len(hints), len(matches)) } // Assert that all of the matches correspond to a session with the // expected policy. for _, match := range matches { matchPolicy := match.SessionInfo.Policy if expPolicy != matchPolicy { h.t.Fatalf("expected session to have policy: %v, "+ "got: %v", expPolicy, matchPolicy) } } } // addTower adds a tower found at `addr` to the client. func (h *testHarness) addTower(addr *lnwire.NetAddress) { h.t.Helper() if err := h.client.AddTower(addr); err != nil { h.t.Fatalf("unable to add tower: %v", err) } } // removeTower removes a tower from the client. If `addr` is specified, then the // only said address is removed from the tower. func (h *testHarness) removeTower(pubKey *btcec.PublicKey, addr net.Addr) { h.t.Helper() if err := h.client.RemoveTower(pubKey, addr); err != nil { h.t.Fatalf("unable to remove tower: %v", err) } } const ( localBalance = lnwire.MilliSatoshi(100000000) remoteBalance = lnwire.MilliSatoshi(200000000) ) type clientTest struct { name string cfg harnessCfg fn func(*testHarness) } var clientTests = []clientTest{ { // Asserts that client will return the ErrUnregisteredChannel // error when trying to backup states for a channel that has not // been registered (and received it's pkscript). name: "backup unregistered channel", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 20000, }, noRegisterChan0: true, }, fn: func(h *testHarness) { const ( numUpdates = 5 chanID = 0 ) // Advance the channel and backup the retributions. We // expect ErrUnregisteredChannel to be returned since // the channel was not registered during harness // creation. h.advanceChannelN(chanID, numUpdates) h.backupStates( chanID, 0, numUpdates, wtclient.ErrUnregisteredChannel, ) }, }, { // Asserts that the client returns an ErrClientExiting when // trying to backup channels after the Stop method has been // called. name: "backup after stop", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 20000, }, }, fn: func(h *testHarness) { const ( numUpdates = 5 chanID = 0 ) // Stop the client, subsequent backups should fail. h.client.Stop() // Advance the channel and try to back up the states. We // expect ErrClientExiting to be returned from // BackupState. h.advanceChannelN(chanID, numUpdates) h.backupStates( chanID, 0, numUpdates, wtclient.ErrClientExiting, ) }, }, { // Asserts that the client will continue to back up all states // that have previously been enqueued before it finishes // exiting. name: "backup reliable flush", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 5, }, }, fn: func(h *testHarness) { const ( numUpdates = 5 chanID = 0 ) // Generate numUpdates retributions and back them up to // the tower. hints := h.advanceChannelN(chanID, numUpdates) h.backupStates(chanID, 0, numUpdates, nil) // Stop the client in the background, to assert the // pipeline is always flushed before it exits. go h.client.Stop() // Wait for all of the updates to be populated in the // server's database. h.waitServerUpdates(hints, time.Second) }, }, { // Assert that the client will not send out backups for states // whose justice transactions are ineligible for backup, e.g. // creating dust outputs. name: "backup dust ineligible", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: 1000000, // high sweep fee creates dust }, MaxUpdates: 20000, }, }, fn: func(h *testHarness) { const ( numUpdates = 5 chanID = 0 ) // Create the retributions and queue them for backup. h.advanceChannelN(chanID, numUpdates) h.backupStates(chanID, 0, numUpdates, nil) // Ensure that no updates are received by the server, // since they should all be marked as ineligible. h.waitServerUpdates(nil, time.Second) }, }, { // Verifies that the client will properly retransmit a committed // state update to the watchtower after a restart if the update // was not acked while the client was active last. name: "committed update restart", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 20000, }, }, fn: func(h *testHarness) { const ( numUpdates = 5 chanID = 0 ) hints := h.advanceChannelN(0, numUpdates) var numSent uint64 // Add the first two states to the client's pipeline. h.backupStates(chanID, 0, 2, nil) numSent = 2 // Wait for both to be reflected in the server's // database. h.waitServerUpdates(hints[:numSent], time.Second) // Now, restart the server and prevent it from acking // state updates. h.server.Stop() h.serverCfg.NoAckUpdates = true h.startServer() defer h.server.Stop() // Send the next state update to the tower. Since the // tower isn't acking state updates, we expect this // update to be committed and sent by the session queue, // but it will never receive an ack. h.backupState(chanID, numSent, nil) numSent++ // Force quit the client to abort the state updates it // has queued. The sleep ensures that the session queues // have enough time to commit the state updates before // the client is killed. time.Sleep(time.Second) h.client.ForceQuit() // Restart the server and allow it to ack the updates // after the client retransmits the unacked update. h.server.Stop() h.serverCfg.NoAckUpdates = false h.startServer() defer h.server.Stop() // Restart the client and allow it to process the // committed update. h.startClient() defer h.client.ForceQuit() // Wait for the committed update to be accepted by the // tower. h.waitServerUpdates(hints[:numSent], time.Second) // Finally, send the rest of the updates and wait for // the tower to receive the remaining states. h.backupStates(chanID, numSent, numUpdates, nil) // Wait for all of the updates to be populated in the // server's database. h.waitServerUpdates(hints, time.Second) }, }, { // Asserts that the client will continue to retry sending state // updates if it doesn't receive an ack from the server. The // client is expected to flush everything in its in-memory // pipeline once the server begins sending acks again. name: "no ack from server", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 5, }, }, fn: func(h *testHarness) { const ( numUpdates = 100 chanID = 0 ) // Generate the retributions that will be backed up. hints := h.advanceChannelN(chanID, numUpdates) // Restart the server and prevent it from acking state // updates. h.server.Stop() h.serverCfg.NoAckUpdates = true h.startServer() defer h.server.Stop() // Now, queue the retributions for backup. h.backupStates(chanID, 0, numUpdates, nil) // Stop the client in the background, to assert the // pipeline is always flushed before it exits. go h.client.Stop() // Give the client time to saturate a large number of // session queues for which the server has not acked the // state updates that it has received. time.Sleep(time.Second) // Restart the server and allow it to ack the updates // after the client retransmits the unacked updates. h.server.Stop() h.serverCfg.NoAckUpdates = false h.startServer() defer h.server.Stop() // Wait for all of the updates to be populated in the // server's database. h.waitServerUpdates(hints, 5*time.Second) }, }, { // Asserts that the client is able to send state updates to the // tower for a full range of channel values, assuming the sweep // fee rates permit it. We expect all of these to be successful // since a sweep transactions spending only from one output is // less expensive than one that sweeps both. name: "send and recv", cfg: harnessCfg{ localBalance: 100000001, // ensure (% amt != 0) remoteBalance: 200000001, // ensure (% amt != 0) policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 1000, }, }, fn: func(h *testHarness) { var ( capacity = h.cfg.localBalance + h.cfg.remoteBalance paymentAmt = lnwire.MilliSatoshi(2000000) numSends = uint64(h.cfg.localBalance / paymentAmt) numRecvs = uint64(capacity / paymentAmt) numUpdates = numSends + numRecvs // 200 updates chanID = uint64(0) ) // Send money to the remote party until all funds are // depleted. sendHints := h.sendPayments(chanID, 0, numSends, paymentAmt) // Now, sequentially receive the entire channel balance // from the remote party. recvHints := h.recvPayments(chanID, numSends, numUpdates, paymentAmt) // Collect the hints generated by both sending and // receiving. hints := append(sendHints, recvHints...) // Backup the channel's states the client. h.backupStates(chanID, 0, numUpdates, nil) // Wait for all of the updates to be populated in the // server's database. h.waitServerUpdates(hints, 3*time.Second) }, }, { // Asserts that the client is able to support multiple links. name: "multiple link backup", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 5, }, }, fn: func(h *testHarness) { const ( numUpdates = 5 numChans = 10 ) // Initialize and register an additional 9 channels. for id := uint64(1); id < 10; id++ { h.makeChannel( id, h.cfg.localBalance, h.cfg.remoteBalance, ) h.registerChannel(id) } // Generate the retributions for all 10 channels and // collect the breach hints. var hints []blob.BreachHint for id := uint64(0); id < 10; id++ { chanHints := h.advanceChannelN(id, numUpdates) hints = append(hints, chanHints...) } // Provided all retributions to the client from all // channels. for id := uint64(0); id < 10; id++ { h.backupStates(id, 0, numUpdates, nil) } // Test reliable flush under multi-client scenario. go h.client.Stop() // Wait for all of the updates to be populated in the // server's database. h.waitServerUpdates(hints, 10*time.Second) }, }, { name: "create session no ack", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 5, }, noAckCreateSession: true, }, fn: func(h *testHarness) { const ( chanID = 0 numUpdates = 3 ) // Generate the retributions that will be backed up. hints := h.advanceChannelN(chanID, numUpdates) // Now, queue the retributions for backup. h.backupStates(chanID, 0, numUpdates, nil) // Since the client is unable to create a session, the // server should have no updates. h.waitServerUpdates(nil, time.Second) // Force quit the client since it has queued backups. h.client.ForceQuit() // Restart the server and allow it to ack session // creation. h.server.Stop() h.serverCfg.NoAckCreateSession = false h.startServer() defer h.server.Stop() // Restart the client with the same policy, which will // immediately try to overwrite the old session with an // identical one. h.startClient() defer h.client.ForceQuit() // Now, queue the retributions for backup. h.backupStates(chanID, 0, numUpdates, nil) // Wait for all of the updates to be populated in the // server's database. h.waitServerUpdates(hints, 5*time.Second) // Assert that the server has updates for the clients // most recent policy. h.assertUpdatesForPolicy(hints, h.clientCfg.Policy) }, }, { name: "create session no ack change policy", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 5, }, noAckCreateSession: true, }, fn: func(h *testHarness) { const ( chanID = 0 numUpdates = 3 ) // Generate the retributions that will be backed up. hints := h.advanceChannelN(chanID, numUpdates) // Now, queue the retributions for backup. h.backupStates(chanID, 0, numUpdates, nil) // Since the client is unable to create a session, the // server should have no updates. h.waitServerUpdates(nil, time.Second) // Force quit the client since it has queued backups. h.client.ForceQuit() // Restart the server and allow it to ack session // creation. h.server.Stop() h.serverCfg.NoAckCreateSession = false h.startServer() defer h.server.Stop() // Restart the client with a new policy, which will // immediately try to overwrite the prior session with // the old policy. h.clientCfg.Policy.SweepFeeRate *= 2 h.startClient() defer h.client.ForceQuit() // Now, queue the retributions for backup. h.backupStates(chanID, 0, numUpdates, nil) // Wait for all of the updates to be populated in the // server's database. h.waitServerUpdates(hints, 5*time.Second) // Assert that the server has updates for the clients // most recent policy. h.assertUpdatesForPolicy(hints, h.clientCfg.Policy) }, }, { // Asserts that the client will not request a new session if // already has an existing session with the same TxPolicy. This // permits the client to continue using policies that differ in // operational parameters, but don't manifest in different // justice transactions. name: "create session change policy same txpolicy", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 10, }, }, fn: func(h *testHarness) { const ( chanID = 0 numUpdates = 6 ) // Generate the retributions that will be backed up. hints := h.advanceChannelN(chanID, numUpdates) // Now, queue the first half of the retributions. h.backupStates(chanID, 0, numUpdates/2, nil) // Wait for the server to collect the first half. h.waitServerUpdates(hints[:numUpdates/2], time.Second) // Stop the client, which should have no more backups. h.client.Stop() // Record the policy that the first half was stored // under. We'll expect the second half to also be stored // under the original policy, since we are only adjusting // the MaxUpdates. The client should detect that the // two policies have equivalent TxPolicies and continue // using the first. expPolicy := h.clientCfg.Policy // Restart the client with a new policy. h.clientCfg.Policy.MaxUpdates = 20 h.startClient() defer h.client.ForceQuit() // Now, queue the second half of the retributions. h.backupStates(chanID, numUpdates/2, numUpdates, nil) // Wait for all of the updates to be populated in the // server's database. h.waitServerUpdates(hints, 5*time.Second) // Assert that the server has updates for the client's // original policy. h.assertUpdatesForPolicy(hints, expPolicy) }, }, { // Asserts that the client will deduplicate backups presented by // a channel both in memory and after a restart. The client // should only accept backups with a commit height greater than // any processed already processed for a given policy. name: "dedup backups", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 5, }, }, fn: func(h *testHarness) { const ( numUpdates = 10 chanID = 0 ) // Generate the retributions that will be backed up. hints := h.advanceChannelN(chanID, numUpdates) // Queue the first half of the retributions twice, the // second batch should be entirely deduped by the // client's in-memory tracking. h.backupStates(chanID, 0, numUpdates/2, nil) h.backupStates(chanID, 0, numUpdates/2, nil) // Wait for the first half of the updates to be // populated in the server's database. h.waitServerUpdates(hints[:len(hints)/2], 5*time.Second) // Restart the client, so we can ensure the deduping is // maintained across restarts. h.client.Stop() h.startClient() defer h.client.ForceQuit() // Try to back up the full range of retributions. Only // the second half should actually be sent. h.backupStates(chanID, 0, numUpdates, nil) // Wait for all of the updates to be populated in the // server's database. h.waitServerUpdates(hints, 5*time.Second) }, }, { // Asserts that the client can continue making backups to a // tower that's been re-added after it's been removed. name: "re-add removed tower", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 5, }, }, fn: func(h *testHarness) { const ( chanID = 0 numUpdates = 4 ) // Create four channel updates and only back up the // first two. hints := h.advanceChannelN(chanID, numUpdates) h.backupStates(chanID, 0, numUpdates/2, nil) h.waitServerUpdates(hints[:numUpdates/2], 5*time.Second) // Fully remove the tower, causing its existing sessions // to be marked inactive. h.removeTower(h.serverAddr.IdentityKey, nil) // Back up the remaining states. Since the tower has // been removed, it shouldn't receive any updates. h.backupStates(chanID, numUpdates/2, numUpdates, nil) h.waitServerUpdates(nil, time.Second) // Re-add the tower. We prevent the tower from acking // session creation to ensure the inactive sessions are // not used. err := h.server.Stop() require.Nil(h.t, err) h.serverCfg.NoAckCreateSession = true h.startServer() h.addTower(h.serverAddr) h.waitServerUpdates(nil, time.Second) // Finally, allow the tower to ack session creation, // allowing the state updates to be sent through the new // session. err = h.server.Stop() require.Nil(h.t, err) h.serverCfg.NoAckCreateSession = false h.startServer() h.waitServerUpdates(hints[numUpdates/2:], 5*time.Second) }, }, { // Asserts that the client's force quite delay will properly // shutdown the client if it is unable to completely drain the // task pipeline. name: "force unclean shutdown", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, policy: wtpolicy.Policy{ TxPolicy: wtpolicy.TxPolicy{ BlobType: blob.TypeAltruistCommit, SweepFeeRate: wtpolicy.DefaultSweepFeeRate, }, MaxUpdates: 5, }, }, fn: func(h *testHarness) { const ( chanID = 0 numUpdates = 6 maxUpdates = 5 ) // Advance the channel to create all states. hints := h.advanceChannelN(chanID, numUpdates) // Back up 4 of the 5 states for the negotiated session. h.backupStates(chanID, 0, maxUpdates-1, nil) h.waitServerUpdates(hints[:maxUpdates-1], 5*time.Second) // Now, restart the tower and prevent it from acking any // new sessions. We do this here as once the last slot // is exhausted the client will attempt to renegotiate. err := h.server.Stop() require.Nil(h.t, err) h.serverCfg.NoAckCreateSession = true h.startServer() // Back up the remaining two states. Once the first is // processed, the session will be exhausted but the // client won't be able to regnegotiate a session for // the final state. We'll only wait for the first five // states to arrive at the tower. h.backupStates(chanID, maxUpdates-1, numUpdates, nil) h.waitServerUpdates(hints[:maxUpdates], 5*time.Second) // Finally, stop the client which will continue to // attempt session negotiation since it has one more // state to process. After the force quite delay // expires, the client should force quite itself and // allow the test to complete. err = h.client.Stop() require.Nil(h.t, err) }, }, } // TestClient executes the client test suite, asserting the ability to backup // states in a number of failure cases and it's reliability during shutdown. func TestClient(t *testing.T) { for _, test := range clientTests { tc := test t.Run(tc.name, func(t *testing.T) { t.Parallel() h := newHarness(t, tc.cfg) defer h.server.Stop() defer h.client.ForceQuit() tc.fn(h) }) } }