healthcheck: monitor access to chain backend

Add a new health check package which will periodically poll health
check functions and shutdown if we do not succeed after our set number
of attempts. The first check that we add is one for our chain backend,
to ensure that we are connected to a bitcoin node.
This commit is contained in:
carla 2020-08-24 08:54:38 +02:00
parent daae8a9944
commit c365a16656
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
8 changed files with 633 additions and 0 deletions

View File

@ -86,6 +86,11 @@ const (
// HostAnnouncer will wait between DNS resolutions to check if the
// backing IP of a host has changed.
defaultHostSampleInterval = time.Minute * 5
defaultChainInterval = time.Minute
defaultChainTimeout = time.Second * 10
defaultChainBackoff = time.Second * 30
defaultChainAttempts = 3
)
var (
@ -265,6 +270,8 @@ type Config struct {
AllowCircularRoute bool `long:"allow-circular-route" description:"If true, our node will allow htlc forwards that arrive and depart on the same channel."`
HealthChecks *lncfg.HealthCheckConfig `group:"healthcheck" namespace:"healthcheck"`
DB *lncfg.DB `group:"db" namespace:"db"`
// LogWriter is the root logger that all of the daemon's subloggers are
@ -383,6 +390,14 @@ func DefaultConfig() Config {
Watchtower: &lncfg.Watchtower{
TowerDir: defaultTowerDir,
},
HealthChecks: &lncfg.HealthCheckConfig{
ChainCheck: &lncfg.CheckConfig{
Interval: defaultChainInterval,
Timeout: defaultChainTimeout,
Attempts: defaultChainAttempts,
Backoff: defaultChainBackoff,
},
},
MaxOutgoingCltvExpiry: htlcswitch.DefaultMaxOutgoingCltvExpiry,
MaxChannelFeeAllocation: htlcswitch.DefaultMaxLinkFeeAllocation,
LogWriter: build.NewRotatingLogWriter(),
@ -1124,6 +1139,7 @@ func ValidateConfig(cfg Config, usageMessage string) (*Config, error) {
cfg.Caches,
cfg.WtClient,
cfg.DB,
cfg.HealthChecks,
)
if err != nil {
return nil, err

231
healthcheck/healthcheck.go Normal file
View File

@ -0,0 +1,231 @@
// Package healthcheck contains a monitor which takes a set of liveliness checks
// which it periodically checks. If a check fails after its configured number
// of allowed call attempts, the monitor will send a request to shutdown using
// the function is is provided in its config. Checks are dispatched in their own
// goroutines so that they do not block each other.
package healthcheck
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/ticker"
)
// Config contains configuration settings for our monitor.
type Config struct {
// Checks is a set of health checks that assert that lnd has access to
// critical resources.
Checks []*Observation
// Shutdown should be called to request safe shutdown on failure of a
// health check.
Shutdown shutdownFunc
}
// shutdownFunc is the signature we use for a shutdown function which allows us
// to print our reason for shutdown.
type shutdownFunc func(format string, params ...interface{})
// Monitor periodically checks a series of configured liveliness checks to
// ensure that lnd has access to all critical resources.
type Monitor struct {
started int32 // To be used atomically.
stopped int32 // To be used atomically.
cfg *Config
quit chan struct{}
wg sync.WaitGroup
}
// NewMonitor returns a monitor with the provided config.
func NewMonitor(cfg *Config) *Monitor {
return &Monitor{
cfg: cfg,
quit: make(chan struct{}),
}
}
// Start launches the goroutines required to run our monitor.
func (m *Monitor) Start() error {
if !atomic.CompareAndSwapInt32(&m.started, 0, 1) {
return errors.New("monitor already started")
}
// Run through all of the health checks that we have configured and
// start a goroutine for each check.
for _, check := range m.cfg.Checks {
check := check
// Skip over health checks that are disabled by setting zero
// attempts.
if check.Attempts == 0 {
log.Warnf("check: %v configured with 0 attempts, "+
"skipping it", check.Name)
continue
}
m.wg.Add(1)
go func() {
defer m.wg.Done()
check.monitor(m.cfg.Shutdown, m.quit)
}()
}
return nil
}
// Stop sends all goroutines the signal to exit and waits for them to exit.
func (m *Monitor) Stop() error {
if !atomic.CompareAndSwapInt32(&m.stopped, 0, 1) {
return fmt.Errorf("monitor already stopped")
}
close(m.quit)
m.wg.Wait()
return nil
}
// CreateCheck is a helper function that takes a function that produces an error
// and wraps it in a function that returns its result on an error channel.
// We do not wait group the goroutine running our checkFunc because we expect
// to be dealing with health checks that may block; if we wait group them, we
// may wait forever. Ideally future health checks will allow callers to cancel
// them early, and we can wait group this.
func CreateCheck(checkFunc func() error) func() chan error {
return func() chan error {
errChan := make(chan error, 1)
go func() {
errChan <- checkFunc()
}()
return errChan
}
}
// Observation represents a liveliness check that we periodically check.
type Observation struct {
// Name describes the health check.
Name string
// Check runs the health check itself, returning an error channel that
// is expected to receive nil or an error.
Check func() chan error
// Interval is a ticker which triggers running our check function. This
// ticker must be started and stopped by the observation.
Interval ticker.Ticker
// Attempts is the number of calls we make for a single check before
// failing.
Attempts int
// Timeout is the amount of time we allow our check function to take
// before we time it out.
Timeout time.Duration
// Backoff is the amount of time we back off between retries for failed
// checks.
Backoff time.Duration
}
// NewObservation creates an observation.
func NewObservation(name string, check func() error, interval,
timeout, backoff time.Duration, attempts int) *Observation {
return &Observation{
Name: name,
Check: CreateCheck(check),
Interval: ticker.New(interval),
Attempts: attempts,
Timeout: timeout,
Backoff: backoff,
}
}
// String returns a string representation of an observation.
func (o *Observation) String() string {
return o.Name
}
// monitor executes a health check every time its interval ticks until the quit
// channel signals that we should shutdown. This function is also responsible
// for starting and stopping our ticker.
func (o *Observation) monitor(shutdown shutdownFunc, quit chan struct{}) {
log.Debugf("Monitoring: %v", o)
o.Interval.Resume()
defer o.Interval.Stop()
for {
select {
case <-o.Interval.Ticks():
o.retryCheck(quit, shutdown)
// Exit if we receive the instruction to shutdown.
case <-quit:
return
}
}
}
// retryCheck calls a check function until it succeeds, or we reach our
// configured number of attempts, waiting for our back off period between failed
// calls. If we fail to obtain a passing health check after the allowed number
// of calls, we will request shutdown.
func (o *Observation) retryCheck(quit chan struct{}, shutdown shutdownFunc) {
var count int
for count < o.Attempts {
// Increment our call count and call the health check endpoint.
count++
// Wait for our check to return, timeout to elapse, or quit
// signal to be received.
var err error
select {
case err = <-o.Check():
case <-time.After(o.Timeout):
err = fmt.Errorf("health check: %v timed out after: "+
"%v", o, o.Timeout)
case <-quit:
return
}
// If our error is nil, we have passed our health check, so we
// can exit.
if err == nil {
return
}
// If we have reached our allowed number of attempts, this
// check has failed so we request shutdown.
if count == o.Attempts {
shutdown("Health check: %v failed after %v "+
"calls", o, o.Attempts)
return
}
log.Debugf("Health check: %v, call: %v failed with: %v, "+
"backing off for: %v", o, count, err, o.Backoff)
// If we are still within the number of calls allowed for this
// check, we wait for our back off period to elapse, or exit if
// we get the signal to shutdown.
select {
case <-time.After(o.Backoff):
case <-quit:
return
}
}
}

View File

@ -0,0 +1,225 @@
package healthcheck
import (
"errors"
"testing"
"time"
"github.com/lightningnetwork/lnd/ticker"
"github.com/stretchr/testify/require"
)
var (
errNonNil = errors.New("non-nil test error")
timeout = time.Second
testTime = time.Unix(1, 2)
)
type mockedCheck struct {
t *testing.T
errChan chan error
}
// newMockCheck creates a new mock.
func newMockCheck(t *testing.T) *mockedCheck {
return &mockedCheck{
t: t,
errChan: make(chan error),
}
}
// call returns our mock's error channel, which we can send responses on.
func (m *mockedCheck) call() chan error {
return m.errChan
}
// sendError sends an error into our mock's error channel, mocking the sending
// of a response from our check function.
func (m *mockedCheck) sendError(err error) {
select {
case m.errChan <- err:
case <-time.After(timeout):
m.t.Fatalf("could not send error: %v", err)
}
}
// TestMonitor tests creation and triggering of a monitor with a health check.
func TestMonitor(t *testing.T) {
intervalTicker := ticker.NewForce(time.Hour)
mock := newMockCheck(t)
shutdown := make(chan struct{})
// Create our config for monitoring. We will use a 0 back off so that
// out test does not need to wait.
cfg := &Config{
Checks: []*Observation{
{
Check: mock.call,
Interval: intervalTicker,
Attempts: 2,
Backoff: 0,
Timeout: time.Hour,
},
},
Shutdown: func(string, ...interface{}) {
shutdown <- struct{}{}
},
}
monitor := NewMonitor(cfg)
require.NoError(t, monitor.Start(), "could not start monitor")
// Tick is a helper we will use to tick our interval.
tick := func() {
select {
case intervalTicker.Force <- testTime:
case <-time.After(timeout):
t.Fatal("could not tick timer")
}
}
// Tick our timer and provide our error channel with a nil error. This
// mocks our check function succeeding on the first call.
tick()
mock.sendError(nil)
// Now we tick our timer again. This time send a non-nil error, followed
// by a nil error. This tests our retry logic, because we allow 2
// retries, so should recover without needing to shutdown.
tick()
mock.sendError(errNonNil)
mock.sendError(nil)
// Finally, we tick our timer once more, and send two non-nil errors
// into our error channel. This mocks our check function failing twice.
tick()
mock.sendError(errNonNil)
mock.sendError(errNonNil)
// Since we have failed within our allowed number of retries, we now
// expect a call to our shutdown function.
select {
case <-shutdown:
case <-time.After(timeout):
t.Fatal("expected shutdown")
}
require.NoError(t, monitor.Stop(), "could not stop monitor")
}
// TestRetryCheck tests our retry logic. It does not include a test for exiting
// during the back off period.
func TestRetryCheck(t *testing.T) {
tests := []struct {
name string
// errors provides an in-order list of errors that we expect our
// health check to respond with. The number of errors in this
// list indicates the number of times we expect our check to
// be called, because our test will fail if we do not consume
// every error.
errors []error
// attempts is the number of times we call a check before
// failing.
attempts int
// timeout is the time we allow our check to take before we
// fail them.
timeout time.Duration
// expectedShutdown is true if we expect a shutdown to be
// triggered because all of our calls failed.
expectedShutdown bool
}{
{
name: "first call succeeds",
errors: []error{nil},
attempts: 2,
timeout: time.Hour,
expectedShutdown: false,
},
{
name: "first call fails",
errors: []error{errNonNil},
attempts: 1,
timeout: time.Hour,
expectedShutdown: true,
},
{
name: "fail then recover",
errors: []error{errNonNil, nil},
attempts: 2,
timeout: time.Hour,
expectedShutdown: false,
},
{
name: "always fail",
errors: []error{errNonNil, errNonNil},
attempts: 2,
timeout: time.Hour,
expectedShutdown: true,
},
{
name: "no calls",
errors: nil,
attempts: 0,
timeout: time.Hour,
expectedShutdown: false,
},
{
name: "call times out",
errors: nil,
attempts: 1,
timeout: 1,
expectedShutdown: true,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
var shutdown bool
shutdownFunc := func(string, ...interface{}) {
shutdown = true
}
mock := newMockCheck(t)
// Create an observation that calls our call counting
// function. We set a zero back off so that the test
// will not wait.
observation := &Observation{
Check: mock.call,
Attempts: test.attempts,
Timeout: test.timeout,
Backoff: 0,
}
quit := make(chan struct{})
// Run our retry check in a goroutine because it blocks
// on us sending errors into the mocked caller's error
// channel.
done := make(chan struct{})
go func() {
observation.retryCheck(quit, shutdownFunc)
close(done)
}()
// Prompt our mock caller to send responses for calls
// to our call function.
for _, err := range test.errors {
mock.sendError(err)
}
// Make sure that we have finished running our retry
// check function before we start checking results.
<-done
require.Equal(t, test.expectedShutdown, shutdown,
"unexpected shutdown state")
})
}
}

32
healthcheck/log.go Normal file
View File

@ -0,0 +1,32 @@
package healthcheck
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)
// Subsystem defines the logging code for this subsystem.
const Subsystem = "HLCK"
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}
// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

