JAVA8-CompletableFuture流水线工作,join多个异步任务详解
需求:根据商品id,将每个商品价格翻2倍?
代码示例
package com.example.study.java8.completableFutures;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
/**
* 根据商品id,将每个商品价格翻2倍。CompletableFuture高并发执行。
*/
public class CompletableFutureInAction4 {
private final static Random RANDOM = new Random(System.currentTimeMillis());
public static void main(String[] args) {
//防止主线程执行完后,守护线程也关闭
ExecutorService executorService = Executors.newFixedThreadPool(2, r -> {
Thread thread = new Thread(r);
thread.setDaemon(false);
return thread;
});
//5个商品ID
List<Integer> productIDs = Arrays.asList(1, 2, 3, 4, 5);
//通过CompletableFuture查询5个商品价格
Stream<CompletableFuture<Double>> completableFutureStream = productIDs.stream().map(i -> CompletableFuture.supplyAsync(() -> queryProduct(i), executorService));
//将每个商品价格翻2倍
Stream<CompletableFuture<Double>> multplyFutures = completableFutureStream.map(future -> future.thenApply(CompletableFutureInAction4::multply));
//将翻倍后的CompletableFuture加入线程中,将翻倍后价格收集成一个list数组
List<Double> result = multplyFutures.map(CompletableFuture::join).collect(toList());
//输出最后翻倍价格
System.out.println(result);
}
private static Double multply(Double value) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return value * 2D;
}
private static Double queryProduct(int i){
return CompletableFutureInAction4.get();
}
//模拟从数据库根据商品ID查询价格
static double get(){
try {
Thread.sleep(RANDOM.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
double value = RANDOM.nextDouble();
System.out.println(value);
return value;
}
}
输出结果:
0.22471854337617791
0.11072895680534822
0.6087836739979867
0.3209858426806811
0.8416829454071859
[0.44943708675235583, 0.22145791361069644, 1.2175673479959734, 0.6419716853613622, 1.6833658908143718]
原理图
说明:
Sequential(串行操作):将第一个商品价格查询,然后翻倍,获取翻倍后价格,然后查询第二个商品,在将第二个商品价格翻倍,。。。一次执行完后,将所有的返回价格放到结果中。
Parrallel(并行操作):5个商品同时执行价格查询,价格翻倍任务,同时将返回价格放到结果中。
明显看出Parrallel并行操作会快很多。
缩减后代码
package com.example.study.java8.completableFutures;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
/**
* 根据商品id,将每个商品价格翻2倍。利用CompletableFuture的高并发执行。代码缩减后更简洁。
*/
public class CompletableFutureInAction5 {
private final static Random RANDOM = new Random(System.currentTimeMillis());
public static void main(String[] args) {
//防止主线程执行完后,守护线程也关闭
ExecutorService executorService = Executors.newFixedThreadPool(2, r -> {
Thread thread = new Thread(r);
thread.setDaemon(false);
return thread;
});
//5个商品ID
List<Integer> productIDs = Arrays.asList(1, 2, 3, 4, 5);
//通过CompletableFuture查询5个商品价格,并将价格翻2倍
List<Double> result = productIDs
.stream()
.map(i -> CompletableFuture.supplyAsync(() -> queryProduct(i), executorService))
.map(future -> future.thenApply(CompletableFutureInAction5::multply))
.map(CompletableFuture::join).collect(toList());
//输出最后翻倍价格
System.out.println(result);
}
private static Double multply(Double value) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return value * 2D;
}
private static Double queryProduct(int i) {
return CompletableFutureInAction5.get();
}
//模拟从数据库根据商品ID查询价格
static double get() {
try {
Thread.sleep(RANDOM.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
double value = RANDOM.nextDouble();
System.out.println(value);
return value;
}
}
评论 (0)