htlcswitch: add new ticker in switch to batch log forwarding events

In this commit, we extend the switch as is, to record details
concerning settled payment circuits. To do this, we introduce a new
interface to the package: the ForwardingLog. This is a tiny interface
that simply lets us abstract away the details of the storage backing of
the forwarding log.

Each time we receive a successful HTLC settle, we’ll log the full
details (chans, fees, time) as a pending forwarding log entry. Every 15
seconds, we’ll then batch flush out these entries to disk. When we’re
exiting, we’ll try to flush out all entries to ensure everything gets
recorded to disk.
This commit is contained in:
Olaoluwa Osuntokun 2018-02-27 22:18:52 -08:00
parent 6f11fee1a4
commit ad522a72c1
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
3 changed files with 130 additions and 9 deletions

@ -121,3 +121,15 @@ type Peer interface {
// properly handle.
Disconnect(reason error)
}
// ForwardingLog is an interface that represents a time series database which
// keep track of all successfully completed payment circuits. Every few
// seconds, the switch will collate and flush out all the successful payment
// circuits during the last interval.
type ForwardingLog interface {
// AddForwardingEvents is a method that should write out the set of
// forwarding events in a batch to persistent storage. Outside
// sub-systems can then query the contents of the log for analysis,
// visualizations, etc.
AddForwardingEvents([]channeldb.ForwardingEvent) error
}

