Merge pull request #2736 from cfromknecht/config-num-workers

lncfg: add CLI worker configuration
Olaoluwa Osuntokun 2019-03-19 15:47:38 -07:00 committed by GitHub
commit 71161848aa
4 changed files with 167 additions and 4 deletions

@ -252,6 +252,8 @@ type config struct {
    net tor.Net

    Routing *routing.Conf `group:"routing" namespace:"routing"`
+
+   Workers *lncfg.Workers `group:"workers" namespace:"workers"`
}
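With the group and namespace tags above, go-flags exposes the new fields as workers.read, workers.write and workers.sig. As an illustrative sketch of how they would be set in lnd.conf (section and key names follow lnd's usual namespace convention; the values here are arbitrary):

[workers]
workers.read=32
workers.write=32
workers.sig=16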
// loadConfig initializes and parses the config using a config file and command
@ -334,6 +336,11 @@ func loadConfig() (*config, error) {
            Control: defaultTorControl,
        },
        net: &tor.ClearNet{},
+       Workers: &lncfg.Workers{
+           Read:  lncfg.DefaultReadWorkers,
+           Write: lncfg.DefaultWriteWorkers,
+           Sig:   lncfg.DefaultSigWorkers,
+       },
    }

    // Pre-parse the command line options to pick up an alternative config
@ -968,6 +975,12 @@ func loadConfig() (*config, error) {
            "minbackoff")
    }

+   // Assert that all worker pools will have a positive number of
+   // workers, otherwise the pools will be rendered useless.
+   if err := cfg.Workers.Validate(); err != nil {
+       return nil, err
+   }
+
    // Finally, ensure that the user's color is correctly formatted,
    // otherwise the server will not be able to start after unlocking the
    // wallet.

lncfg/workers.go Normal file

@ -0,0 +1,49 @@
package lncfg

import "fmt"

const (
    // DefaultReadWorkers is the default maximum number of concurrent
    // workers used by the daemon's read pool.
    DefaultReadWorkers = 16

    // DefaultWriteWorkers is the default maximum number of concurrent
    // workers used by the daemon's write pool.
    DefaultWriteWorkers = 16

    // DefaultSigWorkers is the default maximum number of concurrent workers
    // used by the daemon's sig pool.
    DefaultSigWorkers = 8
)
// Workers exposes CLI configuration for tuning the resources consumed by
// worker pools.
type Workers struct {
    // Read is the maximum number of concurrent read pool workers.
    Read int `long:"read" description:"Maximum number of concurrent read pool workers."`

    // Write is the maximum number of concurrent write pool workers.
    Write int `long:"write" description:"Maximum number of concurrent write pool workers."`

    // Sig is the maximum number of concurrent sig pool workers.
    Sig int `long:"sig" description:"Maximum number of concurrent sig pool workers."`
}
// Validate checks the Workers configuration to ensure that the input values
// are sane.
func (w *Workers) Validate() error {
    if w.Read <= 0 {
        return fmt.Errorf("number of read workers (%d) must be "+
            "positive", w.Read)
    }

    if w.Write <= 0 {
        return fmt.Errorf("number of write workers (%d) must be "+
            "positive", w.Write)
    }

    if w.Sig <= 0 {
        return fmt.Errorf("number of sig workers (%d) must be "+
            "positive", w.Sig)
    }

    return nil
}
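For context, here is a minimal standalone sketch of how these struct tags turn into CLI options, assuming the jessevdk/go-flags parser that lnd uses for its config; the Workers and config types below are local stand-ins for the real ones, not lnd code:

package main

import (
    "fmt"
    "os"

    flags "github.com/jessevdk/go-flags"
)

// Workers mirrors the struct above. Nested under a group with
// namespace:"workers", the fields surface as --workers.read,
// --workers.write and --workers.sig.
type Workers struct {
    Read  int `long:"read" description:"Maximum number of concurrent read pool workers."`
    Write int `long:"write" description:"Maximum number of concurrent write pool workers."`
    Sig   int `long:"sig" description:"Maximum number of concurrent sig pool workers."`
}

type config struct {
    Workers *Workers `group:"workers" namespace:"workers"`
}

func main() {
    // Pre-populate defaults, as loadConfig does above; flags override them.
    cfg := config{Workers: &Workers{Read: 16, Write: 16, Sig: 8}}
    if _, err := flags.Parse(&cfg); err != nil {
        os.Exit(1)
    }
    fmt.Printf("read=%d write=%d sig=%d\n",
        cfg.Workers.Read, cfg.Workers.Write, cfg.Workers.Sig)
}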

lncfg/workers_test.go Normal file

@ -0,0 +1,102 @@
package lncfg_test

import (
    "testing"

    "github.com/lightningnetwork/lnd/lncfg"
)

const (
    maxUint = ^uint(0)
    maxInt  = int(maxUint >> 1)
    minInt  = -maxInt - 1
)
// TestValidateWorkers asserts that validating the Workers config only succeeds
// if all fields specify a positive number of workers.
func TestValidateWorkers(t *testing.T) {
    tests := []struct {
        name  string
        cfg   *lncfg.Workers
        valid bool
    }{
        {
            name: "min valid",
            cfg: &lncfg.Workers{
                Read:  1,
                Write: 1,
                Sig:   1,
            },
            valid: true,
        },
        {
            name: "max valid",
            cfg: &lncfg.Workers{
                Read:  maxInt,
                Write: maxInt,
                Sig:   maxInt,
            },
            valid: true,
        },
        {
            name: "read max invalid",
            cfg: &lncfg.Workers{
                Read:  0,
                Write: 1,
                Sig:   1,
            },
        },
        {
            name: "write max invalid",
            cfg: &lncfg.Workers{
                Read:  1,
                Write: 0,
                Sig:   1,
            },
        },
        {
            name: "sig max invalid",
            cfg: &lncfg.Workers{
                Read:  1,
                Write: 1,
                Sig:   0,
            },
        },
        {
            name: "read min invalid",
            cfg: &lncfg.Workers{
                Read:  minInt,
                Write: 1,
                Sig:   1,
            },
        },
        {
            name: "write min invalid",
            cfg: &lncfg.Workers{
                Read:  1,
                Write: minInt,
                Sig:   1,
            },
        },
        {
            name: "sig min invalid",
            cfg: &lncfg.Workers{
                Read:  1,
                Write: 1,
                Sig:   minInt,
            },
        },
    }

    for _, test := range tests {
        t.Run(test.name, func(t *testing.T) {
            err := test.cfg.Validate()
            switch {
            case test.valid && err != nil:
                t.Fatalf("valid config was invalid: %v", err)
            case !test.valid && err == nil:
                t.Fatalf("invalid config was valid")
            }
        })
    }
}
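The new table-driven test can be run on its own with the standard Go tooling:

go test ./lncfg -run TestValidateWorkers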

@ -11,7 +11,6 @@ import (
    "net"
    "path/filepath"
    "regexp"
-   "runtime"
    "strconv"
    "sync"
    "sync/atomic"
@ -273,7 +272,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
    )

    writePool := pool.NewWrite(
-       writeBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout,
+       writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout,
    )

    readBufferPool := pool.NewReadBuffer(
@ -282,7 +281,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
    )

    readPool := pool.NewRead(
-       readBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout,
+       readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout,
    )

    decodeFinalCltvExpiry := func(payReq string) (uint32, error) {
@ -296,7 +295,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
    s := &server{
        chanDB:    chanDB,
        cc:        cc,
-       sigPool:   lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer),
+       sigPool:   lnwallet.NewSigPool(cfg.Workers.Sig, cc.signer),
        writePool: writePool,
        readPool:  readPool,
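Net effect: the read and write pools, previously fixed at runtime.NumCPU() workers, and the sig pool, previously runtime.NumCPU()*2, now default to 16, 16 and 8 respectively and can be overridden at startup. An illustrative invocation (flag names follow from the group/namespace tags; the values are arbitrary):

lnd --workers.read=64 --workers.write=64 --workers.sig=16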