JAVA8-CompletableFuture流水线工作,join多个异步任务详解

admin
2022-11-21 / 0 评论 / 196 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2022年11月21日,已超过730天没有更新,若内容或图片失效,请留言反馈。

JAVA8-CompletableFuture流水线工作,join多个异步任务详解

需求:根据商品id,将每个商品价格翻2倍?

代码示例

package com.example.study.java8.completableFutures;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;

/**
 * 根据商品id,将每个商品价格翻2倍。CompletableFuture高并发执行。
 */
public class CompletableFutureInAction4 {
    private final static Random RANDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        //防止主线程执行完后,守护线程也关闭
        ExecutorService executorService = Executors.newFixedThreadPool(2, r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(false);
            return thread;
        });

        //5个商品ID
        List<Integer> productIDs = Arrays.asList(1, 2, 3, 4, 5);
        //通过CompletableFuture查询5个商品价格
        Stream<CompletableFuture<Double>> completableFutureStream = productIDs.stream().map(i -> CompletableFuture.supplyAsync(() -> queryProduct(i), executorService));
        //将每个商品价格翻2倍
        Stream<CompletableFuture<Double>> multplyFutures = completableFutureStream.map(future -> future.thenApply(CompletableFutureInAction4::multply));
        //将翻倍后的CompletableFuture加入线程中,将翻倍后价格收集成一个list数组
        List<Double> result = multplyFutures.map(CompletableFuture::join).collect(toList());
        //输出最后翻倍价格
        System.out.println(result);
    }

    private static Double multply(Double value) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return value * 2D;
    }

    private static Double queryProduct(int i){
        return CompletableFutureInAction4.get();
    }

    //模拟从数据库根据商品ID查询价格
    static double get(){
        try {
            Thread.sleep(RANDOM.nextInt(100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        double value = RANDOM.nextDouble();
        System.out.println(value);
        return value;
    }
}

输出结果:

0.22471854337617791
0.11072895680534822
0.6087836739979867
0.3209858426806811
0.8416829454071859
[0.44943708675235583, 0.22145791361069644, 1.2175673479959734, 0.6419716853613622, 1.6833658908143718]

原理图

CompletableFuture
说明:

Sequential(串行操作):将第一个商品价格查询,然后翻倍,获取翻倍后价格,然后查询第二个商品,在将第二个商品价格翻倍,。。。一次执行完后,将所有的返回价格放到结果中。

Parrallel(并行操作):5个商品同时执行价格查询,价格翻倍任务,同时将返回价格放到结果中。

明显看出Parrallel并行操作会快很多。

缩减后代码

package com.example.study.java8.completableFutures;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;

/**
 * 根据商品id,将每个商品价格翻2倍。利用CompletableFuture的高并发执行。代码缩减后更简洁。
 */
public class CompletableFutureInAction5 {
    private final static Random RANDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        //防止主线程执行完后,守护线程也关闭
        ExecutorService executorService = Executors.newFixedThreadPool(2, r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(false);
            return thread;
        });

        //5个商品ID
        List<Integer> productIDs = Arrays.asList(1, 2, 3, 4, 5);
        //通过CompletableFuture查询5个商品价格,并将价格翻2倍
        List<Double> result = productIDs
                .stream()
                .map(i -> CompletableFuture.supplyAsync(() -> queryProduct(i), executorService))
                .map(future -> future.thenApply(CompletableFutureInAction5::multply))
                .map(CompletableFuture::join).collect(toList());
        //输出最后翻倍价格
        System.out.println(result);
    }

    private static Double multply(Double value) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return value * 2D;
    }

    private static Double queryProduct(int i) {
        return CompletableFutureInAction5.get();
    }

    //模拟从数据库根据商品ID查询价格
    static double get() {
        try {
            Thread.sleep(RANDOM.nextInt(100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        double value = RANDOM.nextDouble();
        System.out.println(value);
        return value;
    }
}
2

评论 (0)

取消