Merge pull request #2356 from joostjager/invoices-subserver

invoices: add subscribesingleinvoice
This commit is contained in:
Olaoluwa Osuntokun 2019-02-01 17:19:58 -08:00 committed by GitHub
commit 4af857f0c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1289 additions and 287 deletions

@ -359,10 +359,10 @@ func TestDuplicateSettleInvoice(t *testing.T) {
} }
// If we try to settle the invoice again, then we should get the very // If we try to settle the invoice again, then we should get the very
// same invoice back. // same invoice back, but with an error this time.
dbInvoice, err = db.SettleInvoice(payHash, amt) dbInvoice, err = db.SettleInvoice(payHash, amt)
if err != nil { if err != ErrInvoiceAlreadySettled {
t.Fatalf("unable to settle invoice: %v", err) t.Fatalf("expected ErrInvoiceAlreadySettled")
} }
if dbInvoice == nil { if dbInvoice == nil {

@ -4,12 +4,14 @@ import (
"bytes" "bytes"
"crypto/sha256" "crypto/sha256"
"encoding/binary" "encoding/binary"
"errors"
"fmt" "fmt"
"io" "io"
"time" "time"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt" "github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -56,6 +58,10 @@ var (
// //
// settleIndexNo => invoiceKey // settleIndexNo => invoiceKey
settleIndexBucket = []byte("invoice-settle-index") settleIndexBucket = []byte("invoice-settle-index")
// ErrInvoiceAlreadySettled is returned when the invoice is already
// settled.
ErrInvoiceAlreadySettled = errors.New("invoice already settled")
) )
const ( const (
@ -105,7 +111,7 @@ type ContractTerm struct {
// PaymentPreimage is the preimage which is to be revealed in the // PaymentPreimage is the preimage which is to be revealed in the
// occasion that an HTLC paying to the hash of this preimage is // occasion that an HTLC paying to the hash of this preimage is
// extended. // extended.
PaymentPreimage [32]byte PaymentPreimage lntypes.Preimage
// Value is the expected amount of milli-satoshis to be paid to an HTLC // Value is the expected amount of milli-satoshis to be paid to an HTLC
// which can be satisfied by the above preimage. // which can be satisfied by the above preimage.
@ -625,21 +631,14 @@ func (d *DB) SettleInvoice(paymentHash [32]byte,
return ErrInvoiceNotFound return ErrInvoiceNotFound
} }
invoice, err := settleInvoice( settledInvoice, err = settleInvoice(
invoices, settleIndex, invoiceNum, amtPaid, invoices, settleIndex, invoiceNum, amtPaid,
) )
if err != nil {
return err return err
}
settledInvoice = invoice
return nil
}) })
if err != nil {
return nil, err
}
return settledInvoice, nil return settledInvoice, err
} }
// InvoicesSettledSince can be used by callers to catch up any settled invoices // InvoicesSettledSince can be used by callers to catch up any settled invoices
@ -897,10 +896,8 @@ func settleInvoice(invoices, settleIndex *bbolt.Bucket, invoiceNum []byte,
return nil, err return nil, err
} }
// Add idempotency to duplicate settles, return here to avoid
// overwriting the previous info.
if invoice.Terms.State == ContractSettled { if invoice.Terms.State == ContractSettled {
return &invoice, nil return &invoice, ErrInvoiceAlreadySettled
} }
// Now that we know the invoice hasn't already been settled, we'll // Now that we know the invoice hasn't already been settled, we'll

@ -12,6 +12,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/sweep" "github.com/lightningnetwork/lnd/sweep"
@ -139,7 +140,7 @@ type ChainArbitratorConfig struct {
// SettleInvoice attempts to settle an existing invoice on-chain with // SettleInvoice attempts to settle an existing invoice on-chain with
// the given payment hash. ErrInvoiceNotFound is returned if an invoice // the given payment hash. ErrInvoiceNotFound is returned if an invoice
// is not found. // is not found.
SettleInvoice func(chainhash.Hash, lnwire.MilliSatoshi) error SettleInvoice func(lntypes.Hash, lnwire.MilliSatoshi) error
} }
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all // ChainArbitrator is a sub-system that oversees the on-chain resolution of all

@ -11,6 +11,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -176,7 +177,7 @@ func createTestChannelArbitrator(log ArbitratorLog) (*ChannelArbitrator,
*lnwallet.IncomingHtlcResolution, uint32) error { *lnwallet.IncomingHtlcResolution, uint32) error {
return nil return nil
}, },
SettleInvoice: func(chainhash.Hash, lnwire.MilliSatoshi) error { SettleInvoice: func(lntypes.Hash, lnwire.MilliSatoshi) error {
return nil return nil
}, },
} }

@ -1,9 +1,9 @@
package htlcswitch package htlcswitch
import ( import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -14,11 +14,11 @@ type InvoiceDatabase interface {
// byte payment hash. This method should also reutrn the min final CLTV // byte payment hash. This method should also reutrn the min final CLTV
// delta for this invoice. We'll use this to ensure that the HTLC // delta for this invoice. We'll use this to ensure that the HTLC
// extended to us gives us enough time to settle as we prescribe. // extended to us gives us enough time to settle as we prescribe.
LookupInvoice(chainhash.Hash) (channeldb.Invoice, uint32, error) LookupInvoice(lntypes.Hash) (channeldb.Invoice, uint32, error)
// SettleInvoice attempts to mark an invoice corresponding to the // SettleInvoice attempts to mark an invoice corresponding to the
// passed payment hash as fully settled. // passed payment hash as fully settled.
SettleInvoice(payHash chainhash.Hash, paidAmount lnwire.MilliSatoshi) error SettleInvoice(payHash lntypes.Hash, paidAmount lnwire.MilliSatoshi) error
} }
// ChannelLink is an interface which represents the subsystem for managing the // ChannelLink is an interface which represents the subsystem for managing the

