htlcswitch/switch: permit link shutdown mid-forwarding

In this commit, we thread through a link's quit channel into
routeAsync, the primary helper method allowing links to send
htlcPackets through the switch. This is intended to remove
deadlocks from happening, where the link is synchronously
blocking on forwarding packets to the switch, but also
needs to shutdown.
This commit is contained in:
Conner Fromknecht 2018-07-30 13:17:39 -07:00
parent 0fef1c71fe
commit f84cd14b12
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -527,12 +527,15 @@ func (s *Switch) forward(packet *htlcPacket) error {
// ForwardPackets adds a list of packets to the switch for processing. Fails
// and settles are added on a first past, simultaneously constructing circuits
// for any adds. After persisting the circuits, another pass of the adds is
// given to forward them through the router.
// given to forward them through the router. The sending link's quit channel is
// used to prevent deadlocks when the switch stops a link in the midst of
// forwarding.
//
// NOTE: This method guarantees that the returned err chan will eventually be
// closed. The receiver should read on the channel until receiving such a
// signal.
func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error {
func (s *Switch) ForwardPackets(linkQuit chan struct{},
packets ...*htlcPacket) chan error {
var (
// fwdChan is a buffered channel used to receive err msgs from
@ -568,6 +571,9 @@ func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error {
// so, we exit early to avoid incrementing the switch's waitgroup while
// it is already in the process of shutting down.
select {
case <-linkQuit:
close(errChan)
return errChan
case <-s.quit:
close(errChan)
return errChan
@ -593,7 +599,10 @@ func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error {
circuits = append(circuits, circuit)
addBatch = append(addBatch, packet)
default:
s.routeAsync(packet, fwdChan)
err := s.routeAsync(packet, fwdChan, linkQuit)
if err != nil {
return errChan
}
numSent++
}
}
@ -635,7 +644,10 @@ func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error {
// Now, forward any packets for circuits that were successfully added to
// the switch's circuit map.
for _, packet := range addedPackets {
s.routeAsync(packet, fwdChan)
err := s.routeAsync(packet, fwdChan, linkQuit)
if err != nil {
return errChan
}
numSent++
}
@ -722,9 +734,13 @@ func (s *Switch) route(packet *htlcPacket) error {
}
// routeAsync sends a packet through the htlc switch, using the provided err
// chan to propagate errors back to the caller. This method does not wait for
// a response before returning.
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error) error {
// chan to propagate errors back to the caller. The link's quit channel is
// provided so that the send can be canceled if either the link or the switch
// receive a shutdown requuest. This method does not wait for a response from
// the htlcForwarder before returning.
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
linkQuit chan struct{}) error {
command := &plexPacket{
pkt: packet,
err: errChan,
@ -733,6 +749,8 @@ func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error) error {
select {
case s.htlcPlex <- command:
return nil
case <-linkQuit:
return ErrLinkShuttingDown
case <-s.quit:
return errors.New("Htlc Switch was stopped")
}
@ -1734,7 +1752,10 @@ func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
}
}
errChan := s.ForwardPackets(switchPackets...)
// Since this send isn't tied to a specific link, we pass a nil
// link quit channel, meaning the send will fail only if the
// switch receives a shutdown request.
errChan := s.ForwardPackets(nil, switchPackets...)
go handleBatchFwdErrs(errChan)
}
}