首页
关于
友链
Search
1
wlop 4K 壁纸 4k8k 动态 壁纸
1,738 阅读
2
Nacos持久化MySQL问题-解决方案
1,088 阅读
3
Docker搭建Typecho博客
916 阅读
4
滑动时间窗口算法
908 阅读
5
ChatGPT注册 OpenAI's services are not available in your country 解决方法
886 阅读
生活
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
安全
开发工具
百度网盘
天翼网盘
阿里网盘
登录
Search
标签搜索
java
javase
docker
java8
springboot
thread
spring
分布式
mysql
锁
linux
redis
源码
typecho
centos
git
map
RabbitMQ
lambda
stream
少年
累计撰写
189
篇文章
累计收到
48
条评论
首页
栏目
生活
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
安全
开发工具
百度网盘
天翼网盘
阿里网盘
页面
关于
友链
搜索到
189
篇与
的结果
2022-03-13
SpringBoot整合Sentinel
SpringBoot整合Sentinel1、引入依赖<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency>2、下载安装Sentinel可视化控制台下载自己引入sentinel对应的核心包版本,下载地址:https://github.com/alibaba/Sentinel/releases运行Sentinel可视化控制台java -jar sentinel-dashboard-1.8.1.jar注意自己的版本。打开http://127.0.0.1:8080/,默认账号密码sentinel3、微服务配置sentinelapplication.ymlspring: cloud: sentinel: transport: port: 8719 dashboard: localhost:8080port: 8719端口随意,只要不被占用,用于各微服务与控制台通讯。4、查看监控信息启动微服务,随意访问一个接口,Sentinel控制台即可看到实施监控信息。5、启用相关功能可在Sentinel控制台调整相应功能。默认所有流控设置都是保存在内存中,重启服务就没有了。6、添加监控图标引入审计start,sentinel会自动根据spring-boot-starter-actuator监控。<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> <version>2.6.1</version> </dependency>开启监控图标management.endpoints.web.exposure.include=*7、自定义流控返回提示信息import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.BlockExceptionHandler; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.fastjson.JSON; import com.yanxizhu.common.utils.R; import org.springframework.context.annotation.Configuration; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * @description: Sentinel流控信息提示自定i * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/13 20:14 * @version: 1.0 */ @Configuration public class MySentinelConfig implements BlockExceptionHandler { @Override public void handle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, BlockException e) throws Exception { R r = R.error(10000, "自定义提示"); httpServletResponse.setContentType("application/json;charset=utf-8"); httpServletResponse.getWriter().write(JSON.toJSONString(r)); } }限流规则可参考官网限流文档。每一个微服务都有一个自己的自定义流控返回提示信息,其它配置一样,只是提示信息不同。8、熔断、降级针对Feign远程调用熔断引入Feign远程调用依赖<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency>打开 Sentinel 对 Feign 的支持:feign.sentinel.enabled=true熔断实现@FeignClient(value = "renren-fast",fallback = SeckillFeignServiceFallBack.class) public interface CardFeignService { @GetMapping("/sys/user/list") public R list(@RequestParam Map<String, Object> params); }熔断回调@S1f4j @Component public class SeckillFeignServiceFallBack implements SeckillFeignService{ @Override public R getskuseckillInfo(Long skuld) { Log.info("熔断方法调用...getskuSecki11Info"); return R.error(com.yanxizhu.family.common.exception.BizCodeEnume.TOO_MWANY_REQUEST.getCode(); com.yanxizhu.family.common.exception.BizCodeEnume.TOO_MANY_REQUEST.getAsg()); } }调用方:手动指定远程服务的降级策略。远程服务被降级处理。触发我们的熔断回调方法。提供方:超大浏览的时侯,必须牺牲一些远程服务。在服务的提供方(远程服务)指定降级策略;提供方是在运行。但是不运行自己的业务逻辑,返回的是默认的降级数据(限流的数据)。@SentinelResource 注解用来标识资源是否被限流、降级。上述例子上该注解的属性 sayHello 表示资源名。@SentinelResource 还提供了其它额外的属性如 blockHandler,blockHandlerClass,fallback 用于表示限流或降级的操作(注意有方法签名要求),更多内容可以参考 Sentinel 注解支持文档。若不配置 blockHandler、fallback 等函数,则被流控降级时方法会直接抛出对应的 BlockException;若方法未定义 throws BlockException 则会被 JVM 包装一层 UndeclaredThrowableException。注:一般推荐将 @SentinelResource 注解加到服务实现上,而在 Web 层直接使用 Spring Cloud Alibaba 自带的 Web 埋点适配。Sentinel Web 适配同样支持配置自定义流控处理逻辑,参考 相关文档。9、自定义受保护的资源1)、代码try(Entry entry =SphU.entry("seckillSkus"))(/业务逻辑catch(Execption e)(}2)、基于注解。eSentinelResource(value ="getCurrentSeckiLLSkusResource",blockHandler ="blockHandler")无论是1,2方式一定要配置被限流以后的默认返回注:一般推荐将 @SentinelResource 注解加到服务实现上,而在 Web 层直接使用 Spring Cloud Alibaba 自带的 Web 埋点适配。Sentinel Web 适配同样支持配置自定义流控处理逻辑,参考 相关文档。10、网关限流引入网关sentinel依赖<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-spring-cloud-gateway-adapter</artifactId> <version>x.y.z</version> </dependency>使用时只需注入对应的 SentinelGatewayFilter 实例以及 SentinelGatewayBlockExceptionHandler 实例即可(若使用了 Spring Cloud Alibaba Sentinel,则只需按照文档进行配置即可,无需自己加 Configuration)。更多网关限流可参考官方文档11、定制网关流控返回参考官方文档
2022年03月13日
378 阅读
0 评论
5 点赞
2022-03-13
RabbitMQ延时队列及消息可靠性
RabbitMQ延时队列及消息可靠性
2022年03月13日
381 阅读
0 评论
5 点赞
2022-03-13
RabbitMQ运行机制
RabbitMQ运行机制AMQP 中的消息路由 AMQP 中消息的路由过程和 Java 开 发者熟悉的 JMS 存在一些差别, AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布 到 Exchange 上,消息最终到达队列 并被消费者接收,而 Binding 决定交 换器的消息应该发送到那个队列Exchange 类型Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接 看另外三种类型:Direct Exchange消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器 就将消息发到对应的队列中。路由键与队 列名完全匹配,如果一个队列绑定到交换 机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发 “dog.puppy”,也不会转发“dog.guard” 等等。它是完全匹配、单播的模式Fanout Exchange每个发到 fanout 类型交换器的消息都 会分到所有绑定的队列上去。fanout 交 换器不处理路由键,只是简单的将队列 绑定到交换器上,每个发送到交换器的 消息都会被转发到与该交换器绑定的所 有队列上。很像子网广播,每台子网内 的主机都获得了一份复制的消息。 fanout 类型转发消息是最快的Topic Exchangetopic 交换器通过模式匹配分配消息的 路由键属性,将路由键和某个模式进行 匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单 词,这些单词之间用点隔开。它同样也 会识别两个通配符:符号“#”和符号 “”。#匹配0个或多个单词,匹配一 个单词RabbitMQ消息确认机制-可靠抵达保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制publisher confirmCallback 确认模式publisher returnCallback 未投递到 queue 退回模式consumer ack机制1、发送端-可靠抵达-ConfirmCallback• spring.rabbitmq.publisher-confirms=true• 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback 。• CorrelationData:用来表示当前消息唯一性。• 消息只要被 broker 接收到就会执行 confirmCallback,如果是cluster 模式,需要所有broker 接收到才会调用 confirmCallback。• 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。2、发送端-可靠抵达-ReturnCallback• spring.rabbitmq.publisher-returns=true• spring.rabbitmq.template.mandatory=true• confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标queue里。在有些业务场景下,我们需要保证消息一定要投递到目标queue 里,此时就需要用到return 退回模式。• 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。3、消费端-可靠抵达-Ack消息确认机制消费者获取到消息,成功处理,可以回复Ack给Broker•basic.ack用于肯定确认;broker将移除此消息 •basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量 •basic.reject用于否定确认;同上,但不能批量•默认自动ack,消息被消费者收到,就会从broker的queue中移除•queue无消费者,消息依然会被存储,直到消费者消费•消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式•消息处理成功,ack(),接受下一个消息,此消息broker就会移除•消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack•消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人详细使用可参考,springboot整合RabbitMQ 及初识RabbitMQ
2022年03月13日
271 阅读
0 评论
2 点赞
2022-03-13
SpringBoot整合RabbitMQ
SpringBoot整合RabbitMQ1、引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.6.3</version> </dependency>2、配置rabbitmq通过RabbitProperties.properties配置文件配置链接 rabbitmq: host: 192.168.56.10 virtual-host: / port: 5672 publisher-confirm-type: correlated publisher-returns: true template: mandatory: true重点:#开启发送端确认机制,发送到Broke的确认 publisher-confirm-type: correlated #开启时消息发送到队列的消息确认机制 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个Returnconfirm template: mandatory: true3、开启Rabbit启动类添加@EnableRabbit注解@EnableRabbit @SpringBootApplication public class FamilyBookingApplication { public static void main(String[] args) { SpringApplication.run(FamilyBookingApplication.class, args); } }4、Rabbit使用现在RabbitAutoConfiguration就生效了,就有了组件RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate5、使用测试1、创建Exchange、Queue、Binding使用AmqpAdmin创建。通过创建各交换机构造方法,找到对应接口的实现,传入参数即可创建Exchange。public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) @Autowired AmqpAdmin amqpAdmin; /** * 创建Exchange交换机 */ @Test public void craeteChage() { /** * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) */ DirectExchange directExchange = new DirectExchange("Family-Hello-Exchange",true,false); amqpAdmin.declareExchange(directExchange); } /** * 创建Queue交换机 */ @Test public void craetQueue() { /** * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) */ Queue queue = new Queue("Family-Hello-Queue", true,false, false); amqpAdmin.declareQueue(queue); } /** * 创建Bind交换机 */ @Test public void craetBind() { /** * public Binding(String destination, 绑定目标 * Binding.DestinationType destinationType, 绑定类型 * String exchange, 交换机 * String routingKey, router-key * @Nullable Map<String, Object> arguments 参数) { */ Binding binding = new Binding("Family-Hello-Queue", Binding.DestinationType.QUEUE, "Family-Hello-Exchange", "Java.hello", null); amqpAdmin.declareBinding(binding); }上面为DirectExchange类型交换机的创建,其它类型Exchange类似。更多API可参考AmqpAdmin的各方法。2、收发消息发送消息 /** * rabbitTemplate 发送消息 */ @Test public String sendMessage(){ //发送字符串 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString())); //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setAddress("rabbitMQ测试-会员地址信息"); memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com"); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; }3、接受消息RabbitMQ为我们提供了监听@RabbitListener监听注解,使用时,必须开启@EnableRabbit功能。不用监听消息队列时,可以不用开启@EnableRabbit注解。 /** * 监听队列是各数组,可以同时监听多个队列 * 通过返回的消息类型知道是一个org.springframework.amqp.core.Message; * @param message */ @RabbitListener(queues={"Family-Hello-Queue"}) public String sendMessage(){ //发送字符串 // rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString())); //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setAddress("rabbitMQ测试-会员地址信息"); memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com"); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; } /** * * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T> * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。 * 还可以直接获取到链接中传输数据的通道channel * @param message */ @RabbitListener(queues={"Family-Hello-Queue"}) public void receiveMessageAndObject(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息2:"+message+"消息类型:"+message.getClass()); System.out.println("监听到队列消息2:"+message+"消息内容:"+memberEntity); }Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息注意:1)、同一个消息,只能有一个客户端收到2)、只有一个消息完全处理完,方法运行结束,我们才可以接收到下一个消息3)、@RabbitHandler也可以接受队列消息@RabbitListener可以用在类和方法上@RabbitHandler只能用在方法上。@RabbitHandler应用场景:当发送的数据为不同类型时,@RabbitListener接口消息时,参数类型不知道定义那个。比如:想队列发送的对象有user、People people,那下面接受时,对象不知道写那个public void receiveMessageObject(Message message, 这里该写那个对象, Channel channel)因此@RabbitHandler就可以用上了。通过在类上加上@RabbitHandler注解监听队列消息,在方法上加@RabbitHandler注解,参数就可以指定接受那个数据类型的消息了。简单代码如下:@RabbitListener(queues={"Family-Hello-Queue"}) @Service("memberService") public class MemberServiceImpl extends ServiceImpl<MemberDao, MemberEntity> implements MemberService { //接受队列中指定的People类型数据 @RabbitHandler public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity); } //接受队列中指定的User类型数据 @RabbitHandler public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity); } }6、RabbitMQ Java配置Json序列化通过RabbitAutoConfiguration.java中注入的RabbitTemplate看到configure。@Bean @ConditionalOnMissingBean public RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) { RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer(); configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique()); configurer.setRetryTemplateCustomizers((List)retryTemplateCustomizers.orderedStream().collect(Collectors.toList())); configurer.setRabbitProperties(properties); return configurer; } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean({RabbitOperations.class}) public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(); configurer.configure(template, connectionFactory); return template; }如果配置中有MessageConverter消息转换器就用configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique());如果没有就用RabbitTemplate默认配置的。RabbitTemplateConfigurer.java默认配置源码中可以看到默认使用的转换器如下:public MessageConverter getMessageConverter() { return this.messageConverter; } private MessageConverter messageConverter = new SimpleMessageConverter(); 然后通过SimpleMessageConverter可以看到默认创建消息时,使用的消息转换器类型 protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; if (object instanceof byte[]) { bytes = (byte[])((byte[])object); messageProperties.setContentType("application/octet-stream"); } else if (object instanceof String) { try { bytes = ((String)object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException var6) { throw new MessageConversionException("failed to convert to Message content", var6); } messageProperties.setContentType("text/plain"); messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } messageProperties.setContentType("application/x-java-serialized-object"); } if (bytes != null) { messageProperties.setContentLength((long)bytes.length); return new Message(bytes, messageProperties); } else { throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); } }如果是序列化的就是使用的SerializationUtils.serialize(object);else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } messageProperties.setContentType("application/x-java-serialized-object");因此我们可以自定义MessageConverter。通过MessageConverter接口实现类型,可以看到json各实现类。因此RabbitMQ自定义配置如下:import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description: RabbitMQ配置 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/12 17:07 * @version: 1.0 */ @Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }再次发送对象时,从mq里面获取的对象就是json格式了。7、开启消息回调确认机制#开启发送端确认机制,发送到Broke的确认 publisher-confirm-type: correlated #开启时消息发送到队列的消息确认机制 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个ReturnCallback template: mandatory: true #手动确认ack listener: simple: acknowledge-mode: manual8、重点:发送端:package com.yanxizhu.family.users.config; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @description: RabbitMQ配置 序列化 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/17 16:44 * @version: 1.0 */ @Configuration public class RabbitConfig { private RabbitTemplate rabbitTemplate; /** * rabbitTemplate循环依赖,解决方法:手动初始化模板方法 * @param connectionFactory * @return */ @Primary @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setMessageConverter(messageConverter()); initRabbitTemplate(); return rabbitTemplate; } @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 定制rabbitMaTemplate * 一、发送端消息确认机制,发送给Broke的消息确认回调 * 1、开始消息确认 publisher-confirm-type: correlated * 2、设置确认回调 setConfirmCallback() * 二、发送端消息确认机制,发送给队列的消息确认回调 * 1、开始消息确认 * publisher-returns: true * template: * mandatory: true * 2、设置确认回调 setReturnsCallback * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息) * 1、开启消费端消息确认 * listener: * simple: * acknowledge-mode: manual * 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。 * 默认的消息确认(默认ACK)问题: * 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失; * 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。 * 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。 * 4、手动ack如何签收 * //消息头信息 * MessageProperties messageProperties = message.getMessageProperties(); * long deliveryTag = messageProperties.getDeliveryTag(); * channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收 * 5、消费退回 * * //参数:long var1, boolean var3(是否批量), boolean var4 * channel.basicNack(deliveryTag,true,true); //可以批量回退 * * //参数:long var1, boolean var3 * channel.basicReject(deliveryTag,true); //单个回退 * * 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。 */ public void initRabbitTemplate(){ //设置确认回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 只要消息抵达服务器 * @param correlationData 当前消息关联的唯一数据(这个消息的唯一id) * @param ack 消息是否成功收到 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //correlationData:注意,这个correlationData就是发送消息时,设置的correlationData。 System.out.println("====确认收到消息了。。。。================"); System.out.println(correlationData+"|"+ack+"|"+cause); } }); //注意:必须要有这一步,不然不生效 rabbitTemplate.setMandatory(true); //设置消息抵达队列回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息没有投递给指定的队列,就触发这个失败回调 * @param message 投递失败的消息详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 当时这个消息发给哪个交换机 * @param routingKey 当时这个消息用的哪个路由键 */ public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("==========投递到队列失败了。。。=========="); System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); } }); } } 发送消息时,可以将new CorrelationData(UUID.randomUUID().toString()),传的uuid参数保存到数据库,到时候直接遍历数据库即可知道那些投递成功到队列,那些投递失败。说明: * 一、发送端消息确认机制,发送给Broke的消息确认回调 * 1、开始消息确认 publisher-confirm-type: correlated * 2、设置确认回调 setConfirmCallback() * 二、发送端消息确认机制,发送给队列的消息确认回调 * 1、开始消息确认 * publisher-returns: true * template: * mandatory: true * 2、设置确认回调 setReturnsCallback * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息) * 1、开启消费端消息确认 * listener: * simple: * acknowledge-mode: manual * 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。 * 默认的消息确认(默认ACK)问题: * 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失; * 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。 * 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。消费端手动ack /** * * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T> * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。 * 还可以直接获取到链接中传输数据的通道channel * @param message */ @RabbitHandler public void receiveMessageObject(Message message, MemberEntity memberEntity, Channel channel) throws IOException { //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); //手动确认签收 channel.basicAck(deliveryTag,true); System.out.println("监听到队列消息3:"+message+"消息类型:"+message.getClass()); System.out.println("监听到队列消息3:"+message+"消息内容:"+memberEntity); //回退 //可以批量回退 // channel.basicNack(deliveryTag,true,true); //单个回退 //true重新排队,false丢弃 // channel.basicReject(deliveryTag,true); }说明:手动ack如何签收 //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收消费退回 //参数:long var1, boolean var3(是否批量), boolean var4 channel.basicNack(deliveryTag,true,true); //可以批量回退 //参数:long var1, boolean var3 channel.basicReject(deliveryTag,true); //单个回退 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。最后rabbitMQ配置:spring: rabbitmq: host: 192.168.56.10 virtual-host: / port: 5672 #开启发送端确认 publisher-confirm-type: correlated #开启发送端消息抵达队列的确认 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个retureconfirm template: mandatory: true #开启手动确认 listener: simple: acknowledge-mode: manualSpringBoot中使用延时队列1、Queue、Exchange、Binding可以@Bean进去 2、监听消息的方法可以有三种参数(不分数量,顺序) Object content, Message message, Channel channel 3、channel可以用来拒绝消息,否则自动ack;代码实现发送消息到MQ: public String sendDelayMessage(){ //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setId(11111111L); memberEntity.setAddress("死信队列测试"); memberEntity.setEmail("延迟队列,异步处理会员信息"); memberEntity.setCreateTime(new Date()); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("member-event-exchange","users.create.member",memberEntity, new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; }通过延迟队列,将延迟的会员信息,发送到另一个队列,通过监听过期延迟的队列,接收延迟的会员信息。package com.yanxizhu.family.users.config; import com.rabbitmq.client.Channel; import com.yanxizhu.family.users.entity.MemberEntity; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * @description: 延迟队列 * 说明: 1、首先通过创建的路由键(users.create.member),将消息从交换机(member-event-exchange)发送到死信队列(member.delay.queue)。 * 2、根据死信队列(member.delay.queue)的配置,过期后,消息将通过路由键为(users.release.member)转发到交换机(member-event-exchange)。 * 3、最后交换机(member-event-exchange)再通过绑定的最终队列(users.release.member.queue)和路由键(users.release.member)转发到最终队列(member-event-exchange)。 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/17 20:47 * @version: 1.0 */ @Configuration public class DelayQueueConfig { /** * 监听最终的队列 * @param entity * @param channel * @param message * @throws IOException */ @RabbitListener(queues="member.release.queue") public void listener(MemberEntity entity, Channel channel, Message message) throws IOException { System.out.println("收到过期的会员信息:准备关闭会员"+entity.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } //@bean自动创建交换机、路由、绑定 //交换机 @Bean public Exchange orderEventExchange(){ return new TopicExchange("member-event-exchange", true, false); } /** * 容器中的Binding、Queue、Exchange都会自动创建(RabbitMQ没有的情况) * @return */ //死信队列 @Bean public Queue memberDelayQueue(){ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "member-event-exchange");//交换机 arguments.put("x-dead-letter-routing-key", "users.release.member");//路由键 arguments.put("x-message-ttl", 60000);//过期时间 //public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) Queue queue = new Queue("member.delay.queue",true,false,false,arguments); return queue; } //将死信队列和交换机绑定,路由键-users.create.member @Bean public Binding orderCreateOrderBingding(){ //目的地、绑定类型、交换机、路由键、参数 //public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments) return new Binding("member.delay.queue", Binding.DestinationType.QUEUE, "member-event-exchange", "users.create.member", null); } //最后过期的队列 @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("member.release.queue",true,false,false); return queue; } //将最后的队列和交换机绑定,路由键-user.release.member @Bean public Binding orderReleaseOrderBingding(){ return new Binding("member.release.queue", Binding.DestinationType.QUEUE, "member-event-exchange", "users.release.member", null); } }
2022年03月13日
688 阅读
0 评论
9 点赞
2022-03-12
Windows常用命令
Widnows常用命令查看所有端口号情况netstat -ano搜索查看指定端口netstat -ano|findstr 端口号查看所有程序占用端口信息tasklist搜索指定端口被那个程序占用tasklist|findstr 端口号
2022年03月12日
253 阅读
0 评论
3 点赞
2022-03-12
Docker安装RabbitMQ
Docker安装RabbitMQ运行RabbitMQ容器第一次运行没有RabbitMQ镜像,会自动下载。docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management说明:4369, 25672 (Erlang发现&集群端口)5672, 5671 (AMQP端口)15672 (web管理后台端口)61613, 61614 (STOMP协议端口)1883, 8883 (MQTT协议端口)https://www.rabbitmq.com/networking.html设置随docker启动docker update --restart=always rabbitmq访问RabbitMQ通过ip地址加15672端口即可访问,初始账号密码guest
2022年03月12日
300 阅读
0 评论
5 点赞
2022-03-12
初识RabbitMQ
RabbitMQ作用异步处理、应用解耦、流量控制概述大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力消息服务中两个重要概念: 消息代理(message broker)和目的地(destination) 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。消息队列主要有两种形式的目的地队列(queue):点对点消息通信(point-to-point)主题(topic):发布(publish)/订阅(subscribe)消息通信点对点式:• 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获 取消息内容,消息读取后被移出队列 • 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者发布订阅式: • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个 主题,那么就会在消息到达时同时收到消息JMS(Java Message Service)JAVA消息服务: • 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现AMQP(Advanced Message Queuing Protocol)• 高级消息队列协议,也是一个消息代理的规范,兼容JMS• RabbitMQ是AMQP的实现Spring支持 • spring-jms提供了对JMS的支持 • spring-rabbit提供了对AMQP的支持 • 需要ConnectionFactory的实现来连接消息代理• 提供JmsTemplate、RabbitTemplate来发送消息• @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息 代理发布的消息• @EnableJms、@EnableRabbit开启支持Spring Boot自动配置 • JmsAutoConfiguration• RabbitAutoConfiguration市面的MQ产品 • ActiveMQ、RabbitMQ、RocketMQ、KafkRabbitMQ概念RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现核心概念Message消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。 Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。 Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别 Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。 Binding绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。 Connection网络连接,比如一个TCP连接。Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 Consumer消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时 指定,RabbitMQ 默认的 vhost 是 /。 Broker 表示消息队列服务器实体
2022年03月12日
352 阅读
0 评论
2 点赞
2022-03-12
SpringBoot整合seata分布式事务(不适用高并发场景)
Springboot整合seata分布式事务一、创建seata日志表-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;二、安装事务协调器(seata-server)1、下载地址 2、导入依赖<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency>3、启动seata服务器4、seata文件说明registry.conf:注册中心配置。这里执行注册中西为nacos。type = "nacos",config {执行seata配置数据放在那里,这里默认使用文件放配置数据。 type = "file"file.conf:seata默认配置信息存放这里。例如,事务日志存放地方配置## transaction log store, only used in server side store { ## store mode: file、db mode = "file" ## file store property file { ## store location dir dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions maxBranchSessionSize = 16384 # globe session size , if exceeded throws exceptions maxGlobalSessionSize = 512 # file buffer size , if exceeded allocate new buffer fileWriteBufferCacheSize = 16384 # when recover batch read size sessionReloadReadSize = 100 # async, sync flushDiskMode = async } ## database store property db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" driverClassName = "com.mysql.jdbc.Driver" ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true" user = "mysql" password = "mysql" minConn = 5 maxConn = 30 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 } }三、自定义代理数据源注意:所有想要用到分布式事务的微服务使用seata DataSourceProxy代理自己的数据源。springboot默认使用的是Hikari数据源。DataSourceAutoConfiguration.java源代码:// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.jdbc; import javax.sql.DataSource; import javax.sql.XADataSource; import org.springframework.boot.autoconfigure.condition.AnyNestedCondition; import org.springframework.boot.autoconfigure.condition.ConditionMessage; import org.springframework.boot.autoconfigure.condition.ConditionOutcome; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.SpringBootCondition; import org.springframework.boot.autoconfigure.condition.ConditionMessage.Builder; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Dbcp2; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Generic; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Hikari; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.OracleUcp; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Tomcat; import org.springframework.boot.autoconfigure.jdbc.DataSourceInitializationConfiguration.InitializationSpecificCredentialsDataSourceInitializationConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceInitializationConfiguration.SharedCredentialsDataSourceInitializationConfiguration; import org.springframework.boot.autoconfigure.jdbc.metadata.DataSourcePoolMetadataProvidersConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.boot.jdbc.EmbeddedDatabaseConnection; import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.ConfigurationCondition.ConfigurationPhase; import org.springframework.core.env.Environment; import org.springframework.core.type.AnnotatedTypeMetadata; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; import org.springframework.util.StringUtils; @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({DataSource.class, EmbeddedDatabaseType.class}) @ConditionalOnMissingBean( type = {"io.r2dbc.spi.ConnectionFactory"} ) @EnableConfigurationProperties({DataSourceProperties.class}) @Import({DataSourcePoolMetadataProvidersConfiguration.class, InitializationSpecificCredentialsDataSourceInitializationConfiguration.class, SharedCredentialsDataSourceInitializationConfiguration.class}) public class DataSourceAutoConfiguration { public DataSourceAutoConfiguration() { } static class EmbeddedDatabaseCondition extends SpringBootCondition { private static final String DATASOURCE_URL_PROPERTY = "spring.datasource.url"; private final SpringBootCondition pooledCondition = new DataSourceAutoConfiguration.PooledDataSourceCondition(); EmbeddedDatabaseCondition() { } public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { Builder message = ConditionMessage.forCondition("EmbeddedDataSource", new Object[0]); if (this.hasDataSourceUrlProperty(context)) { return ConditionOutcome.noMatch(message.because("spring.datasource.url is set")); } else if (this.anyMatches(context, metadata, new Condition[]{this.pooledCondition})) { return ConditionOutcome.noMatch(message.foundExactly("supported pooled data source")); } else { EmbeddedDatabaseType type = EmbeddedDatabaseConnection.get(context.getClassLoader()).getType(); return type == null ? ConditionOutcome.noMatch(message.didNotFind("embedded database").atAll()) : ConditionOutcome.match(message.found("embedded database").items(new Object[]{type})); } } private boolean hasDataSourceUrlProperty(ConditionContext context) { Environment environment = context.getEnvironment(); if (environment.containsProperty("spring.datasource.url")) { try { return StringUtils.hasText(environment.getProperty("spring.datasource.url")); } catch (IllegalArgumentException var4) { } } return false; } } static class PooledDataSourceAvailableCondition extends SpringBootCondition { PooledDataSourceAvailableCondition() { } public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { Builder message = ConditionMessage.forCondition("PooledDataSource", new Object[0]); return DataSourceBuilder.findType(context.getClassLoader()) != null ? ConditionOutcome.match(message.foundExactly("supported DataSource")) : ConditionOutcome.noMatch(message.didNotFind("supported DataSource").atAll()); } } static class PooledDataSourceCondition extends AnyNestedCondition { PooledDataSourceCondition() { super(ConfigurationPhase.PARSE_CONFIGURATION); } @Conditional({DataSourceAutoConfiguration.PooledDataSourceAvailableCondition.class}) static class PooledDataSourceAvailable { PooledDataSourceAvailable() { } } @ConditionalOnProperty( prefix = "spring.datasource", name = {"type"} ) static class ExplicitType { ExplicitType() { } } } @Configuration( proxyBeanMethods = false ) @Conditional({DataSourceAutoConfiguration.PooledDataSourceCondition.class}) @ConditionalOnMissingBean({DataSource.class, XADataSource.class}) @Import({Hikari.class, Tomcat.class, Dbcp2.class, OracleUcp.class, Generic.class, DataSourceJmxConfiguration.class}) protected static class PooledDataSourceConfiguration { protected PooledDataSourceConfiguration() { } } @Configuration( proxyBeanMethods = false ) @Conditional({DataSourceAutoConfiguration.EmbeddedDatabaseCondition.class}) @ConditionalOnMissingBean({DataSource.class, XADataSource.class}) @Import({EmbeddedDataSourceConfiguration.class}) protected static class EmbeddedDatabaseConfiguration { protected EmbeddedDatabaseConfiguration() { } } } 导入很多数据源 @Import({Hikari.class, Tomcat.class, Dbcp2.class, OracleUcp.class, Generic.class, DataSourceJmxConfiguration.class})DataSourceConfiguration.java源代码@Configuration( proxyBeanMethods = false ) @ConditionalOnClass({HikariDataSource.class}) @ConditionalOnMissingBean({DataSource.class}) @ConditionalOnProperty( name = {"spring.datasource.type"}, havingValue = "com.zaxxer.hikari.HikariDataSource", matchIfMissing = true ) static class Hikari { Hikari() { } @Bean @ConfigurationProperties( prefix = "spring.datasource.hikari" ) HikariDataSource dataSource(DataSourceProperties properties) { HikariDataSource dataSource = (HikariDataSource)DataSourceConfiguration.createDataSource(properties, HikariDataSource.class); if (StringUtils.hasText(properties.getName())) { dataSource.setPoolName(properties.getName()); } return dataSource; } }自定义代理数据源配置:import com.zaxxer.hikari.HikariDataSource; import io.seata.rm.datasource.DataSourceProxy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import javax.sql.DataSource; /** * @description: Seata自定义代理数据源 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/12 14:41 * @version: 1.0 */ @Configuration public class MySeataConfig { @Autowired DataSourceProperties dataSourceProperties; @Bean public DataSource dataSource(DataSourceProperties dataSourceProperties){ HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build(); if (StringUtils.hasText(dataSourceProperties.getName())) { dataSource.setPoolName(dataSourceProperties.getName()); } return new DataSourceProxy(dataSource); } }四、复制seata配置文件到项目resources下主要复制seata里面的模板文件file.conf、registry.conf注意:file.conf 的 service.vgroup_mapping 配置必须和spring.application.name一致。vgroup_mapping.{应用名称}-fescar-service-group = "default"service{ vgroup_mapping.family-booking-fescar-service-group = "default" }family-booking:微服务名字也可以通过配置 spring.cloud.alibaba.seata.tx-service-group修改后缀,但是必须和file.conf中的配置保持一致五、使用在分布式事务入口方法加上全局事务注解@GlobalTransactional,远程调用方法上加本地事务注解@Transactional即可。@GlobalTransactional @Transactional public PageUtils queryPage(Map<String, Object> params) { //调用远程方法,另一个微服务的方法加上小事务@Transactional }六、使用场景不适用高并发的场景,适用普通的业务远程调用。seata默认是使用的AT模式。针对高并发场景还是的用柔性事务,消息队列、延迟队列,MQ中间件完成。
2022年03月12日
456 阅读
0 评论
8 点赞
2022-03-12
Springboot同一Server类方法调用事务解决方案
Springboot同一Server类方法调用事务解决方案1、引入springboot-aop start<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency>主要是使用里面的动态代理 <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.9.7</version> <scope>compile</scope> </dependency>2、开启动态代理@EnableAspectJAutoProxy(exposeProxy = true) @EnableDiscoveryClient @SpringBootApplication public class FamilyBookingApplication { public static void main(String[] args) { SpringApplication.run(FamilyBookingApplication.class, args); } }@EnableAspectJAutoProxy(exposeProxy = true):开启aspectj动态代理功能。以后所有的动态代理都是aspectj对象暴露代理对象。3、本类互调用代理对象调用@Service("userService") public class UserServiceImpl implements userService { @Transactional public void a(){ UserService userService = (UserService)AopContext.currentProxy(); userService.b(); userService.c(); } @Transactional public void b(){ } @Transactional public void c(){ } }
2022年03月12日
330 阅读
0 评论
4 点赞
1
...
12
13
14
...
21