channeldb: create new channeldb package, update lnwallet to use new API

* Initial draft of brain dump of chandler. Nothing yet set in stone.
* Will most likely move the storage of all structs to a more “column”
oriented approach. Such that, small updates like incrementing the total
satoshi sent don’t result in the entire struct being serialized and
written.
* Some skeleton structs for other possible data we might want to store
are also included.
* Seem valuable to record as much data as possible for record keeping,
visualization, debugging, etc. Will need to set up a time+space+dirty
cache to ensure performance isn’t impacted too much.
This commit is contained in:
Olaoluwa Osuntokun 2015-12-26 12:35:15 -06:00
parent d7a1c5d337
commit 4fdb2763e6
13 changed files with 194 additions and 131 deletions

@ -1,4 +1,4 @@
package lnwallet
package channeldb
import (
"bytes"
@ -7,137 +7,38 @@ import (
"io"
"time"
"li.lan/labs/plasma/shachain"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/walletdb"
"li.lan/labs/plasma/shachain"
)
var (
// Namespace bucket keys.
lightningNamespaceKey = []byte("ln-wallet")
waddrmgrNamespaceKey = []byte("waddrmgr")
wtxmgrNamespaceKey = []byte("wtxmgr")
openChannelBucket = []byte("o")
closedChannelBucket = []byte("c")
activeChanKey = []byte("a")
endian = binary.BigEndian
// TODO(roasbeef): replace w/ tesnet-L also revisit dependancy...
ActiveNetParams = &chaincfg.TestNet3Params
)
// ChannelDB...
// TODO(roasbeef): CHECKSUMS, REDUNDANCY, etc etc.
type ChannelDB struct {
// TODO(roasbeef): caching, etc?
addrmgr *waddrmgr.Manager
namespace walletdb.Namespace
// Payment...
type Payment struct {
// r [32]byte
// path *Route
}
// PutOpenChannel...
func (c *ChannelDB) PutOpenChannel(channel *OpenChannelState) error {
return c.namespace.Update(func(tx walletdb.Tx) error {
// Get the bucket dedicated to storing the meta-data for open
// channels.
rootBucket := tx.RootBucket()
openChanBucket, err := rootBucket.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
return err
// ClosedChannel...
type ClosedChannel struct {
}
return dbPutOpenChannel(openChanBucket, channel, c.addrmgr)
})
}
// GetOpenChannel...
// TODO(roasbeef): assumes only 1 active channel per-node
func (c *ChannelDB) FetchOpenChannel(nodeID [32]byte) (*OpenChannelState, error) {
var channel *OpenChannelState
dbErr := c.namespace.View(func(tx walletdb.Tx) error {
// Get the bucket dedicated to storing the meta-data for open
// channels.
rootBucket := tx.RootBucket()
openChanBucket := rootBucket.Bucket(openChannelBucket)
if openChannelBucket == nil {
return fmt.Errorf("open channel bucket does not exist")
}
oChannel, err := dbGetOpenChannel(openChanBucket, nodeID, c.addrmgr)
if err != nil {
return err
}
channel = oChannel
return nil
})
return channel, dbErr
}
// dbPutChannel...
func dbPutOpenChannel(activeChanBucket walletdb.Bucket, channel *OpenChannelState,
addrmgr *waddrmgr.Manager) error {
// Generate a serialized version of the open channel. The addrmgr is
// required in order to encrypt densitive data.
var b bytes.Buffer
if err := channel.Encode(&b, addrmgr); err != nil {
return err
}
// Grab the bucket dedicated to storing data related to this particular
// node.
nodeBucket, err := activeChanBucket.CreateBucketIfNotExists(channel.TheirLNID[:])
if err != nil {
return err
}
return nodeBucket.Put(activeChanKey, b.Bytes())
}
// dbPutChannel...
func dbGetOpenChannel(bucket walletdb.Bucket, nodeID [32]byte,
addrmgr *waddrmgr.Manager) (*OpenChannelState, error) {
// Grab the bucket dedicated to storing data related to this particular
// node.
nodeBucket := bucket.Bucket(nodeID[:])
if nodeBucket == nil {
return nil, fmt.Errorf("channel bucket for node does not exist")
}
serializedChannel := nodeBucket.Get(activeChanKey)
if serializedChannel == nil {
// TODO(roasbeef): make proper in error.go
return nil, fmt.Errorf("node has no open channels")
}
// Decode the serialized channel state, using the addrmgr to decrypt
// sensitive information.
channel := &OpenChannelState{}
reader := bytes.NewReader(serializedChannel)
if err := channel.Decode(reader, addrmgr); err != nil {
return nil, err
}
return channel, nil
}
// NewChannelDB...
// TODO(roasbeef): re-visit this dependancy...
func NewChannelDB(addrmgr *waddrmgr.Manager, namespace walletdb.Namespace) *ChannelDB {
// TODO(roasbeef): create buckets if not created?
return &ChannelDB{addrmgr, namespace}
}
// OpenChannelState...
// OpenChannel...
// TODO(roasbeef): store only the essentials? optimize space...
// TODO(roasbeef): switch to "column store"
type OpenChannelState struct {
type OpenChannel struct {
// Hash? or Their current pubKey?
// TODO(roasbeef): switch to Tadge's LNId
TheirLNID [wire.HashSize]byte
@ -194,9 +95,99 @@ type OpenChannelState struct {
CreationTime time.Time
}
// PutOpenChannel...
func (c *DB) PutOpenChannel(channel *OpenChannel) error {
return c.namespace.Update(func(tx walletdb.Tx) error {
// Get the bucket dedicated to storing the meta-data for open
// channels.
rootBucket := tx.RootBucket()
openChanBucket, err := rootBucket.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
return err
}
return putOpenChannel(openChanBucket, channel, c.addrmgr)
})
}
// GetOpenChannel...
// TODO(roasbeef): assumes only 1 active channel per-node
func (c *DB) FetchOpenChannel(nodeID [32]byte) (*OpenChannel, error) {
var channel *OpenChannel
err := c.namespace.View(func(tx walletdb.Tx) error {
// Get the bucket dedicated to storing the meta-data for open
// channels.
rootBucket := tx.RootBucket()
openChanBucket := rootBucket.Bucket(openChannelBucket)
if openChannelBucket == nil {
return fmt.Errorf("open channel bucket does not exist")
}
oChannel, err := fetchOpenChannel(openChanBucket, nodeID,
c.addrmgr)
if err != nil {
return err
}
channel = oChannel
return nil
})
return channel, err
}
// putChannel...
func putOpenChannel(activeChanBucket walletdb.Bucket, channel *OpenChannel,
addrmgr *waddrmgr.Manager) error {
// Generate a serialized version of the open channel. The addrmgr is
// required in order to encrypt densitive data.
var b bytes.Buffer
if err := channel.Encode(&b, addrmgr); err != nil {
return err
}
// Grab the bucket dedicated to storing data related to this particular
// node.
nodeBucket, err := activeChanBucket.CreateBucketIfNotExists(channel.TheirLNID[:])
if err != nil {
return err
}
return nodeBucket.Put(activeChanKey, b.Bytes())
}
// fetchOpenChannel
func fetchOpenChannel(bucket walletdb.Bucket, nodeID [32]byte,
addrmgr *waddrmgr.Manager) (*OpenChannel, error) {
// Grab the bucket dedicated to storing data related to this particular
// node.
nodeBucket := bucket.Bucket(nodeID[:])
if nodeBucket == nil {
return nil, fmt.Errorf("channel bucket for node does not exist")
}
serializedChannel := nodeBucket.Get(activeChanKey)
if serializedChannel == nil {
// TODO(roasbeef): make proper in error.go
return nil, fmt.Errorf("node has no open channels")
}
// Decode the serialized channel state, using the addrmgr to decrypt
// sensitive information.
channel := &OpenChannel{}
reader := bytes.NewReader(serializedChannel)
if err := channel.Decode(reader, addrmgr); err != nil {
return nil, err
}
return channel, nil
}
// Encode...
// TODO(roasbeef): checksum
func (o *OpenChannelState) Encode(b io.Writer, addrManager *waddrmgr.Manager) error {
func (o *OpenChannel) Encode(b io.Writer, addrManager *waddrmgr.Manager) error {
if _, err := b.Write(o.TheirLNID[:]); err != nil {
return err
}
@ -289,7 +280,7 @@ func (o *OpenChannelState) Encode(b io.Writer, addrManager *waddrmgr.Manager) er
}
// Decode...
func (o *OpenChannelState) Decode(b io.Reader, addrManager *waddrmgr.Manager) error {
func (o *OpenChannel) Decode(b io.Reader, addrManager *waddrmgr.Manager) error {
var scratch [8]byte
if _, err := b.Read(o.TheirLNID[:]); err != nil {

@ -1,7 +1,8 @@
package lnwallet
package channeldb
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@ -14,6 +15,7 @@ import (
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/walletdb"
_ "github.com/btcsuite/btcwallet/walletdb/bdb"
)
var (
@ -69,10 +71,11 @@ var (
func createDbNamespace(dbPath string) (walletdb.DB, walletdb.Namespace, error) {
db, err := walletdb.Create("bdb", dbPath)
if err != nil {
fmt.Println("fuk")
return nil, nil, err
}
namespace, err := db.Namespace(waddrmgrNamespaceKey)
namespace, err := db.Namespace([]byte("waddr"))
if err != nil {
db.Close()
return nil, nil, err
@ -115,11 +118,11 @@ func createTestManager(t *testing.T) (tearDownFunc func(), mgr *waddrmgr.Manager
return tearDownFunc, mgr
}
func TestOpenChannelStateEncodeDecode(t *testing.T) {
func TestOpenChannelEncodeDecode(t *testing.T) {
teardown, manager := createTestManager(t)
defer teardown()
privKey, pubKey := btcec.PrivKeyFromBytes(btcec.S256(), testWalletPrivKey)
privKey, pubKey := btcec.PrivKeyFromBytes(btcec.S256(), key[:])
addr, err := btcutil.NewAddressPubKey(pubKey.SerializeCompressed(), ActiveNetParams)
if err != nil {
t.Fatalf("unable to create delivery address")
@ -130,7 +133,7 @@ func TestOpenChannelStateEncodeDecode(t *testing.T) {
t.Fatalf("unable to create redeemScript")
}
state := OpenChannelState{
state := OpenChannel{
TheirLNID: id,
ChanID: id,
MinFeePerKb: btcutil.Amount(5000),
@ -161,7 +164,7 @@ func TestOpenChannelStateEncodeDecode(t *testing.T) {
}
reader := bytes.NewReader(b.Bytes())
newState := &OpenChannelState{}
newState := &OpenChannel{}
if err := newState.Decode(reader, manager); err != nil {
t.Fatalf("unable to decode channel state: %v", err)
}
@ -274,5 +277,5 @@ func TestOpenChannelStateEncodeDecode(t *testing.T) {
}
}
func TestOpenChannelStateEncodeDecodeCorruption(t *testing.T) {
func TestOpenChannelEncodeDecodeCorruption(t *testing.T) {
}

54
channeldb/db.go Normal file

@ -0,0 +1,54 @@
package channeldb
import (
"bytes"
"encoding/binary"
"sync"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/walletdb"
)
var (
endian = binary.BigEndian
)
var bufPool = &sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}
// Store...
// TODO(roasbeef): CHECKSUMS, REDUNDANCY, etc etc.
type DB struct {
// TODO(roasbeef): caching, etc?
addrmgr *waddrmgr.Manager
namespace walletdb.Namespace
}
// Wipe...
func (d *DB) Wipe() error {
return d.namespace.Update(func(tx walletdb.Tx) error {
rootBucket := tx.RootBucket()
// TODO(roasbeef): other buckets
return rootBucket.DeleteBucket(openChannelBucket)
})
}
// New...
// TODO(roasbeef): re-visit this dependancy...
func New(addrmgr *waddrmgr.Manager, namespace walletdb.Namespace) *DB {
// TODO(roasbeef): create buckets if not created?
return &DB{addrmgr, namespace}
}
// Open...
// TODO(roasbeef): create+open, ditch New, fixes above
func Open() *DB {
return nil
}
// Create...
func Create() *DB {
return nil
}

1
channeldb/doc.go Normal file

@ -0,0 +1 @@
package channeldb

1
channeldb/error.go Normal file

@ -0,0 +1 @@
package channeldb

1
channeldb/fees.go Normal file

@ -0,0 +1 @@
package channeldb

1
channeldb/log.go Normal file

@ -0,0 +1 @@
package channeldb

1
channeldb/nodes.go Normal file

@ -0,0 +1 @@
package channeldb

1
channeldb/route.go Normal file

@ -0,0 +1 @@
package channeldb

@ -27,11 +27,11 @@ type LightningChannel struct {
// commitment update, plus some other meta-data...Or just use OP_RETURN
// to help out?
// currently going for: nSequence/nLockTime overloading
channelDB *ChannelDB
channelDB *channeldb.DB
// stateMtx protects concurrent access to the state struct.
stateMtx sync.RWMutex
channelState OpenChannelState
channelState channeldb.OpenChannel
// TODO(roasbeef): create and embed 'Service' interface w/ below?
started int32
@ -44,6 +44,7 @@ type LightningChannel struct {
// newLightningChannel...
func newLightningChannel(wallet *LightningWallet, events *chainntnfs.ChainNotifier,
chanDB *ChannelDB, state OpenChannelState) (*LightningChannel, error) {
chanDB *channeldb.DB, state channeldb.OpenChannel) (*LightningChannel, error) {
return &LightningChannel{
wallet: wallet,

@ -3,6 +3,8 @@ package lnwallet
import (
"sync"
"li.lan/labs/plasma/channeldb"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
@ -64,7 +66,7 @@ type ChannelReservation struct {
ourContribution *ChannelContribution
theirContribution *ChannelContribution
partialState *OpenChannelState
partialState *channeldb.OpenChannel
// The ID of this reservation, used to uniquely track the reservation
// throughout its lifetime.
@ -90,7 +92,7 @@ func newChannelReservation(t FundingType, fundingAmt btcutil.Amount,
theirContribution: &ChannelContribution{
FundingAmount: fundingAmt,
},
partialState: &OpenChannelState{
partialState: &channeldb.OpenChannel{
// TODO(roasbeef): assumes balanced symmetric channels.
Capacity: fundingAmt * 2,
OurBalance: fundingAmt,

@ -1,6 +1,7 @@
package lnwallet
import (
"encoding/binary"
"errors"
"fmt"
"math"
@ -9,6 +10,7 @@ import (
"sync"
"sync/atomic"
"li.lan/labs/plasma/channeldb"
"li.lan/labs/plasma/shachain"
"github.com/btcsuite/btcd/btcec"
@ -34,7 +36,14 @@ var (
"create funding transaction")
// Which bitcoin network are we using?
// TODO(roasbeef): config
ActiveNetParams = &chaincfg.TestNet3Params
// Namespace bucket keys.
lightningNamespaceKey = []byte("ln-wallet")
waddrmgrNamespaceKey = []byte("waddrmgr")
wtxmgrNamespaceKey = []byte("wtxmgr")
endian = binary.BigEndian
)
type FundingType uint16
@ -137,7 +146,7 @@ type LightningWallet struct {
// A wrapper around a namespace within boltdb reserved for ln-based
// wallet meta-data.
channelDB *ChannelDB
channelDB *channeldb.DB
wallet *btcwallet.Wallet
rpc *chain.Client
@ -210,7 +219,7 @@ func NewLightningWallet(privWalletPass, pubWalletPass, hdSeed []byte, dataDir st
return &LightningWallet{
DB: db,
wallet: wallet,
channelDB: NewChannelDB(wallet.Manager, lnNamespace),
channelDB: channeldb.New(wallet.Manager, lnNamespace),
msgChan: make(chan interface{}, msgBufferSize),
// TODO(roasbeef): make this atomic.Uint32 instead? Which is
// faster, locks or CAS? I'm guessing CAS because assembly:

@ -591,10 +591,7 @@ func clearWalletState(w *LightningWallet) error {
w.nextFundingID = 0
w.fundingLimbo = make(map[uint64]*ChannelReservation)
w.wallet.ResetLockedOutpoints()
return w.channelDB.namespace.Update(func(tx walletdb.Tx) error {
rootBucket := tx.RootBucket()
return rootBucket.DeleteBucket(openChannelBucket)
})
return w.channelDB.Wipe()
}
// TODO(roasbeef): why is wallet so slow to create+open?