分类目录归档:并发模型

Akka初步介绍

  Akka可能很多人都没有用过,也不知道是什么,但如果说起Scala或Spark就有很多人都听说过或使用过 ,这里简单说下三者的关系Akka是使用Scala开发的,Spark中使用了Akka作为其消息的通信工具;这篇文章主要 说说Akka的一些特性,做个简要的介绍;
  要说Akka首先要从并发开始说起,我记得之前我也写过并发模型相关的文章,并发模型主要有这么三类:
    1、共享内存模型
    2、Actor模型
    3、CSP模型
  共享内存模型:是通过使用线程与锁对共享内存进行控制用于实现并发,它依赖于多线程、锁,会产生过多的线程又使用锁对竞态资源进行同步控制,所以性能不是太高,对编程也不太友好;
  Actor模型:使用Actor作为并发的基础多个任务之间通过Actor相互发送消息进行通讯,Actor比线程轻量得多,一组Actor使用一个或多个线程所以也就没有线程、锁相关影响性能的问题存在,对编程也很友好;
  CSP模型Communicating Sequential Process):为多个进程提供了channel,并发的任务存放于channel当中,Golang的goroutine也是用了类似CSP模式的并发,并在channel中多加一个缓存;
  Actor与CSP模型都提倡:要通过通讯来共享内存,不要通过共享内存来通讯,这个可以说是他们与共享内存模型最大的区别;
  介绍了相关的基本概念,接下来说说今天的主题:Akka
  
  简单来说Akka就是基于Actor模型实现的并发框架;Akka降低了编写具有容错性、可扩展的并发程序的难度,容错性方面采用了“let it crach(让它崩溃)模型”;Akka为垂直扩展(并发)、水平扩展(远程调用)、 高容错提供了一致的编程模型;Akka具有以下几种特性:

  Actors:Actor为并发程序提供了简单高级别的抽象,为异步、非阻塞、高性能的事件驱动模型,1G内存可以容纳数百万个Actor;
  容错性:使用“let it crach”作为其监控层次体系的核心,监控层次可跨越JVM,使编写出“永不停机”、“自愈和”的高容错系统的难度大大降低;
  位置透明:Akka中所有元素都是为了适应分布式而设计的,Actor之间只能通过发送消息进行通讯所有操作均是异步进行的,不管是本地Actor还是远程Actor通信方式、操作都是一致的;
  持久性:Actor的状态、收到的消息可被持久化,并可在Actor启动或重启时恢复状态与消息,不管是JVM崩溃或是节点迁移都适用;

  下面通过一个Akka程序,然后结束本篇文章;

/**
* Created by linx on 2016-06-26.
*/
class Greeter extends Actor {

  override def receive: Receive = {
    case "greet" =>
      println("hello world")
      val hello = context.actorOf(Props[HelloWorld], "hello")
      hello ! "done"
  }
}
/**
  * Created by linx on 2016-06-26.
*/
class HelloWorld extends Actor {

  override def receive: Receive = {
    case "done" =>
      println("done")
      context.system.shutdown()
  }
}
/**
* Created by linx on 2016-06-26.
*/
object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("hello")
    val greeter = system.actorOf(Props[Greeter], "greeter")
    greeter ! "greet"
  }
}

  程序先创建一个Greeter Actor然后往该Actor发送“greet”字符串,Greeter Actor收到后打印Hello World,然后创建HelloWorld Actor并发送done,HelloWorld结束整个程序;
程序运行结果:
Akka Hello

参考资料:
http://doc.akka.io/docs/akka/2.4.7/scala.html
http://www.solinx.co/archives/464

并发编程之可变状态

  熟悉Java或如C#等使用共享内存模型作为并发实现的人都比较清楚,编写线程安全的代码很关键的一点就是要控制好可变状态,对于Java开发者来说可能用内存可见性更容易理解,在各种关于并发的书籍中都是处理好内存可见性问题编写线程安全的代码就成功了一半了,但我认为“内存可见性”太过于抽象、底层,使开发者不容易理解;
  多线程之间通过共享内存进行通讯这句话可能很多人都比较清楚,我认为也可以这么说多线程间通过共享可变状态进行通讯,本篇文章讨论的是命令式编程并发中的可变状态与为什么函数式编程更容易写出并发程序;

可变状态与不可变状态

从字面上理解,状态:某一事务所处的状况;可变:可以变化的;
那么可变状态可以理解成事务的状况是可以变化的,如从固态到液态或到气态;

可变状态
那么在程序中可变状态是怎样的呢,请阅读下面代码:

public class VariableState { 
private int variableInterval=5; 
public int  increment(int x){
   variableInterval=x+variableInterval;
   return variableInterval;
} 

public static void main(String[] args) { 
    VariableState variable=new VariableState();
    variable.increment(5);          //print 10
    //variable.variableInterval=6; 
    variable.increment(5);          //print 15 去掉注释时 print 11
 }
}

