From a3eee453c397112a2ebc24f296e660b923ef6182 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Fri, 5 May 2017 19:43:11 +0300 Subject: [PATCH] channeldb: add waiting proof storage In this commit boltdb persistent storage have been added, which allow as to not loose announcement proofs inside gossiper subsystem. --- channeldb/waitingproof.go | 245 +++++++++++++++++++++++++++++++++ channeldb/waitingproof_test.go | 57 ++++++++ 2 files changed, 302 insertions(+) create mode 100644 channeldb/waitingproof.go create mode 100644 channeldb/waitingproof_test.go diff --git a/channeldb/waitingproof.go b/channeldb/waitingproof.go new file mode 100644 index 00000000..083c131a --- /dev/null +++ b/channeldb/waitingproof.go @@ -0,0 +1,245 @@ +package channeldb + +import ( + "encoding/binary" + + "io" + + "bytes" + + "github.com/boltdb/bolt" + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" +) + +var ( + // waitingProofsBucketKey byte string name of the waiting proofs store. + waitingProofsBucketKey = []byte("waitingproofs") + + // ErrWaitingProofNotFound is returned if waiting proofs haven't been + // found by db. + ErrWaitingProofNotFound = errors.New("waiting proofs haven't been " + + "found") + + // ErrWaitingProofAlreadyExist is returned if waiting proofs haven't been + // found by db. + ErrWaitingProofAlreadyExist = errors.New("waiting proof with such " + + "key already exist") +) + +// WaitingProofStore is the bold db map-like storage for half announcement +// signatures. The one responsibility of this storage is to be able to +// retrieve waiting proofs after client restart. +type WaitingProofStore struct { + // cache is used in order to reduce the number of redundant get + // calls, when object isn't stored in it. + cache map[WaitingProofKey]struct{} + db *DB +} + +// NewWaitingProofStore creates new instance of proofs storage. +func NewWaitingProofStore(db *DB) (*WaitingProofStore, error) { + s := &WaitingProofStore{ + db: db, + cache: make(map[WaitingProofKey]struct{}), + } + + if err := s.ForAll(func(proof *WaitingProof) error { + s.cache[proof.Key()] = struct{}{} + return nil + }); err != nil && err != ErrWaitingProofNotFound { + return nil, err + } + + return s, nil +} + +// Add adds new waiting proof in the storage. +func (s *WaitingProofStore) Add(proof *WaitingProof) error { + if _, ok := s.cache[proof.Key()]; ok { + return ErrWaitingProofAlreadyExist + } + + return s.db.Batch(func(tx *bolt.Tx) error { + var err error + var b bytes.Buffer + + // Get or create the bucket. + bucket, err := tx.CreateBucketIfNotExists(waitingProofsBucketKey) + if err != nil { + return err + } + + // Encode the objects and place it in the bucket. + if err := proof.Encode(&b); err != nil { + return err + } + + key := proof.Key() + if err := bucket.Put(key[:], b.Bytes()); err != nil { + return err + } + + s.cache[proof.Key()] = struct{}{} + return nil + }) +} + +// Remove removes the proof from storage by its key. +func (s *WaitingProofStore) Remove(key WaitingProofKey) error { + if _, ok := s.cache[key]; !ok { + return ErrWaitingProofNotFound + } + + return s.db.Batch(func(tx *bolt.Tx) error { + // Get or create the top bucket. + bucket := tx.Bucket(waitingProofsBucketKey) + if bucket == nil { + return ErrWaitingProofNotFound + } + + if err := bucket.Delete(key[:]); err != nil { + return err + } + + delete(s.cache, key) + return nil + }) +} + +// ForAll iterates thought all waiting proofs and passing the waiting proof +// in the given callback. +func (s *WaitingProofStore) ForAll(cb func(*WaitingProof) error) error { + return s.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(waitingProofsBucketKey) + if bucket == nil { + return ErrWaitingProofNotFound + } + + // Iterate over objects buckets. + return bucket.ForEach(func(k, v []byte) error { + // Skip buckets fields. + if v == nil { + return nil + } + + r := bytes.NewReader(v) + proof := &WaitingProof{} + if err := proof.Decode(r); err != nil { + return err + } + + return cb(proof) + }) + }) +} + +// Get returns the object which corresponds to the given index. +func (s *WaitingProofStore) Get(key WaitingProofKey) (*WaitingProof, error) { + proof := &WaitingProof{} + + if _, ok := s.cache[key]; !ok { + return nil, ErrWaitingProofNotFound + } + + err := s.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(waitingProofsBucketKey) + if bucket == nil { + return ErrWaitingProofNotFound + } + + // Iterate over objects buckets. + v := bucket.Get(key[:]) + if v == nil { + return ErrWaitingProofNotFound + } + + r := bytes.NewReader(v) + return proof.Decode(r) + }) + + return proof, err +} + +// WaitingProofKey is the proof key which uniquely identifies the waiting +// proof object. The goal of this key is distinguish the local and remote +// proof for the same channel id. +type WaitingProofKey [9]byte + +// WaitingProof is the storable object, which encapsulate the half proof and +// the information about from which side this proof came. This structure is +// needed to make channel proof exchange persistent, so that after client +// restart we may receive remote/local half proof and process it. +type WaitingProof struct { + *lnwire.AnnounceSignatures + isRemote bool +} + +// NewWaitingProof constructs a new waiting prof instance. +func NewWaitingProof(isRemote bool, proof *lnwire.AnnounceSignatures) *WaitingProof { + return &WaitingProof{ + AnnounceSignatures: proof, + isRemote: isRemote, + } +} + +// OppositeKey returns the key which uniquely identifies opposite waiting proof. +func (p *WaitingProof) OppositeKey() WaitingProofKey { + var key [9]byte + binary.BigEndian.PutUint64(key[:8], p.ShortChannelID.ToUint64()) + + if !p.isRemote { + key[8] = 1 + } + return key +} + +// Key returns the key which uniquely identifies waiting proof. +func (p *WaitingProof) Key() WaitingProofKey { + var key [9]byte + binary.BigEndian.PutUint64(key[:8], p.ShortChannelID.ToUint64()) + + if p.isRemote { + key[8] = 1 + } + return key +} + +// Encode writes the internal representation of waiting proof in byte stream. +func (p *WaitingProof) Encode(w io.Writer) error { + var b [1]byte + if p.isRemote { + b[0] = 1 + } + + if _, err := w.Write(b[:]); err != nil { + return err + } + + if err := p.AnnounceSignatures.Encode(w, 0); err != nil { + return err + } + + return nil +} + +// Decode reads the data from the byte stream and initialize the +// waiting proof object with it. +func (p *WaitingProof) Decode(r io.Reader) error { + var b [1]byte + if _, err := r.Read(b[:]); err != nil { + return err + } + + if b[0] == 1 { + (*p).isRemote = true + } + + msg := &lnwire.AnnounceSignatures{} + if err := msg.Decode(r, 0); err != nil { + return err + } + + (*p).AnnounceSignatures = msg + return nil +} diff --git a/channeldb/waitingproof_test.go b/channeldb/waitingproof_test.go new file mode 100644 index 00000000..dd5f07eb --- /dev/null +++ b/channeldb/waitingproof_test.go @@ -0,0 +1,57 @@ +package channeldb + +import ( + "testing" + + "reflect" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" +) + +// TestWaitingProofStore tests add/get/remove functions of the waiting proof +// storage. +func TestWaitingProofStore(t *testing.T) { + db, cleanup, err := makeTestDB() + if err != nil { + + } + defer cleanup() + + proof1 := NewWaitingProof(true, &lnwire.AnnounceSignatures{ + NodeSignature: testSig, + BitcoinSignature: testSig, + }) + + store, err := NewWaitingProofStore(db) + if err != nil { + t.Fatalf("unable to create the waiting proofs storage: %v", + err) + } + + if err := store.Add(proof1); err != nil { + t.Fatalf("unable add proof to storage: %v", err) + } + + proof2, err := store.Get(proof1.Key()) + if err != nil { + t.Fatalf("unable retrieve proof from storage: %v", err) + } + if !reflect.DeepEqual(proof1, proof2) { + t.Fatal("wrong proof retrieved") + } + + if _, err := store.Get(proof1.OppositeKey()); err != ErrWaitingProofNotFound { + t.Fatalf("proof shouldn't be found: %v", err) + } + + if err := store.Remove(proof1.Key()); err != nil { + t.Fatalf("unable remove proof from storage: %v", err) + } + + if err := store.ForAll(func(proof *WaitingProof) error { + return errors.New("storage should be empty") + }); err != nil && err != ErrWaitingProofNotFound { + t.Fatal(err) + } +}