Merge pull request #16 from lightningnetwork/chainnotifier-interface

Refactor the chainntfs package to accommodate future ChainNotifier implementations
This commit is contained in:
Olaoluwa Osuntokun 2016-09-02 01:08:56 -05:00
commit 124b8b026c
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
16 changed files with 290 additions and 111 deletions

@ -1 +0,0 @@
package chainntnfs

@ -7,13 +7,20 @@ import (
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/chainntfs"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/roasbeef/btcd/btcjson"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcrpcclient"
"github.com/roasbeef/btcutil"
)
const (
// notifierType uniquely identifies this concrete implementation of the
// ChainNotifier interface.
notifierType = "btcd"
)
// BtcdNotifier implements the ChainNotifier interface using btcd's websockets
// notifications. Multiple concurrent clients are supported. All notifications
// are achieved via non-blocking sends on client channels.
@ -42,10 +49,10 @@ type BtcdNotifier struct {
// Ensure BtcdNotifier implements the ChainNotifier interface at compile time.
var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
// NewBtcdNotifier returns a new BtcdNotifier instance. This function assumes
// the btcd node detailed in the passed configuration is already running, and
// New returns a new BtcdNotifier instance. This function assumes the btcd node
// detailed in the passed configuration is already running, and
// willing to accept new websockets clients.
func NewBtcdNotifier(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) {
func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) {
notifier := &BtcdNotifier{
notificationRegistry: make(chan interface{}),
@ -66,8 +73,8 @@ func NewBtcdNotifier(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) {
OnRedeemingTx: notifier.onRedeemingTx,
}
// Disable connecting to btcd within the btcrpcclient.New method. We defer
// establishing the connection to our .Start() method.
// Disable connecting to btcd within the btcrpcclient.New method. We
// defer establishing the connection to our .Start() method.
config.DisableConnectOnNew = true
config.DisableAutoReconnect = false
chainConn, err := btcrpcclient.New(config, ntfnCallbacks)

@ -1,17 +1,21 @@
package btcdnotify
// confEntry...
// confEntry represents an entry in the min-confirmation heap. .
type confEntry struct {
*confirmationsNotification
triggerHeight uint32
}
// confirmationHeap...
// confirmationHeap is a list of confEntries sorted according to nearest
// "confirmation" height.Each entry within the min-confirmation heap is sorted
// according to the smallest dleta from the current blockheight to the
// triggerHeight of the next entry confirmationHeap
type confirmationHeap struct {
items []*confEntry
}
// newConfirmationHeap returns a new confirmationHeap with zero items.
func newConfirmationHeap() *confirmationHeap {
var confItems []*confEntry
return &confirmationHeap{confItems}

@ -0,0 +1,40 @@
package btcdnotify
import (
"fmt"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/roasbeef/btcrpcclient"
)
// createNewNotifier creates a new instance of the ChainNotifier interface
// implemented by BtcdNotifier.
func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) {
if len(args) != 1 {
return nil, fmt.Errorf("incorrect number of arguments to .New(...), "+
"expected 1, instead passed %v", len(args))
}
config, ok := args[0].(*btcrpcclient.ConnConfig)
if !ok {
return nil, fmt.Errorf("first argument to btcdnotifier.New is " +
"incorrect, expected a *btcrpcclient.ConnConfig")
}
return New(config)
}
// init registers a driver for the BtcdNotifier concrete implementation of the
// chainntnfs.ChainNotifier interface.
func init() {
// Register the driver.
notifier := &chainntnfs.NotifierDriver{
NotifierType: notifierType,
New: createNewNotifier,
}
if err := chainntnfs.RegisterNotifier(notifier); err != nil {
panic(fmt.Sprintf("failed to register notifier driver '%s': %v",
notifierType, err))
}
}

@ -1,6 +1,11 @@
package chainntnfs
import "github.com/roasbeef/btcd/wire"
import (
"fmt"
"sync"
"github.com/roasbeef/btcd/wire"
)
// ChainNotifier represents a trusted source to receive notifications concerning
// targeted events on the Bitcoin blockchain. The interface specification is
@ -104,3 +109,74 @@ type BlockEpoch struct {
type BlockEpochEvent struct {
Epochs chan *BlockEpoch // MUST be buffered.
}
// NotifierDriver represents a "driver" for a particular interface. A driver is
// indentified by a globally unique string identifier along with a 'New()'
// method which is responsible for initializing a particular ChainNotifier
// concrete implementation.
type NotifierDriver struct {
// NotifierType is a string which uniquely identifes the ChainNotifier
// that this driver, drives.
NotifierType string
// New creates a new instance of a concrete ChainNotifier
// implementation given a variadic set up arguments. The function takes
// a varidaic number of interface paramters in order to provide
// initialization flexibility, thereby accomodating several potential
// ChainNotifier implementations.
New func(args ...interface{}) (ChainNotifier, error)
}
var (
notifiers = make(map[string]*NotifierDriver)
registerMtx sync.Mutex
)
// RegisteredNotifiers returns a slice of all currently registered notifiers.
//
// NOTE: This function is safe for concurrent access.
func RegisteredNotifiers() []*NotifierDriver {
registerMtx.Lock()
defer registerMtx.Unlock()
drivers := make([]*NotifierDriver, 0, len(notifiers))
for _, driver := range notifiers {
drivers = append(drivers, driver)
}
return drivers
}
// RegisterNotifier registers a NotifierDriver which is capable of driving a
// concrete ChainNotifier interface. In the case that this driver has already
// been registered, an error is returned.
//
// NOTE: This function is safe for concurrent access.
func RegisterNotifier(driver *NotifierDriver) error {
registerMtx.Lock()
defer registerMtx.Unlock()
if _, ok := notifiers[driver.NotifierType]; ok {
return fmt.Errorf("notifier already registered")
}
notifiers[driver.NotifierType] = driver
return nil
}
// SupportedNotifiers returns a slice of strings that represent the database
// drivers that have been registered and are therefore supported.
//
// NOTE: This function is safe for concurrent access.
func SupportedNotifiers() []string {
registerMtx.Lock()
defer registerMtx.Unlock()
supportedNotifiers := make([]string, 0, len(notifiers))
for driverName := range notifiers {
supportedNotifiers = append(supportedNotifiers, driverName)
}
return supportedNotifiers
}

@ -1,11 +1,13 @@
package btcdnotify
package chainntnfs_test
import (
"bytes"
"testing"
"time"
"github.com/lightningnetwork/lnd/chainntfs"
"github.com/lightningnetwork/lnd/chainntnfs"
_ "github.com/lightningnetwork/lnd/chainntnfs/btcdnotify"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcd/rpctest"
@ -181,7 +183,7 @@ func testSpendNotification(miner *rpctest.Harness,
notifier chainntnfs.ChainNotifier, t *testing.T) {
// We'd like to test the spend notifiations for all
// chainntnfs.ChainNotifier concrete implemenations.
// ChainNotifier concrete implemenations.
//
// To do so, we first create a new output to our test target
// address.
@ -286,15 +288,22 @@ var ntfnTests = []func(node *rpctest.Harness, notifier chainntnfs.ChainNotifier,
testSpendNotification,
}
// TODO(roasbeef): make test generic across all interfaces?
// * indeed!
// * requires interface implementation registration
func TestBtcdNotifier(t *testing.T) {
// TestInterfaces tests all registered interfaces with a unified set of tests
// which excersie each of the required methods found within the ChainNotifier
// interface.
//
// NOTE: In the future, when additional implementations of the ChainNotifier
// interface have been implemented, in order to ensure the new concrete
// implementation is automatically tested, two steps must be undertaken. First,
// one needs add a "non-captured" (_) import from the new sub-package. This
// import should trigger an init() method within the package which registeres
// the interface. Second, an additional case in the switch within the main loop
// below needs to be added which properly initializes the interface.
func TestInterfaces(t *testing.T) {
// Initialize the harness around a btcd node which will serve as our
// dedicated miner to generate blocks, cause re-orgs, etc. We'll set
// up this node with a chain length of 125, so we have plentyyy of BTC
// to play around with.
// dedicated miner to generate blocks, cause re-orgs, etc. We'll set up
// this node with a chain length of 125, so we have plentyyy of BTC to
// play around with.
miner, err := rpctest.New(netParams, nil, nil)
if err != nil {
t.Fatalf("unable to create mining node: %v", err)
@ -304,17 +313,30 @@ func TestBtcdNotifier(t *testing.T) {
t.Fatalf("unable to set up mining node: %v", err)
}
nodeConfig := miner.RPCConfig()
notifier, err := NewBtcdNotifier(&nodeConfig)
rpcConfig := miner.RPCConfig()
var notifier chainntnfs.ChainNotifier
for _, notifierDriver := range chainntnfs.RegisteredNotifiers() {
notifierType := notifierDriver.NotifierType
switch notifierType {
case "btcd":
notifier, err = notifierDriver.New(&rpcConfig)
if err != nil {
t.Fatalf("unable to create notifier: %v", err)
t.Fatalf("unable to create %v notifier: %v",
notifierType, err)
}
}
if err := notifier.Start(); err != nil {
t.Fatalf("unable to start notifier: %v", err)
t.Fatalf("unable to start notifier %v: %v",
notifierType, err)
}
defer notifier.Stop()
for _, ntfnTest := range ntfnTests {
ntfnTest(miner, notifier, t)
}
notifier.Stop()
}
}

@ -89,6 +89,7 @@ func newHtlcSwitch() *htlcSwitch {
linkControl: make(chan interface{}),
htlcPlex: make(chan *htlcPacket, htlcQueueSize),
outgoingPayments: make(chan *htlcPacket, htlcQueueSize),
quit: make(chan struct{}),
}
}

37
lnd.go

@ -14,9 +14,11 @@ import (
"google.golang.org/grpc"
"github.com/lightningnetwork/lnd/chainntnfs/btcdnotify"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcrpcclient"
)
var (
@ -94,9 +96,29 @@ func lndMain() error {
return err
}
// Create, and start the lnwallet, which handles the core payment channel
// logic, and exposes control via proxy state machines.
config := &lnwallet.Config{
btcdHost := fmt.Sprintf("%v:%v", loadedConfig.RPCHost, activeNetParams.rpcPort)
btcdUser := loadedConfig.RPCUser
btcdPass := loadedConfig.RPCPass
// TODO(roasbeef): parse config here and select chosen notifier instead
rpcConfig := &btcrpcclient.ConnConfig{
Host: btcdHost,
Endpoint: "ws",
User: btcdUser,
Pass: btcdPass,
Certificates: rpcCert,
DisableTLS: false,
DisableConnectOnNew: true,
DisableAutoReconnect: false,
}
notifier, err := btcdnotify.New(rpcConfig)
if err != nil {
return err
}
// Create, and start the lnwallet, which handles the core payment
// channel logic, and exposes control via proxy state machines.
walletConfig := &lnwallet.Config{
PrivatePass: []byte("hello"),
DataDir: filepath.Join(loadedConfig.DataDir, "lnwallet"),
RpcHost: fmt.Sprintf("%v:%v", rpcIP[0], activeNetParams.rpcPort),
@ -105,7 +127,7 @@ func lndMain() error {
CACert: rpcCert,
NetParams: activeNetParams.Params,
}
wallet, err := lnwallet.NewLightningWallet(config, chanDB)
wallet, err := lnwallet.NewLightningWallet(walletConfig, chanDB, notifier)
if err != nil {
fmt.Printf("unable to create wallet: %v\n", err)
return err
@ -124,12 +146,15 @@ func lndMain() error {
defaultListenAddrs := []string{
net.JoinHostPort("", strconv.Itoa(loadedConfig.PeerPort)),
}
server, err := newServer(defaultListenAddrs, wallet, chanDB)
server, err := newServer(defaultListenAddrs, notifier, wallet, chanDB)
if err != nil {
srvrLog.Errorf("unable to create server: %v\n", err)
return err
}
server.Start()
if err := server.Start(); err != nil {
srvrLog.Errorf("unable to create to start: %v\n", err)
return err
}
addInterruptHandler(func() {
ltndLog.Infof("Gracefully shutting down the server...")

@ -8,7 +8,7 @@ import (
"github.com/btcsuite/fastsha256"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntfs"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"

@ -13,9 +13,9 @@ import (
// such as: uspv, btcwallet, Bitcoin Core, Electrum, etc. This interface then
// serves as a "base wallet", with Lightning Network awareness taking place at
// a "higher" level of abstraction. Essentially, an overlay wallet.
// Implementors of this interface must closely adhere to the documented behavior
// of all interface methods in order to ensure identical behavior accross all
// concrete implementations.
// Implementors of this interface must closely adhere to the documented
// behavior of all interface methods in order to ensure identical behavior
// across all concrete implementations.
type WalletController interface {
// ConfirmedBalance returns the sum of all the wallet's unspent outputs
// that have at least confs confirmations. If confs is set to zero,

@ -9,8 +9,7 @@ import (
"sync/atomic"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntfs"
"github.com/lightningnetwork/lnd/chainntfs/btcdnotify"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/elkrem"
"github.com/roasbeef/btcd/btcjson"
@ -18,7 +17,6 @@ import (
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcrpcclient"
"github.com/roasbeef/btcutil"
"github.com/roasbeef/btcutil/coinset"
"github.com/roasbeef/btcutil/txsort"
@ -234,8 +232,7 @@ type LightningWallet struct {
// Used by in order to obtain notifications about funding transaction
// reaching a specified confirmation depth, and to catch
// counterparty's broadcasting revoked commitment states.
// TODO(roasbeef): needs to be stripped out from wallet
ChainNotifier chainntnfs.ChainNotifier
chainNotifier chainntnfs.ChainNotifier
// The core wallet, all non Lightning Network specific interaction is
// proxied to the internal wallet.
@ -279,7 +276,12 @@ type LightningWallet struct {
// NewLightningWallet creates/opens and initializes a LightningWallet instance.
// If the wallet has never been created (according to the passed dataDir), first-time
// setup is executed.
func NewLightningWallet(config *Config, cdb *channeldb.DB) (*LightningWallet, error) {
//
// NOTE: The passed channeldb, and ChainNotifier should already be fully
// initialized/started before being passed as a function arugment.
func NewLightningWallet(config *Config, cdb *channeldb.DB,
notifier chainntnfs.ChainNotifier) (*LightningWallet, error) {
// Ensure the wallet exists or create it when the create flag is set.
netDir := networkDir(config.DataDir, config.NetParams)
@ -344,26 +346,8 @@ func NewLightningWallet(config *Config, cdb *channeldb.DB) (*LightningWallet, er
return nil, err
}
// Using the same authentication info, create a config for a second
// rpcclient which will be used by the current default chain
// notifier implemenation.
rpcConfig := &btcrpcclient.ConnConfig{
Host: config.RpcHost,
Endpoint: "ws",
User: config.RpcUser,
Pass: config.RpcPass,
Certificates: config.CACert,
DisableTLS: false,
DisableConnectOnNew: true,
DisableAutoReconnect: false,
}
chainNotifier, err := btcdnotify.NewBtcdNotifier(rpcConfig)
if err != nil {
return nil, err
}
return &LightningWallet{
ChainNotifier: chainNotifier,
chainNotifier: notifier,
rpc: rpcc,
Wallet: wallet,
channelDB: cdb,
@ -393,17 +377,8 @@ func (l *LightningWallet) Startup() error {
}
l.Start()
// Start the notification server. This is used so channel managment
// goroutines can be notified when a funding transaction reaches a
// sufficient number of confirmations, or when the input for the funding
// transaction is spent in an attempt at an uncooperative close by the
// counter party.
if err := l.ChainNotifier.Start(); err != nil {
return err
}
// Pass the rpc client into the wallet so it can sync up to the current
// main chain.
// Pass the rpc client into the wallet so it can sync up to the
// current main chain.
l.SynchronizeRPC(l.rpc)
l.wg.Add(1)
@ -426,8 +401,6 @@ func (l *LightningWallet) Shutdown() error {
l.rpc.Shutdown()
l.ChainNotifier.Stop()
close(l.quit)
l.wg.Wait()
return nil
@ -1251,7 +1224,7 @@ func (l *LightningWallet) handleChannelOpen(req *channelOpenMsg) {
// Finally, create and officially open the payment channel!
// TODO(roasbeef): CreationTime once tx is 'open'
channel, _ := NewLightningChannel(l, l.ChainNotifier, l.channelDB,
channel, _ := NewLightningChannel(l, l.chainNotifier, l.channelDB,
res.partialState)
res.chanOpen <- channel
@ -1266,7 +1239,7 @@ func (l *LightningWallet) openChannelAfterConfirmations(res *ChannelReservation)
// transaction reaches `numConfs` confirmations.
txid := res.fundingTx.TxSha()
numConfs := uint32(res.numConfsToOpen)
confNtfn, _ := l.ChainNotifier.RegisterConfirmationsNtfn(&txid, numConfs)
confNtfn, _ := l.chainNotifier.RegisterConfirmationsNtfn(&txid, numConfs)
walletLog.Infof("Waiting for funding tx (txid: %v) to reach %v confirmations",
txid, numConfs)
@ -1293,7 +1266,7 @@ out:
// Finally, create and officially open the payment channel!
// TODO(roasbeef): CreationTime once tx is 'open'
channel, _ := NewLightningChannel(l, l.ChainNotifier, l.channelDB,
channel, _ := NewLightningChannel(l, l.chainNotifier, l.channelDB,
res.partialState)
res.chanOpen <- channel
}

@ -11,6 +11,7 @@ import (
"time"
"github.com/boltdb/bolt"
"github.com/lightningnetwork/lnd/chainntnfs/btcdnotify"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcutil/txsort"
@ -338,7 +339,15 @@ func createTestWallet(miningNode *rpctest.Harness, netParams *chaincfg.Params) (
return "", nil, err
}
wallet, err := NewLightningWallet(config, cdb)
chainNotifier, err := btcdnotify.New(&rpcConfig)
if err != nil {
return "", nil, err
}
if err := chainNotifier.Start(); err != nil {
return "", nil, err
}
wallet, err := NewLightningWallet(config, cdb, chainNotifier)
if err != nil {
return "", nil, err
}

2
log.go

@ -6,7 +6,7 @@ import (
"github.com/btcsuite/btclog"
"github.com/btcsuite/seelog"
"github.com/lightningnetwork/lnd/chainntfs"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
)

@ -218,7 +218,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
for _, dbChan := range chans {
chanID := dbChan.ChanID
lnChan, err := lnwallet.NewLightningChannel(p.server.lnwallet,
p.server.lnwallet.ChainNotifier, p.server.chanDB, dbChan)
p.server.chainNotifier, p.server.chanDB, dbChan)
if err != nil {
return err
}
@ -673,8 +673,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
// confirmation.
go func() {
// TODO(roasbeef): add param for num needed confs
notifier := p.server.lnwallet.ChainNotifier
confNtfn, err := notifier.RegisterConfirmationsNtfn(txid, 1)
confNtfn, err := p.server.chainNotifier.RegisterConfirmationsNtfn(txid, 1)
if err != nil {
req.err <- err
return

@ -8,6 +8,7 @@ import (
"sync/atomic"
"github.com/btcsuite/fastsha256"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lndc"
"github.com/lightningnetwork/lnd/lnrpc"
@ -41,7 +42,7 @@ type server struct {
peers map[int32]*peer
rpcServer *rpcServer
// TODO(roasbeef): add chan notifier also
chainNotifier chainntnfs.ChainNotifier
lnwallet *lnwallet.LightningWallet
// TODO(roasbeef): add to constructor
@ -63,8 +64,8 @@ type server struct {
// newServer creates a new instance of the server which is to listen using the
// passed listener address.
func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet,
chanDB *channeldb.DB) (*server, error) {
func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
wallet *lnwallet.LightningWallet, chanDB *channeldb.DB) (*server, error) {
privKey, err := getIdentityPrivKey(chanDB, wallet)
if err != nil {
@ -81,6 +82,7 @@ func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet,
serializedPubKey := privKey.PubKey().SerializeCompressed()
s := &server{
chainNotifier: notifier,
chanDB: chanDB,
fundingMgr: newFundingManager(wallet),
htlcSwitch: newHtlcSwitch(),
@ -110,10 +112,10 @@ func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet,
// Start starts the main daemon server, all requested listeners, and any helper
// goroutines.
func (s *server) Start() {
func (s *server) Start() error {
// Already running?
if atomic.AddInt32(&s.started, 1) != 1 {
return
return nil
}
// Start all the listeners.
@ -122,12 +124,30 @@ func (s *server) Start() {
go s.listener(l)
}
s.fundingMgr.Start()
s.htlcSwitch.Start()
// Start the notification server. This is used so channel managment
// goroutines can be notified when a funding transaction reaches a
// sufficient number of confirmations, or when the input for the
// funding transaction is spent in an attempt at an uncooperative
// close by the counter party.
if err := s.chainNotifier.Start(); err != nil {
return err
}
if err := s.rpcServer.Start(); err != nil {
return err
}
if err := s.fundingMgr.Start(); err != nil {
return err
}
if err := s.htlcSwitch.Start(); err != nil {
return err
}
s.routingMgr.Start()
s.wg.Add(1)
go s.queryHandler()
return nil
}
// Stop gracefully shutsdown the main daemon server. This function will signal
@ -146,10 +166,14 @@ func (s *server) Stop() error {
}
}
// Shutdown the wallet, funding manager, and the rpc server.
s.chainNotifier.Stop()
s.rpcServer.Stop()
s.lnwallet.Shutdown()
s.fundingMgr.Stop()
s.routingMgr.Stop()
s.htlcSwitch.Stop()
s.lnwallet.Shutdown()
// Signal all the lingering goroutines to quit.
close(s.quit)