在这段代码中函数increment的输出结果会随着可变状态variableInterval的变化而变化;

不可变状态
有可变的就会有不可变的,继续看不可变状态在代码中是怎样的:

public class InvariableState {
private final int invariableInterval=5; 
public int increment(int x){
    x=x+invariableInterval;
    return x;
 }
public static void main(String[] args){
    InvariableState invariable=new InvariableState();
    System.out.println(invariable.increment(5));   //print 10
    System.out.println(invariable.increment(5));   //print 10
 }
}  

这段代码中了invariableInterval就是不可变的状态,不管调多少次increment函数的输出结果都是一样的;虽然程序中是存在着可变和不可变状态,但是着又有什么关系呢?

  答案是如果你的程序只是在单线程中运行那么可变、不可变状态对你没有一点影响,但请注意如果你的程序是多线程程序(并发)那么该可变状态程序运行一定会出现异常结果(不是每次都会出现,也许运行100才会有5次异常);
拿刚刚上面有可变状态的代码来说,如果那段代码是在多线程中执行那么就会可能出现异常结果:

public static void main(String[] args) throws InterruptedException {
    VariableState variable=new VariableState();
    Thread [] runnables=new Thread[2];
    for (int i = 0; i < 2; i++) {
        final int finalI = i;
        runnables[i]=new Thread() {
            @Override
            public void run() {
                System.out.println(" i=" + finalI +"  "+variable.increment(5));
            }
        };
    }
    runnables[0].start();
    runnables[1].start();
    runnables[0].join();
    runnables[1].join();
}  

输出结果:
enter image description here enter image description here

  请看上面的示例,运行这段代码程序会输出两个结果,也就是说出现了异常情况,可能大家也都知道出现问题的原因在哪,异常时因为两个线程同时执行了variableInterval=x+variableInterval,一个线程进来执行了x+variableInterval还没有写回variableInterval另一个线程就进来执行x+variableInterval了,接着两个线程都把各自的结果写回到variableInterval中,所以就都是10;
  既然在多线程程序存在可变状态就可能会出现异常结果那我们该怎么处理呢?不急,请继续往下看;

在命令式语言中

在命令式编程语言中,如Java、C#等,像Python、Golang可以说是命令式与函数式混合型的,虽然Java、C#也都加入了Lambda表达式的支持向函数式编程靠拢,但毕竟他的主流还是命令式编程;
下面看看在Java中是如何处理可变状态在多线程中的异常情况的;

public synchronized int increment(int x) {
    variableInterval = x + variableInterval;
    return variableInterval;
}  

  还是刚刚那个示例,只是在方法上添加了synchronized关键字,相信很多Java都清楚这是什么意思,这指的是在increment函数上添加了一个对象锁,当一个线程进入该函数时必须获取该对象锁才能进入,每次只能一个线程进入线程退出后就会释放该锁。在Java中还可以把synchronized当代码块、ReentrantLock、Lock等或使用不可变状态来解决该问题;
  你可能会觉得这么简单的问题还需要谈论么,其实多线程与锁问题一点都不简单,只是这里的示例比较简单这里只是简单对象的可变状态,如果是个复杂的对象存在可变状态呢,如:DataParser或自己写的复杂对象;在Java中编写并发程序通常都会用到锁、原子变量、不可变变量、volatile等,可变状态是非常常见的等你使用锁解决后又会出现死锁问题,等解决了死锁还存子资源竞争又可能会出现性能问题,因为线程(Thread)、锁(Lock)用不好都会影响性能,这时候你还会觉得简单么;

在函数式语言中

  那么在函数式语言中可变状态又是怎么处理呢?答案是你不用处理,因为在函数式语言中没有可变状态,不存在可变状态也就不会遇到可变状态带来的各种问题;
  这里使用同样是运行在JVM上的函数式语言Clojure来说明不可变状态,在Clojure中对象是不可变的没有可变状态也就不存在Java中的可变状态问题;
Java的可变状态示例:

int total=0;
public int sum(int[] numbsers){
    for(int n: numbers){
        total +=n;
    }
  return total;
}  

在上面的代码中total是状态可变的,在for循环的过程中不断的更新状态,接下来看Clojure中状态不可变实现方式:

(defn sum[numbers]
  (if (empty? numbers)
    0  
    (+ (first numbers) (sum(rest numbers))) 
  )
)
运行:    
user=> (sumfn[1,2,3,4])
10  

你可能会说这只是一个递归的实现在java中也能够实现,没错这只是递归,但Clojure还有更简单的实现:

(defn sum [numbers]  
(reduce + numbers))  

这够简单了吧,抛弃的可变状态而且代码更短了,实现并发的时候也不存在可变状态问题;
  这里也不是比较说哪种更好,在合适的地方使用合适的方法最好;命令式编程与函数式编程根本的区别在于:命令式编程代码使用一系列改变状态的语句组成,而函数式编程把数学函数作为第一类对象,将计算过程抽象为表达式求值表达式由纯数学函数构成;