`
sillycat
  • 浏览: 2488153 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Improve the Throttle for Akka(1)Local Cache

 
阅读更多
Improve the Throttle for Akka(1)Local Cache

1 Basic Idea of Akka
Start the Akka System, send messages.
package com.sillycat.throttle.demo
import akka.actor.{Actor, ActorSystem, Props}
class HelloActor extends Actor{

  def receive = {
    case "hello" => {
      println("hello!")
    }
    case _ => {
      println("huh?")
    }
  }
}

object HelloApp extends App {
  val system =  ActorSystem("HelloSystem")
  system.actorOf(Props[HelloActor], name="test1")
  system.actorOf(Props[HelloActor], name="test2")
  system.actorOf(Props[HelloActor], name="test3")

//  implicit val resolveTimeout = Timeout(5 seconds)
//  system.actorSelection("/user/test*").resolveOne().map { helloActor=>
//    println(helloActor)
//    helloActor ! "hello"
//    helloActor ! "bye"
//  }

  val helloActor = system.actorSelection("/user/test1")
      helloActor ! "hello"
      helloActor ! "bye"
  system.awaitTermination()
}

Simple Example with Router
package com.sillycat.throttle.demo

import akka.actor._
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory

object RoutingApp extends App{

  // A simple actor that prints whatever it receives
  class Printer extends Actor {
    def receive = {
      case x => println(self.path + " saying " + x)
    }
  }

  class Shooter extends Actor {
    def receive = {
      case x => println(self.path + " shouting " + x)
    }
  }

  val system =  ActorSystem("RoutingSystem", ConfigFactory.load())
  val router1: ActorRef = system.actorOf(Props[Printer].withRouter(FromConfig()), name = "Router1")
  // These three messages will be sent to the printer immediately
  router1 ! "11"
  router1 ! "12"
  router1 ! "13"
  // These two will wait at least until 1 second has passed
  router1 ! "14"
  router1 ! "15"
  println(" Router 1 " + router1.path)

  val router2: ActorRef = system.actorOf(Props[Shooter].withRouter(FromConfig()), name = "Router2")
  router2 ! "21"
  router2 ! "22"

  val router3: ActorSelection = system.actorSelection("/user/Router2")
  router3 ! "23"

  println(" Router 2 " + router2.path)

  system.shutdown()

}

2 Throttler Based on Local Cache
Try to implement this solution.
  //FUNCTION LIMIT_API_CALL(ip)
  //  ts = CURRENT_UNIX_TIME()
  //  keyname = ip+":"+ts
  //  current = GET(keyname)
  //  IF current != NULL AND current > 10 THEN
  //    ERROR "too many requests per second"
  //  ELSE
  //    MULTI
  //      INCR(keyname,1)
  //      EXPIRE(keyname,10)
  //    EXEC
  //      PERFORM_API_CALL()
  //END

I first build one implementation on top of guava local cache.
package actors.throttle

import akka.actor.{Actor, ActorRef}
import com.sillycat.util.IncludeLogger
import services.LocalCache
import utils.IncludeDateTimeUtil

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class LocalCacheBasedThrottler(var rate: Rate,var target: ActorRef) extends Actor with IncludeLogger with IncludeDateTimeUtil{

  val throttleKey = "THROTTLE_"

  def receive = {
    case msg: MessageTick => {
      val msgKey = msg.key
      val realMsg = msg.msg
      val counter = msg.count

      val limitCalls = rate.numberOfCalls
      val timeWindows = rate.duration

      val timeKey = convertCurrentTime2Key(timeWindows)
      val key = throttleKey + timeKey + msgKey

      LocalCache.throttleBucket.getIfPresent(key) match {
        case count:java.lang.Integer if count >= limitCalls => {
          //delay random and tick self
          LocalCache.throttleBucket.put(key, count + counter)
          val delay = calculateDelay(count + counter, limitCalls, timeWindows)
          //tick to self within the delay
          context.system.scheduler.scheduleOnce(delay second, self, msg)
        }
        case count:java.lang.Integer => {
          //count + 1
          LocalCache.throttleBucket.put(key, count + counter)
          //pass the ticket
          target ! realMsg
        }
        case _ => {
          //init the count
          LocalCache.throttleBucket.put(key, new Integer(counter))
          //pass the ticket
          target ! realMsg
        }
      }

    }
    case _ => {
      logger.error("Received a message I don't understand.")
    }
  }

}

case class Rate(val numberOfCalls: Int, val duration: Int)

case class MessageTick( key:String,  msg:Any, count: Int = 1)

The Local Cache Classes
package services

import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder
import models.CountStep

object LocalCache {

  val builderThrottle = CacheBuilder.newBuilder().expireAfterWrite(60, TimeUnit.SECONDS)
  val throttleBucket = builderThrottle.build[java.lang.String, java.lang.Integer]()

  val builderMsg = CacheBuilder.newBuilder().expireAfterWrite(30, TimeUnit.MINUTES)
  val msgBucket = builderMsg.build[java.lang.String, CountStep]()

}

Here is how to use it.
    Akka.system.actorOf(Props(classOf[LocalCacheBasedThrottler],
      Rate(4, 15),
      contextIOActor),
      name = "context-io-throttler")

References:
http://sillycat.iteye.com/blog/2258226
http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2
https://github.com/hbf/akka-throttler
http://sillycat.iteye.com/blog/1553508
http://doc.akka.io/api/akka/2.3.4/index.html#akka.contrib.throttle.TimerBasedThrottler
http://redis.io/commands/incr

http://doc.akka.io/docs/akka/snapshot/scala/routing.html
分享到:
评论

相关推荐

    Embedded Software Control Design for an Electronic Throttle Body

    derived for controlling the throttle plate dynamics. The adaptive control law provides an on-line estimate of the friction, which can identify a throttle body with excessive friction. Besides the ...

    ThrottleStop 免费版,尽情使用,永不过期!

    ThrottleStop 免费版,尽情使用,永不过期!ThrottleStop 让电脑为性能发烧锁定主频,气绝降频,更可以调节至尊系列处理器的倍频、电压、外频,ThrottleStop 在手,强悍恒持久!

    ThrottleStop cpu超频调频软件 最新版 v8.6

    throttlestop是电脑cpu性能检测软件。throttlestop主要是为用户实时监测cpu运行性能,并解决笔记玩游戏时温度一高就降频运行迟钝歇菜难题,让玩家能尽情畅玩游戏。 throttlestop功能介绍: 1、用于笔记本调节cpu; 2...

    ThrottleStop-9.5 笔记本处理器intel超频软件

    ThrottleStop_9.5 笔记本处理器intel超频软件

    ThrottleStop

    ThrottleStop是一款能让你的Intel Core i或者Core 2处理器运行更为流畅的应用程序。此程序提供4种预设配置文件,提高你电脑运行的体验。 “Performance”模式可以提高一般性能,“Game”模式可为游戏者完善功能,...

    Android代码-BeamNG.drive Remote Control

    BeamNG.drive Remote Control Remote Control app for the PC game BeamNG.drive. Communication functionality ...Throttle 1 for pushed otherwise 0 Breaks 1 for pushed otherwise 0 App needs

    ThrottleStop 最新版8.48

    上传资源的目的是因为之前下载的ThrottleStop8.40版本过期了(打开后提示This beta version has now expired.),内网也没有新版,寻找到这个最新版本,上传后以便大家使用。 保证原版资源无任何修改添加。 throttle...

    Throttle v8.3.4.2019.zip

    今天小编要给大家介绍一款专业好用的网络加速软件——Throttle。Throttle的特点是不用查找用户所用的路由或猫的型号,只要输入连网的方式和操作系统,软件即会自动实现网络加速功能。软件功能强大,支持4.4/28.8/...

    Throttlestop_CPU鸡血

    免除硬超烦恼,好评如潮的软超软件。Throttlestop_CPU鸡血

    throttlestop6_jb51 (1).rar

    官方原版ThrottleStop_8.70.6 笔记本cpu超频降频睿频工具

    ThrottleStop_850

    ThrottleStop 8.50版本是Intel Core 2和Core i系列CPU的性能监视和修改工具。它可能以“不计代价”的方式最大化发挥CPU的性能,并允许系统忽视温度的影响。

    ThrottleStop_500a

    ThrottleStop是一个酷睿2/酷睿i处理器性能调整工具,以监测和正确的CPU节流,许多笔记本电脑上正在使用的3个主要。左侧的ThrottleStop包含多种选项可以用来绕过CPU节流

    ThrottleStop_9.3.zip

    解决CPU过热降频问题 throttlestop功能介绍: 1、非常使用的cup功能。 2、能够很好的帮助用户解决cpu问题。 3、可设置cpu温度。

    ThrottleStop v6.0 汉化

    ThrottleStop6.0汉化版一款拥有强大的 CPU硬件控制功能的软件,分享给需要的朋友!!!

    ThrottleStop 8.20

    上传资源的目的是因为之前下载的ThrottleStop8.10beta2版本过期了(打开后提示This beta version has now expired.),内网也没有新版,寻找到这个最新版本,上传后以便大家使用。 throttlestop是款防cpu降频软件。...

    前端项目-jquery-throttle-debounce.zip

    前端项目-jquery-throttle-debounce,jquery throttle/debounce允许您以多种有用的方式对函数进行速率限制。

    throttlestop6

    throttlestop6汉化版是一款笔记本电脑CPU频率调节软件;是市面上非常优秀的一款笔记本cpu调频软件,界面简洁、操作方便,能绕过BD PROCHOT技术,让CPU稳定运行在默认频率或者之上,具有监视CPU状态以及锁定CPU频率等...

    ThrottleStop_8.70.6

    hrottlestop是款专业的、强大的、易用的cpu性能测试软件。throttlestop使用简单,功能实用,能够很好的帮助用户实时监测cpu的运行...1、非常使用的cup功能。 2、能够很好的帮助用户解决cpu问题。 3、可设置cpu温度。

    ThrottleStop-9.4 笔记本处理器intel超频软件

    ThrottleStop_9.4 笔记本处理器intel超频软件

    ThrottleStop V8.48解决CPU降频软件.zip

    ThrottleStop8.48是当前最新版本,可以解决8.40版本在使用时提示This beta version has now expired的问题。throttlestop用于防止cpu降频运行,解决电脑cpu达不到最高性能的问题。现在的CPU很智能化,当温度...

Global site tag (gtag.js) - Google Analytics