@ -80,6 +80,22 @@ func (m *mockFeeEstimator) Stop() error {
var _ lnwallet.FeeEstimator = (*mockFeeEstimator)(nil)
type mockForwardingLog struct {
sync.Mutex
events map[time.Time]channeldb.ForwardingEvent
}
func (m *mockForwardingLog) AddForwardingEvents(events []channeldb.ForwardingEvent) error {
m.Lock()
defer m.Unlock()
for _, event := range events {
m.events[event.Timestamp] = event
}
return nil
}
type mockServer struct {
started int32
shutdown int32

@ -13,6 +13,7 @@ import (
"github.com/roasbeef/btcd/btcec"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
@ -99,6 +100,12 @@ type Config struct {
// properly route around link./vertex failures.
SelfKey *btcec.PublicKey
// FwdingLog is an interface that will be used by the switch to log
// forwarding events. A forwarding event happens each time a payment
// circuit is successfully completed. So when we forward an HTLC, and a
// settle is eventually received.
FwdingLog ForwardingLog
// LocalChannelClose kicks-off the workflow to execute a cooperative or
// forced unilateral closure of the channel initiated by a local
// subsystem.
@ -169,6 +176,12 @@ type Switch struct {
// linkControl is a channel used to propagate add/remove/get htlc
// switch handler commands.
linkControl chan interface{}
// pendingFwdingEvents is the set of forwarding events which have been
// collected during the current interval, but hasn't yet been written
// to the forwarding log.
fwdEventMtx sync.Mutex
pendingFwdingEvents []channeldb.ForwardingEvent
}
// New creates the new instance of htlc switch.
@ -553,8 +566,8 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
// the ultimate settle message back latter.
case *lnwire.UpdateAddHTLC:
if packet.incomingChanID == (lnwire.ShortChannelID{}) {
// A blank incomingChanID indicates that this is a pending
// user-initiated payment.
// A blank incomingChanID indicates that this is a
// pending user-initiated payment.
return s.handleLocalDispatch(packet)
}
@ -606,8 +619,8 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
}
if link.Bandwidth() >= htlc.Amount {
destination = link
break
}
}
@ -665,9 +678,12 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
return err
}
// Remove circuit since we are about to complete the HTLC.
err := s.circuits.Remove(packet.outgoingChanID,
packet.outgoingHTLCID)
// Remove the circuit since we are about to complete
// the HTLC.
err := s.circuits.Remove(
packet.outgoingChanID,
packet.outgoingHTLCID,
)
if err != nil {
log.Warnf("Failed to close completed onion circuit for %x: "+
"(%s, %d) <-> (%s, %d)", circuit.PaymentHash,
@ -683,6 +699,29 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
packet.incomingChanID = circuit.IncomingChanID
packet.incomingHTLCID = circuit.IncomingHTLCID
// If this is an HTLC settle, and it wasn't from a
// locally initiated HTLC, then we'll log a forwarding
// event so we can flush it to disk later.
//
// TODO(roasbeef): only do this once link actually
// fully settles?
_, isSettle := packet.htlc.(*lnwire.UpdateFulfillHTLC)
localHTLC := packet.incomingChanID == (lnwire.ShortChannelID{})
if isSettle && !localHTLC {
s.fwdEventMtx.Lock()
s.pendingFwdingEvents = append(
s.pendingFwdingEvents,
channeldb.ForwardingEvent{
Timestamp: time.Now(),
IncomingChanID: circuit.IncomingChanID,
OutgoingChanID: circuit.OutgoingChanID,
AmtIn: circuit.IncomingAmt,
AmtOut: circuit.OutgoingAmt,
},
)
s.fwdEventMtx.Unlock()
}
// Obfuscate the error message for fail updates before
// sending back through the circuit unless the payment
// was generated locally.
@ -715,9 +754,11 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
}
}
// A blank IncomingChanID in a circuit indicates that it is a
// pending user-initiated payment.
if packet.incomingChanID == (lnwire.ShortChannelID{}) {
// For local HTLC's we'll dispatch the settle event back to the
// caller, rather than to the peer that sent us the HTLC
// originally.
localHTLC := packet.incomingChanID == (lnwire.ShortChannelID{})
if localHTLC {
return s.handleLocalDispatch(packet)
}
@ -790,6 +831,13 @@ func (s *Switch) htlcForwarder() {
"channel link on stop: %v", err)
}
}
// Before we exit fully, we'll attempt to flush out any
// forwarding events that may still be lingering since the last
// batch flush.
if err := s.FlushForwardingEvents(); err != nil {
log.Errorf("unable to flush forwarding events: %v", err)
}
}()
// TODO(roasbeef): cleared vs settled distinction
@ -801,6 +849,11 @@ func (s *Switch) htlcForwarder() {
logTicker := time.NewTicker(10 * time.Second)
defer logTicker.Stop()
// Every 15 seconds, we'll flush out the forwarding events that
// occurred during that period.
fwdEventTicker := time.NewTicker(15 * time.Second)
defer fwdEventTicker.Stop()
for {
select {
// A local close request has arrived, we'll forward this to the
@ -861,6 +914,17 @@ func (s *Switch) htlcForwarder() {
case cmd := <-s.htlcPlex:
cmd.err <- s.handlePacketForward(cmd.pkt)
// When this time ticks, then it indicates that we should
// collect all the forwarding events since the last internal,
// and write them out to our log.
case <-fwdEventTicker.C:
go func() {
if err := s.FlushForwardingEvents(); err != nil {
log.Errorf("unable to flush "+
"forwarding events: %v", err)
}
}()
// The log ticker has fired, so we'll calculate some forwarding
// stats for the last 10 seconds to display within the logs to
// users.
@ -1307,3 +1371,32 @@ func (s *Switch) numPendingPayments() int {
func (s *Switch) addCircuit(circuit *PaymentCircuit) {
s.circuits.Add(circuit)
}
// FlushForwardingEvents flushes out the set of pending forwarding events to
// the persistent log. This will be used by the switch to periodically flush
// out the set of forwarding events to disk. External callers can also use this
// method to ensure all data is flushed to dis before querying the log.
func (s *Switch) FlushForwardingEvents() error {
// First, we'll obtain a copy of the current set of pending forwarding
// events.
s.fwdEventMtx.Lock()
// If we won't have any forwarding events, then we can exit early.
if len(s.pendingFwdingEvents) == 0 {
s.fwdEventMtx.Unlock()
return nil
}
events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents))
copy(events[:], s.pendingFwdingEvents[:])
// With the copy obtained, we can now clear out the header pointer of
// the current slice. This way, we can re-use the underlying storage
// allocated for the slice.
s.pendingFwdingEvents = s.pendingFwdingEvents[:0]
s.fwdEventMtx.Unlock()
// Finally, we'll write out the copied events to the persistent
// forwarding log.
return s.cfg.FwdingLog.AddForwardingEvents(events)
}