分类目录归档:Scala

Kafka中时间轮分析与Java实现

  在Kafka中应用了大量的延迟操作但在Kafka中 并没用使用JDK自带的Timer或是DelayQueue用于延迟操作,而是使用自己开发的DelayedOperationPurgatory组件用于管理延迟操作,Kafka这类分布式框架有大量延迟操作并且对性能要求及其高,而java.util.Timer与java.util.concurrent.DelayQueue的插入和删除时间复杂度都为对数阶O(log n)并不能满足Kafka性能要求,所以Kafka实现了基于时间轮的定时任务组件,该时间轮定时任务实现的插入与删除(开始定时器与暂停定时器)的时间复杂度都为常数阶O(1)
  时间轮的应用并不少见,在Netty、akka、Quartz、Zookeeper等高性能组件中都存在时间轮定时器的踪影;

时间轮数据结构

enter image description here

时间轮名词解释:

  时间格:环形结构中用于存放延迟任务的区块;
  指针(CurrentTime):指向当前操作的时间格,代表当前时间
  格数(ticksPerWheel):为时间轮中时间格的个数
  间隔(tickDuration):每个时间格之间的间隔
  总间隔(interval):当前时间轮总间隔,也就是等于ticksPerWheel*tickDuration

  TimingWheel并非简单的环形时间轮,而是多层级时间轮,每个时间轮由多个时间格组成,每个时间格为一个时间间隔,底层的时间格跨度较小,然后随着延迟任务延迟时间的长短逐层变大;如上图,底下的时间轮每个时间格为1ms,整个时间轮为10ms,而上面一层的时间轮中时间格为10ms,整个时间轮为100ms;
  时间轮添加上级时间轮的规则为:当前currentTime为上级时间轮的startMs,当前interval为上级时间轮的tickDuration,每层ticksPerWheel相同;简单点说就是上层时间轮跨度为当前的M倍,时间格为当前的N倍;

Kafka中时间轮的实现

  Kafka中时间轮时间类为TimingWheel,该类结构为存储定时任务的环形队列,内部使用数组实现,数组是用于存放TimerTaskList对象,TimerTaskList环形双向链表,链表项TimerTaskEntry封装了定时任务TimerTask,TimerTaskList与TimerTaskEntry中均有超时时间字段,TimerTask中delayMs字段用于记录任务延迟时间;该三个类为Kafka时间轮实现的核心;
  TimingWheel:表示一个时间轮,通常会有多层时间轮也就存在多个TimingWheel对象;
  TimerTaskList:为数组对象用于存放延迟任务,一个TimerTaskList就代表一个时间格,一个时间格中能保存的任务到期时间只可在[t~t+10ms]区间(t为时间格到期时间,10ms时间格间格),每个时间格有个过期时间,时间格过期后时间格中的任务将向前移动存入前面时间格中;
  TimerTask:表示延迟任务;
  SystemTimer:kafka实现的定时器,内部封装了TimningWheel用于执行、管理定时任务;

enter image description here

  下面通过一个示例来介绍kafka时间轮的工作过程:

  时间轮初始化:初始时间轮中的格数、间隔、指针的初始化时间,创建时间格所对应的buckets数组,计算总间隔interval;
  添加延迟任务:判断该任务是否已被取消、是否已经过期如已过期则把任务放入线程池中执行、根据时间轮总间隔与当前时间判断任务是否可存入当前层级时间轮否则添加上层时间轮并再次尝试往时间轮中添加该任务;

  时间轮降级:有一个定时任务再300ms后将执行,现层级时间轮每层有10个时间格,顶层时间轮的时间格间隔为1ms,整个时间轮为10ms,无法存下该任务。这时创建第二层时间轮,时间格间隔为10ms,整个时间轮为100ms,还是无法存该任务。接着创建第三层时间轮,时间格间隔为100ms,整个时间轮为1000ms,此时任务存入第三层时间轮的第三个时间格中;过了段时间,TimerTaskList到期(时间格)可该任务还有90ms,还无法执行。此时将再次把定时任务添加到时间轮中,顶层时间轮还是无法满足存入条件,往第二层时间轮添加,这时定时任务存入第二层时间轮第九个时间格当中;任务在时间轮中如此反复,直到任务过期时将放入线程池中执行;

