从 Java5 开始,java.util.concurrent.atomic 包提供了支持原子变量类,这些类可以在多线程环境中提供无锁数据访问和线程安全的更新。原子变量是不可变的,因此,多线程环境中的访问是线程安全的,不需要额外的同步。
原子变量类主要有以下几种:
AtomicBoolean 表示一个可以原子更新的 boolean 值。
AtomicInteger 表示一个可以原子更新的 int 值。
AtomicLong 表示一个可以原子更新的 long 值。
AtomicReference 表示一个可以原子更新的对象引用。
例如:你可以使用如下代码安全地生成一组数字。
public static AtomicLong nextNumber = new AtomlcLong(); // 在某些线程中... long id = nextNumber.incrementAndGet();
上例中的 incrementAndGet() 方法会自动将 AtomicLong 的值加 1,并返回增加后的值(注意:该操作是原子的,要么全部成功,要么全部失败)。该方法的功能类似如下代码:
//1.获取值 long value = nextNumber.get(); //2.值加1 value += 1; //3.设置值 nextNumber.set(value);
AtomicLong 类可以保证即便有多个线程同时并发访问 AtomicLong 的同一个实例,也能够计算并返回正确的值。例如:
package com.hxstrive.jdk8.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; /** * AtomicLong 类 * @author hxstrive.com */ public class ConcurrentDemo1 { public static void main(String[] args) throws Exception { final AtomicLong atomicLong = new AtomicLong(0); // 创建一个线程池,创建5个线程对AtomicLong进行操作 ExecutorService executorService = Executors.newFixedThreadPool(5); for(int i = 0; i < 5; i++) { executorService.execute(() -> { try { for(int j = 0; j < 100; j++) { atomicLong.incrementAndGet(); Thread.sleep((long) (Math.random() * 10)); } } catch (Exception e) { e.printStackTrace(); } }); } // 等待所有线程执行完毕,关闭线程池 executorService.shutdown(); while(!executorService.isTerminated()) { Thread.sleep(100); } // 输出结果 System.out.println(500 == atomicLong.get()); //true } }
Java5 中提供了很多设置值、增加值、减少值的原子操作,但是如果你想要进行更复杂的更新操作(使用 AtomicLong 保存最大值),你就必须使用 compareAndset() 方法。
例如:假设你想要追踪由不同线程所监测的最大值,那么下面的代码是行不通的。
package com.hxstrive.jdk8.concurrent; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; /** * AtomicLong 类 * @author hxstrive.com */ public class ConcurrentDemo2 { private static final AtomicLong atomicLong = new AtomicLong(0); static class MyTask extends Thread { private CountDownLatch countDownLatch; private int max; // 最大值 public MyTask(CountDownLatch countDownLatch, int max) { this.countDownLatch = countDownLatch; this.max = max; } @Override public void run() { try { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(Thread.currentThread().getName()).append(", max=").append(max) .append(", before=").append(atomicLong.get()); // 更新值 // atomicLong.set(Math.max(atomicLong.get(), max)); // 等价于 long newVal = Math.max(atomicLong.get(), max); // 在这里模拟一个超时时间,便于快速复现问题 if(max == 100) { Thread.sleep(50); } atomicLong.set(newVal); stringBuilder.append(", after=").append(atomicLong.get()); System.out.println(stringBuilder.toString()); } catch (Exception e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } } } public static void main(String[] args) throws Exception { CountDownLatch countDownLatch = new CountDownLatch(2); // 启动两个线程 MyTask task1 = new MyTask(countDownLatch, 100); MyTask task2 = new MyTask(countDownLatch, 200); task1.start(); task2.start(); countDownLatch.await(); // 输出结果 System.out.println(atomicLong.get()); } }
这个更新过程不是原子性的,因为存在三步:
(1)获取 largest 的当前值;
(2)获取最大值;
(3)将最大值设置到 largest;
多次运行上面示例,可能会出现如下错误情况:
Thread-1, max=200, before=0, after=200 Thread-0, max=100, before=0, after=100 100
上面输出中,Thread-0 和 Thread-1 从 AtomicLong 获取到的值均为 0,线程 Thread-1 先执行完,将 AtomicLong 设置为 200,Thread-0 最后执行完,又将 200 改为了 100,最终结果是错误。
相反,你应该在一个循环中使用 compareAndset() 方法来计算新值,compareAndSet() 方法定义:
public final boolean compareAndSet(long expect, long update)
如果当前值等于预期值(expect),则使用原子操作将当前值设置为给定的更新值(update)。参数说明:
expect - 预期值,使用该值与当前值进行比较。
update - 如果当前值等于预期值,要更新的新值。
如果执行成功,则返回 true。如果当前值不等于预期值,则返回 false。示例:
@Override public void run() { try { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(Thread.currentThread().getName()).append(", max=").append(max) .append(", before=").append(atomicLong.get()); // 更新值 long oldValue, newValue; do { // 获取 largest 的当前值 oldValue = atomicLong.get(); // 获取最大值 newValue = Math.max(oldValue, max); // 在这里模拟一个超时时间,便于快速复现问题 if(max == 100) { Thread.sleep(50); } System.out.println("oldValue=" + oldValue + ", current=" + atomicLong.get()); // 下面的 compareAndSet() 方法将判断旧值是否发生变化,如果已经发生变化,则不允许修改 // 然后重试,这不就是乐观锁的思路吗? } while (!atomicLong.compareAndSet(oldValue,newValue)); stringBuilder.append(", after=").append(atomicLong.get()); System.out.println(stringBuilder.toString()); } catch (Exception e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }
如果 atomicLong 的值被另一个线程更新了,那么 compareAndSet() 方法会返回 false,设置新值失败。此时,程序会再次进入循环,读取更新后的值并试图改变它。最终,它成功地将已有值替换为了新的值。虽然这看上去很烦人,但是 compareAndset 方法会映射为一个底层的处理器方法,这远比使用一个锁要快得多。
运行上面示例,输出日志如下:
oldValue=0, current=0 Thread-1, max=200, before=0, after=200 oldValue=0, current=200 // 注意:值已经被 Thread-1 线程修改了 oldValue=200, current=200 // 重试,两则相等,则可以修改 Thread-0, max=100, before=0, after=200 200 // 最终结果
在 Java8 中,你不必再编写循环逻辑( Java API 已经帮你做了)。只需要提供一个用来更新值的 Lambda 表达式,更新操作就会自动完成。例如:
@Override public void run() { try { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(Thread.currentThread().getName()).append(", max=").append(max) .append(", before=").append(atomicLong.get()); //atomicLong.updateAndGet(x -> Math.max(x, max)); // 或者 atomicLong.accumulateAndGet(max, Math::max); stringBuilder.append(", after=").append(atomicLong.get()); System.out.println(stringBuilder.toString()); } catch (Exception e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }
updateAndGet() 方法源码如下:
public final long updateAndGet(LongUnaryOperator updateFunction) { long prev = get(), next = 0L; for (boolean haveNext = false;;) { if (!haveNext) // prev 作为我们 Lambda 表达式的参数 x 的值 // 然后使用 Math.max 来获取 x 和 max 之间的最大值 next = updateFunction.applyAsLong(prev); // 如果当前值 == expectedValue,可能会原子地将值设置为 newValue, // 其内存效果由 VarHandle.weakCompareAndSet 指定。 if (weakCompareAndSetVolatile(prev, next)) return next; // 判断 AtomicLong 上次获取的值和本次获取的值是否一致 // 如果一致,则说明没有改变,不要执行 applyAsLong // 如果不一致,则需要执行 applyAsLong 重新判断最大值 haveNext = (prev == (prev = get())); } }
accumulateAndGet() 方法源码如下:
public final long accumulateAndGet(long x, LongBinaryOperator accumulatorFunction) { long prev = get(), next = 0L; for (boolean haveNext = false;;) { if (!haveNext) // 使用我们传递的 Math::max 来从旧值和新值中获取最大值 next = accumulatorFunction.applyAsLong(prev, x); // 如果当前值 == expectedValue,可能会原子地将值设置为 newValue, // 其内存效果由 VarHandle.weakCompareAndSet 指定。 if (weakCompareAndSetVolatile(prev, next)) return next; // 判断 AtomicLong 上次获取的值和本次获取的值是否一致 // 如果一致,则说明没有改变,不要执行 applyAsLong // 如果不一致,则需要执行 applyAsLong 重新判断最大值 haveNext = (prev == (prev = get())); } }
注意:
(1)Java8 还提供了返回原始值的 getAndUpdate 方法和 getAndAccumulate 方法。
(2)AtomicInteger、AtomicIntegerArray、AtomicIntegerFieldUpdater、AtomicLongArray、AtomicLongFieldUpdater、AtoicReference、AtomicReferenceArray 和 AtomicReferenceFieldUpdater 类都提供了这些方法。