通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。因此我们可以使用completableFuture 异步编排方案。
比如:一个业务场景,需要同时获取多个数据,如果同步线程挨个执行,则需要时间为所有线程执行时间的总和。
如果我们使用异步线程执行,所需时间则为耗时最长那个异步线程的执行时间。
如果多个异常线程之间还存在依赖关系,比如线程3需要线程1的执行结果,线程6依赖线程3、线程2,那这个问题怎么解决呢。那就可以使用completableFuture 异步编排方案实现。
注意:completableFuture 是jdk1.8之后添加的一个功能。
CompletableFuture接口:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public interface Future<V> {
以前用到的FutureTask就是用到的Future可以得到返回结果
public class FutureTask<V> implements RunnableFuture<V> {
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
Future可以得到返回结果
CompletableFuture随便一个方法,都接受一个Function
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}
@FunctionalInterface
public interface Function<T, R> {
Function是一个@FunctionalInterface,所以对Lambda使用要熟悉。
CompletableFuture异步编排问题
多个异步线程远程调用会导致丢失请求头,原因是多个线程,通过拦截器不能获取其它线程的请求的heard信息。
解决方案:每个线程共享自己的requestAttributes。
自定义feign拦截器:
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
/**
* @description: TODO
* @author: <a href="mailto:batis@foxmail.com">清风</a>
* @date: 2022/3/10 23:03
* @version: 1.0
*/
@Configuration
public class FamilyFegin {
@Bean(name ="requestInterceptor")
public RequestInterceptor requestInterceptor(){
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate requestTemplate) {
//RequestContextHolder拿到刚请求来的数据
ServletRequestAttributes requestAttributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes();
System.out.println("requestAttributes线程"+Thread.currentThread().getId());
HttpServletRequest request = requestAttributes.getRequest();
System.out.println("调用feign之前");
if(request != null){
//同步请求头数据
String cookie = request.getHeader("Cookie");//老数据
//给新请求同步老请求的cookie
requestTemplate.header("Cookie", cookie);
}
}
};
}
}
web配置:因为需要拦截器生效
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* @description: WEB配置
* @author: <a href="mailto:batis@foxmail.com">清风</a>
* @date: 2022/3/10 22:40
* @version: 1.0
*/
public class FamilyWebConfig implements WebMvcConfigurer {
@Autowired
LoginUserInterceptor loginUserInterceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(loginUserInterceptor).addPathPatterns("/**");
}
}
CompletableFuture异步编排
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @description: CompletableFuture异步编排
* @author: <a href="mailto:batis@foxmail.com">清风</a>
* @date: 2022/3/10 23:32
* @version: 1.0
*/
public class MyCompletableFuture {
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public void myOpenFeign() throws ExecutionException, InterruptedException {
System.out.println("主线程0");
//远程调用存在丢失请求头的问题,因为不在同一线程,导致自定义拦截器不能获取head信息。
//解决丢失请求头方案:
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
//第一个异步任务
CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
System.out.println("辅线程1");
//每个线程都共享一下自己的requestAttributes
RequestContextHolder.setRequestAttributes(requestAttributes);
//异步线程远程调用,业务1
//异步线程调用业务代码
}, executorService);
//第二个异步任务
CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
System.out.println("辅线程2");
//每个线程都共享一下自己的requestAttributes
RequestContextHolder.setRequestAttributes(requestAttributes);
//异步线程远程调用,业务2
//异步线程调用业务代码
}, executorService);
CompletableFuture.allOf(oneFuture, twoFuture).get();
}
}
以上就是解决CompletableFuture异步编排,异步多线程引起的远程调用请求丢失解决方案。
代码中共享requestAttributes原因ThreadLocal数据:
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
RequestContextHolder主线程不一样获取的数据不一样,如下:
import javax.faces.context.FacesContext;
import org.springframework.core.NamedInheritableThreadLocal;
import org.springframework.core.NamedThreadLocal;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
public abstract class RequestContextHolder {
private static final boolean jsfPresent = ClassUtils.isPresent("javax.faces.context.FacesContext", RequestContextHolder.class.getClassLoader());
private static final ThreadLocal<RequestAttributes> requestAttributesHolder = new NamedThreadLocal("Request attributes");
private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder = new NamedInheritableThreadLocal("Request context");
public RequestContextHolder() {
}
评论 (0)