JAVA8-实现一个异步基于事件回调的Future程序
前面2个例子(JAVA8-多线程Future设计模式原理,自定义实现一个Future程序、JAVA8-JDK自带Future,Callable,ExecutorService)+该例子,是为了学习CompletableFuture,理解其原理。
自定义Future程序代码示例:
package com.example.study.java8.funture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* 实现一个异步基于事件回调的Future程序
*/
public class FutureInAction3 {
public static void main(String[] args) {
Future<String> future = invoke(() -> {
try {
Thread.sleep(10000L);
return "I'm finished.";
} catch (InterruptedException e) {
return "I'm Error.";
}
});
//注册一个事件
future.setCompletable(new Completable<String>() {
@Override
public void completable(String s) {
System.out.println(s);
}
@Override
public void excetion(Throwable cause) {
System.out.println("Error");
cause.printStackTrace();
}
});
//下面就可以执行其它逻辑了。。。
System.out.println("。。。。。。。。。");
System.out.println(future.get());
System.out.println(future.get());
}
private static <T> Future<T> invoke(Callable<T> callable) {
AtomicReference<T> result = new AtomicReference<>();
AtomicBoolean finished = new AtomicBoolean(false);
Future future = new Future() {
private Completable<T> completable;
@Override
public Object get() {
return result.get();
}
@Override
public boolean isDone() {
return finished.get();
}
@Override
public void setCompletable(Completable completable) {
this.completable = completable;
}
@Override
public Completable getCompletable() {
return completable;
}
};
Thread t = new Thread(() -> {
try {
T value = callable.action();
result.set(value);
finished.set(true);
if (future.getCompletable() != null) {
//调用回调函数
future.getCompletable().completable(value);
}
} catch (Exception cause) {
if (future.getCompletable() != null) {
future.getCompletable().excetion(cause);
}
}
});
t.start();
return future;
}
/**
* 自定义的Future
*
* @param <T>
*/
private interface Future<T> {
T get();
boolean isDone();
void setCompletable(Completable<T> completable);
Completable<T> getCompletable();
}
private interface Callable<T> {
T action();
}
/**
* 回调接口
*
* @param <T>
*/
private interface Completable<T> {
/**
* 执行完后,调用的回调函数
*
* @param t
*/
void completable(T t);
/**
* 执行过程中出现的异常,直接传入需要抛出的异常回调。
*
* @param cause
*/
void excetion(Throwable cause);
}
}
输出结果:
。。。。。。。。。 //不会阻塞,继续执行后面操作
null //不会阻塞,继续执行后面操作
null //不会阻塞,继续执行后面操作
I'm finished. //等线程中的操作计算完成后,会根据注册的事件,调用回调函数,输出结果,不用阻塞等待,必须完成后续操作才能执行。
评论 (0)