From 411000dbbc3959bc3a53897bafdba251e327230f Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 8 Feb 2019 19:34:40 -0800 Subject: [PATCH] lnd: add new channelNotifier impl of chanbackup.ChannelNotifier In this commit, we add the channelNotifier, and implementation fo the chanbackup.ChannelNotifier interface. This will be our bridge from the channelnotifier.ChannelNotifier sturct within lnd, and the interface abstraction that the chanbackup.SubSwapper accpets. The role of this new struct is simple: proxy the messages from the channelnotifier.ChannelNotifier to the chanbackup.SubSwapper in a format that it understands. Along the way we introduce a tiny interface such that we don't need to depend on the the channledb package. --- channel_notifier.go | 145 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 channel_notifier.go diff --git a/channel_notifier.go b/channel_notifier.go new file mode 100644 index 00000000..c85f94e2 --- /dev/null +++ b/channel_notifier.go @@ -0,0 +1,145 @@ +package main + +import ( + "fmt" + "net" + + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/chanbackup" + "github.com/lightningnetwork/lnd/channelnotifier" +) + +// addrSource is an interface that allow us to get the addresses for a target +// node. We'll need this in order to be able to properly proxy the +// notifications to create SCBs. +type addrSource interface { + // AddrsForNode returns all known addresses for the target node public + // key. + AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) +} + +// channelNotifier is an implementation of the chanbackup.ChannelNotifier +// interface using the existing channelnotifier.ChannelNotifier struct. This +// implementation allows us to satisfy all the dependencies of the +// chanbackup.SubSwapper struct. +type channelNotifier struct { + // chanNotifier is the based channel notifier that we'll proxy requests + // from. + chanNotifier *channelnotifier.ChannelNotifier + + // addrs is an implementation of the addrSource interface that allows + // us to get the latest set of addresses for a given node. We'll need + // this to be able to create an SCB for new channels. + addrs addrSource +} + +// SubscribeChans requests a new channel subscription relative to the initial +// set of known channels. We use the knownChans as a synchronization point to +// ensure that the chanbackup.SubSwapper does not miss any channel open or +// close events in the period between when it's created, and when it requests +// the channel subscription. +// +// NOTE: This is part of the chanbackup.ChannelNotifier interface. +func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{}) ( + *chanbackup.ChannelSubscription, error) { + + ltndLog.Infof("Channel backup proxy channel notifier starting") + + // TODO(roasbeef): read existing set of chans and diff + + quit := make(chan struct{}) + chanUpdates := make(chan chanbackup.ChannelEvent, 1) + + // In order to adhere to the interface, we'll proxy the events from the + // channel notifier to the sub-swapper in a format it understands. + go func() { + // First, we'll subscribe to the primary channel notifier so we can + // obtain events for new opened/closed channels. + chanSubscription, err := c.chanNotifier.SubscribeChannelEvents() + if err != nil { + panic(fmt.Sprintf("unable to subscribe to chans: %v", + err)) + } + + defer chanSubscription.Cancel() + + for { + select { + + // A new event has been sent by the chanNotifier, we'll + // filter out the events we actually care about and + // send them to the sub-swapper. + case e := <-chanSubscription.Updates(): + // TODO(roasbeef): batch dispatch ntnfs + + switch event := e.(type) { + + // A new channel has been opened, we'll obtain + // the node address, then send to the + // sub-swapper. + case channelnotifier.OpenChannelEvent: + nodeAddrs, err := c.addrs.AddrsForNode( + event.Channel.IdentityPub, + ) + if err != nil { + pub := event.Channel.IdentityPub + ltndLog.Errorf("unable to "+ + "fetch addrs for %x: %v", + pub.SerializeCompressed(), + err) + } + + channel := event.Channel + chanEvent := chanbackup.ChannelEvent{ + NewChans: []chanbackup.ChannelWithAddrs{ + { + OpenChannel: channel, + Addrs: nodeAddrs, + }, + }, + } + + select { + case chanUpdates <- chanEvent: + case <-quit: + return + } + + // An existing channel has been closed, we'll + // send only the chanPoint of the closed + // channel to the sub-swapper. + case channelnotifier.ClosedChannelEvent: + chanPoint := event.CloseSummary.ChanPoint + chanEvent := chanbackup.ChannelEvent{ + ClosedChans: []wire.OutPoint{ + chanPoint, + }, + } + + select { + case chanUpdates <- chanEvent: + case <-quit: + return + } + } + + // The cancel method has been called, signalling us to + // exit + case <-quit: + return + } + } + }() + + return &chanbackup.ChannelSubscription{ + ChanUpdates: chanUpdates, + Cancel: func() { + close(quit) + }, + }, nil +} + +// A compile-time constraint to ensure channelNotifier implements +// chanbackup.ChannelNotifier. +var _ chanbackup.ChannelNotifier = (*channelNotifier)(nil)