discovery: fix deadlock in processChanPolicyUpdate

In this commit, we fix an existing deadlock in the
processChanPolicyUpdate method. Before this commit, within
processChanPolicyUpdate, we would directly call updateChannel *within*
the ForEachChannel closure. This would at times result in a deadlock, as
updateChannel will itself attempt to create a write transaction in order
to persist the newly updated channel.

We fix this deadlock by simply performing another loop once we know the
set of channels that we wish to update. This second loop will actually
update the channels on disk.
This commit is contained in:
Olaoluwa Osuntokun 2018-04-03 19:46:06 -07:00
parent 80d57f3ddf
commit 9d4cea93f0
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -1097,8 +1097,9 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
for _, chanToUpdate := range edgesToUpdate {
// Re-sign and update the channel on disk and retrieve our
// ChannelUpdate to broadcast.
chanAnn, chanUpdate, err := d.updateChannel(chanToUpdate.info,
chanToUpdate.edge)
chanAnn, chanUpdate, err := d.updateChannel(
chanToUpdate.info, chanToUpdate.edge,
)
if err != nil {
return fmt.Errorf("unable to update channel: %v", err)
}
@ -1131,8 +1132,8 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
// processChanPolicyUpdate generates a new set of channel updates with the new
// channel policy applied for each specified channel identified by its channel
// point. In the case that no channel points are specified, then the update will
// be applied to all channels. Finally, the backing ChannelGraphSource is
// point. In the case that no channel points are specified, then the update
// will be applied to all channels. Finally, the backing ChannelGraphSource is
// updated with the latest information reflecting the applied updates.
//
// TODO(roasbeef): generalize into generic for any channel update
@ -1147,12 +1148,17 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
haveChanFilter := len(chansToUpdate) != 0
var chanUpdates []networkMsg
type edgeWithInfo struct {
info *channeldb.ChannelEdgeInfo
edge *channeldb.ChannelEdgePolicy
}
var edgesToUpdate []edgeWithInfo
// Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all.
err := d.cfg.Router.ForAllOutgoingChannels(func(info *channeldb.ChannelEdgeInfo,
err := d.cfg.Router.ForAllOutgoingChannels(func(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
// If we have a channel filter, and this channel isn't a part
@ -1161,20 +1167,37 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
return nil
}
// Apply the new fee schema to the edge.
// Now that we know we should update this channel, we'll update
// its set of policies.
edge.FeeBaseMSat = policyUpdate.newSchema.BaseFee
edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
policyUpdate.newSchema.FeeRate,
)
// Apply the new TimeLockDelta.
edge.TimeLockDelta = uint16(policyUpdate.newSchema.TimeLockDelta)
// Re-sign and update the backing ChannelGraphSource, and
edgesToUpdate = append(edgesToUpdate, edgeWithInfo{
info: info,
edge: edge,
})
return nil
})
if err != nil {
return nil, err
}
// With the set of edges we need to update retrieved, we'll now re-sign
// them, and insert them into the database.
var chanUpdates []networkMsg
for _, edgeInfo := range edgesToUpdate {
// Now that we've collected all the channels we need to update,
// we'll Re-sign and update the backing ChannelGraphSource, and
// retrieve our ChannelUpdate to broadcast.
_, chanUpdate, err := d.updateChannel(info, edge)
_, chanUpdate, err := d.updateChannel(
edgeInfo.info, edgeInfo.edge,
)
if err != nil {
return err
return nil, err
}
// We set ourselves as the source of this message to indicate
@ -1183,11 +1206,6 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
peer: d.selfKey,
msg: chanUpdate,
})
return nil
})
if err != nil {
return nil, err
}
return chanUpdates, nil