65
lncfg/healthcheck.go Normal file
View File

@ -0,0 +1,65 @@
package lncfg
import (
"fmt"
"time"
)
var (
// MinHealthCheckInterval is the minimum interval we allow between
// health checks.
MinHealthCheckInterval = time.Minute
// MinHealthCheckTimeout is the minimum timeout we allow for health
// check calls.
MinHealthCheckTimeout = time.Second
// MinHealthCheckBackoff is the minimum back off we allow between health
// check retries.
MinHealthCheckBackoff = time.Second
)
// HealthCheckConfig contains the configuration for the different health checks
// the lnd runs.
type HealthCheckConfig struct {
ChainCheck *CheckConfig `group:"chainbackend" namespace:"chainbackend"`
}
// Validate checks the values configured for our health checks.
func (h *HealthCheckConfig) Validate() error {
return h.ChainCheck.validate("chain backend")
}
type CheckConfig struct {
Interval time.Duration `long:"interval" description:"How often to run a health check."`
Attempts int `long:"attempts" description:"The number of calls we will make for the check before failing. Set this value to 0 to disable a check."`
Timeout time.Duration `long:"timeout" description:"The amount of time we allow the health check to take before failing due to timeout."`
Backoff time.Duration `long:"backoff" description:"The amount of time to back-off between failed health checks."`
}
// validate checks the values in a health check config entry if it is enabled.
func (c *CheckConfig) validate(name string) error {
if c.Attempts == 0 {
return nil
}
if c.Backoff < MinHealthCheckBackoff {
return fmt.Errorf("%v backoff: %v below minimum: %v", name,
c.Backoff, MinHealthCheckBackoff)
}
if c.Timeout < MinHealthCheckTimeout {
return fmt.Errorf("%v timeout: %v below minimum: %v", name,
c.Timeout, MinHealthCheckTimeout)
}
if c.Interval < MinHealthCheckInterval {
return fmt.Errorf("%v interval: %v below minimum: %v", name,
c.Interval, MinHealthCheckInterval)
}
return nil
}

