线程安全-并发容器J.U.C
- ArrayList -> CopyOnWriteArrayList
- HashSet、TreeSet -> CopyOnWriteArraySet 、ConcurrentSkipListSet
- HashMap、TreeMap -> ConcurrentHashMap 、ConcurrentSkipListMap
CopyOnWriteArrayList
适合读多写少的场景。读的时候直接在原数组上读,不需要加锁;写的时候会加锁,并copy一份新数组,在副本上修改后再替换原数组。
代码示例:
package com.yanxizhu.demo.concurrency.concurrent;
import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
 * @description: Thread-safe container demo: {@link CopyOnWriteArrayList}.
 *               Submits {@code clientTotal} tasks that each append one element to a
 *               shared list, with a semaphore capping how many run at once at
 *               {@code threadTotal}, then logs the final list size.
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 9:52
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoCopyOnWriteArrayList {
    // Total number of requests (tasks) submitted.
    public static int clientTotal = 5000;
    // Maximum number of tasks allowed to execute update() concurrently.
    public static int threadTotal = 200;
    // Shared list written to by every task.
    private static final List<Integer> list = new CopyOnWriteArrayList<>();

    /**
     * Runs the demo: submits all tasks, waits for them to finish, and logs the list size.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        try {
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        try {
                            update(count);
                        } finally {
                            // Always hand the permit back, even if update() throws;
                            // otherwise leaked permits could block later tasks forever.
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the pool thread can observe it.
                        Thread.currentThread().interrupt();
                        log.error("exception", e);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        // Count down on every path so main() can never hang on await().
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            // Shut the pool down even if await() is interrupted.
            executorService.shutdown();
        }
        log.info("size:{}", list.size());
    }

    /**
     * Appends {@code i} to the shared list.
     *
     * @param i the value to add
     */
    private static void update(int i) {
        list.add(i);
    }
}
输出结果:5000
CopyOnWriteArraySet
线程安全,代码示例
package com.yanxizhu.demo.concurrency.concurrent;
import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;
import java.util.Set;
import java.util.concurrent.*;
/**
 * @description: Thread-safe container demo: {@link CopyOnWriteArraySet}.
 *               Submits {@code clientTotal} tasks that each add one distinct element
 *               to a shared set, with at most {@code threadTotal} of them inside
 *               update() at a time, then logs the final set size.
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 9:52
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoCopyOnWriteArraySet {
    // Total number of requests (tasks) submitted.
    public static int clientTotal = 5000;
    // Maximum number of tasks allowed to execute update() concurrently.
    public static int threadTotal = 200;
    // Shared set written to by every task.
    private static final Set<Integer> set = new CopyOnWriteArraySet<>();

    /**
     * Runs the demo: submits all tasks, waits for them to finish, and logs the set size.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        try {
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        try {
                            update(count);
                        } finally {
                            // Release in finally so a failing update() cannot leak a permit.
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status for the executor's worker thread.
                        Thread.currentThread().interrupt();
                        log.error("exception", e);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        // The latch must be decremented on every path or await() never returns.
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            // Shut the pool down even if await() is interrupted.
            executorService.shutdown();
        }
        log.info("set size:{}", set.size());
    }

    /**
     * Adds {@code i} to the shared set.
     *
     * @param i the value to add
     */
    private static void update(int i) {
        set.add(i);
    }
}
输出结果:set size:5000,线程安全
ConcurrentSkipListSet
线程安全,代码示例
package com.yanxizhu.demo.concurrency.concurrent;
import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
/**
 * @description: Thread-safe container demo: {@link ConcurrentSkipListSet}.
 *               Submits {@code clientTotal} tasks that each add one distinct element
 *               to a shared set, with at most {@code threadTotal} of them inside
 *               update() at a time, then logs the final set size.
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 9:52
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoConcurrentSkipListSet {
    // Total number of requests (tasks) submitted.
    public static int clientTotal = 5000;
    // Maximum number of tasks allowed to execute update() concurrently.
    public static int threadTotal = 200;
    // Shared set written to by every task.
    private static final Set<Integer> set = new ConcurrentSkipListSet<>();

    /**
     * Runs the demo: submits all tasks, waits for them to finish, and logs the set size.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        try {
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        try {
                            update(count);
                        } finally {
                            // Guarantee the permit is returned even when update() fails.
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        // Re-assert the interrupt so it is not silently lost.
                        Thread.currentThread().interrupt();
                        log.error("exception", e);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        // Decrement on every path; a missed countDown would hang main().
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            // Shut the pool down even if await() is interrupted.
            executorService.shutdown();
        }
        log.info("set size:{}", set.size());
    }

    /**
     * Adds {@code i} to the shared set.
     *
     * @param i the value to add
     */
    private static void update(int i) {
        set.add(i);
    }
}
输出结果:5000,线程安全。
注意:单个的add、remove、contains等操作是线程安全的;但addAll、removeAll、retainAll、containsAll等批量操作不保证原子性,需要自行加锁等额外同步。
ConcurrentHashMap
线程安全,适合并发量大场景
package com.yanxizhu.demo.concurrency.concurrent;
import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
/**
* @description: 线程安全容器:ConcurrentHashMap适合高并发场景
* @author: <a href="mailto:batis@foxmail.com">清风</a>
* @date: 2022/4/24 9:52
* @version: 1.0
*/
@Slf4j
@UnThreadSafety
public class DemoConcurrentHashMap {
private static Map<Integer, Integer> map = new ConcurrentHashMap<>();
//用户数量
private static final int clientsTotal = 5000;
//并发数量
private static final int concurrencyTotal = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
//信号量
final Semaphore semaphore = new Semaphore(concurrencyTotal);
//闭锁
final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);
for (int i = 0; i < clientsTotal; i++) {
final int count = i;
executorService.execute(()->{
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (InterruptedException e) {
log.error("出现错误:【{}】", e.getMessage());
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("map size:{}", map.size());
}
/**
* 通过线程安全对象dateTimeFormatter处理
*/
private static void update(int count){
map.put(count, count);
}
}
输出结果:5000,线程安全
ConcurrentSkipListMap
线程安全,且key有序。随着并发量增加,其性能表现越好:一般的高并发场景使用ConcurrentHashMap;如果需要key有序,或者并发量持续增加,推荐使用ConcurrentSkipListMap。
package com.yanxizhu.demo.concurrency.concurrent;
import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.*;
/**
* @description: 线程安全容器:ConcurrentSkipListMap,随着并发的增加性能表现的越好
* @author: <a href="mailto:batis@foxmail.com">清风</a>
* @date: 2022/4/24 9:52
* @version: 1.0
*/
@Slf4j
@UnThreadSafety
public class DemoConcurrentHashMap1 {
private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();
//用户数量
private static final int clientsTotal = 5000;
//并发数量
private static final int concurrencyTotal = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
//信号量
final Semaphore semaphore = new Semaphore(concurrencyTotal);
//闭锁
final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);
for (int i = 0; i < clientsTotal; i++) {
final int count = i;
executorService.execute(()->{
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (InterruptedException e) {
log.error("出现错误:【{}】", e.getMessage());
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("map size:{}", map.size());
}
/**
* 通过线程安全对象dateTimeFormatter处理
*/
private static void update(int count){
map.put(count, count);
}
}
输出结果:5000,线程安全。
注意:ConcurrentHashMap与ConcurrentSkipListMap的使用场景。
小总结:
- 线程限制:一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改
- 共享只读:一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它。
- 线程安全对象:一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问它
- 被守护对象:被守护对象只能通过获取特定的锁来访问
评论 (0)