package actors import akka actor Props ActorLogging Actor import akka

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package actors
import akka.actor.{Props, ActorLogging, Actor}
import akka.routing.RoundRobinRouter
import models.sae.{Candle, CandleStep}
import nxt.util.Convert
import nxt.{Asset, Trade}
import scala.collection.JavaConversions._
import scala.collection.concurrent.TrieMap
case class TradesDiff(assetId:Long, trades:Seq[Trade])
class CandleBuilder extends Actor with ActorLogging {
override def receive = {
case TradesDiff(assetId, trades) =>
trades.foreach { t =>
CandleStep.values.foreach { step =>
val ct = Candle.candleStart(Convert.fromEpochTime(t.getTimestamp), step)
println(s"Going to add or merge candle for step $step, asset ${Convert.toUnsignedLong(assetId)}candles amount before: ${Candle.count(assetId, step)}")
Candle.byTime(assetId, step, ct) match {
case Some(c) =>
Candle.put(c + t)
case None =>
Candle.put(Candle(t, step))
}
}
}
}
}
class TradesWatcher extends Actor with ActorLogging {
val tradeCountCache = TrieMap[Long, Int]()
val candleBuilder = context.system.actorOf(Props[CandleBuilder].withRouter(RoundRobinRouter(3)))
override def receive = {
case RebuildCandles =>
val allAssets = Asset.getAllAssets(0, Integer.MAX_VALUE).iterator().toSeq
allAssets foreach { asset =>
val aid = asset.getId
val cachedCount = tradeCountCache.getOrElse(aid, 0)
val realCount = Trade.getTradeCount(aid)
if (cachedCount != realCount) {
val diff = TradesDiff(aid, Trade.getAssetTrades(aid, cachedCount, realCount).iterator().toList)
candleBuilder ! diff
tradeCountCache.put(aid, realCount)
}
}
}
}
object RebuildCandles