通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。因此我们可以使用completableFuture 异步编排方案。
比如:一个业务场景,需要同时获取多个数据,如果同步线程挨个执行,则需要时间为所有线程执行时间的总和。
如果我们使用异步线程执行,所需时间则为耗时最长那个异步线程的执行时间。
如果多个异常线程之间还存在依赖关系,比如线程3需要线程1的执行结果,线程6依赖线程3、线程2,那这个问题怎么解决呢。那就可以使用completableFuture 异步编排方案实现。
注意:completableFuture 是jdk1.8之后添加的一个功能。
CompletableFuture接口:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {public interface Future<V> {以前用到的FutureTask就是用到的Future可以得到返回结果
public class FutureTask<V> implements RunnableFuture<V> {public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}Future可以得到返回结果
CompletableFuture随便一个方法,都接受一个Function
    public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }@FunctionalInterface
public interface Function<T, R> {Function是一个@FunctionalInterface,所以对Lambda使用要熟悉。
CompletableFuture异步编排问题
多个异步线程远程调用会导致丢失请求头,原因是多个线程,通过拦截器不能获取其它线程的请求的heard信息。
解决方案:每个线程共享自己的requestAttributes。
自定义feign拦截器:
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
/**
 * @description: TODO
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/3/10 23:03
 * @version: 1.0
 */
@Configuration
public class FamilyFegin {
    @Bean(name ="requestInterceptor")
    public RequestInterceptor requestInterceptor(){
        return new RequestInterceptor() {
            @Override
            public void apply(RequestTemplate requestTemplate) {
                //RequestContextHolder拿到刚请求来的数据
                ServletRequestAttributes requestAttributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes();
                System.out.println("requestAttributes线程"+Thread.currentThread().getId());
                HttpServletRequest request = requestAttributes.getRequest();
                System.out.println("调用feign之前");
                if(request != null){
                    //同步请求头数据
                   String cookie = request.getHeader("Cookie");//老数据
                   //给新请求同步老请求的cookie
                   requestTemplate.header("Cookie", cookie);
                }
            }
        };
    }
}web配置:因为需要拦截器生效
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
 * @description: WEB配置
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/3/10 22:40
 * @version: 1.0
 */
public class FamilyWebConfig implements WebMvcConfigurer {
    @Autowired
    LoginUserInterceptor loginUserInterceptor;
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(loginUserInterceptor).addPathPatterns("/**");
    }
}CompletableFuture异步编排
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @description: CompletableFuture异步编排
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/3/10 23:32
 * @version: 1.0
 */
public class MyCompletableFuture {
    public static ExecutorService executorService = Executors.newFixedThreadPool(10);
    public void myOpenFeign() throws ExecutionException, InterruptedException {
        System.out.println("主线程0");
        
        //远程调用存在丢失请求头的问题,因为不在同一线程,导致自定义拦截器不能获取head信息。
        //解决丢失请求头方案:
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        //第一个异步任务
        CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
            System.out.println("辅线程1");
            //每个线程都共享一下自己的requestAttributes
            RequestContextHolder.setRequestAttributes(requestAttributes);
            //异步线程远程调用,业务1
            //异步线程调用业务代码
        }, executorService);
        //第二个异步任务
        CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
            System.out.println("辅线程2");
            //每个线程都共享一下自己的requestAttributes
            RequestContextHolder.setRequestAttributes(requestAttributes);
            //异步线程远程调用,业务2
            //异步线程调用业务代码
        }, executorService);
        CompletableFuture.allOf(oneFuture, twoFuture).get();
    }
}以上就是解决CompletableFuture异步编排,异步多线程引起的远程调用请求丢失解决方案。
代码中共享requestAttributes原因ThreadLocal数据:
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();RequestContextHolder主线程不一样获取的数据不一样,如下:
import javax.faces.context.FacesContext;
import org.springframework.core.NamedInheritableThreadLocal;
import org.springframework.core.NamedThreadLocal;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
public abstract class RequestContextHolder {
    private static final boolean jsfPresent = ClassUtils.isPresent("javax.faces.context.FacesContext", RequestContextHolder.class.getClassLoader());
    private static final ThreadLocal<RequestAttributes> requestAttributesHolder = new NamedThreadLocal("Request attributes");
    private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder = new NamedInheritableThreadLocal("Request context");
    public RequestContextHolder() {
    }      
        
      
    
          
评论 (0)