add lnAddr implementation to peer.go, finish peer draft

* With this commit, then initial draft of the peer struct is finished.
Items to still complete include the interaction between the peer and
internal wallet, version handshake, pings, etc.
This commit is contained in:
Olaoluwa Osuntokun 2016-01-16 19:03:03 -08:00
parent 6c51942b03
commit 4c8b10617a

166
peer.go

@ -2,14 +2,24 @@ package main
import (
"container/list"
"encoding/hex"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/btcsuite/btcutil"
"li.lan/labs/plasma/lnwallet"
"li.lan/labs/plasma/lnwire"
)
var (
numNodes int32
)
// channelState...
@ -27,8 +37,90 @@ const (
const (
numAllowedRetransmits = 5
pingInterval = 1 * time.Minute
outgoingQueueLen = 50
)
// lnAddr...
type lnAddr struct {
lnId [16]byte // redundant because adr contains it
pubKey *btcec.PublicKey
bitcoinAddr btcutil.Address
netAddr *net.TCPAddr
name string
endorsement []byte
}
// String...
func (l *lnAddr) String() string {
var encodedId []byte
if l.pubKey == nil {
encodedId = l.bitcoinAddr.ScriptAddress()
} else {
encodedId = l.pubKey.SerializeCompressed()
}
return fmt.Sprintf("%v@%v", encodedId, l.netAddr)
}
// newLnAddr...
func newLnAddr(encodedAddr string) (*lnAddr, error) {
// The format of an lnaddr is "<pubkey or pkh>@host"
idHost := strings.Split(encodedAddr, "@")
if len(idHost) != 2 {
return nil, fmt.Errorf("invalid format for lnaddr string: %v", encodedAddr)
}
// Attempt to resolve the IP address, this handles parsing IPv6 zones,
// and such.
fmt.Println("host: ", idHost[1])
ipAddr, err := net.ResolveTCPAddr("tcp", idHost[1])
if err != nil {
return nil, err
}
addr := &lnAddr{netAddr: ipAddr}
idLen := len(idHost[0])
switch {
// Is the ID a hex-encoded compressed public key?
case idLen > 65 && idLen < 69:
pubkeyBytes, err := hex.DecodeString(idHost[0])
if err != nil {
return nil, err
}
addr.pubKey, err = btcec.ParsePubKey(pubkeyBytes, btcec.S256())
if err != nil {
return nil, err
}
// got pubey, populate address from pubkey
pkh := btcutil.Hash160(addr.pubKey.SerializeCompressed())
addr.bitcoinAddr, err = btcutil.NewAddressPubKeyHash(pkh,
&chaincfg.TestNet3Params)
if err != nil {
return nil, err
}
// Is the ID a string encoded bitcoin address?
case idLen > 33 && idLen < 37:
addr.bitcoinAddr, err = btcutil.DecodeAddress(idHost[0],
&chaincfg.TestNet3Params)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("invalid address %s", idHost[0])
}
// Finally, populate the lnid from the address.
copy(addr.lnId[:], addr.bitcoinAddr.ScriptAddress())
return addr, nil
}
// outgoinMsg...
type outgoinMsg struct {
msg lnwire.Message
@ -36,26 +128,24 @@ type outgoinMsg struct {
}
// peer...
// TODO(roasbeef): make this a package now??
// inspired by btcd/peer.go
type peer struct {
// only to be used atomically
started int32
connected int32
disconnect int32 // only to be used atomically
// *ETcpConn or w/e it is in strux
disconnect int32
conn net.Conn
// TODO(rosabeef): one for now, may need more granularity
sync.RWMutex
addr string
lnID [32]byte // TODO(roasbeef): copy from strux
lightningAddr lnAddr
inbound bool
protocolVersion uint32
peerId int32
// For purposes of detecting retransmits, etc.
// lastNMessages map[lnwire.Message]struct{}
lastNMessages map[lnwire.Message]struct{}
sync.RWMutex
timeConnected time.Time
lastSend time.Time
lastRecv time.Time
@ -69,19 +159,65 @@ type peer struct {
outgoingQueue chan outgoinMsg
sendQueue chan outgoinMsg
// TODO(roasbeef): akward import, just rename to Wallet?
wallet *lnwallet.LightningWallet // (tadge: what is this for?)
// Only will be set if the channel is in the 'pending' state.
reservation *lnwallet.ChannelReservation
channel *lnwallet.LightningChannel // TODO(roasbeef): rename to PaymentChannel??
lnChannel *lnwallet.LightningChannel
queueQuit chan struct{}
quit chan struct{}
wg sync.WaitGroup
}
// newPeer...
func newPeer(conn net.Conn, server *server) *peer {
return &peer{
conn: conn,
peerId: atomic.AddInt32(&numNodes, 1),
lastNMessages: make(map[lnwire.Message]struct{}),
sendQueueSync: make(chan struct{}, 1),
sendQueue: make(chan outgoinMsg, 1),
outgoingQueue: make(chan outgoinMsg, outgoingQueueLen),
queueQuit: make(chan struct{}),
quit: make(chan struct{}),
}
}
func (p *peer) Start() error {
if atomic.AddInt32(&p.started, 1) != 1 {
return nil
}
// TODO(roasbeef): version handshake
p.wg.Add(3)
go p.inHandler()
go p.queueHandler()
go p.outHandler()
return nil
}
func (p *peer) Stop() error {
// If we're already disconnecting, just exit.
if atomic.AddInt32(&p.disconnect, 1) != 1 {
return nil
}
// Otherwise, close the connection if we're currently connected.
if atomic.LoadInt32(&p.connected) != 0 {
p.conn.Close()
}
// Signal all worker goroutines to gracefully exit.
close(p.quit)
return nil
}
// readNextMessage...
func (p *peer) readNextMessage() (lnwire.Message, []byte, error) {
// TODO(roasbeef): use our own net magic?