htlcswitch/circuit_map: moves circuit map impl to own file
This commit is contained in:
parent
067b261602
commit
0b71f74199
846
htlcswitch/circuit_map.go
Normal file
846
htlcswitch/circuit_map.go
Normal file
@ -0,0 +1,846 @@
|
||||
package htlcswitch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrCorruptedCircuitMap indicates that the on-disk bucketing structure
|
||||
// has altered since the circuit map instance was initialized.
|
||||
ErrCorruptedCircuitMap = errors.New("circuit map has been corrupted")
|
||||
|
||||
// ErrCircuitNotInHashIndex indicates that a particular circuit did not
|
||||
// appear in the in-memory hash index.
|
||||
ErrCircuitNotInHashIndex = errors.New("payment circuit not found in " +
|
||||
"hash index")
|
||||
|
||||
// ErrUnknownCircuit signals that circuit could not be removed from the
|
||||
// map because it was not found.
|
||||
ErrUnknownCircuit = errors.New("unknown payment circuit")
|
||||
|
||||
// ErrCircuitClosing signals that an htlc has already closed this
|
||||
// circuit in-memory.
|
||||
ErrCircuitClosing = errors.New("circuit has already been closed")
|
||||
|
||||
// ErrDuplicateCircuit signals that this circuit was previously
|
||||
// added.
|
||||
ErrDuplicateCircuit = errors.New("duplicate circuit add")
|
||||
|
||||
// ErrUnknownKeystone signals that no circuit was found using the
|
||||
// outgoing circuit key.
|
||||
ErrUnknownKeystone = errors.New("unknown circuit keystone")
|
||||
|
||||
// ErrDuplicateKeystone signals that this circuit was previously
|
||||
// assigned a keystone.
|
||||
ErrDuplicateKeystone = errors.New("cannot add duplicate keystone")
|
||||
)
|
||||
|
||||
// CircuitModifier is a common interface used by channel links to modify the
|
||||
// contents of the circuit map maintained by the switch.
|
||||
type CircuitModifier interface {
|
||||
// OpenCircuits preemptively records a batch keystones that will mark
|
||||
// currently pending circuits as open. These changes can be rolled back
|
||||
// on restart if the outgoing Adds do not make it into a commitment txn.
|
||||
OpenCircuits(...Keystone) error
|
||||
|
||||
// TrimOpenCircuits removes a channel's open channels with htlc indexes
|
||||
// above `start`.
|
||||
TrimOpenCircuits(chanID lnwire.ShortChannelID, start uint64) error
|
||||
|
||||
// DeleteCircuits removes the incoming circuit key to remove all
|
||||
// persistent references to a circuit. Returns a ErrUnknownCircuit if
|
||||
// any of the incoming keys are not known.
|
||||
DeleteCircuits(inKeys ...CircuitKey) error
|
||||
}
|
||||
|
||||
// CircuitFwdActions represents the forwarding decision made by the circuit map,
|
||||
// and is returned from CommitCircuits. The sequence of circuits provided to
|
||||
// CommitCircuits is split into three subsequences, allowing the caller to do an
|
||||
// in-order scan, comparing the head of each subsequence, to determine the
|
||||
// decision made by the circuit map.
|
||||
type CircuitFwdActions struct {
|
||||
// Adds is the subsequence of circuits that were successfully committed
|
||||
// in the circuit map.
|
||||
Adds []*PaymentCircuit
|
||||
|
||||
// Drops is the subsequence of circuits for which no action should be
|
||||
// done.
|
||||
Drops []*PaymentCircuit
|
||||
|
||||
// Fails is the subsequence of circuits that should be failed back by
|
||||
// the calling link.
|
||||
Fails []*PaymentCircuit
|
||||
}
|
||||
|
||||
// CircuitMap is an interface for managing the construction and teardown of
|
||||
// payment circuits used by the switch.
|
||||
type CircuitMap interface {
|
||||
CircuitModifier
|
||||
|
||||
// CommitCircuits attempts to add the given circuits to the circuit
|
||||
// map. The list of circuits is split into three distinct subsequences,
|
||||
// corresponding to adds, drops, and fails. Adds should be forwarded to
|
||||
// the switch, while fails should be failed back locally within the
|
||||
// calling link.
|
||||
CommitCircuits(circuit ...*PaymentCircuit) (*CircuitFwdActions, error)
|
||||
|
||||
// CloseCircuit marks the circuit identified by `outKey` as closing
|
||||
// in-memory, which prevents duplicate settles/fails from completing an
|
||||
// open circuit twice.
|
||||
CloseCircuit(outKey CircuitKey) (*PaymentCircuit, error)
|
||||
|
||||
// FailCircuit is used by locally failed HTLCs to mark the circuit
|
||||
// identified by `inKey` as closing in-memory, which prevents duplicate
|
||||
// settles/fails from being accepted for the same circuit.
|
||||
FailCircuit(inKey CircuitKey) (*PaymentCircuit, error)
|
||||
|
||||
// LookupCircuit queries the circuit map for the circuit identified by
|
||||
// inKey.
|
||||
LookupCircuit(inKey CircuitKey) *PaymentCircuit
|
||||
|
||||
// LookupOpenCircuit queries the circuit map for a circuit identified by
|
||||
// its outgoing circuit key.
|
||||
LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit
|
||||
|
||||
// LookupByPaymentHash queries the circuit map and returns all open
|
||||
// circuits that use the given payment hash.
|
||||
LookupByPaymentHash(hash [32]byte) []*PaymentCircuit
|
||||
|
||||
// NumPending returns the total number of active circuits added by
|
||||
// CommitCircuits.
|
||||
NumPending() int
|
||||
|
||||
// NumOpen returns the number of circuits with HTLCs that have been
|
||||
// forwarded via an outgoing link.
|
||||
NumOpen() int
|
||||
}
|
||||
|
||||
var (
|
||||
// circuitAddKey is the key used to retrieve the bucket containing
|
||||
// payment circuits. A circuit records information about how to return a
|
||||
// packet to the source link, potentially including an error encrypter
|
||||
// for applying this hop's encryption to the payload in the reverse
|
||||
// direction.
|
||||
circuitAddKey = []byte("circuit-adds")
|
||||
|
||||
// circuitKeystoneKey is used to retrieve the bucket containing circuit
|
||||
// keystones, which are set in place once a forwarded packet is assigned
|
||||
// an index on an outgoing commitment txn.
|
||||
circuitKeystoneKey = []byte("circuit-keystones")
|
||||
)
|
||||
|
||||
// circuitMap is a data structure that implements thread safe, persistent
|
||||
// storage of circuit routing information. The switch consults a circuit map to
|
||||
// determine where to forward returning HTLC update messages. Circuits are
|
||||
// always identifiable by their incoming CircuitKey, in addition to their
|
||||
// outgoing CircuitKey if the circuit is fully-opened.
|
||||
type circuitMap struct {
|
||||
// db provides the persistent storage engine for the circuit map.
|
||||
// TODO(conner): create abstraction to allow for the substitution of
|
||||
// other persistence engines.
|
||||
db *channeldb.DB
|
||||
|
||||
mtx sync.RWMutex
|
||||
|
||||
// pending is an in-memory mapping of all half payment circuits, and
|
||||
// is kept in sync with the on-disk contents of the circuit map.
|
||||
pending map[CircuitKey]*PaymentCircuit
|
||||
|
||||
// opened is an in-memory mapping of all full payment circuits, which is
|
||||
// also synchronized with the persistent state of the circuit map.
|
||||
opened map[CircuitKey]*PaymentCircuit
|
||||
|
||||
// closed is an in-memory set of circuits for which the switch has
|
||||
// received a settle or fail. This precedes the actual deletion of a
|
||||
// circuit from disk.
|
||||
closed map[CircuitKey]struct{}
|
||||
|
||||
// hashIndex is a volatile index that facilitates fast queries by
|
||||
// payment hash against the contents of circuits. This index can be
|
||||
// reconstructed entirely from the set of persisted full circuits on
|
||||
// startup.
|
||||
hashIndex map[[32]byte]map[CircuitKey]struct{}
|
||||
}
|
||||
|
||||
// NewCircuitMap creates a new instance of the circuitMap.
|
||||
func NewCircuitMap(db *channeldb.DB) (CircuitMap, error) {
|
||||
cm := &circuitMap{
|
||||
db: db,
|
||||
}
|
||||
|
||||
// Initialize the on-disk buckets used by the circuit map.
|
||||
if err := cm.initBuckets(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Load any previously persisted circuit into back into memory.
|
||||
if err := cm.restoreMemState(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Trim any keystones that were not committed in an outgoing commit txn.
|
||||
//
|
||||
// NOTE: This operation will be applied to the persistent state of all
|
||||
// active channels. Therefore, it must be called before any links are
|
||||
// created to avoid interfering with normal operation.
|
||||
if err := cm.trimAllOpenCircuits(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
// initBuckets ensures that the primary buckets used by the circuit are
|
||||
// initialized so that we can assume their existence after startup.
|
||||
func (cm *circuitMap) initBuckets() error {
|
||||
return cm.db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(circuitKeystoneKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.CreateBucketIfNotExists(circuitAddKey)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// restoreMemState loads the contents of the half circuit and full circuit buckets
|
||||
// from disk and reconstructs the in-memory representation of the circuit map.
|
||||
// Afterwards, the state of the hash index is reconstructed using the recovered
|
||||
// set of full circuits.
|
||||
func (cm *circuitMap) restoreMemState() error {
|
||||
|
||||
var (
|
||||
opened = make(map[CircuitKey]*PaymentCircuit)
|
||||
pending = make(map[CircuitKey]*PaymentCircuit)
|
||||
)
|
||||
|
||||
if err := cm.db.View(func(tx *bolt.Tx) error {
|
||||
// Restore any of the circuits persisted in the circuit bucket
|
||||
// back into memory.
|
||||
circuitBkt := tx.Bucket(circuitAddKey)
|
||||
if circuitBkt == nil {
|
||||
return ErrCorruptedCircuitMap
|
||||
}
|
||||
|
||||
if err := circuitBkt.ForEach(func(_, v []byte) error {
|
||||
circuit, err := decodeCircuit(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
circuit.LoadedFromDisk = true
|
||||
pending[circuit.Incoming] = circuit
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Furthermore, load the keystone bucket and resurrect the
|
||||
// keystones used in any open circuits.
|
||||
keystoneBkt := tx.Bucket(circuitKeystoneKey)
|
||||
if keystoneBkt == nil {
|
||||
return ErrCorruptedCircuitMap
|
||||
}
|
||||
|
||||
if err := keystoneBkt.ForEach(func(k, v []byte) error {
|
||||
var (
|
||||
inKey CircuitKey
|
||||
outKey = &CircuitKey{}
|
||||
)
|
||||
|
||||
// Decode the incoming and outgoing circuit keys.
|
||||
if err := inKey.SetBytes(v); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := outKey.SetBytes(k); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Retrieve the pending circuit, set its keystone, then
|
||||
// add it to the opened map.
|
||||
circuit := pending[inKey]
|
||||
circuit.Outgoing = outKey
|
||||
opened[*outKey] = circuit
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cm.pending = pending
|
||||
cm.opened = opened
|
||||
cm.closed = make(map[CircuitKey]struct{})
|
||||
|
||||
// Finally, reconstruct the hash index by running through our set of
|
||||
// open circuits.
|
||||
cm.hashIndex = make(map[[32]byte]map[CircuitKey]struct{})
|
||||
for _, circuit := range opened {
|
||||
cm.addCircuitToHashIndex(circuit)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// decodeCircuit reconstructs an in-memory payment circuit from a byte slice.
|
||||
// The byte slice is assumed to have been generated by the circuit's Encode
|
||||
// method.
|
||||
func decodeCircuit(v []byte) (*PaymentCircuit, error) {
|
||||
var circuit = &PaymentCircuit{}
|
||||
|
||||
circuitReader := bytes.NewReader(v)
|
||||
if err := circuit.Decode(circuitReader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return circuit, nil
|
||||
}
|
||||
|
||||
// trimAllOpenCircuits reads the set of active channels from disk and trims
|
||||
// keystones for any non-pending channels. This method is intended to be called
|
||||
// on startup. Each link will also trim it's own circuits upon startup.
|
||||
//
|
||||
// NOTE: This operation will be applied to the persistent state of all active
|
||||
// channels. Therefore, it must be called before any links are created to avoid
|
||||
// interfering with normal operation.
|
||||
func (cm *circuitMap) trimAllOpenCircuits() error {
|
||||
activeChannels, err := cm.db.FetchAllChannels()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, activeChannel := range activeChannels {
|
||||
if activeChannel.IsPending {
|
||||
continue
|
||||
}
|
||||
|
||||
chanID := activeChannel.ShortChanID
|
||||
start := activeChannel.LocalCommitment.LocalHtlcIndex
|
||||
if err := cm.TrimOpenCircuits(chanID, start); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TrimOpenCircuits removes a channel's keystones above the short chan id's
|
||||
// highest committed htlc index. This has the effect of returning those circuits
|
||||
// to a half-open state. Since opening of circuits is done in advance of
|
||||
// actually committing the Add htlcs into a commitment txn, this allows circuits
|
||||
// to be opened preemetively, since we can roll them back after any failures.
|
||||
func (cm *circuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID,
|
||||
start uint64) error {
|
||||
|
||||
var trimmedOutKeys []CircuitKey
|
||||
|
||||
// Scan forward from the last unacked htlc id, stopping as soon as we
|
||||
// don't find any more. Outgoing htlc id's must be assigned in order, so
|
||||
// there should never be disjoint segments of keystones to trim.
|
||||
cm.mtx.Lock()
|
||||
for i := start; ; i++ {
|
||||
outKey := CircuitKey{
|
||||
ChanID: chanID,
|
||||
HtlcID: i,
|
||||
}
|
||||
|
||||
circuit, ok := cm.opened[outKey]
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
circuit.Outgoing = nil
|
||||
delete(cm.opened, outKey)
|
||||
trimmedOutKeys = append(trimmedOutKeys, outKey)
|
||||
cm.removeCircuitFromHashIndex(circuit)
|
||||
}
|
||||
cm.mtx.Unlock()
|
||||
|
||||
if len(trimmedOutKeys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return cm.db.Update(func(tx *bolt.Tx) error {
|
||||
keystoneBkt := tx.Bucket(circuitKeystoneKey)
|
||||
if keystoneBkt == nil {
|
||||
return ErrCorruptedCircuitMap
|
||||
}
|
||||
|
||||
for _, outKey := range trimmedOutKeys {
|
||||
err := keystoneBkt.Delete(outKey.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// LookupByHTLC looks up the payment circuit by the outgoing channel and HTLC
|
||||
// IDs. Returns nil if there is no such circuit.
|
||||
func (cm *circuitMap) LookupCircuit(inKey CircuitKey) *PaymentCircuit {
|
||||
cm.mtx.RLock()
|
||||
defer cm.mtx.RUnlock()
|
||||
|
||||
return cm.pending[inKey]
|
||||
}
|
||||
|
||||
// LookupOpenCircuit searches for the circuit identified by its outgoing circuit
|
||||
// key.
|
||||
func (cm *circuitMap) LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit {
|
||||
cm.mtx.RLock()
|
||||
defer cm.mtx.RUnlock()
|
||||
|
||||
return cm.opened[outKey]
|
||||
}
|
||||
|
||||
// LookupByPaymentHash looks up and returns any payment circuits with a given
|
||||
// payment hash.
|
||||
func (cm *circuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit {
|
||||
cm.mtx.RLock()
|
||||
defer cm.mtx.RUnlock()
|
||||
|
||||
var circuits []*PaymentCircuit
|
||||
if circuitSet, ok := cm.hashIndex[hash]; ok {
|
||||
// Iterate over the outgoing circuit keys found with this hash,
|
||||
// and retrieve the circuit from the opened map.
|
||||
circuits = make([]*PaymentCircuit, 0, len(circuitSet))
|
||||
for key := range circuitSet {
|
||||
if circuit, ok := cm.opened[key]; ok {
|
||||
circuits = append(circuits, circuit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return circuits
|
||||
}
|
||||
|
||||
// CommitCircuits accepts any number of circuits and persistently adds them to
|
||||
// the switch's circuit map. The method returns a list of circuits that had not
|
||||
// been seen prior by the switch. A link should only forward HTLCs corresponding
|
||||
// to the returned circuits to the switch.
|
||||
//
|
||||
// NOTE: This method uses batched writes to improve performance, gains will only
|
||||
// be realized if it is called concurrently from separate goroutines.
|
||||
func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) (
|
||||
*CircuitFwdActions, error) {
|
||||
|
||||
actions := &CircuitFwdActions{}
|
||||
|
||||
// If an empty list was passed, return early to avoid grabbing the lock.
|
||||
if len(circuits) == 0 {
|
||||
return actions, nil
|
||||
}
|
||||
|
||||
// First, we reconcile the provided circuits with our set of pending
|
||||
// circuits to construct a set of new circuits that need to be written
|
||||
// to disk. The circuit's pointer is stored so that we only permit this
|
||||
// exact circuit to be forwarded through the switch. If a circuit is
|
||||
// already pending, the htlc will be reforwarded by the switch.
|
||||
//
|
||||
// NOTE: We track an additional addFails subsequence, which permits us
|
||||
// to fail back all packets that weren't dropped if we encounter an
|
||||
// error when committing the circuits.
|
||||
cm.mtx.Lock()
|
||||
var adds, drops, fails, addFails []*PaymentCircuit
|
||||
for _, circuit := range circuits {
|
||||
inKey := circuit.InKey()
|
||||
if foundCircuit, ok := cm.pending[inKey]; ok {
|
||||
switch {
|
||||
|
||||
// This circuit has a keystone, it's waiting for a
|
||||
// response from the remote peer on the outgoing link.
|
||||
// Drop it like it's hot, ensure duplicates get caught.
|
||||
case foundCircuit.HasKeystone():
|
||||
drops = append(drops, circuit)
|
||||
|
||||
// If no keystone is set and the switch has not been
|
||||
// restarted, the corresponding packet should still be
|
||||
// in the outgoing link's mailbox. It will be delivered
|
||||
// if it comes online before the switch goes down.
|
||||
//
|
||||
// NOTE: Dropping here prevents a flapping, incoming
|
||||
// link from failing a duplicate add while it is still
|
||||
// in the server's memory mailboxes.
|
||||
case !foundCircuit.LoadedFromDisk:
|
||||
drops = append(drops, circuit)
|
||||
|
||||
// Otherwise, the in-mem packet has been lost due to a
|
||||
// restart. It is now safe to send back a failure along
|
||||
// the incoming link. The incoming link should be able
|
||||
// detect and ignore duplicate packets of this type.
|
||||
default:
|
||||
fails = append(fails, circuit)
|
||||
addFails = append(addFails, circuit)
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
cm.pending[inKey] = circuit
|
||||
adds = append(adds, circuit)
|
||||
addFails = append(addFails, circuit)
|
||||
}
|
||||
cm.mtx.Unlock()
|
||||
|
||||
// If all circuits are dropped or failed, we are done.
|
||||
if len(adds) == 0 {
|
||||
actions.Drops = drops
|
||||
actions.Fails = fails
|
||||
return actions, nil
|
||||
}
|
||||
|
||||
// Now, optimistically serialize the circuits to add.
|
||||
var bs = make([]bytes.Buffer, len(adds))
|
||||
for i, circuit := range adds {
|
||||
if err := circuit.Encode(&bs[i]); err != nil {
|
||||
actions.Drops = drops
|
||||
actions.Fails = addFails
|
||||
return actions, err
|
||||
}
|
||||
}
|
||||
|
||||
// Write the entire batch of circuits to the persistent circuit bucket
|
||||
// using bolt's Batch write. This method must be called from multiple,
|
||||
// distinct goroutines to have any impact on performance.
|
||||
err := cm.db.Batch(func(tx *bolt.Tx) error {
|
||||
circuitBkt := tx.Bucket(circuitAddKey)
|
||||
if circuitBkt == nil {
|
||||
return ErrCorruptedCircuitMap
|
||||
}
|
||||
|
||||
for i, circuit := range adds {
|
||||
inKeyBytes := circuit.InKey().Bytes()
|
||||
circuitBytes := bs[i].Bytes()
|
||||
|
||||
err := circuitBkt.Put(inKeyBytes, circuitBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
// Return if the write succeeded.
|
||||
if err == nil {
|
||||
actions.Adds = adds
|
||||
actions.Drops = drops
|
||||
actions.Fails = fails
|
||||
return actions, nil
|
||||
}
|
||||
|
||||
// Otherwise, rollback the circuits added to the pending set if the
|
||||
// write failed.
|
||||
cm.mtx.Lock()
|
||||
for _, circuit := range adds {
|
||||
delete(cm.pending, circuit.InKey())
|
||||
}
|
||||
cm.mtx.Unlock()
|
||||
|
||||
// Since our write failed, we will return the dropped packets and mark
|
||||
// all other circuits as failed.
|
||||
actions.Drops = drops
|
||||
actions.Fails = addFails
|
||||
|
||||
return actions, err
|
||||
}
|
||||
|
||||
// Keystone is a tuple binding an incoming and outgoing CircuitKey. Keystones
|
||||
// are preemptively written by an outgoing link before signing a new commitment
|
||||
// state, and cements which HTLCs we are awaiting a response from a remote peer.
|
||||
type Keystone struct {
|
||||
InKey CircuitKey
|
||||
OutKey CircuitKey
|
||||
}
|
||||
|
||||
// String returns a human readable description of the Keystone.
|
||||
func (k *Keystone) String() string {
|
||||
return fmt.Sprintf("%s --> %s", k.InKey, k.OutKey)
|
||||
}
|
||||
|
||||
// OpenCircuits sets the outgoing circuit key for the circuit identified by
|
||||
// inKey, persistently marking the circuit as opened. After the changes have
|
||||
// been persisted, the circuit map's in-memory indexes are updated so that this
|
||||
// circuit can be queried using LookupByKeystone or LookupByPaymentHash.
|
||||
func (cm *circuitMap) OpenCircuits(keystones ...Keystone) error {
|
||||
if len(keystones) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check that all keystones correspond to committed-but-unopened
|
||||
// circuits.
|
||||
cm.mtx.RLock()
|
||||
openedCircuits := make([]*PaymentCircuit, 0, len(keystones))
|
||||
for _, ks := range keystones {
|
||||
if _, ok := cm.opened[ks.OutKey]; ok {
|
||||
cm.mtx.RUnlock()
|
||||
return ErrDuplicateKeystone
|
||||
}
|
||||
|
||||
circuit, ok := cm.pending[ks.InKey]
|
||||
if !ok {
|
||||
cm.mtx.RUnlock()
|
||||
return ErrUnknownCircuit
|
||||
}
|
||||
|
||||
openedCircuits = append(openedCircuits, circuit)
|
||||
}
|
||||
cm.mtx.RUnlock()
|
||||
|
||||
err := cm.db.Update(func(tx *bolt.Tx) error {
|
||||
// Now, load the circuit bucket to which we will write the
|
||||
// already serialized circuit.
|
||||
keystoneBkt := tx.Bucket(circuitKeystoneKey)
|
||||
if keystoneBkt == nil {
|
||||
return ErrCorruptedCircuitMap
|
||||
}
|
||||
|
||||
for _, ks := range keystones {
|
||||
outBytes := ks.OutKey.Bytes()
|
||||
inBytes := ks.InKey.Bytes()
|
||||
err := keystoneBkt.Put(outBytes, inBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cm.mtx.Lock()
|
||||
for i, circuit := range openedCircuits {
|
||||
ks := keystones[i]
|
||||
|
||||
// Since our persistent operation was successful, we can now
|
||||
// modify the in memory representations. Set the outgoing
|
||||
// circuit key on our pending circuit, add the same circuit to
|
||||
// set of opened circuits, and add this circuit to the hash
|
||||
// index.
|
||||
circuit.Outgoing = &CircuitKey{}
|
||||
*circuit.Outgoing = ks.OutKey
|
||||
|
||||
cm.opened[ks.OutKey] = circuit
|
||||
cm.addCircuitToHashIndex(circuit)
|
||||
}
|
||||
cm.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// addCirciutToHashIndex inserts a circuit into the circuit map's hash index, so
|
||||
// that it can be queried using LookupByPaymentHash.
|
||||
func (cm *circuitMap) addCircuitToHashIndex(c *PaymentCircuit) {
|
||||
if _, ok := cm.hashIndex[c.PaymentHash]; !ok {
|
||||
cm.hashIndex[c.PaymentHash] = make(map[CircuitKey]struct{})
|
||||
}
|
||||
cm.hashIndex[c.PaymentHash][c.OutKey()] = struct{}{}
|
||||
}
|
||||
|
||||
// FailCircuit marks the circuit identified by `inKey` as closing in-memory,
|
||||
// which prevents duplicate settles/fails from completing an open circuit twice.
|
||||
func (cm *circuitMap) FailCircuit(
|
||||
inKey CircuitKey) (*PaymentCircuit, error) {
|
||||
|
||||
cm.mtx.Lock()
|
||||
defer cm.mtx.Unlock()
|
||||
|
||||
circuit, ok := cm.pending[inKey]
|
||||
if !ok {
|
||||
return nil, ErrUnknownCircuit
|
||||
}
|
||||
|
||||
_, ok = cm.closed[inKey]
|
||||
if ok {
|
||||
return nil, ErrCircuitClosing
|
||||
}
|
||||
|
||||
cm.closed[inKey] = struct{}{}
|
||||
|
||||
return circuit, nil
|
||||
}
|
||||
|
||||
// CloseCircuit marks the circuit identified by `outKey` as closing
|
||||
// in-memory, which prevents duplicate settles/fails from completing an open
|
||||
// circuit twice.
|
||||
func (cm *circuitMap) CloseCircuit(
|
||||
outKey CircuitKey) (*PaymentCircuit, error) {
|
||||
|
||||
cm.mtx.Lock()
|
||||
defer cm.mtx.Unlock()
|
||||
|
||||
circuit, ok := cm.opened[outKey]
|
||||
if !ok {
|
||||
return nil, ErrUnknownCircuit
|
||||
}
|
||||
|
||||
_, ok = cm.closed[circuit.Incoming]
|
||||
if ok {
|
||||
return nil, ErrCircuitClosing
|
||||
}
|
||||
|
||||
cm.closed[circuit.Incoming] = struct{}{}
|
||||
|
||||
return circuit, nil
|
||||
}
|
||||
|
||||
// DeleteCircuits destroys the target circuit by removing it from the circuit map,
|
||||
// additionally removing the circuit's keystone if the HTLC was forwarded
|
||||
// through an outgoing link. The circuit should be identified by its incoming
|
||||
// circuit key.
|
||||
func (cm *circuitMap) DeleteCircuits(inKeys ...CircuitKey) error {
|
||||
|
||||
var (
|
||||
closingCircuits = make(map[CircuitKey]struct{})
|
||||
removedCircuits = make(map[CircuitKey]*PaymentCircuit)
|
||||
)
|
||||
|
||||
cm.mtx.Lock()
|
||||
// First check that all provided keys are still known to the circuit
|
||||
// map.
|
||||
for _, inKey := range inKeys {
|
||||
if _, ok := cm.pending[inKey]; !ok {
|
||||
cm.mtx.Unlock()
|
||||
return ErrUnknownCircuit
|
||||
}
|
||||
}
|
||||
|
||||
// If no offenders were found, remove any references to the circuit from
|
||||
// memory, keeping track of which circuits were removed, and which ones
|
||||
// had been marked closed. This can be used to restore these entries
|
||||
// later if the persistent removal fails.
|
||||
for _, inKey := range inKeys {
|
||||
circuit := cm.pending[inKey]
|
||||
|
||||
delete(cm.pending, inKey)
|
||||
|
||||
if _, ok := cm.closed[inKey]; ok {
|
||||
closingCircuits[inKey] = struct{}{}
|
||||
delete(cm.closed, inKey)
|
||||
}
|
||||
|
||||
if circuit.HasKeystone() {
|
||||
delete(cm.opened, circuit.OutKey())
|
||||
cm.removeCircuitFromHashIndex(circuit)
|
||||
}
|
||||
|
||||
removedCircuits[inKey] = circuit
|
||||
}
|
||||
cm.mtx.Unlock()
|
||||
|
||||
err := cm.db.Batch(func(tx *bolt.Tx) error {
|
||||
for _, circuit := range removedCircuits {
|
||||
// If this htlc made it to an outgoing link, load the
|
||||
// keystone bucket from which we will remove the
|
||||
// outgoing circuit key.
|
||||
if circuit.HasKeystone() {
|
||||
keystoneBkt := tx.Bucket(circuitKeystoneKey)
|
||||
if keystoneBkt == nil {
|
||||
return ErrCorruptedCircuitMap
|
||||
}
|
||||
|
||||
outKey := circuit.OutKey()
|
||||
|
||||
err := keystoneBkt.Delete(outKey.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the circuit itself based on the incoming
|
||||
// circuit key.
|
||||
circuitBkt := tx.Bucket(circuitAddKey)
|
||||
if circuitBkt == nil {
|
||||
return ErrCorruptedCircuitMap
|
||||
}
|
||||
|
||||
inKey := circuit.InKey()
|
||||
if err := circuitBkt.Delete(inKey.Bytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
// Return if the write succeeded.
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If the persistent changes failed, restore the circuit map to it's
|
||||
// previous state.
|
||||
cm.mtx.Lock()
|
||||
for inKey, circuit := range removedCircuits {
|
||||
cm.pending[inKey] = circuit
|
||||
|
||||
if _, ok := closingCircuits[inKey]; ok {
|
||||
cm.closed[inKey] = struct{}{}
|
||||
}
|
||||
|
||||
if circuit.HasKeystone() {
|
||||
cm.opened[circuit.OutKey()] = circuit
|
||||
cm.addCircuitToHashIndex(circuit)
|
||||
}
|
||||
}
|
||||
cm.mtx.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// removeCircuitFromHashIndex removes the given circuit from the hash index,
|
||||
// pruning any unnecessary memory optimistically.
|
||||
func (cm *circuitMap) removeCircuitFromHashIndex(c *PaymentCircuit) {
|
||||
// Locate bucket containing this circuit's payment hashes.
|
||||
circuitsWithHash, ok := cm.hashIndex[c.PaymentHash]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
outKey := c.OutKey()
|
||||
|
||||
// Remove this circuit from the set of circuitsWithHash.
|
||||
delete(circuitsWithHash, outKey)
|
||||
|
||||
// Prune the payment hash bucket if no other entries remain.
|
||||
if len(circuitsWithHash) == 0 {
|
||||
delete(cm.hashIndex, c.PaymentHash)
|
||||
}
|
||||
}
|
||||
|
||||
// NumPending returns the number of active circuits added to the circuit map.
|
||||
func (cm *circuitMap) NumPending() int {
|
||||
cm.mtx.RLock()
|
||||
defer cm.mtx.RUnlock()
|
||||
|
||||
return len(cm.pending)
|
||||
}
|
||||
|
||||
// NumOpen returns the number of circuits that have been opened by way of
|
||||
// setting their keystones. This is the number of HTLCs that are waiting for a
|
||||
// settle/fail response from a remote peer.
|
||||
func (cm *circuitMap) NumOpen() int {
|
||||
cm.mtx.RLock()
|
||||
defer cm.mtx.RUnlock()
|
||||
|
||||
return len(cm.opened)
|
||||
}
|
Loading…
Reference in New Issue
Block a user