multi: move active/inactive ntfns from switch to link

Since we will now wait to deliver the event after channel reestablish,
notifying when the link is added to the switch will no longer be
sufficient. Later, we will add receiving reestablish as an additional
requirement for EligibleToForward returning true.

The inactive ntfn is also moved, to ensure that we don't fire inactive
notifications if no corresponding active notification was sent.
This commit is contained in:
Conner Fromknecht 2019-09-19 12:46:44 -07:00
parent 1d41d4d666
commit 6dca07577d
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
7 changed files with 28 additions and 21 deletions

@ -262,6 +262,14 @@ type ChannelLinkConfig struct {
// commitment fee to be of its balance. This only applies to the // commitment fee to be of its balance. This only applies to the
// initiator of the channel. // initiator of the channel.
MaxFeeAllocation float64 MaxFeeAllocation float64
// NotifyActiveChannel allows the link to tell the ChannelNotifier when
// channels becomes active.
NotifyActiveChannel func(wire.OutPoint)
// NotifyInactiveChannel allows the switch to tell the ChannelNotifier
// when channels become inactive.
NotifyInactiveChannel func(wire.OutPoint)
} }
// channelLink is the service which drives a channel's commitment update // channelLink is the service which drives a channel's commitment update
@ -870,6 +878,14 @@ func (l *channelLink) htlcManager() {
log.Infof("HTLC manager for ChannelPoint(%v) started, "+ log.Infof("HTLC manager for ChannelPoint(%v) started, "+
"bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth()) "bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth())
// Funding locked has already been received, so we'll go ahead and
// deliver the active channel notification since EligibleToForward
// returns true now that the link has been added to the switch. We'll
// also defer the inactive notification for when the link exits to
// ensure that every active notification is matched by an inactive one.
l.cfg.NotifyActiveChannel(*l.ChannelPoint())
defer l.cfg.NotifyInactiveChannel(*l.ChannelPoint())
// TODO(roasbeef): need to call wipe chan whenever D/C? // TODO(roasbeef): need to call wipe chan whenever D/C?
// If this isn't the first time that this channel link has been // If this isn't the first time that this channel link has been

@ -1688,6 +1688,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
MaxFeeUpdateTimeout: 40 * time.Minute, MaxFeeUpdateTimeout: 40 * time.Minute,
MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry, MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry,
MaxFeeAllocation: DefaultMaxLinkFeeAllocation, MaxFeeAllocation: DefaultMaxLinkFeeAllocation,
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
} }
aliceLink := NewChannelLink(aliceCfg, aliceLc.channel) aliceLink := NewChannelLink(aliceCfg, aliceLc.channel)
@ -4249,6 +4251,8 @@ func (h *persistentLinkHarness) restartLink(
HodlMask: hodl.MaskFromFlags(hodlFlags...), HodlMask: hodl.MaskFromFlags(hodlFlags...),
MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry, MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry,
MaxFeeAllocation: DefaultMaxLinkFeeAllocation, MaxFeeAllocation: DefaultMaxLinkFeeAllocation,
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
} }
aliceLink := NewChannelLink(aliceCfg, aliceChannel) aliceLink := NewChannelLink(aliceCfg, aliceChannel)

@ -171,12 +171,10 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
FetchLastChannelUpdate: func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) { FetchLastChannelUpdate: func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
return nil, nil return nil, nil
}, },
Notifier: &mockNotifier{}, Notifier: &mockNotifier{},
FwdEventTicker: ticker.NewForce(DefaultFwdEventInterval), FwdEventTicker: ticker.NewForce(DefaultFwdEventInterval),
LogEventTicker: ticker.NewForce(DefaultLogInterval), LogEventTicker: ticker.NewForce(DefaultLogInterval),
AckEventTicker: ticker.NewForce(DefaultAckInterval), AckEventTicker: ticker.NewForce(DefaultAckInterval),
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
} }
return New(cfg, startingHeight) return New(cfg, startingHeight)

@ -163,11 +163,6 @@ type Config struct {
// fails in forwarding packages. // fails in forwarding packages.
AckEventTicker ticker.Ticker AckEventTicker ticker.Ticker
// NotifyActiveChannel and NotifyInactiveChannel allow the link to tell
// the ChannelNotifier when channels become active and inactive.
NotifyActiveChannel func(wire.OutPoint)
NotifyInactiveChannel func(wire.OutPoint)
// RejectHTLC is a flag that instructs the htlcswitch to reject any // RejectHTLC is a flag that instructs the htlcswitch to reject any
// HTLCs that are not from the source hop. // HTLCs that are not from the source hop.
RejectHTLC bool RejectHTLC bool
@ -2012,11 +2007,6 @@ func (s *Switch) addLiveLink(link ChannelLink) {
s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink) s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
} }
s.interfaceIndex[peerPub][link.ChanID()] = link s.interfaceIndex[peerPub][link.ChanID()] = link
// Inform the channel notifier if the link has become active.
if link.EligibleToForward() {
s.cfg.NotifyActiveChannel(*link.ChannelPoint())
}
} }
// GetLink is used to initiate the handling of the get link command. The // GetLink is used to initiate the handling of the get link command. The
@ -2092,9 +2082,6 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink {
return nil return nil
} }
// Inform the Channel Notifier about the link becoming inactive.
s.cfg.NotifyInactiveChannel(*link.ChannelPoint())
// Remove the channel from live link indexes. // Remove the channel from live link indexes.
delete(s.pendingLinkIndex, link.ChanID()) delete(s.pendingLinkIndex, link.ChanID())
delete(s.linkIndex, link.ChanID()) delete(s.linkIndex, link.ChanID())

@ -1122,6 +1122,8 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
OutgoingCltvRejectDelta: 3, OutgoingCltvRejectDelta: 3,
MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry, MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry,
MaxFeeAllocation: DefaultMaxLinkFeeAllocation, MaxFeeAllocation: DefaultMaxLinkFeeAllocation,
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
}, },
channel, channel,
) )

@ -576,6 +576,8 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
TowerClient: p.server.towerClient, TowerClient: p.server.towerClient,
MaxOutgoingCltvExpiry: cfg.MaxOutgoingCltvExpiry, MaxOutgoingCltvExpiry: cfg.MaxOutgoingCltvExpiry,
MaxFeeAllocation: cfg.MaxChannelFeeAllocation, MaxFeeAllocation: cfg.MaxChannelFeeAllocation,
NotifyActiveChannel: p.server.channelNotifier.NotifyActiveChannelEvent,
NotifyInactiveChannel: p.server.channelNotifier.NotifyInactiveChannelEvent,
} }
link := htlcswitch.NewChannelLink(linkCfg, lnChan) link := htlcswitch.NewChannelLink(linkCfg, lnChan)

@ -430,8 +430,6 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
FwdEventTicker: ticker.New(htlcswitch.DefaultFwdEventInterval), FwdEventTicker: ticker.New(htlcswitch.DefaultFwdEventInterval),
LogEventTicker: ticker.New(htlcswitch.DefaultLogInterval), LogEventTicker: ticker.New(htlcswitch.DefaultLogInterval),
AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval), AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval),
NotifyActiveChannel: s.channelNotifier.NotifyActiveChannelEvent,
NotifyInactiveChannel: s.channelNotifier.NotifyInactiveChannelEvent,
RejectHTLC: cfg.RejectHTLC, RejectHTLC: cfg.RejectHTLC,
}, uint32(currentHeight)) }, uint32(currentHeight))
if err != nil { if err != nil {