@ -9,7 +9,6 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
@ -17,6 +16,7 @@ import (
"github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/ticker"
@ -2312,7 +2312,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
// We're the designated payment destination. Therefore // We're the designated payment destination. Therefore
// we attempt to see if we have an invoice locally // we attempt to see if we have an invoice locally
// which'll allow us to settle this htlc. // which'll allow us to settle this htlc.
invoiceHash := chainhash.Hash(pd.RHash) invoiceHash := lntypes.Hash(pd.RHash)
invoice, minCltvDelta, err := l.cfg.Registry.LookupInvoice( invoice, minCltvDelta, err := l.cfg.Registry.LookupInvoice(
invoiceHash, invoiceHash,
) )

@ -17,7 +17,6 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/fastsha256"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
@ -25,6 +24,7 @@ import (
"github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/ticker"
@ -686,18 +686,18 @@ var _ ChannelLink = (*mockChannelLink)(nil)
type mockInvoiceRegistry struct { type mockInvoiceRegistry struct {
sync.Mutex sync.Mutex
invoices map[chainhash.Hash]channeldb.Invoice invoices map[lntypes.Hash]channeldb.Invoice
finalDelta uint32 finalDelta uint32
} }
func newMockRegistry(minDelta uint32) *mockInvoiceRegistry { func newMockRegistry(minDelta uint32) *mockInvoiceRegistry {
return &mockInvoiceRegistry{ return &mockInvoiceRegistry{
finalDelta: minDelta, finalDelta: minDelta,
invoices: make(map[chainhash.Hash]channeldb.Invoice), invoices: make(map[lntypes.Hash]channeldb.Invoice),
} }
} }
func (i *mockInvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) { func (i *mockInvoiceRegistry) LookupInvoice(rHash lntypes.Hash) (channeldb.Invoice, uint32, error) {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
@ -710,7 +710,7 @@ func (i *mockInvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Inv
return invoice, i.finalDelta, nil return invoice, i.finalDelta, nil
} }
func (i *mockInvoiceRegistry) SettleInvoice(rhash chainhash.Hash, func (i *mockInvoiceRegistry) SettleInvoice(rhash lntypes.Hash,
amt lnwire.MilliSatoshi) error { amt lnwire.MilliSatoshi) error {
i.Lock() i.Lock()
@ -736,8 +736,8 @@ func (i *mockInvoiceRegistry) AddInvoice(invoice channeldb.Invoice) error {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
rhash := fastsha256.Sum256(invoice.Terms.PaymentPreimage[:]) rhash := invoice.Terms.PaymentPreimage.Hash()
i.invoices[chainhash.Hash(rhash)] = invoice i.invoices[rhash] = invoice
return nil return nil
} }

@ -27,6 +27,7 @@ import (
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/shachain" "github.com/lightningnetwork/lnd/shachain"
@ -651,11 +652,11 @@ func generateHops(payAmt lnwire.MilliSatoshi, startingHeight uint32,
} }
type paymentResponse struct { type paymentResponse struct {
rhash chainhash.Hash rhash lntypes.Hash
err chan error err chan error
} }
func (r *paymentResponse) Wait(d time.Duration) (chainhash.Hash, error) { func (r *paymentResponse) Wait(d time.Duration) (lntypes.Hash, error) {
select { select {
case err := <-r.err: case err := <-r.err:
close(r.err) close(r.err)
@ -680,7 +681,7 @@ func (n *threeHopNetwork) makePayment(sendingPeer, receivingPeer lnpeer.Peer,
paymentErr := make(chan error, 1) paymentErr := make(chan error, 1)
var rhash chainhash.Hash var rhash lntypes.Hash
sender := sendingPeer.(*mockServer) sender := sendingPeer.(*mockServer)
receiver := receivingPeer.(*mockServer) receiver := receivingPeer.(*mockServer)

@ -2,17 +2,16 @@ package invoices
import ( import (
"bytes" "bytes"
"crypto/sha256"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/zpay32" "github.com/lightningnetwork/lnd/zpay32"
@ -24,10 +23,10 @@ var (
// All nodes initialized with the flag active will immediately settle // All nodes initialized with the flag active will immediately settle
// any incoming HTLC whose rHash corresponds with the debug // any incoming HTLC whose rHash corresponds with the debug
// preimage. // preimage.
DebugPre, _ = chainhash.NewHash(bytes.Repeat([]byte{1}, 32)) DebugPre, _ = lntypes.NewPreimage(bytes.Repeat([]byte{1}, 32))
// DebugHash is the hash of the default preimage. // DebugHash is the hash of the default preimage.
DebugHash = chainhash.Hash(sha256.Sum256(DebugPre[:])) DebugHash = DebugPre.Hash()
) )
// InvoiceRegistry is a central registry of all the outstanding invoices // InvoiceRegistry is a central registry of all the outstanding invoices
@ -41,15 +40,17 @@ type InvoiceRegistry struct {
clientMtx sync.Mutex clientMtx sync.Mutex
nextClientID uint32 nextClientID uint32
notificationClients map[uint32]*InvoiceSubscription notificationClients map[uint32]*InvoiceSubscription
singleNotificationClients map[uint32]*SingleInvoiceSubscription
newSubscriptions chan *InvoiceSubscription newSubscriptions chan *InvoiceSubscription
newSingleSubscriptions chan *SingleInvoiceSubscription
subscriptionCancels chan uint32 subscriptionCancels chan uint32
invoiceEvents chan *invoiceEvent invoiceEvents chan *invoiceEvent
// debugInvoices is a map which stores special "debug" invoices which // debugInvoices is a map which stores special "debug" invoices which
// should be only created/used when manual tests require an invoice // should be only created/used when manual tests require an invoice
// that *all* nodes are able to fully settle. // that *all* nodes are able to fully settle.
debugInvoices map[chainhash.Hash]*channeldb.Invoice debugInvoices map[lntypes.Hash]*channeldb.Invoice
activeNetParams *chaincfg.Params activeNetParams *chaincfg.Params
@ -66,9 +67,11 @@ func NewRegistry(cdb *channeldb.DB,
return &InvoiceRegistry{ return &InvoiceRegistry{
cdb: cdb, cdb: cdb,
debugInvoices: make(map[chainhash.Hash]*channeldb.Invoice), debugInvoices: make(map[lntypes.Hash]*channeldb.Invoice),
notificationClients: make(map[uint32]*InvoiceSubscription), notificationClients: make(map[uint32]*InvoiceSubscription),
singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription),
newSubscriptions: make(chan *InvoiceSubscription), newSubscriptions: make(chan *InvoiceSubscription),
newSingleSubscriptions: make(chan *SingleInvoiceSubscription),
subscriptionCancels: make(chan uint32), subscriptionCancels: make(chan uint32),
invoiceEvents: make(chan *invoiceEvent, 100), invoiceEvents: make(chan *invoiceEvent, 100),
activeNetParams: activeNetParams, activeNetParams: activeNetParams,
@ -97,6 +100,7 @@ func (i *InvoiceRegistry) Stop() {
// instance where invoices are settled. // instance where invoices are settled.
type invoiceEvent struct { type invoiceEvent struct {
state channeldb.ContractState state channeldb.ContractState
hash lntypes.Hash
invoice *channeldb.Invoice invoice *channeldb.Invoice
} }
@ -108,9 +112,9 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
for { for {
select { select {
// A new invoice subscription has just arrived! We'll query for // A new invoice subscription for all invoices has just arrived!
// any backlog notifications, then add it to the set of // We'll query for any backlog notifications, then add it to the
// clients. // set of clients.
case newClient := <-i.newSubscriptions: case newClient := <-i.newSubscriptions:
// Before we add the client to our set of active // Before we add the client to our set of active
// clients, we'll first attempt to deliver any backlog // clients, we'll first attempt to deliver any backlog
@ -129,6 +133,23 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
// continue. // continue.
i.notificationClients[newClient.id] = newClient i.notificationClients[newClient.id] = newClient
// A new single invoice subscription has arrived. We'll query
// for any backlog notifications, then add it to the set of
// clients.
case newClient := <-i.newSingleSubscriptions:
err := i.deliverSingleBacklogEvents(newClient)
if err != nil {
log.Errorf("Unable to deliver backlog invoice "+
"notifications: %v", err)
}
log.Infof("New single invoice subscription "+
"client: id=%v, hash=%v",
newClient.id, newClient.hash,
)
i.singleNotificationClients[newClient.id] = newClient
// A client no longer wishes to receive invoice notifications. // A client no longer wishes to receive invoice notifications.
// So we'll remove them from the set of active clients. // So we'll remove them from the set of active clients.
case clientID := <-i.subscriptionCancels: case clientID := <-i.subscriptionCancels:
@ -136,16 +157,53 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
"client=%v", clientID) "client=%v", clientID)
delete(i.notificationClients, clientID) delete(i.notificationClients, clientID)
delete(i.singleNotificationClients, clientID)
// A sub-systems has just modified the invoice state, so we'll // A sub-systems has just modified the invoice state, so we'll
// dispatch notifications to all registered clients. // dispatch notifications to all registered clients.
case event := <-i.invoiceEvents: case event := <-i.invoiceEvents:
i.dispatchToClients(event)
i.dispatchToSingleClients(event)
case <-i.quit:
return
}
}
}
// dispatchToSingleClients passes the supplied event to all notification clients
// that subscribed to all the invoice this event applies to.
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
// Dispatch to single invoice subscribers.
for _, client := range i.singleNotificationClients {
if client.hash != event.hash {
continue
}
select {
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
state: event.state,
invoice: event.invoice,
}:
case <-i.quit:
return
}
}
}
// dispatchToClients passes the supplied event to all notification clients that
// subscribed to all invoices. Add and settle indices are used to make sure that
// clients don't receive duplicate or unwanted events.
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
invoice := event.invoice
for clientID, client := range i.notificationClients { for clientID, client := range i.notificationClients {
// Before we dispatch this event, we'll check // Before we dispatch this event, we'll check
// to ensure that this client hasn't already // to ensure that this client hasn't already
// received this notification in order to // received this notification in order to
// ensure we don't duplicate any events. // ensure we don't duplicate any events.
invoice := event.invoice
// TODO(joostjager): Refactor switches.
switch { switch {
// If we've already sent this settle event to // If we've already sent this settle event to
// the client, then we can skip this. // the client, then we can skip this.
@ -169,6 +227,7 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
"add_index=%v, new add event index=%v", "add_index=%v, new add event index=%v",
clientID, client.addIndex, clientID, client.addIndex,
invoice.AddIndex) invoice.AddIndex)
case event.state == channeldb.ContractSettled && case event.state == channeldb.ContractSettled &&
client.settleIndex+1 != invoice.SettleIndex: client.settleIndex+1 != invoice.SettleIndex:
log.Warnf("client=%v for invoice "+ log.Warnf("client=%v for invoice "+
@ -187,25 +246,17 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
return return
} }
// Each time we send a notification to a // Each time we send a notification to a client, we'll record
// client, we'll record the latest add/settle // the latest add/settle index it has. We'll use this to ensure
// index it has. We'll use this to ensure we // we don't send a notification twice, which can happen if a new
// don't send a notification twice, which can // event is added while we're catching up a new client.
// happen if a new event is added while we're
// catching up a new client.
switch event.state { switch event.state {
case channeldb.ContractSettled: case channeldb.ContractSettled:
client.settleIndex = invoice.SettleIndex client.settleIndex = invoice.SettleIndex
case channeldb.ContractOpen: case channeldb.ContractOpen:
client.addIndex = invoice.AddIndex client.addIndex = invoice.AddIndex
default: default:
log.Errorf("unknown invoice "+ log.Errorf("unknown invoice state: %v", event.state)
"state: %v", event.state)
}
}
case <-i.quit:
return
} }
} }
} }
@ -220,6 +271,7 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro
if err != nil { if err != nil {
return err return err
} }
settleEvents, err := i.cdb.InvoicesSettledSince(client.settleIndex) settleEvents, err := i.cdb.InvoicesSettledSince(client.settleIndex)
if err != nil { if err != nil {
return err return err
@ -242,6 +294,7 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro
return fmt.Errorf("registry shutting down") return fmt.Errorf("registry shutting down")
} }
} }
for _, settleEvent := range settleEvents { for _, settleEvent := range settleEvents {
// We re-bind the loop variable to ensure we don't hold onto // We re-bind the loop variable to ensure we don't hold onto
// the loop reference causing is to point to the same item. // the loop reference causing is to point to the same item.
@ -260,12 +313,45 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro
return nil return nil
} }
// deliverSingleBacklogEvents will attempt to query the invoice database to
// retrieve the current invoice state and deliver this to the subscriber. Single
// invoice subscribers will always receive the current state right after
// subscribing. Only in case the invoice does not yet exist, nothing is sent
// yet.
func (i *InvoiceRegistry) deliverSingleBacklogEvents(
client *SingleInvoiceSubscription) error {
invoice, err := i.cdb.LookupInvoice(client.hash)
// It is possible that the invoice does not exist yet, but the client is
// already watching it in anticipation.
if err == channeldb.ErrInvoiceNotFound {
return nil
}
if err != nil {
return err
}
err = client.notify(&invoiceEvent{
hash: client.hash,
invoice: &invoice,
state: invoice.Terms.State,
})
if err != nil {
return err
}
return nil
}
// AddDebugInvoice adds a debug invoice for the specified amount, identified // AddDebugInvoice adds a debug invoice for the specified amount, identified
// by the passed preimage. Once this invoice is added, subsystems within the // by the passed preimage. Once this invoice is added, subsystems within the
// daemon add/forward HTLCs that are able to obtain the proper preimage // daemon add/forward HTLCs that are able to obtain the proper preimage
// required for redemption in the case that we're the final destination. // required for redemption in the case that we're the final destination.
func (i *InvoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash.Hash) { func (i *InvoiceRegistry) AddDebugInvoice(amt btcutil.Amount,
paymentHash := chainhash.Hash(sha256.Sum256(preimage[:])) preimage lntypes.Preimage) {
paymentHash := preimage.Hash()
invoice := &channeldb.Invoice{ invoice := &channeldb.Invoice{
CreationDate: time.Now(), CreationDate: time.Now(),
@ -291,7 +377,9 @@ func (i *InvoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash
// redemption in the case that we're the final destination. We also return the // redemption in the case that we're the final destination. We also return the
// addIndex of the newly created invoice which monotonically increases for each // addIndex of the newly created invoice which monotonically increases for each
// new invoice added. // new invoice added.
func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) { func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice,
paymentHash lntypes.Hash) (uint64, error) {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
@ -306,7 +394,7 @@ func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error)
// Now that we've added the invoice, we'll send dispatch a message to // Now that we've added the invoice, we'll send dispatch a message to
// notify the clients of this new invoice. // notify the clients of this new invoice.
i.notifyClients(invoice, channeldb.ContractOpen) i.notifyClients(paymentHash, invoice, channeldb.ContractOpen)
return addIndex, nil return addIndex, nil
} }
@ -318,7 +406,7 @@ func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error)
// according to the cltv delta. // according to the cltv delta.
// //
// TODO(roasbeef): ignore if settled? // TODO(roasbeef): ignore if settled?
func (i *InvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) { func (i *InvoiceRegistry) LookupInvoice(rHash lntypes.Hash) (channeldb.Invoice, uint32, error) {
// First check the in-memory debug invoice index to see if this is an // First check the in-memory debug invoice index to see if this is an
// existing invoice added for debugging. // existing invoice added for debugging.
i.RLock() i.RLock()
@ -350,7 +438,7 @@ func (i *InvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice
// SettleInvoice attempts to mark an invoice as settled. If the invoice is a // SettleInvoice attempts to mark an invoice as settled. If the invoice is a
// debug invoice, then this method is a noop as debug invoices are never fully // debug invoice, then this method is a noop as debug invoices are never fully
// settled. // settled.
func (i *InvoiceRegistry) SettleInvoice(rHash chainhash.Hash, func (i *InvoiceRegistry) SettleInvoice(rHash lntypes.Hash,
amtPaid lnwire.MilliSatoshi) error { amtPaid lnwire.MilliSatoshi) error {
i.Lock() i.Lock()
@ -369,25 +457,35 @@ func (i *InvoiceRegistry) SettleInvoice(rHash chainhash.Hash,
// If this isn't a debug invoice, then we'll attempt to settle an // If this isn't a debug invoice, then we'll attempt to settle an
// invoice matching this rHash on disk (if one exists). // invoice matching this rHash on disk (if one exists).
invoice, err := i.cdb.SettleInvoice(rHash, amtPaid) invoice, err := i.cdb.SettleInvoice(rHash, amtPaid)
// Implement idempotency by returning success if the invoice was already
// settled.
if err == channeldb.ErrInvoiceAlreadySettled {
log.Debugf("Invoice %v already settled", rHash)
return nil
}
if err != nil { if err != nil {
return err return err
} }
log.Infof("Payment received: %v", spew.Sdump(invoice)) log.Infof("Payment received: %v", spew.Sdump(invoice))
i.notifyClients(invoice, channeldb.ContractSettled) i.notifyClients(rHash, invoice, channeldb.ContractSettled)
return nil return nil
} }
// notifyClients notifies all currently registered invoice notification clients // notifyClients notifies all currently registered invoice notification clients
// of a newly added/settled invoice. // of a newly added/settled invoice.
func (i *InvoiceRegistry) notifyClients(invoice *channeldb.Invoice, func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
invoice *channeldb.Invoice,
state channeldb.ContractState) { state channeldb.ContractState) {
event := &invoiceEvent{ event := &invoiceEvent{
state: state, state: state,
invoice: invoice, invoice: invoice,
hash: hash,
} }
select { select {
@ -396,13 +494,25 @@ func (i *InvoiceRegistry) notifyClients(invoice *channeldb.Invoice,
} }
} }
// invoiceSubscriptionKit defines that are common to both all invoice
// subscribers and single invoice subscribers.
type invoiceSubscriptionKit struct {
id uint32
inv *InvoiceRegistry
ntfnQueue *queue.ConcurrentQueue
cancelled uint32 // To be used atomically.
cancelChan chan struct{}
wg sync.WaitGroup
}
// InvoiceSubscription represents an intent to receive updates for newly added // InvoiceSubscription represents an intent to receive updates for newly added
// or settled invoices. For each newly added invoice, a copy of the invoice // or settled invoices. For each newly added invoice, a copy of the invoice
// will be sent over the NewInvoices channel. Similarly, for each newly settled // will be sent over the NewInvoices channel. Similarly, for each newly settled
// invoice, a copy of the invoice will be sent over the SettledInvoices // invoice, a copy of the invoice will be sent over the SettledInvoices
// channel. // channel.
type InvoiceSubscription struct { type InvoiceSubscription struct {
cancelled uint32 // To be used atomically. invoiceSubscriptionKit
// NewInvoices is a channel that we'll use to send all newly created // NewInvoices is a channel that we'll use to send all newly created
// invoices with an invoice index greater than the specified // invoices with an invoice index greater than the specified
@ -426,21 +536,23 @@ type InvoiceSubscription struct {
// greater than this will be dispatched before any new notifications // greater than this will be dispatched before any new notifications
// are sent out. // are sent out.
settleIndex uint64 settleIndex uint64
}
ntfnQueue *queue.ConcurrentQueue // SingleInvoiceSubscription represents an intent to receive updates for a
// specific invoice.
type SingleInvoiceSubscription struct {
invoiceSubscriptionKit
id uint32 hash lntypes.Hash
inv *InvoiceRegistry // Updates is a channel that we'll use to send all invoice events for
// the invoice that is subscribed to.
cancelChan chan struct{} Updates chan *channeldb.Invoice
wg sync.WaitGroup
} }
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated // Cancel unregisters the InvoiceSubscription, freeing any previously allocated
// resources. // resources.
func (i *InvoiceSubscription) Cancel() { func (i *invoiceSubscriptionKit) Cancel() {
if !atomic.CompareAndSwapUint32(&i.cancelled, 0, 1) { if !atomic.CompareAndSwapUint32(&i.cancelled, 0, 1) {
return return
} }
@ -456,6 +568,16 @@ func (i *InvoiceSubscription) Cancel() {
i.wg.Wait() i.wg.Wait()
} }
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
select {
case i.ntfnQueue.ChanIn() <- event:
case <-i.inv.quit:
return fmt.Errorf("registry shutting down")
}
return nil
}
// SubscribeNotifications returns an InvoiceSubscription which allows the // SubscribeNotifications returns an InvoiceSubscription which allows the
// caller to receive async notifications when any invoices are settled or // caller to receive async notifications when any invoices are settled or
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start // added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
@ -467,9 +589,11 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *
SettledInvoices: make(chan *channeldb.Invoice), SettledInvoices: make(chan *channeldb.Invoice),
addIndex: addIndex, addIndex: addIndex,
settleIndex: settleIndex, settleIndex: settleIndex,
invoiceSubscriptionKit: invoiceSubscriptionKit{
inv: i, inv: i,
ntfnQueue: queue.NewConcurrentQueue(20), ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}), cancelChan: make(chan struct{}),
},
} }
client.ntfnQueue.Start() client.ntfnQueue.Start()
@ -534,3 +658,67 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *
return client return client
} }
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
// caller to receive async notifications for a specific invoice.
func (i *InvoiceRegistry) SubscribeSingleInvoice(
hash lntypes.Hash) *SingleInvoiceSubscription {
client := &SingleInvoiceSubscription{
Updates: make(chan *channeldb.Invoice),
invoiceSubscriptionKit: invoiceSubscriptionKit{
inv: i,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
},
hash: hash,
}
client.ntfnQueue.Start()
i.clientMtx.Lock()
client.id = i.nextClientID
i.nextClientID++
i.clientMtx.Unlock()
// Before we register this new invoice subscription, we'll launch a new
// goroutine that will proxy all notifications appended to the end of
// the concurrent queue to the two client-side channels the caller will
// feed off of.
i.wg.Add(1)
go func() {
defer i.wg.Done()
for {
select {
// A new invoice event has been sent by the
// invoiceRegistry. We will dispatch the event to the
// client.
case ntfn := <-client.ntfnQueue.ChanOut():
invoiceEvent := ntfn.(*invoiceEvent)
select {
case client.Updates <- invoiceEvent.invoice:
case <-client.cancelChan:
return
case <-i.quit:
return
}
case <-client.cancelChan:
return
case <-i.quit:
return
}
}
}()
select {
case i.newSingleSubscriptions <- client:
case <-i.quit:
}
return client
}

@ -0,0 +1,176 @@
package invoices
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
testTimeout = 5 * time.Second
preimage = lntypes.Preimage{
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1,
}
hash = preimage.Hash()
// testPayReq is a dummy payment request that does parse properly. It
// has no relation with the real invoice parameters and isn't asserted
// on in this test. LookupInvoice requires this to have a valid value.
testPayReq = "lnbc500u1pwywxzwpp5nd2u9xzq02t0tuf2654as7vma42lwkcjptx4yzfq0umq4swpa7cqdqqcqzysmlpc9ewnydr8rr8dnltyxphdyf6mcqrsd6dml8zajtyhwe6a45d807kxtmzayuf0hh2d9tn478ecxkecdg7c5g85pntupug5kakm7xcpn63zqk"
)
// TestSettleInvoice tests settling of an invoice and related notifications.
func TestSettleInvoice(t *testing.T) {
cdb, cleanup, err := newDB()
if err != nil {
t.Fatal(err)
}
defer cleanup()
// Instantiate and start the invoice registry.
registry := NewRegistry(cdb, &chaincfg.MainNetParams)
err = registry.Start()
if err != nil {
t.Fatal(err)
}
defer registry.Stop()
allSubscriptions := registry.SubscribeNotifications(0, 0)
defer allSubscriptions.Cancel()
// Subscribe to the not yet existing invoice.
subscription := registry.SubscribeSingleInvoice(hash)
defer subscription.Cancel()
if subscription.hash != hash {
t.Fatalf("expected subscription for provided hash")
}
// Add the invoice.
invoice := &channeldb.Invoice{
Terms: channeldb.ContractTerm{
PaymentPreimage: preimage,
Value: lnwire.MilliSatoshi(100000),
},
PaymentRequest: []byte(testPayReq),
}
addIdx, err := registry.AddInvoice(invoice, hash)
if err != nil {
t.Fatal(err)
}
if addIdx != 1 {
t.Fatalf("expected addIndex to start with 1, but got %v",
addIdx)
}
// We expect the open state to be sent to the single invoice subscriber.
select {
case update := <-subscription.Updates:
if update.Terms.State != channeldb.ContractOpen {
t.Fatalf("expected state ContractOpen, but got %v",
update.Terms.State)
}
case <-time.After(testTimeout):
t.Fatal("no update received")
}
// We expect a new invoice notification to be sent out.
select {
case newInvoice := <-allSubscriptions.NewInvoices:
if newInvoice.Terms.State != channeldb.ContractOpen {
t.Fatalf("expected state ContractOpen, but got %v",
newInvoice.Terms.State)
}
case <-time.After(testTimeout):
t.Fatal("no update received")
}
// Settle invoice with a slightly higher amount.
amtPaid := lnwire.MilliSatoshi(100500)
err = registry.SettleInvoice(hash, amtPaid)
if err != nil {
t.Fatal(err)
}
// We expect the settled state to be sent to the single invoice
// subscriber.
select {
case update := <-subscription.Updates:
if update.Terms.State != channeldb.ContractSettled {
t.Fatalf("expected state ContractOpen, but got %v",
update.Terms.State)
}
if update.AmtPaid != amtPaid {
t.Fatal("invoice AmtPaid incorrect")
}
case <-time.After(testTimeout):
t.Fatal("no update received")
}
// We expect a settled notification to be sent out.
select {
case settledInvoice := <-allSubscriptions.SettledInvoices:
if settledInvoice.Terms.State != channeldb.ContractSettled {
t.Fatalf("expected state ContractOpen, but got %v",
settledInvoice.Terms.State)
}
case <-time.After(testTimeout):
t.Fatal("no update received")
}
// Try to settle again.
err = registry.SettleInvoice(hash, amtPaid)
if err != nil {
t.Fatal("expected duplicate settle to succeed")
}
// Try to settle again with a different amount.
err = registry.SettleInvoice(hash, amtPaid+600)
if err != nil {
t.Fatal("expected duplicate settle to succeed")
}
// Check that settled amount remains unchanged.
inv, _, err := registry.LookupInvoice(hash)
if err != nil {
t.Fatal(err)
}
if inv.AmtPaid != amtPaid {
t.Fatal("expected amount to be unchanged")
}
}
func newDB() (*channeldb.DB, func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil {
return nil, nil, err
}
// Next, create channeldb for the first time.
cdb, err := channeldb.Open(tempDirName)
if err != nil {
os.RemoveAll(tempDirName)
return nil, nil, err
}
cleanUp := func() {
cdb.Close()
os.RemoveAll(tempDirName)
}
return cdb, cleanUp, nil
}

2
lnd.go

@ -337,7 +337,7 @@ func lndMain() error {
// exported by the rpcServer. // exported by the rpcServer.
rpcServer, err := newRPCServer( rpcServer, err := newRPCServer(
server, macaroonService, cfg.SubRPCServers, serverOpts, server, macaroonService, cfg.SubRPCServers, serverOpts,
proxyOpts, atplManager, tlsConf, proxyOpts, atplManager, server.invoices, tlsConf,
) )
if err != nil { if err != nil {
srvrLog.Errorf("unable to start RPC server: %v", err) srvrLog.Errorf("unable to start RPC server: %v", err)

@ -60,16 +60,6 @@ type Server struct {
// AutopilotServer gRPC service. // AutopilotServer gRPC service.
var _ AutopilotServer = (*Server)(nil) var _ AutopilotServer = (*Server)(nil)
// fileExists reports whether the named file or directory exists.
func fileExists(name string) bool {
if _, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
// New returns a new instance of the autopilotrpc Autopilot sub-server. We also // New returns a new instance of the autopilotrpc Autopilot sub-server. We also
// return the set of permissions for the macaroons that we may create within // return the set of permissions for the macaroons that we may create within
// this method. If the macaroons we need aren't found in the filepath, then // this method. If the macaroons we need aren't found in the filepath, then

15
lnrpc/file_utils.go Normal file

@ -0,0 +1,15 @@
package lnrpc
import (
"os"
)
// FileExists reports whether the named file or directory exists.
func FileExists(name string) bool {
if _, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}

@ -28,6 +28,7 @@ do
echo "Generating protos from ${file}, into ${DIRECTORY}" echo "Generating protos from ${file}, into ${DIRECTORY}"
protoc -I/usr/local/include -I.\ protoc -I/usr/local/include -I.\
-I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
--go_out=plugins=grpc,paths=source_relative:. \ --go_out=plugins=grpc,paths=source_relative:. \
${file} ${file}
done done

@ -0,0 +1,32 @@
// +build invoicesrpc
package invoicesrpc
import (
"github.com/btcsuite/btcd/chaincfg"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/macaroons"
)
// Config is the primary configuration struct for the invoices RPC server. It
// contains all the items required for the rpc server to carry out its
// duties. The fields with struct tags are meant to be parsed as normal
// configuration options, while if able to be populated, the latter fields MUST
// also be specified.
type Config struct {
// NetworkDir is the main network directory wherein the invoices rpc
// server will find the macaroon named DefaultInvoicesMacFilename.
NetworkDir string
// MacService is the main macaroon service that we'll use to handle
// authentication for the invoices rpc server.
MacService *macaroons.Service
// InvoiceRegistry is a central registry of all the outstanding invoices
// created by the daemon.
InvoiceRegistry *invoices.InvoiceRegistry
// ChainParams are required to properly decode invoice payment requests
// that are marshalled over rpc.
ChainParams *chaincfg.Params
}

@ -0,0 +1,6 @@
// +build !invoicesrpc
package invoicesrpc
// Config is empty for non-invoicesrpc builds.
type Config struct{}

@ -0,0 +1,55 @@
// +build invoicesrpc
package invoicesrpc
import (
"fmt"
"github.com/lightningnetwork/lnd/lnrpc"
)
// createNewSubServer is a helper method that will create the new sub server
// given the main config dispatcher method. If we're unable to find the config
// that is meant for us in the config dispatcher, then we'll exit with an
// error.
func createNewSubServer(configRegistry lnrpc.SubServerConfigDispatcher) (
lnrpc.SubServer, lnrpc.MacaroonPerms, error) {
// We'll attempt to look up the config that we expect, according to our
// subServerName name. If we can't find this, then we'll exit with an
// error, as we're unable to properly initialize ourselves without this
// config.
subServerConf, ok := configRegistry.FetchConfig(subServerName)
if !ok {
return nil, nil, fmt.Errorf("unable to find config for "+
"subserver type %s", subServerName)
}
// Now that we've found an object mapping to our service name, we'll
// ensure that it's the type we need.
config, ok := subServerConf.(*Config)
if !ok {
return nil, nil, fmt.Errorf("wrong type of config for "+
"subserver %s, expected %T got %T", subServerName,
&Config{}, subServerConf)
}
return New(config)
}
func init() {
subServer := &lnrpc.SubServerDriver{
SubServerName: subServerName,
New: func(c lnrpc.SubServerConfigDispatcher) (lnrpc.SubServer,
lnrpc.MacaroonPerms, error) {
return createNewSubServer(c)
},
}
// If the build tag is active, then we'll register ourselves as a
// sub-RPC server within the global lnrpc package namespace.
if err := lnrpc.RegisterSubServer(subServer); err != nil {
panic(fmt.Sprintf("failed to register sub server driver "+
"'%s': %v", subServerName, err))
}
}

@ -0,0 +1,153 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: invoicesrpc/invoices.proto
package invoicesrpc // import "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import lnrpc "github.com/lightningnetwork/lnd/lnrpc"
import _ "google.golang.org/genproto/googleapis/api/annotations"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// InvoicesClient is the client API for Invoices service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type InvoicesClient interface {
// *
// SubscribeSingleInvoice returns a uni-directional stream (server -> client)
// to notify the client of state transitions of the specified invoice.
// Initially the current invoice state is always sent out.
SubscribeSingleInvoice(ctx context.Context, in *lnrpc.PaymentHash, opts ...grpc.CallOption) (Invoices_SubscribeSingleInvoiceClient, error)
}
type invoicesClient struct {
cc *grpc.ClientConn
}
func NewInvoicesClient(cc *grpc.ClientConn) InvoicesClient {
return &invoicesClient{cc}
}
func (c *invoicesClient) SubscribeSingleInvoice(ctx context.Context, in *lnrpc.PaymentHash, opts ...grpc.CallOption) (Invoices_SubscribeSingleInvoiceClient, error) {
stream, err := c.cc.NewStream(ctx, &_Invoices_serviceDesc.Streams[0], "/invoicesrpc.Invoices/SubscribeSingleInvoice", opts...)
if err != nil {
return nil, err
}
x := &invoicesSubscribeSingleInvoiceClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Invoices_SubscribeSingleInvoiceClient interface {
Recv() (*lnrpc.Invoice, error)
grpc.ClientStream
}
type invoicesSubscribeSingleInvoiceClient struct {
grpc.ClientStream
}
func (x *invoicesSubscribeSingleInvoiceClient) Recv() (*lnrpc.Invoice, error) {
m := new(lnrpc.Invoice)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// InvoicesServer is the server API for Invoices service.
type InvoicesServer interface {
// *
// SubscribeSingleInvoice returns a uni-directional stream (server -> client)
// to notify the client of state transitions of the specified invoice.
// Initially the current invoice state is always sent out.
SubscribeSingleInvoice(*lnrpc.PaymentHash, Invoices_SubscribeSingleInvoiceServer) error
}
func RegisterInvoicesServer(s *grpc.Server, srv InvoicesServer) {
s.RegisterService(&_Invoices_serviceDesc, srv)
}
func _Invoices_SubscribeSingleInvoice_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(lnrpc.PaymentHash)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(InvoicesServer).SubscribeSingleInvoice(m, &invoicesSubscribeSingleInvoiceServer{stream})
}
type Invoices_SubscribeSingleInvoiceServer interface {
Send(*lnrpc.Invoice) error
grpc.ServerStream
}
type invoicesSubscribeSingleInvoiceServer struct {
grpc.ServerStream
}
func (x *invoicesSubscribeSingleInvoiceServer) Send(m *lnrpc.Invoice) error {
return x.ServerStream.SendMsg(m)
}
var _Invoices_serviceDesc = grpc.ServiceDesc{
ServiceName: "invoicesrpc.Invoices",
HandlerType: (*InvoicesServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "SubscribeSingleInvoice",
Handler: _Invoices_SubscribeSingleInvoice_Handler,
ServerStreams: true,
},
},
Metadata: "invoicesrpc/invoices.proto",
}
func init() {
proto.RegisterFile("invoicesrpc/invoices.proto", fileDescriptor_invoices_c6414974947f2940)
}
var fileDescriptor_invoices_c6414974947f2940 = []byte{
// 177 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x8e, 0xb1, 0x8e, 0xc2, 0x30,
0x10, 0x44, 0x75, 0xcd, 0xe9, 0x2e, 0x27, 0x5d, 0xe1, 0x82, 0xc2, 0xe2, 0x1b, 0xb2, 0x40, 0x7a,
0x0a, 0x2a, 0xa0, 0x42, 0x4a, 0x47, 0x67, 0x1b, 0xcb, 0x59, 0xe1, 0xec, 0x5a, 0xce, 0x06, 0xc4,
0xdf, 0x23, 0x82, 0x91, 0xd2, 0x8d, 0x66, 0xe6, 0x49, 0xaf, 0xd2, 0x48, 0x37, 0x46, 0xe7, 0x87,
0x9c, 0x1c, 0x7c, 0x72, 0x9d, 0x32, 0x0b, 0xab, 0xbf, 0xd9, 0xa6, 0x97, 0x81, 0x39, 0x44, 0x0f,
0x26, 0x21, 0x18, 0x22, 0x16, 0x23, 0xc8, 0x54, 0xae, 0xfa, 0x37, 0x27, 0xf7, 0x8e, 0x9b, 0x63,
0xf5, 0x73, 0x28, 0x9c, 0xda, 0x56, 0x8b, 0x76, 0xb4, 0x83, 0xcb, 0x68, 0x7d, 0x8b, 0x14, 0xa2,
0x2f, 0x93, 0x52, 0x75, 0xa4, 0x17, 0x73, 0x32, 0x8f, 0xde, 0x93, 0xec, 0xcd, 0xd0, 0xe9, 0xff,
0xd2, 0x95, 0xcf, 0xea, 0x6b, 0xd7, 0x9c, 0xd7, 0x01, 0xa5, 0x1b, 0x6d, 0xed, 0xb8, 0x87, 0x88,
0xa1, 0x13, 0x42, 0x0a, 0xe4, 0xe5, 0xce, 0xf9, 0x0a, 0x91, 0x2e, 0x30, 0x21, 0x30, 0x33, 0xb5,
0xdf, 0x93, 0x47, 0xf3, 0x0c, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x4e, 0x97, 0xf2, 0xdb, 0x00, 0x00,
0x00,
}

@ -0,0 +1,20 @@
syntax = "proto3";
import "google/api/annotations.proto";
import "rpc.proto";
package invoicesrpc;
option go_package = "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc";
// Invoices is a service that can be used to create, accept, settle and cancel
// invoices.
service Invoices {
/**
SubscribeSingleInvoice returns a uni-directional stream (server -> client)
to notify the client of state transitions of the specified invoice.
Initially the current invoice state is always sent out.
*/
rpc SubscribeSingleInvoice (lnrpc.PaymentHash) returns (stream lnrpc.Invoice);
}

@ -0,0 +1,183 @@
// +build invoicesrpc
package invoicesrpc
import (
"context"
"google.golang.org/grpc"
"gopkg.in/macaroon-bakery.v2/bakery"
"io/ioutil"
"os"
"path/filepath"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntypes"
)
const (
// subServerName is the name of the sub rpc server. We'll use this name
// to register ourselves, and we also require that the main
// SubServerConfigDispatcher instance recognize it as the name of our
// RPC service.
subServerName = "InvoicesRPC"
)
var (
// macaroonOps are the set of capabilities that our minted macaroon (if
// it doesn't already exist) will have.
macaroonOps = []bakery.Op{
{
Entity: "invoices",
Action: "read",
},
}
// macPermissions maps RPC calls to the permissions they require.
macPermissions = map[string][]bakery.Op{
"/invoicesrpc.Invoices/SubscribeSingleInvoice": {{
Entity: "invoices",
Action: "read",
}},
}
// DefaultInvoicesMacFilename is the default name of the invoices
// macaroon that we expect to find via a file handle within the main
// configuration file in this package.
DefaultInvoicesMacFilename = "invoices.macaroon"
)
// Server is a sub-server of the main RPC server: the invoices RPC. This sub
// RPC server allows external callers to access the status of the invoices
// currently active within lnd, as well as configuring it at runtime.
type Server struct {
started int32 // To be used atomically.
shutdown int32 // To be used atomically.
quit chan struct{}
cfg *Config
}
// A compile time check to ensure that Server fully implements the
// InvoicesServer gRPC service.
var _ InvoicesServer = (*Server)(nil)
// New returns a new instance of the invoicesrpc Invoices sub-server. We also
// return the set of permissions for the macaroons that we may create within
// this method. If the macaroons we need aren't found in the filepath, then
// we'll create them on start up. If we're unable to locate, or create the
// macaroons we need, then we'll return with an error.
func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) {
// If the path of the invoices macaroon wasn't specified, then we'll
// assume that it's found at the default network directory.
macFilePath := filepath.Join(
cfg.NetworkDir, DefaultInvoicesMacFilename,
)
// Now that we know the full path of the invoices macaroon, we can
// check to see if we need to create it or not.
if !lnrpc.FileExists(macFilePath) && cfg.MacService != nil {
log.Infof("Baking macaroons for invoices RPC Server at: %v",
macFilePath)
// At this point, we know that the invoices macaroon doesn't
// yet, exist, so we need to create it with the help of the
// main macaroon service.
invoicesMac, err := cfg.MacService.Oven.NewMacaroon(
context.Background(), bakery.LatestVersion, nil,
macaroonOps...,
)
if err != nil {
return nil, nil, err
}
invoicesMacBytes, err := invoicesMac.M().MarshalBinary()
if err != nil {
return nil, nil, err
}
err = ioutil.WriteFile(macFilePath, invoicesMacBytes, 0644)
if err != nil {
os.Remove(macFilePath)
return nil, nil, err
}
}
server := &Server{
cfg: cfg,
quit: make(chan struct{}, 1),
}
return server, macPermissions, nil
}
// Start launches any helper goroutines required for the Server to function.
//
// NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) Start() error {
return nil
}
// Stop signals any active goroutines for a graceful closure.
//
// NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) Stop() error {
close(s.quit)
return nil
}
// Name returns a unique string representation of the sub-server. This can be
// used to identify the sub-server and also de-duplicate them.
//
// NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) Name() string {
return subServerName
}
// RegisterWithRootServer will be called by the root gRPC server to direct a sub
// RPC server to register itself with the main gRPC root server. Until this is
// called, each sub-server won't be able to have requests routed towards it.
//
// NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error {
// We make sure that we register it with the main gRPC server to ensure
// all our methods are routed properly.
RegisterInvoicesServer(grpcServer, s)
log.Debugf("Invoices RPC server successfully registered with root " +
"gRPC server")
return nil
}
// SubscribeInvoices returns a uni-directional stream (server -> client) for
// notifying the client of invoice state changes.
func (s *Server) SubscribeSingleInvoice(req *lnrpc.PaymentHash,
updateStream Invoices_SubscribeSingleInvoiceServer) error {
hash, err := lntypes.NewHash(req.RHash)
if err != nil {
return err
}
invoiceClient := s.cfg.InvoiceRegistry.SubscribeSingleInvoice(*hash)
defer invoiceClient.Cancel()
for {
select {
case newInvoice := <-invoiceClient.Updates:
rpcInvoice, err := CreateRPCInvoice(
newInvoice, s.cfg.ChainParams,
)
if err != nil {
return err
}
if err := updateStream.Send(rpcInvoice); err != nil {
return err
}
case <-s.quit:
return nil
}
}
}

45
lnrpc/invoicesrpc/log.go Normal file

@ -0,0 +1,45 @@
package invoicesrpc
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)
// log is a logger that is initialized with no output filters. This means the
// package will not perform any logging by default until the caller requests
// it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger("IRPC", nil))
}
// DisableLog disables all library log output. Logging output is disabled by
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info. This
// should be used in preference to SetLogWriter if the caller is also using
// btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
// logClosure is used to provide a closure over expensive logging operations so
// don't have to be performed when the logging level doesn't warrant it.
type logClosure func() string
// String invokes the underlying function and returns the result.
func (c logClosure) String() string {
return c()
}
// newLogClosure returns a new closure over a function that returns a string
// which itself provides a Stringer interface so that it can be used with the
// logging system.
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}

