diff --git a/peer.go b/peer.go index e112eda5..c26abe53 100644 --- a/peer.go +++ b/peer.go @@ -1,11 +1,15 @@ package main import ( + "container/list" "net" "sync" + "sync/atomic" "time" + "github.com/btcsuite/btcd/wire" "li.lan/labs/plasma/lnwallet" + "li.lan/labs/plasma/lnwire" ) // channelState... @@ -22,15 +26,18 @@ const ( const ( numAllowedRetransmits = 5 + pingInterval = 1 * time.Minute ) +// outgoinMsg... +type outgoinMsg struct { + msg lnwire.Message + sentChan chan struct{} +} + // peer... // TODO(roasbeef): make this a package now?? // inspired by btcd/peer.go -// * three goroutines -// * inHandler -// * ourHandler -// * queueHandler (maybe?), we don't have any trickling issues so idk type peer struct { started int32 connected int32 @@ -58,15 +65,9 @@ type peer struct { satoshisReceived uint64 // TODO(roasbeef): pings?? - sendQueueDone chan struct{} - // outgoingQueue chan lnwire.Message - // sendQueue chan lnwire.Message - // TODO(roasbeef+j): something like? - // type Message { - // Decode(b bytes.Buffer) error - // Encode(b bytes.Buffer) error - // Command() string - //} + sendQueueSync chan struct{} + outgoingQueue chan outgoinMsg + sendQueue chan outgoinMsg // TODO(roasbeef): akward import, just rename to Wallet? wallet *lnwallet.LightningWallet // (tadge: what is this for?) @@ -78,4 +79,137 @@ type peer struct { queueQuit chan struct{} quit chan struct{} + wg sync.WaitGroup +} + +// readNextMessage... +func (p *peer) readNextMessage() (lnwire.Message, []byte, error) { + // TODO(roasbeef): use our own net magic? + _, nextMsg, rawPayload, err := lnwire.ReadMessage(p.conn, 0, wire.TestNet) + if err != nil { + return nil, nil, err + } + + return nextMsg, rawPayload, nil +} + +// inHandler.. +func (p *peer) inHandler() { + // TODO(roasbeef): set timeout for initial channel request or version + // exchange. + +out: + for atomic.LoadInt32(&p.disconnect) == 0 { + nextMsg, _, err := p.readNextMessage() + if err != nil { + // TODO(roasbeef): log error + break out + } + + // TODO(roasbeef): state-machine to track version exchange + switch msg := nextMsg.(type) { + // TODO(roasbeef): cases + } + } + + p.wg.Done() +} + +// writeMessage... +func (p *peer) writeMessage(msg lnwire.Message) error { + // Simply exit if we're shutting down. + if atomic.LoadInt32(&p.disconnect) != 0 { + return nil + } + + _, err := lnwire.WriteMessage(p.conn, msg, 0, + wire.TestNet) + + return err +} + +// outHandler.. +func (p *peer) outHandler() { + // pingTicker is used to periodically send pings to the remote peer. + pingTicker := time.NewTicker(pingInterval) + defer pingTicker.Stop() + +out: + for { + select { + case outMsg := <-p.sendQueue: + switch m := outMsg.msg.(type) { + // TODO(roasbeef): handle special write cases + } + + if err := p.writeMessage(outMsg.msg); err != nil { + // TODO(roasbeef): disconnect + } + + // Synchronize with the outHandler. + p.sendQueueSync <- struct{}{} + case <-pingTicker.C: + // TODO(roasbeef): ping em + case <-p.quit: + break out + + } + } + + // Wait for the queueHandler to finish so we can empty out all pending + // messages avoiding a possible deadlock somewhere. + <-p.queueQuit + + // Drain any lingering messages that we're meant to be sent. But since + // we're shutting down, just ignore them. +fin: + for { + select { + case msg := <-p.sendQueue: + if msg.sentChan != nil { + msg.sentChan <- struct{}{} + } + default: + break fin + } + } + p.wg.Done() +} + +// queueHandler.. +func (p *peer) queueHandler() { + waitOnSync := false + pendingMsgs := list.New() +out: + for { + select { + case msg := <-p.outgoingQueue: + if !waitOnSync { + p.sendQueue <- msg + } else { + pendingMsgs.PushBack(msg) + } + waitOnSync = true + case <-p.sendQueueSync: + // If there aren't any more remaining messages in the + // queue, then we're no longer waiting to synchronize + // with the outHandler. + next := pendingMsgs.Front() + if next == nil { + waitOnSync = false + continue + } + + // Notify the outHandler about the next item to + // asynchronously send. + val := pendingMsgs.Remove(next) + p.sendQueue <- val.(outgoinMsg) + // TODO(roasbeef): other sync stuffs + case <-p.quit: + break out + } + } + + close(p.queueQuit) + p.wg.Done() } diff --git a/rpcserver.go b/rpcserver.go index c3a6b95b..f87ebe38 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -38,6 +38,11 @@ func newRpcServer(wallet *lnwallet.LightningWallet) *rpcServer { make(chan []byte)} // init OmniChan (size 1 ok...?) } +// Stop... +func (r *rpcServer) Stop() error { + return nil +} + // getPriv gets the identity private key out of the wallet DB func getPriv(l *lnwallet.LightningWallet) (*btcec.PrivateKey, error) { adr, err := l.ChannelDB.GetIdAdr() diff --git a/server.go b/server.go index 06ab7d0f..8b9d94d2 100644 --- a/server.go +++ b/server.go @@ -1 +1,134 @@ package main + +import ( + "fmt" + "net" + "sync" + "sync/atomic" + + "li.lan/labs/plasma/lnwallet" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcwallet/walletdb" +) + +// server... +type server struct { + listeners []net.Listener + peers map[int32]*peer + + started int32 // atomic + shutdown int32 // atomic + + bitcoinNet *chaincfg.Params + + rpcServer *rpcServer + lnwallet *lnwallet.LightningWallet + + db walletdb.DB + + newPeers chan *peer + donePeers chan *peer + + wg sync.WaitGroup + quit chan struct{} +} + +// addPeer... +func (s *server) addPeer(p *peer) { +} + +// removePeer... +func (s *server) removePeer(p *peer) { +} + +// peerManager... +func (s *server) peerManager() { +out: + for { + select { + // New peers. + case p := <-s.newPeers: + s.addPeer(p) + // Finished peers. + case p := <-s.donePeers: + s.removePeer(p) + case <-s.quit: + break out + } + } + s.wg.Done() +} + +func (s *server) queryHandler() { +out: + for { + select { + // TODO(roasbeef): meta-rpc-stuff + case <-s.quit: + break out + } + } + + s.wg.Done() +} + +// AddPeer... +func (s *server) AddPeer(p *peer) { + s.newPeers <- p +} + +// listener... +func (s *server) listener(l net.Listener) { + for atomic.LoadInt32(&s.shutdown) == 0 { + conn, err := l.Accept() + if err != nil { + // TODO(roasbeef): log + continue + } + // TODO(roasbeef): create new peer, start it's goroutines + fmt.Println(conn) + } + + s.wg.Done() +} + +// Start... +func (s *server) Start() { + // Already running? + if atomic.AddInt32(&s.started, 1) != 1 { + return + } + + // Start all the listeners. + for _, l := range s.listeners { + s.wg.Add(1) + go s.listener(l) + } + + s.wg.Add(1) + go s.peerManager() + +} + +// Stop... +func (s *server) Stop() error { + // Bail if we're already shutting down. + if atomic.AddInt32(&s.shutdown, 1) != 1 { + return nil + } + + // Stop all the listeners. + for _, listener := range s.listeners { + if err := listener.Close(); err != nil { + return err + } + } + + s.rpcServer.Stop() + s.lnwallet.Stop() + + // Signal all the lingering goroutines to quit. + close(s.quit) + return nil +}