分类目录归档:Java

java

Kafka中时间轮分析与Java实现

  在Kafka中应用了大量的延迟操作但在Kafka中 并没用使用JDK自带的Timer或是DelayQueue用于延迟操作,而是使用自己开发的DelayedOperationPurgatory组件用于管理延迟操作,Kafka这类分布式框架有大量延迟操作并且对性能要求及其高,而java.util.Timer与java.util.concurrent.DelayQueue的插入和删除时间复杂度都为对数阶O(log n)并不能满足Kafka性能要求,所以Kafka实现了基于时间轮的定时任务组件,该时间轮定时任务实现的插入与删除(开始定时器与暂停定时器)的时间复杂度都为常数阶O(1)
  时间轮的应用并不少见,在Netty、akka、Quartz、Zookeeper等高性能组件中都存在时间轮定时器的踪影;

时间轮数据结构

enter image description here

时间轮名词解释:

  时间格:环形结构中用于存放延迟任务的区块;
  指针(CurrentTime):指向当前操作的时间格,代表当前时间
  格数(ticksPerWheel):为时间轮中时间格的个数
  间隔(tickDuration):每个时间格之间的间隔
  总间隔(interval):当前时间轮总间隔,也就是等于ticksPerWheel*tickDuration

  TimingWheel并非简单的环形时间轮,而是多层级时间轮,每个时间轮由多个时间格组成,每个时间格为一个时间间隔,底层的时间格跨度较小,然后随着延迟任务延迟时间的长短逐层变大;如上图,底下的时间轮每个时间格为1ms,整个时间轮为10ms,而上面一层的时间轮中时间格为10ms,整个时间轮为100ms;
  时间轮添加上级时间轮的规则为:当前currentTime为上级时间轮的startMs,当前interval为上级时间轮的tickDuration,每层ticksPerWheel相同;简单点说就是上层时间轮跨度为当前的M倍,时间格为当前的N倍;

Kafka中时间轮的实现

  Kafka中时间轮时间类为TimingWheel,该类结构为存储定时任务的环形队列,内部使用数组实现,数组是用于存放TimerTaskList对象,TimerTaskList环形双向链表,链表项TimerTaskEntry封装了定时任务TimerTask,TimerTaskList与TimerTaskEntry中均有超时时间字段,TimerTask中delayMs字段用于记录任务延迟时间;该三个类为Kafka时间轮实现的核心;
  TimingWheel:表示一个时间轮,通常会有多层时间轮也就存在多个TimingWheel对象;
  TimerTaskList:为数组对象用于存放延迟任务,一个TimerTaskList就代表一个时间格,一个时间格中能保存的任务到期时间只可在[t~t+10ms]区间(t为时间格到期时间,10ms时间格间格),每个时间格有个过期时间,时间格过期后时间格中的任务将向前移动存入前面时间格中;
  TimerTask:表示延迟任务;
  SystemTimer:kafka实现的定时器,内部封装了TimningWheel用于执行、管理定时任务;

enter image description here

  下面通过一个示例来介绍kafka时间轮的工作过程:

  时间轮初始化:初始时间轮中的格数、间隔、指针的初始化时间,创建时间格所对应的buckets数组,计算总间隔interval;
  添加延迟任务:判断该任务是否已被取消、是否已经过期如已过期则把任务放入线程池中执行、根据时间轮总间隔与当前时间判断任务是否可存入当前层级时间轮否则添加上层时间轮并再次尝试往时间轮中添加该任务;

  时间轮降级:有一个定时任务再300ms后将执行,现层级时间轮每层有10个时间格,顶层时间轮的时间格间隔为1ms,整个时间轮为10ms,无法存下该任务。这时创建第二层时间轮,时间格间隔为10ms,整个时间轮为100ms,还是无法存该任务。接着创建第三层时间轮,时间格间隔为100ms,整个时间轮为1000ms,此时任务存入第三层时间轮的第三个时间格中;过了段时间,TimerTaskList到期(时间格)可该任务还有90ms,还无法执行。此时将再次把定时任务添加到时间轮中,顶层时间轮还是无法满足存入条件,往第二层时间轮添加,这时定时任务存入第二层时间轮第九个时间格当中;任务在时间轮中如此反复,直到任务过期时将放入线程池中执行;

