package autopilot

import (
	"net"
	"sync"
	"sync/atomic"

	"github.com/btcsuite/btcd/btcec"
	"github.com/btcsuite/btcutil"
	"github.com/davecgh/go-spew/spew"
	"github.com/lightningnetwork/lnd/lnwire"
)

// Config couples all the items that an autopilot agent needs to function.
// All items within the struct MUST be populated for the Agent to be able to
// carry out its duties.
type Config struct {
	// Self is the identity public key of the Lightning Network node that
	// is being driven by the agent. This is used to ensure that we don't
	// accidentally attempt to open a channel with ourselves.
	Self *btcec.PublicKey

	// Heuristic is an attachment heuristic which will govern whom we open
	// channels to, and also what those channels look like in terms of
	// desired capacity. The Heuristic will take into account the current
	// state of the graph, our set of open channels, and the amount of
	// available funds when determining how channels are to be opened.
	// Additionally, a heuristic may also factor in extra-graph
	// information in order to make more pertinent recommendations.
	Heuristic AttachmentHeuristic

	// ChanController is an interface that is able to directly manage the
	// creation, closing and update of channels within the network.
	ChanController ChannelController

	// ConnectToPeer attempts to connect to the peer using one of its
	// advertised addresses. The boolean returned signals whether the peer
	// was already connected.
	ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error)

	// DisconnectPeer attempts to disconnect the peer with the given public
	// key.
	DisconnectPeer func(*btcec.PublicKey) error

	// WalletBalance is a function closure that should return the current
	// available balance of the backing wallet.
	WalletBalance func() (btcutil.Amount, error)

	// Graph is an abstract channel graph that the Heuristic and the Agent
	// will use to make decisions w.r.t channel allocation and placement
	// within the graph.
	Graph ChannelGraph

	// MaxPendingOpens is the maximum number of pending channel
	// establishment goroutines that can be lingering. We cap this value in
	// order to control the level of parallelism caused by the autopilot
	// agent.
	MaxPendingOpens uint16

	// TODO(roasbeef): add additional signals from fee rates and revenue of
	// currently opened channels
}
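
// A minimal sketch of populating a Config. The values shown below (selfKey,
// prefAttachment, chanController, graph, and the function closures) are
// hypothetical placeholders standing in for the backing node's real
// implementations:
//
//	cfg := Config{
//		Self:            selfKey,
//		Heuristic:       prefAttachment,
//		ChanController:  chanController,
//		ConnectToPeer:   connectToPeer,
//		DisconnectPeer:  disconnectPeer,
//		WalletBalance:   walletBalance,
//		Graph:           graph,
//		MaxPendingOpens: 10,
//	}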

// channelState is a type that represents the set of active channels of the
// backing LN node that the Agent should be aware of. This type contains a few
// helper utility methods.
type channelState map[lnwire.ShortChannelID]Channel

// Channels returns a slice of all the active channels.
func (c channelState) Channels() []Channel {
	chans := make([]Channel, 0, len(c))
	for _, channel := range c {
		chans = append(chans, channel)
	}
	return chans
}

// ConnectedNodes returns the set of nodes we currently have a channel with.
// This information is needed as we want to avoid opening duplicate channels
// to the same node.
func (c channelState) ConnectedNodes() map[NodeID]struct{} {
	nodes := make(map[NodeID]struct{})
	for _, channels := range c {
		nodes[channels.Node] = struct{}{}
	}

	// TODO(roasbeef): add outgoing, nodes, allow incoming and outgoing to
	// per node
	//  * only add node is chan as funding amt set

	return nodes
}
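
// As an illustration of how these helpers behave (chanID1, chanID2, nodeA,
// and nodeB are hypothetical values):
//
//	state := channelState{
//		chanID1: Channel{ChanID: chanID1, Node: nodeA},
//		chanID2: Channel{ChanID: chanID2, Node: nodeB},
//	}
//	chans := state.Channels()       // two Channel values, in map order
//	nodes := state.ConnectedNodes() // set containing nodeA and nodeB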

// Agent implements a closed-loop control system which seeks to autonomously
// optimize the allocation of satoshis within channels throughout the network's
// channel graph. An agent is configurable by swapping out different
// AttachmentHeuristic strategies. The agent uses external signals such as the
// wallet balance changing, or new channels being opened/closed for the local
// node as an indicator to re-examine its internal state, and the amount of
// available funds in order to make updated decisions w.r.t the channel graph.
// The Agent will automatically open, close, and splice in/out channels as
// necessary for it to step closer to its optimal state.
//
// TODO(roasbeef): prob re-word
type Agent struct {
	// Only to be used atomically.
	started uint32
	stopped uint32

	// cfg houses the configuration state of the Agent.
	cfg Config

	// chanState tracks the current set of open channels.
	chanState channelState

	// stateUpdates is a channel where any external state updates that may
	// affect the agent's heuristics will be sent.
	stateUpdates chan interface{}

	// balanceUpdates is a channel where notifications about updates to the
	// wallet's balance will be sent. This channel will be buffered to
	// ensure we have at most one pending update of this type to handle at
	// a given time.
	balanceUpdates chan *balanceUpdate

	// nodeUpdates is a channel where changes to the graph node landscape
	// will be sent. This channel will be buffered to ensure we have at
	// most one pending update of this type to handle at a given time.
	nodeUpdates chan *nodeUpdates

	// pendingOpenUpdates is a channel where updates about pending channel
	// opens will be sent. This channel will be buffered to ensure we have
	// at most one pending update of this type to handle at a given time.
	pendingOpenUpdates chan *chanPendingOpenUpdate

	// chanOpenFailures is a channel where updates about channel open
	// failures will be sent. This channel will be buffered to ensure we
	// have at most one pending update of this type to handle at a given
	// time.
	chanOpenFailures chan *chanOpenFailureUpdate

	// totalBalance is the total number of satoshis the backing wallet is
	// known to control at any given instant. This value will be updated
	// when the agent receives external balance update signals.
	totalBalance btcutil.Amount

	quit chan struct{}
	wg   sync.WaitGroup
}

// New creates a new instance of the Agent using the passed configuration and
// initial channel state. The initial channel state slice should be populated
// with the set of Channels that are currently opened by the backing Lightning
// Node.
func New(cfg Config, initialState []Channel) (*Agent, error) {
	a := &Agent{
		cfg:                cfg,
		chanState:          make(map[lnwire.ShortChannelID]Channel),
		quit:               make(chan struct{}),
		stateUpdates:       make(chan interface{}),
		balanceUpdates:     make(chan *balanceUpdate, 1),
		nodeUpdates:        make(chan *nodeUpdates, 1),
		chanOpenFailures:   make(chan *chanOpenFailureUpdate, 1),
		pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
	}

	for _, c := range initialState {
		a.chanState[c.ChanID] = c
	}

	return a, nil
}

// Start starts the agent along with any goroutines it needs to perform its
// normal duties.
func (a *Agent) Start() error {
	if !atomic.CompareAndSwapUint32(&a.started, 0, 1) {
		return nil
	}

	log.Infof("Autopilot Agent starting")

	a.wg.Add(1)
	go a.controller()

	return nil
}

// Stop signals the Agent to gracefully shut down. This function will block
// until all goroutines have exited.
func (a *Agent) Stop() error {
	if !atomic.CompareAndSwapUint32(&a.stopped, 0, 1) {
		return nil
	}

	log.Infof("Autopilot Agent stopping")

	close(a.quit)
	a.wg.Wait()

	return nil
}
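
// A typical lifecycle, as a sketch (cfg and initialChans are assumed to be
// supplied by the caller):
//
//	agent, err := New(cfg, initialChans)
//	if err != nil {
//		return err
//	}
//	if err := agent.Start(); err != nil {
//		return err
//	}
//	defer agent.Stop()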

// balanceUpdate is a type of external state update that reflects an
// increase/decrease in the funds currently available to the wallet.
type balanceUpdate struct{}

// nodeUpdates is a type of external state update that reflects an addition or
// modification in channel graph node membership.
type nodeUpdates struct{}

// chanOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the Agent itself (within the main
// controller loop), or by an external user to the system.
type chanOpenUpdate struct {
	newChan Channel
}

// chanPendingOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the agent itself or an external subsystem,
// but is still pending.
type chanPendingOpenUpdate struct{}

// chanOpenFailureUpdate is a type of external state update that indicates
// a previous channel open failed, and that it might be possible to try again.
type chanOpenFailureUpdate struct{}

// chanCloseUpdate is a type of external state update that indicates that the
// backing Lightning Node has closed a previously open channel.
type chanCloseUpdate struct {
	closedChans []lnwire.ShortChannelID
}

// OnBalanceChange is a callback that should be executed each time the balance
// of the backing wallet changes.
func (a *Agent) OnBalanceChange() {
	select {
	case a.balanceUpdates <- &balanceUpdate{}:
	default:
	}
}

// OnNodeUpdates is a callback that should be executed each time our channel
// graph has new nodes or their node announcements are updated.
func (a *Agent) OnNodeUpdates() {
	select {
	case a.nodeUpdates <- &nodeUpdates{}:
	default:
	}
}

// OnChannelOpen is a callback that should be executed each time a new channel
// is manually opened by the user or any system outside the autopilot agent.
func (a *Agent) OnChannelOpen(c Channel) {
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()

		select {
		case a.stateUpdates <- &chanOpenUpdate{newChan: c}:
		case <-a.quit:
		}
	}()
}

// OnChannelPendingOpen is a callback that should be executed each time a new
// channel is opened, either by the agent or an external subsystem, but is
// still pending.
func (a *Agent) OnChannelPendingOpen() {
	select {
	case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}:
	default:
	}
}

// OnChannelOpenFailure is a callback that should be executed when the
// autopilot has attempted to open a channel, but failed. In this case we can
// retry channel creation with a different node.
func (a *Agent) OnChannelOpenFailure() {
	select {
	case a.chanOpenFailures <- &chanOpenFailureUpdate{}:
	default:
	}
}

// OnChannelClose is a callback that should be executed each time a prior
// channel has been closed for any reason. This includes regular
// closes, force closes, and channel breaches.
func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()

		select {
		case a.stateUpdates <- &chanCloseUpdate{closedChans: closedChans}:
		case <-a.quit:
		}
	}()
}
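
// The On* callbacks above are intended to be driven by the backing node's
// notification pipeline. As a sketch, assuming hypothetical notification
// channels supplied by that pipeline:
//
//	for {
//		select {
//		case <-walletNtfns:
//			agent.OnBalanceChange()
//		case c := <-chanOpenNtfns:
//			agent.OnChannelOpen(c)
//		case ids := <-chanCloseNtfns:
//			agent.OnChannelClose(ids...)
//		}
//	}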

// mergeNodeMaps merges the Agent's set of nodes that it already has active
// channels open to, with the other sets of nodes that should be removed from
// consideration during heuristic selection. This ensures that the Agent doesn't
// attempt to open any "duplicate" channels to the same node.
func mergeNodeMaps(c map[NodeID]Channel,
	skips ...map[NodeID]struct{}) map[NodeID]struct{} {

	numNodes := len(c)
	for _, skip := range skips {
		numNodes += len(skip)
	}

	// numNodes already accounts for the nodes in c, so it serves directly
	// as the capacity hint.
	res := make(map[NodeID]struct{}, numNodes)
	for nodeID := range c {
		res[nodeID] = struct{}{}
	}
	for _, skip := range skips {
		for nodeID := range skip {
			res[nodeID] = struct{}{}
		}
	}

	return res
}
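
// For example (nodeA and nodeB being hypothetical IDs), merging a pending
// open to nodeA with a skip set containing nodeA and nodeB yields the
// deduplicated set {nodeA, nodeB}:
//
//	pending := map[NodeID]Channel{nodeA: {Node: nodeA}}
//	skip := map[NodeID]struct{}{nodeA: {}, nodeB: {}}
//	res := mergeNodeMaps(pending, skip)
//	// res contains exactly nodeA and nodeB.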

// mergeChanState merges the Agent's set of active channels with the set of
// channels awaiting confirmation. This ensures that the agent doesn't go over
// the prescribed channel limit or fund allocation limit.
func mergeChanState(pendingChans map[NodeID]Channel,
	activeChans channelState) []Channel {

	numChans := len(pendingChans) + len(activeChans)
	totalChans := make([]Channel, 0, numChans)

	for _, activeChan := range activeChans.Channels() {
		totalChans = append(totalChans, activeChan)
	}
	for _, pendingChan := range pendingChans {
		totalChans = append(totalChans, pendingChan)
	}

	return totalChans
}

// controller implements the closed-loop control system of the Agent. The
// controller will make a decision w.r.t channel placement within the graph
// based on: its current internal state of the set of active channels open,
// and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node.
func (a *Agent) controller() {
	defer a.wg.Done()

	// We'll start off by triggering a balance update, which will fetch
	// our starting balance and serve as the initial wake up for the main
	// controller loop.
	a.OnBalanceChange()

	// TODO(roasbeef): do we in fact need to maintain order?
	//  * use sync.Cond if so

	// failedNodes lists nodes that we've previously attempted to initiate
	// channels with, but didn't succeed.
	failedNodes := make(map[NodeID]struct{})

	// pendingConns tracks the nodes that we are attempting to make
	// connections to. This prevents us from making duplicate connection
	// requests to the same node.
	pendingConns := make(map[NodeID]struct{})

	// pendingOpens tracks the channels that we've requested to be
	// initiated, but haven't yet been confirmed as being fully opened.
	// This state is required, as otherwise we may go over our allotted
	// channel limit, or open multiple channels to the same node.
	pendingOpens := make(map[NodeID]Channel)
	var pendingMtx sync.Mutex

	// updateBalance is a helper closure that refreshes our cached view of
	// the wallet's total balance.
	updateBalance := func() {
		newBalance, err := a.cfg.WalletBalance()
		if err != nil {
			log.Warnf("unable to update wallet balance: %v", err)
			return
		}

		a.totalBalance = newBalance
	}

	// TODO(roasbeef): add 10-minute wake up timer
	for {
		select {
		// A new external signal has arrived. We'll use this to update
		// our internal state, then determine if we should trigger a
		// channel state modification (open/close, splice in/out).
		case signal := <-a.stateUpdates:
			log.Infof("Processing new external signal")

			switch update := signal.(type) {
			// A new channel has been opened successfully. This was
			// either opened by the Agent, or an external system
			// that is able to drive the Lightning Node.
			case *chanOpenUpdate:
				log.Debugf("New channel successfully opened, "+
					"updating state with: %v",
					spew.Sdump(update.newChan))

				newChan := update.newChan
				a.chanState[newChan.ChanID] = newChan

				pendingMtx.Lock()
				delete(pendingOpens, newChan.Node)
				pendingMtx.Unlock()

				updateBalance()
			// A channel has been closed; this may free up an
			// available slot, triggering a new channel update.
			case *chanCloseUpdate:
				log.Debugf("Applying closed channel "+
					"updates: %v",
					spew.Sdump(update.closedChans))

				for _, closedChan := range update.closedChans {
					delete(a.chanState, closedChan)
				}

				updateBalance()
			}

		// A new channel has been opened by the agent or an external
		// subsystem, but is still pending confirmation.
		case <-a.pendingOpenUpdates:
			updateBalance()

		// The balance of the backing wallet has changed; if more funds
		// are now available, we may attempt to open up an additional
		// channel, or splice in funds to an existing one.
		case <-a.balanceUpdates:
			log.Debug("Applying external balance state update")

			updateBalance()

		// The channel we tried to open previously failed for whatever
		// reason.
		case <-a.chanOpenFailures:
			log.Debug("Retrying after previous channel open " +
				"failure.")

			updateBalance()

		// New nodes have been added to the graph or their node
		// announcements have been updated. We will consider opening
		// channels to these nodes if we haven't stabilized.
		case <-a.nodeUpdates:
			log.Infof("Node updates received, assessing " +
				"need for more channels")

		// The agent has been signalled to exit, so we'll bail out
		// immediately.
		case <-a.quit:
			return
		}

		pendingMtx.Lock()
		log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens))
		pendingMtx.Unlock()

		// With all the updates applied, we'll obtain a set of the
		// current active channels (confirmed channels), and also
		// factor in our set of unconfirmed channels.
		confirmedChans := a.chanState
		pendingMtx.Lock()
		totalChans := mergeChanState(pendingOpens, confirmedChans)
		pendingMtx.Unlock()

		// Now that we've updated our internal state, we'll consult our
		// channel attachment heuristic to determine if we should open
		// up any additional channels or modify existing channels.
		availableFunds, numChans, needMore := a.cfg.Heuristic.NeedMoreChans(
			totalChans, a.totalBalance,
		)
		if !needMore {
			continue
		}

		log.Infof("Triggering attachment directive dispatch, "+
			"total_funds=%v", a.totalBalance)

		// Since we're attempting an attachment, we'll obtain the set
		// of nodes that we currently have channels with in order to
		// avoid duplicate edges.
		connectedNodes := a.chanState.ConnectedNodes()
		pendingMtx.Lock()
		nodesToSkip := mergeNodeMaps(pendingOpens,
			pendingConns, connectedNodes, failedNodes,
		)
		pendingMtx.Unlock()

		// If we reach this point, then according to our heuristic we
		// should modify our channel state to tend towards what it
		// determines to be the optimal state. So we'll call Select to
		// get a fresh batch of attachment directives, passing in the
		// amount of funds available for us to use.
		chanCandidates, err := a.cfg.Heuristic.Select(
			a.cfg.Self, a.cfg.Graph, availableFunds,
			numChans, nodesToSkip,
		)
		if err != nil {
			log.Errorf("Unable to select candidates for "+
				"attachment: %v", err)
			continue
		}

		if len(chanCandidates) == 0 {
			log.Infof("No eligible candidates to connect to")
			continue
		}

		log.Infof("Attempting to execute channel attachment "+
			"directives: %v", spew.Sdump(chanCandidates))

		// Before proceeding, check to see if we have any slots
		// available to open channels. If so, we'll attempt to
		// dispatch all of the retrieved directives, since we can't be
		// certain which ones will actually succeed. If too many
		// connections succeed, the surplus will simply be ignored and
		// made available to future heuristic selections.
		pendingMtx.Lock()
		if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens {
			pendingMtx.Unlock()
			log.Debugf("Reached cap of %v pending "+
				"channel opens, will retry "+
				"after success/failure",
				a.cfg.MaxPendingOpens)
			continue
		}

		// For each recommended attachment directive, we'll launch a
		// new goroutine to attempt to carry out the directive. If any
		// of these succeed, then we'll receive a new state update,
		// taking us back to the top of our controller loop.
		for _, chanCandidate := range chanCandidates {
			// Skip candidates which we are already trying
			// to establish a connection with.
			nodeID := chanCandidate.NodeID
			if _, ok := pendingConns[nodeID]; ok {
				continue
			}
			pendingConns[nodeID] = struct{}{}

			go func(directive AttachmentDirective) {
				// We'll start out by attempting to connect to
				// the peer in order to begin the funding
				// workflow.
				pub := directive.NodeKey
				alreadyConnected, err := a.cfg.ConnectToPeer(
					pub, directive.Addrs,
				)
				if err != nil {
					log.Warnf("Unable to connect "+
						"to %x: %v",
						pub.SerializeCompressed(),
						err)

					// Since we failed to connect to them,
					// we'll mark them as failed so that we
					// don't attempt to connect to them
					// again.
					nodeID := NewNodeID(pub)
					pendingMtx.Lock()
					delete(pendingConns, nodeID)
					failedNodes[nodeID] = struct{}{}
					pendingMtx.Unlock()

					// Finally, we'll trigger the agent to
					// select new peers to connect to.
					a.OnChannelOpenFailure()

					return
				}

				// The connection was successful, though before
				// progressing we must check that we have not
				// already met our quota for max pending open
				// channels. This can happen if multiple
				// directives were spawned but fewer slots were
				// available, and other successful attempts
				// finished first.
				pendingMtx.Lock()
				if uint16(len(pendingOpens)) >=
					a.cfg.MaxPendingOpens {
					// Since we've reached our max number of
					// pending opens, we'll disconnect this
					// peer and exit. However, if we were
					// previously connected to them, then
					// we'll make sure to keep the
					// connection alive.
					if alreadyConnected {
						// Since we succeeded in
						// connecting, we won't add this
						// peer to the failed nodes map,
						// but we will remove it from
						// pendingConns so that it can
						// be retried in the future.
						delete(pendingConns, nodeID)
						pendingMtx.Unlock()
						return
					}

					err = a.cfg.DisconnectPeer(
						pub,
					)
					if err != nil {
						log.Warnf("Unable to "+
							"disconnect peer "+
							"%x: %v",
							pub.SerializeCompressed(),
							err)
					}

					// Now that we have disconnected, we can
					// remove this node from our pending
					// conns map, permitting subsequent
					// connection attempts.
					delete(pendingConns, nodeID)
					pendingMtx.Unlock()
					return
				}

				// If we were successful, we'll track this peer
				// in our set of pending opens. We do this here
				// to ensure we don't stall on selecting new
				// peers if the connection attempt happens to
				// take too long.
				nodeID := directive.NodeID
				delete(pendingConns, nodeID)
				pendingOpens[nodeID] = Channel{
					Capacity: directive.ChanAmt,
					Node:     nodeID,
				}
				pendingMtx.Unlock()

				// We can then begin the funding workflow with
				// this peer.
				err = a.cfg.ChanController.OpenChannel(
					pub, directive.ChanAmt,
				)
				if err != nil {
					log.Warnf("Unable to open "+
						"channel to %x of %v: %v",
						pub.SerializeCompressed(),
						directive.ChanAmt, err)

					// As the attempt failed, we'll clear
					// the peer from the set of pending
					// opens and mark them as failed so we
					// don't attempt to open a channel to
					// them again.
					pendingMtx.Lock()
					delete(pendingOpens, nodeID)
					failedNodes[nodeID] = struct{}{}
					pendingMtx.Unlock()

					// Trigger the agent to re-evaluate
					// everything and possibly retry with a
					// different node.
					a.OnChannelOpenFailure()

					// Finally, we should also disconnect
					// the peer if we weren't already
					// connected to them beforehand by an
					// external subsystem.
					if alreadyConnected {
						return
					}

					err = a.cfg.DisconnectPeer(pub)
					if err != nil {
						log.Warnf("Unable to "+
							"disconnect peer "+
							"%x: %v",
							pub.SerializeCompressed(),
							err)
					}

					// Since the open attempt failed,
					// we'll bail out here rather than
					// signal a pending open below.
					return
				}

				// Since the channel open was successful and is
				// currently pending, we'll trigger the
				// autopilot agent to query for more peers.
				a.OnChannelPendingOpen()
			}(chanCandidate)
		}
		pendingMtx.Unlock()

	}
}