channeldb: add bucket for storing p2p counterparty meta-data

This commit adds a new bucket to the database which is dedicated to
storing data pertaining to p2p related reachability for direct channel
counter parties. The data stored in this new bucket can be used within
heuristics when deciding to unilaterally close a channel due to
inactivity. Additionally, all known reachable IP addresses for a
particular LinkNode are to be stored and updated within the database in
order to facilitate the establishment of persistent connections to
direct channel counter parties.
This commit is contained in:
Olaoluwa Osuntokun 2016-10-25 14:04:42 -07:00
parent d181aad8e2
commit d109bd9298
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
4 changed files with 393 additions and 2 deletions

@ -111,11 +111,11 @@ func createChannelDB(dbPath string) error {
return err
}
if _, err := tx.CreateBucket(channelLogBucket); err != nil {
if _, err := tx.CreateBucket(invoiceBucket); err != nil {
return err
}
if _, err := tx.CreateBucket(invoiceBucket); err != nil {
if _, err := tx.CreateBucket(nodeInfoBucket); err != nil {
return err
}

@ -12,4 +12,6 @@ var (
ErrInvoiceNotFound = fmt.Errorf("unable to locate invoice")
ErrNoInvoicesCreated = fmt.Errorf("there are no existing invoices")
ErrDuplicateInvoice = fmt.Errorf("invoice with payment hash already exists")
ErrNodeNotFound = fmt.Errorf("link node with target identity not found")
)

@ -1 +1,282 @@
package channeldb
import (
"bytes"
"fmt"
"io"
"net"
"time"
"github.com/boltdb/bolt"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
)
var (
// nodeInfoBucket stores meta-data pertaining to nodes that we've had
// direct channel-based correspondence with. This bucket allows one to
// query for all open channels pertaining to the node by exploring each
// node's sub-bucket within the openChanBucket.
nodeInfoBucket = []byte("nib")
// chanIndexBucket...
// * stores (chanPoint || nodePub) index
// * allows for prefix-ish scan to randomly get info for a channel
// * written to after FullSync
// * deleted from during channel close
// * used to fetch all open channels
chanIdentityBucket = []byte("chanidentity")
)
// LinkNode stores meta-data related to node's that we have/had a direct
// channel open with. Information such as the Bitcoin network the node
// advertised, and its identity public key are also stored. Additionally, this
// struct and the bucket its stored within have store data similar to that of
// Bitcion's addrmanager. The TCP address information stored within the struct
// can be used to establish persistent connections will all channel
// counter-parties on daemon startup.
//
// TODO(roasbeef): also add current OnionKey plus rotation schedule?
// TODO(roasbeef): add bitfield for supported services
// * possibly add a wire.NetAddress type, type
type LinkNode struct {
// Network indicates the Bitcoin network that the LinkNode advertises
// for incoming channel creation.
Network wire.BitcoinNet
// IdentityPub is the node's current identity public key. Any
// channel/topology related information received by this node MUST be
// signed by this public key.
IdentityPub *btcec.PublicKey
// LastSeen tracks the last time this node was seen within the network.
// A node should be marked as seen if the daemon either is able to
// establish an outgoing connection to the node or receives a new
// incoming connection from the node. This timestamp (stored in unix
// epoch) may be used within a heuristic which aims to determine when a
// channel should be unilaterally closed due to inactivity.
//
// TODO(roasbeef): replace with block hash/height?
LastSeen time.Time
// Addresses is a list of IP address in which either we were able to
// reach the node over in the past, OR we received an incoming
// authenticated connection for the stored identity public key.
//
// TODO(roasbeef): also need to support hidden service addrs
// * make map to easily remove addrs?
Addresses []*net.TCPAddr
db *DB
}
// NewLinkNode creates a new LinkNode from the provided parameters, which is
// backed by an instance of channeldb.
func (db *DB) NewLinkNode(bitNet wire.BitcoinNet, pub *btcec.PublicKey,
addr *net.TCPAddr) *LinkNode {
return &LinkNode{
Network: bitNet,
IdentityPub: pub,
LastSeen: time.Now(),
Addresses: []*net.TCPAddr{addr},
db: db,
}
}
// UpdateLastSeen updates the last time this node was directly encountered on
// the Lightning Network.
func (l *LinkNode) UpdateLastSeen(lastSeen time.Time) error {
l.LastSeen = lastSeen
return l.Sync()
}
// AddAddress appends the specified TCP address to the list of known addresses
// this node is/was known to be reachable at.
func (l *LinkNode) AddAddress(addr *net.TCPAddr) error {
l.Addresses = append(l.Addresses, addr)
return l.Sync()
}
// Sync performs a full database sync which writes the current up-to-date data
// within the struct to the database.
func (l *LinkNode) Sync() error {
// First serialize the LinkNode into its raw-bytes encoding. This is
// done outside of the transaction in order to minimize the length of
// the DB transaction.
var b bytes.Buffer
if err := serializeLinkNode(&b, l); err != nil {
return err
}
nodePub := l.IdentityPub.SerializeCompressed()
// Finally update the database by storing the link node and updating
// any relevant indexes.
return l.db.store.Update(func(tx *bolt.Tx) error {
nodeMetaBucket := tx.Bucket(nodeInfoBucket)
if nodeInfoBucket == nil {
return fmt.Errorf("node bucket not created")
}
return nodeMetaBucket.Put(nodePub, b.Bytes())
})
}
// FetchLinkNode attempts to lookup the data for a LinkNode based on a target
// identity public key. If a particular LinkNode for the passed identity public
// key cannot be found, then ErrNodeNotFound if returned.
func (db *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
var (
node *LinkNode
err error
)
err = db.store.View(func(tx *bolt.Tx) error {
// First fetch the bucket for storing node meta-data, bailing
// out early if it hasn't been created yet.
nodeMetaBucket := tx.Bucket(nodeInfoBucket)
if nodeInfoBucket == nil {
return fmt.Errorf("node bucket not created")
}
// If a link node for that particular public key cannot be
// located, then exit early with a ErrNodeNotFound.
pubKey := identity.SerializeCompressed()
nodeBytes := nodeMetaBucket.Get(pubKey)
if nodeBytes == nil {
return ErrNodeNotFound
}
// Finally, decode an allocate a fresh LinkNode object to be
// returned to the caller.
nodeReader := bytes.NewReader(nodeBytes)
node, err = deserializeLinkNode(nodeReader)
if err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
return node, nil
}
// FetchAllLinkNodes attempts to fetch all active LinkNodes from the database.
func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) {
var linkNodes []*LinkNode
err := db.store.View(func(tx *bolt.Tx) error {
nodeMetaBucket := tx.Bucket(nodeInfoBucket)
if nodeInfoBucket == nil {
return fmt.Errorf("node bucket not created")
}
return nodeMetaBucket.ForEach(func(k, v []byte) error {
nodeReader := bytes.NewReader(v)
linkNode, err := deserializeLinkNode(nodeReader)
if err != nil {
return err
}
linkNodes = append(linkNodes, linkNode)
return nil
})
return nil
})
if err != nil {
return nil, err
}
return linkNodes, nil
}
func serializeLinkNode(w io.Writer, l *LinkNode) error {
var buf [8]byte
byteOrder.PutUint32(buf[:4], uint32(l.Network))
if _, err := w.Write(buf[:4]); err != nil {
return err
}
serializedID := l.IdentityPub.SerializeCompressed()
if _, err := w.Write(serializedID); err != nil {
return err
}
seenUnix := uint64(l.LastSeen.Unix())
byteOrder.PutUint64(buf[:], seenUnix)
if _, err := w.Write(buf[:]); err != nil {
return err
}
numAddrs := uint32(len(l.Addresses))
byteOrder.PutUint32(buf[:4], numAddrs)
if _, err := w.Write(buf[:4]); err != nil {
return err
}
for _, addr := range l.Addresses {
addrString := addr.String()
if err := wire.WriteVarString(w, 0, addrString); err != nil {
return err
}
}
return nil
}
func deserializeLinkNode(r io.Reader) (*LinkNode, error) {
var (
err error
buf [8]byte
)
node := &LinkNode{}
if _, err := io.ReadFull(r, buf[:4]); err != nil {
return nil, err
}
node.Network = wire.BitcoinNet(byteOrder.Uint32(buf[:4]))
var pub [33]byte
if _, err := io.ReadFull(r, pub[:]); err != nil {
return nil, err
}
node.IdentityPub, err = btcec.ParsePubKey(pub[:], btcec.S256())
if err != nil {
return nil, err
}
if _, err := io.ReadFull(r, buf[:]); err != nil {
return nil, err
}
node.LastSeen = time.Unix(int64(byteOrder.Uint64(buf[:])), 0)
if _, err := io.ReadFull(r, buf[:4]); err != nil {
return nil, err
}
numAddrs := byteOrder.Uint32(buf[:4])
node.Addresses = make([]*net.TCPAddr, numAddrs)
for i := uint32(0); i < numAddrs; i++ {
addrString, err := wire.ReadVarString(r, 0)
if err != nil {
return nil, err
}
addr, err := net.ResolveTCPAddr("tcp", addrString)
if err != nil {
return nil, err
}
node.Addresses[i] = addr
}
return node, nil
}

108
channeldb/nodes_test.go Normal file

@ -0,0 +1,108 @@
package channeldb
import (
"bytes"
"net"
"reflect"
"testing"
"time"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
)
func TestLinkNodeEncodeDecode(t *testing.T) {
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("uanble to make test database: %v", err)
}
defer cleanUp()
// First we'll create some initial data to use for populating our test
// LinkNode instances.
_, pub1 := btcec.PrivKeyFromBytes(btcec.S256(), key[:])
_, pub2 := btcec.PrivKeyFromBytes(btcec.S256(), rev[:])
addr1, err := net.ResolveTCPAddr("tcp", "10.0.0.1:9000")
if err != nil {
t.Fatalf("unable to create test addr: %v", err)
}
addr2, err := net.ResolveTCPAddr("tcp", "10.0.0.2:9000")
if err != nil {
t.Fatalf("unable to create test addr: %v", err)
}
// Create two fresh link node instances with the above dummy data, then
// fully sync both instances to disk.
node1 := cdb.NewLinkNode(wire.MainNet, pub1, addr1)
node2 := cdb.NewLinkNode(wire.TestNet3, pub2, addr2)
if err := node1.Sync(); err != nil {
t.Fatalf("unable to sync node: %v", err)
}
if err := node2.Sync(); err != nil {
t.Fatalf("unable to sync node: %v", err)
}
// Fetch all current link nodes from the database, they should exactly
// match the two created above.
originalNodes := []*LinkNode{node2, node1}
linkNodes, err := cdb.FetchAllLinkNodes()
if err != nil {
t.Fatalf("unable to fetch nodes: %v", err)
}
for i, node := range linkNodes {
if originalNodes[i].Network != node.Network {
t.Fatalf("node networks don't match: expected %v, got %v",
originalNodes[i].Network, node.Network)
}
originalPubkey := originalNodes[i].IdentityPub.SerializeCompressed()
dbPubkey := node.IdentityPub.SerializeCompressed()
if !bytes.Equal(originalPubkey, dbPubkey) {
t.Fatalf("node pubkeys don't match: expected %x, got %x",
originalPubkey, dbPubkey)
}
if originalNodes[i].LastSeen.Unix() != node.LastSeen.Unix() {
t.Fatalf("last seen timestamps don't match: expected %v got %v",
originalNodes[i].LastSeen.Unix(), node.LastSeen.Unix())
}
if !reflect.DeepEqual(originalNodes[i].Addresses,
node.Addresses) {
t.Fatalf("addresses don't match: expected %v, got %v",
originalNodes[i].Addresses, node.Addresses)
}
}
// Next, we'll excercise the methods to append additionall IP
// addresses, and also to update the last seen time.
if err := node1.UpdateLastSeen(time.Now()); err != nil {
t.Fatalf("unable to update last seen: %v", err)
}
if err := node1.AddAddress(addr2); err != nil {
t.Fatalf("unable to update addr: %v", err)
}
// Fetch the same node from the databse according to its public key.
node1DB, err := cdb.FetchLinkNode(pub1)
if err != nil {
t.Fatalf("unable to find node: %v", err)
}
// Both the last seen timestamp and the list of reachable addresses for
// the node should be updated.
if node1DB.LastSeen.Unix() != node1.LastSeen.Unix() {
t.Fatalf("last seen timestamps don't match: expected %v got %v",
node1.LastSeen.Unix(), node1DB.LastSeen.Unix())
}
if len(node1DB.Addresses) != 2 {
t.Fatalf("wrong length for node1 addrsses: expected %v, got %v",
2, len(node1DB.Addresses))
}
if node1DB.Addresses[0].String() != addr1.String() {
t.Fatalf("wrong address for node: expected %v, got %v",
addr1.String(), node1DB.Addresses[0].String())
}
if node1DB.Addresses[1].String() != addr2.String() {
t.Fatalf("wrong address for node: expected %v, got %v",
addr2.String(), node1DB.Addresses[1].String())
}
}