原子性-Atomic类常见的使用

admin
2022-04-20 / 0 评论 / 172 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2022年04月21日,已超过946天没有更新,若内容或图片失效,请留言反馈。

1、(信号量)Semaphore+(闭锁)CountDownLatch+(常用原子类)AtomicInteger

package com.yanxizhu.demo.concurrency.atomic;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @description: Thread-safe counting demo. {@code clientsTotal} tasks are submitted to a
 *               thread pool; a {@link Semaphore} limits how many of them run {@link #add()}
 *               at the same time, a {@link CountDownLatch} makes {@code main} wait until
 *               every task has finished, and an {@link AtomicInteger} holds the shared sum.
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/19 20:05
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoSemaphoreAndCountDownLatchAndAtomic {

    // Total number of simulated client requests (one task per request).
    private static final int clientsTotal = 5000;
    // Maximum number of tasks allowed to execute add() concurrently.
    private static final int concurrencyTotal = 200;
    // Shared running total, updated atomically by every task.
    private static final AtomicInteger count = new AtomicInteger(0);

    /**
     * Submits {@code clientsTotal} tasks, waits for all of them, then logs the final sum.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // The semaphore throttles concurrency, so it is sized by the concurrency limit.
        final Semaphore semaphore = new Semaphore(concurrencyTotal);
        // The latch must count every submitted task; otherwise await() returns
        // while tasks are still running and a partial sum is logged.
        final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);

        for (int i = 0; i < clientsTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    try {
                        add();
                    } finally {
                        // Always hand the permit back, even if add() throws.
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    log.error("出现错误:【{}】", e.getMessage());
                    // Restore the interrupt flag so the pool thread can observe it.
                    Thread.currentThread().interrupt();
                } finally {
                    // Count down on every path so main can never block forever.
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("累加结果为count=【{}】",count.get());
    }

    /**
     * Adds 2 to the shared counter using two atomic increments.
     */
    private static void add() {
        // incrementAndGet returns the updated value (like ++i) and getAndIncrement
        // returns the previous value (like i++); each one adds exactly 1.
        count.incrementAndGet();
        count.getAndIncrement();
    }
}

运行结果:

累加结果为count=【10000】

2、AtomicReference

package com.yanxizhu.demo.concurrency.atomic;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicReference;

/**
 * @description: AtomicReference demo. Applies a fixed sequence of compare-and-set
 *               operations to a shared {@code AtomicReference<Integer>} and logs the
 *               value that remains afterwards.
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/20 21:09
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class DemoAtomicReference {

    // Shared reference under test; starts at 0.
    public static AtomicReference<Integer> count = new AtomicReference<>(0);

    /**
     * Runs the compare-and-set sequence and logs the final value.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Each row is {expected, replacement}, applied in order:
        //   0 -> 2  succeeds (value is 0)
        //   0 -> 1  fails    (value is now 2)
        //   1 -> 3  fails    (value is still 2)
        //   2 -> 4  succeeds (value is 2)
        //   3 -> 5  fails    (value is now 4)
        final int[][] attempts = {
            {0, 2},
            {0, 1},
            {1, 3},
            {2, 4},
            {3, 5},
        };

        for (int[] attempt : attempts) {
            // The int operands are autoboxed to Integer, exactly as literal arguments would be.
            count.compareAndSet(attempt[0], attempt[1]);
        }

        log.info("count结果为【{}】",count.get());
    }
}

运行结果:

count结果为【4】

3、AtomicIntegerFieldUpdater

package com.yanxizhu.demo.concurrency.atomic;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
 * @description: AtomicIntegerFieldUpdater demo. Performs compare-and-set on the
 *               {@code volatile int count} field of an instance through a field updater.
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/20 21:17
 * @version: 1.0
 */
@Slf4j
public class DemoAtomicIntegerFieldUpdater {

    // Updater bound to the "count" field of this class.
    private static AtomicIntegerFieldUpdater<DemoAtomicIntegerFieldUpdater> updater =
            AtomicIntegerFieldUpdater.newUpdater(DemoAtomicIntegerFieldUpdater.class, "count");

    // Field modified through the updater; starts at 100.
    @Getter
    public volatile int count = 100;

    // Instance whose count field is updated in main.
    private static DemoAtomicIntegerFieldUpdater demoAtomicIntegerFieldUpdater =
            new DemoAtomicIntegerFieldUpdater();

    /**
     * Attempts the same 100 -> 200 compare-and-set twice and logs each outcome.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final DemoAtomicIntegerFieldUpdater target = demoAtomicIntegerFieldUpdater;

        // First attempt: count starts at 100, so the expected value matches.
        final boolean firstSwapped = updater.compareAndSet(target, 100, 200);
        if (firstSwapped) {
            log.info("updater succcess1,{}", target.getCount());
        }

        // Second attempt expects 100 again; after a successful first swap count is 200.
        final boolean secondSwapped = updater.compareAndSet(target, 100, 200);
        if (!secondSwapped) {
            log.info("updater fail,{}", target.getCount());
        } else {
            log.info("updater succcess2,{}", target.getCount());
        }
    }
}

运行结果:

updater succcess1,200
updater fail,200

4、AtomicStampedReference

主要用于解决CAS中的ABA问题。AtomicStampedReference的compareAndSet方法除了比较引用之外,还多了一个stamp(版本戳)的比较;stamp由调用方在每次更新时传入新值来维护。

    /**
     * Sets the reference and stamp to the given new values if the current reference
     * is {@code ==} to {@code expectedReference} and the current stamp equals
     * {@code expectedStamp}.
     *
     * @param expectedReference the reference the caller expects to be current (compared by identity)
     * @param newReference      the reference to install
     * @param expectedStamp     the stamp the caller expects to be current
     * @param newStamp          the stamp to install
     * @return {@code true} if both expectations matched and either the new values already
     *         equal the current ones or {@code casPair} reported success
     */
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        // Read the pair field once so every check below uses the same snapshot.
        Pair<V> current = pair;
        return
            // Both the reference and the stamp must match the snapshot.
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            // If the new values equal the snapshot no write is attempted; otherwise a
            // freshly built pair is handed to casPair.
            // NOTE(review): casPair is defined outside this excerpt - presumably an
            // atomic compare-and-swap on the pair field; confirm against the JDK source.
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

5、LongAdder

JDK1.8后新增了LongAdder。

AtomicLong:是基于 CAS 方式自旋更新的;

LongAdder: 内部维护一个 base 值和若干 cell。并发量低的时候,直接 CAS 更新 base,成功即结束;并发量高(CAS 竞争失败)的情况下,转而 CAS 更新当前线程对应的 cell,并在需要时对 cell 数组扩容,成功即结束;更新失败则自旋 CAS 更新 cell 值。取值的时候,调用 sum() 方法,把 base 与每个 cell 的值累加。

AtomicLong: 包含有原子性的读、写结合的api;

LongAdder :没有原子性的读、写结合的api,能保证结果最终一致性。

低并发:低并发场景AtomicLong 和 LongAdder 性能相似。

高并发:高并发场景 LongAdder 性能优于 AtomicLong。

6、AtomicLongArray

主要用于原子修改数组

    /**
     * Compare-and-set on a single array element.
     *
     * <p>Delegates to {@code AA.compareAndSet} with the backing {@code array}, the
     * index and the two values.
     * NOTE(review): {@code AA} and {@code array} are declared outside this excerpt -
     * presumably an array-element VarHandle and the backing long[]; confirm against
     * the JDK source.
     *
     * @param i             the index of the element to update
     * @param expectedValue the value the element is expected to hold
     * @param newValue      the value to store if the expectation holds
     * @return the result reported by {@code AA.compareAndSet}
     */
    public final boolean compareAndSet(int i, long expectedValue, long newValue) {
        return AA.compareAndSet(array, i, expectedValue, newValue);
    }

通过传入下标、期望值、要更新的值进行修改。

7、AtomicBoolean

    /**
     * Compare-and-set for a boolean value.
     *
     * <p>Both booleans are mapped to ints ({@code true} to 1, {@code false} to 0) and
     * passed to {@code VALUE.compareAndSet} with {@code this} as the target object.
     * NOTE(review): {@code VALUE} is declared outside this excerpt - presumably a
     * VarHandle over an int field that backs the boolean; confirm against the JDK source.
     *
     * @param expectedValue the value expected to be current
     * @param newValue      the value to store if the expectation holds
     * @return the result reported by {@code VALUE.compareAndSet}
     */
    public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
        return VALUE.compareAndSet(this,
                                   (expectedValue ? 1 : 0),
                                   (newValue ? 1 : 0));
    }
package com.yanxizhu.demo.concurrency.atomic;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @description: AtomicBoolean demo. Many tasks race to flip a shared flag from
 *               {@code false} to {@code true} with compareAndSet; only the task whose
 *               compare-and-set succeeds runs the guarded code, so it executes once.
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/20 22:05
 * @version: 1.0
 */
@Slf4j
public class DemoAtomicBoolean {

    // Flag that records whether the one-time action has already happened.
    private static final AtomicBoolean isHappen = new AtomicBoolean(false);

    // Total number of simulated client requests (one task per request).
    private static final int clientsTotal = 5000;
    // Maximum number of tasks allowed to execute test() concurrently.
    private static final int concurrencyTotal = 200;

    /**
     * Submits {@code clientsTotal} tasks, waits for all of them, then logs the flag.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // The semaphore throttles concurrency, so it is sized by the concurrency limit.
        final Semaphore semaphore = new Semaphore(concurrencyTotal);
        // The latch must count every submitted task; otherwise await() returns
        // while tasks are still running.
        final CountDownLatch countDownLatch = new CountDownLatch(clientsTotal);

        for (int i = 0; i < clientsTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    try {
                        test();
                    } finally {
                        // Always hand the permit back, even if test() throws.
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    log.error("出现错误:【{}】", e.getMessage());
                    // Restore the interrupt flag so the pool thread can observe it.
                    Thread.currentThread().interrupt();
                } finally {
                    // Count down on every path so main can never block forever.
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappen结果为count=【{}】",isHappen.get());
    }

    /**
     * Tries to flip the flag from {@code false} to {@code true} and logs when this
     * call is the one that succeeded.
     */
    public static void test() {
        // The false -> true transition is a single atomic operation, and nothing in this
        // class resets the flag, so at most one caller ever enters the branch.
        if (isHappen.compareAndSet(false, true)) {
            log.info("execute .. {}", isHappen);
        }
    }
}

运行结果:

execute .. true
isHappen结果为count=【true】
2

评论 (0)

取消