package main import ( "container/list" "net" "sync" "sync/atomic" "time" "github.com/btcsuite/btcd/wire" "li.lan/labs/plasma/lnwallet" "li.lan/labs/plasma/lnwire" ) // channelState... type channelState uint8 const ( // TODO(roasbeef): others?? channelPending channelState = iota channelOpen channelClosed channelDispute channelPendingPayment ) const ( numAllowedRetransmits = 5 pingInterval = 1 * time.Minute ) // outgoinMsg... type outgoinMsg struct { msg lnwire.Message sentChan chan struct{} } // peer... // TODO(roasbeef): make this a package now?? // inspired by btcd/peer.go type peer struct { started int32 connected int32 disconnect int32 // only to be used atomically // *ETcpConn or w/e it is in strux conn net.Conn // TODO(rosabeef): one for now, may need more granularity sync.RWMutex addr string lnID [32]byte // TODO(roasbeef): copy from strux inbound bool protocolVersion uint32 // For purposes of detecting retransmits, etc. // lastNMessages map[lnwire.Message]struct{} timeConnected time.Time lastSend time.Time lastRecv time.Time bytesReceived uint64 bytesSent uint64 satoshisSent uint64 satoshisReceived uint64 // TODO(roasbeef): pings?? sendQueueSync chan struct{} outgoingQueue chan outgoinMsg sendQueue chan outgoinMsg // TODO(roasbeef): akward import, just rename to Wallet? wallet *lnwallet.LightningWallet // (tadge: what is this for?) // Only will be set if the channel is in the 'pending' state. reservation *lnwallet.ChannelReservation channel *lnwallet.LightningChannel // TODO(roasbeef): rename to PaymentChannel?? queueQuit chan struct{} quit chan struct{} wg sync.WaitGroup } // readNextMessage... func (p *peer) readNextMessage() (lnwire.Message, []byte, error) { // TODO(roasbeef): use our own net magic? _, nextMsg, rawPayload, err := lnwire.ReadMessage(p.conn, 0, wire.TestNet) if err != nil { return nil, nil, err } return nextMsg, rawPayload, nil } // inHandler.. func (p *peer) inHandler() { // TODO(roasbeef): set timeout for initial channel request or version // exchange. out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, _, err := p.readNextMessage() if err != nil { // TODO(roasbeef): log error break out } // TODO(roasbeef): state-machine to track version exchange switch msg := nextMsg.(type) { // TODO(roasbeef): cases } } p.wg.Done() } // writeMessage... func (p *peer) writeMessage(msg lnwire.Message) error { // Simply exit if we're shutting down. if atomic.LoadInt32(&p.disconnect) != 0 { return nil } _, err := lnwire.WriteMessage(p.conn, msg, 0, wire.TestNet) return err } // outHandler.. func (p *peer) outHandler() { // pingTicker is used to periodically send pings to the remote peer. pingTicker := time.NewTicker(pingInterval) defer pingTicker.Stop() out: for { select { case outMsg := <-p.sendQueue: switch m := outMsg.msg.(type) { // TODO(roasbeef): handle special write cases } if err := p.writeMessage(outMsg.msg); err != nil { // TODO(roasbeef): disconnect } // Synchronize with the outHandler. p.sendQueueSync <- struct{}{} case <-pingTicker.C: // TODO(roasbeef): ping em case <-p.quit: break out } } // Wait for the queueHandler to finish so we can empty out all pending // messages avoiding a possible deadlock somewhere. <-p.queueQuit // Drain any lingering messages that we're meant to be sent. But since // we're shutting down, just ignore them. fin: for { select { case msg := <-p.sendQueue: if msg.sentChan != nil { msg.sentChan <- struct{}{} } default: break fin } } p.wg.Done() } // queueHandler.. func (p *peer) queueHandler() { waitOnSync := false pendingMsgs := list.New() out: for { select { case msg := <-p.outgoingQueue: if !waitOnSync { p.sendQueue <- msg } else { pendingMsgs.PushBack(msg) } waitOnSync = true case <-p.sendQueueSync: // If there aren't any more remaining messages in the // queue, then we're no longer waiting to synchronize // with the outHandler. next := pendingMsgs.Front() if next == nil { waitOnSync = false continue } // Notify the outHandler about the next item to // asynchronously send. val := pendingMsgs.Remove(next) p.sendQueue <- val.(outgoinMsg) // TODO(roasbeef): other sync stuffs case <-p.quit: break out } } close(p.queueQuit) p.wg.Done() }