Merge pull request #1902 from cfromknecht/move-chan-series-to-discovery

Move chan series to discovery
This commit is contained in:
Olaoluwa Osuntokun 2018-11-30 18:20:09 -08:00 committed by GitHub
commit ec01bebb81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 74 deletions

@ -1,31 +1,88 @@
package main package discovery
import ( import (
"time" "time"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
) )
// chanSeries is an implementation of the discovery.ChannelGraphTimeSeries // ChannelGraphTimeSeries is an interface that provides time and block based
// querying into our view of the channel graph. New channels will have
// monotonically increasing block heights, and new channel updates will have
// increasing timestamps. Once we connect to a peer, we'll use the methods in
// this interface to determine if we're already in sync, or need to request
// some new information from them.
type ChannelGraphTimeSeries interface {
// HighestChanID should return the channel ID of the channel we know of
// that's furthest in the target chain. This channel will have a block
// height that's close to the current tip of the main chain as we
// know it. We'll use this to start our QueryChannelRange dance with
// the remote node.
HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error)
// UpdatesInHorizon returns all known channel and node updates with an
// update timestamp between the start time and end time. We'll use this
// to catch up a remote node to the set of channel updates that they
// may have missed out on within the target chain.
UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error)
// FilterKnownChanIDs takes a target chain, and a set of channel ID's,
// and returns a filtered set of chan ID's. This filtered set of chan
// ID's represents the ID's that we don't know of which were in the
// passed superSet.
FilterKnownChanIDs(chain chainhash.Hash,
superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error)
// FilterChannelRange returns the set of channels that we created
// between the start height and the end height. We'll use this to to a
// remote peer's QueryChannelRange message.
FilterChannelRange(chain chainhash.Hash,
startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error)
// FetchChanAnns returns a full set of channel announcements as well as
// their updates that match the set of specified short channel ID's.
// We'll use this to reply to a QueryShortChanIDs message sent by a
// remote peer. The response will contain a unique set of
// ChannelAnnouncements, the latest ChannelUpdate for each of the
// announcements, and a unique set of NodeAnnouncements.
FetchChanAnns(chain chainhash.Hash,
shortChanIDs []lnwire.ShortChannelID) ([]lnwire.Message, error)
// FetchChanUpdates returns the latest channel update messages for the
// specified short channel ID. If no channel updates are known for the
// channel, then an empty slice will be returned.
FetchChanUpdates(chain chainhash.Hash,
shortChanID lnwire.ShortChannelID) ([]*lnwire.ChannelUpdate, error)
}
// ChanSeries is an implementation of the ChannelGraphTimeSeries
// interface backed by the channeldb ChannelGraph database. We'll provide this // interface backed by the channeldb ChannelGraph database. We'll provide this
// implementation to the AuthenticatedGossiper so it can properly use the // implementation to the AuthenticatedGossiper so it can properly use the
// in-protocol channel range queries to quickly and efficiently synchronize our // in-protocol channel range queries to quickly and efficiently synchronize our
// channel state with all peers. // channel state with all peers.
type chanSeries struct { type ChanSeries struct {
graph *channeldb.ChannelGraph graph *channeldb.ChannelGraph
} }
// NewChanSeries constructs a new ChanSeries backed by a channeldb.ChannelGraph.
// The returned ChanSeries implements the ChannelGraphTimeSeries interface.
func NewChanSeries(graph *channeldb.ChannelGraph) *ChanSeries {
return &ChanSeries{
graph: graph,
}
}
// HighestChanID should return is the channel ID of the channel we know of // HighestChanID should return is the channel ID of the channel we know of
// that's furthest in the target chain. This channel will have a block height // that's furthest in the target chain. This channel will have a block height
// that's close to the current tip of the main chain as we know it. We'll use // that's close to the current tip of the main chain as we know it. We'll use
// this to start our QueryChannelRange dance with the remote node. // this to start our QueryChannelRange dance with the remote node.
// //
// NOTE: This is part of the discovery.ChannelGraphTimeSeries interface. // NOTE: This is part of the ChannelGraphTimeSeries interface.
func (c *chanSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) { func (c *ChanSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) {
chanID, err := c.graph.HighestChanID() chanID, err := c.graph.HighestChanID()
if err != nil { if err != nil {
return nil, err return nil, err
@ -40,8 +97,8 @@ func (c *chanSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID
// remote node to the set of channel updates that they may have missed out on // remote node to the set of channel updates that they may have missed out on
// within the target chain. // within the target chain.
// //
// NOTE: This is part of the discovery.ChannelGraphTimeSeries interface. // NOTE: This is part of the ChannelGraphTimeSeries interface.
func (c *chanSeries) UpdatesInHorizon(chain chainhash.Hash, func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error) { startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {
var updates []lnwire.Message var updates []lnwire.Message
@ -62,7 +119,7 @@ func (c *chanSeries) UpdatesInHorizon(chain chainhash.Hash,
continue continue
} }
chanAnn, edge1, edge2, err := discovery.CreateChanAnnouncement( chanAnn, edge1, edge2, err := CreateChanAnnouncement(
channel.Info.AuthProof, channel.Info, channel.Policy1, channel.Info.AuthProof, channel.Info, channel.Policy1,
channel.Policy2, channel.Policy2,
) )
@ -93,13 +150,13 @@ func (c *chanSeries) UpdatesInHorizon(chain chainhash.Hash,
// prevent leaking information about nodes. // prevent leaking information about nodes.
isNodePublic, err := c.graph.IsPublicNode(nodeAnn.PubKeyBytes) isNodePublic, err := c.graph.IsPublicNode(nodeAnn.PubKeyBytes)
if err != nil { if err != nil {
srvrLog.Errorf("Unable to determine if node %x is "+ log.Errorf("Unable to determine if node %x is "+
"advertised: %v", nodeAnn.PubKeyBytes, err) "advertised: %v", nodeAnn.PubKeyBytes, err)
continue continue
} }
if !isNodePublic { if !isNodePublic {
srvrLog.Tracef("Skipping forwarding announcement for "+ log.Tracef("Skipping forwarding announcement for "+
"node %x due to being unadvertised", "node %x due to being unadvertised",
nodeAnn.PubKeyBytes) nodeAnn.PubKeyBytes)
continue continue
@ -120,8 +177,8 @@ func (c *chanSeries) UpdatesInHorizon(chain chainhash.Hash,
// returns a filtered set of chan ID's. This filtered set of chan ID's // returns a filtered set of chan ID's. This filtered set of chan ID's
// represents the ID's that we don't know of which were in the passed superSet. // represents the ID's that we don't know of which were in the passed superSet.
// //
// NOTE: This is part of the discovery.ChannelGraphTimeSeries interface. // NOTE: This is part of the ChannelGraphTimeSeries interface.
func (c *chanSeries) FilterKnownChanIDs(chain chainhash.Hash, func (c *ChanSeries) FilterKnownChanIDs(chain chainhash.Hash,
superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error) { superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error) {
chanIDs := make([]uint64, 0, len(superSet)) chanIDs := make([]uint64, 0, len(superSet))
@ -148,8 +205,8 @@ func (c *chanSeries) FilterKnownChanIDs(chain chainhash.Hash,
// start height and the end height. We'll use this respond to a remote peer's // start height and the end height. We'll use this respond to a remote peer's
// QueryChannelRange message. // QueryChannelRange message.
// //
// NOTE: This is part of the discovery.ChannelGraphTimeSeries interface. // NOTE: This is part of the ChannelGraphTimeSeries interface.
func (c *chanSeries) FilterChannelRange(chain chainhash.Hash, func (c *ChanSeries) FilterChannelRange(chain chainhash.Hash,
startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error) { startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error) {
chansInRange, err := c.graph.FilterChannelRange(startHeight, endHeight) chansInRange, err := c.graph.FilterChannelRange(startHeight, endHeight)
@ -173,8 +230,8 @@ func (c *chanSeries) FilterChannelRange(chain chainhash.Hash,
// will contain a unique set of ChannelAnnouncements, the latest ChannelUpdate // will contain a unique set of ChannelAnnouncements, the latest ChannelUpdate
// for each of the announcements, and a unique set of NodeAnnouncements. // for each of the announcements, and a unique set of NodeAnnouncements.
// //
// NOTE: This is part of the discovery.ChannelGraphTimeSeries interface. // NOTE: This is part of the ChannelGraphTimeSeries interface.
func (c *chanSeries) FetchChanAnns(chain chainhash.Hash, func (c *ChanSeries) FetchChanAnns(chain chainhash.Hash,
shortChanIDs []lnwire.ShortChannelID) ([]lnwire.Message, error) { shortChanIDs []lnwire.ShortChannelID) ([]lnwire.Message, error) {
chanIDs := make([]uint64, 0, len(shortChanIDs)) chanIDs := make([]uint64, 0, len(shortChanIDs))
@ -201,7 +258,7 @@ func (c *chanSeries) FetchChanAnns(chain chainhash.Hash,
continue continue
} }
chanAnn, edge1, edge2, err := discovery.CreateChanAnnouncement( chanAnn, edge1, edge2, err := CreateChanAnnouncement(
channel.Info.AuthProof, channel.Info, channel.Policy1, channel.Info.AuthProof, channel.Info, channel.Policy1,
channel.Policy2, channel.Policy2,
) )
@ -253,8 +310,8 @@ func (c *chanSeries) FetchChanAnns(chain chainhash.Hash,
// specified short channel ID. If no channel updates are known for the channel, // specified short channel ID. If no channel updates are known for the channel,
// then an empty slice will be returned. // then an empty slice will be returned.
// //
// NOTE: This is part of the discovery.ChannelGraphTimeSeries interface. // NOTE: This is part of the ChannelGraphTimeSeries interface.
func (c *chanSeries) FetchChanUpdates(chain chainhash.Hash, func (c *ChanSeries) FetchChanUpdates(chain chainhash.Hash,
shortChanID lnwire.ShortChannelID) ([]*lnwire.ChannelUpdate, error) { shortChanID lnwire.ShortChannelID) ([]*lnwire.ChannelUpdate, error) {
chanInfo, e1, e2, err := c.graph.FetchChannelEdgesByID( chanInfo, e1, e2, err := c.graph.FetchChannelEdgesByID(
@ -307,6 +364,6 @@ func (c *chanSeries) FetchChanUpdates(chain chainhash.Hash,
return chanUpdates, nil return chanUpdates, nil
} }
// A compile-time assertion to ensure that chanSeries meets the // A compile-time assertion to ensure that ChanSeries meets the
// discovery.ChannelGraphTimeSeries interface. // ChannelGraphTimeSeries interface.
var _ discovery.ChannelGraphTimeSeries = (*chanSeries)(nil) var _ ChannelGraphTimeSeries = (*ChanSeries)(nil)

@ -107,56 +107,6 @@ const (
chanRangeQueryBuffer = 144 chanRangeQueryBuffer = 144
) )
// ChannelGraphTimeSeries is an interface that provides time and block based
// querying into our view of the channel graph. New channels will have
// monotonically increasing block heights, and new channel updates will have
// increasing timestamps. Once we connect to a peer, we'll use the methods in
// this interface to determine if we're already in sync, or need to request
// some new information from them.
type ChannelGraphTimeSeries interface {
// HighestChanID should return the channel ID of the channel we know of
// that's furthest in the target chain. This channel will have a block
// height that's close to the current tip of the main chain as we
// know it. We'll use this to start our QueryChannelRange dance with
// the remote node.
HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error)
// UpdatesInHorizon returns all known channel and node updates with an
// update timestamp between the start time and end time. We'll use this
// to catch up a remote node to the set of channel updates that they
// may have missed out on within the target chain.
UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error)
// FilterKnownChanIDs takes a target chain, and a set of channel ID's,
// and returns a filtered set of chan ID's. This filtered set of chan
// ID's represents the ID's that we don't know of which were in the
// passed superSet.
FilterKnownChanIDs(chain chainhash.Hash,
superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error)
// FilterChannelRange returns the set of channels that we created
// between the start height and the end height. We'll use this to to a
// remote peer's QueryChannelRange message.
FilterChannelRange(chain chainhash.Hash,
startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error)
// FetchChanAnns returns a full set of channel announcements as well as
// their updates that match the set of specified short channel ID's.
// We'll use this to reply to a QueryShortChanIDs message sent by a
// remote peer. The response will contain a unique set of
// ChannelAnnouncements, the latest ChannelUpdate for each of the
// announcements, and a unique set of NodeAnnouncements.
FetchChanAnns(chain chainhash.Hash,
shortChanIDs []lnwire.ShortChannelID) ([]lnwire.Message, error)
// FetchChanUpdates returns the latest channel update messages for the
// specified short channel ID. If no channel updates are known for the
// channel, then an empty slice will be returned.
FetchChanUpdates(chain chainhash.Hash,
shortChanID lnwire.ShortChannelID) ([]*lnwire.ChannelUpdate, error)
}
// gossipSyncerCfg is a struct that packages all the information a gossipSyncer // gossipSyncerCfg is a struct that packages all the information a gossipSyncer
// needs to carry out its duties. // needs to carry out its duties.
type gossipSyncerCfg struct { type gossipSyncerCfg struct {

@ -564,12 +564,16 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
return nil, fmt.Errorf("can't create router: %v", err) return nil, fmt.Errorf("can't create router: %v", err)
} }
chanSeries := discovery.NewChanSeries(
s.chanDB.ChannelGraph(),
)
s.authGossiper, err = discovery.New(discovery.Config{ s.authGossiper, err = discovery.New(discovery.Config{
Router: s.chanRouter, Router: s.chanRouter,
Notifier: s.cc.chainNotifier, Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash, ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage, Broadcast: s.BroadcastMessage,
ChanSeries: &chanSeries{s.chanDB.ChannelGraph()}, ChanSeries: chanSeries,
SendToPeer: s.SendToPeer, SendToPeer: s.SendToPeer,
FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) { FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) {
return s.FindPeer(pub) return s.FindPeer(pub)