首页
关于
友链
Search
1
wlop 4K 壁纸 4k8k 动态 壁纸
1,608 阅读
2
Nacos持久化MySQL问题-解决方案
1,014 阅读
3
Docker搭建Typecho博客
815 阅读
4
滑动时间窗口算法
808 阅读
5
ChatGPT注册 OpenAI's services are not available in your country 解决方法
790 阅读
生活
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
安全
开发工具
百度网盘
天翼网盘
阿里网盘
登录
Search
标签搜索
java
javase
docker
java8
springboot
thread
spring
分布式
mysql
锁
linux
redis
源码
typecho
centos
git
map
RabbitMQ
lambda
stream
少年
累计撰写
189
篇文章
累计收到
26
条评论
首页
栏目
生活
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
安全
开发工具
百度网盘
天翼网盘
阿里网盘
页面
关于
友链
搜索到
23
篇与
的结果
2022-03-13
SpringBoot整合RabbitMQ
SpringBoot整合RabbitMQ1、引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.6.3</version> </dependency>2、配置rabbitmq通过RabbitProperties.properties配置文件配置链接 rabbitmq: host: 192.168.56.10 virtual-host: / port: 5672 publisher-confirm-type: correlated publisher-returns: true template: mandatory: true重点:#开启发送端确认机制,发送到Broke的确认 publisher-confirm-type: correlated #开启时消息发送到队列的消息确认机制 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个Returnconfirm template: mandatory: true3、开启Rabbit启动类添加@EnableRabbit注解@EnableRabbit @SpringBootApplication public class FamilyBookingApplication { public static void main(String[] args) { SpringApplication.run(FamilyBookingApplication.class, args); } }4、Rabbit使用现在RabbitAutoConfiguration就生效了,就有了组件RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate5、使用测试1、创建Exchange、Queue、Binding使用AmqpAdmin创建。通过创建各交换机构造方法,找到对应接口的实现,传入参数即可创建Exchange。public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) @Autowired AmqpAdmin amqpAdmin; /** * 创建Exchange交换机 */ @Test public void craeteChage() { /** * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) */ DirectExchange directExchange = new DirectExchange("Family-Hello-Exchange",true,false); amqpAdmin.declareExchange(directExchange); } /** * 创建Queue交换机 */ @Test public void craetQueue() { /** * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) */ Queue queue = new Queue("Family-Hello-Queue", true,false, false); amqpAdmin.declareQueue(queue); } /** * 创建Bind交换机 */ @Test public void craetBind() { /** * public Binding(String destination, 绑定目标 * Binding.DestinationType destinationType, 绑定类型 * String exchange, 交换机 * String routingKey, router-key * @Nullable Map<String, Object> arguments 参数) { */ Binding binding = new Binding("Family-Hello-Queue", Binding.DestinationType.QUEUE, "Family-Hello-Exchange", "Java.hello", null); amqpAdmin.declareBinding(binding); }上面为DirectExchange类型交换机的创建,其它类型Exchange类似。更多API可参考AmqpAdmin的各方法。2、收发消息发送消息 /** * rabbitTemplate 发送消息 */ @Test public String sendMessage(){ //发送字符串 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString())); //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setAddress("rabbitMQ测试-会员地址信息"); memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com"); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; }3、接受消息RabbitMQ为我们提供了监听@RabbitListener监听注解,使用时,必须开启@EnableRabbit功能。不用监听消息队列时,可以不用开启@EnableRabbit注解。 /** * 监听队列是各数组,可以同时监听多个队列 * 通过返回的消息类型知道是一个org.springframework.amqp.core.Message; * @param message */ @RabbitListener(queues={"Family-Hello-Queue"}) public String sendMessage(){ //发送字符串 // rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString())); //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setAddress("rabbitMQ测试-会员地址信息"); memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com"); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; } /** * * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T> * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。 * 还可以直接获取到链接中传输数据的通道channel * @param message */ @RabbitListener(queues={"Family-Hello-Queue"}) public void receiveMessageAndObject(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息2:"+message+"消息类型:"+message.getClass()); System.out.println("监听到队列消息2:"+message+"消息内容:"+memberEntity); }Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息注意:1)、同一个消息,只能有一个客户端收到2)、只有一个消息完全处理完,方法运行结束,我们才可以接收到下一个消息3)、@RabbitHandler也可以接受队列消息@RabbitListener可以用在类和方法上@RabbitHandler只能用在方法上。@RabbitHandler应用场景:当发送的数据为不同类型时,@RabbitListener接口消息时,参数类型不知道定义那个。比如:想队列发送的对象有user、People people,那下面接受时,对象不知道写那个public void receiveMessageObject(Message message, 这里该写那个对象, Channel channel)因此@RabbitHandler就可以用上了。通过在类上加上@RabbitHandler注解监听队列消息,在方法上加@RabbitHandler注解,参数就可以指定接受那个数据类型的消息了。简单代码如下:@RabbitListener(queues={"Family-Hello-Queue"}) @Service("memberService") public class MemberServiceImpl extends ServiceImpl<MemberDao, MemberEntity> implements MemberService { //接受队列中指定的People类型数据 @RabbitHandler public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity); } //接受队列中指定的User类型数据 @RabbitHandler public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity); } }6、RabbitMQ Java配置Json序列化通过RabbitAutoConfiguration.java中注入的RabbitTemplate看到configure。@Bean @ConditionalOnMissingBean public RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) { RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer(); configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique()); configurer.setRetryTemplateCustomizers((List)retryTemplateCustomizers.orderedStream().collect(Collectors.toList())); configurer.setRabbitProperties(properties); return configurer; } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean({RabbitOperations.class}) public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(); configurer.configure(template, connectionFactory); return template; }如果配置中有MessageConverter消息转换器就用configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique());如果没有就用RabbitTemplate默认配置的。RabbitTemplateConfigurer.java默认配置源码中可以看到默认使用的转换器如下:public MessageConverter getMessageConverter() { return this.messageConverter; } private MessageConverter messageConverter = new SimpleMessageConverter(); 然后通过SimpleMessageConverter可以看到默认创建消息时,使用的消息转换器类型 protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; if (object instanceof byte[]) { bytes = (byte[])((byte[])object); messageProperties.setContentType("application/octet-stream"); } else if (object instanceof String) { try { bytes = ((String)object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException var6) { throw new MessageConversionException("failed to convert to Message content", var6); } messageProperties.setContentType("text/plain"); messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } messageProperties.setContentType("application/x-java-serialized-object"); } if (bytes != null) { messageProperties.setContentLength((long)bytes.length); return new Message(bytes, messageProperties); } else { throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); } }如果是序列化的就是使用的SerializationUtils.serialize(object);else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } messageProperties.setContentType("application/x-java-serialized-object");因此我们可以自定义MessageConverter。通过MessageConverter接口实现类型,可以看到json各实现类。因此RabbitMQ自定义配置如下:import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description: RabbitMQ配置 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/12 17:07 * @version: 1.0 */ @Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }再次发送对象时,从mq里面获取的对象就是json格式了。7、开启消息回调确认机制#开启发送端确认机制,发送到Broke的确认 publisher-confirm-type: correlated #开启时消息发送到队列的消息确认机制 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个ReturnCallback template: mandatory: true #手动确认ack listener: simple: acknowledge-mode: manual8、重点:发送端:package com.yanxizhu.family.users.config; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @description: RabbitMQ配置 序列化 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/17 16:44 * @version: 1.0 */ @Configuration public class RabbitConfig { private RabbitTemplate rabbitTemplate; /** * rabbitTemplate循环依赖,解决方法:手动初始化模板方法 * @param connectionFactory * @return */ @Primary @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setMessageConverter(messageConverter()); initRabbitTemplate(); return rabbitTemplate; } @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 定制rabbitMaTemplate * 一、发送端消息确认机制,发送给Broke的消息确认回调 * 1、开始消息确认 publisher-confirm-type: correlated * 2、设置确认回调 setConfirmCallback() * 二、发送端消息确认机制,发送给队列的消息确认回调 * 1、开始消息确认 * publisher-returns: true * template: * mandatory: true * 2、设置确认回调 setReturnsCallback * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息) * 1、开启消费端消息确认 * listener: * simple: * acknowledge-mode: manual * 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。 * 默认的消息确认(默认ACK)问题: * 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失; * 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。 * 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。 * 4、手动ack如何签收 * //消息头信息 * MessageProperties messageProperties = message.getMessageProperties(); * long deliveryTag = messageProperties.getDeliveryTag(); * channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收 * 5、消费退回 * * //参数:long var1, boolean var3(是否批量), boolean var4 * channel.basicNack(deliveryTag,true,true); //可以批量回退 * * //参数:long var1, boolean var3 * channel.basicReject(deliveryTag,true); //单个回退 * * 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。 */ public void initRabbitTemplate(){ //设置确认回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 只要消息抵达服务器 * @param correlationData 当前消息关联的唯一数据(这个消息的唯一id) * @param ack 消息是否成功收到 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //correlationData:注意,这个correlationData就是发送消息时,设置的correlationData。 System.out.println("====确认收到消息了。。。。================"); System.out.println(correlationData+"|"+ack+"|"+cause); } }); //注意:必须要有这一步,不然不生效 rabbitTemplate.setMandatory(true); //设置消息抵达队列回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息没有投递给指定的队列,就触发这个失败回调 * @param message 投递失败的消息详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 当时这个消息发给哪个交换机 * @param routingKey 当时这个消息用的哪个路由键 */ public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("==========投递到队列失败了。。。=========="); System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); } }); } } 发送消息时,可以将new CorrelationData(UUID.randomUUID().toString()),传的uuid参数保存到数据库,到时候直接遍历数据库即可知道那些投递成功到队列,那些投递失败。说明: * 一、发送端消息确认机制,发送给Broke的消息确认回调 * 1、开始消息确认 publisher-confirm-type: correlated * 2、设置确认回调 setConfirmCallback() * 二、发送端消息确认机制,发送给队列的消息确认回调 * 1、开始消息确认 * publisher-returns: true * template: * mandatory: true * 2、设置确认回调 setReturnsCallback * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息) * 1、开启消费端消息确认 * listener: * simple: * acknowledge-mode: manual * 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。 * 默认的消息确认(默认ACK)问题: * 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失; * 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。 * 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。消费端手动ack /** * * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T> * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。 * 还可以直接获取到链接中传输数据的通道channel * @param message */ @RabbitHandler public void receiveMessageObject(Message message, MemberEntity memberEntity, Channel channel) throws IOException { //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); //手动确认签收 channel.basicAck(deliveryTag,true); System.out.println("监听到队列消息3:"+message+"消息类型:"+message.getClass()); System.out.println("监听到队列消息3:"+message+"消息内容:"+memberEntity); //回退 //可以批量回退 // channel.basicNack(deliveryTag,true,true); //单个回退 //true重新排队,false丢弃 // channel.basicReject(deliveryTag,true); }说明:手动ack如何签收 //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收消费退回 //参数:long var1, boolean var3(是否批量), boolean var4 channel.basicNack(deliveryTag,true,true); //可以批量回退 //参数:long var1, boolean var3 channel.basicReject(deliveryTag,true); //单个回退 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。最后rabbitMQ配置:spring: rabbitmq: host: 192.168.56.10 virtual-host: / port: 5672 #开启发送端确认 publisher-confirm-type: correlated #开启发送端消息抵达队列的确认 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个retureconfirm template: mandatory: true #开启手动确认 listener: simple: acknowledge-mode: manualSpringBoot中使用延时队列1、Queue、Exchange、Binding可以@Bean进去 2、监听消息的方法可以有三种参数(不分数量,顺序) Object content, Message message, Channel channel 3、channel可以用来拒绝消息,否则自动ack;代码实现发送消息到MQ: public String sendDelayMessage(){ //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setId(11111111L); memberEntity.setAddress("死信队列测试"); memberEntity.setEmail("延迟队列,异步处理会员信息"); memberEntity.setCreateTime(new Date()); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("member-event-exchange","users.create.member",memberEntity, new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; }通过延迟队列,将延迟的会员信息,发送到另一个队列,通过监听过期延迟的队列,接收延迟的会员信息。package com.yanxizhu.family.users.config; import com.rabbitmq.client.Channel; import com.yanxizhu.family.users.entity.MemberEntity; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * @description: 延迟队列 * 说明: 1、首先通过创建的路由键(users.create.member),将消息从交换机(member-event-exchange)发送到死信队列(member.delay.queue)。 * 2、根据死信队列(member.delay.queue)的配置,过期后,消息将通过路由键为(users.release.member)转发到交换机(member-event-exchange)。 * 3、最后交换机(member-event-exchange)再通过绑定的最终队列(users.release.member.queue)和路由键(users.release.member)转发到最终队列(member-event-exchange)。 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/17 20:47 * @version: 1.0 */ @Configuration public class DelayQueueConfig { /** * 监听最终的队列 * @param entity * @param channel * @param message * @throws IOException */ @RabbitListener(queues="member.release.queue") public void listener(MemberEntity entity, Channel channel, Message message) throws IOException { System.out.println("收到过期的会员信息:准备关闭会员"+entity.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } //@bean自动创建交换机、路由、绑定 //交换机 @Bean public Exchange orderEventExchange(){ return new TopicExchange("member-event-exchange", true, false); } /** * 容器中的Binding、Queue、Exchange都会自动创建(RabbitMQ没有的情况) * @return */ //死信队列 @Bean public Queue memberDelayQueue(){ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "member-event-exchange");//交换机 arguments.put("x-dead-letter-routing-key", "users.release.member");//路由键 arguments.put("x-message-ttl", 60000);//过期时间 //public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) Queue queue = new Queue("member.delay.queue",true,false,false,arguments); return queue; } //将死信队列和交换机绑定,路由键-users.create.member @Bean public Binding orderCreateOrderBingding(){ //目的地、绑定类型、交换机、路由键、参数 //public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments) return new Binding("member.delay.queue", Binding.DestinationType.QUEUE, "member-event-exchange", "users.create.member", null); } //最后过期的队列 @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("member.release.queue",true,false,false); return queue; } //将最后的队列和交换机绑定,路由键-user.release.member @Bean public Binding orderReleaseOrderBingding(){ return new Binding("member.release.queue", Binding.DestinationType.QUEUE, "member-event-exchange", "users.release.member", null); } }
2022年03月13日
629 阅读
0 评论
9 点赞
2022-03-12
SpringBoot整合seata分布式事务(不适用高并发场景)
Springboot整合seata分布式事务一、创建seata日志表-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;二、安装事务协调器(seata-server)1、下载地址 2、导入依赖<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency>3、启动seata服务器4、seata文件说明registry.conf:注册中心配置。这里执行注册中西为nacos。type = "nacos",config {执行seata配置数据放在那里,这里默认使用文件放配置数据。 type = "file"file.conf:seata默认配置信息存放这里。例如,事务日志存放地方配置## transaction log store, only used in server side store { ## store mode: file、db mode = "file" ## file store property file { ## store location dir dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions maxBranchSessionSize = 16384 # globe session size , if exceeded throws exceptions maxGlobalSessionSize = 512 # file buffer size , if exceeded allocate new buffer fileWriteBufferCacheSize = 16384 # when recover batch read size sessionReloadReadSize = 100 # async, sync flushDiskMode = async } ## database store property db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" driverClassName = "com.mysql.jdbc.Driver" ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true" user = "mysql" password = "mysql" minConn = 5 maxConn = 30 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 } }三、自定义代理数据源注意:所有想要用到分布式事务的微服务使用seata DataSourceProxy代理自己的数据源。springboot默认使用的是Hikari数据源。DataSourceAutoConfiguration.java源代码:// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.jdbc; import javax.sql.DataSource; import javax.sql.XADataSource; import org.springframework.boot.autoconfigure.condition.AnyNestedCondition; import org.springframework.boot.autoconfigure.condition.ConditionMessage; import org.springframework.boot.autoconfigure.condition.ConditionOutcome; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.SpringBootCondition; import org.springframework.boot.autoconfigure.condition.ConditionMessage.Builder; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Dbcp2; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Generic; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Hikari; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.OracleUcp; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Tomcat; import org.springframework.boot.autoconfigure.jdbc.DataSourceInitializationConfiguration.InitializationSpecificCredentialsDataSourceInitializationConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceInitializationConfiguration.SharedCredentialsDataSourceInitializationConfiguration; import org.springframework.boot.autoconfigure.jdbc.metadata.DataSourcePoolMetadataProvidersConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.boot.jdbc.EmbeddedDatabaseConnection; import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.ConfigurationCondition.ConfigurationPhase; import org.springframework.core.env.Environment; import org.springframework.core.type.AnnotatedTypeMetadata; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; import org.springframework.util.StringUtils; @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({DataSource.class, EmbeddedDatabaseType.class}) @ConditionalOnMissingBean( type = {"io.r2dbc.spi.ConnectionFactory"} ) @EnableConfigurationProperties({DataSourceProperties.class}) @Import({DataSourcePoolMetadataProvidersConfiguration.class, InitializationSpecificCredentialsDataSourceInitializationConfiguration.class, SharedCredentialsDataSourceInitializationConfiguration.class}) public class DataSourceAutoConfiguration { public DataSourceAutoConfiguration() { } static class EmbeddedDatabaseCondition extends SpringBootCondition { private static final String DATASOURCE_URL_PROPERTY = "spring.datasource.url"; private final SpringBootCondition pooledCondition = new DataSourceAutoConfiguration.PooledDataSourceCondition(); EmbeddedDatabaseCondition() { } public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { Builder message = ConditionMessage.forCondition("EmbeddedDataSource", new Object[0]); if (this.hasDataSourceUrlProperty(context)) { return ConditionOutcome.noMatch(message.because("spring.datasource.url is set")); } else if (this.anyMatches(context, metadata, new Condition[]{this.pooledCondition})) { return ConditionOutcome.noMatch(message.foundExactly("supported pooled data source")); } else { EmbeddedDatabaseType type = EmbeddedDatabaseConnection.get(context.getClassLoader()).getType(); return type == null ? ConditionOutcome.noMatch(message.didNotFind("embedded database").atAll()) : ConditionOutcome.match(message.found("embedded database").items(new Object[]{type})); } } private boolean hasDataSourceUrlProperty(ConditionContext context) { Environment environment = context.getEnvironment(); if (environment.containsProperty("spring.datasource.url")) { try { return StringUtils.hasText(environment.getProperty("spring.datasource.url")); } catch (IllegalArgumentException var4) { } } return false; } } static class PooledDataSourceAvailableCondition extends SpringBootCondition { PooledDataSourceAvailableCondition() { } public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { Builder message = ConditionMessage.forCondition("PooledDataSource", new Object[0]); return DataSourceBuilder.findType(context.getClassLoader()) != null ? ConditionOutcome.match(message.foundExactly("supported DataSource")) : ConditionOutcome.noMatch(message.didNotFind("supported DataSource").atAll()); } } static class PooledDataSourceCondition extends AnyNestedCondition { PooledDataSourceCondition() { super(ConfigurationPhase.PARSE_CONFIGURATION); } @Conditional({DataSourceAutoConfiguration.PooledDataSourceAvailableCondition.class}) static class PooledDataSourceAvailable { PooledDataSourceAvailable() { } } @ConditionalOnProperty( prefix = "spring.datasource", name = {"type"} ) static class ExplicitType { ExplicitType() { } } } @Configuration( proxyBeanMethods = false ) @Conditional({DataSourceAutoConfiguration.PooledDataSourceCondition.class}) @ConditionalOnMissingBean({DataSource.class, XADataSource.class}) @Import({Hikari.class, Tomcat.class, Dbcp2.class, OracleUcp.class, Generic.class, DataSourceJmxConfiguration.class}) protected static class PooledDataSourceConfiguration { protected PooledDataSourceConfiguration() { } } @Configuration( proxyBeanMethods = false ) @Conditional({DataSourceAutoConfiguration.EmbeddedDatabaseCondition.class}) @ConditionalOnMissingBean({DataSource.class, XADataSource.class}) @Import({EmbeddedDataSourceConfiguration.class}) protected static class EmbeddedDatabaseConfiguration { protected EmbeddedDatabaseConfiguration() { } } } 导入很多数据源 @Import({Hikari.class, Tomcat.class, Dbcp2.class, OracleUcp.class, Generic.class, DataSourceJmxConfiguration.class})DataSourceConfiguration.java源代码@Configuration( proxyBeanMethods = false ) @ConditionalOnClass({HikariDataSource.class}) @ConditionalOnMissingBean({DataSource.class}) @ConditionalOnProperty( name = {"spring.datasource.type"}, havingValue = "com.zaxxer.hikari.HikariDataSource", matchIfMissing = true ) static class Hikari { Hikari() { } @Bean @ConfigurationProperties( prefix = "spring.datasource.hikari" ) HikariDataSource dataSource(DataSourceProperties properties) { HikariDataSource dataSource = (HikariDataSource)DataSourceConfiguration.createDataSource(properties, HikariDataSource.class); if (StringUtils.hasText(properties.getName())) { dataSource.setPoolName(properties.getName()); } return dataSource; } }自定义代理数据源配置:import com.zaxxer.hikari.HikariDataSource; import io.seata.rm.datasource.DataSourceProxy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import javax.sql.DataSource; /** * @description: Seata自定义代理数据源 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/12 14:41 * @version: 1.0 */ @Configuration public class MySeataConfig { @Autowired DataSourceProperties dataSourceProperties; @Bean public DataSource dataSource(DataSourceProperties dataSourceProperties){ HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build(); if (StringUtils.hasText(dataSourceProperties.getName())) { dataSource.setPoolName(dataSourceProperties.getName()); } return new DataSourceProxy(dataSource); } }四、复制seata配置文件到项目resources下主要复制seata里面的模板文件file.conf、registry.conf注意:file.conf 的 service.vgroup_mapping 配置必须和spring.application.name一致。vgroup_mapping.{应用名称}-fescar-service-group = "default"service{ vgroup_mapping.family-booking-fescar-service-group = "default" }family-booking:微服务名字也可以通过配置 spring.cloud.alibaba.seata.tx-service-group修改后缀,但是必须和file.conf中的配置保持一致五、使用在分布式事务入口方法加上全局事务注解@GlobalTransactional,远程调用方法上加本地事务注解@Transactional即可。@GlobalTransactional @Transactional public PageUtils queryPage(Map<String, Object> params) { //调用远程方法,另一个微服务的方法加上小事务@Transactional }六、使用场景不适用高并发的场景,适用普通的业务远程调用。seata默认是使用的AT模式。针对高并发场景还是的用柔性事务,消息队列、延迟队列,MQ中间件完成。
2022年03月12日
401 阅读
0 评论
8 点赞
2022-03-12
Springboot同一Server类方法调用事务解决方案
Springboot同一Server类方法调用事务解决方案1、引入springboot-aop start<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency>主要是使用里面的动态代理 <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.9.7</version> <scope>compile</scope> </dependency>2、开启动态代理@EnableAspectJAutoProxy(exposeProxy = true) @EnableDiscoveryClient @SpringBootApplication public class FamilyBookingApplication { public static void main(String[] args) { SpringApplication.run(FamilyBookingApplication.class, args); } }@EnableAspectJAutoProxy(exposeProxy = true):开启aspectj动态代理功能。以后所有的动态代理都是aspectj对象暴露代理对象。3、本类互调用代理对象调用@Service("userService") public class UserServiceImpl implements userService { @Transactional public void a(){ UserService userService = (UserService)AopContext.currentProxy(); userService.b(); userService.c(); } @Transactional public void b(){ } @Transactional public void c(){ } }
2022年03月12日
275 阅读
0 评论
4 点赞
2022-03-10
Springboot拦截器配置
Springboot拦截器配置1、拦截器配置,主要实现HandlerInterceptor接口import org.springframework.stereotype.Component; import org.springframework.web.servlet.HandlerInterceptor; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * @description: 拦截器,配合FamilyWebConfig 配置使用 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/10 22:36 * @version: 1.0 */ @Component public class LoginUserInterceptor implements HandlerInterceptor { public static ThreadLocal<Object> objUser = new ThreadLocal<>(); @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { Object loginusr = request.getSession().getAttribute("LOGIN_USR"); if(loginusr != null){ objUser.set(loginusr); //登录了才能访问 return true; }else{ //没登录,跳转去登录 return false; } } } 由于拦截器需要配合web配置使用,因此创建web配置。2、web配置主要实现WebMvcConfigurer接口import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; /** * @description: WEB配置 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/10 22:40 * @version: 1.0 */ public class FamilyWebConfig implements WebMvcConfigurer { @Autowired LoginUserInterceptor loginUserInterceptor; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(loginUserInterceptor).addPathPatterns("/**"); } }
2022年03月10日
244 阅读
0 评论
2 点赞
2022-03-03
springboot整合redis
适合当如缓存场景:即时性、数据一致性要求不高的。访问量大且更新频率不高的数据(读多,写少)承担持久化工作。读模式缓存使用流程:凡是放入缓存中的数据,我们应该指定过期时间,使其可以再系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致问题。解决分布式缓存中本地缓存导致数据不一致问题,可以使用redis中间件解决。springboot整合redis1、引入springboot整合的start:pom文件引入依赖 <!--redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>引入依赖之后就会有RedisAutoConfiguration,里面可以看的到redis的配置文件Redis.Properties.@Configuration( proxyBeanMethods = false ) @ConditionalOnClass({RedisOperations.class}) @EnableConfigurationProperties({RedisProperties.class}) @Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class}) public class RedisAutoConfiguration { public RedisAutoConfiguration() { } @Bean @ConditionalOnMissingBean( name = {"redisTemplate"} ) @ConditionalOnSingleCandidate(RedisConnectionFactory.class) public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate(); template.setConnectionFactory(redisConnectionFactory); return template; } @Bean @ConditionalOnMissingBean @ConditionalOnSingleCandidate(RedisConnectionFactory.class) public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) { return new StringRedisTemplate(redisConnectionFactory); } }Redis.Properties文件里面包含了redis的配置信息:@ConfigurationProperties( prefix = "spring.redis" ) public class RedisProperties { private int database = 0; private String url; private String host = "localhost"; private String username; private String password; private int port = 6379; private boolean ssl; private Duration timeout; private Duration connectTimeout; private String clientName; private RedisProperties.ClientType clientType; private RedisProperties.Sentinel sentinel; private RedisProperties.Cluster cluster; private final RedisProperties.Jedis jedis = new RedisProperties.Jedis(); private final RedisProperties.Lettuce lettuce = new RedisProperties.Lettuce();2、redis配置,可以根据上面的Redis.Properties类型,再applicatioin.yml中配置相关属性。spring: redis: host: 127.0.0.1 port: 6379上面一个最简单的redis配置就完成了。3、使用redis由上面的RedisAutoConfiguration自动配置类,可以看到自动装配了RedisTemplate<Object, Object>、StringRedisTemplate。RedisTemplate<Object, Object>:一般是字符串的Key,和序列化后的字符串value。由于使用String类型的key、value较多,所以还提供了StringRedisTemplate。public class StringRedisTemplate extends RedisTemplate<String, String> { public StringRedisTemplate() { this.setKeySerializer(RedisSerializer.string()); this.setValueSerializer(RedisSerializer.string()); this.setHashKeySerializer(RedisSerializer.string()); this.setHashValueSerializer(RedisSerializer.string()); } public StringRedisTemplate(RedisConnectionFactory connectionFactory) { this(); this.setConnectionFactory(connectionFactory); this.afterPropertiesSet(); } protected RedisConnection preProcessConnection(RedisConnection connection, boolean existingConnection) { return new DefaultStringRedisConnection(connection); } }StringRedisTemplate也是继承了RedisTemplate<String, String>,都是字符串的key,value.this.setKeySerializer(RedisSerializer.string());this.setValueSerializer(RedisSerializer.string());this.setHashKeySerializer(RedisSerializer.string());this.setHashValueSerializer(RedisSerializer.string());上面的key、value都是用RedisSerializer.string()类型序列化。使用配置好的StringRedisTemplate。StringRedisTemplate的value由多种不同类型的值。保存获取值: @Test public void testStringRedisTemplate(){ ValueOperations<String, String> opsForValue = stringRedisTemplate.opsForValue(); opsForValue.set("hello", "world"+ UUID.randomUUID().toString()); String hello = opsForValue.get("hello"); System.out.println("之前保存值为:"+hello); }输出结果:之前保存值为:world1e9f395e-67b8-4077-9912-c690c7da0f06redis数据库的值:注意保存获取时,key必须是一样的。value一般是序列化后的json字符串,因为json字符串是跨语言跨平台的。vlaue存复杂对象时,序列化可以用alibaba的fastjson。Map<String,List<UserEntity>> data:要存入redis的复杂数据,通过序列化后存在redis。 String s = JSON.toJSONString(data); ValueOperations<String, String> ops = redisTemplate.opsForValue(); ops.set("mydata",s);同样获取的json数据也需要反序列化后才能使用:比如转化成一个复杂数据格式String jsonStr= ops.get("mydata"); Map<String,List<UserEntity>> result = JSON.parseObject(jsonStr, new TypeReference<Map<String,List<UserEntity>>>() {});注意:springboot2.0后默认使用lettuce作为操作redis的客户端,使用netty进行网络通信。但是lettuce的bug导致netty容易堆外内存溢出。netty如果没有指定堆外内存,默认使用-Xmx300m。可以通过-Dio.netty.maxDirectMemory设置堆外内存大小,但是始终会出现堆外内存溢出。 private static void incrementMemoryCounter(int capacity) { if (DIRECT_MEMORY_COUNTER != null) { long newUsedMemory = DIRECT_MEMORY_COUNTER.addAndGet((long)capacity); if (newUsedMemory > DIRECT_MEMORY_LIMIT) { DIRECT_MEMORY_COUNTER.addAndGet((long)(-capacity)); throw new OutOfDirectMemoryError("failed to allocate " + capacity + " byte(s) of direct memory (used: " + (newUsedMemory - (long)capacity) + ", max: " + DIRECT_MEMORY_LIMIT + ')'); } } }解决方案:不能只用-Dio.netty.maxDirectMemory去调大内存。1、升级netty客户端。2、切换使用jedis如何使用jedisredis默认使用的是lettuce,因此需要排除掉lettuce,引入jedis。 <!--引入redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion><!--排除lettuce--> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <!--引入jedis--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>lettuce、jedis都是操作redis的底层客户端。RedisTemplate是对lettuce、jedis对redis操作的再次封装,以后操作都可以用RedisTemplate进行操作redis。通过redis的自动装配可以看到是引入了lettuce,jedis的。@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class}) public class RedisAutoConfiguration {通过jedis可以看到。class JedisConnectionConfiguration extends RedisConnectionConfiguration { JedisConnectionConfiguration(RedisProperties properties, ObjectProvider<RedisSentinelConfiguration> sentinelConfiguration, ObjectProvider<RedisClusterConfiguration> clusterConfiguration) { super(properties, sentinelConfiguration, clusterConfiguration); } @Bean JedisConnectionFactory redisConnectionFactory(ObjectProvider<JedisClientConfigurationBuilderCustomizer> builderCustomizers) { return this.createJedisConnectionFactory(builderCustomizers); }无论是lettuce、jedis最后都会注入JedisConnectionFactory,就是redis要用的。高并发下缓存失效问题-缓存穿透:缓存穿透:指查询一个一定不存的数据,由于缓存是不命中,将去查询数据库,但是数据库也没有此记录,我们将这此查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层查询,失去了缓存的意义。风险:利用不存在的数据进行攻击,,数据库瞬时压力增大,最终导致崩溃。解决方案:null结果也缓存到redis,并加入短暂的过期时间。高并发下缓存失效-缓存雪崩缓存雪崩:指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时间同时失效,请求全部转发到DB,DB瞬时压力过大雪崩。解决方案:原有的失效时间基础上增加一个随机值,比如11-5分分钟随机,这样每一个缓存的过期时间重复概率就会降低,很难引发集体失效。高并发下缓存失效问题-缓存击穿缓存穿透:对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种费用“热点”的数据。如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到DB上,我们称为缓存击穿。解决方案:加锁大量并发只让一个去查,其他人等待,查到以后释放锁,其它人获取到锁,先差缓存,就会有数据了,而不用去DB查询。总结:1、解决缓存穿透:空结果也缓存。2、解决缓存雪崩:设置过期时间(随机值)。3、解决缓存击穿:加锁。例如:ops.set("RecordData",null, 1, TimeUnit.DAYS); //null值也缓存 ops.set("RecordData",JSON.toJSONString(recordEntity), 1, TimeUnit.DAYS); 解锁:1、通过synchronized/Lock本地锁。2、分布式锁解决方案Redisson。但是本地锁synchronized/Lock不能解决分布式带来的击穿问题。因此分布式还得用Redisson进行加锁解决。redisson分布式锁,可参考另一篇redisson分布式锁。 //本地加锁 public RecordEntity getEntityByIdByRedis(Long id){ synchronized (this){ ValueOperations<String, String> ops = redisTemplate.opsForValue(); String recordData = ops.get("RecordData"); if(recordData!=null){ System.out.println("从数据库查询数据之前,有缓存数据。。。。"); return JSON.parseObject(recordData, new TypeReference<RecordEntity>() {}); } System.out.println("从数据库查询数据...."); RecordEntity recordEntity = baseMapper.selectById(id); ops.set("RecordData",JSON.toJSONString(recordEntity), 1, TimeUnit.DAYS); return recordEntity; } } //redis分布式锁 public RecordEntity getEntityByIdByFbs(Long id){ String uuid = UUID.randomUUID().toString(); ValueOperations<String, String> ops = redisTemplate.opsForValue(); //保证原子性,加锁同时设置过期时间 Boolean lock = ops.setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS); if(lock){ System.out.println("获取分布式锁成功。。。。。"); RecordEntity recordEntity = null; try{ recordEntity = getEntityById(id); }finally { //删除锁保证原子性,使用脚本 String script ="if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" + "then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid); } return recordEntity; }else{ System.out.println("获取分布式锁失败,等待重试。。。。"); //重试,可以等待sleep一下 try{ Thread.sleep(200); }catch (Exception e){ } return getEntityByIdByFbs(id); } } //业务代码 public RecordEntity getEntityById(Long id){ ValueOperations<String, String> ops = redisTemplate.opsForValue(); String recordData = ops.get("RecordData"); if(recordData!=null){ System.out.println("从数据库查询数据之前,有缓存数据。。。。"); return JSON.parseObject(recordData, new TypeReference<RecordEntity>() {}); } System.out.println("从数据库查询数据...."); RecordEntity recordEntity = baseMapper.selectById(id); ops.set("RecordData",JSON.toJSONString(recordEntity), 1, TimeUnit.DAYS); return recordEntity; } //Redisson分布式锁 public RecordEntity getEntityByIdByFbsRedisson(Long id){ //保证原子性,加锁同时设置过期时间 RLock lock = redissonClient.getLock("RecordData-lock"); lock.lock(); RecordEntity recordEntity = null; try { System.out.println("获取分布式锁成功。。。。。"); recordEntity = getEntityById(id); }finally { System.out.println("释放锁成功。。。。。"); lock.unlock(); } return recordEntity; } // 使用缓存注解方式 @Override @Cacheable(value = {"record"},key = "#root.method.name") public RecordEntity getRecordAllInfoById(Long id) { //未使用注解缓存方案 // RecordEntity recordEntity = null; // // ValueOperations<String, String> forValue = redisTemplate.opsForValue(); // String recordData = forValue.get("RecordData"); // if(recordData==null){ // System.out.println("缓存没数据,执行查询数据库方法。。。。"); // recordEntity = getEntityByIdByFbsRedisson(id); // }else{ // System.out.println("从缓存中获取数据。。。。。"); // recordEntity = JSON.parseObject(recordData, new TypeReference<RecordEntity>() { // }); // } //使用注解缓存后 RecordEntity recordEntity = getEntityById(id); if(recordEntity!=null){ Long categoryId = recordEntity.getCategoryId(); Long[] catelogPath = findCatelogPath(categoryId); recordEntity.setCatelogPath(catelogPath); } return recordEntity; }利用redis的set NX实现分布式redisson分布式锁,实现原理就是利用redis的set nx实现的。SET key value [EX seconds] [PX milliseconds] [NX|XX]分布式锁原理可以看redis官方文档set nx命令。更多分布式锁可参考,另一个篇springboot整合分布式锁redisson。
2022年03月03日
311 阅读
0 评论
6 点赞
1
2
3