multi: make AddPreimage variadic, optimistically compute key

In this commit, we modify the WitnessCache's
AddPreimage method to accept a variadic number
of preimages. This enables callers to batch
preimage writes in performance critical areas
of the codebase, e.g. the htlcswitch.

Additionally, we lift the computation of the
witnesses' keys outside of the db transaction.
This saves us from having to do hashing inside
and blocking other callers, and limits extraneous
blocking at the call site.
This commit is contained in:
Conner Fromknecht 2019-02-19 17:05:04 -08:00
parent 81783a60dc
commit 30f61b7630
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
11 changed files with 95 additions and 42 deletions

View File

@ -70,12 +70,42 @@ func (d *DB) NewWitnessCache() *WitnessCache {
}
}
// AddWitness adds a new witness of wType to the witness cache. The type of the
// witness will be used to map the witness to the key that will be used to look
// it up.
// witnessEntry is a key-value struct that holds each key -> witness pair, used
// when inserting records into the cache.
type witnessEntry struct {
key []byte
witness []byte
}
// AddWitnesses adds a batch of new witnesses of wType to the witness cache. The
// type of the witness will be used to map each witness to the key that will be
// used to look it up. All witnesses should be of the same WitnessType.
//
// TODO(roasbeef): fake closure to map instead a constructor?
func (w *WitnessCache) AddWitness(wType WitnessType, witness []byte) error {
func (w *WitnessCache) AddWitnesses(wType WitnessType, witnesses ...[]byte) error {
// Optimistically compute the witness keys before attempting to start
// the db transaction.
entries := make([]witnessEntry, 0, len(witnesses))
for _, witness := range witnesses {
// Map each witness to its key by applying the appropriate
// transformation for the given witness type.
switch wType {
case Sha256HashWitness:
key := sha256.Sum256(witness)
entries = append(entries, witnessEntry{
key: key[:],
witness: witness,
})
default:
return ErrUnknownWitnessType
}
}
// Exit early if there are no witnesses to add.
if len(entries) == 0 {
return nil
}
return w.db.Batch(func(tx *bbolt.Tx) error {
witnessBucket, err := tx.CreateBucketIfNotExists(witnessBucketKey)
if err != nil {
@ -93,16 +123,14 @@ func (w *WitnessCache) AddWitness(wType WitnessType, witness []byte) error {
return err
}
// Now that we have the proper bucket for this witness, we'll map the
// witness type to the proper key.
var witnessKey []byte
switch wType {
case Sha256HashWitness:
key := sha256.Sum256(witness)
witnessKey = key[:]
for _, entry := range entries {
err = witnessTypeBucket.Put(entry.key, entry.witness)
if err != nil {
return err
}
}
return witnessTypeBucket.Put(witnessKey, witness)
return nil
})
}

View File

@ -25,7 +25,7 @@ func TestWitnessCacheRetrieval(t *testing.T) {
witnessKey := sha256.Sum256(witness)
// First, we'll attempt to add the witness to the database.
err = wCache.AddWitness(Sha256HashWitness, witness)
err = wCache.AddWitnesses(Sha256HashWitness, witness)
if err != nil {
t.Fatalf("unable to add witness: %v", err)
}
@ -59,13 +59,13 @@ func TestWitnessCacheDeletion(t *testing.T) {
// We'll start by adding two witnesses to the cache.
witness1 := rev[:]
witness1Key := sha256.Sum256(witness1)
if err := wCache.AddWitness(Sha256HashWitness, witness1); err != nil {
if err := wCache.AddWitnesses(Sha256HashWitness, witness1); err != nil {
t.Fatalf("unable to add witness: %v", err)
}
witness2 := key[:]
witness2Key := sha256.Sum256(witness2)
if err := wCache.AddWitness(Sha256HashWitness, witness2); err != nil {
if err := wCache.AddWitnesses(Sha256HashWitness, witness2); err != nil {
t.Fatalf("unable to add witness: %v", err)
}
@ -107,7 +107,7 @@ func TestWitnessCacheUnknownWitness(t *testing.T) {
// We'll attempt to add a new, undefined witness type to the database.
// We should get an error.
err = wCache.AddWitness(234, key[:])
err = wCache.AddWitnesses(234, key[:])
if err != ErrUnknownWitnessType {
t.Fatalf("expected ErrUnknownWitnessType, got %v", err)
}

View File

@ -64,8 +64,10 @@ type WitnessBeacon interface {
// True is returned for the second argument if the preimage is found.
LookupPreimage(payhash []byte) ([]byte, bool)
// AddPreImage adds a newly discovered preimage to the global cache.
AddPreimage(pre []byte) error
// AddPreimages adds a batch of newly discovered preimages to the global
// cache, and also signals any subscribers of the newly discovered
// witness.
AddPreimages(preimages ...[]byte) error
}
// ChannelArbitratorConfig contains all the functionality that the

View File

@ -79,7 +79,7 @@ func (h *htlcOutgoingContestResolver) Resolve() (ContractResolver, error) {
// With the preimage obtained, we can now add it to the global
// cache.
if err := h.PreimageDB.AddPreimage(preimage[:]); err != nil {
if err := h.PreimageDB.AddPreimages(preimage[:]); err != nil {
log.Errorf("%T(%v): unable to add witness to cache",
h, h.htlcResolution.ClaimOutpoint)
}

View File

@ -1417,7 +1417,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// any contested contracts watched by any on-chain arbitrators
// can now sweep this HTLC on-chain.
go func() {
err := l.cfg.PreimageCache.AddPreimage(pre[:])
err := l.cfg.PreimageCache.AddPreimages(pre[:])
if err != nil {
l.errorf("unable to add preimage=%x to "+
"cache", pre[:])

View File

@ -46,11 +46,13 @@ func (m *mockPreimageCache) LookupPreimage(hash []byte) ([]byte, bool) {
return p, ok
}
func (m *mockPreimageCache) AddPreimage(preimage []byte) error {
func (m *mockPreimageCache) AddPreimages(preimages ...[]byte) error {
m.Lock()
defer m.Unlock()
m.preimageMap[sha256.Sum256(preimage[:])] = preimage
for _, preimage := range preimages {
m.preimageMap[sha256.Sum256(preimage)] = preimage
}
return nil
}

View File

@ -584,7 +584,7 @@ func TestForceClose(t *testing.T) {
// Before we force close Alice's channel, we'll add the pre-image of
// Bob's HTLC to her preimage cache.
aliceChannel.pCache.AddPreimage(preimageBob[:])
aliceChannel.pCache.AddPreimages(preimageBob[:])
// With the cache populated, we'll now attempt the force close
// initiated by Alice.
@ -4953,7 +4953,7 @@ func TestChannelUnilateralCloseHtlcResolution(t *testing.T) {
// Now that Bob has force closed, we'll modify Alice's pre image cache
// such that she now gains the ability to also settle the incoming HTLC
// from Bob.
aliceChannel.pCache.AddPreimage(preimageBob[:])
aliceChannel.pCache.AddPreimages(preimageBob[:])
// We'll then use Bob's transaction to trigger a spend notification for
// Alice.

View File

@ -274,9 +274,13 @@ type PreimageCache interface {
// argument. Otherwise, it'll return false.
LookupPreimage(hash []byte) ([]byte, bool)
// AddPreimage attempts to add a new preimage to the global cache. If
// successful a nil error will be returned.
AddPreimage(preimage []byte) error
// AddPreimages adds a batch of newly discovered preimages to the global
// cache, and also signals any subscribers of the newly discovered
// witness.
//
// NOTE: The backing slice of MUST NOT be modified, otherwise the
// subscribers may be notified of the incorrect preimages.
AddPreimages(preimages ...[]byte) error
}
// WalletDriver represents a "driver" for a particular concrete

View File

@ -403,11 +403,13 @@ func (m *mockPreimageCache) LookupPreimage(hash []byte) ([]byte, bool) {
return p, ok
}
func (m *mockPreimageCache) AddPreimage(preimage []byte) error {
func (m *mockPreimageCache) AddPreimages(preimages ...[]byte) error {
m.Lock()
defer m.Unlock()
m.preimageMap[sha256.Sum256(preimage[:])] = preimage
for _, preimage := range preimages {
m.preimageMap[sha256.Sum256(preimage)] = preimage
}
return nil
}

View File

@ -317,11 +317,13 @@ func (m *mockPreimageCache) LookupPreimage(hash []byte) ([]byte, bool) {
return p, ok
}
func (m *mockPreimageCache) AddPreimage(preimage []byte) error {
func (m *mockPreimageCache) AddPreimages(preimages ...[]byte) error {
m.Lock()
defer m.Unlock()
m.preimageMap[sha256.Sum256(preimage[:])] = preimage
for _, preimage := range preimages {
m.preimageMap[sha256.Sum256(preimage)] = preimage
}
return nil
}

View File

@ -101,28 +101,41 @@ func (p *preimageBeacon) LookupPreimage(payHash []byte) ([]byte, bool) {
return preimage, true
}
// AddPreImage adds a newly discovered preimage to the global cache, and also
// signals any subscribers of the newly discovered witness.
func (p *preimageBeacon) AddPreimage(pre []byte) error {
p.Lock()
defer p.Unlock()
// AddPreimages adds a batch of newly discovered preimages to the global cache,
// and also signals any subscribers of the newly discovered witness.
func (p *preimageBeacon) AddPreimages(preimages ...[]byte) error {
// Exit early if no preimages are presented.
if len(preimages) == 0 {
return nil
}
srvrLog.Infof("Adding preimage=%x to witness cache", pre[:])
// Copy the preimages to ensure the backing area can't be modified by
// the caller when delivering notifications.
preimageCopies := make([][]byte, 0, len(preimages))
for _, preimage := range preimages {
srvrLog.Infof("Adding preimage=%x to witness cache", preimage)
preimageCopies = append(preimageCopies, preimage)
}
// First, we'll add the witness to the decaying witness cache.
err := p.wCache.AddWitness(channeldb.Sha256HashWitness, pre)
err := p.wCache.AddWitnesses(channeldb.Sha256HashWitness, preimages...)
if err != nil {
return err
}
p.Lock()
defer p.Unlock()
// With the preimage added to our state, we'll now send a new
// notification to all subscribers.
for _, client := range p.subscribers {
go func(c *preimageSubscriber) {
select {
case c.updateChan <- pre:
case <-c.quit:
return
for _, preimage := range preimageCopies {
select {
case c.updateChan <- preimage:
case <-c.quit:
return
}
}
}(client)
}