线程安全-并发容器J.U.C

admin
2022-04-24 / 0 评论 / 30 阅读 / 正在检测是否收录...

线程安全-并发容器J.U.C

  • ArrayList -> CopyOnWriteArrayList
  • HashSet、TreeSet -> CopyOnWriteArraySet 、ConcurrentSkipListSet
  • HashMap、TreeMap -> ConcurrentHashMap 、ConcurrentSkipListMap

CopyOnWriteArrayList

适合读多写少的场景。读的时候再原数组读不需要加锁,写的时候会copy一份会单独加锁。

代码示例:

package com.yanxizhu.demo.concurrency.concurrent;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @description: 线程安全容器:CopyOnWriteArrayList
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 9:52
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoCopyOnWriteArrayList {
    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static List<Integer> list = new CopyOnWriteArrayList<>();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);


        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }

        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());
    }

    private static void update(int i) {
        list.add(i);
    }
}

输出结果:5000

CopyOnWriteArraySet

线程安全,代码示例

package com.yanxizhu.demo.concurrency.concurrent;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.Set;
import java.util.concurrent.*;

/**
 * @description: 线程安全容器:CopyOnWriteArraySet
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 9:52
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoCopyOnWriteArraySet {
    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static Set<Integer> set = new CopyOnWriteArraySet<>();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);


        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }

        countDownLatch.await();
        executorService.shutdown();
        log.info("set size:{}", set.size());
    }

    private static void update(int i) {
        set.add(i);
    }
}

输出结果:set size:5000,线程安全

ConcurrentSkipListSet

线程安全,代码示例

package com.yanxizhu.demo.concurrency.concurrent;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Set;
import java.util.concurrent.*;

/**
 * @description: 线程安全容器:ConcurrentSkipListSet
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 9:52
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoConcurrentSkipListSet {
    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static Set<Integer> set = new ConcurrentSkipListSet<>();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);


        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }

        countDownLatch.await();
        executorService.shutdown();
        log.info("set size:{}", set.size());
    }

    private static void update(int i) {
        set.add(i);
    }
}

输出结果:5000,线程安全。

注意:单纯的add、remove是线程安全的,contains等是不安全的。

ConcurrentHashMap

线程安全,适合并发量大场景

package com.yanxizhu.demo.concurrency.concurrent;

import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @description: 线程安全容器:ConcurrentHashMap适合高并发场景
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 9:52
 * @version: 1.0
 */
@Slf4j
@UnThreadSafety
public class DemoConcurrentHashMap {
    private static Map<Integer, Integer> map = new ConcurrentHashMap<>();
    //用户数量
    private static final int clientsTotal = 5000;
    //并发数量
    private static final int concurrencyTotal = 200;
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量
        final Semaphore semaphore = new Semaphore(concurrencyTotal);
        //闭锁
        final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);

        for (int i = 0; i < clientsTotal; i++) {
            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("出现错误:【{}】", e.getMessage());
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("map size:{}", map.size());
    }

    /**
     * 通过线程安全对象dateTimeFormatter处理
     */
    private  static void update(int count){
        map.put(count, count);
    }
}

输出结果:5000,线程安全

ConcurrentSkipListMap

线程安全,随着并发量增加性能表现越强,大并发下使用ConcurrentHashMap,并发一直增加推荐使用ConcurrentSkipListMap。

package com.yanxizhu.demo.concurrency.concurrent;

import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.*;

/**
 * @description: 线程安全容器:ConcurrentSkipListMap,随着并发的增加性能表现的越好
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 9:52
 * @version: 1.0
 */
@Slf4j
@UnThreadSafety
public class DemoConcurrentHashMap1 {
    private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();
    //用户数量
    private static final int clientsTotal = 5000;
    //并发数量
    private static final int concurrencyTotal = 200;
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量
        final Semaphore semaphore = new Semaphore(concurrencyTotal);
        //闭锁
        final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);

        for (int i = 0; i < clientsTotal; i++) {
            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("出现错误:【{}】", e.getMessage());
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("map size:{}", map.size());
    }

    /**
     * 通过线程安全对象dateTimeFormatter处理
     */
    private  static void update(int count){
        map.put(count, count);
    }
}

输出结果:5000,线程安全。

注意:ConcurrentHashMap与ConcurrentSkipListMap的使用场景。

小总结:

  1. 线程限制:一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改
  2. 共享只读:一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它。
  3. 线程安全对象:一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问它
  4. 被守护对象:被守护对象只能通过获取特定的锁来访问
2

评论 (0)

取消