标签归档:算法

基础算法——排序【一】

  排序可以说时最基础的算法之一,排序就是将数据按照某种逻辑重新排列的过程,比如从大到小排序、从小到大排序;排序非常常见比如有购物车物品的排序、历史订单的排序等等;算法我们比较关心的主要有两点:时间复杂度空间复杂度,排序算法一样;这篇文章只介绍几种基本的排序算法:冒泡排序插入排序选择排序

选择排序

  算法逻辑上分为已排序未排序两个区间,从当前列表中选择最小的元素与当前列表第一个元素交换位置,从列表剩余元素中找到最小元素与列表第二个元素交换位置如此重复执行的算法为选择排序算法;
  选择排序与数据的初始状态无关,每次查找最小元素值时都会与数组中元素进行交换位置,因此选择排序为非稳定排序算法,在各种情况下算法时间复杂度都为O(n2),空间复杂度为O(n);

 func SelectionSort(data [] int) {
     n := len(data)
     if n <= 1 {
         return
     }
     for i := 0; i < n; i++ {
         //最小元素索引
         min := i
         for j := i + 1; j < n; j++ {
             if data[j] < data[min] {
                 min = j
             }
         }
         //交换元素,交换后未已排序数据
         data[i], data[min] = data[min], data[i]
     }
 }

插入排序

  与选择排序类似在逻辑上列表分为已排序和未排序两个区间,初始时已排序区间未为左边第一个元素,插入排序的核心为从右边未排序区间中选择元素插入到左边合适的位置,直到已排序列表长度等于原始列表长度;插入排序算法的关键步骤为元素比较与元素的移动,当从未排序端拿数据插入到已排序端时已排序端可能会存在大量的数据移动;
  与选择排序不同的是列表初始状态对插入排序的性能影响很大,当列表初始状态有序度很高时插入排序并不需要大量移动数据,从中可以看出插入排序为原地排序算法、算法是稳定的
  算法最好情况下时间复杂度为:O(n)、平均、最坏复杂度为:O(n2);

 func InsertionSort(data [] int) {
          n := len(data)
          for i := 1; i < n; i++ {
              value := data[i]
              //查找插入位置
              var j int
              for j = i - 1; j >= 0; j-- {
                  if data[j] > value {
                      //数据后移
                      data[j+1] = data[j]
                  } else {
                      break
                  }
              }
              data[j+1] = value
          }
 }

冒泡排序

  每次比较相邻两个元素是否满足,否则交换相邻两个元素,每次冒泡至少一个元素移动到它所属位置,重复N次完成列表排序;
  冒泡操作核心为项链元素交换,空间复杂度为O(1),为原地排序算法;算法是稳定的,最好情况时间复杂度为为O(n),平均与最坏情况时间复杂度为:O(n2);

 func BubbleSort(data [] int) {
     n := len(data)
     for i := 0; i < n; i++ {
         //是否有数据交换,用于当某轮中数据已排序时退出循环
         flag := false
         for j := 0; j < n-1-i; j++ {
             if data[j] > data[j+1] {
                 //冒泡
                 data[j], data[j+1] = data[j+1], data[j]
                 flag = true
             }
         }
         if !flag {
             break
         }
     }
      }

基础排序算法特性

enter image description here

参考资料: 《算法四》

Kafka中时间轮分析与Java实现

  在Kafka中应用了大量的延迟操作但在Kafka中 并没用使用JDK自带的Timer或是DelayQueue用于延迟操作,而是使用自己开发的DelayedOperationPurgatory组件用于管理延迟操作,Kafka这类分布式框架有大量延迟操作并且对性能要求及其高,而java.util.Timer与java.util.concurrent.DelayQueue的插入和删除时间复杂度都为对数阶O(log n)并不能满足Kafka性能要求,所以Kafka实现了基于时间轮的定时任务组件,该时间轮定时任务实现的插入与删除(开始定时器与暂停定时器)的时间复杂度都为常数阶O(1)
  时间轮的应用并不少见,在Netty、akka、Quartz、Zookeeper等高性能组件中都存在时间轮定时器的踪影;

时间轮数据结构

enter image description here

