1. 介绍

java自带的实现任务异步执行的API主要就是future和completableFuture。这里简单介绍下。

2. future

future是JAVA5就引入的。主要是创建一个线程池,然后提交Runnable或者Callable的任务。然后可以用get来获取返回结果(执行完毕才会返回)

优点:一定程度上让一个线程池内的任务异步执行了
缺点:回调无法放到与任务不同的线程中执行。传统回调最大的问题就是不能将控制流分离到不同的事件处理器中。例如主线程等待各个异步执行的线程返回的结果来做下一步操作,则必须阻塞在future.get()的地方等待结果返回。这时候又变成同步了。

示例代码:

/**

* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

3. CompletableFutures

CompletableFutures是JAVA8里面引入的。

这个API有Future的优点,并且弥补了其缺点。即异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

可见,这种方式才是我们需要的异步处理。一个控制流的多个异步事件处理能无缝的连接在一起。

示例代码:

/**

* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**

* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    //下面就可以看到,f2这个异步事件处理无缝引用了f这个异步事件处理的结果。整个过程中间不需要像future.get()这样引入了不必要的同步阻塞
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

4. 异步非阻塞的JAVA库

针对JAVA的异步非阻塞,很多第三方利用netty、concurrent包(主要是ForkJoinPool)开发了一些JAVA的异步非阻塞框架。例如:

  1. vert.x
  2. quasar
  3. RxJava
  4. comsat(使用quasar分装了各种类库)

当然akka也算,只不过实现方式与以上的不同。

参考资料:

  1. Difference between completableFuture,Future and Observable Rxjava
  2. 使用Java 8的CompletableFuture实现函数式的回调