关键实现方法

 public boolean add(TaskEntry e) {
    synchronized (this) {
        long expiration = e.getExpirationMs(); 
        if(expiration<(currentTime+tickDuration)){
            //当前任务过期时间
            LOGGER.info("当前任务已过期");
            return false;
        }else if(expiration<(currentTime+interval)) {
            //查找时间格的位置,过期时间/时间格%时间轮大小
            long virtualId = expiration / tickDuration;
             TaskEntryList taskEntryList = buckets.get((int) (virtualId % ticksPerWheel));
            taskEntryList.add(e); 
            //设置EntryList过期时间
            if(taskEntryList.setTime(virtualId * tickDuration)) { 
                listDelayQueue.offer(taskEntryList);

            }
            return true;
        }else{
            if(overflowWheel==null){ 
 // 添加上级timingWheel
                addOverflowWheel();
            }
            return overflowWheel.add(e);

         }
      }
 }  

 /**
 *时间表针移动
 * @param timeMS
  */
 public void advanceClock(long timeMS){
    if(timeMS>=(currentTime+tickDuration)){
        currentTime=timeMS-(timeMS%tickDuration);
    }
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime);
}

/**
 * 添加定时任务
 * @param taskEntry
 */
public void add(TaskEntry taskEntry) {
    if (!timingWheel.add(taskEntry)) {
        System.out.println(String.format("任务已过期,开始执行 %s",taskEntry.getTimerTask()));
        taskExecutor.execute(taskEntry.getTimerTask());
    }
}

Java中的RASP实现

RSAP

  RASP是Gartner公司提出的一个概念,称:程序不应该依赖于外部组件进行运行时保护,而应该自身拥有运行时环境保护机制;
  RASP就是运行时应用自我保护(Runtime application self-protection)的缩写,正如RASP字面意思一样,这是运行在运行时的一种防护技能;也就是说RASP能够在程序运行期间实施自我保护,监控与过滤有害信息,还能够拥结合程序的当前上下文实施精确、实时的防护;

Java中的RASP

  不严格来说Java是半编译、半解释型语言,我们也都知道Java中也有运行时(Runtime)那Java的运行时在哪呢?
  不急,我们先看看Java从编译到运行的流程图;

  enter image description here

  上图的流程为:Java编译程序如Javac编译.java源码文件,生成Java字节码文件.class,接着.class文件进入JVM中解释执行; 从中我们可以看到Java的最后执行阶段是在JVM中,也就可以说Runtime运行时是JVM的重要组成部分;除此之外我们还发现Java中实现RASP的几个关键点:
    1、 我们的防护程序必须能够分析、修改java的.class文件;
    2、 必须在JVM解释执行.class文件时进行注入(Java Runtime);
  通过上面的分析我们知道了要实现Java的RASP所要具备的能力,然后我们发现在Java中有Javassist、与ASM可以实现对Java字节码的修改;有了修改.class字节码文件的技能,还需要能够在Java运行期间注入我们的防护程序,通过上面我们发现Java运行时是发生在JVM中,通过查找相关资料与JVM参数发现在JVM参数中有-javaagent参数配置Java代理可以在 运行时注入我们的防护程序;

Java RASP实现

  在上面的分析中我们发现只要在JVM的-javaagent参数 中配置我们的保护程序,就能够轻松实现Java的RASP;
  Java代理程序入口类需要有名为premain的静态方法 ,还需要在jar的META-INF/MAINIFEST.MF文件中包含 Premain-Class配置,下面是RASP保护程序的入口类;
  JavaRASPApp:

/**
 * @author linx
 * @date 2017-06-25
*/
public class JavaRASPApp {

  public static void premain(String agentArgs, Instrumentation instru) throws ClassNotFoundException, UnmodifiableClassException {
    System.out.println("premain");
    instru.addTransformer(new ClassTransformer());
  }

}

  ClassTransformer类实现了Java的代理程序机制提供的ClassFileTransformer接口 ,能够在运行时(Runtime)对类的字节码进行替换与修改;
  ClassTransformer也很简单,只有一个实现方法:transform,此方法中可以获取得到ClassLoader、className、classfileBuffer等,分别为类加载器、类名、字节码
  此时我们可以在transform方法中做文章,实现我们的防护程序;

