1、CAS 基本介绍
1.1 CAS 出现后改善
-
没有 CAS 之前
多线程环境不使用原子类保证线程安全,常用 synchronized 锁,但是它比较笨重,牵扯到了用户态和内核态的的切换,效率不高。
-
使用 CAS 之后
多线程情况下使用原子类保证线程安全,类似于乐观锁的实现
1.2 什么是 CAS
compare and swap 的缩写,中文翻译成比较并交换,实现并发算法时常用到的一种技术。它包含三个操作数——内存位置、预期原值及更新值。
执行CAS操作的时候,将内存位置的值与预期原值比较:
- 如果相匹配,那么处理器会自动将该位置值更新为新值,
- 如果不匹配,处理器不做任何操作或者重来(自旋),多个线程同时执行 CAS 操作只有一个会成功。
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(5);
System.out.println(atomicInteger.compareAndSet(5, 2022)+"\t"+atomicInteger.get()); // true 2022
System.out.println(atomicInteger.compareAndSet(5, 2023)+"\t"+atomicInteger.get()); // false 2022
System.out.println(atomicInteger.compareAndSet(2022, 5)+"\t"+atomicInteger.get()); // true 5
}
}
2、CAS 底层原理
2.1硬件级别保证
-
CAS 是 JDK 提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。
-
CAS是一条 CPU 的原子指令(
cmpxchg
指令),不会造成所谓的数据不一致问题,Unsafe
提供的CAS
方法(如 compareAndSwapXXX)底层实现即为 CPU 指令 cmpxchg。 -
执行 cmpxchg 指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功 ,加锁成功之后会执行 CAS 操作,也就是说 CAS 的原子性实际上是 CPU 实现的, 其实在这一点上还是有排他锁的,只是比起用 synchronized, 这里的排他时间要短的多, 所以在多线程情况下性能会比较好
2.2 Unsafe
2.2.1 引出 Unsafe
java.util.concurrent.atomic 包下的方法
我们发现她底层调用了 Unsafe 类中的方法,接着我们进入其中查看:
上面三个方法都是类似的,主要对 4 个参数做一下说明:
- var1:表示要操作的对象
- var2:表示要操作对象中属性地址的偏移量
- var4:表示需要修改数据的期望的值
- var5 / var6:表示需要修改为的新值
2.2.2 什么是 Unsafe
CAS 这个理念,落地就是 Unsafe 类,是 CAS 的核心类。由于 Java 方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe 相当于一个后门 。
基于该类可以直接操作特定内存的数据 。Unsafe 类存在于 sun.misc 包中,其内部方法操作可以像 C 的指针一样直接操作内存,因为 Java 中 CAS 操作的执行依赖于 Unsafe 类的方法,其都直接调用操作系统底层资源执行相应任务 。
2.2.3 原理
CAS 并发原语体现在 JAVA 语言中就是 sun.misc.Unsafe 类中的各个方法。调用 UnSafe 类中的 CAS 方法,JVM 会帮我们实现出 CAS 汇编指令 。这是一种完全依赖于硬件的功能,通过它实现了原子操作。
再次强调,由于 CAS 是一种系统原语 ,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说 CAS 是一条 CPU 的原子指令,不会造成所谓的数据不一致问题。
假设线程 A 和线程 B 两个线程同时执行 getAndAddInt 操作(分别跑在不同 CPU 上):
AtomicInteger 里面的 value原始值为 3,即主内存中 AtomicInteger 的 value 为 3,根据 JMM 模型,线程 A 和线程 B 各自持有一份值为 3 的 value 的副本分别到各自的工作内存。
线程 A 通过 getIntVolatile(var1, var2) 拿到 value 值 3,这时线程 A 被挂起。
线程 B 也通过 getIntVolatile(var1, var2) 方法获取到 value 值 3 ,此时刚好线程 B 没有被挂起并执行 compareAndSwapInt 方法比较内存值也为 3,成功修改内存值为 4,线程 B 打完收工,一切 OK。
这时线程 A 恢复,执行 compareAndSwapInt 方法比较,发现自己手里的值数字 3 和主内存的值数字 4 不一致,说明该值已经被其它线程抢先一步修改过了,那 A 线程本次修改失败,只能重新读取重新来一遍了。
线程 A 重新获取 value 值,因为变量 value 被 volatile 修饰,所以其它线程对它的修改,线程 A 总是能够看到,线程 A 继续执行 compareAndSwapInt 进行比较替换,直到成功
3、CAS 与自旋锁
3.1 什么是自旋锁
指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁。当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,循环比较获取没有类似 wait 的阻塞。
3.2 实现自旋锁
具体实现下面讲述 AtomicReference 引用类型原子类时再具体实现(点我也可直达)
3.3 CAS 缺点
3.3.1 开销大
CAS 失败,如果它一直自旋会一直占用 CPU 时间,造成较大的开销
3.3.2 只能保证一个共享变量的原子性
- 对一个共享变量执行操作时,可以使用循环 CAS 的方式来保证原子操作
- 对多个共享变量操作,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁来保证其原子性
3.3.3 ABA 问题
CAS 算法实现一个重要前提:需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化。比如说:
- 一个线程 one 从内存位置 V 中取出A,这时候另一个线程 two 也从内存中取出 A,并且线程 two 进行了一些操作将值变成了 B,
- 然后线程 two 又将 V 位置的数据变成 A,这时候线程 one 进行 CAS 操作发现内存中仍然是 A,然后线程 one 操作成功。
- 尽管线程 one 的 CAS 操作成功,但是不代表这个过程就是没有问题的。
解决方式:AtomicStampedReference
版本号
public class ABADemo {
static AtomicInteger atomicInteger = new AtomicInteger(100);
public static void main(String[] args) {
new Thread(() -> {
atomicInteger.compareAndSet(100,101);
try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
atomicInteger.compareAndSet(101,100);
},"t1").start();
new Thread(() -> {
try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(atomicInteger.compareAndSet(100, 2022)+"\t"+atomicInteger.get());
},"t2").start();
}
}
public class ABADemo {
static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100,1);
public static void main(String[] args) {
new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println(Thread.currentThread().getName()+"\t"+"首次版本号:"+stamp);
//暂停500毫秒,保证后面的t4线程初始化拿到的版本号和我一样
try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
stampedReference.compareAndSet(100,101,stampedReference.getStamp(),stampedReference.getStamp()+1);
System.out.println(Thread.currentThread().getName()+"\t"+"2次流水号:"+stampedReference.getStamp());
stampedReference.compareAndSet(101,100,stampedReference.getStamp(),stampedReference.getStamp()+1);
System.out.println(Thread.currentThread().getName()+"\t"+"3次流水号:"+stampedReference.getStamp());
},"t3").start();
new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println(Thread.currentThread().getName()+"\t"+"首次版本号:"+stamp);
//暂停1秒钟线程,等待上面的t3线程,发生了ABA问题
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
boolean b = stampedReference.compareAndSet(100, 2022, stamp, stamp + 1);
System.out.println(b+"\t"+stampedReference.getReference()+"\t"+stampedReference.getStamp());
},"t4").start();
}
}
/**
* t3 首次版本号:1
* t4 首次版本号:1
* t3 2次流水号:2
* t3 3次流水号:3
* t4执行结果:false 100 版本号:3
*/
4、原子类
都是 java.util.concurrent.atomic
包下的
5、基本类型原子类
5.1 基本类型原子类成员
- AtomicInteger:可以解决例如 i++ 多线程下不安全的问题
- AtomicBoolean:可以作为中断标识停止线程的方式
- AtomicLong:底层是 CAS +自旋锁的思想,适用于低并发的全局计算
5.2 常用 API
方法 | 说明 |
---|---|
public final int get() |
获取当前值 |
public final int getAndSet(int newValue) |
获取当前的值,并设置新的值 |
public final int getAndIncrement() |
获取当前的值,并自增 |
public final int getAndDecrement() |
获取当前的值,并自减 |
public final int getAndAdd(int delta) |
获取当前的值,并加上预期值 |
public final int incrementAndGet( ) |
返回的是加 1 后的值 |
boolean compareAndSet(int expect,int update) |
如果输入的数值等于预期值,返回 true |
5.3 使用案例
public class AtomicIntegerDemo {
public static final int SIZE = 50;
public static void main(String[] args) throws InterruptedException {
MyNumber myNumber = new MyNumber();
CountDownLatch countDownLatch = new CountDownLatch(SIZE);
for (int i = 1; i <= SIZE; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 1000; j++) {
myNumber.addPlusPlus();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
//等待上面50个线程全部计算完成后,再去获得最终值,否则可能没算完。
//try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //暂停几秒钟线程
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger.get());
}
}
public class AtomicBooleanDemo {
public static void main(String[] args) {
AtomicBoolean atomicBoolean=new AtomicBoolean(false);
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t"+"coming.....");
while(!atomicBoolean.get()){
System.out.println("==========");
}
System.out.println(Thread.currentThread().getName()+"\t"+"over.....");
},"A").start();
new Thread(()->{
atomicBoolean.set(true);
},"B").start();
}
}
6、数组类型原子类
- AtomicIntegerArray
- AtomicLongArray
- AtomicRreferenceArray
public class AtomicIntegerArrayDemo {
public static void main(String[] args) {
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.println(atomicIntegerArray.get(i));
}
System.out.println();
int tmpInt = 0;
tmpInt = atomicIntegerArray.getAndSet(0, 1122);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0)); // 0 1122
tmpInt = atomicIntegerArray.getAndIncrement(0);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0)); // 1122 1123
}
}
7、引用类型原子类
7.1 AtomicReference
可以携带泛型,可以使用 AtomicReference 来实现自旋锁案例
- CAS:匹配的值是我所期望的值,则原来的值变为新的值
- 案例体现: A 线程抢占到自旋锁之后,略过锁循环,B 线程却遇到了自旋锁循环,直到 A 线程解锁,B 线程才能脱离循环
- 本质:A 线程抢占锁之后改变了判断值(atomicReference),只有 A 线程对象改变回原来的判断值(null)之后其他线程才允许不再自旋,所以不论其他任何的线程进来都无法摆脱自旋从而等待。
public class AtomicReferenceThreadDemo {
private AtomicReference<Thread> atomicReference = new AtomicReference<>();
private void lock() {
System.out.println(Thread.currentThread().getName() + " 执行加锁操作...");
while (!atomicReference.compareAndSet(null, Thread.currentThread())){
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName() + " -- 自旋ing");
}
}
private void unlock() {
atomicReference.compareAndSet(Thread.currentThread(), null);
System.out.println(Thread.currentThread().getName() + "--解锁成功,线程" + Thread.currentThread().getName() + " over!");
}
public static void main(String[] args) {
AtomicReferenceThreadDemo demo = new AtomicReferenceThreadDemo();
new Thread(() -> {
demo.lock();
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " 线程执行即将结束,开始解锁");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
demo.unlock();
}
}, "A").start();
new Thread(() -> {
demo.lock();
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " 线程执行即将结束,开始解锁");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
demo.unlock();
}
}, "B").start();
}
}
执行结果解析:
7.2 AtomicStampedReference
携带版本号的引用类型原子类可以解决 ABA 问题:解决修改过几次,状态戳原子引用
public class ABADemo {
private static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
private static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100, 1);
@Test
public void happenProblem() {
System.out.println("======ABA 问题的产生======");
new Thread(() -> {
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}, "T1").start();
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
} // 暂停一下保证 T1 线程完成
System.out.println(atomicReference.compareAndSet(100, 2022) + "\t" + atomicReference.get());
}, "T2").start();
try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
}
@Test
public void solveProblem() {
System.out.println("======解决 ABA 问题======");
new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 第1次版本号" + stamp + "\t值是" + stampedReference.getReference());
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } //暂停1秒钟t3线程
stampedReference.compareAndSet(100, 101, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 第2次版本号" + stampedReference.getStamp() + "\t值是" + stampedReference.getReference());
stampedReference.compareAndSet(101, 100, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 第3次版本号" + stampedReference.getStamp() + "\t值是" + stampedReference.getReference());
}, "t3").start();
new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 第1次版本号" + stamp + "\t值是" + stampedReference.getReference());
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //保证线程3完成1次ABA
boolean result = stampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t 修改成功否" + result + "\t最新版本号" + stampedReference.getStamp());
System.out.println("最新的值\t" + stampedReference.getReference());
}, "t4").start();
try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
7.3 AtomicMarkableReference
-
原子更新带有标记位的引用类型对象
-
解决是否修改过
-
状态戳(ture / false)原子引用
-
不建议使用她解决 ABA 问题
AtomicStampedReference 和 AtomicMarkableReference区别
- AtomicStampedReference:版本号机制,修改一次+1
- AtomicMarkableReference:true、false,它不关心引用变量更改过几次,只关心是否更改过
public class ABADemo {
static AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(100, false);
public static void main(String[] args) {
System.out.println("============AtomicMarkableReference不关心引用变量更改过几次,只关心是否更改过======================");
new Thread(() -> {
boolean marked = markableReference.isMarked();
System.out.println(Thread.currentThread().getName() + "\t 1次版本号" + marked);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
markableReference.compareAndSet(100, 101, marked, !marked);
System.out.println(Thread.currentThread().getName() + "\t 2次版本号" + markableReference.isMarked());
markableReference.compareAndSet(101, 100, markableReference.isMarked(), !markableReference.isMarked());
System.out.println(Thread.currentThread().getName() + "\t 3次版本号" + markableReference.isMarked());
}, "线程A").start();
new Thread(() -> {
boolean marked = markableReference.isMarked();
System.out.println(Thread.currentThread().getName() + "\t 1次版本号" + marked);
//暂停几秒钟线程
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
markableReference.compareAndSet(100, 2020, marked, !marked);
System.out.println(Thread.currentThread().getName() + "\t" + markableReference.getReference() + "\t" + markableReference.isMarked());
}, "线程B").start();
}
}
8、对象的属性修改原子类
8.1 对象的属性修改原子类成员
- AtomicIntegerFieldUpdater:原子更新对象中 int 类型字段的值
- AtomicLongFieldUpdater:原子更新对象中 Long 类型字段的值
- AtomicReferenceFieldUpdater:原子更新引用类型字段的值
8.2 使用目的
以一种线程安全的方式操作非线程安全对象内的某些字段(是否可以不要锁定整个对象,减少锁定的范围,只关注长期、敏感性变化的某一个字段,而不是整个对象,以达到精确加锁 + 节约内存的目的)
8.3 使用要求
- 更新的对象属性必须使用 public volatile 修饰符
- 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性
8.4 使用案例
- 通过 AtomicIntegerFieldUpdater 更新 score 我们获取最后的 int 值时相较于 AtomicInteger 来说不需要调用 get() 方法
- AtomicIntegerFieldUpdater 是 static、final 类型也就是说即使创建了 100 个对象 AtomicIntegerField 也只存在一个,不会占用对象的内存。AtomicInteger 会创建多个 AtomicInteger 对象,占用的内存比 AtomicIntegerFieldUpdater 大
class BankAccount {
//更新的对象属性必须使用 public volatile 修饰符。
public volatile int money = 0;//钱数
//因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
AtomicIntegerFieldUpdater<BankAccount> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");
//不加synchronized,保证高性能原子性,局部微创小手术
public void transMoney(BankAccount bankAccount) {
fieldUpdater.getAndIncrement(bankAccount);
}
}
public class AtomicIntegerFieldUpdaterDemo {
public static void main(String[] args) throws InterruptedException {
BankAccount bankAccount = new BankAccount();
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 1000; j++) {
bankAccount.transMoney(bankAccount);
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t" + "result: " + bankAccount.money); // main result: 10000
}
}
class MyCar {public volatile Boolean flag = false;}
public class AtomicReferenceFieldUpdaterDemo {
public static void main(String[] args) {
MyCar myCar = new MyCar();
AtomicReferenceFieldUpdater<MyCar, Boolean> atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyCar.class, Boolean.class, "flag");
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
if (atomicReferenceFieldUpdater.compareAndSet(myCar, Boolean.FALSE, Boolean.TRUE)) {
System.out.println(Thread.currentThread().getName() + "\tinit...");
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // 确保其他线程已经start
System.out.println(Thread.currentThread().getName() + "\tinit over !");
} else {
System.out.println(Thread.currentThread().getName() + "\t其它线程正在初始化");
}
}, String.valueOf("线程" + i)).start();
}
}
}
9、原子操作增强类
9.1 原子操作增强类成员
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
9.2 常用 API
方法 | 说明 |
---|---|
void add(long x) |
将当前的 value 加 x |
void increment() |
将当前的 value 加 1 |
void decrement() |
将当前的 value 减 1 |
long sum() |
返回当前值。特别注意:在没有并发更新 value 的情况下,sum 会返回一个精确值; 在存在并发的情况下,sum 并不保证返回精确值 |
long sumThenReset() |
获取当前 value,并将 value 重置为 0 |
void reset() |
将 value 重置为 0,可用于替代重新 new 一个 LongAdder 但此方法只可以在没有并发更新的情况下使用 |
long longValue |
LongAccumulator 和 DoubleAccumulator 中没有 long sum() 等价于 long sum() 方法 |
9.3 LongAdder 与 LongAccumulator
- LongAdder 只能用来计算加法、减法,且从零开始计算
- LongAccumulator 提供了自定义的函数操作
public class LongAdderDemo {
public static void main(String[] args) {
// LongAdder只能做加减法,不能做乘除法
LongAdder longAdder = new LongAdder();
longAdder.increment();
longAdder.increment();
longAdder.increment();
longAdder.decrement();
System.out.println(longAdder.longValue());
//LongAccumulator(LongBinaryOperator accumulatorFunction, long identity)
LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
@Override
public long applyAsLong(long left, long right) {
return left * right;
}
}, 5);
longAccumulator.accumulate(1); // 5*1=5
System.out.println(longAccumulator.longValue());
longAccumulator.accumulate(2); // 5*2=10
System.out.println(longAccumulator.longValue());
longAccumulator.accumulate(5); // 10*5=50
System.out.println(longAccumulator.longValue());
}
}
9.4 LongAdder 高性能对比
通过运行结果,我们可以发现执行效率:synchronized < AtomicInteger(AtomicLong)< LongAdder < LongAccumulater
class ClickNumber {
int number = 0;
//(1). 使用synchronized实现number++
public synchronized void add_synchronized() {
number++;
}
//(2). 使用AtomicInteger
AtomicInteger atomicInteger = new AtomicInteger();
public void add_atomicInteger() {
atomicInteger.incrementAndGet();
}
//(3). 使用AtomicLong
AtomicLong atomicLong = new AtomicLong();
public void add_atomicLong() {
atomicLong.incrementAndGet();
}
//(4). 使用LongAdder
LongAdder longAdder = new LongAdder();
public void add_longAdder() {
longAdder.increment();
}
//(5). 使用LongAccumulater
LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);
public void add_longAccumulater() {
longAccumulator.accumulate(1);
}
}
/**
* 需求: 50个线程,每个线程100W次,总点赞数出来
*/
public class AccumulatorCompareDemo {
public static final int _1W = 10000;
public static final int threadNumber = 50;
public static void main(String[] args) throws InterruptedException {
ClickNumber clickNumber = new ClickNumber();
long startTime;
long endTime;
CountDownLatch countDownLatch1 = new CountDownLatch(threadNumber);
CountDownLatch countDownLatch2 = new CountDownLatch(threadNumber);
CountDownLatch countDownLatch3 = new CountDownLatch(threadNumber);
CountDownLatch countDownLatch4 = new CountDownLatch(threadNumber);
CountDownLatch countDownLatch5 = new CountDownLatch(threadNumber);
startTime = System.currentTimeMillis();
for (int i = 1; i <= threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * _1W; j++) {
clickNumber.add_synchronized();
}
} finally {
countDownLatch1.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch1.await();
endTime = System.currentTimeMillis();
System.out.println("使用synchronized---costTime:" + (endTime - startTime) + "ms\t\t最终计算结果:" + clickNumber.number);
startTime = System.currentTimeMillis();
for (int i = 1; i <= threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * _1W; j++) {
clickNumber.add_atomicInteger();
}
} finally {
countDownLatch2.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch2.await();
endTime = System.currentTimeMillis();
System.out.println("使用AtomicInteger---costTime:" + (endTime - startTime) + "ms\t\t最终计算结果:" + clickNumber.number);
startTime = System.currentTimeMillis();
for (int i = 1; i <= threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * _1W; j++) {
clickNumber.add_atomicLong();
}
} finally {
countDownLatch3.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch3.await();
endTime = System.currentTimeMillis();
System.out.println("使用AtomicLong---costTime:" + (endTime - startTime) + "ms\t\t最终计算结果:" + clickNumber.number);
startTime = System.currentTimeMillis();
for (int i = 1; i <= threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * _1W; j++) {
clickNumber.add_longAdder();
}
} finally {
countDownLatch4.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch4.await();
endTime = System.currentTimeMillis();
System.out.println("使用LongAdder---costTime:" + (endTime - startTime) + "ms\t\t最终计算结果:" + clickNumber.number);
startTime = System.currentTimeMillis();
for (int i = 1; i <= threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * _1W; j++) {
clickNumber.add_longAccumulater();
}
} finally {
countDownLatch5.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch5.await();
endTime = System.currentTimeMillis();
System.out.println("使用LongAccumulater---costTime:" + (endTime - startTime) + "ms\t\t最终计算结果:" + clickNumber.number);
}
}
10、AtomicLong 与 LongAdder
10.1 AtomicLong
我们知道 AtomicLong 是利用底层的 CAS 操作来提供并发性的,比如 addAndGet ()
方法:
- 下面方法调用了 Unsafe 类的 getAndAddLong 方法,该方法是一个 native 方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功(也即乐观锁的实现模式)
- 在并发量比较低的情况下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发情况下,N 个线程同时进行自旋操作,N-1 个线程失败,导致 CPU 打满场景,此时 AtomicLong 的自旋会成为瓶颈
- 这就是 LongAdder 引入的初衷——解决高并发环境下 AtomictLong 的自旋瓶颈问题
public final long addAndGet(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}
10.2 LongAdder 为什么快
LongAdder 在无竞争的情况,跟 AtomicLong 一样,对同一个 base 进行操作,当出现竞争关系时则采用化整为零的做法:从空间换时间,用一个数组 cells 将一个 value 拆分进这个数组 cells。
多个线程需要同时对 value 进行操作时候,可以对线程 id 进行 hash 得到 hash 值,再根据 hash 值映射到这个数组 cells 的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组 cells 的所有值和无竞争值 base 都加起来作为最终结果(分散热点)
sum()
会将所有 cell 数组中的 value 和 base 累加作为返回值,核心的思想就是将之前 AtomicLong 一个 value 的更新压力分散到多个 value 中去,从而降级更新热点
10.3 AtomicLong 与 LongAdder 对比
LongAdder 的 API 和 AtomicLong 的 API 还是有比较大的差异,而且 AtomicLong 提供的功能更丰富,尤其是 addAndGet()
、decrementAndGet()
、compareAndSet()
这些方法。addAndGet()、decrementAndGet() 除了单纯的做自增自减外,还可以立即获取增减后的值
而 LongAdder 则需要做同步控制才能精确获取增减后的值。如果业务需求需要精确的控制计数,则使用AtomicLong比较合适;
10.4 Striped64
目前我们所说十八罗汉增强已经提及了 16 个了,剩下的两位分别是:Striped64 和 Number

