从 Java5 开始,java.util.concurrent.atomic 包提供了支持原子变量类,这些类可以在多线程环境中提供无锁数据访问和线程安全的更新。原子变量是不可变的,因此,多线程环境中的访问是线程安全的,不需要额外的同步。
原子变量类主要有以下几种:
AtomicBoolean 表示一个可以原子更新的 boolean 值。
AtomicInteger 表示一个可以原子更新的 int 值。
AtomicLong 表示一个可以原子更新的 long 值。
AtomicReference 表示一个可以原子更新的对象引用。
例如:你可以使用如下代码安全地生成一组数字。
public static AtomicLong nextNumber = new AtomlcLong(); // 在某些线程中... long id = nextNumber.incrementAndGet();
上例中的 incrementAndGet() 方法会自动将 AtomicLong 的值加 1,并返回增加后的值(注意:该操作是原子的,要么全部成功,要么全部失败)。该方法的功能类似如下代码:
//1.获取值 long value = nextNumber.get(); //2.值加1 value += 1; //3.设置值 nextNumber.set(value);
AtomicLong 类可以保证即便有多个线程同时并发访问 AtomicLong 的同一个实例,也能够计算并返回正确的值。例如:
package com.hxstrive.jdk8.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
/**
* AtomicLong 类
* @author hxstrive.com
*/
public class ConcurrentDemo1 {
public static void main(String[] args) throws Exception {
final AtomicLong atomicLong = new AtomicLong(0);
// 创建一个线程池,创建5个线程对AtomicLong进行操作
ExecutorService executorService = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i++) {
executorService.execute(() -> {
try {
for(int j = 0; j < 100; j++) {
atomicLong.incrementAndGet();
Thread.sleep((long) (Math.random() * 10));
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
// 等待所有线程执行完毕,关闭线程池
executorService.shutdown();
while(!executorService.isTerminated()) {
Thread.sleep(100);
}
// 输出结果
System.out.println(500 == atomicLong.get()); //true
}
}Java5 中提供了很多设置值、增加值、减少值的原子操作,但是如果你想要进行更复杂的更新操作(使用 AtomicLong 保存最大值),你就必须使用 compareAndset() 方法。
例如:假设你想要追踪由不同线程所监测的最大值,那么下面的代码是行不通的。
package com.hxstrive.jdk8.concurrent;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
/**
* AtomicLong 类
* @author hxstrive.com
*/
public class ConcurrentDemo2 {
private static final AtomicLong atomicLong = new AtomicLong(0);
static class MyTask extends Thread {
private CountDownLatch countDownLatch;
private int max; // 最大值
public MyTask(CountDownLatch countDownLatch, int max) {
this.countDownLatch = countDownLatch;
this.max = max;
}
@Override
public void run() {
try {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(Thread.currentThread().getName()).append(", max=").append(max)
.append(", before=").append(atomicLong.get());
// 更新值
// atomicLong.set(Math.max(atomicLong.get(), max));
// 等价于
long newVal = Math.max(atomicLong.get(), max);
// 在这里模拟一个超时时间,便于快速复现问题
if(max == 100) {
Thread.sleep(50);
}
atomicLong.set(newVal);
stringBuilder.append(", after=").append(atomicLong.get());
System.out.println(stringBuilder.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
}
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
// 启动两个线程
MyTask task1 = new MyTask(countDownLatch, 100);
MyTask task2 = new MyTask(countDownLatch, 200);
task1.start();
task2.start();
countDownLatch.await();
// 输出结果
System.out.println(atomicLong.get());
}
}这个更新过程不是原子性的,因为存在三步:
(1)获取 largest 的当前值;
(2)获取最大值;
(3)将最大值设置到 largest;
多次运行上面示例,可能会出现如下错误情况:
Thread-1, max=200, before=0, after=200 Thread-0, max=100, before=0, after=100 100
上面输出中,Thread-0 和 Thread-1 从 AtomicLong 获取到的值均为 0,线程 Thread-1 先执行完,将 AtomicLong 设置为 200,Thread-0 最后执行完,又将 200 改为了 100,最终结果是错误。
相反,你应该在一个循环中使用 compareAndset() 方法来计算新值,compareAndSet() 方法定义:
public final boolean compareAndSet(long expect, long update)
如果当前值等于预期值(expect),则使用原子操作将当前值设置为给定的更新值(update)。参数说明:
expect - 预期值,使用该值与当前值进行比较。
update - 如果当前值等于预期值,要更新的新值。
如果执行成功,则返回 true。如果当前值不等于预期值,则返回 false。示例:
@Override
public void run() {
try {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(Thread.currentThread().getName()).append(", max=").append(max)
.append(", before=").append(atomicLong.get());
// 更新值
long oldValue, newValue;
do {
// 获取 largest 的当前值
oldValue = atomicLong.get();
// 获取最大值
newValue = Math.max(oldValue, max);
// 在这里模拟一个超时时间,便于快速复现问题
if(max == 100) {
Thread.sleep(50);
}
System.out.println("oldValue=" + oldValue + ", current=" + atomicLong.get());
// 下面的 compareAndSet() 方法将判断旧值是否发生变化,如果已经发生变化,则不允许修改
// 然后重试,这不就是乐观锁的思路吗?
} while (!atomicLong.compareAndSet(oldValue,newValue));
stringBuilder.append(", after=").append(atomicLong.get());
System.out.println(stringBuilder.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}如果 atomicLong 的值被另一个线程更新了,那么 compareAndSet() 方法会返回 false,设置新值失败。此时,程序会再次进入循环,读取更新后的值并试图改变它。最终,它成功地将已有值替换为了新的值。虽然这看上去很烦人,但是 compareAndset 方法会映射为一个底层的处理器方法,这远比使用一个锁要快得多。
运行上面示例,输出日志如下:
oldValue=0, current=0 Thread-1, max=200, before=0, after=200 oldValue=0, current=200 // 注意:值已经被 Thread-1 线程修改了 oldValue=200, current=200 // 重试,两则相等,则可以修改 Thread-0, max=100, before=0, after=200 200 // 最终结果
在 Java8 中,你不必再编写循环逻辑( Java API 已经帮你做了)。只需要提供一个用来更新值的 Lambda 表达式,更新操作就会自动完成。例如:
@Override
public void run() {
try {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(Thread.currentThread().getName()).append(", max=").append(max)
.append(", before=").append(atomicLong.get());
//atomicLong.updateAndGet(x -> Math.max(x, max));
// 或者
atomicLong.accumulateAndGet(max, Math::max);
stringBuilder.append(", after=").append(atomicLong.get());
System.out.println(stringBuilder.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}updateAndGet() 方法源码如下:
public final long updateAndGet(LongUnaryOperator updateFunction) {
long prev = get(), next = 0L;
for (boolean haveNext = false;;) {
if (!haveNext)
// prev 作为我们 Lambda 表达式的参数 x 的值
// 然后使用 Math.max 来获取 x 和 max 之间的最大值
next = updateFunction.applyAsLong(prev);
// 如果当前值 == expectedValue,可能会原子地将值设置为 newValue,
// 其内存效果由 VarHandle.weakCompareAndSet 指定。
if (weakCompareAndSetVolatile(prev, next))
return next;
// 判断 AtomicLong 上次获取的值和本次获取的值是否一致
// 如果一致,则说明没有改变,不要执行 applyAsLong
// 如果不一致,则需要执行 applyAsLong 重新判断最大值
haveNext = (prev == (prev = get()));
}
}accumulateAndGet() 方法源码如下:
public final long accumulateAndGet(long x,
LongBinaryOperator accumulatorFunction) {
long prev = get(), next = 0L;
for (boolean haveNext = false;;) {
if (!haveNext)
// 使用我们传递的 Math::max 来从旧值和新值中获取最大值
next = accumulatorFunction.applyAsLong(prev, x);
// 如果当前值 == expectedValue,可能会原子地将值设置为 newValue,
// 其内存效果由 VarHandle.weakCompareAndSet 指定。
if (weakCompareAndSetVolatile(prev, next))
return next;
// 判断 AtomicLong 上次获取的值和本次获取的值是否一致
// 如果一致,则说明没有改变,不要执行 applyAsLong
// 如果不一致,则需要执行 applyAsLong 重新判断最大值
haveNext = (prev == (prev = get()));
}
}注意:
(1)Java8 还提供了返回原始值的 getAndUpdate 方法和 getAndAccumulate 方法。
(2)AtomicInteger、AtomicIntegerArray、AtomicIntegerFieldUpdater、AtomicLongArray、AtomicLongFieldUpdater、AtoicReference、AtomicReferenceArray 和 AtomicReferenceFieldUpdater 类都提供了这些方法。