首页
关于
友链
Search
1
wlop 4K 壁纸 4k8k 动态 壁纸
713 阅读
2
Docker搭建Typecho博客
566 阅读
3
Nacos持久化MySQL问题-解决方案
430 阅读
4
SpringBoot整合SpringCache
388 阅读
5
keytool证书导入
381 阅读
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
开发工具
百度网盘资源
天翼网盘资源
阿里网盘资源
登录
Search
标签搜索
java
javase
docker
java8
springboot
thread
spring
分布式
mysql
锁
linux
redis
源码
typecho
centos
git
map
lambda
stream
nginx
少年
累计撰写
183
篇文章
累计收到
10
条评论
首页
栏目
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
开发工具
百度网盘资源
天翼网盘资源
阿里网盘资源
页面
关于
友链
搜索到
8
篇与
的结果
2022-03-24
SpringBoot整合SpringCache
SpringBoot整合SpringCache1、引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-cache</artifactId> </dependency>2、引入redis依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>配置redis地址密码等等,可参考springboo整合redis3、默认配置自动配置// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.cache; import java.util.List; import java.util.stream.Collectors; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.data.couchbase.CouchbaseDataAutoConfiguration; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.boot.autoconfigure.hazelcast.HazelcastAutoConfiguration; import org.springframework.boot.autoconfigure.orm.jpa.EntityManagerFactoryDependsOnPostProcessor; import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cache.CacheManager; import org.springframework.cache.interceptor.CacheAspectSupport; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.ImportSelector; import org.springframework.core.type.AnnotationMetadata; import org.springframework.orm.jpa.AbstractEntityManagerFactoryBean; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.util.Assert; @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({CacheManager.class}) @ConditionalOnBean({CacheAspectSupport.class}) @ConditionalOnMissingBean( value = {CacheManager.class}, name = {"cacheResolver"} ) @EnableConfigurationProperties({CacheProperties.class}) @AutoConfigureAfter({CouchbaseDataAutoConfiguration.class, HazelcastAutoConfiguration.class, HibernateJpaAutoConfiguration.class, RedisAutoConfiguration.class}) @Import({CacheAutoConfiguration.CacheConfigurationImportSelector.class, CacheAutoConfiguration.CacheManagerEntityManagerFactoryDependsOnPostProcessor.class}) public class CacheAutoConfiguration { public CacheAutoConfiguration() { } @Bean @ConditionalOnMissingBean public CacheManagerCustomizers cacheManagerCustomizers(ObjectProvider<CacheManagerCustomizer<?>> customizers) { return new CacheManagerCustomizers((List)customizers.orderedStream().collect(Collectors.toList())); } @Bean public CacheAutoConfiguration.CacheManagerValidator cacheAutoConfigurationValidator(CacheProperties cacheProperties, ObjectProvider<CacheManager> cacheManager) { return new CacheAutoConfiguration.CacheManagerValidator(cacheProperties, cacheManager); } static class CacheConfigurationImportSelector implements ImportSelector { CacheConfigurationImportSelector() { } public String[] selectImports(AnnotationMetadata importingClassMetadata) { CacheType[] types = CacheType.values(); String[] imports = new String[types.length]; for(int i = 0; i < types.length; ++i) { imports[i] = CacheConfigurations.getConfigurationClass(types[i]); } return imports; } } static class CacheManagerValidator implements InitializingBean { private final CacheProperties cacheProperties; private final ObjectProvider<CacheManager> cacheManager; CacheManagerValidator(CacheProperties cacheProperties, ObjectProvider<CacheManager> cacheManager) { this.cacheProperties = cacheProperties; this.cacheManager = cacheManager; } public void afterPropertiesSet() { Assert.notNull(this.cacheManager.getIfAvailable(), () -> { return "No cache manager could be auto-configured, check your configuration (caching type is '" + this.cacheProperties.getType() + "')"; }); } } @ConditionalOnClass({LocalContainerEntityManagerFactoryBean.class}) @ConditionalOnBean({AbstractEntityManagerFactoryBean.class}) static class CacheManagerEntityManagerFactoryDependsOnPostProcessor extends EntityManagerFactoryDependsOnPostProcessor { CacheManagerEntityManagerFactoryDependsOnPostProcessor() { super(new String[]{"cacheManager"}); } } }所有相关配置都在CacheProperties。重点关注自动配置中的: static class CacheConfigurationImportSelector implements ImportSelector { CacheConfigurationImportSelector() { } public String[] selectImports(AnnotationMetadata importingClassMetadata) { CacheType[] types = CacheType.values(); String[] imports = new String[types.length]; for(int i = 0; i < types.length; ++i) { imports[i] = CacheConfigurations.getConfigurationClass(types[i]); } return imports; } }CacheConfigurations.getConfigurationClass(types[i]); static String getConfigurationClass(CacheType cacheType) { String configurationClassName = (String)MAPPINGS.get(cacheType); Assert.state(configurationClassName != null, () -> { return "Unknown cache type " + cacheType; }); return configurationClassName; }MAPPINGS.get(cacheType);static { Map<CacheType, String> mappings = new EnumMap(CacheType.class); mappings.put(CacheType.GENERIC, GenericCacheConfiguration.class.getName()); mappings.put(CacheType.EHCACHE, EhCacheCacheConfiguration.class.getName()); mappings.put(CacheType.HAZELCAST, HazelcastCacheConfiguration.class.getName()); mappings.put(CacheType.INFINISPAN, InfinispanCacheConfiguration.class.getName()); mappings.put(CacheType.JCACHE, JCacheCacheConfiguration.class.getName()); mappings.put(CacheType.COUCHBASE, CouchbaseCacheConfiguration.class.getName()); mappings.put(CacheType.REDIS, RedisCacheConfiguration.class.getName()); mappings.put(CacheType.CAFFEINE, CaffeineCacheConfiguration.class.getName()); mappings.put(CacheType.SIMPLE, SimpleCacheConfiguration.class.getName()); mappings.put(CacheType.NONE, NoOpCacheConfiguration.class.getName()); MAPPINGS = Collections.unmodifiableMap(mappings); }mappings.put(CacheType.REDIS, RedisCacheConfiguration.class.getName());CacheAutoConfiguration会导入RedisCacheConfiguration;// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.cache; import java.util.LinkedHashSet; import java.util.List; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.cache.CacheProperties.Redis; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ResourceLoader; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.cache.RedisCacheManager.RedisCacheManagerBuilder; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair; @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({RedisConnectionFactory.class}) @AutoConfigureAfter({RedisAutoConfiguration.class}) @ConditionalOnBean({RedisConnectionFactory.class}) @ConditionalOnMissingBean({CacheManager.class}) @Conditional({CacheCondition.class}) class RedisCacheConfiguration { RedisCacheConfiguration() { } @Bean RedisCacheManager cacheManager(CacheProperties cacheProperties, CacheManagerCustomizers cacheManagerCustomizers, ObjectProvider<org.springframework.data.redis.cache.RedisCacheConfiguration> redisCacheConfiguration, ObjectProvider<RedisCacheManagerBuilderCustomizer> redisCacheManagerBuilderCustomizers, RedisConnectionFactory redisConnectionFactory, ResourceLoader resourceLoader) { RedisCacheManagerBuilder builder = RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(this.determineConfiguration(cacheProperties, redisCacheConfiguration, resourceLoader.getClassLoader())); List<String> cacheNames = cacheProperties.getCacheNames(); if (!cacheNames.isEmpty()) { builder.initialCacheNames(new LinkedHashSet(cacheNames)); } if (cacheProperties.getRedis().isEnableStatistics()) { builder.enableStatistics(); } redisCacheManagerBuilderCustomizers.orderedStream().forEach((customizer) -> { customizer.customize(builder); }); return (RedisCacheManager)cacheManagerCustomizers.customize(builder.build()); } private org.springframework.data.redis.cache.RedisCacheConfiguration determineConfiguration(CacheProperties cacheProperties, ObjectProvider<org.springframework.data.redis.cache.RedisCacheConfiguration> redisCacheConfiguration, ClassLoader classLoader) { return (org.springframework.data.redis.cache.RedisCacheConfiguration)redisCacheConfiguration.getIfAvailable(() -> { return this.createConfiguration(cacheProperties, classLoader); }); } private org.springframework.data.redis.cache.RedisCacheConfiguration createConfiguration(CacheProperties cacheProperties, ClassLoader classLoader) { Redis redisProperties = cacheProperties.getRedis(); org.springframework.data.redis.cache.RedisCacheConfiguration config = org.springframework.data.redis.cache.RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeValuesWith(SerializationPair.fromSerializer(new JdkSerializationRedisSerializer(classLoader))); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } } 关注里面的方法:@Bean RedisCacheManager cacheManager(CacheProperties cacheProperties, CacheManagerCustomizers cacheManagerCustomizers, ObjectProvider<org.springframework.data.redis.cache.RedisCacheConfiguration> redisCacheConfiguration, ObjectProvider<RedisCacheManagerBuilderCustomizer> redisCacheManagerBuilderCustomizers, RedisConnectionFactory redisConnectionFactory, ResourceLoader resourceLoader) { RedisCacheManagerBuilder builder = RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(this.determineConfiguration(cacheProperties, redisCacheConfiguration, resourceLoader.getClassLoader())); List<String> cacheNames = cacheProperties.getCacheNames(); if (!cacheNames.isEmpty()) { builder.initialCacheNames(new LinkedHashSet(cacheNames)); } if (cacheProperties.getRedis().isEnableStatistics()) { builder.enableStatistics(); } redisCacheManagerBuilderCustomizers.orderedStream().forEach((customizer) -> { customizer.customize(builder); }); return (RedisCacheManager)cacheManagerCustomizers.customize(builder.build()); }相当于自动配置好了缓存管理器RedisCacheManager,cacheProperties.getCacheNames();方法会读取yml里面的配置,接着初始化缓存builder.initialCacheNames(new LinkedHashSet(cacheNames)); public RedisCacheManager.RedisCacheManagerBuilder initialCacheNames(Set<String> cacheNames) { Assert.notNull(cacheNames, "CacheNames must not be null!"); cacheNames.forEach((it) -> { this.withCacheConfiguration(it, this.defaultCacheConfiguration); }); return this; }this.withCacheConfiguration(it, this.defaultCacheConfiguration);public RedisCacheManager.RedisCacheManagerBuilder withCacheConfiguration(String cacheName, RedisCacheConfiguration cacheConfiguration) { Assert.notNull(cacheName, "CacheName must not be null!"); Assert.notNull(cacheConfiguration, "CacheConfiguration must not be null!"); this.initialCaches.put(cacheName, cacheConfiguration); return this; }RedisCacheConfiguration中默认缓存定义规则:private org.springframework.data.redis.cache.RedisCacheConfiguration createConfiguration(CacheProperties cacheProperties, ClassLoader classLoader) { Redis redisProperties = cacheProperties.getRedis(); org.springframework.data.redis.cache.RedisCacheConfiguration config = org.springframework.data.redis.cache.RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeValuesWith(SerializationPair.fromSerializer(new JdkSerializationRedisSerializer(classLoader))); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; }redisProperties也就是从yml中得到的配置。4、配置需要配置的地方:配置使用redis作为缓存spring.cache.type=redis使用缓存,官方文档 https://docs.spring.io/spring-framework/docs/5.3.18-SNAPSHOT/reference/html/integration.html#cache重点关注几个注解:@Cacheable: Triggers cache population. @CacheEvict: Triggers cache eviction. @CachePut: Updates the cache without interfering with the method execution. @Caching: Regroups multiple cache operations to be applied on a method. @CacheConfig: Shares some common cache-related settings at class-level.@Cacheable: Triggers cache population.触发将数据保存到缓存的操作@CacheEvict: Triggers cache eviction.触发将数据从缓存删除的操作@CachePut: Updates the cache without interfering with the method execution.不影响方法执行更新缓存@Caching: Regroups multiple cache operations to be applied on a method.组合以上多个操作@CacheConfig: Shares some common cache-related settings at class-level.在类级别共享缓存的相同配置5、自定义规则第一步、启动类上开启缓存功能:@EnableCaching第二部、只需要注解就能完成缓存操作@Cacheable:1、代表当前方法的结果需要缓存,如果缓存中有,方法不用调用。如果缓存中没有,会调用方法,最后将方法的结果放入缓存。2、每一个需要缓存的数据我们都来指定要翻到那个名字的缓存【缓存的分区,推荐按照业务类型分】。3、key默认自动生成,value默认是使用jdk默认序列化机制。默认时间是-1,永不过期。因此需要我们自定义规则:自定义规则/** * @description: Cache redis数据格式配置 * @date: 2022/2/17 21:59 * @version: 1.0 */ import com.alibaba.fastjson.support.spring.GenericFastJsonRedisSerializer; import org.springframework.boot.autoconfigure.cache.CacheProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.cache.RedisCacheConfiguration; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.StringRedisSerializer; @EnableConfigurationProperties(CacheProperties.class) @Configuration @EnableCaching public class MyCacheConfig { @Bean RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){ RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())); config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer())); CacheProperties.Redis redisProperties = cacheProperties.getRedis(); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } // if (redisProperties.getKeyPrefix() != null) { // config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); // } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } }6、使用示例// 使用缓存注解方式,key:为方法名 @Override @Cacheable(value = {"record"},key = "#root.method.name") public RecordEntity getRecordAllInfoById(Long id) { //未使用注解缓存方案 // RecordEntity recordEntity = null; // // ValueOperations<String, String> forValue = redisTemplate.opsForValue(); // String recordData = forValue.get("RecordData"); // if(recordData==null){ // System.out.println("缓存没数据,执行查询数据库方法。。。。"); // recordEntity = getEntityByIdByFbsRedisson(id); // }else{ // System.out.println("从缓存中获取数据。。。。。"); // recordEntity = JSON.parseObject(recordData, new TypeReference<RecordEntity>() { // }); // } //使用注解缓存后 RecordEntity recordEntity = getEntityById(id); if(recordEntity!=null){ Long categoryId = recordEntity.getCategoryId(); Long[] catelogPath = findCatelogPath(categoryId); recordEntity.setCatelogPath(catelogPath); } return recordEntity; }key使用SPEL表达式规则可参考官网中 Using Custom Annotations使用规则。https://docs.spring.io/spring-framework/docs/5.3.18-SNAPSHOT/reference/html/integration.html#cache-annotation-stereotype
2022年03月24日
388 阅读
0 评论
8 点赞
2022-03-15
SpringBoot整合定时任务和异步任务
SpringBoot整合定时任务和异步任务一、定时任务SpringBoot整合quartz-scheduler,执行定时任务。1、开启定时任务@EnableScheduling2、开启一个定时任务@Scheduled3、编写cron表达式cron表达式格式请参考官方文档4、实例package com.yanxizhu.ifamily.booking.controller.scheduler; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * SpringBoot整合Scheduling定时任务 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/15 20:26 * @version: 1.0 */ @Slf4j @Component @EnableScheduling public class MyScheduler { /** * cron表达式 */ @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ log.info("Hello"); } }5、定时任务总结1、@EnableScheduling 开启一个定时任务2、@Scheduled 开启一个定时任务3、编写定时规则,也就是cron表达式。6、注意事项spring中6位组成,不允许第7位的年。在周几的位置,1-7代表周一到周日,MON-SUN定时任务不应该阻塞,默认时阻塞的,应该以异步任务执行。7、解决方案解决定时任务阻塞问题:使用异步+定时任务二、异步任务执行几种方式1、异步方式提交线程池可以让业务运行以异步的方式,自己提交到线程池(CompletableFuture异步编排)。@Slf4j @Component @EnableScheduling public class MyScheduler { public static ExecutorService executorService = Executors.newFixedThreadPool(10); /** * cron表达式 */ @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ CompletableFuture.runAsync(()->{ //调用server业务方法 }, executorService); } }2、定时任务线程池支持定时任务线程池(定时任务线程池)。Scheduling自动配置类,默认只有1个线程。// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.task; import java.util.concurrent.ScheduledExecutorService; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.LazyInitializationExcludeFilter; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.task.TaskSchedulingProperties.Shutdown; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.task.TaskSchedulerBuilder; import org.springframework.boot.task.TaskSchedulerCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @ConditionalOnClass({ThreadPoolTaskScheduler.class}) @Configuration( proxyBeanMethods = false ) @EnableConfigurationProperties({TaskSchedulingProperties.class}) @AutoConfigureAfter({TaskExecutionAutoConfiguration.class}) public class TaskSchedulingAutoConfiguration { public TaskSchedulingAutoConfiguration() { } @Bean @ConditionalOnBean( name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"} ) @ConditionalOnMissingBean({SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class}) public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) { return builder.build(); } @Bean public static LazyInitializationExcludeFilter scheduledBeanLazyInitializationExcludeFilter() { return new ScheduledBeanLazyInitializationExcludeFilter(); } @Bean @ConditionalOnMissingBean public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties, ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) { TaskSchedulerBuilder builder = new TaskSchedulerBuilder(); builder = builder.poolSize(properties.getPool().getSize()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); builder = builder.customizers(taskSchedulerCustomizers); return builder; } }默认只有一个线程package org.springframework.boot.autoconfigure.task; import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("spring.task.scheduling") public class TaskSchedulingProperties { private final TaskSchedulingProperties.Pool pool = new TaskSchedulingProperties.Pool(); private final TaskSchedulingProperties.Shutdown shutdown = new TaskSchedulingProperties.Shutdown(); private String threadNamePrefix = "scheduling-"; public TaskSchedulingProperties() { } public TaskSchedulingProperties.Pool getPool() { return this.pool; } public TaskSchedulingProperties.Shutdown getShutdown() { return this.shutdown; } public String getThreadNamePrefix() { return this.threadNamePrefix; } public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } public static class Shutdown { private boolean awaitTermination; private Duration awaitTerminationPeriod; public Shutdown() { } public boolean isAwaitTermination() { return this.awaitTermination; } public void setAwaitTermination(boolean awaitTermination) { this.awaitTermination = awaitTermination; } public Duration getAwaitTerminationPeriod() { return this.awaitTerminationPeriod; } public void setAwaitTerminationPeriod(Duration awaitTerminationPeriod) { this.awaitTerminationPeriod = awaitTerminationPeriod; } } public static class Pool { private int size = 1; public Pool() { } public int getSize() { return this.size; } public void setSize(int size) { this.size = size; } } }配置线程池数量spring.task.scheduling.pool.size=5这样定时任务就有5个线程可以执行了,如果1个线程,定时任务就会阻塞。3、让定时任务执行@EnableAsync 开启异步任务功能@Async 在希望异步执行的方法开启异步执行注解,普通方法也可以使用。默认线程数8异步任务自动配置类源码:// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.task; import java.util.concurrent.Executor; import java.util.stream.Stream; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.task.TaskExecutionProperties.Pool; import org.springframework.boot.autoconfigure.task.TaskExecutionProperties.Shutdown; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.task.TaskExecutorBuilder; import org.springframework.boot.task.TaskExecutorCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; import org.springframework.core.task.TaskDecorator; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @ConditionalOnClass({ThreadPoolTaskExecutor.class}) @Configuration( proxyBeanMethods = false ) @EnableConfigurationProperties({TaskExecutionProperties.class}) public class TaskExecutionAutoConfiguration { public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor"; public TaskExecutionAutoConfiguration() { } @Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) { Pool pool = properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); Stream var10001 = taskExecutorCustomizers.orderedStream(); var10001.getClass(); builder = builder.customizers(var10001::iterator); builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique()); return builder; } @Lazy @Bean( name = {"applicationTaskExecutor", "taskExecutor"} ) @ConditionalOnMissingBean({Executor.class}) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); } }默认线程8package org.springframework.boot.autoconfigure.task; import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("spring.task.execution") public class TaskExecutionProperties { private final TaskExecutionProperties.Pool pool = new TaskExecutionProperties.Pool(); private final TaskExecutionProperties.Shutdown shutdown = new TaskExecutionProperties.Shutdown(); private String threadNamePrefix = "task-"; public TaskExecutionProperties() { } public TaskExecutionProperties.Pool getPool() { return this.pool; } public TaskExecutionProperties.Shutdown getShutdown() { return this.shutdown; } public String getThreadNamePrefix() { return this.threadNamePrefix; } public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } public static class Shutdown { private boolean awaitTermination; private Duration awaitTerminationPeriod; public Shutdown() { } public boolean isAwaitTermination() { return this.awaitTermination; } public void setAwaitTermination(boolean awaitTermination) { this.awaitTermination = awaitTermination; } public Duration getAwaitTerminationPeriod() { return this.awaitTerminationPeriod; } public void setAwaitTerminationPeriod(Duration awaitTerminationPeriod) { this.awaitTerminationPeriod = awaitTerminationPeriod; } } public static class Pool { private int queueCapacity = 2147483647; private int coreSize = 8; private int maxSize = 2147483647; private boolean allowCoreThreadTimeout = true; private Duration keepAlive = Duration.ofSeconds(60L); public Pool() { } public int getQueueCapacity() { return this.queueCapacity; } public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } public int getCoreSize() { return this.coreSize; } public void setCoreSize(int coreSize) { this.coreSize = coreSize; } public int getMaxSize() { return this.maxSize; } public void setMaxSize(int maxSize) { this.maxSize = maxSize; } public boolean isAllowCoreThreadTimeout() { return this.allowCoreThreadTimeout; } public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) { this.allowCoreThreadTimeout = allowCoreThreadTimeout; } public Duration getKeepAlive() { return this.keepAlive; } public void setKeepAlive(Duration keepAlive) { this.keepAlive = keepAlive; } } }通过配置线程池线程数spring.task.execution.pool.core-size=5 spring.task.execution.pool.max-size=50实列import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j @Component @EnableAsync @EnableScheduling public class MyScheduler { public static ExecutorService executorService = Executors.newFixedThreadPool(10); /** * cron表达式 */ @Async @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ System.out.println("Hello ...."); } }要用线程池的时候,自己注入即可用了。
2022年03月15日
142 阅读
0 评论
12 点赞
2022-03-14
SpringBoot整合Sleuth+Zipkin 服务链路追踪
SpringBoot整合Sleuth+Zipkin 服务链路追踪一、整合 Sleu1、服务提供者与消费者导入依赖<!--整合 Sleut:服务提供者与消费者导入依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> <version>3.1.1</version> </dependency>2、打开 debug 日志logging: level: org.springframework.cloud.openfeign: debug org.springframework.cloud.sleuth: debug3、发起一次远程调用,观察控制台DEBUG [user-service,541450f08573fff5,541450f08573fff5,false]user-service:服务名 541450f08573fff5:是 TranceId,一条链路中,只有一个 TranceId 541450f08573fff5:是 spanId,链路中的基本工作单元 id false:表示是否将数据输出到其他服务,true 则会把信息输出到其他可视化的服务上观察二、整合 zipkin 可视化观察1、docker 安装 zipkin 服务器docker run --name zipkin -d -p 9411:9411 openzipkin/zipkin设置随docker启动docker update --restart=always zipkin2、导入依赖<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zipkin</artifactId> <version>2.2.8.RELEASE</version> </dependency>因为spring-cloud-starter-zipkin中引入了spring-cloud-starter-sleuth,因此上面的spring-cloud-starter-sleuth可以不用引入。3、添加 zipkin 相关配置spring: application: name: ifamily-booking zipkin: base-url: http://127.0.0.1:9411/ # zipkin 服务器的地址 # 关闭服务发现,否则 Spring Cloud 会把 zipkin 的 url 当做服务名称 discoveryClientEnabled: false sender: type: web # 设置使用 http 的方式传输数据 sleuth: sampler: probability: 1 # 设置抽样采集率为 100%,默认为 0.1,即 10%4、查看通过http://127.0.0.1:9411即可查看链路追踪信息。三、持久化链路追踪数据到ES启动命令docker run --env STORAGE_TYPE=elasticsearch --env ES_HOSTS=192.168.56.10:9200 openzipkin/zipkin-dependencies注意:这个是openzipkin/zipkin-dependencies,改版后的。
2022年03月14日
214 阅读
0 评论
7 点赞
2022-03-13
SpringBoot整合Sentinel
SpringBoot整合Sentinel1、引入依赖<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency>2、下载安装Sentinel可视化控制台下载自己引入sentinel对应的核心包版本,下载地址:https://github.com/alibaba/Sentinel/releases运行Sentinel可视化控制台java -jar sentinel-dashboard-1.8.1.jar注意自己的版本。打开http://127.0.0.1:8080/,默认账号密码sentinel3、微服务配置sentinelapplication.ymlspring: cloud: sentinel: transport: port: 8719 dashboard: localhost:8080port: 8719端口随意,只要不被占用,用于各微服务与控制台通讯。4、查看监控信息启动微服务,随意访问一个接口,Sentinel控制台即可看到实施监控信息。5、启用相关功能可在Sentinel控制台调整相应功能。默认所有流控设置都是保存在内存中,重启服务就没有了。6、添加监控图标引入审计start,sentinel会自动根据spring-boot-starter-actuator监控。<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> <version>2.6.1</version> </dependency>开启监控图标management.endpoints.web.exposure.include=*7、自定义流控返回提示信息import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.BlockExceptionHandler; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.fastjson.JSON; import com.yanxizhu.common.utils.R; import org.springframework.context.annotation.Configuration; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * @description: Sentinel流控信息提示自定i * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/13 20:14 * @version: 1.0 */ @Configuration public class MySentinelConfig implements BlockExceptionHandler { @Override public void handle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, BlockException e) throws Exception { R r = R.error(10000, "自定义提示"); httpServletResponse.setContentType("application/json;charset=utf-8"); httpServletResponse.getWriter().write(JSON.toJSONString(r)); } }限流规则可参考官网限流文档。每一个微服务都有一个自己的自定义流控返回提示信息,其它配置一样,只是提示信息不同。8、熔断、降级针对Feign远程调用熔断引入Feign远程调用依赖<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency>打开 Sentinel 对 Feign 的支持:feign.sentinel.enabled=true熔断实现@FeignClient(value = "renren-fast",fallback = SeckillFeignServiceFallBack.class) public interface CardFeignService { @GetMapping("/sys/user/list") public R list(@RequestParam Map<String, Object> params); }熔断回调@S1f4j @Component public class SeckillFeignServiceFallBack implements SeckillFeignService{ @Override public R getskuseckillInfo(Long skuld) { Log.info("熔断方法调用...getskuSecki11Info"); return R.error(com.yanxizhu.family.common.exception.BizCodeEnume.TOO_MWANY_REQUEST.getCode(); com.yanxizhu.family.common.exception.BizCodeEnume.TOO_MANY_REQUEST.getAsg()); } }调用方:手动指定远程服务的降级策略。远程服务被降级处理。触发我们的熔断回调方法。提供方:超大浏览的时侯,必须牺牲一些远程服务。在服务的提供方(远程服务)指定降级策略;提供方是在运行。但是不运行自己的业务逻辑,返回的是默认的降级数据(限流的数据)。@SentinelResource 注解用来标识资源是否被限流、降级。上述例子上该注解的属性 sayHello 表示资源名。@SentinelResource 还提供了其它额外的属性如 blockHandler,blockHandlerClass,fallback 用于表示限流或降级的操作(注意有方法签名要求),更多内容可以参考 Sentinel 注解支持文档。若不配置 blockHandler、fallback 等函数,则被流控降级时方法会直接抛出对应的 BlockException;若方法未定义 throws BlockException 则会被 JVM 包装一层 UndeclaredThrowableException。注:一般推荐将 @SentinelResource 注解加到服务实现上,而在 Web 层直接使用 Spring Cloud Alibaba 自带的 Web 埋点适配。Sentinel Web 适配同样支持配置自定义流控处理逻辑,参考 相关文档。9、自定义受保护的资源1)、代码try(Entry entry =SphU.entry("seckillSkus"))(/业务逻辑catch(Execption e)(}2)、基于注解。eSentinelResource(value ="getCurrentSeckiLLSkusResource",blockHandler ="blockHandler")无论是1,2方式一定要配置被限流以后的默认返回注:一般推荐将 @SentinelResource 注解加到服务实现上,而在 Web 层直接使用 Spring Cloud Alibaba 自带的 Web 埋点适配。Sentinel Web 适配同样支持配置自定义流控处理逻辑,参考 相关文档。10、网关限流引入网关sentinel依赖<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-spring-cloud-gateway-adapter</artifactId> <version>x.y.z</version> </dependency>使用时只需注入对应的 SentinelGatewayFilter 实例以及 SentinelGatewayBlockExceptionHandler 实例即可(若使用了 Spring Cloud Alibaba Sentinel,则只需按照文档进行配置即可,无需自己加 Configuration)。更多网关限流可参考官方文档11、定制网关流控返回参考官方文档
2022年03月13日
125 阅读
0 评论
4 点赞
2022-03-13
SpringBoot整合RabbitMQ
SpringBoot整合RabbitMQ1、引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.6.3</version> </dependency>2、配置rabbitmq通过RabbitProperties.properties配置文件配置链接 rabbitmq: host: 192.168.56.10 virtual-host: / port: 5672 publisher-confirm-type: correlated publisher-returns: true template: mandatory: true重点:#开启发送端确认机制,发送到Broke的确认 publisher-confirm-type: correlated #开启时消息发送到队列的消息确认机制 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个Returnconfirm template: mandatory: true3、开启Rabbit启动类添加@EnableRabbit注解@EnableRabbit @SpringBootApplication public class FamilyBookingApplication { public static void main(String[] args) { SpringApplication.run(FamilyBookingApplication.class, args); } }4、Rabbit使用现在RabbitAutoConfiguration就生效了,就有了组件RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate5、使用测试1、创建Exchange、Queue、Binding使用AmqpAdmin创建。通过创建各交换机构造方法,找到对应接口的实现,传入参数即可创建Exchange。public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) @Autowired AmqpAdmin amqpAdmin; /** * 创建Exchange交换机 */ @Test public void craeteChage() { /** * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) */ DirectExchange directExchange = new DirectExchange("Family-Hello-Exchange",true,false); amqpAdmin.declareExchange(directExchange); } /** * 创建Queue交换机 */ @Test public void craetQueue() { /** * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) */ Queue queue = new Queue("Family-Hello-Queue", true,false, false); amqpAdmin.declareQueue(queue); } /** * 创建Bind交换机 */ @Test public void craetBind() { /** * public Binding(String destination, 绑定目标 * Binding.DestinationType destinationType, 绑定类型 * String exchange, 交换机 * String routingKey, router-key * @Nullable Map<String, Object> arguments 参数) { */ Binding binding = new Binding("Family-Hello-Queue", Binding.DestinationType.QUEUE, "Family-Hello-Exchange", "Java.hello", null); amqpAdmin.declareBinding(binding); }上面为DirectExchange类型交换机的创建,其它类型Exchange类似。更多API可参考AmqpAdmin的各方法。2、收发消息发送消息 /** * rabbitTemplate 发送消息 */ @Test public String sendMessage(){ //发送字符串 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString())); //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setAddress("rabbitMQ测试-会员地址信息"); memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com"); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; }3、接受消息RabbitMQ为我们提供了监听@RabbitListener监听注解,使用时,必须开启@EnableRabbit功能。不用监听消息队列时,可以不用开启@EnableRabbit注解。 /** * 监听队列是各数组,可以同时监听多个队列 * 通过返回的消息类型知道是一个org.springframework.amqp.core.Message; * @param message */ @RabbitListener(queues={"Family-Hello-Queue"}) public String sendMessage(){ //发送字符串 // rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString())); //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setAddress("rabbitMQ测试-会员地址信息"); memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com"); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; } /** * * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T> * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。 * 还可以直接获取到链接中传输数据的通道channel * @param message */ @RabbitListener(queues={"Family-Hello-Queue"}) public void receiveMessageAndObject(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息2:"+message+"消息类型:"+message.getClass()); System.out.println("监听到队列消息2:"+message+"消息内容:"+memberEntity); }Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息注意:1)、同一个消息,只能有一个客户端收到2)、只有一个消息完全处理完,方法运行结束,我们才可以接收到下一个消息3)、@RabbitHandler也可以接受队列消息@RabbitListener可以用在类和方法上@RabbitHandler只能用在方法上。@RabbitHandler应用场景:当发送的数据为不同类型时,@RabbitListener接口消息时,参数类型不知道定义那个。比如:想队列发送的对象有user、People people,那下面接受时,对象不知道写那个public void receiveMessageObject(Message message, 这里该写那个对象, Channel channel)因此@RabbitHandler就可以用上了。通过在类上加上@RabbitHandler注解监听队列消息,在方法上加@RabbitHandler注解,参数就可以指定接受那个数据类型的消息了。简单代码如下:@RabbitListener(queues={"Family-Hello-Queue"}) @Service("memberService") public class MemberServiceImpl extends ServiceImpl<MemberDao, MemberEntity> implements MemberService { //接受队列中指定的People类型数据 @RabbitHandler public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity); } //接受队列中指定的User类型数据 @RabbitHandler public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity); } }6、RabbitMQ Java配置Json序列化通过RabbitAutoConfiguration.java中注入的RabbitTemplate看到configure。@Bean @ConditionalOnMissingBean public RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) { RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer(); configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique()); configurer.setRetryTemplateCustomizers((List)retryTemplateCustomizers.orderedStream().collect(Collectors.toList())); configurer.setRabbitProperties(properties); return configurer; } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean({RabbitOperations.class}) public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(); configurer.configure(template, connectionFactory); return template; }如果配置中有MessageConverter消息转换器就用configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique());如果没有就用RabbitTemplate默认配置的。RabbitTemplateConfigurer.java默认配置源码中可以看到默认使用的转换器如下:public MessageConverter getMessageConverter() { return this.messageConverter; } private MessageConverter messageConverter = new SimpleMessageConverter(); 然后通过SimpleMessageConverter可以看到默认创建消息时,使用的消息转换器类型 protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; if (object instanceof byte[]) { bytes = (byte[])((byte[])object); messageProperties.setContentType("application/octet-stream"); } else if (object instanceof String) { try { bytes = ((String)object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException var6) { throw new MessageConversionException("failed to convert to Message content", var6); } messageProperties.setContentType("text/plain"); messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } messageProperties.setContentType("application/x-java-serialized-object"); } if (bytes != null) { messageProperties.setContentLength((long)bytes.length); return new Message(bytes, messageProperties); } else { throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); } }如果是序列化的就是使用的SerializationUtils.serialize(object);else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } messageProperties.setContentType("application/x-java-serialized-object");因此我们可以自定义MessageConverter。通过MessageConverter接口实现类型,可以看到json各实现类。因此RabbitMQ自定义配置如下:import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description: RabbitMQ配置 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/12 17:07 * @version: 1.0 */ @Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }再次发送对象时,从mq里面获取的对象就是json格式了。7、开启消息回调确认机制#开启发送端确认机制,发送到Broke的确认 publisher-confirm-type: correlated #开启时消息发送到队列的消息确认机制 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个ReturnCallback template: mandatory: true #手动确认ack listener: simple: acknowledge-mode: manual8、重点:发送端:package com.yanxizhu.family.users.config; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @description: RabbitMQ配置 序列化 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/17 16:44 * @version: 1.0 */ @Configuration public class RabbitConfig { private RabbitTemplate rabbitTemplate; /** * rabbitTemplate循环依赖,解决方法:手动初始化模板方法 * @param connectionFactory * @return */ @Primary @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setMessageConverter(messageConverter()); initRabbitTemplate(); return rabbitTemplate; } @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 定制rabbitMaTemplate * 一、发送端消息确认机制,发送给Broke的消息确认回调 * 1、开始消息确认 publisher-confirm-type: correlated * 2、设置确认回调 setConfirmCallback() * 二、发送端消息确认机制,发送给队列的消息确认回调 * 1、开始消息确认 * publisher-returns: true * template: * mandatory: true * 2、设置确认回调 setReturnsCallback * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息) * 1、开启消费端消息确认 * listener: * simple: * acknowledge-mode: manual * 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。 * 默认的消息确认(默认ACK)问题: * 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失; * 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。 * 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。 * 4、手动ack如何签收 * //消息头信息 * MessageProperties messageProperties = message.getMessageProperties(); * long deliveryTag = messageProperties.getDeliveryTag(); * channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收 * 5、消费退回 * * //参数:long var1, boolean var3(是否批量), boolean var4 * channel.basicNack(deliveryTag,true,true); //可以批量回退 * * //参数:long var1, boolean var3 * channel.basicReject(deliveryTag,true); //单个回退 * * 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。 */ public void initRabbitTemplate(){ //设置确认回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 只要消息抵达服务器 * @param correlationData 当前消息关联的唯一数据(这个消息的唯一id) * @param ack 消息是否成功收到 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //correlationData:注意,这个correlationData就是发送消息时,设置的correlationData。 System.out.println("====确认收到消息了。。。。================"); System.out.println(correlationData+"|"+ack+"|"+cause); } }); //注意:必须要有这一步,不然不生效 rabbitTemplate.setMandatory(true); //设置消息抵达队列回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息没有投递给指定的队列,就触发这个失败回调 * @param message 投递失败的消息详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 当时这个消息发给哪个交换机 * @param routingKey 当时这个消息用的哪个路由键 */ public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("==========投递到队列失败了。。。=========="); System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); } }); } } 发送消息时,可以将new CorrelationData(UUID.randomUUID().toString()),传的uuid参数保存到数据库,到时候直接遍历数据库即可知道那些投递成功到队列,那些投递失败。说明: * 一、发送端消息确认机制,发送给Broke的消息确认回调 * 1、开始消息确认 publisher-confirm-type: correlated * 2、设置确认回调 setConfirmCallback() * 二、发送端消息确认机制,发送给队列的消息确认回调 * 1、开始消息确认 * publisher-returns: true * template: * mandatory: true * 2、设置确认回调 setReturnsCallback * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息) * 1、开启消费端消息确认 * listener: * simple: * acknowledge-mode: manual * 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。 * 默认的消息确认(默认ACK)问题: * 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失; * 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。 * 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。消费端手动ack /** * * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T> * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。 * 还可以直接获取到链接中传输数据的通道channel * @param message */ @RabbitHandler public void receiveMessageObject(Message message, MemberEntity memberEntity, Channel channel) throws IOException { //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); //手动确认签收 channel.basicAck(deliveryTag,true); System.out.println("监听到队列消息3:"+message+"消息类型:"+message.getClass()); System.out.println("监听到队列消息3:"+message+"消息内容:"+memberEntity); //回退 //可以批量回退 // channel.basicNack(deliveryTag,true,true); //单个回退 //true重新排队,false丢弃 // channel.basicReject(deliveryTag,true); }说明:手动ack如何签收 //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收消费退回 //参数:long var1, boolean var3(是否批量), boolean var4 channel.basicNack(deliveryTag,true,true); //可以批量回退 //参数:long var1, boolean var3 channel.basicReject(deliveryTag,true); //单个回退 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。最后rabbitMQ配置:spring: rabbitmq: host: 192.168.56.10 virtual-host: / port: 5672 #开启发送端确认 publisher-confirm-type: correlated #开启发送端消息抵达队列的确认 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个retureconfirm template: mandatory: true #开启手动确认 listener: simple: acknowledge-mode: manualSpringBoot中使用延时队列1、Queue、Exchange、Binding可以@Bean进去 2、监听消息的方法可以有三种参数(不分数量,顺序) Object content, Message message, Channel channel 3、channel可以用来拒绝消息,否则自动ack;代码实现发送消息到MQ: public String sendDelayMessage(){ //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setId(11111111L); memberEntity.setAddress("死信队列测试"); memberEntity.setEmail("延迟队列,异步处理会员信息"); memberEntity.setCreateTime(new Date()); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("member-event-exchange","users.create.member",memberEntity, new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; }通过延迟队列,将延迟的会员信息,发送到另一个队列,通过监听过期延迟的队列,接收延迟的会员信息。package com.yanxizhu.family.users.config; import com.rabbitmq.client.Channel; import com.yanxizhu.family.users.entity.MemberEntity; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * @description: 延迟队列 * 说明: 1、首先通过创建的路由键(users.create.member),将消息从交换机(member-event-exchange)发送到死信队列(member.delay.queue)。 * 2、根据死信队列(member.delay.queue)的配置,过期后,消息将通过路由键为(users.release.member)转发到交换机(member-event-exchange)。 * 3、最后交换机(member-event-exchange)再通过绑定的最终队列(users.release.member.queue)和路由键(users.release.member)转发到最终队列(member-event-exchange)。 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/17 20:47 * @version: 1.0 */ @Configuration public class DelayQueueConfig { /** * 监听最终的队列 * @param entity * @param channel * @param message * @throws IOException */ @RabbitListener(queues="member.release.queue") public void listener(MemberEntity entity, Channel channel, Message message) throws IOException { System.out.println("收到过期的会员信息:准备关闭会员"+entity.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } //@bean自动创建交换机、路由、绑定 //交换机 @Bean public Exchange orderEventExchange(){ return new TopicExchange("member-event-exchange", true, false); } /** * 容器中的Binding、Queue、Exchange都会自动创建(RabbitMQ没有的情况) * @return */ //死信队列 @Bean public Queue memberDelayQueue(){ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "member-event-exchange");//交换机 arguments.put("x-dead-letter-routing-key", "users.release.member");//路由键 arguments.put("x-message-ttl", 60000);//过期时间 //public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) Queue queue = new Queue("member.delay.queue",true,false,false,arguments); return queue; } //将死信队列和交换机绑定,路由键-users.create.member @Bean public Binding orderCreateOrderBingding(){ //目的地、绑定类型、交换机、路由键、参数 //public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments) return new Binding("member.delay.queue", Binding.DestinationType.QUEUE, "member-event-exchange", "users.create.member", null); } //最后过期的队列 @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("member.release.queue",true,false,false); return queue; } //将最后的队列和交换机绑定,路由键-user.release.member @Bean public Binding orderReleaseOrderBingding(){ return new Binding("member.release.queue", Binding.DestinationType.QUEUE, "member-event-exchange", "users.release.member", null); } }
2022年03月13日
253 阅读
0 评论
9 点赞
1
2