更多详情内容请访问:JUC 系列文章导读

1、CAS 基本介绍

1.1 CAS 出现后改善

  • 没有 CAS 之前

    多线程环境不使用原子类保证线程安全,常用 synchronized 锁,但是它比较笨重,牵扯到了用户态和内核态的的切换,效率不高。

  • 使用 CAS 之后

    多线程情况下使用原子类保证线程安全,类似于乐观锁的实现

1.2 什么是 CAS

compare and swap 的缩写,中文翻译成比较并交换,实现并发算法时常用到的一种技术。它包含三个操作数——内存位置、预期原值及更新值。

执行CAS操作的时候,将内存位置的值与预期原值比较:

  • 如果相匹配,那么处理器会自动将该位置值更新为新值,
  • 如果不匹配,处理器不做任何操作或者重来(自旋),多个线程同时执行 CAS 操作只有一个会成功。
public class CASDemo {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(5);
        System.out.println(atomicInteger.compareAndSet(5, 2022)+"\t"+atomicInteger.get()); // true  2022
        System.out.println(atomicInteger.compareAndSet(5, 2023)+"\t"+atomicInteger.get()); // false 2022
        System.out.println(atomicInteger.compareAndSet(2022, 5)+"\t"+atomicInteger.get()); // true  5
    }
}

2、CAS 底层原理

2.1硬件级别保证

  • CAS 是 JDK 提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。

  • CAS是一条 CPU 的原子指令(cmpxchg 指令),不会造成所谓的数据不一致问题,Unsafe提供的 CAS方法(如 compareAndSwapXXX)底层实现即为 CPU 指令 cmpxchg。

  • 执行 cmpxchg 指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功 ,加锁成功之后会执行 CAS 操作,也就是说 CAS 的原子性实际上是 CPU 实现的, 其实在这一点上还是有排他锁的,只是比起用 synchronized, 这里的排他时间要短的多, 所以在多线程情况下性能会比较好

2.2 Unsafe

2.2.1 引出 Unsafe

java.util.concurrent.atomic 包下的方法

image-20220813160306619

我们发现她底层调用了 Unsafe 类中的方法,接着我们进入其中查看:

image-20220813160433359

上面三个方法都是类似的,主要对 4 个参数做一下说明:

  1. var1:表示要操作的对象
  2. var2:表示要操作对象中属性地址的偏移量
  3. var4:表示需要修改数据的期望的值
  4. var5 / var6:表示需要修改为的新值

2.2.2 什么是 Unsafe

CAS 这个理念,落地就是 Unsafe 类,是 CAS 的核心类。由于 Java 方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe 相当于一个后门 。

基于该类可以直接操作特定内存的数据 。Unsafe 类存在于 sun.misc 包中,其内部方法操作可以像 C 的指针一样直接操作内存,因为 Java 中 CAS 操作的执行依赖于 Unsafe 类的方法,其都直接调用操作系统底层资源执行相应任务 。

2.2.3 原理

CAS 并发原语体现在 JAVA 语言中就是 sun.misc.Unsafe 类中的各个方法。调用 UnSafe 类中的 CAS 方法,JVM 会帮我们实现出 CAS 汇编指令 。这是一种完全依赖于硬件的功能,通过它实现了原子操作。

再次强调,由于 CAS 是一种系统原语 ,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说 CAS 是一条 CPU 的原子指令,不会造成所谓的数据不一致问题。

假设线程 A 和线程 B 两个线程同时执行 getAndAddInt 操作(分别跑在不同 CPU 上):

  1. AtomicInteger 里面的 value原始值为 3,即主内存中 AtomicInteger 的 value 为 3,根据 JMM 模型,线程 A 和线程 B 各自持有一份值为 3 的 value 的副本分别到各自的工作内存。

  2. 线程 A 通过 getIntVolatile(var1, var2) 拿到 value 值 3,这时线程 A 被挂起。

  3. 线程 B 也通过 getIntVolatile(var1, var2) 方法获取到 value 值 3 ,此时刚好线程 B 没有被挂起并执行 compareAndSwapInt 方法比较内存值也为 3,成功修改内存值为 4,线程 B 打完收工,一切 OK。

  4. 这时线程 A 恢复,执行 compareAndSwapInt 方法比较,发现自己手里的值数字 3 和主内存的值数字 4 不一致,说明该值已经被其它线程抢先一步修改过了,那 A 线程本次修改失败,只能重新读取重新来一遍了。

  5. 线程 A 重新获取 value 值,因为变量 value 被 volatile 修饰,所以其它线程对它的修改,线程 A 总是能够看到,线程 A 继续执行 compareAndSwapInt 进行比较替换,直到成功

