Java8 教程

Java8 原子值 Atomic

从 Java5 开始,java.util.concurrent.atomic 包提供了支持原子变量类,这些类可以在多线程环境中提供无锁数据访问和线程安全的更新。原子变量是不可变的,因此,多线程环境中的访问是线程安全的,不需要额外的同步。

原子变量类主要有以下几种:

  • AtomicBoolean  表示一个可以原子更新的 boolean 值。

  • AtomicInteger  表示一个可以原子更新的 int 值。

  • AtomicLong  表示一个可以原子更新的 long 值。

  • AtomicReference  表示一个可以原子更新的对象引用。

例如:你可以使用如下代码安全地生成一组数字。

public static AtomicLong nextNumber = new AtomlcLong();

// 在某些线程中...
long id = nextNumber.incrementAndGet();

上例中的 incrementAndGet() 方法会自动将 AtomicLong 的值加 1,并返回增加后的值(注意:该操作是原子的,要么全部成功,要么全部失败)。该方法的功能类似如下代码:

//1.获取值
long value = nextNumber.get();
//2.值加1
value += 1;
//3.设置值
nextNumber.set(value);

AtomicLong 类可以保证即便有多个线程同时并发访问 AtomicLong 的同一个实例,也能够计算并返回正确的值。例如:

package com.hxstrive.jdk8.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/**
 * AtomicLong 类
 * @author hxstrive.com
 */
public class ConcurrentDemo1 {

