2022-04-25
AQS in J.U.C
AQS

AQS in J.U.C is built on a queue of waiting threads. AbstractQueuedSynchronizer, AQS for short, is the core of java.util.concurrent (J.U.C).

It uses Node objects to implement a FIFO queue and serves as a base framework for building locks and other synchronizers.
It represents the synchronization state with a single int.
It is used through inheritance: a subclass extends AQS and implements its hook methods (for example tryAcquire and tryRelease) to manage the state, and the acquire and release methods then operate on that state.
It supports both an exclusive mode and a shared mode, so it can back exclusive locks and shared locks alike.

Main synchronization components

CountDownLatch
Semaphore
CyclicBarrier
ReentrantLock
Condition
FutureTask

The first three are covered in turn below.

1. CountDownLatch (latch)

A CountDownLatch provides a blocking-style wait: one or more threads wait until other threads have finished. Use it when part of a program may only run after some condition has been completed.

Example 1

```java
package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @description: CountDownLatch: block until all tasks are done, then print
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:24
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class CountDownLatchDemo {
    private static final int threadNum = 500;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    pringNum(num);
                } catch (Exception e) {
                    log.error("execution error【{}】", e.getMessage());
                } finally {
                    // Always count down, even if the task failed
                    countDownLatch.countDown();
                }
            });
        }
        // Block until the count reaches zero
        countDownLatch.await();
        executorService.shutdown();
        log.info("finish ..");
    }

    public static void pringNum(int num) throws InterruptedException {
        Thread.sleep(100);
        log.info("num=【{}】", num);
        Thread.sleep(100);
    }
}
```

Output:

```
...
20:36:34.598 [pool-1-thread-249] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【248】
20:36:34.608 [pool-1-thread-259] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【258】
20:36:34.597 [pool-1-thread-237] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【236】
20:36:34.597 [pool-1-thread-206] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【205】
20:36:34.608 [pool-1-thread-256] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【255】
20:36:34.608 [pool-1-thread-258] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【257】
20:36:34.597 [pool-1-thread-240] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【239】
20:36:34.597 [pool-1-thread-67] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【66】
20:36:34.608 [pool-1-thread-251] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【250】
20:36:34.608 [pool-1-thread-257] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【256】
20:36:34.608 [pool-1-thread-44] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【43】
20:36:34.598 [pool-1-thread-61] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【60】
20:36:34.598 [pool-1-thread-33] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【32】
20:36:34.597 [pool-1-thread-172] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【171】
20:36:34.608 [pool-1-thread-230] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【229】
20:36:34.608 [pool-1-thread-253] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【252】
20:36:34.598 [pool-1-thread-59] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【58】
20:36:34.598 [pool-1-thread-39] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【38】
20:36:34.608 [pool-1-thread-298] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【297】
20:36:34.596 [pool-1-thread-192] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【191】
20:36:34.597 [pool-1-thread-75] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【74】
20:36:34.608 [pool-1-thread-299] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【298】
20:36:34.597 [pool-1-thread-146] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【145】
20:36:34.608 [pool-1-thread-304] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【303】
20:36:34.598 [pool-1-thread-229] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【228】
20:36:34.597 [pool-1-thread-87] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【86】
20:36:34.597 [pool-1-thread-46] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【45】
20:36:34.764 [main] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - finish ..
```

Example 2

Another usage is await with a timeout. The waiting thread blocks for at most the given time; if the count has not reached zero by then, it stops waiting and carries on.

```java
package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @description: CountDownLatch with a wait timeout
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:24
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class CountDownLatchDemo2 {
    private static final int threadNum = 500;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    pringNum(num);
                } catch (Exception e) {
                    log.error("execution error【{}】", e.getMessage());
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        // Wait at most 100 ms, then continue whether or not the count is zero
        countDownLatch.await(100, TimeUnit.MILLISECONDS);
        // shutdown() stops accepting new tasks; tasks already submitted still run
        executorService.shutdown();
        log.info("finish ..");
    }

    public static void pringNum(int num) throws InterruptedException {
        Thread.sleep(100);
        log.info("num=【{}】", num);
    }
}
```

Output:

```
20:39:27.285 [pool-1-thread-258] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【257】
20:39:27.284 [pool-1-thread-180] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【179】
20:39:27.287 [pool-1-thread-54] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【53】
20:39:27.283 [pool-1-thread-368] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【367】
20:39:27.285 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【7】
20:39:27.301 [main] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - finish ..
20:39:27.286 [pool-1-thread-204] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【203】
20:39:27.301 [pool-1-thread-42] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【41】
20:39:27.305 [pool-1-thread-432] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【431】
20:39:27.285 [pool-1-thread-270] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【269】
```

After 100 ms the main thread stops waiting and prints finish. The tasks that were already submitted are not cancelled: as the log shows, some of them still print after the finish line, because shutdown() lets them run to completion.

2. Semaphore

A Semaphore limits how many threads may run a piece of code concurrently. Use it to guard a resource with limited capacity. For example, under heavy request load a semaphore can cap the number of database connections in use.

Example 1

```java
package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * @description: Semaphore: limit the number of concurrent threads
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:41
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class SemaphoreDemo {
    private static final int threadNum = 20;
    private static final int currentThreadNum = 3;

    public static void main(String[] args) throws InterruptedException {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Semaphore with 3 permits
        Semaphore semaphore = new Semaphore(currentThreadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    // Acquire one permit
                    semaphore.acquire();
                    try {
                        pringNum(num);
                    } finally {
                        // Release the permit even if the task throws
                        semaphore.release();
                    }
                } catch (Exception e) {
                    log.error("execution error【{}】", e.getMessage());
                }
            });
        }
        executorService.shutdown();
    }

    public static void pringNum(int num) throws InterruptedException {
        log.info("num=【{}】", num);
        Thread.sleep(1000);
    }
}
```

Output:

```
20:53:22.696 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【1】
20:53:22.696 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【2】
20:53:22.696 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【0】
20:53:23.707 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【4】
20:53:23.707 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【5】
20:53:23.707 [pool-1-thread-7] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【6】
20:53:24.711 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【12】
20:53:24.711 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【7】
20:53:24.711 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【3】
20:53:25.726 [pool-1-thread-9] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【8】
20:53:25.726 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【11】
20:53:25.726 [pool-1-thread-14] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【13】
20:53:26.738 [pool-1-thread-16] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【15】
20:53:26.738 [pool-1-thread-11] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【10】
20:53:26.738 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【9】
20:53:27.741 [pool-1-thread-18] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【17】
20:53:27.741 [pool-1-thread-17] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【16】
20:53:27.741 [pool-1-thread-15] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【14】
20:53:28.741 [pool-1-thread-19] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【18】
20:53:28.741 [pool-1-thread-20] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【19】
```

The output shows at most 3 numbers printed per second (the last batch has only 2 because 20 is not a multiple of 3). That cap is the semaphore at work.

Example 2

A thread can acquire several permits at once and release several at once; permits can also be released one at a time. In the code below the semaphore has 3 permits and every task acquires all 3, so no permits are left for anyone else while a task runs and the program behaves like a single thread.

```java
package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @description: Semaphore: limit concurrency; several permits can be acquired or released at once
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:41
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class SemaphoreDemo2 {
    private static final int threadNum = 20;
    private static final int currentThreadNum = 3;

    public static void main(String[] args) throws InterruptedException {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Semaphore with 3 permits
        Semaphore semaphore = new Semaphore(currentThreadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    // Acquire all 3 permits; blocks until all 3 are available
                    semaphore.acquire(3);
                    try {
                        pringNum(num);
                    } finally {
                        // Release the 3 permits
                        semaphore.release(3);
                    }
                } catch (Exception e) {
                    log.error("execution error【{}】", e.getMessage());
                }
            });
        }
        executorService.shutdown();
    }

    public static void pringNum(int num) throws InterruptedException {
        log.info("num=【{}】", num);
        Thread.sleep(1000);
    }
}
```

Output:

```
20:56:17.733 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【0】
20:56:18.740 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【1】
20:56:19.754 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【5】
20:56:20.755 [pool-1-thread-7] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【6】
20:56:21.767 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【9】
20:56:22.780 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【7】
20:56:23.791 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【4】
20:56:24.800 [pool-1-thread-9] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【8】
20:56:25.801 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【3】
20:56:26.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【2】
20:56:27.824 [pool-1-thread-11] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【10】
20:56:28.834 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【11】
20:56:29.847 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【12】
20:56:30.850 [pool-1-thread-14] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【13】
20:56:31.853 [pool-1-thread-17] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【16】
20:56:32.863 [pool-1-thread-15] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【14】
20:56:33.873 [pool-1-thread-16] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【15】
20:56:34.885 [pool-1-thread-18] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【17】
20:56:35.885 [pool-1-thread-19] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【18】
20:56:36.897 [pool-1-thread-20] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【19】
```

Example 3

When more threads arrive than there are permits, the extra ones can be dropped instead of queued. tryAcquire() returns immediately, and a task that gets no permit is simply skipped.

```java
package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @description: Semaphore: limit concurrency and discard tasks that exceed the limit
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:41
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class SemaphoreDemo3 {
    private static final int threadNum = 20;
    private static final int currentThreadNum = 3;

    public static void main(String[] args) throws InterruptedException {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Semaphore with 3 permits
        Semaphore semaphore = new Semaphore(currentThreadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    // Try to get a permit without blocking; run only on success
                    if (semaphore.tryAcquire()) {
                        try {
                            pringNum(num);
                        } finally {
                            // Release the permit
                            semaphore.release();
                        }
                    }
                } catch (Exception e) {
                    log.error("execution error【{}】", e.getMessage());
                }
            });
        }
        executorService.shutdown();
    }

    public static void pringNum(int num) throws InterruptedException {
        log.info("num=【{}】", num);
        Thread.sleep(1000);
    }
}
```

Output:

```
21:04:48.376 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo3 - num=【0】
21:04:49.387 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo3 - num=【1】
```

In this run only 2 tasks printed; the other 18 were discarded. How many tasks get a permit depends on timing, since each task only checks whether a permit is free at the moment it calls tryAcquire().

The tryAcquire overloads:

```java
// No arguments: try to get one permit
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

// One argument: number of permits to get
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

// Two arguments: timeout and its unit
public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// Three arguments: number of permits, timeout, and its unit
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
```

Example 4

Setting a timeout.

```java
package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @description: Semaphore: limit concurrency, with a timeout on acquiring a permit
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:41
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class SemaphoreDemo4 {
    private static final int threadNum = 50;
    private static final int currentThreadNum = 3;

    public static void main(String[] args) throws InterruptedException {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Semaphore with 3 permits
        Semaphore semaphore = new Semaphore(currentThreadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    // Wait up to 3 seconds for a permit; give up if none arrives in time
                    if (semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS)) {
                        try {
                            pringNum(num);
                        } finally {
                            // Release the permit
                            semaphore.release();
                        }
                    }
                } catch (Exception e) {
                    log.error("execution error【{}】", e.getMessage());
                }
            });
        }
        executorService.shutdown();
    }

    public static void pringNum(int num) throws InterruptedException {
        log.info("num=【{}】", num);
        Thread.sleep(1000);
    }
}
```

Output:

```
22:01:36.804 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【0】
22:01:36.804 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【2】
22:01:36.804 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【1】
22:01:37.828 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【3】
22:01:37.828 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【5】
22:01:37.828 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【9】
22:01:38.843 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【11】
22:01:38.843 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【12】
22:01:38.843 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【7】
```

Only 9 of the 50 tasks printed in this run; the rest could not get a permit within 3 seconds and gave up.

3. CyclicBarrier

A CyclicBarrier lets a group of threads wait for each other until they all reach a common barrier point; only when every thread is ready do they continue. Like CountDownLatch it is implemented with a counter. Once the waiting threads are released the barrier can be used again, which is why it is called a cyclic barrier. CountDownLatch, by comparison, is typically used to split a computation across threads and then aggregate the final result.

Example 1

```java
package com.yanxizhu.demo.concurrency.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @description: CyclicBarrier: threads wait for each other
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 22:15
 * @version: 1.0
 */
@Slf4j
public class CyclicBarrierDemo {

    // 4 threads must wait for each other; the code after await() runs once all are ready
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

    public static void main(String[] args) throws Exception {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            // Sleep 1 second so the effect is easy to see
            Thread.sleep(1000);
            executorService.execute(() -> {
                try {
                    pringWait(threadNum);
                } catch (Exception e) {
                    log.error("execution failed", e);
                }
            });
        }
        executorService.shutdown();
    }

    public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException {
        // Sleep 1 second so the effect is easy to see
        Thread.sleep(1000);
        log.info("waiting... {}", threadNum);
        // Signal that this thread is ready; await also has an overload with a timeout
        // (optional parameters: long timeout, TimeUnit unit)
        cyclicBarrier.await();
        // Runs only after all parties are ready
        log.info("running... {}", threadNum);
    }
}
```

Output:

```
22:26:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - waiting... 0
22:26:05.853 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - waiting... 1
22:26:06.866 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - waiting... 2
22:26:07.881 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - waiting... 3
22:26:07.881 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - running... 3
22:26:07.882 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - running... 1
22:26:07.882 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - running... 2
22:26:07.881 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - running... 0
22:26:08.896 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - waiting... 4
22:26:09.899 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - waiting... 5
22:26:10.901 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - waiting... 6
22:26:11.902 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - waiting... 7
22:26:11.902 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - running... 7
22:26:11.902 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - running... 4
22:26:11.902 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - running... 6
22:26:11.902 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - running... 5
22:26:12.903 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - waiting... 8
22:26:13.916 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - waiting... 9
```

Tasks 8 and 9 never get past the barrier: 10 is not a multiple of 4, so the third round only ever has 2 of the 4 parties it needs.

Example 2

await with a timeout. Note that the exception must be caught with try/catch, otherwise the rest of the task does not run.

```java
package com.yanxizhu.demo.concurrency.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * @description: CyclicBarrier with a timeout; catch the exception so the code after await can still run
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 22:15
 * @version: 1.0
 */
@Slf4j
public class CyclicBarrierDemo2 {

    // 4 threads must wait for each other; the code after await() runs once all are ready
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

    public static void main(String[] args) throws Exception {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            // Sleep 1 second so the effect is easy to see
            Thread.sleep(1000);
            executorService.execute(() -> {
                try {
                    pringWait(threadNum);
                } catch (Exception e) {
                    log.error("execution failed", e);
                }
            });
        }
        executorService.shutdown();
    }

    public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException {
        // Sleep 1 second so the effect is easy to see
        Thread.sleep(1000);
        log.info("waiting... {}", threadNum);
        // Signal that this thread is ready, waiting at most 2 seconds
        try {
            cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // TimeoutException for the waiter that timed out,
            // BrokenBarrierException for everyone else once the barrier is broken
            log.warn("timed out; the exception must be caught for execution to continue");
        }
        // Runs after the barrier trips or after the exception is caught
        log.info("running... {}", threadNum);
    }
}
```

Output:

```
22:37:57.786 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - waiting... 0
22:37:58.797 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - waiting... 1
22:37:59.797 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - timed out; the exception must be caught for execution to continue
22:37:59.797 [pool-1-thread-2] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - timed out; the exception must be caught for execution to continue
22:37:59.797 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - running... 0
22:37:59.798 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - running... 1
22:37:59.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - waiting... 2
22:37:59.812 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - timed out; the exception must be caught for execution to continue
22:37:59.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - running... 2
22:38:00.813 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - waiting... 3
22:38:00.813 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - timed out; the exception must be caught for execution to continue
22:38:00.813 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - running... 3
22:38:01.828 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - waiting... 4
22:38:01.828 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - timed out; the exception must be caught for execution to continue
22:38:01.828 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - running... 4
22:38:02.840 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - waiting... 5
22:38:02.840 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - timed out; the exception must be caught for execution to continue
22:38:02.840 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - running... 5
22:38:03.842 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - waiting... 6
22:38:03.842 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - timed out; the exception must be caught for execution to continue
22:38:03.842 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - running... 6
22:38:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - waiting... 7
22:38:04.855 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - timed out; the exception must be caught for execution to continue
22:38:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - running... 7
22:38:05.870 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - waiting... 8
22:38:05.870 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - timed out; the exception must be caught for execution to continue
22:38:05.870 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - running... 8
22:38:06.870 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - waiting... 9
22:38:06.870 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - timed out; the exception must be caught for execution to continue
22:38:06.870 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - running... 9
```

Task 0 times out after 2 seconds, which breaks the barrier. From then on every await fails immediately with BrokenBarrierException, which is why each later task logs the warning the moment it arrives.

Example 3

The constructor can also take a Runnable barrier action. It runs once all parties have arrived, before the waiting threads are released.

```java
package com.yanxizhu.demo.concurrency.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @description: CyclicBarrier with a Runnable barrier action that runs first when the barrier trips
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 22:15
 * @version: 1.0
 */
@Slf4j
public class CyclicBarrierDemo3 {

    // 4 threads must wait for each other; the code after await() runs once all are ready
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> {
        log.info("all waiting threads are ready; this barrier action runs first");
    });

    public static void main(String[] args) throws Exception {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            // Sleep 1 second so the effect is easy to see
            Thread.sleep(1000);
            executorService.execute(() -> {
                try {
                    pringWait(threadNum);
                } catch (Exception e) {
                    log.error("execution failed", e);
                }
            });
        }
        executorService.shutdown();
    }

    public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException {
        // Sleep 1 second so the effect is easy to see
        Thread.sleep(1000);
        log.info("waiting... {}", threadNum);
        // Signal that this thread is ready; await also has an overload with a timeout
        // (optional parameters: long timeout, TimeUnit unit)
        cyclicBarrier.await();
        // Runs only after all parties are ready
        log.info("running... {}", threadNum);
    }
}
```

Output:

```
22:42:09.313 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - waiting... 0
22:42:10.313 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - waiting... 1
22:42:11.318 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - waiting... 2
22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - waiting... 3
22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - all waiting threads are ready; this barrier action runs first
22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - running... 3
22:42:12.320 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - running... 2
22:42:12.320 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - running... 1
22:42:12.320 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - running... 0
22:42:13.325 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - waiting... 4
22:42:14.339 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - waiting... 5
22:42:15.353 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - waiting... 6
22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - waiting... 7
22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - all waiting threads are ready; this barrier action runs first
22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - running... 7
22:42:16.355 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - running... 4
22:42:16.355 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - running... 5
22:42:16.355 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - running... 6
22:42:17.358 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - waiting... 8
22:42:18.372 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - waiting... 9
```

Summary: pay attention to how CountDownLatch and CyclicBarrier differ, and compare the two when deciding which one to use.
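One difference between CountDownLatch and CyclicBarrier that the summary points at is reuse. The sketch below is a minimal illustration, not part of the original demos: the class name LatchVsBarrierSketch and both helper methods are made up for this example. It shows a CyclicBarrier tripping once per round with the same instance, while a CountDownLatch stays at zero once it gets there.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class for illustration only
class LatchVsBarrierSketch {

    // Runs `rounds` rounds with `parties` threads on ONE barrier instance
    // and returns how many times the barrier tripped.
    static int barrierTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // resets automatically after each trip
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    // Counts a latch down past zero and returns the remaining count.
    static long latchCountAfterExtraCountDowns(int count) {
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < count + 2; i++) {
            latch.countDown(); // calls after reaching zero have no effect
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("barrier trips: " + barrierTrips(3, 4));
        System.out.println("latch count: " + latchCountAfterExtraCountDowns(3));
    }
}
```

A latch cannot be reset, so a repeating phase needs a new CountDownLatch each time, whereas one CyclicBarrier can serve every round as long as the number of arrivals is a multiple of the party count.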
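The introduction describes AQS as something you use by subclassing it and managing an int state. As a rough sketch of what that looks like, here is a minimal non-reentrant mutex in the style of the example in the AbstractQueuedSynchronizer Javadoc. The names SimpleMutexSketch, Sync and countWithMutex are assumptions made up for this illustration; state 0 means unlocked and state 1 means locked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class for illustration; not one of the JDK synchronizers
class SimpleMutexSketch {

    // The subclass only decides how the state changes; AQS handles queuing and blocking
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Atomically move the state from 0 (free) to 1 (held)
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1); // calls tryAcquire, queues the thread if it fails
    }

    public void unlock() {
        sync.release(1); // calls tryRelease, then wakes the next queued thread
    }

    public boolean isLocked() {
        return sync.isLocked();
    }

    // Increments a plain int from several threads under the mutex
    static int countWithMutex(int threads, int incrementsPerThread) {
        SimpleMutexSketch mutex = new SimpleMutexSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("counter: " + countWithMutex(4, 10000));
    }
}
```

The same division of labor applies to the shared mode: a semaphore-like synchronizer would implement tryAcquireShared and tryReleaseShared instead and let the state count the available permits.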