2
log.go
View File

@ -16,6 +16,7 @@ import (
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/healthcheck"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lnrpc/autopilotrpc"
@ -129,6 +130,7 @@ func SetupLoggers(root *build.RotatingLogWriter) {
AddSubLogger(root, routerrpc.Subsystem, routerrpc.UseLogger)
AddSubLogger(root, chanfitness.Subsystem, chanfitness.UseLogger)
AddSubLogger(root, verrpc.Subsystem, verrpc.UseLogger)
AddSubLogger(root, healthcheck.Subsystem, healthcheck.UseLogger)
}
// AddSubLogger is a helper method to conveniently create and register the

View File

@ -400,3 +400,21 @@ litecoin.node=ltcd
; sweep funds if a breach occurs while being offline. The fee rate should be
; specified in sat/byte, the default is 10 sat/byte.
; wtclient.sweep-fee-rate=10
[healthcheck]
; The number of times we should attempt to query our chain backend before
; gracefully shutting down. Set this value to 0 to disable this health check.
; healthcheck.chainbackend.attempts=3
; The amount of time we allow a call to our chain backend to take before we fail
; the attempt. This value must be >= 1s.
; healthcheck.chainbackend.timeout=10s
; The amount of time we should backoff between failed attempts to query chain
; backend. This value must be >= 1s.
; healthcheck.chainbackend.backoff=30s
; The amount of time we should wait between chain backend health checks. This
; value must be >= 1m.
; healthcheck.chainbackend.interval=1m

View File

@ -37,6 +37,7 @@ import (
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/feature"
"github.com/lightningnetwork/lnd/healthcheck"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
@ -276,6 +277,9 @@ type server struct {
hostAnn *netann.HostAnnouncer
// livelinessMonitor monitors that lnd has access to critical resources.
livelinessMonitor *healthcheck.Monitor
quit chan struct{}
wg sync.WaitGroup
@ -1254,6 +1258,32 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
})
}
// Create a set of health checks using our configured values. If a
// health check has been disabled by setting attempts to 0, our monitor
// will not run it.
chainHealthCheck := healthcheck.NewObservation(
"chain backend",
func() error {
_, _, err := cc.chainIO.GetBestBlock()
return err
},
cfg.HealthChecks.ChainCheck.Interval,
cfg.HealthChecks.ChainCheck.Timeout,
cfg.HealthChecks.ChainCheck.Backoff,
cfg.HealthChecks.ChainCheck.Attempts,
)
// If we have not disabled all of our health checks, we create a
// liveliness monitor with our configured checks.
s.livelinessMonitor = healthcheck.NewMonitor(
&healthcheck.Config{
Checks: []*healthcheck.Observation{
chainHealthCheck,
},
Shutdown: srvrLog.Criticalf,
},
)
// Create the connection manager which will be responsible for
// maintaining persistent outbound connections and also accepting new
// incoming connections
@ -1304,6 +1334,13 @@ func (s *server) Start() error {
}
}
if s.livelinessMonitor != nil {
if err := s.livelinessMonitor.Start(); err != nil {
startErr = err
return
}
}
// Start the notification server. This is used so channel
// management goroutines can be notified when a funding
// transaction reaches a sufficient number of confirmations, or
@ -1535,6 +1572,13 @@ func (s *server) Stop() error {
}
}
if s.livelinessMonitor != nil {
if err := s.livelinessMonitor.Stop(); err != nil {
srvrLog.Warnf("unable to shutdown liveliness "+
"monitor: %v", err)
}
}
// Wait for all lingering goroutines to quit.
s.wg.Wait()