contractcourt: make synchronous chain watcher notifications optional
In this commit, we modify the way that notifications are dispatched within the chainWatcher. Before we would *always* wait for an ack back before we started to clean up he database state. This would at times lead to deadlocks. To remedy this, we now allow callers to decide if they want notifications to be sync or not. The only current caller that requires this is the breach arbiter.
This commit is contained in:
parent
f85f1f97ca
commit
5df6704a9c
@ -339,7 +339,7 @@ func (c *ChainArbitrator) Start() error {
|
|||||||
|
|
||||||
c.activeWatchers[channel.FundingOutpoint] = chainWatcher
|
c.activeWatchers[channel.FundingOutpoint] = chainWatcher
|
||||||
channelArb, err := newActiveChannelArbitrator(
|
channelArb, err := newActiveChannelArbitrator(
|
||||||
channel, c, chainWatcher.SubscribeChannelEvents(),
|
channel, c, chainWatcher.SubscribeChannelEvents(false),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -667,7 +667,7 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
|
|||||||
// We'll also create a new channel arbitrator instance using this new
|
// We'll also create a new channel arbitrator instance using this new
|
||||||
// channel, and our internal state.
|
// channel, and our internal state.
|
||||||
channelArb, err := newActiveChannelArbitrator(
|
channelArb, err := newActiveChannelArbitrator(
|
||||||
newChan, c, chainWatcher.SubscribeChannelEvents(),
|
newChan, c, chainWatcher.SubscribeChannelEvents(false),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -687,12 +687,15 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
|
|||||||
// SubscribeChannelEvents returns a new active subscription for the set of
|
// SubscribeChannelEvents returns a new active subscription for the set of
|
||||||
// possible on-chain events for a particular channel. The struct can be used by
|
// possible on-chain events for a particular channel. The struct can be used by
|
||||||
// callers to be notified whenever an event that changes the state of the
|
// callers to be notified whenever an event that changes the state of the
|
||||||
// channel on-chain occurs.
|
// channel on-chain occurs. If syncDispatch is true, then the sender of the
|
||||||
|
// notification will wait until an error is sent over the ProcessACK before
|
||||||
|
// modifying any database state. This allows callers to request a reliable hand
|
||||||
|
// off.
|
||||||
//
|
//
|
||||||
// TODO(roasbeef): can be used later to provide RPC hook for all channel
|
// TODO(roasbeef): can be used later to provide RPC hook for all channel
|
||||||
// lifetimes
|
// lifetimes
|
||||||
func (c *ChainArbitrator) SubscribeChannelEvents(
|
func (c *ChainArbitrator) SubscribeChannelEvents(
|
||||||
chanPoint wire.OutPoint) (*ChainEventSubscription, error) {
|
chanPoint wire.OutPoint, syncDispatch bool) (*ChainEventSubscription, error) {
|
||||||
|
|
||||||
// First, we'll attempt to look up the active watcher for this channel.
|
// First, we'll attempt to look up the active watcher for this channel.
|
||||||
// If we can't find it, then we'll return an error back to the caller.
|
// If we can't find it, then we'll return an error back to the caller.
|
||||||
@ -704,7 +707,7 @@ func (c *ChainArbitrator) SubscribeChannelEvents(
|
|||||||
|
|
||||||
// With the watcher located, we'll request for it to create a new chain
|
// With the watcher located, we'll request for it to create a new chain
|
||||||
// event subscription client.
|
// event subscription client.
|
||||||
return watcher.SubscribeChannelEvents(), nil
|
return watcher.SubscribeChannelEvents(syncDispatch), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BeginCoopChanClose allows the initiator or responder to a cooperative
|
// BeginCoopChanClose allows the initiator or responder to a cooperative
|
||||||
|
@ -45,7 +45,10 @@ type ChainEventSubscription struct {
|
|||||||
// synchronize dispatch and processing of the notification with the act
|
// synchronize dispatch and processing of the notification with the act
|
||||||
// of updating the state of the channel on disk. This ensures that the
|
// of updating the state of the channel on disk. This ensures that the
|
||||||
// event can be reliably handed off.
|
// event can be reliably handed off.
|
||||||
ProcessACK chan struct{}
|
//
|
||||||
|
// NOTE: This channel will only be used if the syncDispatch arg passed
|
||||||
|
// into the constructor is true.
|
||||||
|
ProcessACK chan error
|
||||||
|
|
||||||
// Cancel cancels the subscription to the event stream for a particular
|
// Cancel cancels the subscription to the event stream for a particular
|
||||||
// channel. This method should be called once the caller no longer needs to
|
// channel. This method should be called once the caller no longer needs to
|
||||||
@ -89,7 +92,7 @@ type chainWatcher struct {
|
|||||||
signer lnwallet.Signer
|
signer lnwallet.Signer
|
||||||
|
|
||||||
// All the fields below are protected by this mutex.
|
// All the fields below are protected by this mutex.
|
||||||
sync.RWMutex
|
sync.Mutex
|
||||||
|
|
||||||
// clientID is an ephemeral counter used to keep track of each
|
// clientID is an ephemeral counter used to keep track of each
|
||||||
// individual client subscription.
|
// individual client subscription.
|
||||||
@ -207,13 +210,17 @@ func (c *chainWatcher) Stop() error {
|
|||||||
// SubscribeChannelEvents returns a n active subscription to the set of channel
|
// SubscribeChannelEvents returns a n active subscription to the set of channel
|
||||||
// events for the channel watched by this chain watcher. Once clients no longer
|
// events for the channel watched by this chain watcher. Once clients no longer
|
||||||
// require the subscription, they should call the Cancel() method to allow the
|
// require the subscription, they should call the Cancel() method to allow the
|
||||||
// watcher to regain those committed resources.
|
// watcher to regain those committed resources. The syncDispatch bool indicates
|
||||||
func (c *chainWatcher) SubscribeChannelEvents() *ChainEventSubscription {
|
// if the caller would like a synchronous dispatch of the notification. This
|
||||||
c.Lock()
|
// means that the main chain watcher goroutine won't proceed with
|
||||||
defer c.Unlock()
|
// post-processing after the notification until the ProcessACK channel is sent
|
||||||
|
// upon.
|
||||||
|
func (c *chainWatcher) SubscribeChannelEvents(syncDispatch bool) *ChainEventSubscription {
|
||||||
|
|
||||||
|
c.Lock()
|
||||||
clientID := c.clientID
|
clientID := c.clientID
|
||||||
c.clientID++
|
c.clientID++
|
||||||
|
c.Unlock()
|
||||||
|
|
||||||
log.Debugf("New ChainEventSubscription(id=%v) for ChannelPoint(%v)",
|
log.Debugf("New ChainEventSubscription(id=%v) for ChannelPoint(%v)",
|
||||||
clientID, c.chanState.FundingOutpoint)
|
clientID, c.chanState.FundingOutpoint)
|
||||||
@ -231,7 +238,13 @@ func (c *chainWatcher) SubscribeChannelEvents() *ChainEventSubscription {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if syncDispatch {
|
||||||
|
sub.ProcessACK = make(chan error, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Lock()
|
||||||
c.clientSubscriptions[clientID] = sub
|
c.clientSubscriptions[clientID] = sub
|
||||||
|
c.Unlock()
|
||||||
|
|
||||||
return sub
|
return sub
|
||||||
}
|
}
|
||||||
@ -547,7 +560,6 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
broadcastStateNum = remoteCommit.CommitHeight
|
|
||||||
commitTxBroadcast = spendEvent.SpendingTx
|
commitTxBroadcast = spendEvent.SpendingTx
|
||||||
spendHeight = uint32(spendEvent.SpendingHeight)
|
spendHeight = uint32(spendEvent.SpendingHeight)
|
||||||
)
|
)
|
||||||
@ -578,17 +590,20 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wait for the breach arbiter to ACK the handoff before
|
// Wait for the breach arbiter to ACK the handoff before
|
||||||
// marking the channel as pending force closed in channeldb.
|
// marking the channel as pending force closed in channeldb,
|
||||||
select {
|
// but only if the client requested a sync dispatch.
|
||||||
case <-sub.ProcessACK:
|
if sub.ProcessACK != nil {
|
||||||
// Bail if the handoff failed.
|
select {
|
||||||
if err != nil {
|
case err := <-sub.ProcessACK:
|
||||||
return fmt.Errorf("unable to handoff "+
|
// Bail if the handoff failed.
|
||||||
"retribution info: %v", err)
|
if err != nil {
|
||||||
}
|
return fmt.Errorf("unable to handoff "+
|
||||||
|
"retribution info: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
case <-c.quit:
|
case <-c.quit:
|
||||||
return fmt.Errorf("quitting")
|
return fmt.Errorf("quitting")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.Unlock()
|
c.Unlock()
|
||||||
|
@ -293,7 +293,7 @@ func (c *ChannelArbitrator) Stop() error {
|
|||||||
log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint)
|
log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint)
|
||||||
|
|
||||||
if c.cfg.ChainEvents.Cancel != nil {
|
if c.cfg.ChainEvents.Cancel != nil {
|
||||||
c.cfg.ChainEvents.Cancel()
|
go c.cfg.ChainEvents.Cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, activeResolver := range c.activeResolvers {
|
for _, activeResolver := range c.activeResolvers {
|
||||||
@ -1319,7 +1319,6 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
|
|||||||
// state, so we'll get the most up to date signals to we can
|
// state, so we'll get the most up to date signals to we can
|
||||||
// properly do our job.
|
// properly do our job.
|
||||||
case signalUpdate := <-c.signalUpdates:
|
case signalUpdate := <-c.signalUpdates:
|
||||||
|
|
||||||
log.Tracef("ChannelArbitrator(%v) got new signal "+
|
log.Tracef("ChannelArbitrator(%v) got new signal "+
|
||||||
"update!", c.cfg.ChanPoint)
|
"update!", c.cfg.ChanPoint)
|
||||||
|
|
||||||
|
8
peer.go
8
peer.go
@ -370,7 +370,9 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
|||||||
// Register this new channel link with the HTLC Switch. This is
|
// Register this new channel link with the HTLC Switch. This is
|
||||||
// necessary to properly route multi-hop payments, and forward
|
// necessary to properly route multi-hop payments, and forward
|
||||||
// new payments triggered by RPC clients.
|
// new payments triggered by RPC clients.
|
||||||
chainEvents, err := p.server.chainArb.SubscribeChannelEvents(*chanPoint)
|
chainEvents, err := p.server.chainArb.SubscribeChannelEvents(
|
||||||
|
*chanPoint, false,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -1259,7 +1261,9 @@ out:
|
|||||||
peerLog.Errorf("unable to get best block: %v", err)
|
peerLog.Errorf("unable to get best block: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
chainEvents, err := p.server.chainArb.SubscribeChannelEvents(*chanPoint)
|
chainEvents, err := p.server.chainArb.SubscribeChannelEvents(
|
||||||
|
*chanPoint, false,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peerLog.Errorf("unable to subscribe to chain "+
|
peerLog.Errorf("unable to subscribe to chain "+
|
||||||
"events: %v", err)
|
"events: %v", err)
|
||||||
|
15
server.go
15
server.go
@ -417,11 +417,16 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
|
|||||||
GenSweepScript: func() ([]byte, error) {
|
GenSweepScript: func() ([]byte, error) {
|
||||||
return newSweepPkScript(cc.wallet)
|
return newSweepPkScript(cc.wallet)
|
||||||
},
|
},
|
||||||
Notifier: cc.chainNotifier,
|
Notifier: cc.chainNotifier,
|
||||||
PublishTransaction: cc.wallet.PublishTransaction,
|
PublishTransaction: cc.wallet.PublishTransaction,
|
||||||
SubscribeChannelEvents: s.chainArb.SubscribeChannelEvents,
|
SubscribeChannelEvents: func(chanPoint wire.OutPoint) (*contractcourt.ChainEventSubscription, error) {
|
||||||
Signer: cc.wallet.Cfg.Signer,
|
// We'll request a sync dispatch to ensure that the channel
|
||||||
Store: newRetributionStore(chanDB),
|
// is only marked as closed *after* we update our internal
|
||||||
|
// state.
|
||||||
|
return s.chainArb.SubscribeChannelEvents(chanPoint, true)
|
||||||
|
},
|
||||||
|
Signer: cc.wallet.Cfg.Signer,
|
||||||
|
Store: newRetributionStore(chanDB),
|
||||||
})
|
})
|
||||||
|
|
||||||
// Create the connection manager which will be responsible for
|
// Create the connection manager which will be responsible for
|
||||||
|
Loading…
Reference in New Issue
Block a user