`
sillycat
  • 浏览: 2487406 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Play framework Timer and Scheduler

 
阅读更多
Play framework Timer and Scheduler

1 Global Setting Up and Akka Scheduler for Actors
I created a Scala application; here is its Global.scala:
import actors._
import actors.jobs.{JobServiceActor, JobParseActor}
import actors.resumes.{ResumeServiceActor, ResumeParseActor, ResumeAttachActor}
import actors.jobs.{JobServiceActor, JobParseActor}
import actors.resumes.{ResumeServiceActor, ResumeParseActor, ResumeAttachActor}
import akka.actor.Props
import com.sillycat.dao.{IpplyJobDAO, DAO}
import com.sillycat.util.IncludeLogger
import play.api.libs.concurrent.Akka
import play.api.libs.json.{JsError, Json}
import play.api.mvc.{BodyParsers, Action, Controller}
import play.api.Play.current
import akka.contrib.throttle.TimerBasedThrottler
import akka.contrib.throttle.Throttler._
import utils.IncludeWSConfig
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

/**
 * Application-wide lifecycle hooks for the email scan application.
 *
 * On start-up it creates the top-level actors, points the context.io throttler at its
 * target actor and registers the recurring message sent to the scheduler sync actor.
 */
object Global extends play.api.GlobalSettings with IncludeLogger with IncludeWSConfig {

  /**
   * Creates all actors and schedules the periodic sync message.
   *
   * @param app the Play application that is starting; its actor system hosts the actors
   */
  override def onStart(app: play.api.Application): Unit = {
    // Use the actor system of the application being started rather than the global one.
    val system = Akka.system(app)

    logger.info("Start Email Scan Application")
    logger.info("init all the actors -------------")
    system.actorOf(JobParseActor.props, name = "job-parse-actor")
    logger.trace("job parse actor")
    system.actorOf(JobServiceActor.props, name = "job-service-actor")
    logger.trace("job service actor")
    system.actorOf(ResumeAttachActor.props, name = "resume-attach-actor")
    logger.trace("resume attach actor")
    system.actorOf(ResumeParseActor.props, name = "resume-parse-actor")
    logger.trace("resume parse actor")
    system.actorOf(ResumeServiceActor.props, name = "resume-service-actor")
    logger.trace("resume service actor")
    system.actorOf(ParseCOMWebHookActor.props, name = "parse-com-webhook-actor")
    logger.trace("parse.com web hook actor")
    system.actorOf(TriggerActor.props, name = "trigger-actor")
    logger.trace("trigger actor")
    system.actorOf(AccountServiceActor.props, name = "account-service-actor")
    logger.trace("account service actor")
    val contextIOActor = system.actorOf(ContextIOActor.props, name = "contextio-actor")
    logger.trace("contextio actor")

    // Rate-limit messages to the context.io actor: at most contextio_throttle_times
    // messages per contextio_throttle_duration second(s).
    val throttleRate = contextio_throttle_times msgsPer contextio_throttle_duration.second
    val contextIOThrottler = system.actorOf(
      Props(classOf[TimerBasedThrottler], throttleRate),
      name = "context-io-throttler"
    )
    contextIOThrottler ! SetTarget(Some(contextIOActor))
    logger.trace("throttler actor")
    val schedulerSyncActor = system.actorOf(SchedulerSyncActor.props, name = "scheduler-sync-actor")
    logger.trace("scheduler sync actor")
    logger.info("---------------------------------done")
    logger.info(" ")

    // Author's note: "Comments out the sync logic after having meeting with context.io".
    // NOTE(review): the scheduling code below is still active as shown - confirm whether
    // it is meant to be disabled.
    logger.info("init the schedule jobs-----------")
    // Send "fire-in-the-hole" to the sync actor immediately, then once per interval.
    system.scheduler.schedule(
      0.seconds,
      contextio_sync_interval.seconds,
      schedulerSyncActor,
      "fire-in-the-hole"
    )
    logger.info(s"schedulerSyncActor executing every $contextio_sync_interval seconds")
    logger.info("---------------------------------done")
  }
}

Usually, I put the code that starts up the actors there. The scheduler for the actors is set up there as well, so at every interval the scheduler sends a message to the actor.

2 Timer and Schedule for Methods
package actors

import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.{TimerTask, Timer}

import akka.actor.{Props, Actor}
import akka.contrib.throttle.Throttler.SetTarget
import com.google.common.cache.CacheBuilder
import com.sillycat.util.{IncludeConfig, IncludeLogger}
import models.messages.webhooks.{SyncMessage, WebhookMessage}
import models._
import models.messages.jobs.JobScanMessage
import models.messages.resumes.ResumeScanMessage
import org.joda.time.DateTime
import play.api.libs.json.{Writes, Reads, Json}
import play.api.libs.ws.{WSResponse, WS}
import play.api.Play.current
import utils.IncludeWSConfig
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import concurrent.duration._
import scala.collection._
import scala.collection.convert.decorateAsScala._
import java.util.concurrent.ConcurrentHashMap

/** Companion object providing the actor creation recipe for [[TriggerActor]]. */
object TriggerActor {

  /** Props used to instantiate a [[TriggerActor]] with its no-argument constructor. */
  def props: Props = Props(classOf[TriggerActor])
}

class TriggerActor  extends Actor with IncludeLogger with IncludeWSConfig {
  implicit val jodaDateReads = Reads.jodaDateReads("yyyy-MM-dd HH:mm:ss")
  implicit val jodaDateWrites = Writes.jodaDateWrites("yyyy-MM-dd HH:mm:ss")

  implicit val contextSyncStatusResponseWrites = Json.writes[ContextSyncStatusResponse]
  implicit val contextSyncStatusResponseReads = Json.reads[ContextSyncStatusResponse]

  val contextIOthrottler = context.actorSelection("/user/context-io-throttler")
  val accountServiceActor = context.actorSelection("/user/account-service-actor")
  val maxDwellInMinutes = 120
  val maxRetries = 100
  val TASK_LIST_KEY = "TASK_SYNC_LIST"
  val MAX_RETRY_LIST_KEY = "TASK_RETRY_MAX_LIST"
  val timer = new Timer(true)
  timer.schedule(new TimerTask {
    def run() = {
      checkSyncBatch()
    }
  }, 0, 120000) // deplay - 0 milliseconds, period 2 * 60 * 1000 milliseconds

  val builder = CacheBuilder.newBuilder().expireAfterWrite(maxDwellInMinutes, TimeUnit.MINUTES)
  val taskBucket = builder.build[java.lang.String, ConcurrentHashMap[String,ScanTriggerRequest]]()
  val retriesBucket = builder.build[java.lang.String, ConcurrentHashMap[String,Int]]()
  val initMap = new ConcurrentHashMap[String,ScanTriggerRequest]()
  val retriesMap = new ConcurrentHashMap[String, Int]()
  taskBucket.put(TASK_LIST_KEY, initMap)
  retriesBucket.put(MAX_RETRY_LIST_KEY, retriesMap)

  def receive = {
     …snip...
}

  def checkSyncBatch() = {
    logger.info("Checking the sync status every period time..." + DateTime.now.toString(default_date_time_format))

    //fetch the list from cache
    val taskMap = taskBucket.getIfPresent(TASK_LIST_KEY)

    if(taskMap != null && !taskMap.isEmpty){
      //check sync
      val removeTasks = taskMap.asScala.filter { case(key:String, msg:ScanTriggerRequest) =>
        val dateBefore = handleDateBefore(msg.dateBefore)
        checkSync(msg.accountCode, dateBefore)
      }

      //remove the synced task
      removeTasks.foreach { case(key:String, v:ScanTriggerRequest) =>
        //remove
        taskMap.remove(key)
        val msg = v
        val dateBefore = handleDateBefore(msg.dateBefore)
        val dateAfter = handleDateAfter(msg.dateAfter)
        //fireMessage
        contextIOthrottler ! JobScanMessage(v.accountCode,dateBefore, dateAfter,default_paging_num_of_item, 0)
        contextIOthrottler ! ResumeScanMessage(v.accountCode, dateBefore, dateAfter, default_paging_num_of_item, 0)
      }
      //remove all the max task
      val maxMap = retriesBucket.getIfPresent(MAX_RETRY_LIST_KEY)
      if(maxMap != null && !maxMap.isEmpty){
        val removeMax = maxMap.asScala.filter { case(key:String, count: Int)=>
          count > maxRetries
        }
        removeMax.foreach{ case (removeKey:String, count: Int) =>
          taskMap.remove(removeKey)
          maxMap.remove(removeKey)
        }
      }
      retriesBucket.put(MAX_RETRY_LIST_KEY, maxMap)
      taskBucket.put(TASK_LIST_KEY, taskMap)
    }
  }
  …snip...
}

References:
http://sillycat.iteye.com/
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics