Merge pull request #1748 from cfromknecht/relax-circuit-deletion

[htlcswitch]: Relax Circuit Deletion
This commit is contained in:
Olaoluwa Osuntokun 2018-08-20 18:42:58 -07:00 committed by GitHub
commit dddda7e0f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 53 deletions

@ -225,8 +225,10 @@ func (cm *circuitMap) initBuckets() error {
// restoreMemState loads the contents of the half circuit and full circuit // restoreMemState loads the contents of the half circuit and full circuit
// buckets from disk and reconstructs the in-memory representation of the // buckets from disk and reconstructs the in-memory representation of the
// circuit map. Afterwards, the state of the hash index is reconstructed using // circuit map. Afterwards, the state of the hash index is reconstructed using
// the recovered set of full circuits. // the recovered set of full circuits. This method will also remove any stray
// keystones, which are those that appear fully-opened, but have no pending
// circuit related to the intended incoming link.
func (cm *circuitMap) restoreMemState() error { func (cm *circuitMap) restoreMemState() error {
log.Infof("Restoring in-memory circuit state from disk") log.Infof("Restoring in-memory circuit state from disk")
@ -235,7 +237,7 @@ func (cm *circuitMap) restoreMemState() error {
pending = make(map[CircuitKey]*PaymentCircuit) pending = make(map[CircuitKey]*PaymentCircuit)
) )
if err := cm.cfg.DB.View(func(tx *bolt.Tx) error { if err := cm.cfg.DB.Update(func(tx *bolt.Tx) error {
// Restore any of the circuits persisted in the circuit bucket // Restore any of the circuits persisted in the circuit bucket
// back into memory. // back into memory.
circuitBkt := tx.Bucket(circuitAddKey) circuitBkt := tx.Bucket(circuitAddKey)
@ -264,6 +266,7 @@ func (cm *circuitMap) restoreMemState() error {
return ErrCorruptedCircuitMap return ErrCorruptedCircuitMap
} }
var strayKeystones []Keystone
if err := keystoneBkt.ForEach(func(k, v []byte) error { if err := keystoneBkt.ForEach(func(k, v []byte) error {
var ( var (
inKey CircuitKey inKey CircuitKey
@ -280,15 +283,45 @@ func (cm *circuitMap) restoreMemState() error {
// Retrieve the pending circuit, set its keystone, then // Retrieve the pending circuit, set its keystone, then
// add it to the opened map. // add it to the opened map.
circuit := pending[inKey] circuit, ok := pending[inKey]
circuit.Outgoing = outKey if ok {
opened[*outKey] = circuit circuit.Outgoing = outKey
opened[*outKey] = circuit
} else {
strayKeystones = append(strayKeystones, Keystone{
InKey: inKey,
OutKey: *outKey,
})
}
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
// If any stray keystones were found, we'll proceed to prune
// them from the circuit map's persistent storage. This may
// manifest on older nodes that had updated channels before
// their short channel id was set properly. We believe this
// issue has been fixed, though this will allow older nodes to
// recover without additional intervention.
for _, strayKeystone := range strayKeystones {
// As a precaution, we will only cleanup keystones
// related to locally-initiated payments. If a
// documented case of stray keystones emerges for
// forwarded payments, this check should be removed, but
// with extreme caution.
if strayKeystone.OutKey.ChanID != sourceHop {
continue
}
log.Infof("Removing stray keystone: %v", strayKeystone)
err := keystoneBkt.Delete(strayKeystone.OutKey.Bytes())
if err != nil {
return err
}
}
return nil return nil
}); err != nil { }); err != nil {
@ -495,8 +528,13 @@ func (cm *circuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit {
func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) (
*CircuitFwdActions, error) { *CircuitFwdActions, error) {
inKeys := make([]CircuitKey, 0, len(circuits))
for _, circuit := range circuits {
inKeys = append(inKeys, circuit.Incoming)
}
log.Tracef("Committing fresh circuits: %v", newLogClosure(func() string { log.Tracef("Committing fresh circuits: %v", newLogClosure(func() string {
return spew.Sdump(circuits) return spew.Sdump(inKeys)
})) }))
actions := &CircuitFwdActions{} actions := &CircuitFwdActions{}
@ -765,10 +803,12 @@ func (cm *circuitMap) CloseCircuit(outKey CircuitKey) (*PaymentCircuit, error) {
return circuit, nil return circuit, nil
} }
// DeleteCircuits destroys the target circuit by removing it from the circuit map, // DeleteCircuits destroys the target circuits by removing them from the circuit
// additionally removing the circuit's keystone if the HTLC was forwarded // map, additionally removing the circuits' keystones if any HTLCs were
// through an outgoing link. The circuit should be identified by its incoming // forwarded through an outgoing link. The circuits should be identified by its
// circuit key. // incoming circuit key. If a given circuit is not found in the circuit map, it
// will be ignored from the query. This would typically indicate that the
// circuit was already cleaned up at a different point in time.
func (cm *circuitMap) DeleteCircuits(inKeys ...CircuitKey) error { func (cm *circuitMap) DeleteCircuits(inKeys ...CircuitKey) error {
log.Tracef("Deleting resolved circuits: %v", newLogClosure(func() string { log.Tracef("Deleting resolved circuits: %v", newLogClosure(func() string {
@ -781,22 +821,15 @@ func (cm *circuitMap) DeleteCircuits(inKeys ...CircuitKey) error {
) )
cm.mtx.Lock() cm.mtx.Lock()
// First check that all provided keys are still known to the circuit // Remove any references to the circuits from memory, keeping track of
// map. // which circuits were removed, and which ones had been marked closed.
// This can be used to restore these entries later if the persistent
// removal fails.
for _, inKey := range inKeys { for _, inKey := range inKeys {
if _, ok := cm.pending[inKey]; !ok { circuit, ok := cm.pending[inKey]
cm.mtx.Unlock() if !ok {
return ErrUnknownCircuit continue
} }
}
// If no offenders were found, remove any references to the circuit from
// memory, keeping track of which circuits were removed, and which ones
// had been marked closed. This can be used to restore these entries
// later if the persistent removal fails.
for _, inKey := range inKeys {
circuit := cm.pending[inKey]
delete(cm.pending, inKey) delete(cm.pending, inKey)
if _, ok := cm.closed[inKey]; ok { if _, ok := cm.closed[inKey]; ok {

@ -483,8 +483,9 @@ func TestCircuitMapPersistence(t *testing.T) {
// Removing already-removed circuit should return an error. // Removing already-removed circuit should return an error.
err = circuitMap.DeleteCircuits(circuit1.Incoming) err = circuitMap.DeleteCircuits(circuit1.Incoming)
if err == nil { if err != nil {
t.Fatal("Remove did not return expected not found error") t.Fatal("Unexpected failure when deleting already "+
"deleted circuit: %v", err)
} }
// Verify that nothing related to hash1 has changed // Verify that nothing related to hash1 has changed
@ -518,10 +519,17 @@ func TestCircuitMapPersistence(t *testing.T) {
assertNumCircuitsWithHash(t, circuitMap, hash2, 0) assertNumCircuitsWithHash(t, circuitMap, hash2, 0)
assertNumCircuitsWithHash(t, circuitMap, hash3, 1) assertNumCircuitsWithHash(t, circuitMap, hash3, 1)
// Remove last remaining circuit with payment hash hash3. // In removing the final circuit, we will try and remove all other known
err = circuitMap.DeleteCircuits(circuit3.Incoming) // circuits as well. Any circuits that are unknown to the circuit map
// will be ignored, and only circuit 3 should be cause any change in the
// state.
err = circuitMap.DeleteCircuits(
circuit1.Incoming, circuit2.Incoming,
circuit3.Incoming, circuit4.Incoming,
)
if err != nil { if err != nil {
t.Fatalf("Remove returned unexpected error: %v", err) t.Fatalf("Unexpected failure when removing circuit while also "+
"deleting already deleted circuits: %v", err)
} }
// Check that the circuit map is empty, even after restarting. // Check that the circuit map is empty, even after restarting.

@ -585,7 +585,7 @@ func (l *channelLink) syncChanStates() error {
// Ensure that all packets have been have been removed from the // Ensure that all packets have been have been removed from the
// link's mailbox. // link's mailbox.
if err := l.ackDownStreamPackets(true); err != nil { if err := l.ackDownStreamPackets(); err != nil {
return err return err
} }
@ -1493,12 +1493,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// removed from the circuit map before removing them from the link's mailbox, // removed from the circuit map before removing them from the link's mailbox,
// otherwise it could be possible for some circuit to be missed if this link // otherwise it could be possible for some circuit to be missed if this link
// flaps. // flaps.
// func (l *channelLink) ackDownStreamPackets() error {
// The `forgive` flag allows this method to tolerate restarts, and ignores
// errors that could be caused by a previous circuit deletion. Under normal
// operation, this is set to false so that we would fail the link if we were
// unable to remove a circuit.
func (l *channelLink) ackDownStreamPackets(forgive bool) error {
// First, remove the downstream Add packets that were included in the // First, remove the downstream Add packets that were included in the
// previous commitment signature. This will prevent the Adds from being // previous commitment signature. This will prevent the Adds from being
// replayed if this link disconnects. // replayed if this link disconnects.
@ -1524,21 +1519,6 @@ func (l *channelLink) ackDownStreamPackets(forgive bool) error {
case nil: case nil:
// Successful deletion. // Successful deletion.
case ErrUnknownCircuit:
if forgive {
// After a restart, we may have already removed this
// circuit. Since it shouldn't be possible for a
// circuit to be closed by different htlcs, we assume
// this error signals that the whole batch was
// successfully removed.
l.warnf("forgiving unknown circuit error after " +
"attempting deletion, circuit was probably " +
"removed before shutting down.")
break
}
return err
default: default:
l.errorf("unable to delete %d circuits: %v", l.errorf("unable to delete %d circuits: %v",
len(l.closedCircuits), err) len(l.closedCircuits), err)
@ -1603,7 +1583,7 @@ func (l *channelLink) updateCommitTx() error {
return err return err
} }
if err := l.ackDownStreamPackets(false); err != nil { if err := l.ackDownStreamPackets(); err != nil {
return err return err
} }