119
lnrpc/invoicesrpc/utils.go Normal file

@ -0,0 +1,119 @@
package invoicesrpc
import (
"encoding/hex"
"fmt"
"github.com/btcsuite/btcd/chaincfg"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/zpay32"
)
// CreateRPCInvoice creates an *lnrpc.Invoice from the *channeldb.Invoice.
func CreateRPCInvoice(invoice *channeldb.Invoice,
activeNetParams *chaincfg.Params) (*lnrpc.Invoice, error) {
paymentRequest := string(invoice.PaymentRequest)
decoded, err := zpay32.Decode(paymentRequest, activeNetParams)
if err != nil {
return nil, fmt.Errorf("unable to decode payment request: %v",
err)
}
var descHash []byte
if decoded.DescriptionHash != nil {
descHash = decoded.DescriptionHash[:]
}
fallbackAddr := ""
if decoded.FallbackAddr != nil {
fallbackAddr = decoded.FallbackAddr.String()
}
settleDate := int64(0)
if !invoice.SettleDate.IsZero() {
settleDate = invoice.SettleDate.Unix()
}
// Expiry time will default to 3600 seconds if not specified
// explicitly.
expiry := int64(decoded.Expiry().Seconds())
// The expiry will default to 9 blocks if not specified explicitly.
cltvExpiry := decoded.MinFinalCLTVExpiry()
// Convert between the `lnrpc` and `routing` types.
routeHints := CreateRPCRouteHints(decoded.RouteHints)
preimage := invoice.Terms.PaymentPreimage
satAmt := invoice.Terms.Value.ToSatoshis()
satAmtPaid := invoice.AmtPaid.ToSatoshis()
isSettled := invoice.Terms.State == channeldb.ContractSettled
var state lnrpc.Invoice_InvoiceState
switch invoice.Terms.State {
case channeldb.ContractOpen:
state = lnrpc.Invoice_OPEN
case channeldb.ContractSettled:
state = lnrpc.Invoice_SETTLED
default:
return nil, fmt.Errorf("unknown invoice state")
}
return &lnrpc.Invoice{
Memo: string(invoice.Memo[:]),
Receipt: invoice.Receipt[:],
RHash: decoded.PaymentHash[:],
RPreimage: preimage[:],
Value: int64(satAmt),
CreationDate: invoice.CreationDate.Unix(),
SettleDate: settleDate,
Settled: isSettled,
PaymentRequest: paymentRequest,
DescriptionHash: descHash,
Expiry: expiry,
CltvExpiry: cltvExpiry,
FallbackAddr: fallbackAddr,
RouteHints: routeHints,
AddIndex: invoice.AddIndex,
Private: len(routeHints) > 0,
SettleIndex: invoice.SettleIndex,
AmtPaidSat: int64(satAmtPaid),
AmtPaidMsat: int64(invoice.AmtPaid),
AmtPaid: int64(invoice.AmtPaid),
State: state,
}, nil
}
// CreateRPCRouteHints takes in the decoded form of an invoice's route hints
// and converts them into the lnrpc type.
func CreateRPCRouteHints(routeHints [][]routing.HopHint) []*lnrpc.RouteHint {
var res []*lnrpc.RouteHint
for _, route := range routeHints {
hopHints := make([]*lnrpc.HopHint, 0, len(route))
for _, hop := range route {
pubKey := hex.EncodeToString(
hop.NodeID.SerializeCompressed(),
)
hint := &lnrpc.HopHint{
NodeId: pubKey,
ChanId: hop.ChannelID,
FeeBaseMsat: hop.FeeBaseMSat,
FeeProportionalMillionths: hop.FeeProportionalMillionths,
CltvExpiryDelta: uint32(hop.CLTVExpiryDelta),
}
hopHints = append(hopHints, hint)
}
routeHint := &lnrpc.RouteHint{HopHints: hopHints}
res = append(res, routeHint)
}
return res
}

