Merge pull request #2762 from halseth/reliable-payments-lookup-circuitmap
[reliable payments] persist htlcswitch pending payments
This commit is contained in:
commit
e45d4d703a
@ -1118,7 +1118,7 @@ func TestChannelLinkMultiHopUnknownPaymentHash(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
resultChan, err := n.aliceServer.htlcSwitch.GetPaymentResult(
|
resultChan, err := n.aliceServer.htlcSwitch.GetPaymentResult(
|
||||||
pid, newMockDeobfuscator(),
|
pid, htlc.PaymentHash, newMockDeobfuscator(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to get payment result: %v", err)
|
t.Fatalf("unable to get payment result: %v", err)
|
||||||
@ -3898,7 +3898,7 @@ func TestChannelLinkAcceptDuplicatePayment(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
resultChan, err := n.aliceServer.htlcSwitch.GetPaymentResult(
|
resultChan, err := n.aliceServer.htlcSwitch.GetPaymentResult(
|
||||||
pid, newMockDeobfuscator(),
|
pid, htlc.PaymentHash, newMockDeobfuscator(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to get payment result: %v", err)
|
t.Fatalf("unable to get payment result: %v", err)
|
||||||
@ -3909,8 +3909,8 @@ func TestChannelLinkAcceptDuplicatePayment(t *testing.T) {
|
|||||||
err = n.aliceServer.htlcSwitch.SendHTLC(
|
err = n.aliceServer.htlcSwitch.SendHTLC(
|
||||||
n.firstBobChannelLink.ShortChanID(), pid, htlc,
|
n.firstBobChannelLink.ShortChanID(), pid, htlc,
|
||||||
)
|
)
|
||||||
if err != ErrPaymentIDAlreadyExists {
|
if err != ErrDuplicateAdd {
|
||||||
t.Fatalf("ErrPaymentIDAlreadyExists should have been "+
|
t.Fatalf("ErrDuplicateAdd should have been "+
|
||||||
"received got: %v", err)
|
"received got: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,7 +163,10 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
priv, _ := btcec.NewPrivateKey(btcec.S256())
|
||||||
|
pubkey := priv.PubKey()
|
||||||
cfg := Config{
|
cfg := Config{
|
||||||
|
SelfKey: pubkey,
|
||||||
DB: db,
|
DB: db,
|
||||||
SwitchPackager: channeldb.NewSwitchPackager(),
|
SwitchPackager: channeldb.NewSwitchPackager(),
|
||||||
FwdingLog: &mockForwardingLog{
|
FwdingLog: &mockForwardingLog{
|
||||||
@ -390,7 +393,11 @@ func (o *mockDeobfuscator) DecryptError(reason lnwire.OpaqueReason) (*Forwarding
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
priv, _ := btcec.NewPrivateKey(btcec.S256())
|
||||||
|
pubkey := priv.PubKey()
|
||||||
|
|
||||||
return &ForwardingError{
|
return &ForwardingError{
|
||||||
|
ErrorSource: pubkey,
|
||||||
FailureMessage: failure,
|
FailureMessage: failure,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@ -909,3 +916,58 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte,
|
|||||||
Spend: make(chan *chainntnfs.SpendDetail),
|
Spend: make(chan *chainntnfs.SpendDetail),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mockCircuitMap struct {
|
||||||
|
lookup chan *PaymentCircuit
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ CircuitMap = (*mockCircuitMap)(nil)
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) OpenCircuits(...Keystone) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID,
|
||||||
|
start uint64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) DeleteCircuits(inKeys ...CircuitKey) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) CommitCircuits(
|
||||||
|
circuit ...*PaymentCircuit) (*CircuitFwdActions, error) {
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) CloseCircuit(outKey CircuitKey) (*PaymentCircuit,
|
||||||
|
error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) FailCircuit(inKey CircuitKey) (*PaymentCircuit,
|
||||||
|
error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) LookupCircuit(inKey CircuitKey) *PaymentCircuit {
|
||||||
|
return <-m.lookup
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) NumPending() int {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockCircuitMap) NumOpen() int {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
@ -1,12 +1,24 @@
|
|||||||
package htlcswitch
|
package htlcswitch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/coreos/bbolt"
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/lightningnetwork/lnd/multimutex"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
||||||
|
// networkResultStoreBucketKey is used for the root level bucket that
|
||||||
|
// stores the network result for each payment ID.
|
||||||
|
networkResultStoreBucketKey = []byte("network-result-store-bucket")
|
||||||
|
|
||||||
// ErrPaymentIDNotFound is an error returned if the given paymentID is
|
// ErrPaymentIDNotFound is an error returned if the given paymentID is
|
||||||
// not found.
|
// not found.
|
||||||
ErrPaymentIDNotFound = errors.New("paymentID not found")
|
ErrPaymentIDNotFound = errors.New("paymentID not found")
|
||||||
@ -46,3 +58,203 @@ type networkResult struct {
|
|||||||
// which the failure reason might not be included.
|
// which the failure reason might not be included.
|
||||||
isResolution bool
|
isResolution bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serializeNetworkResult serializes the networkResult.
|
||||||
|
func serializeNetworkResult(w io.Writer, n *networkResult) error {
|
||||||
|
if _, err := lnwire.WriteMessage(w, n.msg, 0); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return channeldb.WriteElements(w, n.unencrypted, n.isResolution)
|
||||||
|
}
|
||||||
|
|
||||||
|
// deserializeNetworkResult deserializes the networkResult.
|
||||||
|
func deserializeNetworkResult(r io.Reader) (*networkResult, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
n := &networkResult{}
|
||||||
|
|
||||||
|
n.msg, err = lnwire.ReadMessage(r, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := channeldb.ReadElements(r,
|
||||||
|
&n.unencrypted, &n.isResolution,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// networkResultStore is a persistent store that stores any results of HTLCs in
|
||||||
|
// flight on the network. Since payment results are inherently asynchronous, it
|
||||||
|
// is used as a common access point for senders of HTLCs, to know when a result
|
||||||
|
// is back. The Switch will checkpoint any received result to the store, and
|
||||||
|
// the store will keep results and notify the callers about them.
|
||||||
|
type networkResultStore struct {
|
||||||
|
db *channeldb.DB
|
||||||
|
|
||||||
|
// results is a map from paymentIDs to channels where subscribers to
|
||||||
|
// payment results will be notified.
|
||||||
|
results map[uint64][]chan *networkResult
|
||||||
|
resultsMtx sync.Mutex
|
||||||
|
|
||||||
|
// paymentIDMtx is a multimutex used to make sure the database and
|
||||||
|
// result subscribers map is consistent for each payment ID in case of
|
||||||
|
// concurrent callers.
|
||||||
|
paymentIDMtx *multimutex.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newNetworkResultStore(db *channeldb.DB) *networkResultStore {
|
||||||
|
return &networkResultStore{
|
||||||
|
db: db,
|
||||||
|
results: make(map[uint64][]chan *networkResult),
|
||||||
|
paymentIDMtx: multimutex.NewMutex(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// storeResult stores the networkResult for the given paymentID, and
|
||||||
|
// notifies any subscribers.
|
||||||
|
func (store *networkResultStore) storeResult(paymentID uint64,
|
||||||
|
result *networkResult) error {
|
||||||
|
|
||||||
|
// We get a mutex for this payment ID. This is needed to ensure
|
||||||
|
// consistency between the database state and the subscribers in case
|
||||||
|
// of concurrent calls.
|
||||||
|
store.paymentIDMtx.Lock(paymentID)
|
||||||
|
defer store.paymentIDMtx.Unlock(paymentID)
|
||||||
|
|
||||||
|
// Serialize the payment result.
|
||||||
|
var b bytes.Buffer
|
||||||
|
if err := serializeNetworkResult(&b, result); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var paymentIDBytes [8]byte
|
||||||
|
binary.BigEndian.PutUint64(paymentIDBytes[:], paymentID)
|
||||||
|
|
||||||
|
err := store.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
|
networkResults, err := tx.CreateBucketIfNotExists(
|
||||||
|
networkResultStoreBucketKey,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return networkResults.Put(paymentIDBytes[:], b.Bytes())
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that the result is stored in the database, we can notify any
|
||||||
|
// active subscribers.
|
||||||
|
store.resultsMtx.Lock()
|
||||||
|
for _, res := range store.results[paymentID] {
|
||||||
|
res <- result
|
||||||
|
}
|
||||||
|
delete(store.results, paymentID)
|
||||||
|
store.resultsMtx.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// subscribeResult is used to get the payment result for the given
|
||||||
|
// payment ID. It returns a channel on which the result will be delivered when
|
||||||
|
// ready.
|
||||||
|
func (store *networkResultStore) subscribeResult(paymentID uint64) (
|
||||||
|
<-chan *networkResult, error) {
|
||||||
|
|
||||||
|
// We get a mutex for this payment ID. This is needed to ensure
|
||||||
|
// consistency between the database state and the subscribers in case
|
||||||
|
// of concurrent calls.
|
||||||
|
store.paymentIDMtx.Lock(paymentID)
|
||||||
|
defer store.paymentIDMtx.Unlock(paymentID)
|
||||||
|
|
||||||
|
var (
|
||||||
|
result *networkResult
|
||||||
|
resultChan = make(chan *networkResult, 1)
|
||||||
|
)
|
||||||
|
|
||||||
|
err := store.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
var err error
|
||||||
|
result, err = fetchResult(tx, paymentID)
|
||||||
|
switch {
|
||||||
|
|
||||||
|
// Result not yet available, we will notify once a result is
|
||||||
|
// available.
|
||||||
|
case err == ErrPaymentIDNotFound:
|
||||||
|
return nil
|
||||||
|
|
||||||
|
case err != nil:
|
||||||
|
return err
|
||||||
|
|
||||||
|
// The result was found, and will be returned immediately.
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the result was found, we can send it on the result channel
|
||||||
|
// imemdiately.
|
||||||
|
if result != nil {
|
||||||
|
resultChan <- result
|
||||||
|
return resultChan, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise we store the result channel for when the result is
|
||||||
|
// available.
|
||||||
|
store.resultsMtx.Lock()
|
||||||
|
store.results[paymentID] = append(
|
||||||
|
store.results[paymentID], resultChan,
|
||||||
|
)
|
||||||
|
store.resultsMtx.Unlock()
|
||||||
|
|
||||||
|
return resultChan, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getResult attempts to immediately fetch the result for the given pid from
|
||||||
|
// the store. If no result is available, ErrPaymentIDNotFound is returned.
|
||||||
|
func (store *networkResultStore) getResult(pid uint64) (
|
||||||
|
*networkResult, error) {
|
||||||
|
|
||||||
|
var result *networkResult
|
||||||
|
err := store.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
var err error
|
||||||
|
result, err = fetchResult(tx, pid)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchResult(tx *bbolt.Tx, pid uint64) (*networkResult, error) {
|
||||||
|
var paymentIDBytes [8]byte
|
||||||
|
binary.BigEndian.PutUint64(paymentIDBytes[:], pid)
|
||||||
|
|
||||||
|
networkResults := tx.Bucket(networkResultStoreBucketKey)
|
||||||
|
if networkResults == nil {
|
||||||
|
return nil, ErrPaymentIDNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check whether a result is already available.
|
||||||
|
resultBytes := networkResults.Get(paymentIDBytes[:])
|
||||||
|
if resultBytes == nil {
|
||||||
|
return nil, ErrPaymentIDNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode the result we found.
|
||||||
|
r := bytes.NewReader(resultBytes)
|
||||||
|
|
||||||
|
return deserializeNetworkResult(r)
|
||||||
|
}
|
||||||
|
192
htlcswitch/payment_result_test.go
Normal file
192
htlcswitch/payment_result_test.go
Normal file
@ -0,0 +1,192 @@
|
|||||||
|
package htlcswitch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io/ioutil"
|
||||||
|
"math/rand"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
|
"github.com/lightningnetwork/lnd/lntypes"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestNetworkResultSerialization checks that NetworkResults are properly
|
||||||
|
// (de)serialized.
|
||||||
|
func TestNetworkResultSerialization(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
var preimage lntypes.Preimage
|
||||||
|
if _, err := rand.Read(preimage[:]); err != nil {
|
||||||
|
t.Fatalf("unable gen rand preimag: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var chanID lnwire.ChannelID
|
||||||
|
if _, err := rand.Read(chanID[:]); err != nil {
|
||||||
|
t.Fatalf("unable gen rand chanid: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var reason [256]byte
|
||||||
|
if _, err := rand.Read(reason[:]); err != nil {
|
||||||
|
t.Fatalf("unable gen rand reason: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
settle := &lnwire.UpdateFulfillHTLC{
|
||||||
|
ChanID: chanID,
|
||||||
|
ID: 2,
|
||||||
|
PaymentPreimage: preimage,
|
||||||
|
}
|
||||||
|
|
||||||
|
fail := &lnwire.UpdateFailHTLC{
|
||||||
|
ChanID: chanID,
|
||||||
|
ID: 1,
|
||||||
|
Reason: []byte{},
|
||||||
|
}
|
||||||
|
|
||||||
|
fail2 := &lnwire.UpdateFailHTLC{
|
||||||
|
ChanID: chanID,
|
||||||
|
ID: 1,
|
||||||
|
Reason: reason[:],
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := []*networkResult{
|
||||||
|
{
|
||||||
|
msg: settle,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
msg: fail,
|
||||||
|
unencrypted: false,
|
||||||
|
isResolution: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
msg: fail,
|
||||||
|
unencrypted: false,
|
||||||
|
isResolution: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
msg: fail2,
|
||||||
|
unencrypted: true,
|
||||||
|
isResolution: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range testCases {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
if err := serializeNetworkResult(&buf, p); err != nil {
|
||||||
|
t.Fatalf("serialize failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r := bytes.NewReader(buf.Bytes())
|
||||||
|
p1, err := deserializeNetworkResult(r)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to deserizlize: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(p, p1) {
|
||||||
|
t.Fatalf("not equal. %v vs %v", spew.Sdump(p),
|
||||||
|
spew.Sdump(p1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNetworkResultStore tests that the networkResult store behaves as
|
||||||
|
// expected, and that we can store, get and subscribe to results.
|
||||||
|
func TestNetworkResultStore(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const numResults = 4
|
||||||
|
|
||||||
|
tempDir, err := ioutil.TempDir("", "testdb")
|
||||||
|
db, err := channeldb.Open(tempDir)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
store := newNetworkResultStore(db)
|
||||||
|
|
||||||
|
var results []*networkResult
|
||||||
|
for i := 0; i < numResults; i++ {
|
||||||
|
n := &networkResult{
|
||||||
|
msg: &lnwire.UpdateAddHTLC{},
|
||||||
|
unencrypted: true,
|
||||||
|
isResolution: true,
|
||||||
|
}
|
||||||
|
results = append(results, n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe to 2 of them.
|
||||||
|
var subs []<-chan *networkResult
|
||||||
|
for i := uint64(0); i < 2; i++ {
|
||||||
|
sub, err := store.subscribeResult(i)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to subscribe: %v", err)
|
||||||
|
}
|
||||||
|
subs = append(subs, sub)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store three of them.
|
||||||
|
for i := uint64(0); i < 3; i++ {
|
||||||
|
err := store.storeResult(i, results[i])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to store result: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The two subscribers should be notified.
|
||||||
|
for _, sub := range subs {
|
||||||
|
select {
|
||||||
|
case <-sub:
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("no result received")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let the third one subscribe now. THe result should be received
|
||||||
|
// immediately.
|
||||||
|
sub, err := store.subscribeResult(2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to subscribe: %v", err)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-sub:
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("no result received")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try fetching the result directly for the non-stored one. This should
|
||||||
|
// fail.
|
||||||
|
_, err = store.getResult(3)
|
||||||
|
if err != ErrPaymentIDNotFound {
|
||||||
|
t.Fatalf("expected ErrPaymentIDNotFound, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the result and try again.
|
||||||
|
err = store.storeResult(3, results[3])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to store result: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = store.getResult(3)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to get result: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since we don't delete results from the store (yet), make sure we
|
||||||
|
// will get subscriptions for all of them.
|
||||||
|
// TODO(halseth): check deletion when we have reliable handoff.
|
||||||
|
for i := uint64(0); i < numResults; i++ {
|
||||||
|
sub, err := store.subscribeResult(i)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to subscribe: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-sub:
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("no result received")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -64,16 +64,6 @@ var (
|
|||||||
zeroPreimage [sha256.Size]byte
|
zeroPreimage [sha256.Size]byte
|
||||||
)
|
)
|
||||||
|
|
||||||
// pendingPayment represents the payment which made by user and waits for
|
|
||||||
// updates to be received whether the payment has been rejected or proceed
|
|
||||||
// successfully.
|
|
||||||
type pendingPayment struct {
|
|
||||||
paymentHash lntypes.Hash
|
|
||||||
amount lnwire.MilliSatoshi
|
|
||||||
|
|
||||||
resultChan chan *networkResult
|
|
||||||
}
|
|
||||||
|
|
||||||
// plexPacket encapsulates switch packet and adds error channel to receive
|
// plexPacket encapsulates switch packet and adds error channel to receive
|
||||||
// error from request handler.
|
// error from request handler.
|
||||||
type plexPacket struct {
|
type plexPacket struct {
|
||||||
@ -201,12 +191,12 @@ type Switch struct {
|
|||||||
// service was initialized with.
|
// service was initialized with.
|
||||||
cfg *Config
|
cfg *Config
|
||||||
|
|
||||||
// pendingPayments stores payments initiated by the user that are not yet
|
// networkResults stores the results of payments initiated by the user.
|
||||||
// settled. The map is used to later look up the payments and notify the
|
// results. The store is used to later look up the payments and notify
|
||||||
// user of the result when they are complete. Each payment is given a unique
|
// the user of the result when they are complete. Each payment attempt
|
||||||
// integer ID when it is created.
|
// should be given a unique integer ID when it is created, otherwise
|
||||||
pendingPayments map[uint64]*pendingPayment
|
// results might be overwritten.
|
||||||
pendingMutex sync.RWMutex
|
networkResults *networkResultStore
|
||||||
|
|
||||||
// circuits is storage for payment circuits which are used to
|
// circuits is storage for payment circuits which are used to
|
||||||
// forward the settle/fail htlc updates back to the add htlc initiator.
|
// forward the settle/fail htlc updates back to the add htlc initiator.
|
||||||
@ -292,7 +282,7 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) {
|
|||||||
forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink),
|
forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink),
|
||||||
interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
|
interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
|
||||||
pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink),
|
pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink),
|
||||||
pendingPayments: make(map[uint64]*pendingPayment),
|
networkResults: newNetworkResultStore(cfg.DB),
|
||||||
htlcPlex: make(chan *plexPacket),
|
htlcPlex: make(chan *plexPacket),
|
||||||
chanCloseRequests: make(chan *ChanClose),
|
chanCloseRequests: make(chan *ChanClose),
|
||||||
resolutionMsgs: make(chan *resolutionMsg),
|
resolutionMsgs: make(chan *resolutionMsg),
|
||||||
@ -342,15 +332,36 @@ func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) erro
|
|||||||
// result is received on the channel, the HTLC is guaranteed to no longer be in
|
// result is received on the channel, the HTLC is guaranteed to no longer be in
|
||||||
// flight. The switch shutting down is signaled by closing the channel. If the
|
// flight. The switch shutting down is signaled by closing the channel. If the
|
||||||
// paymentID is unknown, ErrPaymentIDNotFound will be returned.
|
// paymentID is unknown, ErrPaymentIDNotFound will be returned.
|
||||||
func (s *Switch) GetPaymentResult(paymentID uint64,
|
func (s *Switch) GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash,
|
||||||
deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
|
deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
|
||||||
|
|
||||||
s.pendingMutex.Lock()
|
var (
|
||||||
payment, ok := s.pendingPayments[paymentID]
|
nChan <-chan *networkResult
|
||||||
s.pendingMutex.Unlock()
|
err error
|
||||||
|
outKey = CircuitKey{
|
||||||
|
ChanID: sourceHop,
|
||||||
|
HtlcID: paymentID,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
if !ok {
|
// If the payment is not found in the circuit map, check whether a
|
||||||
return nil, ErrPaymentIDNotFound
|
// result is already available.
|
||||||
|
// Assumption: no one will add this payment ID other than the caller.
|
||||||
|
if s.circuits.LookupCircuit(outKey) == nil {
|
||||||
|
res, err := s.networkResults.getResult(paymentID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
c := make(chan *networkResult, 1)
|
||||||
|
c <- res
|
||||||
|
nChan = c
|
||||||
|
} else {
|
||||||
|
// The payment was committed to the circuits, subscribe for a
|
||||||
|
// result.
|
||||||
|
nChan, err = s.networkResults.subscribeResult(paymentID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resultChan := make(chan *PaymentResult, 1)
|
resultChan := make(chan *PaymentResult, 1)
|
||||||
@ -364,7 +375,7 @@ func (s *Switch) GetPaymentResult(paymentID uint64,
|
|||||||
|
|
||||||
var n *networkResult
|
var n *networkResult
|
||||||
select {
|
select {
|
||||||
case n = <-payment.resultChan:
|
case n = <-nChan:
|
||||||
case <-s.quit:
|
case <-s.quit:
|
||||||
// We close the result channel to signal a shutdown. We
|
// We close the result channel to signal a shutdown. We
|
||||||
// don't send any result in this case since the HTLC is
|
// don't send any result in this case since the HTLC is
|
||||||
@ -375,7 +386,7 @@ func (s *Switch) GetPaymentResult(paymentID uint64,
|
|||||||
|
|
||||||
// Extract the result and pass it to the result channel.
|
// Extract the result and pass it to the result channel.
|
||||||
result, err := s.extractResult(
|
result, err := s.extractResult(
|
||||||
deobfuscator, n, paymentID, payment.paymentHash,
|
deobfuscator, n, paymentID, paymentHash,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e := fmt.Errorf("Unable to extract result: %v", err)
|
e := fmt.Errorf("Unable to extract result: %v", err)
|
||||||
@ -398,24 +409,6 @@ func (s *Switch) GetPaymentResult(paymentID uint64,
|
|||||||
func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64,
|
func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64,
|
||||||
htlc *lnwire.UpdateAddHTLC) error {
|
htlc *lnwire.UpdateAddHTLC) error {
|
||||||
|
|
||||||
// Create payment and add to the map of payment in order later to be
|
|
||||||
// able to retrieve it and return response to the user.
|
|
||||||
payment := &pendingPayment{
|
|
||||||
resultChan: make(chan *networkResult, 1),
|
|
||||||
paymentHash: htlc.PaymentHash,
|
|
||||||
amount: htlc.Amount,
|
|
||||||
}
|
|
||||||
|
|
||||||
s.pendingMutex.Lock()
|
|
||||||
if _, ok := s.pendingPayments[paymentID]; ok {
|
|
||||||
s.pendingMutex.Unlock()
|
|
||||||
|
|
||||||
return ErrPaymentIDAlreadyExists
|
|
||||||
}
|
|
||||||
|
|
||||||
s.pendingPayments[paymentID] = payment
|
|
||||||
s.pendingMutex.Unlock()
|
|
||||||
|
|
||||||
// Generate and send new update packet, if error will be received on
|
// Generate and send new update packet, if error will be received on
|
||||||
// this stage it means that packet haven't left boundaries of our
|
// this stage it means that packet haven't left boundaries of our
|
||||||
// system and something wrong happened.
|
// system and something wrong happened.
|
||||||
@ -426,12 +419,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64,
|
|||||||
htlc: htlc,
|
htlc: htlc,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.forward(packet); err != nil {
|
return s.forward(packet)
|
||||||
s.removePendingPayment(paymentID)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateForwardingPolicies sends a message to the switch to update the
|
// UpdateForwardingPolicies sends a message to the switch to update the
|
||||||
@ -856,15 +844,34 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error {
|
|||||||
// multiple db transactions. The guarantees of the circuit map are stringent
|
// multiple db transactions. The guarantees of the circuit map are stringent
|
||||||
// enough such that we are able to tolerate reordering of these operations
|
// enough such that we are able to tolerate reordering of these operations
|
||||||
// without side effects. The primary operations handled are:
|
// without side effects. The primary operations handled are:
|
||||||
// 1. Ack settle/fail references, to avoid resending this response internally
|
// 1. Save the payment result to the pending payment store.
|
||||||
// 2. Teardown the closing circuit in the circuit map
|
// 2. Notify subscribers about the payment result.
|
||||||
// 3. Transition the payment status to grounded or completed.
|
// 3. Ack settle/fail references, to avoid resending this response internally
|
||||||
// 4. Respond to an in-mem pending payment, if it is found.
|
// 4. Teardown the closing circuit in the circuit map
|
||||||
//
|
//
|
||||||
// NOTE: This method MUST be spawned as a goroutine.
|
// NOTE: This method MUST be spawned as a goroutine.
|
||||||
func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
|
func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
|
|
||||||
|
paymentID := pkt.incomingHTLCID
|
||||||
|
|
||||||
|
// The error reason will be unencypted in case this a local
|
||||||
|
// failure or a converted error.
|
||||||
|
unencrypted := pkt.localFailure || pkt.convertedError
|
||||||
|
n := &networkResult{
|
||||||
|
msg: pkt.htlc,
|
||||||
|
unencrypted: unencrypted,
|
||||||
|
isResolution: pkt.isResolution,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the result to the db. This will also notify subscribers about
|
||||||
|
// the result.
|
||||||
|
if err := s.networkResults.storeResult(paymentID, n); err != nil {
|
||||||
|
log.Errorf("Unable to complete payment for pid=%v: %v",
|
||||||
|
paymentID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// First, we'll clean up any fwdpkg references, circuit entries, and
|
// First, we'll clean up any fwdpkg references, circuit entries, and
|
||||||
// mark in our db that the payment for this payment hash has either
|
// mark in our db that the payment for this payment hash has either
|
||||||
// succeeded or failed.
|
// succeeded or failed.
|
||||||
@ -892,26 +899,6 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
|
|||||||
pkt.inKey(), err)
|
pkt.inKey(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Locate the pending payment to notify the application that this
|
|
||||||
// payment has failed. If one is not found, it likely means the daemon
|
|
||||||
// has been restarted since sending the payment.
|
|
||||||
payment := s.findPayment(pkt.incomingHTLCID)
|
|
||||||
|
|
||||||
// The error reason will be unencypted in case this a local
|
|
||||||
// failure or a converted error.
|
|
||||||
unencrypted := pkt.localFailure || pkt.convertedError
|
|
||||||
n := &networkResult{
|
|
||||||
msg: pkt.htlc,
|
|
||||||
unencrypted: unencrypted,
|
|
||||||
isResolution: pkt.isResolution,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deliver the payment error and preimage to the application, if it is
|
|
||||||
// waiting for a response.
|
|
||||||
if payment != nil {
|
|
||||||
payment.resultChan <- n
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// extractResult uses the given deobfuscator to extract the payment result from
|
// extractResult uses the given deobfuscator to extract the payment result from
|
||||||
@ -2173,30 +2160,6 @@ func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
|
|||||||
return channelLinks, nil
|
return channelLinks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// removePendingPayment is the helper function which removes the pending user
|
|
||||||
// payment.
|
|
||||||
func (s *Switch) removePendingPayment(paymentID uint64) {
|
|
||||||
s.pendingMutex.Lock()
|
|
||||||
defer s.pendingMutex.Unlock()
|
|
||||||
|
|
||||||
delete(s.pendingPayments, paymentID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// findPayment is the helper function which find the payment.
|
|
||||||
func (s *Switch) findPayment(paymentID uint64) *pendingPayment {
|
|
||||||
s.pendingMutex.RLock()
|
|
||||||
defer s.pendingMutex.RUnlock()
|
|
||||||
|
|
||||||
payment, ok := s.pendingPayments[paymentID]
|
|
||||||
if !ok {
|
|
||||||
log.Errorf("Cannot find pending payment with ID %d",
|
|
||||||
paymentID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return payment
|
|
||||||
}
|
|
||||||
|
|
||||||
// CircuitModifier returns a reference to subset of the interfaces provided by
|
// CircuitModifier returns a reference to subset of the interfaces provided by
|
||||||
// the circuit map, to allow links to open and close circuits.
|
// the circuit map, to allow links to open and close circuits.
|
||||||
func (s *Switch) CircuitModifier() CircuitModifier {
|
func (s *Switch) CircuitModifier() CircuitModifier {
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/btcsuite/fastsha256"
|
"github.com/btcsuite/fastsha256"
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
|
"github.com/lightningnetwork/lnd/lntypes"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
"github.com/lightningnetwork/lnd/ticker"
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
)
|
)
|
||||||
@ -1743,7 +1744,7 @@ func TestSwitchSendPayment(t *testing.T) {
|
|||||||
// First check that the switch will correctly respond that this payment
|
// First check that the switch will correctly respond that this payment
|
||||||
// ID is unknown.
|
// ID is unknown.
|
||||||
_, err = s.GetPaymentResult(
|
_, err = s.GetPaymentResult(
|
||||||
paymentID, newMockDeobfuscator(),
|
paymentID, rhash, newMockDeobfuscator(),
|
||||||
)
|
)
|
||||||
if err != ErrPaymentIDNotFound {
|
if err != ErrPaymentIDNotFound {
|
||||||
t.Fatalf("expected ErrPaymentIDNotFound, got %v", err)
|
t.Fatalf("expected ErrPaymentIDNotFound, got %v", err)
|
||||||
@ -1761,7 +1762,7 @@ func TestSwitchSendPayment(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
resultChan, err := s.GetPaymentResult(
|
resultChan, err := s.GetPaymentResult(
|
||||||
paymentID, newMockDeobfuscator(),
|
paymentID, rhash, newMockDeobfuscator(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
@ -2125,3 +2126,115 @@ func TestUpdateFailMalformedHTLCErrorConversion(t *testing.T) {
|
|||||||
assertPaymentFailure(t)
|
assertPaymentFailure(t)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestSwitchGetPaymentResult tests that the switch interacts as expected with
|
||||||
|
// the circuit map and network result store when looking up the result of a
|
||||||
|
// payment ID. This is important for not to lose results under concurrent
|
||||||
|
// lookup and receiving results.
|
||||||
|
func TestSwitchGetPaymentResult(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const paymentID = 123
|
||||||
|
var preimg lntypes.Preimage
|
||||||
|
preimg[0] = 3
|
||||||
|
|
||||||
|
s, err := initSwitchWithDB(testStartingHeight, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to init switch: %v", err)
|
||||||
|
}
|
||||||
|
if err := s.Start(); err != nil {
|
||||||
|
t.Fatalf("unable to start switch: %v", err)
|
||||||
|
}
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
lookup := make(chan *PaymentCircuit, 1)
|
||||||
|
s.circuits = &mockCircuitMap{
|
||||||
|
lookup: lookup,
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the payment circuit is not found in the circuit map, the payment
|
||||||
|
// result must be found in the store if available. Since we haven't
|
||||||
|
// added anything to the store yet, ErrPaymentIDNotFound should be
|
||||||
|
// returned.
|
||||||
|
lookup <- nil
|
||||||
|
_, err = s.GetPaymentResult(
|
||||||
|
paymentID, lntypes.Hash{}, newMockDeobfuscator(),
|
||||||
|
)
|
||||||
|
if err != ErrPaymentIDNotFound {
|
||||||
|
t.Fatalf("expected ErrPaymentIDNotFound, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next let the lookup find the circuit in the circuit map. It should
|
||||||
|
// subscribe to payment results, and return the result when available.
|
||||||
|
lookup <- &PaymentCircuit{}
|
||||||
|
resultChan, err := s.GetPaymentResult(
|
||||||
|
paymentID, lntypes.Hash{}, newMockDeobfuscator(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to get payment result: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the result to the store.
|
||||||
|
n := &networkResult{
|
||||||
|
msg: &lnwire.UpdateFulfillHTLC{
|
||||||
|
PaymentPreimage: preimg,
|
||||||
|
},
|
||||||
|
unencrypted: true,
|
||||||
|
isResolution: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.networkResults.storeResult(paymentID, n)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to store result: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The result should be availble.
|
||||||
|
select {
|
||||||
|
case res, ok := <-resultChan:
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("channel was closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.Error != nil {
|
||||||
|
t.Fatalf("got unexpected error result")
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.Preimage != preimg {
|
||||||
|
t.Fatalf("expected preimg %v, got %v",
|
||||||
|
preimg, res.Preimage)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("result not received")
|
||||||
|
}
|
||||||
|
|
||||||
|
// As a final test, try to get the result again. Now that is no longer
|
||||||
|
// in the circuit map, it should be immediately available from the
|
||||||
|
// store.
|
||||||
|
lookup <- nil
|
||||||
|
resultChan, err = s.GetPaymentResult(
|
||||||
|
paymentID, lntypes.Hash{}, newMockDeobfuscator(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to get payment result: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case res, ok := <-resultChan:
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("channel was closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.Error != nil {
|
||||||
|
t.Fatalf("got unexpected error result")
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.Preimage != preimg {
|
||||||
|
t.Fatalf("expected preimg %v, got %v",
|
||||||
|
preimg, res.Preimage)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("result not received")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -801,7 +801,7 @@ func preparePayment(sendingPeer, receivingPeer lnpeer.Peer,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
resultChan, err := sender.htlcSwitch.GetPaymentResult(
|
resultChan, err := sender.htlcSwitch.GetPaymentResult(
|
||||||
pid, newMockDeobfuscator(),
|
pid, hash, newMockDeobfuscator(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -1289,7 +1289,7 @@ func (n *twoHopNetwork) makeHoldPayment(sendingPeer, receivingPeer lnpeer.Peer,
|
|||||||
}
|
}
|
||||||
|
|
||||||
resultChan, err := sender.htlcSwitch.GetPaymentResult(
|
resultChan, err := sender.htlcSwitch.GetPaymentResult(
|
||||||
pid, newMockDeobfuscator(),
|
pid, rhash, newMockDeobfuscator(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
paymentErr <- err
|
paymentErr <- err
|
||||||
|
@ -32,8 +32,10 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd"
|
"github.com/lightningnetwork/lnd"
|
||||||
"github.com/lightningnetwork/lnd/chanbackup"
|
"github.com/lightningnetwork/lnd/chanbackup"
|
||||||
"github.com/lightningnetwork/lnd/lnrpc"
|
"github.com/lightningnetwork/lnd/lnrpc"
|
||||||
|
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
|
||||||
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
|
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
|
||||||
"github.com/lightningnetwork/lnd/lntest"
|
"github.com/lightningnetwork/lnd/lntest"
|
||||||
|
"github.com/lightningnetwork/lnd/lntypes"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
@ -13134,6 +13136,360 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// testHoldInvoicePersistence tests that a sender to a hold-invoice, can be
|
||||||
|
// restarted before the payment gets settled, and still be able to receive the
|
||||||
|
// preimage.
|
||||||
|
func testHoldInvoicePersistence(net *lntest.NetworkHarness, t *harnessTest) {
|
||||||
|
ctxb := context.Background()
|
||||||
|
|
||||||
|
const (
|
||||||
|
chanAmt = btcutil.Amount(1000000)
|
||||||
|
numPayments = 10
|
||||||
|
)
|
||||||
|
|
||||||
|
// Create carol, and clean up when the test finishes.
|
||||||
|
carol, err := net.NewNode("Carol", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create new nodes: %v", err)
|
||||||
|
}
|
||||||
|
defer shutdownAndAssert(net, t, carol)
|
||||||
|
|
||||||
|
// Connect Alice to Carol.
|
||||||
|
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
|
||||||
|
if err := net.ConnectNodes(ctxb, net.Alice, carol); err != nil {
|
||||||
|
t.Fatalf("unable to connect alice to carol: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open a channel between Alice and Carol.
|
||||||
|
ctxt, _ = context.WithTimeout(ctxb, channelOpenTimeout)
|
||||||
|
chanPointAlice := openChannelAndAssert(
|
||||||
|
ctxt, t, net, net.Alice, carol,
|
||||||
|
lntest.OpenChannelParams{
|
||||||
|
Amt: chanAmt,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Wait for Alice and Carol to receive the channel edge from the
|
||||||
|
// funding manager.
|
||||||
|
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
|
||||||
|
err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPointAlice)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("alice didn't see the alice->carol channel before "+
|
||||||
|
"timeout: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
|
||||||
|
err = carol.WaitForNetworkChannelOpen(ctxt, chanPointAlice)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("alice didn't see the alice->carol channel before "+
|
||||||
|
"timeout: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create preimages for all payments we are going to initiate.
|
||||||
|
var preimages []lntypes.Preimage
|
||||||
|
for i := 0; i < numPayments; i++ {
|
||||||
|
var preimage lntypes.Preimage
|
||||||
|
_, err = rand.Read(preimage[:])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to generate preimage: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
preimages = append(preimages, preimage)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let Carol create hold-invoices for all the payments.
|
||||||
|
var (
|
||||||
|
payAmt = btcutil.Amount(4)
|
||||||
|
payReqs []string
|
||||||
|
invoiceStreams []invoicesrpc.Invoices_SubscribeSingleInvoiceClient
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, preimage := range preimages {
|
||||||
|
payHash := preimage.Hash()
|
||||||
|
invoiceReq := &invoicesrpc.AddHoldInvoiceRequest{
|
||||||
|
Memo: "testing",
|
||||||
|
Value: int64(payAmt),
|
||||||
|
Hash: payHash[:],
|
||||||
|
}
|
||||||
|
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
|
||||||
|
resp, err := carol.AddHoldInvoice(ctxt, invoiceReq)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to add invoice: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctxb)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
stream, err := carol.SubscribeSingleInvoice(
|
||||||
|
ctx,
|
||||||
|
&invoicesrpc.SubscribeSingleInvoiceRequest{
|
||||||
|
RHash: payHash[:],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to subscribe to invoice: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
invoiceStreams = append(invoiceStreams, stream)
|
||||||
|
payReqs = append(payReqs, resp.PaymentRequest)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all the invoices to reach the OPEN state.
|
||||||
|
for _, stream := range invoiceStreams {
|
||||||
|
invoice, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if invoice.State != lnrpc.Invoice_OPEN {
|
||||||
|
t.Fatalf("expected OPEN, got state: %v", invoice.State)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let Alice initiate payments for all the created invoices.
|
||||||
|
var paymentStreams []routerrpc.Router_SendPaymentClient
|
||||||
|
for _, payReq := range payReqs {
|
||||||
|
ctx, cancel := context.WithCancel(ctxb)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
payStream, err := net.Alice.RouterClient.SendPayment(
|
||||||
|
ctx, &routerrpc.SendPaymentRequest{
|
||||||
|
PaymentRequest: payReq,
|
||||||
|
TimeoutSeconds: 60,
|
||||||
|
FeeLimitSat: 1000000,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to send alice htlc: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
paymentStreams = append(paymentStreams, payStream)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for inlight status update.
|
||||||
|
for _, payStream := range paymentStreams {
|
||||||
|
status, err := payStream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed receiving status update: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if status.State != routerrpc.PaymentState_IN_FLIGHT {
|
||||||
|
t.Fatalf("state not in flight: %v", status.State)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The payments should now show up in Alice's ListInvoices, with a zero
|
||||||
|
// preimage, indicating they are not yet settled.
|
||||||
|
err = lntest.WaitNoError(func() error {
|
||||||
|
req := &lnrpc.ListPaymentsRequest{}
|
||||||
|
ctxt, _ = context.WithTimeout(ctxt, defaultTimeout)
|
||||||
|
paymentsResp, err := net.Alice.ListPayments(ctxt, req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error when obtaining payments: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gather the payment hashes we are looking for in the
|
||||||
|
// response.
|
||||||
|
payHashes := make(map[string]struct{})
|
||||||
|
for _, preimg := range preimages {
|
||||||
|
payHashes[preimg.Hash().String()] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var zeroPreimg lntypes.Preimage
|
||||||
|
for _, payment := range paymentsResp.Payments {
|
||||||
|
_, ok := payHashes[payment.PaymentHash]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// The preimage should NEVER be non-zero at this point.
|
||||||
|
if payment.PaymentPreimage != zeroPreimg.String() {
|
||||||
|
t.Fatalf("expected zero preimage, got %v",
|
||||||
|
payment.PaymentPreimage)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We wait for the payment attempt to have been
|
||||||
|
// properly recorded in the DB.
|
||||||
|
if len(payment.Path) == 0 {
|
||||||
|
return fmt.Errorf("path is empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(payHashes, payment.PaymentHash)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(payHashes) != 0 {
|
||||||
|
return fmt.Errorf("payhash not found in response")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}, time.Second*15)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("predicate not satisfied: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all invoices to be accepted.
|
||||||
|
for _, stream := range invoiceStreams {
|
||||||
|
invoice, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if invoice.State != lnrpc.Invoice_ACCEPTED {
|
||||||
|
t.Fatalf("expected ACCEPTED, got state: %v",
|
||||||
|
invoice.State)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restart alice. This to ensure she will still be able to handle
|
||||||
|
// settling the invoices after a restart.
|
||||||
|
if err := net.RestartNode(net.Alice, nil); err != nil {
|
||||||
|
t.Fatalf("Node restart failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now after a restart, we must re-track the payments. We set up a
|
||||||
|
// goroutine for each to track thir status updates.
|
||||||
|
var (
|
||||||
|
statusUpdates []chan *routerrpc.PaymentStatus
|
||||||
|
wg sync.WaitGroup
|
||||||
|
quit = make(chan struct{})
|
||||||
|
)
|
||||||
|
|
||||||
|
defer close(quit)
|
||||||
|
for _, preimg := range preimages {
|
||||||
|
hash := preimg.Hash()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctxb)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
payStream, err := net.Alice.RouterClient.TrackPayment(
|
||||||
|
ctx, &routerrpc.TrackPaymentRequest{
|
||||||
|
PaymentHash: hash[:],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to send track payment: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We set up a channel where we'll forward any status update.
|
||||||
|
upd := make(chan *routerrpc.PaymentStatus)
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
status, err := payStream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
close(upd)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case upd <- status:
|
||||||
|
case <-quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
statusUpdates = append(statusUpdates, upd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the infligt status update.
|
||||||
|
for _, upd := range statusUpdates {
|
||||||
|
select {
|
||||||
|
case status, ok := <-upd:
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("failed getting status update")
|
||||||
|
}
|
||||||
|
|
||||||
|
if status.State != routerrpc.PaymentState_IN_FLIGHT {
|
||||||
|
t.Fatalf("state not in in flight: %v",
|
||||||
|
status.State)
|
||||||
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("in flight status not recevied")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Settle invoices half the invoices, cancel the rest.
|
||||||
|
for i, preimage := range preimages {
|
||||||
|
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
|
||||||
|
if i%2 == 0 {
|
||||||
|
settle := &invoicesrpc.SettleInvoiceMsg{
|
||||||
|
Preimage: preimage[:],
|
||||||
|
}
|
||||||
|
_, err = carol.SettleInvoice(ctxt, settle)
|
||||||
|
} else {
|
||||||
|
hash := preimage.Hash()
|
||||||
|
settle := &invoicesrpc.CancelInvoiceMsg{
|
||||||
|
PaymentHash: hash[:],
|
||||||
|
}
|
||||||
|
_, err = carol.CancelInvoice(ctxt, settle)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to cancel/settle invoice: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure we get the expected status update.
|
||||||
|
for i, upd := range statusUpdates {
|
||||||
|
select {
|
||||||
|
case status, ok := <-upd:
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("failed getting status update")
|
||||||
|
}
|
||||||
|
|
||||||
|
if i%2 == 0 {
|
||||||
|
if status.State != routerrpc.PaymentState_SUCCEEDED {
|
||||||
|
t.Fatalf("state not suceeded : %v",
|
||||||
|
status.State)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if status.State != routerrpc.PaymentState_FAILED_NO_ROUTE {
|
||||||
|
t.Fatalf("state not failed: %v",
|
||||||
|
status.State)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("in flight status not recevied")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that Alice's invoices to be shown as settled and failed
|
||||||
|
// accordingly, and preimages matching up.
|
||||||
|
req := &lnrpc.ListPaymentsRequest{}
|
||||||
|
ctxt, _ = context.WithTimeout(ctxt, defaultTimeout)
|
||||||
|
paymentsResp, err := net.Alice.ListPayments(ctxt, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error when obtaining Alice payments: %v", err)
|
||||||
|
}
|
||||||
|
for i, preimage := range preimages {
|
||||||
|
paymentHash := preimage.Hash()
|
||||||
|
var p string
|
||||||
|
for _, resp := range paymentsResp.Payments {
|
||||||
|
if resp.PaymentHash == paymentHash.String() {
|
||||||
|
p = resp.PaymentPreimage
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if p == "" {
|
||||||
|
t.Fatalf("payment not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
if i%2 == 0 {
|
||||||
|
if p != preimage.String() {
|
||||||
|
t.Fatalf("preimage doesn't match: %v vs %v",
|
||||||
|
p, preimage.String())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if p != lntypes.ZeroHash.String() {
|
||||||
|
t.Fatalf("preimage not zero: %v", p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
name string
|
name string
|
||||||
test func(net *lntest.NetworkHarness, t *harnessTest)
|
test func(net *lntest.NetworkHarness, t *harnessTest)
|
||||||
@ -13373,6 +13729,10 @@ var testsCases = []*testCase{
|
|||||||
name: "channel backup restore",
|
name: "channel backup restore",
|
||||||
test: testChannelBackupRestore,
|
test: testChannelBackupRestore,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "hold invoice sender persistence",
|
||||||
|
test: testHoldInvoicePersistence,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestLightningNetworkDaemon performs a series of integration tests amongst a
|
// TestLightningNetworkDaemon performs a series of integration tests amongst a
|
||||||
|
@ -52,7 +52,8 @@ func (m *mockPaymentAttemptDispatcher) SendHTLC(firstHop lnwire.ShortChannelID,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockPaymentAttemptDispatcher) GetPaymentResult(paymentID uint64,
|
func (m *mockPaymentAttemptDispatcher) GetPaymentResult(paymentID uint64,
|
||||||
_ htlcswitch.ErrorDecrypter) (<-chan *htlcswitch.PaymentResult, error) {
|
_ lntypes.Hash, _ htlcswitch.ErrorDecrypter) (
|
||||||
|
<-chan *htlcswitch.PaymentResult, error) {
|
||||||
|
|
||||||
c := make(chan *htlcswitch.PaymentResult, 1)
|
c := make(chan *htlcswitch.PaymentResult, 1)
|
||||||
res, ok := m.results[paymentID]
|
res, ok := m.results[paymentID]
|
||||||
@ -139,8 +140,8 @@ func (m *mockPayer) SendHTLC(_ lnwire.ShortChannelID,
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockPayer) GetPaymentResult(paymentID uint64, _ htlcswitch.ErrorDecrypter) (
|
func (m *mockPayer) GetPaymentResult(paymentID uint64, _ lntypes.Hash,
|
||||||
<-chan *htlcswitch.PaymentResult, error) {
|
_ htlcswitch.ErrorDecrypter) (<-chan *htlcswitch.PaymentResult, error) {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case res := <-m.paymentResult:
|
case res := <-m.paymentResult:
|
||||||
|
@ -95,7 +95,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
|
|||||||
// Now ask the switch to return the result of the payment when
|
// Now ask the switch to return the result of the payment when
|
||||||
// available.
|
// available.
|
||||||
resultChan, err := p.router.cfg.Payer.GetPaymentResult(
|
resultChan, err := p.router.cfg.Payer.GetPaymentResult(
|
||||||
p.attempt.PaymentID, errorDecryptor,
|
p.attempt.PaymentID, p.payment.PaymentHash, errorDecryptor,
|
||||||
)
|
)
|
||||||
switch {
|
switch {
|
||||||
|
|
||||||
|
@ -138,7 +138,8 @@ type PaymentAttemptDispatcher interface {
|
|||||||
// HTLC is guaranteed to no longer be in flight. The switch shutting
|
// HTLC is guaranteed to no longer be in flight. The switch shutting
|
||||||
// down is signaled by closing the channel. If the paymentID is
|
// down is signaled by closing the channel. If the paymentID is
|
||||||
// unknown, ErrPaymentIDNotFound will be returned.
|
// unknown, ErrPaymentIDNotFound will be returned.
|
||||||
GetPaymentResult(paymentID uint64, deobfuscator htlcswitch.ErrorDecrypter) (
|
GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash,
|
||||||
|
deobfuscator htlcswitch.ErrorDecrypter) (
|
||||||
<-chan *htlcswitch.PaymentResult, error)
|
<-chan *htlcswitch.PaymentResult, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user