lnd: add new channelNotifier impl of chanbackup.ChannelNotifier

In this commit, we add the channelNotifier, and implementation fo the
chanbackup.ChannelNotifier interface. This will be our bridge from the
channelnotifier.ChannelNotifier sturct within lnd, and the interface
abstraction that the chanbackup.SubSwapper accpets. The role of this new
struct is simple: proxy the messages from the
channelnotifier.ChannelNotifier to the chanbackup.SubSwapper in a format
that it understands.

Along the way we introduce a tiny interface such that we don't need to
depend on the the channledb package.
This commit is contained in:
Olaoluwa Osuntokun 2019-02-08 19:34:40 -08:00
parent af1dfe176e
commit 411000dbbc
No known key found for this signature in database
GPG Key ID: CE58F7F8E20FD9A2

145
channel_notifier.go Normal file

@ -0,0 +1,145 @@
package main
import (
"fmt"
"net"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/channelnotifier"
)
// addrSource is an interface that allow us to get the addresses for a target
// node. We'll need this in order to be able to properly proxy the
// notifications to create SCBs.
type addrSource interface {
// AddrsForNode returns all known addresses for the target node public
// key.
AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error)
}
// channelNotifier is an implementation of the chanbackup.ChannelNotifier
// interface using the existing channelnotifier.ChannelNotifier struct. This
// implementation allows us to satisfy all the dependencies of the
// chanbackup.SubSwapper struct.
type channelNotifier struct {
// chanNotifier is the based channel notifier that we'll proxy requests
// from.
chanNotifier *channelnotifier.ChannelNotifier
// addrs is an implementation of the addrSource interface that allows
// us to get the latest set of addresses for a given node. We'll need
// this to be able to create an SCB for new channels.
addrs addrSource
}
// SubscribeChans requests a new channel subscription relative to the initial
// set of known channels. We use the knownChans as a synchronization point to
// ensure that the chanbackup.SubSwapper does not miss any channel open or
// close events in the period between when it's created, and when it requests
// the channel subscription.
//
// NOTE: This is part of the chanbackup.ChannelNotifier interface.
func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{}) (
*chanbackup.ChannelSubscription, error) {
ltndLog.Infof("Channel backup proxy channel notifier starting")
// TODO(roasbeef): read existing set of chans and diff
quit := make(chan struct{})
chanUpdates := make(chan chanbackup.ChannelEvent, 1)
// In order to adhere to the interface, we'll proxy the events from the
// channel notifier to the sub-swapper in a format it understands.
go func() {
// First, we'll subscribe to the primary channel notifier so we can
// obtain events for new opened/closed channels.
chanSubscription, err := c.chanNotifier.SubscribeChannelEvents()
if err != nil {
panic(fmt.Sprintf("unable to subscribe to chans: %v",
err))
}
defer chanSubscription.Cancel()
for {
select {
// A new event has been sent by the chanNotifier, we'll
// filter out the events we actually care about and
// send them to the sub-swapper.
case e := <-chanSubscription.Updates():
// TODO(roasbeef): batch dispatch ntnfs
switch event := e.(type) {
// A new channel has been opened, we'll obtain
// the node address, then send to the
// sub-swapper.
case channelnotifier.OpenChannelEvent:
nodeAddrs, err := c.addrs.AddrsForNode(
event.Channel.IdentityPub,
)
if err != nil {
pub := event.Channel.IdentityPub
ltndLog.Errorf("unable to "+
"fetch addrs for %x: %v",
pub.SerializeCompressed(),
err)
}
channel := event.Channel
chanEvent := chanbackup.ChannelEvent{
NewChans: []chanbackup.ChannelWithAddrs{
{
OpenChannel: channel,
Addrs: nodeAddrs,
},
},
}
select {
case chanUpdates <- chanEvent:
case <-quit:
return
}
// An existing channel has been closed, we'll
// send only the chanPoint of the closed
// channel to the sub-swapper.
case channelnotifier.ClosedChannelEvent:
chanPoint := event.CloseSummary.ChanPoint
chanEvent := chanbackup.ChannelEvent{
ClosedChans: []wire.OutPoint{
chanPoint,
},
}
select {
case chanUpdates <- chanEvent:
case <-quit:
return
}
}
// The cancel method has been called, signalling us to
// exit
case <-quit:
return
}
}
}()
return &chanbackup.ChannelSubscription{
ChanUpdates: chanUpdates,
Cancel: func() {
close(quit)
},
}, nil
}
// A compile-time constraint to ensure channelNotifier implements
// chanbackup.ChannelNotifier.
var _ chanbackup.ChannelNotifier = (*channelNotifier)(nil)