2022-03-11
CompletableFuture Asynchronous Orchestration
Thread pools give stable performance and let you retrieve a task's result and catch its exceptions. In more complex business logic, however, one asynchronous call may depend on the result of another. CompletableFuture's asynchronous orchestration handles this case.

For example, suppose a request needs several pieces of data at once. Run one after another on a single thread, the total time is the sum of every task's time. Run asynchronously, the total time is roughly that of the slowest task. If the asynchronous tasks also depend on one another (say task 3 needs the result of task 1, and task 6 depends on tasks 3 and 2), CompletableFuture can express those dependencies as well.

Note: CompletableFuture was added in JDK 1.8.

The CompletableFuture class:

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

    public interface Future<V> {

FutureTask, used earlier, also relies on Future to return a result:

    public class FutureTask<V> implements RunnableFuture<V> {

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }

Future is what makes the result available. Most CompletableFuture methods take a functional interface as a parameter; applyToEither, for example, accepts a Function:

    public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }

    @FunctionalInterface
    public interface Function<T, R> {

Function is a @FunctionalInterface, so you need to be comfortable with lambdas.

A problem with CompletableFuture orchestration

When several asynchronous threads make remote calls, the request headers are lost. The Feign interceptor runs on the asynchronous thread, and from there it cannot read the header information of the original request, which is bound to the request thread. The solution is to share the request thread's requestAttributes with every asynchronous thread.

Custom Feign interceptor:

    import feign.RequestInterceptor;
    import feign.RequestTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;

    import javax.servlet.http.HttpServletRequest;

    /**
     * @description: Feign interceptor that forwards the Cookie header
     * @author: <a href="mailto:batis@foxmail.com">清风</a>
     * @date: 2022/3/10 23:03
     * @version: 1.0
     */
    @Configuration
    public class FamilyFegin {

        @Bean(name = "requestInterceptor")
        public RequestInterceptor requestInterceptor() {
            return new RequestInterceptor() {
                @Override
                public void apply(RequestTemplate requestTemplate) {
                    // RequestContextHolder returns the attributes of the request bound to the current thread
                    ServletRequestAttributes requestAttributes =
                            (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
                    System.out.println("requestAttributes thread " + Thread.currentThread().getId());
                    if (requestAttributes == null) {
                        // No request is bound to this thread, so there is nothing to copy
                        return;
                    }
                    HttpServletRequest request = requestAttributes.getRequest();
                    System.out.println("before the feign call");
                    // Copy the Cookie header of the incoming (old) request onto the new request
                    String cookie = request.getHeader("Cookie");
                    if (cookie != null) {
                        requestTemplate.header("Cookie", cookie);
                    }
                }
            };
        }
    }

Web configuration, needed so that the login interceptor takes effect:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
    import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

    /**
     * @description: Web configuration
     * @author: <a href="mailto:batis@foxmail.com">清风</a>
     * @date: 2022/3/10 22:40
     * @version: 1.0
     */
    @Configuration // must be a Spring bean, otherwise addInterceptors is never called
    public class FamilyWebConfig implements WebMvcConfigurer {

        @Autowired
        LoginUserInterceptor loginUserInterceptor;

        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(loginUserInterceptor).addPathPatterns("/**");
        }
    }

CompletableFuture asynchronous orchestration:

    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /**
     * @description: CompletableFuture asynchronous orchestration
     * @author: <a href="mailto:batis@foxmail.com">清风</a>
     * @date: 2022/3/10 23:32
     * @version: 1.0
     */
    public class MyCompletableFuture {

        public static ExecutorService executorService = Executors.newFixedThreadPool(10);

        public void myOpenFeign() throws ExecutionException, InterruptedException {
            System.out.println("main thread 0");
            // Remote calls lose the request headers because they run on another thread,
            // where the custom interceptor cannot read the header information.
            // Fix: capture the request attributes on the request thread first.
            ServletRequestAttributes requestAttributes =
                    (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();

            // First asynchronous task
            CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
                System.out.println("worker thread 1");
                // Share the request thread's requestAttributes with this thread
                RequestContextHolder.setRequestAttributes(requestAttributes);
                try {
                    // Remote call from the asynchronous thread, business 1
                    // Business code goes here
                } finally {
                    // Pool threads are reused, so clear the attributes afterwards
                    RequestContextHolder.resetRequestAttributes();
                }
            }, executorService);

            // Second asynchronous task
            CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
                System.out.println("worker thread 2");
                // Share the request thread's requestAttributes with this thread
                RequestContextHolder.setRequestAttributes(requestAttributes);
                try {
                    // Remote call from the asynchronous thread, business 2
                    // Business code goes here
                } finally {
                    RequestContextHolder.resetRequestAttributes();
                }
            }, executorService);

            // Wait for both tasks to finish
            CompletableFuture.allOf(oneFuture, twoFuture).get();
        }
    }

That is how to fix the loss of request headers on remote calls made from CompletableFuture asynchronous threads.

Why the code shares requestAttributes

The request attributes are ThreadLocal data:

    ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();

RequestContextHolder returns different data depending on the thread that calls it, as its source shows:

    import javax.faces.context.FacesContext;
    import org.springframework.core.NamedInheritableThreadLocal;
    import org.springframework.core.NamedThreadLocal;
    import org.springframework.lang.Nullable;
    import org.springframework.util.ClassUtils;

    public abstract class RequestContextHolder {
        private static final boolean jsfPresent = ClassUtils.isPresent("javax.faces.context.FacesContext", RequestContextHolder.class.getClassLoader());
        private static final ThreadLocal<RequestAttributes> requestAttributesHolder = new NamedThreadLocal("Request attributes");
        private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder = new NamedInheritableThreadLocal("Request context");

        public RequestContextHolder() {
        }
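The ThreadLocal behaviour described above can be reproduced without Spring. The following is a minimal sketch under stated assumptions: `CONTEXT` is a hypothetical stand-in for the ThreadLocal inside RequestContextHolder, and the class and method names are invented for illustration. It shows that a value set on the calling thread is not visible on a pool thread unless it is captured first and installed there, and that it should be cleared afterwards because pool threads are reused.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationDemo {
    // Hypothetical stand-in for RequestContextHolder: one value per thread.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Reads the context on a pool thread without propagating it.
    static String readWithoutPropagation(ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> CONTEXT.get(), pool).join();
    }

    // Captures the caller's context, installs it on the pool thread,
    // and removes it again once the task is done.
    static String readWithPropagation(ExecutorService pool) {
        String captured = CONTEXT.get(); // evaluated on the calling thread
        return CompletableFuture.supplyAsync(() -> {
            CONTEXT.set(captured);
            try {
                return CONTEXT.get();
            } finally {
                CONTEXT.remove(); // pool threads are reused, so leave nothing behind
            }
        }, pool).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CONTEXT.set("Cookie: session=abc");
            System.out.println("without propagation: " + readWithoutPropagation(pool));
            System.out.println("with propagation: " + readWithPropagation(pool));
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }
}
```

Without propagation the pool thread reads `null`; with propagation it reads the captured value. This is the same capture-then-set pattern that `myOpenFeign` applies with `RequestContextHolder.setRequestAttributes`.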
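The dependency scenario from the CompletableFuture article above (task 3 needs the result of task 1, and task 6 depends on tasks 3 and 2) can be sketched roughly as follows. The task names, the string payloads, and the `orchestrate` helper are hypothetical; real tasks would be remote calls or queries. `thenApplyAsync` chains a task onto one predecessor, and `thenCombineAsync` waits for two.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OrchestrationDemo {
    static String orchestrate(ExecutorService pool) {
        // Tasks 1 and 2 are independent and start immediately.
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "sku", pool);
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "images", pool);

        // Task 3 starts only after task 1 completes and receives its result.
        CompletableFuture<String> task3 =
                task1.thenApplyAsync(r1 -> r1 + "+attributes", pool);

        // Task 6 starts only after both task 3 and task 2 have completed.
        CompletableFuture<String> task6 =
                task3.thenCombineAsync(task2, (r3, r2) -> r3 + "+" + r2, pool);

        // Block until the whole graph has finished.
        return task6.join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(orchestrate(pool)); // prints "sku+attributes+images"
        } finally {
            pool.shutdown();
        }
    }
}
```

Tasks 1 and 2 run in parallel, so the elapsed time follows the longest dependency chain, not the sum of all tasks.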
2022-03-10
Lost Request Headers in Feign Remote Calls
OpenFeign remote calls lose the request headers. Before making a remote call, Feign has to construct the request, and the call passes through a chain of interceptors. The headers are lost because the remote call builds a brand-new request, and that new request carries none of the original headers.

Solution: define a Feign interceptor.

First, look at SynchronousMethodHandler, the class Feign uses to make the remote call:

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by FernFlower decompiler)
    //

    package feign;

    import feign.InvocationHandlerFactory.MethodHandler;
    import feign.Logger.Level;
    import feign.Request.Options;
    import feign.codec.Decoder;
    import feign.codec.ErrorDecoder;
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Stream;

    final class SynchronousMethodHandler implements MethodHandler {
        private static final long MAX_RESPONSE_BUFFER_SIZE = 8192L;
        private final MethodMetadata metadata;
        private final Target<?> target;
        private final Client client;
        private final Retryer retryer;
        private final List<RequestInterceptor> requestInterceptors;
        private final Logger logger;
        private final Level logLevel;
        private final feign.RequestTemplate.Factory buildTemplateFromArgs;
        private final Options options;
        private final ExceptionPropagationPolicy propagationPolicy;
        private final Decoder decoder;
        private final AsyncResponseHandler asyncResponseHandler;

        private SynchronousMethodHandler(Target<?> target, Client client, Retryer retryer, List<RequestInterceptor> requestInterceptors, Logger logger, Level logLevel, MethodMetadata metadata, feign.RequestTemplate.Factory buildTemplateFromArgs, Options options, Decoder decoder, ErrorDecoder errorDecoder, boolean decode404, boolean closeAfterDecode, ExceptionPropagationPolicy propagationPolicy, boolean forceDecoding) {
            this.target = (Target)Util.checkNotNull(target, "target", new Object[0]);
            this.client = (Client)Util.checkNotNull(client, "client for %s", new Object[]{target});
            this.retryer = (Retryer)Util.checkNotNull(retryer, "retryer for %s", new Object[]{target});
            this.requestInterceptors = (List)Util.checkNotNull(requestInterceptors, "requestInterceptors for %s", new Object[]{target});
            this.logger = (Logger)Util.checkNotNull(logger, "logger for %s", new Object[]{target});
            this.logLevel = (Level)Util.checkNotNull(logLevel, "logLevel for %s", new Object[]{target});
            this.metadata = (MethodMetadata)Util.checkNotNull(metadata, "metadata for %s", new Object[]{target});
            this.buildTemplateFromArgs = (feign.RequestTemplate.Factory)Util.checkNotNull(buildTemplateFromArgs, "metadata for %s", new Object[]{target});
            this.options = (Options)Util.checkNotNull(options, "options for %s", new Object[]{target});
            this.propagationPolicy = propagationPolicy;
            if (forceDecoding) {
                this.decoder = decoder;
                this.asyncResponseHandler = null;
            } else {
                this.decoder = null;
                this.asyncResponseHandler = new AsyncResponseHandler(logLevel, logger, decoder, errorDecoder, decode404, closeAfterDecode);
            }
        }

        public Object invoke(Object[] argv) throws Throwable {
            RequestTemplate template = this.buildTemplateFromArgs.create(argv);
            Options options = this.findOptions(argv);
            Retryer retryer = this.retryer.clone();

            while(true) {
                try {
                    return this.executeAndDecode(template, options);
                } catch (RetryableException var9) {
                    RetryableException e = var9;

                    try {
                        retryer.continueOrPropagate(e);
                    } catch (RetryableException var8) {
                        Throwable cause = var8.getCause();
                        if (this.propagationPolicy == ExceptionPropagationPolicy.UNWRAP && cause != null) {
                            throw cause;
                        }

                        throw var8;
                    }

                    if (this.logLevel != Level.NONE) {
                        this.logger.logRetry(this.metadata.configKey(), this.logLevel);
                    }
                }
            }
        }

        Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
            Request request = this.targetRequest(template);
            if (this.logLevel != Level.NONE) {
                this.logger.logRequest(this.metadata.configKey(), this.logLevel, request);
            }

            long start = System.nanoTime();

            Response response;
            try {
                response = this.client.execute(request, options);
                response = response.toBuilder().request(request).requestTemplate(template).build();
            } catch (IOException var12) {
                if (this.logLevel != Level.NONE) {
                    this.logger.logIOException(this.metadata.configKey(), this.logLevel, var12, this.elapsedTime(start));
                }

                throw FeignException.errorExecuting(request, var12);
            }

            long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            if (this.decoder != null) {
                return this.decoder.decode(response, this.metadata.returnType());
            } else {
                CompletableFuture<Object> resultFuture = new CompletableFuture();
                this.asyncResponseHandler.handleResponse(resultFuture, this.metadata.configKey(), response, this.metadata.returnType(), elapsedTime);

                try {
                    if (!resultFuture.isDone()) {
                        throw new IllegalStateException("Response handling not done");
                    } else {
                        return resultFuture.join();
                    }
                } catch (CompletionException var13) {
                    Throwable cause = var13.getCause();
                    if (cause != null) {
                        throw cause;
                    } else {
                        throw var13;
                    }
                }
            }
        }

        long elapsedTime(long start) {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        }

        Request targetRequest(RequestTemplate template) {
            Iterator var2 = this.requestInterceptors.iterator();

            while(var2.hasNext()) {
                RequestInterceptor interceptor = (RequestInterceptor)var2.next();
                interceptor.apply(template);
            }

            return this.target.apply(template);
        }

        Options findOptions(Object[] argv) {
            if (argv != null && argv.length != 0) {
                Stream var10000 = Stream.of(argv);
                Options.class.getClass();
                var10000 = var10000.filter(Options.class::isInstance);
                Options.class.getClass();
                return (Options)var10000.map(Options.class::cast).findFirst().orElse(this.options);
            } else {
                return this.options;
            }
        }

        static class Factory {
            private final Client client;
            private final Retryer retryer;
            private final List<RequestInterceptor> requestInterceptors;
            private final Logger logger;
            private final Level logLevel;
            private final boolean decode404;
            private final boolean closeAfterDecode;
            private final ExceptionPropagationPolicy propagationPolicy;
            private final boolean forceDecoding;

            Factory(Client client, Retryer retryer, List<RequestInterceptor> requestInterceptors, Logger logger, Level logLevel, boolean decode404, boolean closeAfterDecode, ExceptionPropagationPolicy propagationPolicy, boolean forceDecoding) {
                this.client = (Client)Util.checkNotNull(client, "client", new Object[0]);
                this.retryer = (Retryer)Util.checkNotNull(retryer, "retryer", new Object[0]);
                this.requestInterceptors = (List)Util.checkNotNull(requestInterceptors, "requestInterceptors", new Object[0]);
                this.logger = (Logger)Util.checkNotNull(logger, "logger", new Object[0]);
                this.logLevel = (Level)Util.checkNotNull(logLevel, "logLevel", new Object[0]);
                this.decode404 = decode404;
                this.closeAfterDecode = closeAfterDecode;
                this.propagationPolicy = propagationPolicy;
                this.forceDecoding = forceDecoding;
            }

            public MethodHandler create(Target<?> target, MethodMetadata md, feign.RequestTemplate.Factory buildTemplateFromArgs, Options options, Decoder decoder, ErrorDecoder errorDecoder) {
                return new SynchronousMethodHandler(target, this.client, this.retryer, this.requestInterceptors, this.logger, this.logLevel, md, buildTemplateFromArgs, options, decoder, errorDecoder, this.decode404, this.closeAfterDecode, this.propagationPolicy, this.forceDecoding);
            }
        }
    }

During a remote call, targetRequest applies every registered RequestInterceptor to the request template before the request is built. A custom interceptor is therefore the place to copy the headers across.

Custom Feign interceptor:

    import feign.RequestInterceptor;
    import feign.RequestTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;

    import javax.servlet.http.HttpServletRequest;

    /**
     * @description: Feign interceptor that forwards the Cookie header
     * @author: <a href="mailto:batis@foxmail.com">清风</a>
     * @date: 2022/3/10 23:03
     * @version: 1.0
     */
    @Configuration
    public class FamilyFegin {

        @Bean(name = "requestInterceptor")
        public RequestInterceptor requestInterceptor() {
            return new RequestInterceptor() {
                @Override
                public void apply(RequestTemplate requestTemplate) {
                    // RequestContextHolder returns the attributes of the request bound to the current thread
                    ServletRequestAttributes requestAttributes =
                            (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
                    if (requestAttributes == null) {
                        // No request is bound to this thread, so there is nothing to copy
                        return;
                    }
                    HttpServletRequest request = requestAttributes.getRequest();
                    System.out.println("before the feign call");
                    // Copy the Cookie header of the incoming (old) request onto the new request
                    String cookie = request.getHeader("Cookie");
                    if (cookie != null) {
                        requestTemplate.header("Cookie", cookie);
                    }
                }
            };
        }
    }

About the interceptor

RequestContextHolder.getRequestAttributes() returns a RequestAttributes. The RequestAttributes source:

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by FernFlower decompiler)
    //

    package org.springframework.web.context.request;

    import org.springframework.lang.Nullable;

    public interface RequestAttributes {
        int SCOPE_REQUEST = 0;
        int SCOPE_SESSION = 1;
        String REFERENCE_REQUEST = "request";
        String REFERENCE_SESSION = "session";

        @Nullable
        Object getAttribute(String var1, int var2);

        void setAttribute(String var1, Object var2, int var3);

        void removeAttribute(String var1, int var2);

        String[] getAttributeNames(int var1);

        void registerDestructionCallback(String var1, Runnable var2, int var3);

        @Nullable
        Object resolveReference(String var1);

        String getSessionId();

        Object getSessionMutex();
    }

ServletRequestAttributes extends AbstractRequestAttributes, and AbstractRequestAttributes implements RequestAttributes, which is why the cast is valid.

    public abstract class AbstractRequestAttributes implements RequestAttributes {
        protected final Map<String, Runnable> requestDestructionCallbacks = new LinkedHashMap(8);
        private volatile boolean requestActive = true;

        public AbstractRequestAttributes() {
        }

        public void requestCompleted() {

Note: this approach works only when the Feign call runs synchronously on the same thread as the incoming request, because only that thread can read the Cookie from the original headers. Asynchronous, multi-threaded calls need to be handled differently.
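The interceptor mechanism can be modelled in a few lines of plain Java. This is a simplified sketch, not Feign's API: `Template`, `Interceptor`, `cookieForwarder`, and `buildOutgoingHeaders` are hypothetical names that only mirror the shape of the `targetRequest` loop shown above, where each interceptor gets a chance to modify the template before the outgoing request is created.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InterceptorChainSketch {
    // Hypothetical, simplified stand-in for feign.RequestTemplate.
    static class Template {
        final Map<String, String> headers = new LinkedHashMap<>();
    }

    // Hypothetical stand-in for feign.RequestInterceptor.
    interface Interceptor {
        void apply(Template template);
    }

    // Interceptor that copies the Cookie header of the incoming request, if any.
    static Interceptor cookieForwarder(Map<String, String> incomingHeaders) {
        return template -> {
            String cookie = incomingHeaders.get("Cookie");
            if (cookie != null) {
                template.headers.put("Cookie", cookie);
            }
        };
    }

    // Same shape as targetRequest: a fresh template starts with no headers,
    // then every interceptor is applied in order.
    static Map<String, String> buildOutgoingHeaders(List<Interceptor> interceptors) {
        Template template = new Template();
        for (Interceptor interceptor : interceptors) {
            interceptor.apply(template);
        }
        return template.headers;
    }

    public static void main(String[] args) {
        Map<String, String> incoming = new HashMap<>();
        incoming.put("Cookie", "session=abc");

        // No interceptor registered: the new request has no headers.
        System.out.println(buildOutgoingHeaders(new ArrayList<>()));

        // With the forwarding interceptor the Cookie header is carried over.
        List<Interceptor> chain = new ArrayList<>();
        chain.add(cookieForwarder(incoming));
        System.out.println(buildOutgoingHeaders(chain));
    }
}
```

In the real interceptor the incoming headers come from RequestContextHolder, which is bound to the current thread; the sketch passes them in explicitly to keep the example self-contained.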