package actors

import akka.actor.{Props, ActorLogging, Actor}
import akka.routing.RoundRobinRouter
import models.sae.{Candle, CandleStep}
import nxt.util.Convert
import nxt.{Asset, Trade}
import scala.collection.JavaConversions._
import scala.collection.concurrent.TrieMap

/**
 * Message sent to a [[CandleBuilder]]: trades of a single asset that still
 * have to be merged into the stored candles.
 *
 * @param assetId id of the asset the trades belong to
 * @param trades  trades to fold into candles
 */
case class TradesDiff(assetId: Long, trades: Seq[Trade])

/**
 * Merges incoming trades into candles, for every value of `CandleStep`.
 *
 * For each trade and each step the candle starting at the trade's bucket is
 * looked up; if it exists the trade is added to it, otherwise a new candle is
 * created from the trade. The result is stored via `Candle.put`.
 */
class CandleBuilder extends Actor with ActorLogging {
  override def receive = {
    case TradesDiff(assetId, trades) =>
      trades.foreach { t =>
        // The trade time does not depend on the step, so convert it once per trade.
        val tradeTime = Convert.fromEpochTime(t.getTimestamp)

        CandleStep.values.foreach { step =>
          val ct = Candle.candleStart(tradeTime, step)

          // Guarded so that the Candle.count lookup only runs when debug output is wanted.
          if (log.isDebugEnabled) {
            log.debug(
              s"Going to add or merge candle for step $step, " +
                s"asset ${Convert.toUnsignedLong(assetId)}, " +
                s"candles amount before: ${Candle.count(assetId, step)}")
          }

          // NOTE(review): lookup followed by put is not atomic; several routees may
          // handle diffs of the same asset concurrently - confirm Candle storage copes.
          Candle.byTime(assetId, step, ct) match {
            case Some(c) => Candle.put(c + t)
            case None => Candle.put(Candle(t, step))
          }
        }
      }
  }
}

/**
 * On every [[RebuildCandles]] message compares the number of trades per asset
 * with the number seen last time and forwards the difference to the candle
 * builders.
 */
class TradesWatcher extends Actor with ActorLogging {
  /** Trade count per asset id as of the previous `RebuildCandles` run. */
  val tradeCountCache = TrieMap[Long, Int]()

  // Created as a child (not a top-level actor) so that the router and its
  // routees are stopped together with this watcher instead of leaking.
  val candleBuilder = context.actorOf(Props[CandleBuilder].withRouter(RoundRobinRouter(3)))

  override def receive = {
    case RebuildCandles =>
      val allAssets = Asset.getAllAssets(0, Integer.MAX_VALUE).iterator().toSeq

      allAssets foreach { asset =>
        val aid = asset.getId
        val cachedCount = tradeCountCache.getOrElse(aid, 0)
        val realCount = Trade.getTradeCount(aid)

        if (cachedCount != realCount) {
          // NOTE(review): this assumes getAssetTrades pages oldest-first, so that the
          // range cachedCount..realCount holds the new trades. If the API returns
          // newest-first, the window should start at 0 - TODO confirm against nxt.Trade.
          val diff = TradesDiff(aid, Trade.getAssetTrades(aid, cachedCount, realCount).iterator().toList)
          candleBuilder ! diff
          tradeCountCache.put(aid, realCount)
        }
      }
  }
}

/** Message asking a [[TradesWatcher]] to pick up new trades and update candles. */
case object RebuildCandles