CompletableFuture异步编排

admin
2022-03-11 / 0 评论 / 290 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2022年04月14日,已超过743天没有更新,若内容或图片失效,请留言反馈。

通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。因此我们可以使用completableFuture 异步编排方案。

比如:一个业务场景,需要同时获取多个数据,如果同步线程挨个执行,则需要时间为所有线程执行时间的总和。

如果我们使用异步线程执行,所需时间则为耗时最长那个异步线程的执行时间。

如果多个异常线程之间还存在依赖关系,比如线程3需要线程1的执行结果,线程6依赖线程3、线程2,那这个问题怎么解决呢。那就可以使用completableFuture 异步编排方案实现。

注意:completableFuture 是jdk1.8之后添加的一个功能。

CompletableFuture接口:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public interface Future<V> {

以前用到的FutureTask就是用到的Future可以得到返回结果

public class FutureTask<V> implements RunnableFuture<V> {
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

Future可以得到返回结果

CompletableFuture随便一个方法,都接受一个Function

    public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }
@FunctionalInterface
public interface Function<T, R> {

Function是一个@FunctionalInterface,所以对Lambda使用要熟悉。

CompletableFuture异步编排问题

多个异步线程远程调用会导致丢失请求头,原因是多个线程,通过拦截器不能获取其它线程的请求的heard信息。

解决方案:每个线程共享自己的requestAttributes。

自定义feign拦截器:

import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;

/**
 * @description: TODO
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/3/10 23:03
 * @version: 1.0
 */
@Configuration
public class FamilyFegin {

    @Bean(name ="requestInterceptor")
    public RequestInterceptor requestInterceptor(){
        return new RequestInterceptor() {
            @Override
            public void apply(RequestTemplate requestTemplate) {
                //RequestContextHolder拿到刚请求来的数据
                ServletRequestAttributes requestAttributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes();
                System.out.println("requestAttributes线程"+Thread.currentThread().getId());

                HttpServletRequest request = requestAttributes.getRequest();
                System.out.println("调用feign之前");
                if(request != null){
                    //同步请求头数据
                   String cookie = request.getHeader("Cookie");//老数据
                   //给新请求同步老请求的cookie
                   requestTemplate.header("Cookie", cookie);
                }

            }
        };
    }
}

web配置:因为需要拦截器生效

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * @description: WEB配置
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/3/10 22:40
 * @version: 1.0
 */
public class FamilyWebConfig implements WebMvcConfigurer {

    @Autowired
    LoginUserInterceptor loginUserInterceptor;
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(loginUserInterceptor).addPathPatterns("/**");
    }
}

CompletableFuture异步编排

import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @description: CompletableFuture异步编排
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/3/10 23:32
 * @version: 1.0
 */
public class MyCompletableFuture {
    public static ExecutorService executorService = Executors.newFixedThreadPool(10);

    public void myOpenFeign() throws ExecutionException, InterruptedException {
        System.out.println("主线程0");
        
        //远程调用存在丢失请求头的问题,因为不在同一线程,导致自定义拦截器不能获取head信息。
        //解决丢失请求头方案:
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();


        //第一个异步任务
        CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
            System.out.println("辅线程1");
            //每个线程都共享一下自己的requestAttributes
            RequestContextHolder.setRequestAttributes(requestAttributes);
            //异步线程远程调用,业务1
            //异步线程调用业务代码
        }, executorService);

        //第二个异步任务
        CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
            System.out.println("辅线程2");
            //每个线程都共享一下自己的requestAttributes
            RequestContextHolder.setRequestAttributes(requestAttributes);
            //异步线程远程调用,业务2
            //异步线程调用业务代码
        }, executorService);
        CompletableFuture.allOf(oneFuture, twoFuture).get();
    }

}

以上就是解决CompletableFuture异步编排,异步多线程引起的远程调用请求丢失解决方案。

代码中共享requestAttributes原因ThreadLocal数据:

ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();

RequestContextHolder主线程不一样获取的数据不一样,如下:

import javax.faces.context.FacesContext;
import org.springframework.core.NamedInheritableThreadLocal;
import org.springframework.core.NamedThreadLocal;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;

public abstract class RequestContextHolder {
    private static final boolean jsfPresent = ClassUtils.isPresent("javax.faces.context.FacesContext", RequestContextHolder.class.getClassLoader());
    private static final ThreadLocal<RequestAttributes> requestAttributesHolder = new NamedThreadLocal("Request attributes");
    private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder = new NamedInheritableThreadLocal("Request context");

    public RequestContextHolder() {
    }
5

评论 (0)

取消