3、CAS 与自旋锁

3.1 什么是自旋锁

指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁。当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,循环比较获取没有类似 wait 的阻塞。

3.2 实现自旋锁

具体实现下面讲述 AtomicReference 引用类型原子类时再具体实现(点我也可直达)

3.3 CAS 缺点

3.3.1 开销大

CAS 失败,如果它一直自旋会一直占用 CPU 时间,造成较大的开销

3.3.2 只能保证一个共享变量的原子性

  1. 对一个共享变量执行操作时,可以使用循环 CAS 的方式来保证原子操作
  2. 对多个共享变量操作,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁来保证其原子性

3.3.3 ABA 问题

什么是 ABA 问题?

CAS 算法实现一个重要前提:需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化。比如说:

  1. 一个线程 one 从内存位置 V 中取出A,这时候另一个线程 two 也从内存中取出 A,并且线程 two 进行了一些操作将值变成了 B,
  2. 然后线程 two 又将 V 位置的数据变成 A,这时候线程 one 进行 CAS 操作发现内存中仍然是 A,然后线程 one 操作成功。
  3. 尽管线程 one 的 CAS 操作成功,但是不代表这个过程就是没有问题的。

解决方式:AtomicStampedReference 版本号

public class ABADemo {
    static AtomicInteger atomicInteger = new AtomicInteger(100);

    public static void main(String[] args) {
        new Thread(() -> {
            atomicInteger.compareAndSet(100,101);
            try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
            atomicInteger.compareAndSet(101,100);
        },"t1").start();

        new Thread(() -> {
            try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(atomicInteger.compareAndSet(100, 2022)+"\t"+atomicInteger.get());
        },"t2").start();
    }
}

public class ABADemo {

    static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100,1);

    public static void main(String[] args) {
        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName()+"\t"+"首次版本号:"+stamp);

            //暂停500毫秒,保证后面的t4线程初始化拿到的版本号和我一样
            try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }

            stampedReference.compareAndSet(100,101,stampedReference.getStamp(),stampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"\t"+"2次流水号:"+stampedReference.getStamp());

            stampedReference.compareAndSet(101,100,stampedReference.getStamp(),stampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"\t"+"3次流水号:"+stampedReference.getStamp());

        },"t3").start();

        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName()+"\t"+"首次版本号:"+stamp);

            //暂停1秒钟线程,等待上面的t3线程,发生了ABA问题
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }

            boolean b = stampedReference.compareAndSet(100, 2022, stamp, stamp + 1);

            System.out.println(b+"\t"+stampedReference.getReference()+"\t"+stampedReference.getStamp());

        },"t4").start();
    }
}
/**
 * t3   首次版本号:1
 * t4   首次版本号:1
 * t3   2次流水号:2
 * t3   3次流水号:3
 * t4执行结果:false 100 版本号:3
 */

4、原子类

都是 java.util.concurrent.atomic 包下的

image-20220814160429011

5、基本类型原子类

5.1 基本类型原子类成员

  • AtomicInteger:可以解决例如 i++ 多线程下不安全的问题
  • AtomicBoolean:可以作为中断标识停止线程的方式
  • AtomicLong:底层是 CAS +自旋锁的思想,适用于低并发的全局计算

5.2 常用 API

方法 说明
public final int get() 获取当前值
public final int getAndSet(int newValue) 获取当前的值,并设置新的值
public final int getAndIncrement() 获取当前的值,并自增
public final int getAndDecrement() 获取当前的值,并自减
public final int getAndAdd(int delta) 获取当前的值,并加上预期值
public final int incrementAndGet( ) 返回的是加 1 后的值
boolean compareAndSet(int expect,int update) 如果输入的数值等于预期值,返回 true

5.3 使用案例

public class AtomicIntegerDemo {
    public static final int SIZE = 50;

    public static void main(String[] args) throws InterruptedException {
        MyNumber myNumber = new MyNumber();
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);

        for (int i = 1; i <= SIZE; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 1000; j++) {
                        myNumber.addPlusPlus();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }, String.valueOf(i)).start();
        }

        //等待上面50个线程全部计算完成后,再去获得最终值,否则可能没算完。
        //try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //暂停几秒钟线程

        countDownLatch.await();

        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger.get());
    }
}

public class AtomicBooleanDemo {
    public static void main(String[] args) {
        AtomicBoolean atomicBoolean=new AtomicBoolean(false);

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t"+"coming.....");
            while(!atomicBoolean.get()){
                System.out.println("==========");
            }
            System.out.println(Thread.currentThread().getName()+"\t"+"over.....");
        },"A").start();

        new Thread(()->{
            atomicBoolean.set(true);
        },"B").start();
    }
}