    public static void main(String[] args) throws Exception {
        final AtomicLong atomicLong = new AtomicLong(0);

        // 创建一个线程池,创建5个线程对AtomicLong进行操作
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for(int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                try {
                    for(int j = 0; j < 100; j++) {
                        atomicLong.incrementAndGet();
                        Thread.sleep((long) (Math.random() * 10));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        // 等待所有线程执行完毕,关闭线程池
        executorService.shutdown();
        while(!executorService.isTerminated()) {
            Thread.sleep(100);
        }

        // 输出结果
        System.out.println(500 == atomicLong.get()); //true
    }

}

compareAndset() 方法

Java5 中提供了很多设置值、增加值、减少值的原子操作,但是如果你想要进行更复杂的更新操作(使用 AtomicLong 保存最大值),你就必须使用 compareAndset() 方法。

例如:假设你想要追踪由不同线程所监测的最大值,那么下面的代码是行不通的。

package com.hxstrive.jdk8.concurrent;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

/**
 * AtomicLong 类
 * @author hxstrive.com
 */
public class ConcurrentDemo2 {
    private static final AtomicLong atomicLong = new AtomicLong(0);

    static class MyTask extends Thread {
        private CountDownLatch countDownLatch;
        private int max; // 最大值

        public MyTask(CountDownLatch countDownLatch, int max) {
            this.countDownLatch = countDownLatch;
            this.max = max;
        }

        @Override
        public void run() {
            try {
                StringBuilder stringBuilder = new StringBuilder();
                stringBuilder.append(Thread.currentThread().getName()).append(", max=").append(max)
                        .append(", before=").append(atomicLong.get());

                // 更新值
                // atomicLong.set(Math.max(atomicLong.get(), max));
                // 等价于
                long newVal = Math.max(atomicLong.get(), max);
                // 在这里模拟一个超时时间,便于快速复现问题
                if(max == 100) {
                    Thread.sleep(50);
                }
                atomicLong.set(newVal);

                stringBuilder.append(", after=").append(atomicLong.get());
                System.out.println(stringBuilder.toString());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);

        // 启动两个线程
        MyTask task1 = new MyTask(countDownLatch, 100);
        MyTask task2 = new MyTask(countDownLatch, 200);

        task1.start();
        task2.start();

        countDownLatch.await();

        // 输出结果
        System.out.println(atomicLong.get());
    }

}

这个更新过程不是原子性的,因为存在三步:

(1)获取 largest 的当前值;

(2)获取最大值;

(3)将最大值设置到 largest;

多次运行上面示例,可能会出现如下错误情况:

Thread-1, max=200, before=0, after=200
Thread-0, max=100, before=0, after=100
100

上面输出中,Thread-0 和 Thread-1 从 AtomicLong 获取到的值均为 0,线程 Thread-1 先执行完,将 AtomicLong 设置为 200,Thread-0 最后执行完,又将 200 改为了 100,最终结果是错误。

相反,你应该在一个循环中使用 compareAndset() 方法来计算新值,compareAndSet() 方法定义:

public final boolean compareAndSet(long expect, long update)

如果当前值等于预期值(expect),则使用原子操作将当前值设置为给定的更新值(update)。参数说明:

  • expect - 预期值,使用该值与当前值进行比较。

  • update - 如果当前值等于预期值,要更新的新值。

如果执行成功,则返回 true。如果当前值不等于预期值,则返回 false。示例:

@Override
public void run() {
    try {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append(Thread.currentThread().getName()).append(", max=").append(max)
                .append(", before=").append(atomicLong.get());

        // 更新值
        long oldValue, newValue;
        do {
            // 获取 largest 的当前值
            oldValue = atomicLong.get();
            // 获取最大值
            newValue = Math.max(oldValue, max);
            // 在这里模拟一个超时时间,便于快速复现问题
            if(max == 100) {
                Thread.sleep(50);
            }
            System.out.println("oldValue=" + oldValue + ", current=" + atomicLong.get());

            // 下面的 compareAndSet() 方法将判断旧值是否发生变化,如果已经发生变化,则不允许修改
            // 然后重试,这不就是乐观锁的思路吗?
        } while (!atomicLong.compareAndSet(oldValue,newValue));

        stringBuilder.append(", after=").append(atomicLong.get());
        System.out.println(stringBuilder.toString());
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        countDownLatch.countDown();
    }
}

如果 atomicLong 的值被另一个线程更新了,那么 compareAndSet() 方法会返回 false,设置新值失败。此时,程序会再次进入循环,读取更新后的值并试图改变它。最终,它成功地将已有值替换为了新的值。虽然这看上去很烦人,但是 compareAndset 方法会映射为一个底层的处理器方法,这远比使用一个锁要快得多。

运行上面示例,输出日志如下:

oldValue=0, current=0
Thread-1, max=200, before=0, after=200
oldValue=0, current=200 // 注意:值已经被 Thread-1 线程修改了
oldValue=200, current=200 // 重试,两则相等,则可以修改
Thread-0, max=100, before=0, after=200
200 // 最终结果

Lambda 表达式应用

在 Java8 中,你不必再编写循环逻辑( Java API 已经帮你做了)。只需要提供一个用来更新值的 Lambda 表达式,更新操作就会自动完成。例如:

@Override
public void run() {
    try {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append(Thread.currentThread().getName()).append(", max=").append(max)
                .append(", before=").append(atomicLong.get());

        //atomicLong.updateAndGet(x -> Math.max(x, max));
        // 或者
        atomicLong.accumulateAndGet(max, Math::max);

        stringBuilder.append(", after=").append(atomicLong.get());
        System.out.println(stringBuilder.toString());
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        countDownLatch.countDown();
    }
}

updateAndGet() 方法源码如下:

public final long updateAndGet(LongUnaryOperator updateFunction) {
    long prev = get(), next = 0L;
    for (boolean haveNext = false;;) {
        if (!haveNext)
            // prev 作为我们 Lambda 表达式的参数 x 的值
            // 然后使用 Math.max 来获取 x 和 max 之间的最大值
            next = updateFunction.applyAsLong(prev);

        // 如果当前值 == expectedValue,可能会原子地将值设置为 newValue,
        // 其内存效果由 VarHandle.weakCompareAndSet 指定。
        if (weakCompareAndSetVolatile(prev, next))
            return next;

        // 判断 AtomicLong 上次获取的值和本次获取的值是否一致
        // 如果一致,则说明没有改变,不要执行 applyAsLong
        // 如果不一致,则需要执行 applyAsLong 重新判断最大值
        haveNext = (prev == (prev = get()));
    }
}

accumulateAndGet() 方法源码如下:

public final long accumulateAndGet(long x,
                                   LongBinaryOperator accumulatorFunction) {
    long prev = get(), next = 0L;
    for (boolean haveNext = false;;) {
        if (!haveNext)
            // 使用我们传递的 Math::max 来从旧值和新值中获取最大值
            next = accumulatorFunction.applyAsLong(prev, x);

        // 如果当前值 == expectedValue,可能会原子地将值设置为 newValue,
        // 其内存效果由 VarHandle.weakCompareAndSet 指定。
        if (weakCompareAndSetVolatile(prev, next))
            return next;

        // 判断 AtomicLong 上次获取的值和本次获取的值是否一致
        // 如果一致,则说明没有改变,不要执行 applyAsLong
        // 如果不一致,则需要执行 applyAsLong 重新判断最大值
        haveNext = (prev == (prev = get()));
    }
}

注意:

(1)Java8 还提供了返回原始值的 getAndUpdate 方法和 getAndAccumulate 方法。

(2)AtomicInteger、AtomicIntegerArray、AtomicIntegerFieldUpdater、AtomicLongArray、AtomicLongFieldUpdater、AtoicReference、AtomicReferenceArray 和 AtomicReferenceFieldUpdater 类都提供了这些方法。

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号