线程安全-同步容器

admin
2022-04-24 / 0 评论 / 25 阅读 / 正在检测是否收录...

线程安全-同步容器

主要包括

ArrayList -> Vector,Stack

HashMap -> Hashtable(key、value不能为null)
Collections.synchronizedXXX(List、Set、Map)

这些容器的方法都是使用synchronized关键字修饰的,性能不是特别好,可以使用并发容器代替。

注意:同步容器也可能是线程不安全的。

同步容器线程不安全,代码示例:

package com.yanxizhu.demo.concurrency.synContainer;

import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety;

import java.util.Vector;

/**
 * @description: Shows that a synchronized container can still be used in a thread-unsafe way.
 *               One thread removes elements by index while another reads by index; each
 *               individual Vector call is synchronized, but the "check size, then access
 *               index" sequence is not atomic, so the index may be stale by the time it is used
 *               and an ArrayIndexOutOfBoundsException is thrown.
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 10:32
 * @version: 1.0
 */
@UnThreadSafety
public class DemoVecotNo {

    /** Container shared by every thread started from {@link #main(String[])}. */
    private static final Vector<Integer> vector = new Vector<>();

    /**
     * Endlessly refills the shared vector and starts one remover thread and one reader
     * thread per round. This method never returns; it is meant to be run until the
     * out-of-range failures show up on the console.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Keep going forever so the race has plenty of chances to manifest.
        for (;;) {

            // Put the values 0..9 into the shared vector.
            int value = 0;
            while (value < 10) {
                vector.add(value);
                value++;
            }

            // Thread 1: removes elements from the vector by index.
            Thread remover = new Thread(() -> {
                int index = 0;
                // size() and remove(index) are two separate calls; the vector can
                // shrink between them.
                while (index < vector.size()) {
                    vector.remove(index);
                    index++;
                }
            });
            remover.start();

            // Thread 2: reads elements from the vector by index.
            Thread reader = new Thread(() -> {
                int index = 0;
                // Same check-then-act gap as above, this time with get(index).
                while (index < vector.size()) {
                    vector.get(index);
                    index++;
                }
            });
            reader.start();
        }
    }
}

输出结果:

Exception in thread "Thread-478" Exception in thread "Thread-1724" Exception in thread "Thread-1918" Exception in thread "Thread-1769" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 10
    at java.base/java.util.Vector.remove(Vector.java:875)
    at com.yanxizhu.demo.concurrency.synContainer.DemoVecotNo.lambda$main$0(DemoVecotNo.java:27)
    at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 62
    at java.base/java.util.Vector.remove(Vector.java:875)
    at com.yanxizhu.demo.concurrency.synContainer.DemoVecotNo.lambda$main$0(DemoVecotNo.java:27)
    at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 2
    at java.base/java.util.Vector.get(Vector.java:781)
    at com.yanxizhu.demo.concurrency.synContainer.DemoVecotNo.lambda$main$1(DemoVecotNo.java:34)
    at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 10
    at java.base/java.util.Vector.remove(Vector.java:875)
    at com.yanxizhu.demo.concurrency.synContainer.DemoVecotNo.lambda$main$0(DemoVecotNo.java:27)
    at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "Thread-4507" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 12
    at java.base/java.util.Vector.get(Vector.java:781)
    at com.yanxizhu.demo.concurrency.synContainer.DemoVecotNo.lambda$main$1(DemoVecotNo.java:34)
    at java.base/java.lang.Thread.run(Thread.java:834)

说明:

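add、remove、get方法都是通过synchronized修饰了的,为什么还是会出现线程不安全?因为“先用i < vector.size()检查、再按下标i访问”这一组合操作不是原子的:当remove和get同时运行时,如果i相等,remove先移除了下标i处的元素,get再去获取i就报错了。同理,多轮循环启动的多个remove线程之间也会互相影响,所以remove本身也会抛出同样的越界异常。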


Vector

线程安全

代码示例

package com.yanxizhu.demo.concurrency.synContainer;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @description: 线程安全容器:Vector
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 10:18
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DeomVecotr {

    private static Vector<Integer> vector = new Vector<>();
    //用户数量
    private static final int clientsTotal = 5000;
    //并发数量
    private static final int concurrencyTotal = 200;
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量
        final Semaphore semaphore = new Semaphore(concurrencyTotal);
        //闭锁
        final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);

        for (int i = 0; i < clientsTotal; i++) {
            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("出现错误:【{}】", e.getMessage());
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("vector size:{}", vector.size());
    }

    /**
     * 通过线程安全对象dateTimeFormatter处理
     */
    private  static void update(int count){
        vector.add(count);
    }
}

输出结果:每次都是输出5000,线程安全。

Hashtable

线程安全,代码示例:

package com.yanxizhu.demo.concurrency.synContainer;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @description: 线程安全容器:Hashtable
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 9:52
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoHashTable {
    private static Map<Integer, Integer> map = new Hashtable<>();
    //用户数量
    private static final int clientsTotal = 5000;
    //并发数量
    private static final int concurrencyTotal = 200;
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量
        final Semaphore semaphore = new Semaphore(concurrencyTotal);
        //闭锁
        final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);

        for (int i = 0; i < clientsTotal; i++) {
            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("出现错误:【{}】", e.getMessage());
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("map size:{}", map.size());
    }


    private  static void update(int count){
        map.put(count, count);
    }
}

输出结果:每次都是5000.线程安全。

Collections.synchronizedList

线程安全,同步容器

package com.yanxizhu.demo.concurrency.synContainer;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @description: 安全容器,Collections下的同步容器,线程安全
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 10:47
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoCollections {
    private static List<Integer> list = Collections.synchronizedList(new ArrayList<>());
    //用户数量
    private static final int clientsTotal = 5000;
    //并发数量
    private static final int concurrencyTotal = 200;
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量
        final Semaphore semaphore = new Semaphore(concurrencyTotal);
        //闭锁
        final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);

        for (int i = 0; i < clientsTotal; i++) {
            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("出现错误:【{}】", e.getMessage());
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("list size:{}", list.size());
    }

    /**
     * 通过线程安全对象dateTimeFormatter处理
     */
    private  static void update(int count){
        list.add(count);
    }
}

输出结果:每次输出5000,线程安全。

Collections.synchronizedSet

线程安全,同步容器,代码示例

package com.yanxizhu.demo.concurrency.synContainer;

import com.google.common.collect.Sets;
import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @description: 安全容器,Collections下的同步容器,synchronizedSet,线程安全
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 10:47
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoCollectionsSynSet {
    private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet());
    //用户数量
    private static final int clientsTotal = 5000;
    //并发数量
    private static final int concurrencyTotal = 200;
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量
        final Semaphore semaphore = new Semaphore(concurrencyTotal);
        //闭锁
        final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);

        for (int i = 0; i < clientsTotal; i++) {
            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("出现错误:【{}】", e.getMessage());
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("set size:{}", set.size());
    }

    /**
     * 通过线程安全对象dateTimeFormatter处理
     */
    private  static void update(int count){
        set.add(count);
    }
}

输出结果:每次输出5000,线程安全

Collections.synchronizedMap

线程安全同步容器,代码示例:

package com.yanxizhu.demo.concurrency.synContainer;

import com.yanxizhu.demo.concurrency.annotation.UnThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @description: 安全容器,Collections.synchronizedMap,线程安全
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 9:52
 * @version: 1.0
 */
@Slf4j
@UnThreadSafety
public class DemoCollectionsSynHashMap {
    private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap());
    //用户数量
    private static final int clientsTotal = 5000;
    //并发数量
    private static final int concurrencyTotal = 200;
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量
        final Semaphore semaphore = new Semaphore(concurrencyTotal);
        //闭锁
        final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);

        for (int i = 0; i < clientsTotal; i++) {
            final int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("出现错误:【{}】", e.getMessage());
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("map size:{}", map.size());
    }

    private  static void update(int count){
        map.put(count, count);
    }
}

输出结果:每次输出5000,线程安全。

常用操作错误

package com.yanxizhu.demo.concurrency.synContainer;

import java.util.Iterator;
import java.util.Vector;

/**
 * @description: Common mistakes when modifying a container while traversing it, and how
 *               to avoid them.
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/24 11:04
 * @version: 1.0
 */
public class DemoErrorUser {

    /**
     * Modifies the vector inside an enhanced-for loop. When an element equal to 3 is
     * encountered, a new element is added; the loop's iterator detects the structural
     * change on its next step and throws {@link java.util.ConcurrentModificationException}.
     *
     * @param vector vector to traverse and modify
     */
    public static void test1(Vector<Integer> vector) {
        for (Integer integer : vector) { // enhanced-for loop (iterator under the hood)
            if (integer.equals(3)) {
                // Calling vector.remove(integer) here instead would be an equally
                // unsupported structural modification during iteration.
                vector.add(4);
            }
        }
    }

    /**
     * Removes from the vector while traversing it with an explicit iterator. The removal
     * goes through the vector rather than the iterator, so a subsequent
     * {@code iterator.next()} call fails with
     * {@link java.util.ConcurrentModificationException}.
     *
     * @param vector vector to traverse and modify
     */
    public static void test2(Vector<Integer> vector) {
        Iterator<Integer> iterator = vector.iterator();
        while (iterator.hasNext()) {
            Integer integer = iterator.next();
            if (integer.equals(3)) {
                vector.remove(integer);
            }
        }
    }

    /**
     * Removes every element equal to 3 using a plain index loop, which involves no
     * iterator and therefore raises no exception.
     *
     * <p>The loop compares the element value (not the index) so that it matches
     * {@link #test1(Vector)} and {@link #test2(Vector)}, and it walks from the end towards
     * the start so that removing an element does not shift the not-yet-visited ones.
     *
     * @param vector vector to traverse and modify
     */
    public static void test3(Vector<Integer> vector) {
        final Integer target = 3;
        for (int i = vector.size() - 1; i >= 0; i--) {
            if (target.equals(vector.get(i))) {
                vector.remove(i); // remove(int): removes by index
            }
        }
    }

    // Summary: when using an enhanced-for loop or an iterator, mark the elements to change
    // during the traversal and apply the modification after the loop has finished.

    /**
     * Runs the enhanced-for scenario on a vector containing 1, 2, 3, which terminates with
     * a {@link java.util.ConcurrentModificationException}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Vector<Integer> vector = new Vector<>();
        vector.add(1);
        vector.add(2);
        vector.add(3);

        // Replace this call with test2(vector) or test3(vector) to try the other scenarios.
        test1(vector);
    }
}

输出结果:ConcurrentModificationException

Exception in thread "main" java.util.ConcurrentModificationException
    at java.base/java.util.Vector$Itr.checkForComodification(Vector.java:1321)
    at java.base/java.util.Vector$Itr.next(Vector.java:1277)
    at com.yanxizhu.demo.concurrency.synContainer.DemoErrorUser.test1(DemoErrorUser.java:16)
    at com.yanxizhu.demo.concurrency.synContainer.DemoErrorUser.main(DemoErrorUser.java:52)

在增强for循环、迭代器循环中修改容器会报错(ConcurrentModificationException),普通for循环中修改则不会报错。解决办法:遍历时先做标记,遍历结束后再单独进行移除操作。

2

评论 (0)

取消