6、数组类型原子类

  1. AtomicIntegerArray
  2. AtomicLongArray
  3. AtomicRreferenceArray
public class AtomicIntegerArrayDemo {
    public static void main(String[] args) {
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);

        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            System.out.println(atomicIntegerArray.get(i));
        }
        System.out.println();

        int tmpInt = 0;

        tmpInt = atomicIntegerArray.getAndSet(0, 1122);
        System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0)); // 0    1122

        tmpInt = atomicIntegerArray.getAndIncrement(0);
        System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0)); // 1122 1123
    }
}

7、引用类型原子类

7.1 AtomicReference

可以携带泛型,可以使用 AtomicReference 来实现自旋锁案例

  • CAS:匹配的值是我所期望的值,则原来的值变为新的值
  • 案例体现: A 线程抢占到自旋锁之后,略过锁循环,B 线程却遇到了自旋锁循环,直到 A 线程解锁,B 线程才能脱离循环
  • 本质:A 线程抢占锁之后改变了判断值(atomicReference),只有 A 线程对象改变回原来的判断值(null)之后其他线程才允许不再自旋,所以不论其他任何的线程进来都无法摆脱自旋从而等待。
public class AtomicReferenceThreadDemo {

    private AtomicReference<Thread> atomicReference = new AtomicReference<>();

    private void lock() {
        System.out.println(Thread.currentThread().getName() + " 执行加锁操作...");
        while (!atomicReference.compareAndSet(null, Thread.currentThread())){
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName() + " -- 自旋ing");
        }
    }

    private void unlock() {
        atomicReference.compareAndSet(Thread.currentThread(), null);
        System.out.println(Thread.currentThread().getName() + "--解锁成功,线程" + Thread.currentThread().getName() + " over!");
    }

    public static void main(String[] args) {
        AtomicReferenceThreadDemo demo = new AtomicReferenceThreadDemo();

        new Thread(() -> {
            demo.lock();
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName() + " 线程执行即将结束,开始解锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                demo.unlock();
            }
        }, "A").start();

        new Thread(() -> {
            demo.lock();
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 线程执行即将结束,开始解锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                demo.unlock();
            }
        }, "B").start();
    }
}

执行结果解析:

image-20220817151219384

7.2 AtomicStampedReference

携带版本号的引用类型原子类可以解决 ABA 问题:解决修改过几次,状态戳原子引用

public class ABADemo {

    private static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
    private static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100, 1);

    @Test
    public void happenProblem() {
        System.out.println("======ABA 问题的产生======");
        new Thread(() -> {
            atomicReference.compareAndSet(100, 101);
            atomicReference.compareAndSet(101, 100);
        }, "T1").start();

        new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } // 暂停一下保证 T1 线程完成
            System.out.println(atomicReference.compareAndSet(100, 2022) + "\t" + atomicReference.get());
        }, "T2").start();
        try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
    }

    @Test
    public void solveProblem() {
        System.out.println("======解决 ABA 问题======");
        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "\t 第1次版本号" + stamp + "\t值是" + stampedReference.getReference());

            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } //暂停1秒钟t3线程

            stampedReference.compareAndSet(100, 101, stampedReference.getStamp(), stampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "\t 第2次版本号" + stampedReference.getStamp() + "\t值是" + stampedReference.getReference());
            stampedReference.compareAndSet(101, 100, stampedReference.getStamp(), stampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "\t 第3次版本号" + stampedReference.getStamp() + "\t值是" + stampedReference.getReference());
        }, "t3").start();

        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "\t 第1次版本号" + stamp + "\t值是" + stampedReference.getReference());

            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //保证线程3完成1次ABA

            boolean result = stampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
            System.out.println(Thread.currentThread().getName() + "\t 修改成功否" + result + "\t最新版本号" + stampedReference.getStamp());
            System.out.println("最新的值\t" + stampedReference.getReference());
        }, "t4").start();

        try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

7.3 AtomicMarkableReference

  1. 原子更新带有标记位的引用类型对象

  2. 解决是否修改过

  3. 状态戳(ture / false)原子引用

  4. 不建议使用她解决 ABA 问题

    AtomicStampedReference 和 AtomicMarkableReference区别

    • AtomicStampedReference:版本号机制,修改一次+1
    • AtomicMarkableReference:true、false,它不关心引用变量更改过几次,只关心是否更改过
public class ABADemo {
    static AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(100, false);

