首页
关于
友链
Search
1
wlop 4K 壁纸 4k8k 动态 壁纸
1,472 阅读
2
Nacos持久化MySQL问题-解决方案
932 阅读
3
Docker搭建Typecho博客
752 阅读
4
滑动时间窗口算法
728 阅读
5
Nginx反向代理微服务配置
699 阅读
生活
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
安全
开发工具
百度网盘
天翼网盘
阿里网盘
登录
Search
标签搜索
java
javase
docker
java8
springboot
thread
spring
分布式
mysql
锁
linux
redis
源码
typecho
centos
git
map
RabbitMQ
lambda
stream
少年
累计撰写
189
篇文章
累计收到
24
条评论
首页
栏目
生活
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
安全
开发工具
百度网盘
天翼网盘
阿里网盘
页面
关于
友链
搜索到
4
篇与
的结果
2022-11-21
JAVA8-CompletableFuture常用API:thenApply、handle、thenRun、thenAccept、thenCompose、thenCombine、thenAcceptBoth
JAVA8-CompletableFuture常用API:thenApply、handle、thenRun、thenAccept、thenCompose、thenCombine、thenAcceptBoth1、thenApplypackage com.example.study.java8.completableFutures.api; import java.util.Optional; import java.util.concurrent.CompletableFuture; /** * CompletableFuture常用API: thenApply、handle */ public class CompletableFutureAtion1 { public static void main(String[] args) throws InterruptedException { //API-- 1、thenApply //模拟其它 异步 逻辑操作然后返回结果1 CompletableFuture.supplyAsync(() -> 1) .thenApply(i -> Integer.sum(i, 10)) //thenApply 将结果加10 .whenComplete((v, t) -> Optional.ofNullable(v).ifPresent(System.out::println)); //同步执行打印结果 // .whenCompleteAsync() //异步操作,可以将结果在进行其它逻辑异步操作 //whenComplete VS whenCompleteAsync //为了防止主线程结束后,守护线程被关闭,模拟修改10000毫秒 Thread.sleep(10000); } } 输出结果:11对比whenComplete VS whenCompleteAsync:whenCompleteAsync异步操作,可以将结果在进行其它逻辑异步操作。2、handlepackage com.example.study.java8.completableFutures.api; import java.util.Optional; import java.util.concurrent.CompletableFuture; /** * CompletableFuture常用API: thenApply、handle */ public class CompletableFutureAtion1 { public static void main(String[] args) throws InterruptedException { //API- 2、handle CompletableFuture.supplyAsync(() -> 1) .handle((v, t) -> Integer.sum(v, 10)) .whenComplete((v, t) -> Optional.ofNullable(v).ifPresent(System.out::println)); // thenApply VS handle :handle只是多了一个对堆内存的考虑。 //为了防止主线程结束后,守护线程被关闭,模拟修改10000毫秒 Thread.sleep(10000); } } 输出结果:11对比thenApply VS handle :handle只是多了一个对堆内存的考虑。3、thenRunpackage com.example.study.java8.completableFutures.api; import java.util.Optional; import java.util.concurrent.CompletableFuture; /** * CompletableFuture常用API: thenApply、handle */ public class CompletableFutureAtion1 { public static void main(String[] args) throws InterruptedException { //API-3、thenRun:所有结果执行完后,执行其它操作 CompletableFuture.supplyAsync(() -> 1) .handle((v, t) -> Integer.sum(v, 10)) .whenComplete((v, t) -> Optional.ofNullable(v).ifPresent(System.out::println)) .thenRun(System.out::println); //这里没有入参,只会打印一个换行 // .thenRunAsync() 如果做成异步的就用这个方法 //为了防止主线程结束后,守护线程被关闭,模拟修改10000毫秒 Thread.sleep(10000); } }输出结果:11 //这里打印了一个换行对比thenRun VS thenRunAsync :thenRunAsync 做成异步的就用这个方法。4、thenAcceptpackage com.example.study.java8.completableFutures.api; import java.util.Optional; import java.util.concurrent.CompletableFuture; /** * CompletableFuture常用API: thenApply、handle */ public class CompletableFutureAtion1 { public static void main(String[] args) throws InterruptedException { //API-4、thenAccept:不会有任何返回值,只是对结果进行消费 CompletableFuture.supplyAsync(() -> 1) .thenAccept(System.out::println); //为了防止主线程结束后,守护线程被关闭,模拟修改10000毫秒 Thread.sleep(10000); } } 输出结果:15、thenComposepackage com.example.study.java8.completableFutures.api; import java.util.Optional; import java.util.concurrent.CompletableFuture; /** * CompletableFuture常用API: thenApply、handle */ public class CompletableFutureAtion1 { public static void main(String[] args) throws InterruptedException { //API-5、thenCompose:将异步产生的结果,再交(组合)给另一个ComletableFuture进行处理,是有返回值的。 CompletableFuture.supplyAsync(() -> 1) .thenCompose(i -> CompletableFuture.supplyAsync(() -> 10 * i)) //将1再组合成另一个CompletableFuture处理 .thenAccept(System.out::println); //然后消费,直接输出结果 //为了防止主线程结束后,守护线程被关闭,模拟修改10000毫秒 Thread.sleep(10000); } }输出结果:106、thenCombinepackage com.example.study.java8.completableFutures.api; import java.util.Optional; import java.util.concurrent.CompletableFuture; /** * CompletableFuture常用API: thenApply、handle */ public class CompletableFutureAtion1 { public static void main(String[] args) throws InterruptedException { //API-6、thenCombine:将异步产生的结果,与另一个CompletableFuture结果,作为参数进行处理,有返回值。 CompletableFuture.supplyAsync(() -> 1) .thenCombine(CompletableFuture.supplyAsync(()->0.2),(v1,v2)->v1+v2) .thenAccept(System.out::println); //为了防止主线程结束后,守护线程被关闭,模拟修改10000毫秒 Thread.sleep(10000); } }输出结果:1.27、thenAcceptBothpackage com.example.study.java8.completableFutures.api; import java.util.Optional; import java.util.concurrent.CompletableFuture; /** * CompletableFuture常用API: thenApply、handle */ public class CompletableFutureAtion1 { public static void main(String[] args) throws InterruptedException { //API-7、thenAcceptBoth:将异步产生的结果,与另一个CompletableFuture结果,作为参数进行处理,没有返回值,直接用于消费。 CompletableFuture.supplyAsync(() -> 1) .thenAcceptBoth(CompletableFuture.supplyAsync(()->0.5), (v1,v2)->{ System.out.println(v1); System.out.println(v2); }); // thenCombine VS thenAcceptBoth:2个方法类似,有个有返回值,有个没有返回值。 //为了防止主线程结束后,守护线程被关闭,模拟修改10000毫秒 Thread.sleep(10000); } } 输出结果:1 0.5CompletableFuture常用API使用:1、thenApply2、handle3、thenRun4、thenAccept5、thenCompose6、thenCombine7、thenAcceptBoth
2022年11月21日
133 阅读
0 评论
4 点赞
2022-11-21
JAVA8-CompletableFuture基本用法
JAVA8-CompletableFuture基本用法针对<实现一个异步基于事件回调的Future程序> 用CompletableFuture进行改进。代码示例:package com.example.study.java8.completableFutures; import java.util.Optional; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * CompletableFuture 基本用法 */ public class CompletableFutureInAction1 { private final static Random RANDOM = new Random(System.currentTimeMillis()); public static void main(String[] args) throws ExecutionException, InterruptedException { //实际开发中,一般不直接new,而是使用工厂创建 // CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(); CompletableFuture<Double> completableFuture = new CompletableFuture<>(); new Thread(()->{ double value = get(); completableFuture.complete(value); }).start(); //不会阻塞程序执行 System.out.println("===========no====block====.."); //1、后面获取程序执行结果 // Optional.ofNullable(completableFuture.get()).ifPresent(System.out::println); //2、执行完后,通过回调自动返回结果 completableFuture.whenComplete((v,t)->{ Optional.ofNullable(v).ifPresent(System.out::println); Optional.ofNullable(t).ifPresent(x->x.printStackTrace()); }); } private static double get(){ try { Thread.sleep(RANDOM.nextInt(10000)); } catch (InterruptedException e) { e.printStackTrace(); } return RANDOM.nextDouble(); } }输出结果:===========no====block====.. 0.5192854493861505
2022年11月21日
140 阅读
0 评论
2 点赞
2022-03-15
SpringBoot整合定时任务和异步任务
SpringBoot整合定时任务和异步任务一、定时任务SpringBoot整合quartz-scheduler,执行定时任务。1、开启定时任务@EnableScheduling2、开启一个定时任务@Scheduled3、编写cron表达式cron表达式格式请参考官方文档4、实例package com.yanxizhu.ifamily.booking.controller.scheduler; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * SpringBoot整合Scheduling定时任务 * @author: <a href="mailto:vip@foxmail.com">清风</a> * @date: 2022/3/15 20:26 * @version: 1.0 */ @Slf4j @Component @EnableScheduling public class MyScheduler { /** * cron表达式 */ @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ log.info("Hello"); } }5、定时任务总结1、@EnableScheduling 开启一个定时任务2、@Scheduled 开启一个定时任务3、编写定时规则,也就是cron表达式。6、注意事项spring中6位组成,不允许第7位的年。在周几的位置,1-7代表周一到周日,MON-SUN定时任务不应该阻塞,默认时阻塞的,应该以异步任务执行。7、解决方案解决定时任务阻塞问题:使用异步+定时任务二、异步任务执行几种方式1、异步方式提交线程池可以让业务运行以异步的方式,自己提交到线程池(CompletableFuture异步编排)。@Slf4j @Component @EnableScheduling public class MyScheduler { public static ExecutorService executorService = Executors.newFixedThreadPool(10); /** * cron表达式 */ @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ CompletableFuture.runAsync(()->{ //调用server业务方法 }, executorService); } }2、定时任务线程池支持定时任务线程池(定时任务线程池)。Scheduling自动配置类,默认只有1个线程。// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.task; import java.util.concurrent.ScheduledExecutorService; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.LazyInitializationExcludeFilter; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.task.TaskSchedulingProperties.Shutdown; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.task.TaskSchedulerBuilder; import org.springframework.boot.task.TaskSchedulerCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @ConditionalOnClass({ThreadPoolTaskScheduler.class}) @Configuration( proxyBeanMethods = false ) @EnableConfigurationProperties({TaskSchedulingProperties.class}) @AutoConfigureAfter({TaskExecutionAutoConfiguration.class}) public class TaskSchedulingAutoConfiguration { public TaskSchedulingAutoConfiguration() { } @Bean @ConditionalOnBean( name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"} ) @ConditionalOnMissingBean({SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class}) public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) { return builder.build(); } @Bean public static LazyInitializationExcludeFilter scheduledBeanLazyInitializationExcludeFilter() { return new ScheduledBeanLazyInitializationExcludeFilter(); } @Bean @ConditionalOnMissingBean public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties, ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) { TaskSchedulerBuilder builder = new TaskSchedulerBuilder(); builder = builder.poolSize(properties.getPool().getSize()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); builder = builder.customizers(taskSchedulerCustomizers); return builder; } }默认只有一个线程package org.springframework.boot.autoconfigure.task; import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("spring.task.scheduling") public class TaskSchedulingProperties { private final TaskSchedulingProperties.Pool pool = new TaskSchedulingProperties.Pool(); private final TaskSchedulingProperties.Shutdown shutdown = new TaskSchedulingProperties.Shutdown(); private String threadNamePrefix = "scheduling-"; public TaskSchedulingProperties() { } public TaskSchedulingProperties.Pool getPool() { return this.pool; } public TaskSchedulingProperties.Shutdown getShutdown() { return this.shutdown; } public String getThreadNamePrefix() { return this.threadNamePrefix; } public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } public static class Shutdown { private boolean awaitTermination; private Duration awaitTerminationPeriod; public Shutdown() { } public boolean isAwaitTermination() { return this.awaitTermination; } public void setAwaitTermination(boolean awaitTermination) { this.awaitTermination = awaitTermination; } public Duration getAwaitTerminationPeriod() { return this.awaitTerminationPeriod; } public void setAwaitTerminationPeriod(Duration awaitTerminationPeriod) { this.awaitTerminationPeriod = awaitTerminationPeriod; } } public static class Pool { private int size = 1; public Pool() { } public int getSize() { return this.size; } public void setSize(int size) { this.size = size; } } }配置线程池数量spring.task.scheduling.pool.size=5这样定时任务就有5个线程可以执行了,如果1个线程,定时任务就会阻塞。3、让定时任务执行@EnableAsync 开启异步任务功能@Async 在希望异步执行的方法开启异步执行注解,普通方法也可以使用。默认线程数8异步任务自动配置类源码:// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.task; import java.util.concurrent.Executor; import java.util.stream.Stream; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.task.TaskExecutionProperties.Pool; import org.springframework.boot.autoconfigure.task.TaskExecutionProperties.Shutdown; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.task.TaskExecutorBuilder; import org.springframework.boot.task.TaskExecutorCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; import org.springframework.core.task.TaskDecorator; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @ConditionalOnClass({ThreadPoolTaskExecutor.class}) @Configuration( proxyBeanMethods = false ) @EnableConfigurationProperties({TaskExecutionProperties.class}) public class TaskExecutionAutoConfiguration { public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor"; public TaskExecutionAutoConfiguration() { } @Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) { Pool pool = properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); Stream var10001 = taskExecutorCustomizers.orderedStream(); var10001.getClass(); builder = builder.customizers(var10001::iterator); builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique()); return builder; } @Lazy @Bean( name = {"applicationTaskExecutor", "taskExecutor"} ) @ConditionalOnMissingBean({Executor.class}) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); } }默认线程8package org.springframework.boot.autoconfigure.task; import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("spring.task.execution") public class TaskExecutionProperties { private final TaskExecutionProperties.Pool pool = new TaskExecutionProperties.Pool(); private final TaskExecutionProperties.Shutdown shutdown = new TaskExecutionProperties.Shutdown(); private String threadNamePrefix = "task-"; public TaskExecutionProperties() { } public TaskExecutionProperties.Pool getPool() { return this.pool; } public TaskExecutionProperties.Shutdown getShutdown() { return this.shutdown; } public String getThreadNamePrefix() { return this.threadNamePrefix; } public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } public static class Shutdown { private boolean awaitTermination; private Duration awaitTerminationPeriod; public Shutdown() { } public boolean isAwaitTermination() { return this.awaitTermination; } public void setAwaitTermination(boolean awaitTermination) { this.awaitTermination = awaitTermination; } public Duration getAwaitTerminationPeriod() { return this.awaitTerminationPeriod; } public void setAwaitTerminationPeriod(Duration awaitTerminationPeriod) { this.awaitTerminationPeriod = awaitTerminationPeriod; } } public static class Pool { private int queueCapacity = 2147483647; private int coreSize = 8; private int maxSize = 2147483647; private boolean allowCoreThreadTimeout = true; private Duration keepAlive = Duration.ofSeconds(60L); public Pool() { } public int getQueueCapacity() { return this.queueCapacity; } public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } public int getCoreSize() { return this.coreSize; } public void setCoreSize(int coreSize) { this.coreSize = coreSize; } public int getMaxSize() { return this.maxSize; } public void setMaxSize(int maxSize) { this.maxSize = maxSize; } public boolean isAllowCoreThreadTimeout() { return this.allowCoreThreadTimeout; } public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) { this.allowCoreThreadTimeout = allowCoreThreadTimeout; } public Duration getKeepAlive() { return this.keepAlive; } public void setKeepAlive(Duration keepAlive) { this.keepAlive = keepAlive; } } }通过配置线程池线程数spring.task.execution.pool.core-size=5 spring.task.execution.pool.max-size=50实列import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j @Component @EnableAsync @EnableScheduling public class MyScheduler { public static ExecutorService executorService = Executors.newFixedThreadPool(10); /** * cron表达式 */ @Async @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ System.out.println("Hello ...."); } }要用线程池的时候,自己注入即可用了。
2022年03月15日
289 阅读
0 评论
12 点赞
2022-03-11
CompletableFuture异步编排
通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。因此我们可以使用completableFuture 异步编排方案。比如:一个业务场景,需要同时获取多个数据,如果同步线程挨个执行,则需要时间为所有线程执行时间的总和。如果我们使用异步线程执行,所需时间则为耗时最长那个异步线程的执行时间。如果多个异常线程之间还存在依赖关系,比如线程3需要线程1的执行结果,线程6依赖线程3、线程2,那这个问题怎么解决呢。那就可以使用completableFuture 异步编排方案实现。注意:completableFuture 是jdk1.8之后添加的一个功能。CompletableFuture接口:public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {public interface Future<V> {以前用到的FutureTask就是用到的Future可以得到返回结果public class FutureTask<V> implements RunnableFuture<V> {public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }Future可以得到返回结果CompletableFuture随便一个方法,都接受一个Function public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn) { return orApplyStage(null, other, fn); }@FunctionalInterface public interface Function<T, R> {Function是一个@FunctionalInterface,所以对Lambda使用要熟悉。CompletableFuture异步编排问题多个异步线程远程调用会导致丢失请求头,原因是多个线程,通过拦截器不能获取其它线程的请求的heard信息。解决方案:每个线程共享自己的requestAttributes。自定义feign拦截器:import feign.RequestInterceptor; import feign.RequestTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import javax.servlet.http.HttpServletRequest; /** * @description: TODO * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/10 23:03 * @version: 1.0 */ @Configuration public class FamilyFegin { @Bean(name ="requestInterceptor") public RequestInterceptor requestInterceptor(){ return new RequestInterceptor() { @Override public void apply(RequestTemplate requestTemplate) { //RequestContextHolder拿到刚请求来的数据 ServletRequestAttributes requestAttributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes(); System.out.println("requestAttributes线程"+Thread.currentThread().getId()); HttpServletRequest request = requestAttributes.getRequest(); System.out.println("调用feign之前"); if(request != null){ //同步请求头数据 String cookie = request.getHeader("Cookie");//老数据 //给新请求同步老请求的cookie requestTemplate.header("Cookie", cookie); } } }; } }web配置:因为需要拦截器生效import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; /** * @description: WEB配置 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/10 22:40 * @version: 1.0 */ public class FamilyWebConfig implements WebMvcConfigurer { @Autowired LoginUserInterceptor loginUserInterceptor; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(loginUserInterceptor).addPathPatterns("/**"); } }CompletableFuture异步编排import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @description: CompletableFuture异步编排 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/10 23:32 * @version: 1.0 */ public class MyCompletableFuture { public static ExecutorService executorService = Executors.newFixedThreadPool(10); public void myOpenFeign() throws ExecutionException, InterruptedException { System.out.println("主线程0"); //远程调用存在丢失请求头的问题,因为不在同一线程,导致自定义拦截器不能获取head信息。 //解决丢失请求头方案: ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); //第一个异步任务 CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> { System.out.println("辅线程1"); //每个线程都共享一下自己的requestAttributes RequestContextHolder.setRequestAttributes(requestAttributes); //异步线程远程调用,业务1 //异步线程调用业务代码 }, executorService); //第二个异步任务 CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> { System.out.println("辅线程2"); //每个线程都共享一下自己的requestAttributes RequestContextHolder.setRequestAttributes(requestAttributes); //异步线程远程调用,业务2 //异步线程调用业务代码 }, executorService); CompletableFuture.allOf(oneFuture, twoFuture).get(); } }以上就是解决CompletableFuture异步编排,异步多线程引起的远程调用请求丢失解决方案。代码中共享requestAttributes原因ThreadLocal数据:ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();RequestContextHolder主线程不一样获取的数据不一样,如下:import javax.faces.context.FacesContext; import org.springframework.core.NamedInheritableThreadLocal; import org.springframework.core.NamedThreadLocal; import org.springframework.lang.Nullable; import org.springframework.util.ClassUtils; public abstract class RequestContextHolder { private static final boolean jsfPresent = ClassUtils.isPresent("javax.faces.context.FacesContext", RequestContextHolder.class.getClassLoader()); private static final ThreadLocal<RequestAttributes> requestAttributesHolder = new NamedThreadLocal("Request attributes"); private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder = new NamedInheritableThreadLocal("Request context"); public RequestContextHolder() { }
2022年03月11日
407 阅读
0 评论
5 点赞