首页
关于
友链
Search
1
wlop 4K 壁纸 4k8k 动态 壁纸
839 阅读
2
Docker搭建Typecho博客
604 阅读
3
Nacos持久化MySQL问题-解决方案
525 阅读
4
keytool证书导入
438 阅读
5
SpringBoot整合SpringCache
430 阅读
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
开发工具
百度网盘资源
天翼网盘资源
阿里网盘资源
登录
Search
标签搜索
java
javase
docker
java8
springboot
thread
spring
分布式
mysql
锁
linux
redis
源码
typecho
centos
git
map
lambda
stream
nginx
少年
累计撰写
184
篇文章
累计收到
13
条评论
首页
栏目
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
开发工具
百度网盘资源
天翼网盘资源
阿里网盘资源
页面
关于
友链
搜索到
30
篇与
的结果
2022-04-26
ReentrantLock、StampedLock、ReentrantReadWriteLock、Condition
锁分为Synchronized和锁Lock。ReentrantLock与锁ReentrantLock(可重入锁)与Synchronized(同步锁)的区别:可重入性:区别不大,都是通过计数器,当计数器下降为0时,锁释放。锁的实现:Synchronized依赖JVM,ReentrantLock基于JDK实现。(类似Synchronized是操作系统实现,ReentrantLock是敲代码实现),ReentrantLock可通过看源码明白实现,Synchronized依赖JVM就看不到实现原理。性能的区别:优化之前Synchronized比ReentrantLock性能差很多,优化后Synchronized引入偏向锁、轻量级锁也就是自旋锁后,2者性能差不多了。差不多时,官方推荐Synchronized。功能的区别:Synchronized的使用方便简洁,隐式获取锁释放锁,ReentrantLock需要手动获取锁,释放锁,如果忘记容易出现死锁,所以最好再finally中释放锁。锁的颗粒度和灵活度很明显ReentrantLock优于Synchronized。ReentrantLock独有的功能:ReentrantLock可指定公平锁、非公平锁,而Synchronized只能时非公平锁。公平锁:先等待的先获取锁,反之亦然。提供了一个Condition类,可以分组唤醒需要唤醒的线程,Synchronized要么随机唤醒,要么全部唤醒。提供能够中断等待锁的线程的机制,lock.lockInterruptibly(),lockInterruptibly是一种自旋锁,通过循环来调用CAS来加锁,性能比较好,就是因为不会进入内核阻塞状态。使用场景:必须实现上面3个功能时,就必须用了。代码示例一ReentrantLock实现累加,注意与synchronized的不用用法package com.yanxizhu.demo.concurrency.lock; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @description: ReentrantLock锁实现计数,注意使用synchroized怎么实现的 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 23:09 * @version: 1.0 */ @Slf4j public class ReentrantLockDemo { //定义锁 private static Lock lock = new ReentrantLock(); //定义锁用户数 private static final int totalThread = 5000; //定义并发线程数 private static final int concurrentThread = 200; //定义总和 private static int count = 0; public static void main(String[] args) throws Exception{ //定义线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定义信号量 final Semaphore semaphore = new Semaphore(concurrentThread); //定义闭锁 final CountDownLatch countDownLatch = new CountDownLatch(totalThread); for (int i = 0; i < totalThread; i++) { executorService.execute(() -> { try { semaphore.acquire(); sum(); semaphore.release(); } catch (InterruptedException e) { log.error("错误信息:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("最后累加结果Count={}", count); } //通过ReentrantLock锁实现 public static void sum() { lock.lock(); count++; lock.unlock(); } //累加求和(synchronized锁实现) // public synchronized static void sum() { // count ++; // } }输出结果:23:21:08.549 [main] INFO com.yanxizhu.demo.concurrency.lock.ReentrantLockDemo - 最后累加结果Count=5000源码查看public ReentrantLock() { sync = new NonfairSync(); }默认传入的是不公平的锁。 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }可以传入ture或false决定是否是公平锁。需要注意的方法tryLok: public boolean tryLock() { return sync.nonfairTryAcquire(1); }本质是仅在调用的时候,锁定未被另一个线程持有的情况下,才锁定。参数同样可以传入超时时间和单位,保持线程时间多久可以锁定。提供了丰富的方法,可以参考官方文档,ReentrantReadWriteLock没有任何读写锁的时候,才可以取得写入锁。可实现悲观读取。读多写少时,可能会导致写进入饥饿。注意是悲观锁,当有读时,写可能一直执行不了,有写的时候,不能读。代码示例package com.yanxizhu.demo.concurrency.lock; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @description: ReentrantReadWriteLock 读锁、写锁的使用 * 注意: ReentrantReadWriteLock时悲观锁,读的时候不能写,写的时候不能读,互斥的 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 23:40 * @version: 1.0 */ public class ReentrantReadWriteLockDemo { //定义读写锁 private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //获取读锁 private static final Lock readLock = lock.readLock(); //获取写锁 private static final Lock writeLock = lock.writeLock(); private static Map<String, Data> map = new TreeMap<>(); public class Data{ } //读map中的数据 public Data getValue(String key) { readLock.lock(); try{ return map.get(key); }catch (Exception e) { }finally { readLock.unlock(); } return null; } //读取map中所有的key public Set<String> getKeys() { readLock.lock(); try{ return map.keySet(); }catch (Exception e) { }finally { readLock.unlock(); } return null; } //向map中写入数据 public void writeDate(String key, Data value) { writeLock.lock(); try{ map.put(key, value); }catch (Exception e){ }finally { writeLock.unlock(); } } }StampedLockStampedLock控制锁,有3种模式。写、读、是由锁和版本控制,就是这里的。读分为悲观读和乐观读。官方代码示例package com.yanxizhu.demo.concurrency.lock; import java.util.concurrent.locks.StampedLock; public class StampedLockDemo { class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { // an exclusively locked method long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } //下面看看乐观读锁案例 double distanceFromOrigin() { // A read-only method long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁 double currentX = x, currentY = y; //将两个字段读入本地局部变量 if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生? stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁 try { currentX = x; // 将两个字段读入本地局部变量 currentY = y; // 将两个字段读入本地局部变量 } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } //下面是悲观读锁案例 void moveIfAtOrigin(double newX, double newY) { // upgrade // Could instead start with optimistic, not read mode long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合 long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁 if (ws != 0L) { //这是确认转为写锁是否成功 stamp = ws; //如果成功 替换票据 x = newX; //进行状态改变 y = newY; //进行状态改变 break; } else { //如果不能成功转换为写锁 sl.unlockRead(stamp); //我们显式释放读锁 stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试 } } } finally { sl.unlock(stamp); //释放读锁或写锁 } } } }示例代码二累加求和package com.yanxizhu.demo.concurrency.lock; import lombok.extern.slf4j.Slf4j; import javax.annotation.concurrent.ThreadSafe; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.StampedLock; @Slf4j @ThreadSafe public class StampedLockDemo2 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static int count = 0; private final static StampedLock lock = new StampedLock(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private static void add() { //这里加锁,返回值 long stamp = lock.writeLock(); try { count++; } finally { //释放锁,带上加锁的返回值 lock.unlock(stamp); } } }输出结果:5000,线程安全,并发性好。Condition代码示例package com.yanxizhu.demo.concurrency.lock; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * ReentrantLock中另一个队列,Condition的使用 */ @Slf4j public class ConditionDemo { public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(); Condition condition = reentrantLock.newCondition(); new Thread(() -> { try { reentrantLock.lock(); log.info("wait signal"); // 1 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("get signal"); // 4 reentrantLock.unlock(); }).start(); new Thread(() -> { reentrantLock.lock(); log.info("get lock"); // 2 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } condition.signalAll(); log.info("send signal ~ "); // 3 reentrantLock.unlock(); }).start(); } }
2022年04月26日
90 阅读
0 评论
3 点赞
2022-04-25
J.U.C之AQS
AQSJ.U.C之AQS,底层由线程队列实现。AbstractQueuedSynchronizer简称AQS,JUC的核心。使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架。利用了一个int类型表示状态。使用方法是继承。子类通过继承并通过实现它的方法管理其状态{acquire和release}的方法操纵状态。可以同时实现排它锁和共享锁模式(独占、共享)。AQS主要同步组件CountDownLatchSemaphoreCyclicBarrierReentrantLockConditionFutureTask依次介绍1、CountDownLatch-闭锁可以完成类似阻塞的功能。一个线程或多个线程等待,直到其它线程完成。使用场景,程序执行需要某个条件执行完后,才执行。示例一package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @description: CountDownLatch,实现线程阻塞,等待执行完再执行程序打印 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:24 * @version: 1.0 */ @Slf4j @ThreadSafety public class CountDownLatchDemo { private static final int threadNum = 500; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ pringNum(num); }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); }finally { countDownLatch.countDown(); } }); } countDownLatch.await(); executorService.shutdown(); log.info("finash .."); } public static void pringNum(int num) throws InterruptedException { Thread.sleep(100); log.info("num=【{}】", num); Thread.sleep(100); } }输出结果:。。。。。 20:36:34.598 [pool-1-thread-249] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【248】 20:36:34.608 [pool-1-thread-259] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【258】 20:36:34.597 [pool-1-thread-237] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【236】 20:36:34.597 [pool-1-thread-206] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【205】 20:36:34.608 [pool-1-thread-256] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【255】 20:36:34.608 [pool-1-thread-258] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【257】 20:36:34.597 [pool-1-thread-240] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【239】 20:36:34.597 [pool-1-thread-67] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【66】 20:36:34.608 [pool-1-thread-251] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【250】 20:36:34.608 [pool-1-thread-257] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【256】 20:36:34.608 [pool-1-thread-44] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【43】 20:36:34.598 [pool-1-thread-61] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【60】 20:36:34.598 [pool-1-thread-33] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【32】 20:36:34.597 [pool-1-thread-172] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【171】 20:36:34.608 [pool-1-thread-230] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【229】 20:36:34.608 [pool-1-thread-253] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【252】 20:36:34.598 [pool-1-thread-59] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【58】 20:36:34.598 [pool-1-thread-39] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【38】 20:36:34.608 [pool-1-thread-298] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【297】 20:36:34.596 [pool-1-thread-192] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【191】 20:36:34.597 [pool-1-thread-75] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【74】 20:36:34.608 [pool-1-thread-299] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【298】 20:36:34.597 [pool-1-thread-146] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【145】 20:36:34.608 [pool-1-thread-304] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【303】 20:36:34.598 [pool-1-thread-229] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【228】 20:36:34.597 [pool-1-thread-87] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【86】 20:36:34.597 [pool-1-thread-46] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【45】 20:36:34.764 [main] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - finash ..另一种用法,给定时间内未执行执行await方法。到时间等当前线程执行完成后就不执行了。示例一package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @description: CountDownLatch,设置等待时间 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:24 * @version: 1.0 */ @Slf4j @ThreadSafety public class CountDownLatchDemo2 { private static final int threadNum = 500; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ pringNum(num); }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); }finally { countDownLatch.countDown(); } }); } //设置了等待时间 countDownLatch.await(100, TimeUnit.MILLISECONDS); executorService.shutdown(); log.info("finash .."); } public static void pringNum(int num) throws InterruptedException { Thread.sleep(100); log.info("num=【{}】", num); } }输出结果20:39:27.285 [pool-1-thread-258] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【257】 20:39:27.284 [pool-1-thread-180] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【179】 20:39:27.287 [pool-1-thread-54] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【53】 20:39:27.283 [pool-1-thread-368] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【367】 20:39:27.285 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【7】 20:39:27.301 [main] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - finash .. 20:39:27.286 [pool-1-thread-204] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【203】 20:39:27.301 [pool-1-thread-42] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【41】 20:39:27.305 [pool-1-thread-432] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【431】 20:39:27.285 [pool-1-thread-270] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【269】100毫秒后执行打印finash,之后的线程执行完后不再执行。2、Semaphore-信号量用于控制线程并发的个数。使用场景,提供仅限使用的资源。比如:有大量请求时,可用信号量控制数据库连接数,防止大量请求。示例一package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * @description: Semaphore-信号量,控制线程并发数 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:41 * @version: 1.0 */ @Slf4j @ThreadSafety public class SemaphoreDemo { private static final int threadNum = 20; private static final int currentThreadNum = 3; public static void main(String[] args) throws InterruptedException { //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 Semaphore semaphore = new Semaphore(currentThreadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ //获取一个许可 semaphore.acquire(); pringNum(num); //释放一个许可 semaphore.release(); }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); } }); } executorService.shutdown(); } public static void pringNum(int num) throws InterruptedException { log.info("num=【{}】", num); Thread.sleep(1000); } }输出结果:"C:\Program Files\jdk-11.0.10_windows-x64_bin\jdk-11.0.10\bin\java.exe" -Dvisualvm.id=83604261825100 "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2020.3.2\lib\idea_rt.jar=11416:C:\Program Files\JetBrains\IntelliJ IDEA 2020.3.2\bin" -Dfile.encoding=UTF-8 -classpath D:\Workspaces\ThreadConcurrency\demo-concurrency\target\classes;D:\Tools\repo\org\springframework\boot\spring-boot-starter-web\2.6.6\spring-boot-starter-web-2.6.6.jar;D:\Tools\repo\org\springframework\boot\spring-boot-starter\2.6.6\spring-boot-starter-2.6.6.jar;D:\Tools\repo\org\springframework\boot\spring-boot-starter-logging\2.6.6\spring-boot-starter-logging-2.6.6.jar;D:\Tools\repo\ch\qos\logback\logback-classic\1.2.11\logback-classic-1.2.11.jar;D:\Tools\repo\ch\qos\logback\logback-core\1.2.11\logback-core-1.2.11.jar;D:\Tools\repo\org\apache\logging\log4j\log4j-to-slf4j\2.17.2\log4j-to-slf4j-2.17.2.jar;D:\Tools\repo\org\apache\logging\log4j\log4j-api\2.17.2\log4j-api-2.17.2.jar;D:\Tools\repo\org\slf4j\jul-to-slf4j\1.7.36\jul-to-slf4j-1.7.36.jar;D:\Tools\repo\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\Tools\repo\org\yaml\snakeyaml\1.29\snakeyaml-1.29.jar;D:\Tools\repo\org\springframework\boot\spring-boot-starter-json\2.6.6\spring-boot-starter-json-2.6.6.jar;D:\Tools\repo\com\fasterxml\jackson\core\jackson-databind\2.13.2.2\jackson-databind-2.13.2.2.jar;D:\Tools\repo\com\fasterxml\jackson\core\jackson-annotations\2.13.2\jackson-annotations-2.13.2.jar;D:\Tools\repo\com\fasterxml\jackson\core\jackson-core\2.13.2\jackson-core-2.13.2.jar;D:\Tools\repo\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.13.2\jackson-datatype-jdk8-2.13.2.jar;D:\Tools\repo\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.13.2\jackson-datatype-jsr310-2.13.2.jar;D:\Tools\repo\com\fasterxml\jackson\module\jackson-module-parameter-names\2.13.2\jackson-module-parameter-names-2.13.2.jar;D:\Tools\repo\org\springframework\boot\spring-boot-starter-tomcat\2.6.6\spring-boot-starter-tomcat-2.6.6.jar;D:\Tools\repo\org\apache\tomcat\embed\tomcat-embed-core\9.0.60\tomcat-embed-core-9.0.60.jar;D:\Tools\repo\org\apache\tomcat\embed\tomcat-embed-el\9.0.60\tomcat-embed-el-9.0.60.jar;D:\Tools\repo\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.60\tomcat-embed-websocket-9.0.60.jar;D:\Tools\repo\org\springframework\spring-web\5.3.18\spring-web-5.3.18.jar;D:\Tools\repo\org\springframework\spring-beans\5.3.18\spring-beans-5.3.18.jar;D:\Tools\repo\org\springframework\spring-webmvc\5.3.18\spring-webmvc-5.3.18.jar;D:\Tools\repo\org\springframework\spring-aop\5.3.18\spring-aop-5.3.18.jar;D:\Tools\repo\org\springframework\spring-context\5.3.18\spring-context-5.3.18.jar;D:\Tools\repo\org\springframework\spring-expression\5.3.18\spring-expression-5.3.18.jar;D:\Tools\repo\org\springframework\boot\spring-boot-devtools\2.6.6\spring-boot-devtools-2.6.6.jar;D:\Tools\repo\org\springframework\boot\spring-boot\2.6.6\spring-boot-2.6.6.jar;D:\Tools\repo\org\springframework\boot\spring-boot-autoconfigure\2.6.6\spring-boot-autoconfigure-2.6.6.jar;D:\Tools\repo\org\projectlombok\lombok\1.18.22\lombok-1.18.22.jar;D:\Tools\repo\org\slf4j\slf4j-api\1.7.36\slf4j-api-1.7.36.jar;D:\Tools\repo\org\springframework\spring-core\5.3.18\spring-core-5.3.18.jar;D:\Tools\repo\org\springframework\spring-jcl\5.3.18\spring-jcl-5.3.18.jar;D:\Tools\repo\com\google\guava\guava\31.1-jre\guava-31.1-jre.jar;D:\Tools\repo\com\google\guava\failureaccess\1.0.1\failureaccess-1.0.1.jar;D:\Tools\repo\com\google\guava\listenablefuture\9999.0-empty-to-avoid-conflict-with-guava\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;D:\Tools\repo\com\google\code\findbugs\jsr305\3.0.2\jsr305-3.0.2.jar;D:\Tools\repo\org\checkerframework\checker-qual\3.12.0\checker-qual-3.12.0.jar;D:\Tools\repo\com\google\errorprone\error_prone_annotations\2.11.0\error_prone_annotations-2.11.0.jar;D:\Tools\repo\com\google\j2objc\j2objc-annotations\1.3\j2objc-annotations-1.3.jar;D:\Tools\repo\joda-time\joda-time\2.9.9\joda-time-2.9.9.jar com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo 20:53:22.696 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【1】 20:53:22.696 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【2】 20:53:22.696 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【0】 20:53:23.707 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【4】 20:53:23.707 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【5】 20:53:23.707 [pool-1-thread-7] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【6】 20:53:24.711 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【12】 20:53:24.711 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【7】 20:53:24.711 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【3】 20:53:25.726 [pool-1-thread-9] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【8】 20:53:25.726 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【11】 20:53:25.726 [pool-1-thread-14] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【13】 20:53:26.738 [pool-1-thread-16] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【15】 20:53:26.738 [pool-1-thread-11] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【10】 20:53:26.738 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【9】 20:53:27.741 [pool-1-thread-18] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【17】 20:53:27.741 [pool-1-thread-17] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【16】 20:53:27.741 [pool-1-thread-15] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【14】 20:53:28.741 [pool-1-thread-19] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【18】 20:53:28.741 [pool-1-thread-20] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【19】 Process finished with exit code 0通过结果可以看到,每秒只打印2个数字,就是信号量控制的。示例二同时获取多个许可,释放多个许可,也可以一个一个释放许可。下面代码示例,只有2个并发线程所有,获取后没有了,就类似单线程。package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: Semaphore-信号量,控制线程并发数,可以获取或释放多个许可 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:41 * @version: 1.0 */ @Slf4j @ThreadSafety public class SemaphoreDemo2 { private static final int threadNum = 20; private static final int currentThreadNum = 3; public static void main(String[] args) throws InterruptedException { //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 Semaphore semaphore = new Semaphore(currentThreadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ //获取3个许可,需要等3个都执行完了,才释放3个许可 semaphore.acquire(3); pringNum(num); //释放3个许可 semaphore.release(3); }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); } }); } executorService.shutdown(); } public static void pringNum(int num) throws InterruptedException { log.info("num=【{}】", num); Thread.sleep(1000); } }输出结果:20:56:17.733 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【0】 20:56:18.740 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【1】 20:56:19.754 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【5】 20:56:20.755 [pool-1-thread-7] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【6】 20:56:21.767 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【9】 20:56:22.780 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【7】 20:56:23.791 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【4】 20:56:24.800 [pool-1-thread-9] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【8】 20:56:25.801 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【3】 20:56:26.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【2】 20:56:27.824 [pool-1-thread-11] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【10】 20:56:28.834 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【11】 20:56:29.847 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【12】 20:56:30.850 [pool-1-thread-14] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【13】 20:56:31.853 [pool-1-thread-17] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【16】 20:56:32.863 [pool-1-thread-15] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【14】 20:56:33.873 [pool-1-thread-16] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【15】 20:56:34.885 [pool-1-thread-18] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【17】 20:56:35.885 [pool-1-thread-19] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【18】 20:56:36.897 [pool-1-thread-20] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【19】 Disconnected from the target VM, address: '127.0.0.1:13050', transport: 'socket'示例三当线程数超过并发数,就丢失多余的。package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: Semaphore-信号量,控制线程并发数,超过并发量时掉漆多余线程 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:41 * @version: 1.0 */ @Slf4j @ThreadSafety public class SemaphoreDemo3 { private static final int threadNum = 20; private static final int currentThreadNum = 3; public static void main(String[] args) throws InterruptedException { //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 Semaphore semaphore = new Semaphore(currentThreadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ //尝试获取许可,获取到才执行 if (semaphore.tryAcquire()) { pringNum(num); //释放一个许可 semaphore.release(); } }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); } }); } executorService.shutdown(); } public static void pringNum(int num) throws InterruptedException { log.info("num=【{}】", num); Thread.sleep(1000); } }输出结果:21:04:48.376 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo3 - num=【0】 21:04:49.387 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo3 - num=【1】 Disconnected from the target VM, address: '127.0.0.1:2839', transport: 'socket'只输出2个线程,其它18个线程被丢弃。tryAcquire方法重载说明://无参 public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } //1个参数,获取许可个数 public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } //2个参数,超时时间,单位 public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //3个参数,获取许可个数,超时时间,单位 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }示例四设置超时时间package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * @description: Semaphore-信号量,控制线程并发数,带超时时间限制的 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:41 * @version: 1.0 */ @Slf4j @ThreadSafety public class SemaphoreDemo4 { private static final int threadNum = 50; private static final int currentThreadNum = 3; public static void main(String[] args) throws InterruptedException { //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 Semaphore semaphore = new Semaphore(currentThreadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ //尝试获取许可,获取到才执行,获取超过3秒,就不执行了。 if (semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS)) { pringNum(num); //释放一个许可 semaphore.release(); } }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); } }); } executorService.shutdown(); } public static void pringNum(int num) throws InterruptedException { log.info("num=【{}】", num); Thread.sleep(1000); } }输出结果:22:01:36.804 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【0】 22:01:36.804 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【2】 22:01:36.804 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【1】 22:01:37.828 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【3】 22:01:37.828 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【5】 22:01:37.828 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【9】 22:01:38.843 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【11】 22:01:38.843 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【12】 22:01:38.843 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【7】 Disconnected from the target VM, address: '127.0.0.1:11385', transport: 'socket'3、CyclicBarrier允许一组线程相互等待,直到达到某个共同屏障点,直到每个线程都就绪后面的才继续往下执行。和CoutDownLatch相似都是通过计数器来实现的,图从下往上看的。线程释放后可以循环使用,因此也叫循环屏障。CoutDownLatch用户分计算,汇总最后的结果。示例一package com.yanxizhu.demo.concurrency.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @description: CyclicBarrier 实现线程相互等待 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 22:15 * @version: 1.0 */ @Slf4j public class CyclicBarrierDemo { //定义需要相互等待5个线程,就绪后才执行,后面的代码 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4); public static void main(String[] args) throws Exception{ //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; //模式看效果,等待1秒 Thread.sleep(1000); executorService.execute(() -> { try { pringWait(threadNum); } catch (Exception e) { log.error("执行出错了。。。", e.getMessage()); } }); } executorService.shutdown(); } public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException { //模式看效果,等待1秒 Thread.sleep(1000); log.info("等待中。。。{}", threadNum); //告诉线程每个就绪了,await方法支持传入超时时间 //可选参数long timeout, TimeUnit unit cyclicBarrier.await(); //就绪后才执行 log.info("开始执行了。。{}",threadNum); } }输出结果:Connected to the target VM, address: '127.0.0.1:14410', transport: 'socket' 22:26:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。0 22:26:05.853 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。1 22:26:06.866 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。2 22:26:07.881 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。3 22:26:07.881 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。3 22:26:07.882 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。1 22:26:07.882 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。2 22:26:07.881 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。0 22:26:08.896 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。4 22:26:09.899 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。5 22:26:10.901 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。6 22:26:11.902 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。7 22:26:11.902 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。7 22:26:11.902 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。4 22:26:11.902 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。6 22:26:11.902 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。5 22:26:12.903 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。8 22:26:13.916 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。9 Disconnected from the target VM, address: '127.0.0.1:14410', transport: 'socket'示例二带超时时间的,注意需要trycatch才能不影响后续流程执行。package com.yanxizhu.demo.concurrency.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * @description: CyclicBarrier 实现线程相互等待,带超时时间,注意捕获异常,才能继续执行后面的操作 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 22:15 * @version: 1.0 */ @Slf4j public class CyclicBarrierDemo2 { //定义需要相互等待5个线程,就绪后才执行,后面的代码 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4); public static void main(String[] args) throws Exception{ //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; //模式看效果,等待1秒 Thread.sleep(1000); executorService.execute(() -> { try { pringWait(threadNum); } catch (Exception e) { log.error("执行出错了。。。", e.getMessage()); } }); } executorService.shutdown(); } public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException { //模式看效果,等待1秒 Thread.sleep(1000); log.info("等待中。。。{}", threadNum); //告诉线程每个就绪了,带超时时间 try { cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e){ log.warn("超时了,注意:捕获异常,才能继续执行", e.getMessage()); } //就绪后才执行 log.info("开始执行了。。{}",threadNum); } }输出结果:22:37:57.786 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。0 22:37:58.797 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。1 22:37:59.797 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:37:59.797 [pool-1-thread-2] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:37:59.797 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。0 22:37:59.798 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。1 22:37:59.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。2 22:37:59.812 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:37:59.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。2 22:38:00.813 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。3 22:38:00.813 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:00.813 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。3 22:38:01.828 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。4 22:38:01.828 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:01.828 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。4 22:38:02.840 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。5 22:38:02.840 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:02.840 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。5 22:38:03.842 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。6 22:38:03.842 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:03.842 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。6 22:38:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。7 22:38:04.855 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。7 22:38:05.870 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。8 22:38:05.870 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:05.870 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。8 22:38:06.870 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。9 22:38:06.870 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:06.870 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。9 Disconnected from the target VM, address: '127.0.0.1:2195', transport: 'socket'示例三构造可以传入runable线程,优先执行。package com.yanxizhu.demo.concurrency.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @description: CyclicBarrier 实现线程相互等待,CyclicBarrier可以传入runable线程,优先执行该部分代码 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 22:15 * @version: 1.0 */ @Slf4j public class CyclicBarrierDemo3 { //定义需要相互等待5个线程,就绪后才执行,后面的代码 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> { log.info("相互等待线程都就绪后,这部分代码先执行"); }); public static void main(String[] args) throws Exception{ //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; //模式看效果,等待1秒 Thread.sleep(1000); executorService.execute(() -> { try { pringWait(threadNum); } catch (Exception e) { log.error("执行出错了。。。", e.getMessage()); } }); } executorService.shutdown(); } public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException { //模式看效果,等待1秒 Thread.sleep(1000); log.info("等待中。。。{}", threadNum); //告诉线程每个就绪了,await方法支持传入超时时间 //可选参数long timeout, TimeUnit unit cyclicBarrier.await(); //就绪后才执行 log.info("开始执行了。。{}",threadNum); } }输出结果:22:42:09.313 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。0 22:42:10.313 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。1 22:42:11.318 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。2 22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。3 22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 相互等待线程都就绪后,这部分代码先执行 22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。3 22:42:12.320 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。2 22:42:12.320 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。1 22:42:12.320 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。0 22:42:13.325 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。4 22:42:14.339 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。5 22:42:15.353 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。6 22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。7 22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 相互等待线程都就绪后,这部分代码先执行 22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。7 22:42:16.355 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。4 22:42:16.355 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。5 22:42:16.355 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。6 22:42:17.358 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。8 22:42:18.372 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。9总结:注意CoutDownLatch与CyclicBarrier的不同,对别使用。
2022年04月25日
100 阅读
0 评论
1 点赞
2022-04-24
线程安全-并发容器J.U.C
线程安全-并发容器J.U.CArrayList -> CopyOnWriteArrayListHashSet、TreeSet -> CopyOnWriteArraySet 、ConcurrentSkipListSetHashMap、TreeMap -> ConcurrentHashMap 、ConcurrentSkipListMapCopyOnWriteArrayList适合读多写少的场景。读的时候再原数组读不需要加锁,写的时候会copy一份会单独加锁。代码示例:package com.yanxizhu.demo.concurrency.concurrent; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * @description: 线程安全容器:CopyOnWriteArrayList * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:52 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoCopyOnWriteArrayList { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static List<Integer> list = new CopyOnWriteArrayList<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } private static void update(int i) { list.add(i); } }输出结果:5000CopyOnWriteArraySet线程安全,代码示例package com.yanxizhu.demo.concurrency.concurrent; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.Set; import java.util.concurrent.*; /** * @description: 线程安全容器:CopyOnWriteArraySet * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:52 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoCopyOnWriteArraySet { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Set<Integer> set = new CopyOnWriteArraySet<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("set size:{}", set.size()); } private static void update(int i) { set.add(i); } }输出结果:set size:5000,线程安全ConcurrentSkipListSet线程安全,代码示例package com.yanxizhu.demo.concurrency.concurrent; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Set; import java.util.concurrent.*; /** * @description: 线程安全容器:ConcurrentSkipListSet * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:52 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoConcurrentSkipListSet { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Set<Integer> set = new ConcurrentSkipListSet<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("set size:{}", set.size()); } private static void update(int i) { set.add(i); } }输出结果:5000,线程安全。注意:单纯的add、remove是线程安全的,contains等是不安全的。ConcurrentHashMap线程安全,适合并发量大场景package com.yanxizhu.demo.concurrency.concurrent; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.HashMap; import java.util.Map; import java.util.concurrent.*; /** * @description: 线程安全容器:ConcurrentHashMap适合高并发场景 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:52 * @version: 1.0 */ @Slf4j @UnThreadSafety public class DemoConcurrentHashMap { private static Map<Integer, Integer> map = new ConcurrentHashMap<>(); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("map size:{}", map.size()); } /** * 通过线程安全对象dateTimeFormatter处理 */ private static void update(int count){ map.put(count, count); } }输出结果:5000,线程安全ConcurrentSkipListMap线程安全,随着并发量增加性能表现越强,大并发下使用ConcurrentHashMap,并发一直增加推荐使用ConcurrentSkipListMap。package com.yanxizhu.demo.concurrency.concurrent; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.*; /** * @description: 线程安全容器:ConcurrentSkipListMap,随着并发的增加性能表现的越好 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:52 * @version: 1.0 */ @Slf4j @UnThreadSafety public class DemoConcurrentHashMap1 { private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>(); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("map size:{}", map.size()); } /** * 通过线程安全对象dateTimeFormatter处理 */ private static void update(int count){ map.put(count, count); } }输出结果:5000,线程安全。注意:ConcurrentHashMap与ConcurrentSkipListMap的使用场景。小总结:线程限制:一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改共享只读:一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它。线程安全对象:一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问它被守护对象:被守护对象只能通过获取特定的锁来访问
2022年04月24日
108 阅读
0 评论
3 点赞
2022-04-24
线程安全-同步容器
线程安全-同步容器主要包括ArrayList -> Vector,Stack HashMap->HashTable(key、value不能为null)Collections.synchronizedXXX(List、Set、Map)都是使用Synchronized进行修饰的,性能不是特别好。可以使用并发容器代替。注意:同步容器也可能是线程步安全的。同步容器线程步安全,代码示例:package com.yanxizhu.demo.concurrency.synContainer; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import java.util.Vector; /** * @description: 同步容器,也可能出现线程不安全情况 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 10:32 * @version: 1.0 */ @UnThreadSafety public class DemoVecotNo { private static final Vector<Integer> vector = new Vector<>(); public static void main(String[] args) { //一直循环 while(true) { //向vector容器放入值 for(int i=0; i <10; i++){ vector.add(i); } //线程1,向vector容器移除值 new Thread(()->{ for(int i=0; i <vector.size(); i++){ vector.remove(i); } }).start(); //线程2,向vector中获取值 new Thread(()->{ for(int i=0; i <vector.size(); i++){ vector.get(i); } }).start(); } } }输出结果:Exception in thread "Thread-478" Exception in thread "Thread-1724" Exception in thread "Thread-1918" Exception in thread "Thread-1769" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 10 at java.base/java.util.Vector.remove(Vector.java:875) at com.yanxizhu.demo.concurrency.synContainer.DemoVecotNo.lambda$main$0(DemoVecotNo.java:27) at java.base/java.lang.Thread.run(Thread.java:834) java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 62 at java.base/java.util.Vector.remove(Vector.java:875) at com.yanxizhu.demo.concurrency.synContainer.DemoVecotNo.lambda$main$0(DemoVecotNo.java:27) at java.base/java.lang.Thread.run(Thread.java:834) java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 2 at java.base/java.util.Vector.get(Vector.java:781) at com.yanxizhu.demo.concurrency.synContainer.DemoVecotNo.lambda$main$1(DemoVecotNo.java:34) at java.base/java.lang.Thread.run(Thread.java:834) java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 10 at java.base/java.util.Vector.remove(Vector.java:875) at com.yanxizhu.demo.concurrency.synContainer.DemoVecotNo.lambda$main$0(DemoVecotNo.java:27) at java.base/java.lang.Thread.run(Thread.java:834) Exception in thread "Thread-4507" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 12 at java.base/java.util.Vector.get(Vector.java:781) at com.yanxizhu.demo.concurrency.synContainer.DemoVecotNo.lambda$main$1(DemoVecotNo.java:34) at java.base/java.lang.Thread.run(Thread.java:834)说明:add、remove、get防范都是通过synchronized修饰了的,为什么还是会出现线程不安全,因为当remove和get运行时,如果i相等,remove移除i时,get再获取i就包错了。Vector线程安全代码示例package com.yanxizhu.demo.concurrency.synContainer; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 线程安全容器:Vector * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 10:18 * @version: 1.0 */ @Slf4j @ThreadSafety public class DeomVecotr { private static Vector<Integer> vector = new Vector<>(); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("vector size:{}", vector.size()); } /** * 通过线程安全对象dateTimeFormatter处理 */ private static void update(int count){ vector.add(count); } }输出结果:每次是输出5000Hashtable线程安全,代码示例:package com.yanxizhu.demo.concurrency.synContainer; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.HashMap; import java.util.Hashtable; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 线程安全容器:Hashtable * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:52 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoHashTable { private static Map<Integer, Integer> map = new Hashtable<>(); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("map size:{}", map.size()); } private static void update(int count){ map.put(count, count); } }输出结果:每次都是5000.线程安全。Collections.synchronizedList线程安全,同步容器package com.yanxizhu.demo.concurrency.synContainer; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 安全容器,Collections下的同步容器,线程安全 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 10:47 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoCollections { private static List<Integer> list = Collections.synchronizedList(new ArrayList<>()); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("list size:{}", list.size()); } /** * 通过线程安全对象dateTimeFormatter处理 */ private static void update(int count){ list.add(count); } }输出结果:每次输出5000,线程安全。Collections.synchronizedSet线程安全,同步容器,代码示例package com.yanxizhu.demo.concurrency.synContainer; import com.google.common.collect.Sets; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 安全容器,Collections下的同步容器,synchronizedSet,线程安全 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 10:47 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoCollectionsSynSet { private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet()); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("set size:{}", set.size()); } /** * 通过线程安全对象dateTimeFormatter处理 */ private static void update(int count){ set.add(count); } }输出结果:每次输出5000,线程安全Collections.synchronizedMap线程安全同步容器,代码示例:package com.yanxizhu.demo.concurrency.synContainer; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 安全容器,Collections.synchronizedMap,线程安全 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:52 * @version: 1.0 */ @Slf4j @UnThreadSafety public class DemoCollectionsSynHashMap { private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap()); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("map size:{}", map.size()); } private static void update(int count){ map.put(count, count); } }输出结果:每次输出5000,线程安全。常用操作错误package com.yanxizhu.demo.concurrency.synContainer; import java.util.Iterator; import java.util.Vector; /** * @description: 常见容器使用错误,解决办法 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 11:04 * @version: 1.0 */ public class DemoErrorUser { //增强for循环中修改,报错:java.util.ConcurrentModificationException public static void test1(Vector<Integer> vector) { for (Integer integer: vector) { //使用增强for循环 if (integer.equals(3)) { // vector.remove(integer); vector.add(4); } } } //迭代器中循环修改,报错:java.util.ConcurrentModificationException public static void test2(Vector<Integer> vector) { Iterator<Integer> iterator = vector.iterator(); while (iterator.hasNext()){ Integer integer = iterator.next(); if( integer.equals(3)) { vector.remove(integer); } } } //普通for循环中修改,正常。 public static void test3(Vector<Integer> vector) { for(int i=0; i< vector.size(); i++) { if(i==3) { vector.remove(i); } } } //总结:增强for、迭代器循环中修改,可以先标记,然后最后进行修改。 public static void main(String[] args) { Vector<Integer> vector = new Vector<>(); vector.add(1); vector.add(2); vector.add(3); test1(vector); // test2(vector); // test3(vector); } }输出结果:ConcurrentModificationExceptionException in thread "main" java.util.ConcurrentModificationException at java.base/java.util.Vector$Itr.checkForComodification(Vector.java:1321) at java.base/java.util.Vector$Itr.next(Vector.java:1277) at com.yanxizhu.demo.concurrency.synContainer.DemoErrorUser.test1(DemoErrorUser.java:16) at com.yanxizhu.demo.concurrency.synContainer.DemoErrorUser.main(DemoErrorUser.java:52)增强for、迭代器中循环修改线程报错,正常for循环,正常,解决办法,通过标记后,再单独进行移除操作。
2022年04月24日
111 阅读
0 评论
2 点赞
2022-04-24
线程不安全的类与写法
线程不安全的类与写法线程不安全的类:类的对象可以同时被多个线程访问,比如抛出异常,逻辑错误。下面介绍一些常见的类。StringBuilderStringBuilder线程不安全的类代码示例package com.yanxizhu.demo.concurrency.threadUnSafetyClass; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 线程不安全的类:StringBuilder * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:01 * @version: 1.0 */ @Slf4j @UnThreadSafety public class DemoStringBuilder { //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; //累加总和 private static StringBuilder stringBuilder = new StringBuilder(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { executorService.execute(()->{ try { semaphore.acquire(); update(); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("字符长度Length=【{}】",stringBuilder.length()); } /** * 累加 */ private static void update(){ stringBuilder.append(1); } }每次输出字符长度不一样,也不等于5000StringBuffer线程安全的类代码示例package com.yanxizhu.demo.concurrency.threadUnSafetyClass; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 线程安全的类:StringBuffer * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:06 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoStringBuffer { //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; //累加总和 private static StringBuffer stringBuffer = new StringBuffer(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { executorService.execute(()->{ try { semaphore.acquire(); update(); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("字符长度Length=【{}】", stringBuffer.length()); } /** * 累加 */ private static void update(){ stringBuffer.append(1); } }输出结果:每次都是长度为5000.StringBuffer为什么线程安全,看源码 @Override @HotSpotIntrinsicCandidate public synchronized StringBuffer append(int i) { toStringCache = null; super.append(i); return this; }因为StringBuffer所有的方法操作都加了synchronized。区别StringBuffer由于加了synchronized,性能上有损耗,没有StringBuilder性能高,在线程封闭中,堆栈封闭时,一般处理字符串局部变量时用StringBuilfer就行了。SimpleDateFormat线程不安全的类代码示例package com.yanxizhu.demo.concurrency.threadUnSafetyClass; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 线程不安全类: SimpleDateFormat * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:18 * @version: 1.0 */ @Slf4j @UnThreadSafety public class DemoSimpleDateFormat { private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyyMMMdd"); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { executorService.execute(()->{ try { semaphore.acquire(); update(); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); } /** * 累加 */ private static void update(){ try { simpleDateFormat.parse("20220424"); } catch (ParseException e) { log.error(e.getMessage()); } } }输出结果: at java.base/java.lang.Double.parseDouble(Double.java:543) at java.base/java.text.DigitList.getDouble(DigitList.java:169) at java.base/java.text.DecimalFormat.parse(DecimalFormat.java:2126) at java.base/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1933) at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1541) at java.base/java.text.DateFormat.parse(DateFormat.java:393) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.update(DemoSimpleDateFormat.java:57) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.lambda$main$0(DemoSimpleDateFormat.java:40) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Exception in thread "pool-1-thread-824" java.lang.NumberFormatException: For input string: "E.0220E0" at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2054) at java.base/jdk.internal.math.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.base/java.lang.Double.parseDouble(Double.java:543) at java.base/java.text.DigitList.getDouble(DigitList.java:169) at java.base/java.text.DecimalFormat.parse(DecimalFormat.java:2126) at java.base/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1933) at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1541) at java.base/java.text.DateFormat.parse(DateFormat.java:393) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.update(DemoSimpleDateFormat.java:57) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.lambda$main$0(DemoSimpleDateFormat.java:40) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Exception in thread "pool-1-thread-717" java.lang.NumberFormatException: multiple points at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890) at java.base/jdk.internal.math.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.base/java.lang.Double.parseDouble(Double.java:543) at java.base/java.text.DigitList.getDouble(DigitList.java:169) at java.base/java.text.DecimalFormat.parse(DecimalFormat.java:2126) at java.base/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1933) at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1541) at java.base/java.text.DateFormat.parse(DateFormat.java:393) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.update(DemoSimpleDateFormat.java:57) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.lambda$main$0(DemoSimpleDateFormat.java:40) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Exception in thread "pool-1-thread-671" java.lang.NumberFormatException: For input string: "" at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.base/java.lang.Long.parseLong(Long.java:702) at java.base/java.lang.Long.parseLong(Long.java:817) at java.base/java.text.DigitList.getLong(DigitList.java:195) at java.base/java.text.DecimalFormat.parse(DecimalFormat.java:2121) at java.base/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1933) at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1541) at java.base/java.text.DateFormat.parse(DateFormat.java:393) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.update(DemoSimpleDateFormat.java:57) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.lambda$main$0(DemoSimpleDateFormat.java:40) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Exception in thread "pool-1-thread-805" java.lang.NumberFormatException: multiple points at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1914) at java.base/jdk.internal.math.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.base/java.lang.Double.parseDouble(Double.java:543) at java.base/java.text.DigitList.getDouble(DigitList.java:169) at java.base/java.text.DecimalFormat.parse(DecimalFormat.java:2126) at java.base/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1933) at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1541) at java.base/java.text.DateFormat.parse(DateFormat.java:393) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.update(DemoSimpleDateFormat.java:57) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.lambda$main$0(DemoSimpleDateFormat.java:40) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Exception in thread "pool-1-thread-736" java.lang.NumberFormatException: For input string: ".20220424" at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.base/java.lang.Long.parseLong(Long.java:678) at java.base/java.lang.Long.parseLong(Long.java:817) at java.base/java.text.DigitList.getLong(DigitList.java:195) at java.base/java.text.DecimalFormat.parse(DecimalFormat.java:2121) at java.base/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1933) at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1541) at java.base/java.text.DateFormat.parse(DateFormat.java:393) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.update(DemoSimpleDateFormat.java:57) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.lambda$main$0(DemoSimpleDateFormat.java:40) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Exception in thread "pool-1-thread-740" java.lang.NumberFormatException: multiple points at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1914) at java.base/jdk.internal.math.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.base/java.lang.Double.parseDouble(Double.java:543) at java.base/java.text.DigitList.getDouble(DigitList.java:169) at java.base/java.text.DecimalFormat.parse(DecimalFormat.java:2126) at java.base/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1933) at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1541) at java.base/java.text.DateFormat.parse(DateFormat.java:393) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.update(DemoSimpleDateFormat.java:57) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.lambda$main$0(DemoSimpleDateFormat.java:40) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Exception in thread "pool-1-thread-695" java.lang.NumberFormatException: For input string: "E.0220" at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2054) at java.base/jdk.internal.math.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.base/java.lang.Double.parseDouble(Double.java:543) at java.base/java.text.DigitList.getDouble(DigitList.java:169) at java.base/java.text.DecimalFormat.parse(DecimalFormat.java:2126) at java.base/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1933) at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1541) at java.base/java.text.DateFormat.parse(DateFormat.java:393) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.update(DemoSimpleDateFormat.java:57) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.lambda$main$0(DemoSimpleDateFormat.java:40) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Exception in thread "pool-1-thread-888" java.lang.NumberFormatException: For input string: "202204242022042420220424" at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.base/java.lang.Long.parseLong(Long.java:692) at java.base/java.lang.Long.parseLong(Long.java:817) at java.base/java.text.DigitList.getLong(DigitList.java:195) at java.base/java.text.DecimalFormat.parse(DecimalFormat.java:2121) at java.base/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1933) at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1541) at java.base/java.text.DateFormat.parse(DateFormat.java:393) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.update(DemoSimpleDateFormat.java:57) at com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormat.lambda$main$0(DemoSimpleDateFormat.java:40) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834)虽然SimpleDateFormat是线程不安全的类,但是可以通过线程封闭中,堆栈封闭(栈封闭)方法,将SimpleDateFormat定义到局部变量中,每次是一个新对象,这样来保证线程安全。package com.yanxizhu.demo.concurrency.threadUnSafetyClass; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 线程不安全类: SimpleDateFormat,通过堆栈封闭,实现线程安全。 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:18 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoSimpleDateFormatOk { //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { executorService.execute(()->{ try { semaphore.acquire(); update(); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); } /** * 通过线程封闭,将SimpleDateFormat定义在方法内,每次都是一个新的对象,来保证线程安全 */ private static void update(){ try { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyyMMMdd"); simpleDateFormat.parse("20220424"); } catch (ParseException e) { log.error(e.getMessage()); } } }输出结果:输出5000条,一样的。09:27:17.633 [pool-1-thread-461] ERROR com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormatOk - Unparseable date: "20220424" 09:27:17.633 [pool-1-thread-460] ERROR com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormatOk - Unparseable date: "20220424" 09:27:17.634 [pool-1-thread-452] ERROR com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormatOk - Unparseable date: "20220424" 09:27:17.634 [pool-1-thread-445] ERROR com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormatOk - Unparseable date: "20220424" 09:27:17.634 [pool-1-thread-453] ERROR com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoSimpleDateFormatOk - Unparseable date: "20220424" 。。。。。joda.time第三方包中DateTimeFormatter代码示例package com.yanxizhu.demo.concurrency.threadUnSafetyClass; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 线程安全的类:Joda-time包里面的DateTimeFormatter需要单独导入 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:31 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoJodaTime { private static DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd"); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(clientsTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(concurrencyTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); } /** * 通过线程安全对象dateTimeFormatter处理 */ private static void update(int count){ Date date = DateTime.parse("20220424", dateTimeFormatter).toDate(); log.info("{},{}", count, date); } }输出结果:有5000条,线程安全的09:45:06.164 [pool-1-thread-1947] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 1946,Sun Apr 24 00:00:00 CST 2022 09:45:06.154 [pool-1-thread-491] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 490,Sun Apr 24 00:00:00 CST 2022 09:45:06.184 [pool-1-thread-2943] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 2942,Sun Apr 24 00:00:00 CST 2022 09:45:06.169 [pool-1-thread-1911] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 1910,Sun Apr 24 00:00:00 CST 2022 09:45:06.169 [pool-1-thread-616] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 615,Sun Apr 24 00:00:00 CST 2022 09:45:06.168 [pool-1-thread-2551] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 2550,Sun Apr 24 00:00:00 CST 2022 09:45:06.144 [pool-1-thread-1415] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 1414,Sun Apr 24 00:00:00 CST 2022 09:45:06.154 [pool-1-thread-4611] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 4610,Sun Apr 24 00:00:00 CST 2022 09:45:06.169 [pool-1-thread-4760] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 4759,Sun Apr 24 00:00:00 CST 2022 09:45:06.167 [pool-1-thread-2388] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 2387,Sun Apr 24 00:00:00 CST 2022 09:45:06.184 [pool-1-thread-3990] INFO com.yanxizhu.demo.concurrency.threadUnSafetyClass.DemoJodaTime - 3989,Sun Apr 24 00:00:00 CST 2022 。。。。。ArrayList线程不安全的代码示例package com.yanxizhu.demo.concurrency.threadUnSafetyClass; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.units.qual.A; import org.joda.time.DateTime; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 线程不安全类:ArrayList * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:52 * @version: 1.0 */ @Slf4j @UnThreadSafety public class DemoArrayList { private static List<Integer> list = new ArrayList<>(); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("list size:{}", list.size()); } /** * 通过线程安全对象dateTimeFormatter处理 */ private static void update(int count){ list.add(count); } }输出结果:每次输出ist长度不同,线程不安全的。HashSet线程不安全类代码示例package com.yanxizhu.demo.concurrency.threadUnSafetyClass; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 线程不安全类:HashSet * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:52 * @version: 1.0 */ @Slf4j @UnThreadSafety public class DemoHashSet { private static Set<Integer> set = new HashSet<>(); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("set size:{}", set.size()); } /** * 通过线程安全对象dateTimeFormatter处理 */ private static void update(int count){ set.add(count); } }输出结果:每次长度不同,不是5000,线程不安全。HashMap线程不安全的代码示例package com.yanxizhu.demo.concurrency.threadUnSafetyClass; import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: 线程不安全类:HashMap * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/24 9:52 * @version: 1.0 */ @Slf4j @UnThreadSafety public class DemoHashMap { private static Map<Integer, Integer> map = new HashMap(); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(concurrencyTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal); for (int i = 0; i < clientsTotal; i++) { final int count = i; executorService.execute(()->{ try { semaphore.acquire(); update(count); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("map size:{}", map.size()); } /** * 通过线程安全对象dateTimeFormatter处理 */ private static void update(int count){ map.put(count, count); } }输出结果: 长度不是5000,每次长度不一样,线程不安全。总结:线程安全类对应的不只一个。后面专门说明。注意:先检查再执行这种类是线程不安全的,因为被分成2步了,if (condition(a)) { handle(1);}
2022年04月24日
120 阅读
0 评论
2 点赞
2022-04-23
线程封闭
线程封闭当访问共享数据时,通常是要使用同步。如果要避免使用同步,就是不提供共享数据。如果仅在单线程中访问数据,就不需要同步,这种技术就叫做线程封闭,它是实现线程安全最简单的方式之一。当某个对象封闭在一个线程当中时将自动实现线程安全性,即使被封闭的对象本身它并不是安全的,实现线程主要有三种方式。实现线程封闭的三种方式:1、Ad-hoc线程封闭:程序控制实现,最糟糕,忽略2、堆栈封闭:局部变量,无并发问题:栈封闭简单理解就是通过局部变量来实现线程封闭,多个线程访问对象的同一个方法,方法内部的局部变量会拷贝到每个线程的线程栈当中,只有当前线程才能访问到,互不干扰。所以局部变量是不被多个线程所共享的。尽量少用全局变量(不是全局常量)。3、ThreadLocal线程封闭:特别好的封闭方法。维护线程封闭一种更规范的方法就是使用ThreadLocal。ThreadLocal底层是一个map,可以就是线程名字,value就是我们封装的对象。ThreadLocal提供get、set方法为每个使用该变量的线程都存有一份独立的副本,因此get总是返回的是当前线程在调用set时设置的值。ThreadLocal一般用于防止对可变的单实例变量或者全局变量进行共享。ThreadLocal使用:代码示例1、自定义ThreadLoca工具package com.yanxizhu.demo.concurrency.DemoThrodLocal.ThreadLock; /** * @description: 使用ThrodLock的线程封闭,实现在一个线程种数据共享 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/22 22:56 * @version: 1.0 */ public class MyThroldLock { private final static ThreadLocal<Long> requestholder = new ThreadLocal<>(); //用ThreadLock提供的set方法,向ThreadLock中放入共享数据id //一般在实际接口处理前调用 public static void add(Long id) { requestholder.set(id); } //用ThreadLock提供的get方法获取ThreadLock中共享的数据 public static Long get() { return requestholder.get(); } //用ThreadLock提供的remove方法移除共享数据,不然容易导致内存溢出 //一般在接口真正处理完后调用 public static void remove() { requestholder.remove(); } }2、自定义过滤器Filterpackage com.yanxizhu.demo.concurrency.DemoThrodLocal.filter; import com.yanxizhu.demo.concurrency.DemoThrodLocal.ThreadLock.MyThroldLock; import lombok.extern.slf4j.Slf4j; import javax.servlet.*; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; /** * @description: 自定一自己的filter,实现filter接口,注意是servlet中的 * 说明: * Servlet中的过滤器Filter是实现了javax.servlet.Filter接口的服务器端程序, * 主要的用途是设置字符集、控制权限、控制转向、做一些业务逻辑判断等。其工作原理是, * 只要你在web.xml文件配置好要拦截的客户端请求,它都会帮你拦截到请求, * 此时你就可以对请求或响应(Request、Response)统一设置编码,简化操作; * 同时还可进行逻辑判断,如用户是否已经登陆、有没有权限访问该页面等等工作。 * 它是随你的web应用启动而启动的,只初始化一次,以后就可以拦截相关请求,只有当你的web应用停止或重新部署的时候才销毁。 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/22 23:02 * @version: 1.0 */ @Slf4j public class HttpFilter implements Filter { //初始化的 public void init(FilterConfig filterConfig) throws ServletException { } public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { //一般需要强转成HttpServletRequest、HttpServletResponse,这样才能直接获取用户浏览器访问时携带信息 //比如获取用户session中的用户信息 HttpServletRequest request = (HttpServletRequest) servletRequest; HttpServletResponse response = (HttpServletResponse) servletResponse; //一般登录会将用户信息写入session,这里只是演示一下 // String userName = (String) request.getSession().getAttribute("userName"); log.info("do filter,线程id {},请求地址{}", Thread.currentThread().getId(), request.getServletPath()); //通过向ThreadLock添加共享值 MyThroldLock.add(Thread.currentThread().getId()); // 所有处理完后,记得执行下面,不然不会往下执行 filterChain.doFilter(servletRequest, servletResponse); } //销毁时的 public void destroy() { } }注册自定义过滤器HttpFilterpackage com.yanxizhu.demo.concurrency.DemoThrodLocal.filter; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description: 将自定义的filter添加到spring拦截器中 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/22 23:37 * @version: 1.0 */ @Configuration public class MyFilterConfig { @Bean public FilterRegistrationBean httpFilter() { FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(); filterRegistrationBean.setFilter(new HttpFilter()); filterRegistrationBean.addUrlPatterns("/demo/*"); return filterRegistrationBean; } }3、自定义拦截器HandlerInterceptorpackage com.yanxizhu.demo.concurrency.DemoThrodLocal.interceptor; import com.yanxizhu.demo.concurrency.DemoThrodLocal.ThreadLock.MyThroldLock; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.web.servlet.HandlerInterceptor; import org.springframework.web.servlet.ModelAndView; import org.springframework.web.servlet.handler.HandlerInterceptorAdapter; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * @description: 自定义HandlerInerceptor * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/22 22:28 * @version: 1.0 */ @Slf4j @Component public class MyHandlerInterceptor implements HandlerInterceptor { //接口处理之前 public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { log.info("进入controller之前的preHanler方法。。。。"); return true; } public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { log.info("这个是干嘛的呢。。。"); } //接口处理之后 public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { log.info("之后的方法。。。。afterCompletion,MyThroldLock.remove()"); MyThroldLock.remove(); return; } }注册自定义拦截器MyHandlerInterceptorpackage com.yanxizhu.demo.concurrency.DemoThrodLocal.interceptor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; /** * @description: 自定义WebMVCConfig * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/22 22:31 * @version: 1.0 */ @Configuration public class MyWebMvcConfigurerAdapter implements WebMvcConfigurer { @Autowired MyHandlerInterceptor myHandlerInterceptor; public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(myHandlerInterceptor).addPathPatterns("/**"); } }4、编写Controller测试ThreadLock的使用package com.yanxizhu.demo.concurrency.DemoThrodLocal.controller; import com.yanxizhu.demo.concurrency.DemoThrodLocal.ThreadLock.MyThroldLock; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @description: HandlerInterceptor 拦截器测试 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/22 22:25 * @version: 1.0 */ @RestController @RequestMapping("/demo") @Slf4j public class ThreadLockController { @GetMapping("/myHandlerInterceptor") public String myHandlerInterceptorTest() { log.info("进入Controoler方法种。。。。。。"); return "My HandlerInterceptor....,共享ThreadLock的变量为:"+MyThroldLock.get(); } }测试请求地址http://127.0.0.1:8080/demo/myHandlerInterceptor页面输出结果:My HandlerInterceptor....,共享ThreadLock的变量为:46IDEA控制台输出结果:do filter,线程id 50,请求地址/demo/myHandlerInterceptor 进入controller之前的preHanler方法。。。。 进入Controoler方法种。。。。。。 这个是干嘛的呢。。。 之后的方法。。。。afterCompletion注意点注意过滤器Filter和拦截器HandlerInterceptor的区别及使用方式。可参考地址:https://blog.csdn.net/yb546822612/article/details/102950040https://blog.csdn.net/yb546822612/article/details/102950981
2022年04月23日
110 阅读
0 评论
3 点赞
1
2
...
5