    public static void main(String[] args) {
        System.out.println("============AtomicMarkableReference不关心引用变量更改过几次,只关心是否更改过======================");
        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + "\t 1次版本号" + marked);
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            markableReference.compareAndSet(100, 101, marked, !marked);
            System.out.println(Thread.currentThread().getName() + "\t 2次版本号" + markableReference.isMarked());
            markableReference.compareAndSet(101, 100, markableReference.isMarked(), !markableReference.isMarked());
            System.out.println(Thread.currentThread().getName() + "\t 3次版本号" + markableReference.isMarked());
        }, "线程A").start();

        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + "\t 1次版本号" + marked);
            //暂停几秒钟线程
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            markableReference.compareAndSet(100, 2020, marked, !marked);
            System.out.println(Thread.currentThread().getName() + "\t" + markableReference.getReference() + "\t" + markableReference.isMarked());
        }, "线程B").start();
    }
}

8、对象的属性修改原子类

8.1 对象的属性修改原子类成员

  1. AtomicIntegerFieldUpdater:原子更新对象中 int 类型字段的值
  2. AtomicLongFieldUpdater:原子更新对象中 Long 类型字段的值
  3. AtomicReferenceFieldUpdater:原子更新引用类型字段的值

8.2 使用目的

以一种线程安全的方式操作非线程安全对象内的某些字段(是否可以不要锁定整个对象,减少锁定的范围,只关注长期、敏感性变化的某一个字段,而不是整个对象,以达到精确加锁 + 节约内存的目的)

8.3 使用要求

  1. 更新的对象属性必须使用 public volatile 修饰符
  2. 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性

8.4 使用案例

  1. 通过 AtomicIntegerFieldUpdater 更新 score 我们获取最后的 int 值时相较于 AtomicInteger 来说不需要调用 get() 方法
  2. AtomicIntegerFieldUpdater 是 static、final 类型也就是说即使创建了 100 个对象 AtomicIntegerField 也只存在一个,不会占用对象的内存。AtomicInteger 会创建多个 AtomicInteger 对象,占用的内存比 AtomicIntegerFieldUpdater 大
class BankAccount {
    //更新的对象属性必须使用 public volatile 修饰符。
    public volatile int money = 0;//钱数

    //因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
    AtomicIntegerFieldUpdater<BankAccount> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");

    //不加synchronized,保证高性能原子性,局部微创小手术
    public void transMoney(BankAccount bankAccount) {
        fieldUpdater.getAndIncrement(bankAccount);
    }
}

public class AtomicIntegerFieldUpdaterDemo {
    public static void main(String[] args) throws InterruptedException {
        BankAccount bankAccount = new BankAccount();
        CountDownLatch countDownLatch = new CountDownLatch(10);

        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 1000; j++) {
                        bankAccount.transMoney(bankAccount);
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }, String.valueOf(i)).start();
        }

        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + bankAccount.money); // main   result: 10000
    }
}

class MyCar {public volatile Boolean flag = false;}

public class AtomicReferenceFieldUpdaterDemo {

    public static void main(String[] args) {
        MyCar myCar = new MyCar();
        AtomicReferenceFieldUpdater<MyCar, Boolean> atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyCar.class, Boolean.class, "flag");

        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                if (atomicReferenceFieldUpdater.compareAndSet(myCar, Boolean.FALSE, Boolean.TRUE)) {
                    System.out.println(Thread.currentThread().getName() + "\tinit...");
                    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // 确保其他线程已经start
                    System.out.println(Thread.currentThread().getName() + "\tinit over !");
                } else {
                    System.out.println(Thread.currentThread().getName() + "\t其它线程正在初始化");
                }
            }, String.valueOf("线程" + i)).start();
        }
    }
}

9、原子操作增强类

9.1 原子操作增强类成员

  1. DoubleAccumulator
  2. DoubleAdder
  3. LongAccumulator
  4. LongAdder

9.2 常用 API

方法 说明
void add(long x) 将当前的 value 加 x
void increment() 将当前的 value 加 1
void decrement() 将当前的 value 减 1
long sum() 返回当前值。特别注意:在没有并发更新 value 的情况下,sum 会返回一个精确值;
在存在并发的情况下,sum 并不保证返回精确值
long sumThenReset() 获取当前 value,并将 value 重置为 0
void reset() 将 value 重置为 0,可用于替代重新 new 一个 LongAdder
但此方法只可以在没有并发更新的情况下使用
long longValue LongAccumulator 和 DoubleAccumulator 中没有 long sum()
等价于 long sum() 方法

9.3 LongAdder 与 LongAccumulator

  1. LongAdder 只能用来计算加法、减法,且从零开始计算
  2. LongAccumulator 提供了自定义的函数操作
public class LongAdderDemo {