时间轮名词解释:

  时间格:环形结构中用于存放延迟任务的区块;
  指针(CurrentTime):指向当前操作的时间格,代表当前时间
  格数(ticksPerWheel):为时间轮中时间格的个数
  间隔(tickDuration):每个时间格之间的间隔
  总间隔(interval):当前时间轮总间隔,也就是等于ticksPerWheel*tickDuration

  TimingWheel并非简单的环形时间轮,而是多层级时间轮,每个时间轮由多个时间格组成,每个时间格为一个时间间隔,底层的时间格跨度较小,然后随着延迟任务延迟时间的长短逐层变大;如上图,底下的时间轮每个时间格为1ms,整个时间轮为10ms,而上面一层的时间轮中时间格为10ms,整个时间轮为100ms;
  时间轮添加上级时间轮的规则为:当前currentTime为上级时间轮的startMs,当前interval为上级时间轮的tickDuration,每层ticksPerWheel相同;简单点说就是上层时间轮跨度为当前的M倍,时间格为当前的N倍;

Kafka中时间轮的实现

  Kafka中时间轮时间类为TimingWheel,该类结构为存储定时任务的环形队列,内部使用数组实现,数组是用于存放TimerTaskList对象,TimerTaskList环形双向链表,链表项TimerTaskEntry封装了定时任务TimerTask,TimerTaskList与TimerTaskEntry中均有超时时间字段,TimerTask中delayMs字段用于记录任务延迟时间;该三个类为Kafka时间轮实现的核心;
  TimingWheel:表示一个时间轮,通常会有多层时间轮也就存在多个TimingWheel对象;
  TimerTaskList:为数组对象用于存放延迟任务,一个TimerTaskList就代表一个时间格,一个时间格中能保存的任务到期时间只可在[t~t+10ms]区间(t为时间格到期时间,10ms时间格间格),每个时间格有个过期时间,时间格过期后时间格中的任务将向前移动存入前面时间格中;
  TimerTask:表示延迟任务;
  SystemTimer:kafka实现的定时器,内部封装了TimningWheel用于执行、管理定时任务;

enter image description here

  下面通过一个示例来介绍kafka时间轮的工作过程:

  时间轮初始化:初始时间轮中的格数、间隔、指针的初始化时间,创建时间格所对应的buckets数组,计算总间隔interval;
  添加延迟任务:判断该任务是否已被取消、是否已经过期如已过期则把任务放入线程池中执行、根据时间轮总间隔与当前时间判断任务是否可存入当前层级时间轮否则添加上层时间轮并再次尝试往时间轮中添加该任务;

  时间轮降级:有一个定时任务再300ms后将执行,现层级时间轮每层有10个时间格,顶层时间轮的时间格间隔为1ms,整个时间轮为10ms,无法存下该任务。这时创建第二层时间轮,时间格间隔为10ms,整个时间轮为100ms,还是无法存该任务。接着创建第三层时间轮,时间格间隔为100ms,整个时间轮为1000ms,此时任务存入第三层时间轮的第三个时间格中;过了段时间,TimerTaskList到期(时间格)可该任务还有90ms,还无法执行。此时将再次把定时任务添加到时间轮中,顶层时间轮还是无法满足存入条件,往第二层时间轮添加,这时定时任务存入第二层时间轮第九个时间格当中;任务在时间轮中如此反复,直到任务过期时将放入线程池中执行;

关键实现方法

 public boolean add(TaskEntry e) {
    synchronized (this) {
        long expiration = e.getExpirationMs(); 
        if(expiration<(currentTime+tickDuration)){
            //当前任务过期时间
            LOGGER.info("当前任务已过期");
            return false;
        }else if(expiration<(currentTime+interval)) {
            //查找时间格的位置,过期时间/时间格%时间轮大小
            long virtualId = expiration / tickDuration;
             TaskEntryList taskEntryList = buckets.get((int) (virtualId % ticksPerWheel));
            taskEntryList.add(e); 
            //设置EntryList过期时间
            if(taskEntryList.setTime(virtualId * tickDuration)) { 
                listDelayQueue.offer(taskEntryList);

            }
            return true;
        }else{
            if(overflowWheel==null){ 
 // 添加上级timingWheel
                addOverflowWheel();
            }
            return overflowWheel.add(e);

         }
      }
 }  

 /**
 *时间表针移动
 * @param timeMS
  */
 public void advanceClock(long timeMS){
    if(timeMS>=(currentTime+tickDuration)){
        currentTime=timeMS-(timeMS%tickDuration);
    }
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime);
}

/**
 * 添加定时任务
 * @param taskEntry
 */
public void add(TaskEntry taskEntry) {
    if (!timingWheel.add(taskEntry)) {
        System.out.println(String.format("任务已过期,开始执行 %s",taskEntry.getTimerTask()));
        taskExecutor.execute(taskEntry.getTimerTask());
    }
}