- 几个重要的成员变量
- base:类似于 AtomicLong 中全局的 value 值。在没有竞争的情况数据直接累加到 base 上,或者 cells 扩容时,也需要将数据写入到 base 上
- collide:表示扩容意向:false 一定不扩容;true 可能会扩容
- cellsBusy:初始化 cells 或者扩容 cells 需要获取锁。0 表示无所,1 表示其他线程已经持有了锁
- 几个重要的方法
- casCellsBusy():通过 CAS 操作修改 cellsBusy 的值,CAS 成功代表获取锁,返回 true
- NCPU:当前计算机 CPU 数量,Cell 数据扩容时会使用到
- getProbe():获取当前线程的 hash 值
- advanceProbe():重置当前线程的 hash 值
并且 Cell 是 java.util.concurrent.atomic 下 Striped64 下的一个内部类
10.5 LongAdder 部分源码解析
10.5.1 LongAdder 类中 add(long x) 方法
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
- 最初无竞争时,直接通过 casBase 进行更新 base 的处理
- 如果更新 base 失败后,首次新建一个 Cell[ ] 数组(默认长度是2)
- 当多个线程竞争同一个 Cell 比较激烈时,可能就要对 Cell[] 扩容
public void add(long x) {
//as是striped64中的cells数组属性
//b是striped64中的base属性
//v是当前线程hash到的cell中存储的值
//m是cells的长度减1,hash时作为掩码使用
//a时当前线程hash到的cell
Cell[] as; long b, v; int m; Cell a;
/**
首次首线程(as = cells) != null)一定是false,此时走casBase方法,以CAS的方式更新base值,
且只有当cas失败时,才会走到if中
条件1:cells不为空,说明出现过竞争,cell[]已创建
条件2:cas操作base失败,说明其他线程先一步修改了base正在出现竞争
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
//true无竞争 fasle表示竞争激烈,多个线程hash到同一个cell,可能要扩容
boolean uncontended = true;
/*
条件1:cells为空,说明正在出现竞争,上面是从条件2过来的,说明!casBase(b = base, b + x))=true
会通过调用longAccumulate(x, null, uncontended)新建一个数组,默认长度是2
条件2:默认会新建一个数组长度为2的数组,m = as.length - 1) < 0 应该不会出现,
条件3:当前线程所在的cell为空,说明当前线程还没有更新过cell,应初始化一个cell。
a = as[getProbe() & m]) == null,如果cell为空,进行一个初始化的处理
条件4:更新当前线程所在的cell失败,说明现在竞争很激烈,多个线程hash到同一个Cell,应扩容
(如果是cell中有一个线程操作,这个时候,通过a.cas(v = a.value, v + x)可以进行处理,返回的结果是true)
**/
if (as == null || (m = as.length - 1) < 0 ||
//getProbe( )方法返回的时线程中的threadLocalRandomProbe字段
//它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//调用Striped64中的方法处理
longAccumulate(x, null, uncontended);
}
}
10.5.2 Striped64 类中 longAccumulate(...) 方法
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
入参说明:
- long x:需要增加的值,一般默认都是 1
- LongBinaryOperator fn:默认传递的是 null
- boolean wasUncontended:竞争标识,如果是 false 代表有竞争。只有 cells 初始化之后,并且当前线程 CAS 竞争修改失败,才会是 false
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
-
线程 hash 值
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { //存储线程的probe值 int h; //如果getProbe()方法返回0,说明随机数未初始化 if ((h = getProbe()) == 0) { //这个if相当于给当前线程生成一个非0的hash值 //使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化 ThreadLocalRandom.current(); // force initialization //重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true h = getProbe(); //重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈 //wasUncontended竞争状态为true wasUncontended = true; }
-
刚刚初始化 Cell[ ] 数组(首次新建)
//CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组 /* cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁 cells == as == null 是成立的 casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁, 返回true,第一次进来没人抢占cell单元格,肯定返回true **/ else if (cellsBusy == 0 && cells == as && casCellsBusy()) { //是否初始化的标记 boolean init = false; try { // Initialize table(新建cells) // 前面else if中进行了判断,这里再次判断,采用双端检索的机制 if (cells == as) { //如果上面条件都执行成功就会执行数组的初始化及赋值操作,Cell[] rs = new Cell[2]标识数组的长度为2 Cell[] rs = new Cell[2]; //rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1 //h & 1 类似于我们之前hashmap常用到的计算散列桶index的算法, //通常都是hash&(table.len-1),同hashmap一个意思 //看这次的value是落在0还是1 rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
-
兜底(多个线程尝试 CAS 修改失败的线程会走这个分支)
//CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作 //这种情况是cell中都CAS失败了,有一个兜底的方法 //该分支实现直接操作base基数,将值累加到base上, //也即其他线程正在初始化,多个线程正在更新base的值 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;
-
Cell 数组不再为空且可能存在 Cell 数组扩容
for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // CASE1:cells已经初始化了 // 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用 if ((a = as[(n - 1) & h]) == null) { //Cell[]数组没有正在扩容 if (cellsBusy == 0) { // Try to attach new Cell //先创建一个Cell Cell r = new Cell(x); // Optimistically create //尝试加锁,加锁后cellsBusy=1 if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; //将cell单元赋值到Cell[]数组上 //在有锁的情况下再检测一遍之前的判断 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0;//释放锁 } if (created) break; continue; // Slot is now non-empty } } collide = false; } /** wasUncontended表示cells初始化后,当前线程竞争修改失败 wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true, 紧接着执行advanceProbe(h)重置当前线程的hash,重新循环 */ else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //说明当前线程对应的数组中有了数据,也重置过hash值 //这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试 else if (n >= NCPU || cells != as) collide = false; //扩容标识设置为false,标识永远不会再扩容 //如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环 else if (!collide) collide = true; //锁状态为0并且将锁状态修改为1(持有锁) else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale //按位左移1位来操作,扩容大小为之前容量的两倍 Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) //扩容后将之前数组的元素拷贝到新数组中 rs[i] = as[i]; cells = rs; } } finally { //释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行 cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); }
/**
1.LongAdder继承了Striped64类,来实现累加功能,它是实现高并发累加的工具类
2.Striped64的设计核心思路就是通过内部的分散计算来避免竞争
3.Striped64内部包含一个base和一个Cell[] cells数组,又叫hash表
4.没有竞争的情况下,要累加的数通过cas累加到base上;如果有竞争的话,
会将要累加的数累加到Cells数组中的某个cell元素里面
*/
abstract class Striped64 extends Number {
//CPU数量,即Cells数组的最大长度
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放Cell的hash表,大小为2的幂
transient volatile Cell[] cells;
/*
1.在开始没有竞争的情况下,将累加值累加到base;
2.在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base
*/
transient volatile long base;
/*
cellsBusy,它有两个值0或1,它的作用是当要修改cells数组时加锁,
防止多线程同时修改cells数组(也称cells表),0为无锁,1位加锁,加锁的状况有三种:
(1). cells数组初始化的时候;
(2). cells数组扩容的时候;
(3).如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候;
*/
transient volatile int cellsBusy;
//低并发状态,还没有新建cell数组且写入进入base,刚好够用
//base罩得住,不用上cell数组
final boolean casBase(long cmp, long val) {
//当前对象,在base位置上,将base(类似于AtomicLong中全局的value值),将base=0(cmp)改为1(value)
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
//存储线程的probe值
int h;
//如果getProbe()方法返回0,说明随机数未初始化
if ((h = getProbe()) == 0) { //这个if相当于给当前线程生成一个非0的hash值
//使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
ThreadLocalRandom.current(); // force initialization
//重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true
h = getProbe();
//重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈,wasUncontended竞争状态为true
wasUncontended = true;
}
//如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看作是扩容意向
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // CASE1:cells已经初始化了
// 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用
if ((a = as[(n - 1) & h]) == null) {
//Cell[]数组没有正在扩容
if (cellsBusy == 0) { // Try to attach new Cell
//先创建一个Cell
Cell r = new Cell(x); // Optimistically create
//尝试加锁,加锁后cellsBusy=1
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j; //将cell单元赋值到Cell[]数组上
//在有锁的情况下再检测一遍之前的判断
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;//释放锁
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
/**
wasUncontended表示cells初始化后,当前线程竞争修改失败
wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true,
紧接着执行advanceProbe(h)重置当前线程的hash,重新循环
*/
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//说明当前线程对应的数组中有了数据,也重置过hash值
//这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试
else if (n >= NCPU || cells != as)
collide = false; //扩容标识设置为false,标识永远不会再扩容
//如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环
else if (!collide)
collide = true;
//锁状态为0并且将锁状态修改为1(持有锁)
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
//按位左移1位来操作,扩容大小为之前容量的两倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
//扩容后将之前数组的元素拷贝到新数组中
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组
/*
cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁
cells == as == null 是成立的
casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true,第一次进来没人抢占cell单元格,肯定返回true
**/
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//是否初始化的标记
boolean init = false;
try { // Initialize table(新建cells)
// 前面else if中进行了判断,这里再次判断,采用双端检索的机制
if (cells == as) {
//如果上面条件都执行成功就会执行数组的初始化及赋值操作,Cell[] rs = new Cell[2]标识数组的长度为2
Cell[] rs = new Cell[2];
//rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1
//h & 1 类似于我们之前hashmap常用到的计算散列桶index的算法,通常都是hash&(table.len-1),同hashmap一个意思
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作
//这种情况是cell中都CAS失败了,有一个兜底的方法
//该分支实现直接操作base基数,将值累加到base上,也即其他线程正在初始化,多个线程正在更新base的值
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
}
10.5.3 LongAdder 类中 sum() 方法
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
为何 sum() 方法在高并发情况下并不精确,主要有以下三点原因:
- sum() 执行时,并没有限制对 base 和 cells 的更新。所以 LongAdder 不是强一致性的,它是最终一致性的。
- 首先,最终返回的 sum 局部变量,初始被复制为 base,而最终返回时,很可能 base 已经被更新了,而此时局部变量 sum 不会更新,造成不一致。
- 其次,这里对 cell 的读取也无法保证是最后一次写入的值。所以,sum 方法在没有并发的情况下,可以获得正确的结果。
1 条评论