    public static void main(String[] args) {
        // LongAdder只能做加减法,不能做乘除法
        LongAdder longAdder = new LongAdder();
        longAdder.increment();
        longAdder.increment();
        longAdder.increment();
        longAdder.decrement();
        System.out.println(longAdder.longValue());

        //LongAccumulator(LongBinaryOperator accumulatorFunction, long identity)
        LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
            @Override
            public long applyAsLong(long left, long right) {
                return left * right;
            }
        }, 5);
        longAccumulator.accumulate(1); // 5*1=5
        System.out.println(longAccumulator.longValue());
        longAccumulator.accumulate(2); // 5*2=10
        System.out.println(longAccumulator.longValue());
        longAccumulator.accumulate(5); // 10*5=50
        System.out.println(longAccumulator.longValue());
    }
}

9.4 LongAdder 高性能对比

通过运行结果,我们可以发现执行效率:synchronized < AtomicInteger(AtomicLong)< LongAdder < LongAccumulater

image-20220817213332767

class ClickNumber {
    int number = 0;

    //(1). 使用synchronized实现number++
    public synchronized void add_synchronized() {
        number++;
    }

    //(2). 使用AtomicInteger
    AtomicInteger atomicInteger = new AtomicInteger();

    public void add_atomicInteger() {
        atomicInteger.incrementAndGet();
    }

    //(3). 使用AtomicLong
    AtomicLong atomicLong = new AtomicLong();

    public void add_atomicLong() {
        atomicLong.incrementAndGet();
    }

    //(4). 使用LongAdder
    LongAdder longAdder = new LongAdder();

    public void add_longAdder() {
        longAdder.increment();
    }

    //(5). 使用LongAccumulater
    LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);

    public void add_longAccumulater() {
        longAccumulator.accumulate(1);
    }
}

/**
 * 需求: 50个线程,每个线程100W次,总点赞数出来
 */
public class AccumulatorCompareDemo {
    public static final int _1W = 10000;
    public static final int threadNumber = 50;

    public static void main(String[] args) throws InterruptedException {
        ClickNumber clickNumber = new ClickNumber();
        long startTime;
        long endTime;

        CountDownLatch countDownLatch1 = new CountDownLatch(threadNumber);
        CountDownLatch countDownLatch2 = new CountDownLatch(threadNumber);
        CountDownLatch countDownLatch3 = new CountDownLatch(threadNumber);
        CountDownLatch countDownLatch4 = new CountDownLatch(threadNumber);
        CountDownLatch countDownLatch5 = new CountDownLatch(threadNumber);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.add_synchronized();
                    }
                } finally {
                    countDownLatch1.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch1.await();
        endTime = System.currentTimeMillis();
        System.out.println("使用synchronized---costTime:" + (endTime - startTime) + "ms\t\t最终计算结果:" + clickNumber.number);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.add_atomicInteger();
                    }
                } finally {
                    countDownLatch2.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("使用AtomicInteger---costTime:" + (endTime - startTime) + "ms\t\t最终计算结果:" + clickNumber.number);


        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.add_atomicLong();
                    }
                } finally {
                    countDownLatch3.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("使用AtomicLong---costTime:" + (endTime - startTime) + "ms\t\t最终计算结果:" + clickNumber.number);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.add_longAdder();
                    }
                } finally {
                    countDownLatch4.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("使用LongAdder---costTime:" + (endTime - startTime) + "ms\t\t最终计算结果:" + clickNumber.number);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.add_longAccumulater();
                    }
                } finally {
                    countDownLatch5.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch5.await();
        endTime = System.currentTimeMillis();
        System.out.println("使用LongAccumulater---costTime:" + (endTime - startTime) + "ms\t\t最终计算结果:" + clickNumber.number);
    }
}

10、AtomicLong 与 LongAdder

10.1 AtomicLong

我们知道 AtomicLong 是利用底层的 CAS 操作来提供并发性的,比如 addAndGet () 方法:

  1. 下面方法调用了 Unsafe 类的 getAndAddLong 方法,该方法是一个 native 方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功(也即乐观锁的实现模式)
  2. 在并发量比较低的情况下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发情况下,N 个线程同时进行自旋操作,N-1 个线程失败,导致 CPU 打满场景,此时 AtomicLong 的自旋会成为瓶颈
  3. 这就是 LongAdder 引入的初衷——解决高并发环境下 AtomictLong 的自旋瓶颈问题
public final long addAndGet(long delta) {
    return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}

10.2 LongAdder 为什么快

LongAdder 在无竞争的情况,跟 AtomicLong 一样,对同一个 base 进行操作,当出现竞争关系时则采用化整为零的做法:从空间换时间,用一个数组 cells 将一个 value 拆分进这个数组 cells。

