- 浏览: 2487406 次
- 性别:
- 来自: 成都
文章分类
最新评论
-
nation:
你好,在部署Mesos+Spark的运行环境时,出现一个现象, ...
Spark(4)Deal with Mesos -
sillycat:
AMAZON Relatedhttps://www.godad ...
AMAZON API Gateway(2)Client Side SSL with NGINX -
sillycat:
sudo usermod -aG docker ec2-use ...
Docker and VirtualBox(1)Set up Shared Disk for Virtual Box -
sillycat:
Every Half an Hour30 * * * * /u ...
Build Home NAS(3)Data Redundancy -
sillycat:
3 List the Cron Job I Have>c ...
Build Home NAS(3)Data Redundancy
Play framework Timer and Scheduler
1 Global Setting Up and Akka Scheduler for Actors
I create a scala application here Global.scala
import actors._
import actors.jobs.{JobServiceActor, JobParseActor}
import actors.resumes.{ResumeServiceActor, ResumeParseActor, ResumeAttachActor}
import actors.jobs.{JobServiceActor, JobParseActor}
import actors.resumes.{ResumeServiceActor, ResumeParseActor, ResumeAttachActor}
import akka.actor.Props
import com.sillycat.dao.{IpplyJobDAO, DAO}
import com.sillycat.util.IncludeLogger
import play.api.libs.concurrent.Akka
import play.api.libs.json.{JsError, Json}
import play.api.mvc.{BodyParsers, Action, Controller}
import play.api.Play.current
import akka.contrib.throttle.TimerBasedThrottler
import akka.contrib.throttle.Throttler._
import utils.IncludeWSConfig
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object Global extends play.api.GlobalSettings with IncludeLogger with IncludeWSConfig{
override def onStart(app: play.api.Application) {
logger.info("Start Email Scan Application")
logger.info("init all the actors -------------")
Akka.system.actorOf(JobParseActor.props, name = "job-parse-actor")
logger.trace("job parse actor")
Akka.system.actorOf(JobServiceActor.props, name = "job-service-actor")
logger.trace("job service actor")
Akka.system.actorOf(ResumeAttachActor.props, name = "resume-attach-actor")
logger.trace("resume attach actor")
Akka.system.actorOf(ResumeParseActor.props, name = "resume-parse-actor")
logger.trace("resume parse actor")
Akka.system.actorOf(ResumeServiceActor.props, name = "resume-service-actor")
logger.trace("resume service actor")
Akka.system.actorOf(ParseCOMWebHookActor.props, name= "parse-com-webhook-actor")
logger.trace("parse.com web hook actor")
Akka.system.actorOf(TriggerActor.props, name = "trigger-actor")
logger.trace("trigger actor")
Akka.system.actorOf(AccountServiceActor.props, name = "account-service-actor")
logger.trace("account service actor")
val contextIOActor = Akka.system.actorOf(ContextIOActor.props, name = "contextio-actor")
logger.trace("contextio actor")
val contextIOThrottler = Akka.system.actorOf(Props(classOf[TimerBasedThrottler], contextio_throttle_times msgsPer contextio_throttle_duration.second), name = "context-io-throttler")
contextIOThrottler ! SetTarget(Some(contextIOActor))
logger.trace("throttler actor")
val schedulerSyncActor = Akka.system.actorOf(SchedulerSyncActor.props, name = "scheduler-sync-actor")
logger.trace("scheduler sync actor")
logger.info("---------------------------------done")
logger.info(" ")
Comments out the sync logic after having meeting with context.io
logger.info("init the schedule jobs-----------")
Akka.system.scheduler.schedule(
0 seconds,
contextio_sync_interval seconds,
schedulerSyncActor,
"fire-in-the-hole"
)
logger.info("schedulerSyncActor executing every " + contextio_sync_interval + " seconds")
logger.info("---------------------------------done")
}
}
Usually, I can put the codes to start up the actors there. Also the scheduler of Actors is also there. Then every time, the scheduler will call the actors.
2 Timer and Schedule for Methods
package actors
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.{TimerTask, Timer}
import akka.actor.{Props, Actor}
import akka.contrib.throttle.Throttler.SetTarget
import com.google.common.cache.CacheBuilder
import com.sillycat.util.{IncludeConfig, IncludeLogger}
import models.messages.webhooks.{SyncMessage, WebhookMessage}
import models._
import models.messages.jobs.JobScanMessage
import models.messages.resumes.ResumeScanMessage
import org.joda.time.DateTime
import play.api.libs.json.{Writes, Reads, Json}
import play.api.libs.ws.{WSResponse, WS}
import play.api.Play.current
import utils.IncludeWSConfig
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import concurrent.duration._
import scala.collection._
import scala.collection.convert.decorateAsScala._
import java.util.concurrent.ConcurrentHashMap
object TriggerActor {
def props = Props[TriggerActor]
}
class TriggerActor extends Actor with IncludeLogger with IncludeWSConfig {
implicit val jodaDateReads = Reads.jodaDateReads("yyyy-MM-dd HH:mm:ss")
implicit val jodaDateWrites = Writes.jodaDateWrites("yyyy-MM-dd HH:mm:ss")
implicit val contextSyncStatusResponseWrites = Json.writes[ContextSyncStatusResponse]
implicit val contextSyncStatusResponseReads = Json.reads[ContextSyncStatusResponse]
val contextIOthrottler = context.actorSelection("/user/context-io-throttler")
val accountServiceActor = context.actorSelection("/user/account-service-actor")
val maxDwellInMinutes = 120
val maxRetries = 100
val TASK_LIST_KEY = "TASK_SYNC_LIST"
val MAX_RETRY_LIST_KEY = "TASK_RETRY_MAX_LIST"
val timer = new Timer(true)
timer.schedule(new TimerTask {
def run() = {
checkSyncBatch()
}
}, 0, 120000) // deplay - 0 milliseconds, period 2 * 60 * 1000 milliseconds
val builder = CacheBuilder.newBuilder().expireAfterWrite(maxDwellInMinutes, TimeUnit.MINUTES)
val taskBucket = builder.build[java.lang.String, ConcurrentHashMap[String,ScanTriggerRequest]]()
val retriesBucket = builder.build[java.lang.String, ConcurrentHashMap[String,Int]]()
val initMap = new ConcurrentHashMap[String,ScanTriggerRequest]()
val retriesMap = new ConcurrentHashMap[String, Int]()
taskBucket.put(TASK_LIST_KEY, initMap)
retriesBucket.put(MAX_RETRY_LIST_KEY, retriesMap)
def receive = {
…snip...
}
def checkSyncBatch() = {
logger.info("Checking the sync status every period time..." + DateTime.now.toString(default_date_time_format))
//fetch the list from cache
val taskMap = taskBucket.getIfPresent(TASK_LIST_KEY)
if(taskMap != null && !taskMap.isEmpty){
//check sync
val removeTasks = taskMap.asScala.filter { case(key:String, msg:ScanTriggerRequest) =>
val dateBefore = handleDateBefore(msg.dateBefore)
checkSync(msg.accountCode, dateBefore)
}
//remove the synced task
removeTasks.foreach { case(key:String, v:ScanTriggerRequest) =>
//remove
taskMap.remove(key)
val msg = v
val dateBefore = handleDateBefore(msg.dateBefore)
val dateAfter = handleDateAfter(msg.dateAfter)
//fireMessage
contextIOthrottler ! JobScanMessage(v.accountCode,dateBefore, dateAfter,default_paging_num_of_item, 0)
contextIOthrottler ! ResumeScanMessage(v.accountCode, dateBefore, dateAfter, default_paging_num_of_item, 0)
}
//remove all the max task
val maxMap = retriesBucket.getIfPresent(MAX_RETRY_LIST_KEY)
if(maxMap != null && !maxMap.isEmpty){
val removeMax = maxMap.asScala.filter { case(key:String, count: Int)=>
count > maxRetries
}
removeMax.foreach{ case (removeKey:String, count: Int) =>
taskMap.remove(removeKey)
maxMap.remove(removeKey)
}
}
retriesBucket.put(MAX_RETRY_LIST_KEY, maxMap)
taskBucket.put(TASK_LIST_KEY, taskMap)
}
}
…snip...
}
References:
http://sillycat.iteye.com/
1 Global Setting Up and Akka Scheduler for Actors
I create a scala application here Global.scala
import actors._
import actors.jobs.{JobServiceActor, JobParseActor}
import actors.resumes.{ResumeServiceActor, ResumeParseActor, ResumeAttachActor}
import actors.jobs.{JobServiceActor, JobParseActor}
import actors.resumes.{ResumeServiceActor, ResumeParseActor, ResumeAttachActor}
import akka.actor.Props
import com.sillycat.dao.{IpplyJobDAO, DAO}
import com.sillycat.util.IncludeLogger
import play.api.libs.concurrent.Akka
import play.api.libs.json.{JsError, Json}
import play.api.mvc.{BodyParsers, Action, Controller}
import play.api.Play.current
import akka.contrib.throttle.TimerBasedThrottler
import akka.contrib.throttle.Throttler._
import utils.IncludeWSConfig
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object Global extends play.api.GlobalSettings with IncludeLogger with IncludeWSConfig{
override def onStart(app: play.api.Application) {
logger.info("Start Email Scan Application")
logger.info("init all the actors -------------")
Akka.system.actorOf(JobParseActor.props, name = "job-parse-actor")
logger.trace("job parse actor")
Akka.system.actorOf(JobServiceActor.props, name = "job-service-actor")
logger.trace("job service actor")
Akka.system.actorOf(ResumeAttachActor.props, name = "resume-attach-actor")
logger.trace("resume attach actor")
Akka.system.actorOf(ResumeParseActor.props, name = "resume-parse-actor")
logger.trace("resume parse actor")
Akka.system.actorOf(ResumeServiceActor.props, name = "resume-service-actor")
logger.trace("resume service actor")
Akka.system.actorOf(ParseCOMWebHookActor.props, name= "parse-com-webhook-actor")
logger.trace("parse.com web hook actor")
Akka.system.actorOf(TriggerActor.props, name = "trigger-actor")
logger.trace("trigger actor")
Akka.system.actorOf(AccountServiceActor.props, name = "account-service-actor")
logger.trace("account service actor")
val contextIOActor = Akka.system.actorOf(ContextIOActor.props, name = "contextio-actor")
logger.trace("contextio actor")
val contextIOThrottler = Akka.system.actorOf(Props(classOf[TimerBasedThrottler], contextio_throttle_times msgsPer contextio_throttle_duration.second), name = "context-io-throttler")
contextIOThrottler ! SetTarget(Some(contextIOActor))
logger.trace("throttler actor")
val schedulerSyncActor = Akka.system.actorOf(SchedulerSyncActor.props, name = "scheduler-sync-actor")
logger.trace("scheduler sync actor")
logger.info("---------------------------------done")
logger.info(" ")
Comments out the sync logic after having meeting with context.io
logger.info("init the schedule jobs-----------")
Akka.system.scheduler.schedule(
0 seconds,
contextio_sync_interval seconds,
schedulerSyncActor,
"fire-in-the-hole"
)
logger.info("schedulerSyncActor executing every " + contextio_sync_interval + " seconds")
logger.info("---------------------------------done")
}
}
Usually, I can put the codes to start up the actors there. Also the scheduler of Actors is also there. Then every time, the scheduler will call the actors.
2 Timer and Schedule for Methods
package actors
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.{TimerTask, Timer}
import akka.actor.{Props, Actor}
import akka.contrib.throttle.Throttler.SetTarget
import com.google.common.cache.CacheBuilder
import com.sillycat.util.{IncludeConfig, IncludeLogger}
import models.messages.webhooks.{SyncMessage, WebhookMessage}
import models._
import models.messages.jobs.JobScanMessage
import models.messages.resumes.ResumeScanMessage
import org.joda.time.DateTime
import play.api.libs.json.{Writes, Reads, Json}
import play.api.libs.ws.{WSResponse, WS}
import play.api.Play.current
import utils.IncludeWSConfig
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import concurrent.duration._
import scala.collection._
import scala.collection.convert.decorateAsScala._
import java.util.concurrent.ConcurrentHashMap
object TriggerActor {
def props = Props[TriggerActor]
}
class TriggerActor extends Actor with IncludeLogger with IncludeWSConfig {
implicit val jodaDateReads = Reads.jodaDateReads("yyyy-MM-dd HH:mm:ss")
implicit val jodaDateWrites = Writes.jodaDateWrites("yyyy-MM-dd HH:mm:ss")
implicit val contextSyncStatusResponseWrites = Json.writes[ContextSyncStatusResponse]
implicit val contextSyncStatusResponseReads = Json.reads[ContextSyncStatusResponse]
val contextIOthrottler = context.actorSelection("/user/context-io-throttler")
val accountServiceActor = context.actorSelection("/user/account-service-actor")
val maxDwellInMinutes = 120
val maxRetries = 100
val TASK_LIST_KEY = "TASK_SYNC_LIST"
val MAX_RETRY_LIST_KEY = "TASK_RETRY_MAX_LIST"
val timer = new Timer(true)
timer.schedule(new TimerTask {
def run() = {
checkSyncBatch()
}
}, 0, 120000) // deplay - 0 milliseconds, period 2 * 60 * 1000 milliseconds
val builder = CacheBuilder.newBuilder().expireAfterWrite(maxDwellInMinutes, TimeUnit.MINUTES)
val taskBucket = builder.build[java.lang.String, ConcurrentHashMap[String,ScanTriggerRequest]]()
val retriesBucket = builder.build[java.lang.String, ConcurrentHashMap[String,Int]]()
val initMap = new ConcurrentHashMap[String,ScanTriggerRequest]()
val retriesMap = new ConcurrentHashMap[String, Int]()
taskBucket.put(TASK_LIST_KEY, initMap)
retriesBucket.put(MAX_RETRY_LIST_KEY, retriesMap)
def receive = {
…snip...
}
def checkSyncBatch() = {
logger.info("Checking the sync status every period time..." + DateTime.now.toString(default_date_time_format))
//fetch the list from cache
val taskMap = taskBucket.getIfPresent(TASK_LIST_KEY)
if(taskMap != null && !taskMap.isEmpty){
//check sync
val removeTasks = taskMap.asScala.filter { case(key:String, msg:ScanTriggerRequest) =>
val dateBefore = handleDateBefore(msg.dateBefore)
checkSync(msg.accountCode, dateBefore)
}
//remove the synced task
removeTasks.foreach { case(key:String, v:ScanTriggerRequest) =>
//remove
taskMap.remove(key)
val msg = v
val dateBefore = handleDateBefore(msg.dateBefore)
val dateAfter = handleDateAfter(msg.dateAfter)
//fireMessage
contextIOthrottler ! JobScanMessage(v.accountCode,dateBefore, dateAfter,default_paging_num_of_item, 0)
contextIOthrottler ! ResumeScanMessage(v.accountCode, dateBefore, dateAfter, default_paging_num_of_item, 0)
}
//remove all the max task
val maxMap = retriesBucket.getIfPresent(MAX_RETRY_LIST_KEY)
if(maxMap != null && !maxMap.isEmpty){
val removeMax = maxMap.asScala.filter { case(key:String, count: Int)=>
count > maxRetries
}
removeMax.foreach{ case (removeKey:String, count: Int) =>
taskMap.remove(removeKey)
maxMap.remove(removeKey)
}
}
retriesBucket.put(MAX_RETRY_LIST_KEY, maxMap)
taskBucket.put(TASK_LIST_KEY, taskMap)
}
}
…snip...
}
References:
http://sillycat.iteye.com/
发表评论
-
Stop Update Here
2020-04-28 09:00 263I will stop update here, and mo ... -
NodeJS12 and Zlib
2020-04-01 07:44 433NodeJS12 and Zlib It works as ... -
Docker Swarm 2020(2)Docker Swarm and Portainer
2020-03-31 23:18 313Docker Swarm 2020(2)Docker Swar ... -
Docker Swarm 2020(1)Simply Install and Use Swarm
2020-03-31 07:58 323Docker Swarm 2020(1)Simply Inst ... -
Traefik 2020(1)Introduction and Installation
2020-03-29 13:52 294Traefik 2020(1)Introduction and ... -
Portainer 2020(4)Deploy Nginx and Others
2020-03-20 12:06 381Portainer 2020(4)Deploy Nginx a ... -
Private Registry 2020(1)No auth in registry Nginx AUTH for UI
2020-03-18 00:56 377Private Registry 2020(1)No auth ... -
Docker Compose 2020(1)Installation and Basic
2020-03-15 08:10 329Docker Compose 2020(1)Installat ... -
VPN Server 2020(2)Docker on CentOS in Ubuntu
2020-03-02 08:04 401VPN Server 2020(2)Docker on Cen ... -
Buffer in NodeJS 12 and NodeJS 8
2020-02-25 06:43 337Buffer in NodeJS 12 and NodeJS ... -
NodeJS ENV Similar to JENV and PyENV
2020-02-25 05:14 419NodeJS ENV Similar to JENV and ... -
Prometheus HA 2020(3)AlertManager Cluster
2020-02-24 01:47 363Prometheus HA 2020(3)AlertManag ... -
Serverless with NodeJS and TencentCloud 2020(5)CRON and Settings
2020-02-24 01:46 293Serverless with NodeJS and Tenc ... -
GraphQL 2019(3)Connect to MySQL
2020-02-24 01:48 209GraphQL 2019(3)Connect to MySQL ... -
GraphQL 2019(2)GraphQL and Deploy to Tencent Cloud
2020-02-24 01:48 392GraphQL 2019(2)GraphQL and Depl ... -
GraphQL 2019(1)Apollo Basic
2020-02-19 01:36 277GraphQL 2019(1)Apollo Basic Cl ... -
Serverless with NodeJS and TencentCloud 2020(4)Multiple Handlers and Running wit
2020-02-19 01:19 265Serverless with NodeJS and Tenc ... -
Serverless with NodeJS and TencentCloud 2020(3)Build Tree and Traverse Tree
2020-02-19 01:19 262Serverless with NodeJS and Tenc ... -
Serverless with NodeJS and TencentCloud 2020(2)Trigger SCF in SCF
2020-02-19 01:18 253Serverless with NodeJS and Tenc ... -
Serverless with NodeJS and TencentCloud 2020(1)Running with Component
2020-02-19 01:17 238Serverless with NodeJS and Tenc ...
相关推荐
Scheduling Date and time
ICE_Timer and TimerTask相关
资源分类:Python库 所属语言:Python 资源全名:robotframework-timer-0.0.1.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
在.NET Framework里面提供了三种Timer ① System.Windows.Forms.Timer ② System.Timers.Timer ③ System.Threading.Timer 现分述如下: 一、System.Windows.Forms.Timer 1、基于Windows消息循环,用事件方式触发,...
C语言02-Timer0-Timer1-Timer2-Timer3-Timer4测试程序(STC32G-DEMO-CODE-220311kw)C语言02-Timer0-Timer1-Timer2-Timer3-Timer4测试程序(STC32G-DEMO-CODE-220311kw)C语言02-Timer0-Timer1-Timer2-Timer3-Timer4...
最新单片机仿真 TIMER0与TIMER1控制条形LED最新单片机仿真 TIMER0与TIMER1控制条形LED最新单片机仿真 TIMER0与TIMER1控制条形LED最新单片机仿真 TIMER0与TIMER1控制条形LED最新单片机仿真 TIMER0与TIMER1控制条形LED...
实现了spring配置比较流行的任务调度操作,java timer 和 quartz俩种方式 并且有注释描述。
单片机C语言程序设计 TIMER0与TIMER1控制条形LED(有源码)单片机C语言程序设计 TIMER0与TIMER1控制条形LED(有源码)单片机C语言程序设计 TIMER0与TIMER1控制条形LED(有源码)单片机C语言程序设计 TIMER0与TIMER1控制...
AddinTimer是手机上的Task Scheduler,可以自动控制各种不同功能,同时具有多种不同的定时方式,几乎可以满足所有场景的定时要求,并且这些定时器可以分类管理,使你摆脱日常设置繁琐的烦恼,AddinTimer简单你的生活...
基于Verilog的timer计时器,start开始,到达设置计时点时输出一个高电平up信号
单片机C语言程序设计31 TIMER0与TIMER1控制条形LED(基于8051+Proteus仿真)单片机C语言程序设计31 TIMER0与TIMER1控制条形LED(基于8051+Proteus仿真)单片机C语言程序设计31 TIMER0与TIMER1控制条形LED(基于8051+...
播放计时器这是一个比较简单的 Meteor 程序,可以跟踪计算机时间。 它通过保存在一个选项卡上的计时器跟踪用户的时间。 它还在主页上显示其所有记录,并带有用户、日期和计算机活动类型的限制器。...
timer and capture 4 channel and pwm
ajax_timer ajax_timer ajax_timer ajax_timer
学习32位单片机的基础例程,主要针对初学者对TIMER的编程学习
addintimer高级版是一个万能定时器,非常好用,强烈推荐,可以当时间定时器来用,还有整点报时,定时录音等功能
加Timer控件 timer1 编写其Tick事件为 private void timer1_Tick(object sender, EventArgs e) { this.toolStripStatusLabel3.Text = "系统当前时间:" + DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"); } ...
对POSIX timer的面向对象的封装。
Timer Input Capture using stm8
synchronization and volatility, waiting and notification, and the additional capabilities of thread groups, thread local variables, and the Timer Framework. In Part 2, you learn about concurrency ...