/**
 * @author linxin
 * @version v1.0
 *          Copyright (c) 2017 by linx
 * @date 2017/6/23.
*/
 public class ClassTransformer implements ClassFileTransformer {
 public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer){

    byte[] transformeredByteCode = classfileBuffer;
    try {

        if (className.equals("co/solinx/demo/Test")) {
            System.out.println(String.format("transform start %s",className));
            ClassReader reader = new ClassReader(classfileBuffer);
            ClassWriter writer = new ClassWriter(ClassWriter.COMPUTE_MAXS);
            ClassVisitor classVisitor = (ClassVisitor) createVisitorIns("co.solinx.demo.visitor.TestVisitor", writer, className);
            reader.accept(classVisitor, ClassReader.EXPAND_FRAMES);
            transformeredByteCode = writer.toByteArray(); 
        }

    } catch (Exception e) {
        e.printStackTrace();
    }catch (Throwable t){
        t.printStackTrace();
    }
    return transformeredByteCode;
}
 public Object createVisitorIns(final String name, ClassVisitor cv, String className)
        throws NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException, ClassNotFoundException {
    Constructor<?> ctor = Class.forName(name).getDeclaredConstructor(new Class[]{ClassVisitor.class, String.class});
    ctor.setAccessible(true);
    return  ctor.newInstance(new Object[]{cv, className});
 }
}

  可以看到我们在transform方法中co/solinx/demo/Test类进行拦截,并通过ASM修改字节码注入我们的保护逻辑,下面代码是TestVisitorAdapter类中的onMethodEnter方法实现了通过ASM调用拦截器,抛出异常的字节码;

@Override
protected void onMethodEnter() { 

    mv.visitTypeInsn(NEW,"co/solinx/demo/filter/SqlFilter");
    mv.visitInsn(DUP);
    mv.visitMethodInsn(INVOKESPECIAL,"co/solinx/demo/filter/SqlFilter","<init>","()V",false);
    mv.visitVarInsn(ASTORE,2);
    mv.visitVarInsn(ALOAD,2);
    mv.visitVarInsn(ALOAD,1);
    mv.visitMethodInsn(INVOKEVIRTUAL,"co/solinx/demo/filter/SqlFilter", "filter","(Ljava/lang/Object;)Z",false);

    Label label = new Label();
    /**
     * IFEQ filter返回值也就是栈顶int型数值等于true时跳转,抛出异常
     */
    mv.visitJumpInsn(IFEQ, label);  
    mv.visitTypeInsn(NEW, "java/sql/SQLException");
    mv.visitInsn(DUP);
    mv.visitLdcInsn("invalid sql because of security check");
    mv.visitMethodInsn(INVOKESPECIAL, "java/sql/SQLException", "<init>", "(Ljava/lang/String;)V", false);
    mv.visitInsn(ATHROW);
    mv.visitLabel(label);
    /**
     * 必须要调该方法,手动设置Stack Map Table,否则会有 java.lang.VerifyError: Expecting a stackmap frame at branch target 26异常
     * 在jdk1.7中可以使用JVM参数-UseSplitVerifier,关掉class验证,但jdk1.8中该参数已经去掉,所以要在1.8中运行必须调用该方法;
     */
    mv.visitFrame(Opcodes.F_SAME, 0, null, 0, null);
 } 

  SqlFilter拦截类:

public class SqlFilter {
public boolean filter(Object sql){
    boolean ret=false;
    System.out.println(String.format("sql filter : %s ",sql));
    if(sql.toString().contains("1=1")){
        ret=true;
    }
    return ret;
}
}

  TestVisitorAdapter类中的onMethodEnter方法中通过调用filter拦截器,返回true就是被拦截了,抛出异常,否则放行;至此一个简单的JAVA RASP demo就完成了; 通过后面的方式即可使用我们的RASP程序:java -javaagent:respjar-1.0-SNAPSHOT.jar app.jar;
  通过RASP可以通过无嵌入、无需修改代码的实现安全保护,在RASP中可以拦截SQL、会话、有害请求、OGNL等等信息;
  Demo源码:https://github.com/linxin26/javarespdemo