多个线程需要同时对 value 进行操作时候,可以对线程 id 进行 hash 得到 hash 值,再根据 hash 值映射到这个数组 cells 的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组 cells 的所有值和无竞争值 base 都加起来作为最终结果(分散热点

image-20220817230336114

sum() 会将所有 cell 数组中的 value 和 base 累加作为返回值,核心的思想就是将之前 AtomicLong 一个 value 的更新压力分散到多个 value 中去,从而降级更新热点

10.3 AtomicLong 与 LongAdder 对比

LongAdder 的 API 和 AtomicLong 的 API 还是有比较大的差异,而且 AtomicLong 提供的功能更丰富,尤其是 addAndGet()decrementAndGet()compareAndSet() 这些方法。addAndGet()、decrementAndGet() 除了单纯的做自增自减外,还可以立即获取增减后的值

而 LongAdder 则需要做同步控制才能精确获取增减后的值。如果业务需求需要精确的控制计数,则使用AtomicLong比较合适;

image-20220817210222458

10.4 Striped64

目前我们所说十八罗汉增强已经提及了 16 个了,剩下的两位分别是:Striped64 和 Number

image-20220817214722222
  • 几个重要的成员变量
    1. base:类似于 AtomicLong 中全局的 value 值。在没有竞争的情况数据直接累加到 base 上,或者 cells 扩容时,也需要将数据写入到 base 上
    2. collide:表示扩容意向:false 一定不扩容;true 可能会扩容
    3. cellsBusy:初始化 cells 或者扩容 cells 需要获取锁。0 表示无所,1 表示其他线程已经持有了锁
  • 几个重要的方法
    1. casCellsBusy():通过 CAS 操作修改 cellsBusy 的值,CAS 成功代表获取锁,返回 true
    2. NCPU:当前计算机 CPU 数量,Cell 数据扩容时会使用到
    3. getProbe():获取当前线程的 hash 值
    4. advanceProbe():重置当前线程的 hash 值

并且 Cell 是 java.util.concurrent.atomic 下 Striped64 下的一个内部类

10.5 LongAdder 部分源码解析

10.5.1 LongAdder 类中 add(long x) 方法

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
  • 最初无竞争时,直接通过 casBase 进行更新 base 的处理
  • 如果更新 base 失败后,首次新建一个 Cell[ ] 数组(默认长度是2)
  • 当多个线程竞争同一个 Cell 比较激烈时,可能就要对 Cell[] 扩容

public void add(long x) {
    //as是striped64中的cells数组属性
    //b是striped64中的base属性
    //v是当前线程hash到的cell中存储的值
    //m是cells的长度减1,hash时作为掩码使用
    //a时当前线程hash到的cell
    Cell[] as; long b, v; int m; Cell a;
    /**
        首次首线程(as = cells) != null)一定是false,此时走casBase方法,以CAS的方式更新base值,
        且只有当cas失败时,才会走到if中
        条件1:cells不为空,说明出现过竞争,cell[]已创建
        条件2:cas操作base失败,说明其他线程先一步修改了base正在出现竞争
        */
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        //true无竞争 fasle表示竞争激烈,多个线程hash到同一个cell,可能要扩容
        boolean uncontended = true;
        /*
            条件1:cells为空,说明正在出现竞争,上面是从条件2过来的,说明!casBase(b = base, b + x))=true
                  会通过调用longAccumulate(x, null, uncontended)新建一个数组,默认长度是2
            条件2:默认会新建一个数组长度为2的数组,m = as.length - 1) < 0 应该不会出现,
            条件3:当前线程所在的cell为空,说明当前线程还没有更新过cell,应初始化一个cell。
                  a = as[getProbe() & m]) == null,如果cell为空,进行一个初始化的处理
            条件4:更新当前线程所在的cell失败,说明现在竞争很激烈,多个线程hash到同一个Cell,应扩容
                  (如果是cell中有一个线程操作,这个时候,通过a.cas(v = a.value, v + x)可以进行处理,返回的结果是true)
            **/
        if (as == null || (m = as.length - 1) < 0 ||
            //getProbe( )方法返回的时线程中的threadLocalRandomProbe字段
            //它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            //调用Striped64中的方法处理
            longAccumulate(x, null, uncontended);
    }
}

image-20220817232225587

10.5.2 Striped64 类中 longAccumulate(...) 方法

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) 入参说明:

  • long x:需要增加的值,一般默认都是 1
  • LongBinaryOperator fn:默认传递的是 null
  • boolean wasUncontended:竞争标识,如果是 false 代表有竞争。只有 cells 初始化之后,并且当前线程 CAS 竞争修改失败,才会是 false
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

  1. 线程 hash 值

    final void longAccumulate(long x, LongBinaryOperator fn,
                           boolean wasUncontended) {
     //存储线程的probe值
     int h;
     //如果getProbe()方法返回0,说明随机数未初始化
     if ((h = getProbe()) == 0) { //这个if相当于给当前线程生成一个非0的hash值
         //使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
         ThreadLocalRandom.current(); // force initialization
         //重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true
         h = getProbe();
         //重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈
         //wasUncontended竞争状态为true
         wasUncontended = true;
     }
    
  2. 刚刚初始化 Cell[ ] 数组(首次新建)

        //CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组
        /*
        cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁
        cells == as == null  是成立的
        casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,
        返回true,第一次进来没人抢占cell单元格,肯定返回true
        **/
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) { 
            //是否初始化的标记
            boolean init = false;
            try {                           // Initialize table(新建cells)
                // 前面else if中进行了判断,这里再次判断,采用双端检索的机制
                if (cells == as) {
                    //如果上面条件都执行成功就会执行数组的初始化及赋值操作,Cell[] rs = new Cell[2]标识数组的长度为2
                    Cell[] rs = new Cell[2];
                    //rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1
                    //h & 1 类似于我们之前hashmap常用到的计算散列桶index的算法,
                    //通常都是hash&(table.len-1),同hashmap一个意思
                    //看这次的value是落在0还是1
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
  3. 兜底(多个线程尝试 CAS 修改失败的线程会走这个分支)

     //CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作
     //这种情况是cell中都CAS失败了,有一个兜底的方法
     //该分支实现直接操作base基数,将值累加到base上,
     //也即其他线程正在初始化,多个线程正在更新base的值
     else if (casBase(v = base, ((fn == null) ? v + x :
                                 fn.applyAsLong(v, x))))
         break;     
    
  4. Cell 数组不再为空且可能存在 Cell 数组扩容

    for (;;) {
     Cell[] as; Cell a; int n; long v;
     if ((as = cells) != null && (n = as.length) > 0) { // CASE1:cells已经初始化了
         // 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用
         if ((a = as[(n - 1) & h]) == null) {
             //Cell[]数组没有正在扩容
             if (cellsBusy == 0) {       // Try to attach new Cell
                 //先创建一个Cell
                 Cell r = new Cell(x);   // Optimistically create
                 //尝试加锁,加锁后cellsBusy=1
                 if (cellsBusy == 0 && casCellsBusy()) { 
                     boolean created = false;
                     try {               // Recheck under lock
                         Cell[] rs; int m, j; //将cell单元赋值到Cell[]数组上
                         //在有锁的情况下再检测一遍之前的判断 
                         if ((rs = cells) != null &&
                             (m = rs.length) > 0 &&
                             rs[j = (m - 1) & h] == null) {
                             rs[j] = r;
                             created = true;
                         }
                     } finally {
                         cellsBusy = 0;//释放锁
                     }
                     if (created)
                         break;
                     continue;           // Slot is now non-empty
                 }
             }
             collide = false;
         }
         /**
         wasUncontended表示cells初始化后,当前线程竞争修改失败
         wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true,
         紧接着执行advanceProbe(h)重置当前线程的hash,重新循环
         */
         else if (!wasUncontended)       // CAS already known to fail
             wasUncontended = true;      // Continue after rehash
         //说明当前线程对应的数组中有了数据,也重置过hash值
         //这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环
         else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                      fn.applyAsLong(v, x))))
             break;
         //如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试
         else if (n >= NCPU || cells != as)
             collide = false;    //扩容标识设置为false,标识永远不会再扩容
         //如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环
         else if (!collide) 
             collide = true;
         //锁状态为0并且将锁状态修改为1(持有锁) 
         else if (cellsBusy == 0 && casCellsBusy()) { 
             try {
                 if (cells == as) {      // Expand table unless stale
                     //按位左移1位来操作,扩容大小为之前容量的两倍
                     Cell[] rs = new Cell[n << 1];
                     for (int i = 0; i < n; ++i)
                         //扩容后将之前数组的元素拷贝到新数组中
                         rs[i] = as[i];
                     cells = rs; 
                 }
             } finally {
                 //释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行
                 cellsBusy = 0;
             }
             collide = false;
             continue;                   // Retry with expanded table
         }
         h = advanceProbe(h);
     }

/**
    1.LongAdder继承了Striped64类,来实现累加功能,它是实现高并发累加的工具类
    2.Striped64的设计核心思路就是通过内部的分散计算来避免竞争
    3.Striped64内部包含一个base和一个Cell[] cells数组,又叫hash表
    4.没有竞争的情况下,要累加的数通过cas累加到base上;如果有竞争的话,
    会将要累加的数累加到Cells数组中的某个cell元素里面
*/
abstract class Striped64 extends Number {
    //CPU数量,即Cells数组的最大长度
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    //存放Cell的hash表,大小为2的幂
    transient volatile Cell[] cells;
    /*
    1.在开始没有竞争的情况下,将累加值累加到base;
    2.在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base
    */
    transient volatile long base;
    /*
    cellsBusy,它有两个值0或1,它的作用是当要修改cells数组时加锁,
    防止多线程同时修改cells数组(也称cells表),0为无锁,1位加锁,加锁的状况有三种:
    (1). cells数组初始化的时候;
    (2). cells数组扩容的时候;
    (3).如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候;

    */
    transient volatile int cellsBusy;

    //低并发状态,还没有新建cell数组且写入进入base,刚好够用
    //base罩得住,不用上cell数组
    final boolean casBase(long cmp, long val) {
        //当前对象,在base位置上,将base(类似于AtomicLong中全局的value值),将base=0(cmp)改为1(value)
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //存储线程的probe值
        int h;
        //如果getProbe()方法返回0,说明随机数未初始化
        if ((h = getProbe()) == 0) { //这个if相当于给当前线程生成一个非0的hash值
            //使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
            ThreadLocalRandom.current(); // force initialization
            //重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true
            h = getProbe();
            //重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈,wasUncontended竞争状态为true
            wasUncontended = true;
        }
        //如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看作是扩容意向
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) { // CASE1:cells已经初始化了
                // 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用
                if ((a = as[(n - 1) & h]) == null) {
                    //Cell[]数组没有正在扩容
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        //先创建一个Cell
                        Cell r = new Cell(x);   // Optimistically create
                        //尝试加锁,加锁后cellsBusy=1
                        if (cellsBusy == 0 && casCellsBusy()) { 
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j; //将cell单元赋值到Cell[]数组上
                                //在有锁的情况下再检测一遍之前的判断 
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;//释放锁
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                /**
                wasUncontended表示cells初始化后,当前线程竞争修改失败
                wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true,
                紧接着执行advanceProbe(h)重置当前线程的hash,重新循环
                */
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //说明当前线程对应的数组中有了数据,也重置过hash值
                //这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试
                else if (n >= NCPU || cells != as)
                    collide = false;    //扩容标识设置为false,标识永远不会再扩容
                //如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环
                else if (!collide) 
                    collide = true;
                //锁状态为0并且将锁状态修改为1(持有锁) 
                else if (cellsBusy == 0 && casCellsBusy()) { 
                    try {
                        if (cells == as) {      // Expand table unless stale
                            //按位左移1位来操作,扩容大小为之前容量的两倍
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                //扩容后将之前数组的元素拷贝到新数组中
                                rs[i] = as[i];
                            cells = rs; 
                        }
                    } finally {
                        //释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组
            /*
            cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁
            cells == as == null  是成立的
            casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true,第一次进来没人抢占cell单元格,肯定返回true
            **/
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) { 
                //是否初始化的标记
                boolean init = false;
                try {                           // Initialize table(新建cells)
                    // 前面else if中进行了判断,这里再次判断,采用双端检索的机制
                    if (cells == as) {
                        //如果上面条件都执行成功就会执行数组的初始化及赋值操作,Cell[] rs = new Cell[2]标识数组的长度为2
                        Cell[] rs = new Cell[2];
                        //rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1
                        //h & 1 类似于我们之前hashmap常用到的计算散列桶index的算法,通常都是hash&(table.len-1),同hashmap一个意思
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作
            //这种情况是cell中都CAS失败了,有一个兜底的方法
            //该分支实现直接操作base基数,将值累加到base上,也即其他线程正在初始化,多个线程正在更新base的值
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
}

image-20220818021511466

image-20220818021338274

10.5.3 LongAdder 类中 sum() 方法

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

为何 sum() 方法在高并发情况下并不精确,主要有以下三点原因:

  1. sum() 执行时,并没有限制对 base 和 cells 的更新。所以 LongAdder 不是强一致性的,它是最终一致性的。
  2. 首先,最终返回的 sum 局部变量,初始被复制为 base,而最终返回时,很可能 base 已经被更新了,而此时局部变量 sum 不会更新,造成不一致。
  3. 其次,这里对 cell 的读取也无法保证是最后一次写入的值。所以,sum 方法在没有并发的情况下,可以获得正确的结果。


END

本文作者:
文章标题:CAS 与 原子类
本文地址:https://www.pendulumye.com/juc/498.html
版权说明:若无注明,本文皆PendulumYe原创,转载请保留文章出处。
最后修改:2022 年 08 月 30 日
千山万水总是情,给个一毛行不行💋