From f5dee02ff445e74d3682b3f0aab5c67ff3aec66d Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 7 Jun 2019 16:42:25 +0200 Subject: [PATCH 1/8] htlcswitch/mock: set SelfKey and mock ErrorSource --- htlcswitch/mock.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index be9c3994..df059914 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -163,7 +163,10 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error) } } + priv, _ := btcec.NewPrivateKey(btcec.S256()) + pubkey := priv.PubKey() cfg := Config{ + SelfKey: pubkey, DB: db, SwitchPackager: channeldb.NewSwitchPackager(), FwdingLog: &mockForwardingLog{ @@ -390,7 +393,11 @@ func (o *mockDeobfuscator) DecryptError(reason lnwire.OpaqueReason) (*Forwarding return nil, err } + priv, _ := btcec.NewPrivateKey(btcec.S256()) + pubkey := priv.PubKey() + return &ForwardingError{ + ErrorSource: pubkey, FailureMessage: failure, }, nil } From df3f5d02adb97b42e3b0d6aa0e9e36b8da012e74 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 7 Jun 2019 16:42:25 +0200 Subject: [PATCH 2/8] htlcswitch/payment_result: add (de)serialization of networkResult + test --- htlcswitch/payment_result.go | 33 ++++++++++++ htlcswitch/payment_result_test.go | 90 +++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+) create mode 100644 htlcswitch/payment_result_test.go diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index 5cfce845..02359d6d 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -2,7 +2,9 @@ package htlcswitch import ( "errors" + "io" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" ) @@ -46,3 +48,34 @@ type networkResult struct { // which the failure reason might not be included. isResolution bool } + +// serializeNetworkResult serializes the networkResult. +func serializeNetworkResult(w io.Writer, n *networkResult) error { + if _, err := lnwire.WriteMessage(w, n.msg, 0); err != nil { + return err + } + + return channeldb.WriteElements(w, n.unencrypted, n.isResolution) +} + +// deserializeNetworkResult deserializes the networkResult. +func deserializeNetworkResult(r io.Reader) (*networkResult, error) { + var ( + err error + ) + + n := &networkResult{} + + n.msg, err = lnwire.ReadMessage(r, 0) + if err != nil { + return nil, err + } + + if err := channeldb.ReadElements(r, + &n.unencrypted, &n.isResolution, + ); err != nil { + return nil, err + } + + return n, nil +} diff --git a/htlcswitch/payment_result_test.go b/htlcswitch/payment_result_test.go new file mode 100644 index 00000000..4b45bc9a --- /dev/null +++ b/htlcswitch/payment_result_test.go @@ -0,0 +1,90 @@ +package htlcswitch + +import ( + "bytes" + "math/rand" + "reflect" + "testing" + + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwire" +) + +// TestNetworkResultSerialization checks that NetworkResults are properly +// (de)serialized. +func TestNetworkResultSerialization(t *testing.T) { + t.Parallel() + + var preimage lntypes.Preimage + if _, err := rand.Read(preimage[:]); err != nil { + t.Fatalf("unable gen rand preimag: %v", err) + } + + var chanID lnwire.ChannelID + if _, err := rand.Read(chanID[:]); err != nil { + t.Fatalf("unable gen rand chanid: %v", err) + } + + var reason [256]byte + if _, err := rand.Read(reason[:]); err != nil { + t.Fatalf("unable gen rand reason: %v", err) + } + + settle := &lnwire.UpdateFulfillHTLC{ + ChanID: chanID, + ID: 2, + PaymentPreimage: preimage, + } + + fail := &lnwire.UpdateFailHTLC{ + ChanID: chanID, + ID: 1, + Reason: []byte{}, + } + + fail2 := &lnwire.UpdateFailHTLC{ + ChanID: chanID, + ID: 1, + Reason: reason[:], + } + + testCases := []*networkResult{ + { + msg: settle, + }, + { + msg: fail, + unencrypted: false, + isResolution: false, + }, + { + msg: fail, + unencrypted: false, + isResolution: true, + }, + { + msg: fail2, + unencrypted: true, + isResolution: false, + }, + } + + for _, p := range testCases { + var buf bytes.Buffer + if err := serializeNetworkResult(&buf, p); err != nil { + t.Fatalf("serialize failed: %v", err) + } + + r := bytes.NewReader(buf.Bytes()) + p1, err := deserializeNetworkResult(r) + if err != nil { + t.Fatalf("unable to deserizlize: %v", err) + } + + if !reflect.DeepEqual(p, p1) { + t.Fatalf("not equal. %v vs %v", spew.Sdump(p), + spew.Sdump(p1)) + } + } +} From 1febe1a6d5121ee1b2239f88d65953590341e758 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 7 Jun 2019 16:42:25 +0200 Subject: [PATCH 3/8] htlcswitch/payment_result: add paymentResultStore paymentResultStore is a persistent store where we keep track of all received payment results. This is used to ensure we don't lose results from payment attempts on restarts. --- htlcswitch/payment_result.go | 179 +++++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index 02359d6d..b87b0a74 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -1,14 +1,24 @@ package htlcswitch import ( + "bytes" + "encoding/binary" "errors" "io" + "sync" + "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/multimutex" ) var ( + + // networkResultStoreBucketKey is used for the root level bucket that + // stores the network result for each payment ID. + networkResultStoreBucketKey = []byte("network-result-store-bucket") + // ErrPaymentIDNotFound is an error returned if the given paymentID is // not found. ErrPaymentIDNotFound = errors.New("paymentID not found") @@ -79,3 +89,172 @@ func deserializeNetworkResult(r io.Reader) (*networkResult, error) { return n, nil } + +// networkResultStore is a persistent store that stores any results of HTLCs in +// flight on the network. Since payment results are inherently asynchronous, it +// is used as a common access point for senders of HTLCs, to know when a result +// is back. The Switch will checkpoint any received result to the store, and +// the store will keep results and notify the callers about them. +type networkResultStore struct { + db *channeldb.DB + + // results is a map from paymentIDs to channels where subscribers to + // payment results will be notified. + results map[uint64][]chan *networkResult + resultsMtx sync.Mutex + + // paymentIDMtx is a multimutex used to make sure the database and + // result subscribers map is consistent for each payment ID in case of + // concurrent callers. + paymentIDMtx *multimutex.Mutex +} + +func newNetworkResultStore(db *channeldb.DB) *networkResultStore { + return &networkResultStore{ + db: db, + results: make(map[uint64][]chan *networkResult), + paymentIDMtx: multimutex.NewMutex(), + } +} + +// storeResult stores the networkResult for the given paymentID, and +// notifies any subscribers. +func (store *networkResultStore) storeResult(paymentID uint64, + result *networkResult) error { + + // We get a mutex for this payment ID. This is needed to ensure + // consistency between the database state and the subscribers in case + // of concurrent calls. + store.paymentIDMtx.Lock(paymentID) + defer store.paymentIDMtx.Unlock(paymentID) + + // Serialize the payment result. + var b bytes.Buffer + if err := serializeNetworkResult(&b, result); err != nil { + return err + } + + var paymentIDBytes [8]byte + binary.BigEndian.PutUint64(paymentIDBytes[:], paymentID) + + err := store.db.Batch(func(tx *bbolt.Tx) error { + networkResults, err := tx.CreateBucketIfNotExists( + networkResultStoreBucketKey, + ) + if err != nil { + return err + } + + return networkResults.Put(paymentIDBytes[:], b.Bytes()) + }) + if err != nil { + return err + } + + // Now that the result is stored in the database, we can notify any + // active subscribers. + store.resultsMtx.Lock() + for _, res := range store.results[paymentID] { + res <- result + } + delete(store.results, paymentID) + store.resultsMtx.Unlock() + + return nil +} + +// subscribeResult is used to get the payment result for the given +// payment ID. It returns a channel on which the result will be delivered when +// ready. +func (store *networkResultStore) subscribeResult(paymentID uint64) ( + <-chan *networkResult, error) { + + // We get a mutex for this payment ID. This is needed to ensure + // consistency between the database state and the subscribers in case + // of concurrent calls. + store.paymentIDMtx.Lock(paymentID) + defer store.paymentIDMtx.Unlock(paymentID) + + var ( + result *networkResult + resultChan = make(chan *networkResult, 1) + ) + + err := store.db.View(func(tx *bbolt.Tx) error { + var err error + result, err = fetchResult(tx, paymentID) + switch { + + // Result not yet available, we will notify once a result is + // available. + case err == ErrPaymentIDNotFound: + return nil + + case err != nil: + return err + + // The result was found, and will be returned immediately. + default: + return nil + } + }) + if err != nil { + return nil, err + } + + // If the result was found, we can send it on the result channel + // imemdiately. + if result != nil { + resultChan <- result + return resultChan, nil + } + + // Otherwise we store the result channel for when the result is + // available. + store.resultsMtx.Lock() + store.results[paymentID] = append( + store.results[paymentID], resultChan, + ) + store.resultsMtx.Unlock() + + return resultChan, nil +} + +// getResult attempts to immediately fetch the result for the given pid from +// the store. If no result is available, ErrPaymentIDNotFound is returned. +func (store *networkResultStore) getResult(pid uint64) ( + *networkResult, error) { + + var result *networkResult + err := store.db.View(func(tx *bbolt.Tx) error { + var err error + result, err = fetchResult(tx, pid) + return err + }) + if err != nil { + return nil, err + } + + return result, nil +} + +func fetchResult(tx *bbolt.Tx, pid uint64) (*networkResult, error) { + var paymentIDBytes [8]byte + binary.BigEndian.PutUint64(paymentIDBytes[:], pid) + + networkResults := tx.Bucket(networkResultStoreBucketKey) + if networkResults == nil { + return nil, ErrPaymentIDNotFound + } + + // Check whether a result is already available. + resultBytes := networkResults.Get(paymentIDBytes[:]) + if resultBytes == nil { + return nil, ErrPaymentIDNotFound + } + + // Decode the result we found. + r := bytes.NewReader(resultBytes) + + return deserializeNetworkResult(r) +} From 2dea790b5513e7041c44ce9979ca69e30d19f701 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 7 Jun 2019 16:42:25 +0200 Subject: [PATCH 4/8] multi: make GetPaymentResult take payment hash Used for logging in the switch, and when we remove the pending payments, only the router will have the hash stored across restarts. --- htlcswitch/link_test.go | 4 ++-- htlcswitch/switch.go | 4 ++-- htlcswitch/switch_test.go | 4 ++-- htlcswitch/test_utils.go | 4 ++-- routing/mock_test.go | 7 ++++--- routing/payment_lifecycle.go | 2 +- routing/router.go | 3 ++- 7 files changed, 15 insertions(+), 13 deletions(-) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 646c6450..ac3a1b12 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1118,7 +1118,7 @@ func TestChannelLinkMultiHopUnknownPaymentHash(t *testing.T) { } resultChan, err := n.aliceServer.htlcSwitch.GetPaymentResult( - pid, newMockDeobfuscator(), + pid, htlc.PaymentHash, newMockDeobfuscator(), ) if err != nil { t.Fatalf("unable to get payment result: %v", err) @@ -3898,7 +3898,7 @@ func TestChannelLinkAcceptDuplicatePayment(t *testing.T) { } resultChan, err := n.aliceServer.htlcSwitch.GetPaymentResult( - pid, newMockDeobfuscator(), + pid, htlc.PaymentHash, newMockDeobfuscator(), ) if err != nil { t.Fatalf("unable to get payment result: %v", err) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 45e2e2db..efbcf608 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -342,7 +342,7 @@ func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) erro // result is received on the channel, the HTLC is guaranteed to no longer be in // flight. The switch shutting down is signaled by closing the channel. If the // paymentID is unknown, ErrPaymentIDNotFound will be returned. -func (s *Switch) GetPaymentResult(paymentID uint64, +func (s *Switch) GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash, deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) { s.pendingMutex.Lock() @@ -375,7 +375,7 @@ func (s *Switch) GetPaymentResult(paymentID uint64, // Extract the result and pass it to the result channel. result, err := s.extractResult( - deobfuscator, n, paymentID, payment.paymentHash, + deobfuscator, n, paymentID, paymentHash, ) if err != nil { e := fmt.Errorf("Unable to extract result: %v", err) diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index cd798301..af7c0fca 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -1743,7 +1743,7 @@ func TestSwitchSendPayment(t *testing.T) { // First check that the switch will correctly respond that this payment // ID is unknown. _, err = s.GetPaymentResult( - paymentID, newMockDeobfuscator(), + paymentID, rhash, newMockDeobfuscator(), ) if err != ErrPaymentIDNotFound { t.Fatalf("expected ErrPaymentIDNotFound, got %v", err) @@ -1761,7 +1761,7 @@ func TestSwitchSendPayment(t *testing.T) { } resultChan, err := s.GetPaymentResult( - paymentID, newMockDeobfuscator(), + paymentID, rhash, newMockDeobfuscator(), ) if err != nil { errChan <- err diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index fa8ff886..5ae959bc 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -801,7 +801,7 @@ func preparePayment(sendingPeer, receivingPeer lnpeer.Peer, return err } resultChan, err := sender.htlcSwitch.GetPaymentResult( - pid, newMockDeobfuscator(), + pid, hash, newMockDeobfuscator(), ) if err != nil { return err @@ -1289,7 +1289,7 @@ func (n *twoHopNetwork) makeHoldPayment(sendingPeer, receivingPeer lnpeer.Peer, } resultChan, err := sender.htlcSwitch.GetPaymentResult( - pid, newMockDeobfuscator(), + pid, rhash, newMockDeobfuscator(), ) if err != nil { paymentErr <- err diff --git a/routing/mock_test.go b/routing/mock_test.go index 600c5587..f2f7de78 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -52,7 +52,8 @@ func (m *mockPaymentAttemptDispatcher) SendHTLC(firstHop lnwire.ShortChannelID, } func (m *mockPaymentAttemptDispatcher) GetPaymentResult(paymentID uint64, - _ htlcswitch.ErrorDecrypter) (<-chan *htlcswitch.PaymentResult, error) { + _ lntypes.Hash, _ htlcswitch.ErrorDecrypter) ( + <-chan *htlcswitch.PaymentResult, error) { c := make(chan *htlcswitch.PaymentResult, 1) res, ok := m.results[paymentID] @@ -139,8 +140,8 @@ func (m *mockPayer) SendHTLC(_ lnwire.ShortChannelID, } -func (m *mockPayer) GetPaymentResult(paymentID uint64, _ htlcswitch.ErrorDecrypter) ( - <-chan *htlcswitch.PaymentResult, error) { +func (m *mockPayer) GetPaymentResult(paymentID uint64, _ lntypes.Hash, + _ htlcswitch.ErrorDecrypter) (<-chan *htlcswitch.PaymentResult, error) { select { case res := <-m.paymentResult: diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 46993d2a..3710ff58 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -95,7 +95,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) { // Now ask the switch to return the result of the payment when // available. resultChan, err := p.router.cfg.Payer.GetPaymentResult( - p.attempt.PaymentID, errorDecryptor, + p.attempt.PaymentID, p.payment.PaymentHash, errorDecryptor, ) switch { diff --git a/routing/router.go b/routing/router.go index fd0f689a..8bb50742 100644 --- a/routing/router.go +++ b/routing/router.go @@ -138,7 +138,8 @@ type PaymentAttemptDispatcher interface { // HTLC is guaranteed to no longer be in flight. The switch shutting // down is signaled by closing the channel. If the paymentID is // unknown, ErrPaymentIDNotFound will be returned. - GetPaymentResult(paymentID uint64, deobfuscator htlcswitch.ErrorDecrypter) ( + GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash, + deobfuscator htlcswitch.ErrorDecrypter) ( <-chan *htlcswitch.PaymentResult, error) } From 2cc778d3091a6319349cd97e90d508799faf7b7d Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 7 Jun 2019 16:42:26 +0200 Subject: [PATCH 5/8] htlcswitch/switch: use paymentResultStore to keep track of results --- htlcswitch/link_test.go | 4 +- htlcswitch/switch.go | 153 +++++++++++++++------------------------- 2 files changed, 60 insertions(+), 97 deletions(-) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index ac3a1b12..6ae141ab 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -3909,8 +3909,8 @@ func TestChannelLinkAcceptDuplicatePayment(t *testing.T) { err = n.aliceServer.htlcSwitch.SendHTLC( n.firstBobChannelLink.ShortChanID(), pid, htlc, ) - if err != ErrPaymentIDAlreadyExists { - t.Fatalf("ErrPaymentIDAlreadyExists should have been "+ + if err != ErrDuplicateAdd { + t.Fatalf("ErrDuplicateAdd should have been "+ "received got: %v", err) } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index efbcf608..ee2c090a 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -64,16 +64,6 @@ var ( zeroPreimage [sha256.Size]byte ) -// pendingPayment represents the payment which made by user and waits for -// updates to be received whether the payment has been rejected or proceed -// successfully. -type pendingPayment struct { - paymentHash lntypes.Hash - amount lnwire.MilliSatoshi - - resultChan chan *networkResult -} - // plexPacket encapsulates switch packet and adds error channel to receive // error from request handler. type plexPacket struct { @@ -201,12 +191,12 @@ type Switch struct { // service was initialized with. cfg *Config - // pendingPayments stores payments initiated by the user that are not yet - // settled. The map is used to later look up the payments and notify the - // user of the result when they are complete. Each payment is given a unique - // integer ID when it is created. - pendingPayments map[uint64]*pendingPayment - pendingMutex sync.RWMutex + // networkResults stores the results of payments initiated by the user. + // results. The store is used to later look up the payments and notify + // the user of the result when they are complete. Each payment attempt + // should be given a unique integer ID when it is created, otherwise + // results might be overwritten. + networkResults *networkResultStore // circuits is storage for payment circuits which are used to // forward the settle/fail htlc updates back to the add htlc initiator. @@ -292,7 +282,7 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) { forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink), interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink), pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink), - pendingPayments: make(map[uint64]*pendingPayment), + networkResults: newNetworkResultStore(cfg.DB), htlcPlex: make(chan *plexPacket), chanCloseRequests: make(chan *ChanClose), resolutionMsgs: make(chan *resolutionMsg), @@ -345,12 +335,33 @@ func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) erro func (s *Switch) GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash, deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) { - s.pendingMutex.Lock() - payment, ok := s.pendingPayments[paymentID] - s.pendingMutex.Unlock() + var ( + nChan <-chan *networkResult + err error + outKey = CircuitKey{ + ChanID: sourceHop, + HtlcID: paymentID, + } + ) - if !ok { - return nil, ErrPaymentIDNotFound + // If the payment is not found in the circuit map, check whether a + // result is already available. + // Assumption: no one will add this payment ID other than the caller. + if s.circuits.LookupCircuit(outKey) == nil { + res, err := s.networkResults.getResult(paymentID) + if err != nil { + return nil, err + } + c := make(chan *networkResult, 1) + c <- res + nChan = c + } else { + // The payment was committed to the circuits, subscribe for a + // result. + nChan, err = s.networkResults.subscribeResult(paymentID) + if err != nil { + return nil, err + } } resultChan := make(chan *PaymentResult, 1) @@ -364,7 +375,7 @@ func (s *Switch) GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash, var n *networkResult select { - case n = <-payment.resultChan: + case n = <-nChan: case <-s.quit: // We close the result channel to signal a shutdown. We // don't send any result in this case since the HTLC is @@ -398,24 +409,6 @@ func (s *Switch) GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash, func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64, htlc *lnwire.UpdateAddHTLC) error { - // Create payment and add to the map of payment in order later to be - // able to retrieve it and return response to the user. - payment := &pendingPayment{ - resultChan: make(chan *networkResult, 1), - paymentHash: htlc.PaymentHash, - amount: htlc.Amount, - } - - s.pendingMutex.Lock() - if _, ok := s.pendingPayments[paymentID]; ok { - s.pendingMutex.Unlock() - - return ErrPaymentIDAlreadyExists - } - - s.pendingPayments[paymentID] = payment - s.pendingMutex.Unlock() - // Generate and send new update packet, if error will be received on // this stage it means that packet haven't left boundaries of our // system and something wrong happened. @@ -426,12 +419,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64, htlc: htlc, } - if err := s.forward(packet); err != nil { - s.removePendingPayment(paymentID) - return err - } - - return nil + return s.forward(packet) } // UpdateForwardingPolicies sends a message to the switch to update the @@ -856,15 +844,34 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { // multiple db transactions. The guarantees of the circuit map are stringent // enough such that we are able to tolerate reordering of these operations // without side effects. The primary operations handled are: -// 1. Ack settle/fail references, to avoid resending this response internally -// 2. Teardown the closing circuit in the circuit map -// 3. Transition the payment status to grounded or completed. -// 4. Respond to an in-mem pending payment, if it is found. +// 1. Save the payment result to the pending payment store. +// 2. Notify subscribers about the payment result. +// 3. Ack settle/fail references, to avoid resending this response internally +// 4. Teardown the closing circuit in the circuit map // // NOTE: This method MUST be spawned as a goroutine. func (s *Switch) handleLocalResponse(pkt *htlcPacket) { defer s.wg.Done() + paymentID := pkt.incomingHTLCID + + // The error reason will be unencypted in case this a local + // failure or a converted error. + unencrypted := pkt.localFailure || pkt.convertedError + n := &networkResult{ + msg: pkt.htlc, + unencrypted: unencrypted, + isResolution: pkt.isResolution, + } + + // Store the result to the db. This will also notify subscribers about + // the result. + if err := s.networkResults.storeResult(paymentID, n); err != nil { + log.Errorf("Unable to complete payment for pid=%v: %v", + paymentID, err) + return + } + // First, we'll clean up any fwdpkg references, circuit entries, and // mark in our db that the payment for this payment hash has either // succeeded or failed. @@ -892,26 +899,6 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) { pkt.inKey(), err) return } - - // Locate the pending payment to notify the application that this - // payment has failed. If one is not found, it likely means the daemon - // has been restarted since sending the payment. - payment := s.findPayment(pkt.incomingHTLCID) - - // The error reason will be unencypted in case this a local - // failure or a converted error. - unencrypted := pkt.localFailure || pkt.convertedError - n := &networkResult{ - msg: pkt.htlc, - unencrypted: unencrypted, - isResolution: pkt.isResolution, - } - - // Deliver the payment error and preimage to the application, if it is - // waiting for a response. - if payment != nil { - payment.resultChan <- n - } } // extractResult uses the given deobfuscator to extract the payment result from @@ -2173,30 +2160,6 @@ func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) { return channelLinks, nil } -// removePendingPayment is the helper function which removes the pending user -// payment. -func (s *Switch) removePendingPayment(paymentID uint64) { - s.pendingMutex.Lock() - defer s.pendingMutex.Unlock() - - delete(s.pendingPayments, paymentID) -} - -// findPayment is the helper function which find the payment. -func (s *Switch) findPayment(paymentID uint64) *pendingPayment { - s.pendingMutex.RLock() - defer s.pendingMutex.RUnlock() - - payment, ok := s.pendingPayments[paymentID] - if !ok { - log.Errorf("Cannot find pending payment with ID %d", - paymentID) - return nil - } - - return payment -} - // CircuitModifier returns a reference to subset of the interfaces provided by // the circuit map, to allow links to open and close circuits. func (s *Switch) CircuitModifier() CircuitModifier { From f556b375ff58579840f74858422cd9250f8c612f Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 7 Jun 2019 16:42:26 +0200 Subject: [PATCH 6/8] lnd_test: add testHoldInvoicePersistence testHoldInvoicePersistence tests that a sender to a hold-invoice, can be restarted before the payment gets settled, and still be able to receive the preimage. --- lntest/itest/lnd_test.go | 360 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 360 insertions(+) diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 6381c24f..e318df06 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -32,8 +32,10 @@ import ( "github.com/lightningnetwork/lnd" "github.com/lightningnetwork/lnd/chanbackup" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lntest" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" "golang.org/x/net/context" "google.golang.org/grpc" @@ -13134,6 +13136,360 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) { } } +// testHoldInvoicePersistence tests that a sender to a hold-invoice, can be +// restarted before the payment gets settled, and still be able to receive the +// preimage. +func testHoldInvoicePersistence(net *lntest.NetworkHarness, t *harnessTest) { + ctxb := context.Background() + + const ( + chanAmt = btcutil.Amount(1000000) + numPayments = 10 + ) + + // Create carol, and clean up when the test finishes. + carol, err := net.NewNode("Carol", nil) + if err != nil { + t.Fatalf("unable to create new nodes: %v", err) + } + defer shutdownAndAssert(net, t, carol) + + // Connect Alice to Carol. + ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) + if err := net.ConnectNodes(ctxb, net.Alice, carol); err != nil { + t.Fatalf("unable to connect alice to carol: %v", err) + } + + // Open a channel between Alice and Carol. + ctxt, _ = context.WithTimeout(ctxb, channelOpenTimeout) + chanPointAlice := openChannelAndAssert( + ctxt, t, net, net.Alice, carol, + lntest.OpenChannelParams{ + Amt: chanAmt, + }, + ) + + // Wait for Alice and Carol to receive the channel edge from the + // funding manager. + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPointAlice) + if err != nil { + t.Fatalf("alice didn't see the alice->carol channel before "+ + "timeout: %v", err) + } + + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + err = carol.WaitForNetworkChannelOpen(ctxt, chanPointAlice) + if err != nil { + t.Fatalf("alice didn't see the alice->carol channel before "+ + "timeout: %v", err) + } + + // Create preimages for all payments we are going to initiate. + var preimages []lntypes.Preimage + for i := 0; i < numPayments; i++ { + var preimage lntypes.Preimage + _, err = rand.Read(preimage[:]) + if err != nil { + t.Fatalf("unable to generate preimage: %v", err) + } + + preimages = append(preimages, preimage) + } + + // Let Carol create hold-invoices for all the payments. + var ( + payAmt = btcutil.Amount(4) + payReqs []string + invoiceStreams []invoicesrpc.Invoices_SubscribeSingleInvoiceClient + ) + + for _, preimage := range preimages { + payHash := preimage.Hash() + invoiceReq := &invoicesrpc.AddHoldInvoiceRequest{ + Memo: "testing", + Value: int64(payAmt), + Hash: payHash[:], + } + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + resp, err := carol.AddHoldInvoice(ctxt, invoiceReq) + if err != nil { + t.Fatalf("unable to add invoice: %v", err) + } + + ctx, cancel := context.WithCancel(ctxb) + defer cancel() + + stream, err := carol.SubscribeSingleInvoice( + ctx, + &invoicesrpc.SubscribeSingleInvoiceRequest{ + RHash: payHash[:], + }, + ) + if err != nil { + t.Fatalf("unable to subscribe to invoice: %v", err) + } + + invoiceStreams = append(invoiceStreams, stream) + payReqs = append(payReqs, resp.PaymentRequest) + } + + // Wait for all the invoices to reach the OPEN state. + for _, stream := range invoiceStreams { + invoice, err := stream.Recv() + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoice.State != lnrpc.Invoice_OPEN { + t.Fatalf("expected OPEN, got state: %v", invoice.State) + } + } + + // Let Alice initiate payments for all the created invoices. + var paymentStreams []routerrpc.Router_SendPaymentClient + for _, payReq := range payReqs { + ctx, cancel := context.WithCancel(ctxb) + defer cancel() + + payStream, err := net.Alice.RouterClient.SendPayment( + ctx, &routerrpc.SendPaymentRequest{ + PaymentRequest: payReq, + TimeoutSeconds: 60, + FeeLimitSat: 1000000, + }, + ) + if err != nil { + t.Fatalf("unable to send alice htlc: %v", err) + } + + paymentStreams = append(paymentStreams, payStream) + } + + // Wait for inlight status update. + for _, payStream := range paymentStreams { + status, err := payStream.Recv() + if err != nil { + t.Fatalf("Failed receiving status update: %v", err) + } + + if status.State != routerrpc.PaymentState_IN_FLIGHT { + t.Fatalf("state not in flight: %v", status.State) + } + } + + // The payments should now show up in Alice's ListInvoices, with a zero + // preimage, indicating they are not yet settled. + err = lntest.WaitNoError(func() error { + req := &lnrpc.ListPaymentsRequest{} + ctxt, _ = context.WithTimeout(ctxt, defaultTimeout) + paymentsResp, err := net.Alice.ListPayments(ctxt, req) + if err != nil { + return fmt.Errorf("error when obtaining payments: %v", + err) + } + + // Gather the payment hashes we are looking for in the + // response. + payHashes := make(map[string]struct{}) + for _, preimg := range preimages { + payHashes[preimg.Hash().String()] = struct{}{} + } + + var zeroPreimg lntypes.Preimage + for _, payment := range paymentsResp.Payments { + _, ok := payHashes[payment.PaymentHash] + if !ok { + continue + } + + // The preimage should NEVER be non-zero at this point. + if payment.PaymentPreimage != zeroPreimg.String() { + t.Fatalf("expected zero preimage, got %v", + payment.PaymentPreimage) + } + + // We wait for the payment attempt to have been + // properly recorded in the DB. + if len(payment.Path) == 0 { + return fmt.Errorf("path is empty") + } + + delete(payHashes, payment.PaymentHash) + } + + if len(payHashes) != 0 { + return fmt.Errorf("payhash not found in response") + } + + return nil + }, time.Second*15) + if err != nil { + t.Fatalf("predicate not satisfied: %v", err) + } + + // Wait for all invoices to be accepted. + for _, stream := range invoiceStreams { + invoice, err := stream.Recv() + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoice.State != lnrpc.Invoice_ACCEPTED { + t.Fatalf("expected ACCEPTED, got state: %v", + invoice.State) + } + } + + // Restart alice. This to ensure she will still be able to handle + // settling the invoices after a restart. + if err := net.RestartNode(net.Alice, nil); err != nil { + t.Fatalf("Node restart failed: %v", err) + } + + // Now after a restart, we must re-track the payments. We set up a + // goroutine for each to track thir status updates. + var ( + statusUpdates []chan *routerrpc.PaymentStatus + wg sync.WaitGroup + quit = make(chan struct{}) + ) + + defer close(quit) + for _, preimg := range preimages { + hash := preimg.Hash() + + ctx, cancel := context.WithCancel(ctxb) + defer cancel() + + payStream, err := net.Alice.RouterClient.TrackPayment( + ctx, &routerrpc.TrackPaymentRequest{ + PaymentHash: hash[:], + }, + ) + if err != nil { + t.Fatalf("unable to send track payment: %v", err) + } + + // We set up a channel where we'll forward any status update. + upd := make(chan *routerrpc.PaymentStatus) + wg.Add(1) + go func() { + defer wg.Done() + + for { + status, err := payStream.Recv() + if err != nil { + close(upd) + return + } + + select { + case upd <- status: + case <-quit: + return + } + } + }() + + statusUpdates = append(statusUpdates, upd) + } + + // Wait for the infligt status update. + for _, upd := range statusUpdates { + select { + case status, ok := <-upd: + if !ok { + t.Fatalf("failed getting status update") + } + + if status.State != routerrpc.PaymentState_IN_FLIGHT { + t.Fatalf("state not in in flight: %v", + status.State) + } + case <-time.After(5 * time.Second): + t.Fatalf("in flight status not recevied") + } + } + + // Settle invoices half the invoices, cancel the rest. + for i, preimage := range preimages { + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + if i%2 == 0 { + settle := &invoicesrpc.SettleInvoiceMsg{ + Preimage: preimage[:], + } + _, err = carol.SettleInvoice(ctxt, settle) + } else { + hash := preimage.Hash() + settle := &invoicesrpc.CancelInvoiceMsg{ + PaymentHash: hash[:], + } + _, err = carol.CancelInvoice(ctxt, settle) + } + if err != nil { + t.Fatalf("unable to cancel/settle invoice: %v", err) + } + } + + // Make sure we get the expected status update. + for i, upd := range statusUpdates { + select { + case status, ok := <-upd: + if !ok { + t.Fatalf("failed getting status update") + } + + if i%2 == 0 { + if status.State != routerrpc.PaymentState_SUCCEEDED { + t.Fatalf("state not suceeded : %v", + status.State) + } + } else { + if status.State != routerrpc.PaymentState_FAILED_NO_ROUTE { + t.Fatalf("state not failed: %v", + status.State) + } + } + case <-time.After(5 * time.Second): + t.Fatalf("in flight status not recevied") + } + } + + // Check that Alice's invoices to be shown as settled and failed + // accordingly, and preimages matching up. + req := &lnrpc.ListPaymentsRequest{} + ctxt, _ = context.WithTimeout(ctxt, defaultTimeout) + paymentsResp, err := net.Alice.ListPayments(ctxt, req) + if err != nil { + t.Fatalf("error when obtaining Alice payments: %v", err) + } + for i, preimage := range preimages { + paymentHash := preimage.Hash() + var p string + for _, resp := range paymentsResp.Payments { + if resp.PaymentHash == paymentHash.String() { + p = resp.PaymentPreimage + break + } + } + if p == "" { + t.Fatalf("payment not found") + } + + if i%2 == 0 { + if p != preimage.String() { + t.Fatalf("preimage doesn't match: %v vs %v", + p, preimage.String()) + } + } else { + if p != lntypes.ZeroHash.String() { + t.Fatalf("preimage not zero: %v", p) + } + } + } +} + type testCase struct { name string test func(net *lntest.NetworkHarness, t *harnessTest) @@ -13373,6 +13729,10 @@ var testsCases = []*testCase{ name: "channel backup restore", test: testChannelBackupRestore, }, + { + name: "hold invoice sender persistence", + test: testHoldInvoicePersistence, + }, } // TestLightningNetworkDaemon performs a series of integration tests amongst a From dd3abbc4ef79af68b10302b01cdc827c6d7a8e74 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 7 Jun 2019 16:42:26 +0200 Subject: [PATCH 7/8] htlcswitch/payment_result_test: add TestNetworkResultStore TestNetworkResultStore tests that the networkResult store behaves as expected, and that we can store, get and subscribe to results. --- htlcswitch/payment_result_test.go | 102 ++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/htlcswitch/payment_result_test.go b/htlcswitch/payment_result_test.go index 4b45bc9a..9683f405 100644 --- a/htlcswitch/payment_result_test.go +++ b/htlcswitch/payment_result_test.go @@ -2,11 +2,14 @@ package htlcswitch import ( "bytes" + "io/ioutil" "math/rand" "reflect" "testing" + "time" "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" ) @@ -88,3 +91,102 @@ func TestNetworkResultSerialization(t *testing.T) { } } } + +// TestNetworkResultStore tests that the networkResult store behaves as +// expected, and that we can store, get and subscribe to results. +func TestNetworkResultStore(t *testing.T) { + t.Parallel() + + const numResults = 4 + + tempDir, err := ioutil.TempDir("", "testdb") + db, err := channeldb.Open(tempDir) + if err != nil { + t.Fatal(err) + } + + store := newNetworkResultStore(db) + + var results []*networkResult + for i := 0; i < numResults; i++ { + n := &networkResult{ + msg: &lnwire.UpdateAddHTLC{}, + unencrypted: true, + isResolution: true, + } + results = append(results, n) + } + + // Subscribe to 2 of them. + var subs []<-chan *networkResult + for i := uint64(0); i < 2; i++ { + sub, err := store.subscribeResult(i) + if err != nil { + t.Fatalf("unable to subscribe: %v", err) + } + subs = append(subs, sub) + } + + // Store three of them. + for i := uint64(0); i < 3; i++ { + err := store.storeResult(i, results[i]) + if err != nil { + t.Fatalf("unable to store result: %v", err) + } + } + + // The two subscribers should be notified. + for _, sub := range subs { + select { + case <-sub: + case <-time.After(1 * time.Second): + t.Fatalf("no result received") + } + } + + // Let the third one subscribe now. THe result should be received + // immediately. + sub, err := store.subscribeResult(2) + if err != nil { + t.Fatalf("unable to subscribe: %v", err) + } + select { + case <-sub: + case <-time.After(1 * time.Second): + t.Fatalf("no result received") + } + + // Try fetching the result directly for the non-stored one. This should + // fail. + _, err = store.getResult(3) + if err != ErrPaymentIDNotFound { + t.Fatalf("expected ErrPaymentIDNotFound, got %v", err) + } + + // Add the result and try again. + err = store.storeResult(3, results[3]) + if err != nil { + t.Fatalf("unable to store result: %v", err) + } + + _, err = store.getResult(3) + if err != nil { + t.Fatalf("unable to get result: %v", err) + } + + // Since we don't delete results from the store (yet), make sure we + // will get subscriptions for all of them. + // TODO(halseth): check deletion when we have reliable handoff. + for i := uint64(0); i < numResults; i++ { + sub, err := store.subscribeResult(i) + if err != nil { + t.Fatalf("unable to subscribe: %v", err) + } + + select { + case <-sub: + case <-time.After(1 * time.Second): + t.Fatalf("no result received") + } + } +} From dd88015985e80210aba1b23e45fa4f9dfafd9083 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 7 Jun 2019 16:42:26 +0200 Subject: [PATCH 8/8] htlcswitch/switch test: add TestSwitchGetPaymentResult TestSwitchGetPaymentResult tests that the switch interacts as expected with the circuit map and network result store when looking up the result of a payment ID. This is important for not to lose results under concurrent lookup and receiving results. --- htlcswitch/mock.go | 55 +++++++++++++++++++ htlcswitch/switch_test.go | 113 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+) diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index df059914..1f681264 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -916,3 +916,58 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte, Spend: make(chan *chainntnfs.SpendDetail), }, nil } + +type mockCircuitMap struct { + lookup chan *PaymentCircuit +} + +var _ CircuitMap = (*mockCircuitMap)(nil) + +func (m *mockCircuitMap) OpenCircuits(...Keystone) error { + return nil +} + +func (m *mockCircuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID, + start uint64) error { + return nil +} + +func (m *mockCircuitMap) DeleteCircuits(inKeys ...CircuitKey) error { + return nil +} + +func (m *mockCircuitMap) CommitCircuits( + circuit ...*PaymentCircuit) (*CircuitFwdActions, error) { + + return nil, nil +} + +func (m *mockCircuitMap) CloseCircuit(outKey CircuitKey) (*PaymentCircuit, + error) { + return nil, nil +} + +func (m *mockCircuitMap) FailCircuit(inKey CircuitKey) (*PaymentCircuit, + error) { + return nil, nil +} + +func (m *mockCircuitMap) LookupCircuit(inKey CircuitKey) *PaymentCircuit { + return <-m.lookup +} + +func (m *mockCircuitMap) LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit { + return nil +} + +func (m *mockCircuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit { + return nil +} + +func (m *mockCircuitMap) NumPending() int { + return 0 +} + +func (m *mockCircuitMap) NumOpen() int { + return 0 +} diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index af7c0fca..6a0a24e9 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -14,6 +14,7 @@ import ( "github.com/btcsuite/fastsha256" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/ticker" ) @@ -2125,3 +2126,115 @@ func TestUpdateFailMalformedHTLCErrorConversion(t *testing.T) { assertPaymentFailure(t) }) } + +// TestSwitchGetPaymentResult tests that the switch interacts as expected with +// the circuit map and network result store when looking up the result of a +// payment ID. This is important for not to lose results under concurrent +// lookup and receiving results. +func TestSwitchGetPaymentResult(t *testing.T) { + t.Parallel() + + const paymentID = 123 + var preimg lntypes.Preimage + preimg[0] = 3 + + s, err := initSwitchWithDB(testStartingHeight, nil) + if err != nil { + t.Fatalf("unable to init switch: %v", err) + } + if err := s.Start(); err != nil { + t.Fatalf("unable to start switch: %v", err) + } + defer s.Stop() + + lookup := make(chan *PaymentCircuit, 1) + s.circuits = &mockCircuitMap{ + lookup: lookup, + } + + // If the payment circuit is not found in the circuit map, the payment + // result must be found in the store if available. Since we haven't + // added anything to the store yet, ErrPaymentIDNotFound should be + // returned. + lookup <- nil + _, err = s.GetPaymentResult( + paymentID, lntypes.Hash{}, newMockDeobfuscator(), + ) + if err != ErrPaymentIDNotFound { + t.Fatalf("expected ErrPaymentIDNotFound, got %v", err) + } + + // Next let the lookup find the circuit in the circuit map. It should + // subscribe to payment results, and return the result when available. + lookup <- &PaymentCircuit{} + resultChan, err := s.GetPaymentResult( + paymentID, lntypes.Hash{}, newMockDeobfuscator(), + ) + if err != nil { + t.Fatalf("unable to get payment result: %v", err) + } + + // Add the result to the store. + n := &networkResult{ + msg: &lnwire.UpdateFulfillHTLC{ + PaymentPreimage: preimg, + }, + unencrypted: true, + isResolution: true, + } + + err = s.networkResults.storeResult(paymentID, n) + if err != nil { + t.Fatalf("unable to store result: %v", err) + } + + // The result should be availble. + select { + case res, ok := <-resultChan: + if !ok { + t.Fatalf("channel was closed") + } + + if res.Error != nil { + t.Fatalf("got unexpected error result") + } + + if res.Preimage != preimg { + t.Fatalf("expected preimg %v, got %v", + preimg, res.Preimage) + } + + case <-time.After(1 * time.Second): + t.Fatalf("result not received") + } + + // As a final test, try to get the result again. Now that is no longer + // in the circuit map, it should be immediately available from the + // store. + lookup <- nil + resultChan, err = s.GetPaymentResult( + paymentID, lntypes.Hash{}, newMockDeobfuscator(), + ) + if err != nil { + t.Fatalf("unable to get payment result: %v", err) + } + + select { + case res, ok := <-resultChan: + if !ok { + t.Fatalf("channel was closed") + } + + if res.Error != nil { + t.Fatalf("got unexpected error result") + } + + if res.Preimage != preimg { + t.Fatalf("expected preimg %v, got %v", + preimg, res.Preimage) + } + + case <-time.After(1 * time.Second): + t.Fatalf("result not received") + } +}