@ -69,16 +69,6 @@ type Server struct {
// gRPC service. // gRPC service.
var _ SignerServer = (*Server)(nil) var _ SignerServer = (*Server)(nil)
// fileExists reports whether the named file or directory exists.
func fileExists(name string) bool {
if _, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
// New returns a new instance of the signrpc Signer sub-server. We also return // New returns a new instance of the signrpc Signer sub-server. We also return
// the set of permissions for the macaroons that we may create within this // the set of permissions for the macaroons that we may create within this
// method. If the macaroons we need aren't found in the filepath, then we'll // method. If the macaroons we need aren't found in the filepath, then we'll
@ -96,7 +86,7 @@ func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) {
// Now that we know the full path of the signer macaroon, we can check // Now that we know the full path of the signer macaroon, we can check
// to see if we need to create it or not. // to see if we need to create it or not.
macFilePath := cfg.SignerMacPath macFilePath := cfg.SignerMacPath
if cfg.MacService != nil && !fileExists(macFilePath) { if cfg.MacService != nil && !lnrpc.FileExists(macFilePath) {
log.Infof("Making macaroons for Signer RPC Server at: %v", log.Infof("Making macaroons for Signer RPC Server at: %v",
macFilePath) macFilePath)

@ -93,16 +93,6 @@ type WalletKit struct {
// WalletKitServer gRPC service. // WalletKitServer gRPC service.
var _ WalletKitServer = (*WalletKit)(nil) var _ WalletKitServer = (*WalletKit)(nil)
// fileExists reports whether the named file or directory exists.
func fileExists(name string) bool {
if _, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
// New creates a new instance of the WalletKit sub-RPC server. // New creates a new instance of the WalletKit sub-RPC server.
func New(cfg *Config) (*WalletKit, lnrpc.MacaroonPerms, error) { func New(cfg *Config) (*WalletKit, lnrpc.MacaroonPerms, error) {
// If the path of the wallet kit macaroon wasn't specified, then we'll // If the path of the wallet kit macaroon wasn't specified, then we'll
@ -116,7 +106,7 @@ func New(cfg *Config) (*WalletKit, lnrpc.MacaroonPerms, error) {
// Now that we know the full path of the wallet kit macaroon, we can // Now that we know the full path of the wallet kit macaroon, we can
// check to see if we need to create it or not. // check to see if we need to create it or not.
macFilePath := cfg.WalletKitMacPath macFilePath := cfg.WalletKitMacPath
if !fileExists(macFilePath) && cfg.MacService != nil { if !lnrpc.FileExists(macFilePath) && cfg.MacService != nil {
log.Infof("Baking macaroons for WalletKit RPC Server at: %v", log.Infof("Baking macaroons for WalletKit RPC Server at: %v",
macFilePath) macFilePath)

49
lntypes/hash.go Normal file

@ -0,0 +1,49 @@
package lntypes
import (
"encoding/hex"
"fmt"
)
// HashSize of array used to store hashes.
const HashSize = 32
// Hash is used in several of the lightning messages and common structures. It
// typically represents a payment hash.
type Hash [HashSize]byte
// String returns the Hash as a hexadecimal string.
func (hash Hash) String() string {
return hex.EncodeToString(hash[:])
}
// NewHash returns a new Hash from a byte slice. An error is returned if
// the number of bytes passed in is not HashSize.
func NewHash(newHash []byte) (*Hash, error) {
nhlen := len(newHash)
if nhlen != HashSize {
return nil, fmt.Errorf("invalid hash length of %v, want %v",
nhlen, HashSize)
}
var hash Hash
copy(hash[:], newHash)
return &hash, nil
}
// NewHashFromStr creates a Hash from a hex hash string.
func NewHashFromStr(newHash string) (*Hash, error) {
// Return error if hash string is of incorrect length.
if len(newHash) != HashSize*2 {
return nil, fmt.Errorf("invalid hash string length of %v, "+
"want %v", len(newHash), HashSize*2)
}
hash, err := hex.DecodeString(newHash)
if err != nil {
return nil, err
}
return NewHash(hash)
}

55
lntypes/preimage.go Normal file

@ -0,0 +1,55 @@
package lntypes
import (
"crypto/sha256"
"encoding/hex"
"fmt"
)
// PreimageSize of array used to store preimagees.
const PreimageSize = 32
// Preimage is used in several of the lightning messages and common structures. It
// represents a payment preimage.
type Preimage [PreimageSize]byte
// String returns the Preimage as a hexadecimal string.
func (p Preimage) String() string {
return hex.EncodeToString(p[:])
}
// NewPreimage returns a new Preimage from a byte slice. An error is returned if
// the number of bytes passed in is not PreimageSize.
func NewPreimage(newPreimage []byte) (*Preimage, error) {
nhlen := len(newPreimage)
if nhlen != PreimageSize {
return nil, fmt.Errorf("invalid preimage length of %v, want %v",
nhlen, PreimageSize)
}
var preimage Preimage
copy(preimage[:], newPreimage)
return &preimage, nil
}
// NewPreimageFromStr creates a Preimage from a hex preimage string.
func NewPreimageFromStr(newPreimage string) (*Preimage, error) {
// Return error if preimage string is of incorrect length.
if len(newPreimage) != PreimageSize*2 {
return nil, fmt.Errorf("invalid preimage string length of %v, "+
"want %v", len(newPreimage), PreimageSize*2)
}
preimage, err := hex.DecodeString(newPreimage)
if err != nil {
return nil, err
}
return NewPreimage(preimage)
}
// Hash returns the sha256 hash of the preimage.
func (p *Preimage) Hash() Hash {
return Hash(sha256.Sum256(p[:]))
}

4
log.go

@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lnrpc/autopilotrpc" "github.com/lightningnetwork/lnd/lnrpc/autopilotrpc"
"github.com/lightningnetwork/lnd/lnrpc/chainrpc" "github.com/lightningnetwork/lnd/lnrpc/chainrpc"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
@ -78,6 +79,7 @@ var (
nannLog = build.NewSubLogger("NANN", backendLog.Logger) nannLog = build.NewSubLogger("NANN", backendLog.Logger)
wtwrLog = build.NewSubLogger("WTWR", backendLog.Logger) wtwrLog = build.NewSubLogger("WTWR", backendLog.Logger)
ntfrLog = build.NewSubLogger("NTFR", backendLog.Logger) ntfrLog = build.NewSubLogger("NTFR", backendLog.Logger)
irpcLog = build.NewSubLogger("IRPC", backendLog.Logger)
) )
// Initialize package-global logger variables. // Initialize package-global logger variables.
@ -102,6 +104,7 @@ func init() {
netann.UseLogger(nannLog) netann.UseLogger(nannLog)
watchtower.UseLogger(wtwrLog) watchtower.UseLogger(wtwrLog)
chainrpc.UseLogger(ntfrLog) chainrpc.UseLogger(ntfrLog)
invoicesrpc.UseLogger(irpcLog)
} }
// subsystemLoggers maps each subsystem identifier to its associated logger. // subsystemLoggers maps each subsystem identifier to its associated logger.
@ -132,6 +135,7 @@ var subsystemLoggers = map[string]btclog.Logger{
"NANN": nannLog, "NANN": nannLog,
"WTWR": wtwrLog, "WTWR": wtwrLog,
"NTFR": ntfnLog, "NTFR": ntfnLog,
"IRPC": irpcLog,
} }
// initLogRotator initializes the logging rotator to write logs to logFile and // initLogRotator initializes the logging rotator to write logs to logFile and

@ -35,6 +35,7 @@ import (
"github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/macaroons"
@ -397,6 +398,7 @@ var _ lnrpc.LightningServer = (*rpcServer)(nil)
func newRPCServer(s *server, macService *macaroons.Service, func newRPCServer(s *server, macService *macaroons.Service,
subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption, subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption,
restServerOpts []grpc.DialOption, atpl *autopilot.Manager, restServerOpts []grpc.DialOption, atpl *autopilot.Manager,
invoiceRegistry *invoices.InvoiceRegistry,
tlsCfg *tls.Config) (*rpcServer, error) { tlsCfg *tls.Config) (*rpcServer, error) {
var ( var (
@ -408,7 +410,8 @@ func newRPCServer(s *server, macService *macaroons.Service,
// the dependencies they need are properly populated within each sub // the dependencies they need are properly populated within each sub
// server configuration struct. // server configuration struct.
err := subServerCgs.PopulateDependencies( err := subServerCgs.PopulateDependencies(
s.cc, networkDir, macService, atpl, s.cc, networkDir, macService, atpl, invoiceRegistry,
activeNetParams.Params,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -3383,7 +3386,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context,
) )
// With all sanity checks passed, write the invoice to the database. // With all sanity checks passed, write the invoice to the database.
addIndex, err := r.server.invoices.AddInvoice(newInvoice) addIndex, err := r.server.invoices.AddInvoice(newInvoice, rHash)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -3395,111 +3398,6 @@ func (r *rpcServer) AddInvoice(ctx context.Context,
}, nil }, nil
} }
// createRPCInvoice creates an *lnrpc.Invoice from the *channeldb.Invoice.
func createRPCInvoice(invoice *channeldb.Invoice) (*lnrpc.Invoice, error) {
paymentRequest := string(invoice.PaymentRequest)
decoded, err := zpay32.Decode(paymentRequest, activeNetParams.Params)
if err != nil {
return nil, fmt.Errorf("unable to decode payment request: %v",
err)
}
descHash := []byte("")
if decoded.DescriptionHash != nil {
descHash = decoded.DescriptionHash[:]
}
fallbackAddr := ""
if decoded.FallbackAddr != nil {
fallbackAddr = decoded.FallbackAddr.String()
}
settleDate := int64(0)
if !invoice.SettleDate.IsZero() {
settleDate = invoice.SettleDate.Unix()
}
// Expiry time will default to 3600 seconds if not specified
// explicitly.
expiry := int64(decoded.Expiry().Seconds())
// The expiry will default to 9 blocks if not specified explicitly.
cltvExpiry := decoded.MinFinalCLTVExpiry()
// Convert between the `lnrpc` and `routing` types.
routeHints := createRPCRouteHints(decoded.RouteHints)
preimage := invoice.Terms.PaymentPreimage
satAmt := invoice.Terms.Value.ToSatoshis()
satAmtPaid := invoice.AmtPaid.ToSatoshis()
isSettled := invoice.Terms.State == channeldb.ContractSettled
var state lnrpc.Invoice_InvoiceState
switch invoice.Terms.State {
case channeldb.ContractOpen:
state = lnrpc.Invoice_OPEN
case channeldb.ContractSettled:
state = lnrpc.Invoice_SETTLED
default:
return nil, fmt.Errorf("unknown invoice state")
}
return &lnrpc.Invoice{
Memo: string(invoice.Memo[:]),
Receipt: invoice.Receipt[:],
RHash: decoded.PaymentHash[:],
RPreimage: preimage[:],
Value: int64(satAmt),
CreationDate: invoice.CreationDate.Unix(),
SettleDate: settleDate,
Settled: isSettled,
PaymentRequest: paymentRequest,
DescriptionHash: descHash,
Expiry: expiry,
CltvExpiry: cltvExpiry,
FallbackAddr: fallbackAddr,
RouteHints: routeHints,
AddIndex: invoice.AddIndex,
Private: len(routeHints) > 0,
SettleIndex: invoice.SettleIndex,
AmtPaidSat: int64(satAmtPaid),
AmtPaidMsat: int64(invoice.AmtPaid),
AmtPaid: int64(invoice.AmtPaid),
State: state,
}, nil
}
// createRPCRouteHints takes in the decoded form of an invoice's route hints
// and converts them into the lnrpc type.
func createRPCRouteHints(routeHints [][]routing.HopHint) []*lnrpc.RouteHint {
var res []*lnrpc.RouteHint
for _, route := range routeHints {
hopHints := make([]*lnrpc.HopHint, 0, len(route))
for _, hop := range route {
pubKey := hex.EncodeToString(
hop.NodeID.SerializeCompressed(),
)
hint := &lnrpc.HopHint{
NodeId: pubKey,
ChanId: hop.ChannelID,
FeeBaseMsat: hop.FeeBaseMSat,
FeeProportionalMillionths: hop.FeeProportionalMillionths,
CltvExpiryDelta: uint32(hop.CLTVExpiryDelta),
}
hopHints = append(hopHints, hint)
}
routeHint := &lnrpc.RouteHint{HopHints: hopHints}
res = append(res, routeHint)
}
return res
}
// LookupInvoice attempts to look up an invoice according to its payment hash. // LookupInvoice attempts to look up an invoice according to its payment hash.
// The passed payment hash *must* be exactly 32 bytes, if not an error is // The passed payment hash *must* be exactly 32 bytes, if not an error is
// returned. // returned.
@ -3542,7 +3440,9 @@ func (r *rpcServer) LookupInvoice(ctx context.Context,
return spew.Sdump(invoice) return spew.Sdump(invoice)
})) }))
rpcInvoice, err := createRPCInvoice(&invoice) rpcInvoice, err := invoicesrpc.CreateRPCInvoice(
&invoice, activeNetParams.Params,
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -3582,7 +3482,9 @@ func (r *rpcServer) ListInvoices(ctx context.Context,
LastIndexOffset: invoiceSlice.LastIndexOffset, LastIndexOffset: invoiceSlice.LastIndexOffset,
} }
for i, invoice := range invoiceSlice.Invoices { for i, invoice := range invoiceSlice.Invoices {
resp.Invoices[i], err = createRPCInvoice(&invoice) resp.Invoices[i], err = invoicesrpc.CreateRPCInvoice(
&invoice, activeNetParams.Params,
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -3604,7 +3506,9 @@ func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription,
for { for {
select { select {
case newInvoice := <-invoiceClient.NewInvoices: case newInvoice := <-invoiceClient.NewInvoices:
rpcInvoice, err := createRPCInvoice(newInvoice) rpcInvoice, err := invoicesrpc.CreateRPCInvoice(
newInvoice, activeNetParams.Params,
)
if err != nil { if err != nil {
return err return err
} }
@ -3614,7 +3518,9 @@ func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription,
} }
case settledInvoice := <-invoiceClient.SettledInvoices: case settledInvoice := <-invoiceClient.SettledInvoices:
rpcInvoice, err := createRPCInvoice(settledInvoice) rpcInvoice, err := invoicesrpc.CreateRPCInvoice(
settledInvoice, activeNetParams.Params,
)
if err != nil { if err != nil {
return err return err
} }
@ -4518,7 +4424,7 @@ func (r *rpcServer) DecodePayReq(ctx context.Context,
expiry := int64(payReq.Expiry().Seconds()) expiry := int64(payReq.Expiry().Seconds())
// Convert between the `lnrpc` and `routing` types. // Convert between the `lnrpc` and `routing` types.
routeHints := createRPCRouteHints(payReq.RouteHints) routeHints := invoicesrpc.CreateRPCRouteHints(payReq.RouteHints)
amt := int64(0) amt := int64(0)
if payReq.MilliSat != nil { if payReq.MilliSat != nil {

@ -4,9 +4,12 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"github.com/btcsuite/btcd/chaincfg"
"github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lnrpc/autopilotrpc" "github.com/lightningnetwork/lnd/lnrpc/autopilotrpc"
"github.com/lightningnetwork/lnd/lnrpc/chainrpc" "github.com/lightningnetwork/lnd/lnrpc/chainrpc"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/macaroons"
@ -37,6 +40,10 @@ type subRPCServerConfigs struct {
// client to be notified of certain on-chain events (new blocks, // client to be notified of certain on-chain events (new blocks,
// confirmations, spends). // confirmations, spends).
ChainRPC *chainrpc.Config `group:"chainrpc" namespace:"chainrpc"` ChainRPC *chainrpc.Config `group:"chainrpc" namespace:"chainrpc"`
// InvoicesRPC is a sub-RPC server that exposes invoice related methods
// as a gRPC service.
InvoicesRPC *invoicesrpc.Config `group:"invoicesrpc" namespace:"invoicesrpc"`
} }
// PopulateDependencies attempts to iterate through all the sub-server configs // PopulateDependencies attempts to iterate through all the sub-server configs
@ -47,7 +54,9 @@ type subRPCServerConfigs struct {
// FetchConfig method. // FetchConfig method.
func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl, func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl,
networkDir string, macService *macaroons.Service, networkDir string, macService *macaroons.Service,
atpl *autopilot.Manager) error { atpl *autopilot.Manager,
invoiceRegistry *invoices.InvoiceRegistry,
activeNetParams *chaincfg.Params) error {
// First, we'll use reflect to obtain a version of the config struct // First, we'll use reflect to obtain a version of the config struct
// that allows us to programmatically inspect its fields. // that allows us to programmatically inspect its fields.
@ -125,6 +134,22 @@ func (s *subRPCServerConfigs) PopulateDependencies(cc *chainControl,
reflect.ValueOf(cc.chainNotifier), reflect.ValueOf(cc.chainNotifier),
) )
case *invoicesrpc.Config:
subCfgValue := extractReflectValue(cfg)
subCfgValue.FieldByName("NetworkDir").Set(
reflect.ValueOf(networkDir),
)
subCfgValue.FieldByName("MacService").Set(
reflect.ValueOf(macService),
)
subCfgValue.FieldByName("InvoiceRegistry").Set(
reflect.ValueOf(invoiceRegistry),
)
subCfgValue.FieldByName("ChainParams").Set(
reflect.ValueOf(activeNetParams),
)
default: default:
return fmt.Errorf("unknown field: %v, %T", fieldName, return fmt.Errorf("unknown field: %v, %T", fieldName,
cfg) cfg)

@ -3,10 +3,10 @@ package main
import ( import (
"sync" "sync"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
) )
@ -72,7 +72,7 @@ func (p *preimageBeacon) LookupPreimage(payHash []byte) ([]byte, bool) {
// First, we'll check the invoice registry to see if we already know of // First, we'll check the invoice registry to see if we already know of
// the preimage as it's on that we created ourselves. // the preimage as it's on that we created ourselves.
var invoiceKey chainhash.Hash var invoiceKey lntypes.Hash
copy(invoiceKey[:], payHash) copy(invoiceKey[:], payHash)
invoice, _, err := p.invoices.LookupInvoice(invoiceKey) invoice, _, err := p.invoices.LookupInvoice(invoiceKey)
switch { switch {