关键实现方法

 public boolean add(TaskEntry e) {
    synchronized (this) {
        long expiration = e.getExpirationMs(); 
        if(expiration<(currentTime+tickDuration)){
            //当前任务过期时间
            LOGGER.info("当前任务已过期");
            return false;
        }else if(expiration<(currentTime+interval)) {
            //查找时间格的位置,过期时间/时间格%时间轮大小
            long virtualId = expiration / tickDuration;
             TaskEntryList taskEntryList = buckets.get((int) (virtualId % ticksPerWheel));
            taskEntryList.add(e); 
            //设置EntryList过期时间
            if(taskEntryList.setTime(virtualId * tickDuration)) { 
                listDelayQueue.offer(taskEntryList);

            }
            return true;
        }else{
            if(overflowWheel==null){ 
 // 添加上级timingWheel
                addOverflowWheel();
            }
            return overflowWheel.add(e);

         }
      }
 }  

 /**
 *时间表针移动
 * @param timeMS
  */
 public void advanceClock(long timeMS){
    if(timeMS>=(currentTime+tickDuration)){
        currentTime=timeMS-(timeMS%tickDuration);
    }
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime);
}

/**
 * 添加定时任务
 * @param taskEntry
 */
public void add(TaskEntry taskEntry) {
    if (!timingWheel.add(taskEntry)) {
        System.out.println(String.format("任务已过期,开始执行 %s",taskEntry.getTimerTask()));
        taskExecutor.execute(taskEntry.getTimerTask());
    }
}

Akka初步介绍

  Akka可能很多人都没有用过,也不知道是什么,但如果说起Scala或Spark就有很多人都听说过或使用过 ,这里简单说下三者的关系Akka是使用Scala开发的,Spark中使用了Akka作为其消息的通信工具;这篇文章主要 说说Akka的一些特性,做个简要的介绍;
  要说Akka首先要从并发开始说起,我记得之前我也写过并发模型相关的文章,并发模型主要有这么三类:
    1、共享内存模型
    2、Actor模型
    3、CSP模型
  共享内存模型:是通过使用线程与锁对共享内存进行控制用于实现并发,它依赖于多线程、锁,会产生过多的线程又使用锁对竞态资源进行同步控制,所以性能不是太高,对编程也不太友好;
  Actor模型:使用Actor作为并发的基础多个任务之间通过Actor相互发送消息进行通讯,Actor比线程轻量得多,一组Actor使用一个或多个线程所以也就没有线程、锁相关影响性能的问题存在,对编程也很友好;
  CSP模型Communicating Sequential Process):为多个进程提供了channel,并发的任务存放于channel当中,Golang的goroutine也是用了类似CSP模式的并发,并在channel中多加一个缓存;
  Actor与CSP模型都提倡:要通过通讯来共享内存,不要通过共享内存来通讯,这个可以说是他们与共享内存模型最大的区别;
  介绍了相关的基本概念,接下来说说今天的主题:Akka
  
  简单来说Akka就是基于Actor模型实现的并发框架;Akka降低了编写具有容错性、可扩展的并发程序的难度,容错性方面采用了“let it crach(让它崩溃)模型”;Akka为垂直扩展(并发)、水平扩展(远程调用)、 高容错提供了一致的编程模型;Akka具有以下几种特性:

  Actors:Actor为并发程序提供了简单高级别的抽象,为异步、非阻塞、高性能的事件驱动模型,1G内存可以容纳数百万个Actor;
  容错性:使用“let it crach”作为其监控层次体系的核心,监控层次可跨越JVM,使编写出“永不停机”、“自愈和”的高容错系统的难度大大降低;
  位置透明:Akka中所有元素都是为了适应分布式而设计的,Actor之间只能通过发送消息进行通讯所有操作均是异步进行的,不管是本地Actor还是远程Actor通信方式、操作都是一致的;
  持久性:Actor的状态、收到的消息可被持久化,并可在Actor启动或重启时恢复状态与消息,不管是JVM崩溃或是节点迁移都适用;

  下面通过一个Akka程序,然后结束本篇文章;

/**
* Created by linx on 2016-06-26.
*/
class Greeter extends Actor {

  override def receive: Receive = {
    case "greet" =>
      println("hello world")
      val hello = context.actorOf(Props[HelloWorld], "hello")
      hello ! "done"
  }
}
/**
  * Created by linx on 2016-06-26.
*/
class HelloWorld extends Actor {

  override def receive: Receive = {
    case "done" =>
      println("done")
      context.system.shutdown()
  }
}
/**
* Created by linx on 2016-06-26.
*/
object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("hello")
    val greeter = system.actorOf(Props[Greeter], "greeter")
    greeter ! "greet"
  }
}

  程序先创建一个Greeter Actor然后往该Actor发送“greet”字符串,Greeter Actor收到后打印Hello World,然后创建HelloWorld Actor并发送done,HelloWorld结束整个程序;
程序运行结果:
Akka Hello

参考资料:
http://doc.akka.io/docs/akka/2.4.7/scala.html
http://www.solinx.co/archives/464