Java异步编程总结

Java异步编程总结

Scroll Down

从JDK1.5开始,Java就引入了对异步的支持,而Vert.x作为一个异步框架,也有着基于回调的异步模型,本文我们就来分析下JDK原生的异步调用和Vert.x的异步模型的区别

JDK原生Future

JDK原生的Future代表了一次异步调用的执行结果,在主线程提交一次异步任务时,JVM将会开辟一个新的线程去执行异步任务,并立即返回一个Future对象,因此不会阻塞主线程的执行,主线程可以通过Future对象的get()方法获取异步任务的执行结果,但如果在获取结果是异步任务仍未执行完,get()方法仍旧会引起阻塞直到异步任务执行完毕,虽然我们可以通过向get()方法传入超时时间来控制阻塞造成的影响,但从本质上说,如果异步任务所需的执行时间大于主线程从获取到异步任务的Future对象到执行到需要获取异步结果的时间,这种异步的模式依旧会引起主线程的阻塞 15673958664756.jpg

CompletableFuture

如果一段程序需要执行多个异步任务,并且多个异步任务之间还有关系,考虑以下情况

  1. 异步任务二依赖于异步任务一的结果
  2. 某段代码需要获取到所有异步任务的结果后才能执行
  3. 多个异步任务中仅需任意一个执行完后即可执行某段代码

在以上场景中,如果只使用Future,我们不得不显示的取出所有异步任务的结果并进行一些繁琐的判断才能满足需求,而CompletableFuture正是为了简化这部分操作而诞生,CompletableFuture是JDK原生的对Future的增强,它提供了对于多个Future间的串行执行,All,Any等组合形式的支持,在JDK8中,我们甚至可以活用Stream的并行流来优化组合的执行来提高运行效率。但是它仍旧存在Future所存在的问题:当主线程执行到必须要获取异步结果的时候而异步任务还未完成时主线程依然需要陷入阻塞

基于回调的Future/Promise

在热门的异步框架Netty和Vert.x中,都使用了基于回调的Future/Promise模式来实现异步,这里的Future有别于JDK原生的Future,它代表的是一个无法修改的异步任务执行结果,任务执行的成功与否已经成为一个既定的事实,在异步编程中很少单独使用。Promise是一个状态可写的Future,它的核心是一组回调函数列表,(try)Fail()方法和(try)Complete()方法,异步任务执行完成后会根据任务的执行情况调用这两组方法,更新Promise的执行状态,并同时根据状态决定是否要执行在Promise中注册的回调函数。在异步任务提交后Promise将会立即返回,接下来我们就可以在Promise对象上将接下来与异步任务相关的逻辑作为回调注册到Promise上,又异步任务执行完成后触发执行

15673958873784.jpg

Netty中DefaultPromise的部分实现

     @Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

@Override
public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return true;
    }
    return false;
}

@Override
public Promise<V> setFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this, cause);
}

@Override
public boolean tryFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return true;
    }
    return false;
}

注:Vert.x中的Future直接提供了(try)Fail()方法和(try)Complete()方法,它本质上其实是Promise,从Vert.x 3.8开始也加入了Promise来避免Future与Promise的混淆,但为了向前兼容,在3.8中Future还是保留了修改状态相关的逻辑,只是将这些相关方法标记为废除不推荐使用

Future/Promise中的CompositeFuture

在基于回调的异步模型中,也需要考虑当多个异步任务有逻辑联系的情况,但实现的细节与JDK中的CompletableFuture有本质区别,Vert.x的源码中中提供了一种不错的实现思路,记把多个Promise的执行也看作是一个独特的Promise(CompositeFuture),这个CompositeFuture把和组合逻辑相关的逻辑作为回调注册到各个被组合的Promise中,由它们来触发CompositeFuture状态的更新从而触发组合逻辑的运转

Vert.x中CompositeFuture.all实现

    public static CompositeFuture all(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length;
for (int i = 0; i < len; i++) {
  results[i].setHandler(ar -> {//给每个异步任务注册回调
    Handler<AsyncResult<CompositeFuture>> handler = null;
    if (ar.succeeded()) {
      synchronized (composite) {
        composite.count++; 
        if (!composite.isComplete() && composite.count == len) {  //比对异步任务总个数和成功的总个数
          handler = composite.setCompleted(null);
        }
      }
    } else { //一个失败全部失败
      synchronized (composite) {
        if (!composite.isComplete()) {
          handler = composite.setCompleted(ar.cause());
        }
      }
    }
    if (handler != null) {
      handler.handle(composite);
    }
  });
}
if (len == 0) {
  composite.setCompleted(null);
}
return composite;
}