Java8 教程

Java8 LongAdder 和 DoubleAdder类

当你有大量线程访问同一个原子值时,由于乐观锁更新需要太多次重试,因此会导致性能严重下降。

什么是乐观锁?

乐观锁(Optimistic Locking)是一种思想,它认为多用户并发修改同一资源时,发生并发冲突的概率较小,因此在进行数据处理时不会直接锁定数据,而是在数据提交更新时,才会正式对数据的冲突与否进行检测。如果发现冲突,则返回用户错误的信息,让用户决定如何重新尝试。

乐观锁的实现通常依赖于数据版本(Version)记录机制。当读取数据时,将此版本号一同读出,数据每更新一次,对此版本号加一。当我们提交更新的时候,判断数据库表对应记录的当前版本信息与之前读取到的版本号进行比较,如果数据库表当前版本号与我们之前读取到的版本号一致,则予以更新,否则认为是过期数据。

乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。在Java中,实现乐观锁可以通过多种方法,例如使用CAS(Compare-and-Swap)操作,或者使用版本号机制等。

为了解决乐观锁性能问题,Java8 提供了 LongAdderDoubleAdder 来解决该问题。

LongAdder 类

LongAdder 旨在解决高并发环境下使用 AtomicLong 时因 CAS 操作失败导致的大量线程自旋和 CPU 资源浪费问题。

CAS(Compare-and-Swap)是一种用于实现多线程同步的原子操作。CAS操作包含三个参数:一个内存位置(V)、期望的原值(A)和新值(B)。当且仅当该内存位置的值与期望的原值相同时,才将该内存位置的值设置为新值。如果内存位置的值与期望的原值不相同,则CAS操作失败,并返回当前内存位置的值。

LongAdder 通过将单个 long 类型的变量拆分为多个 Cell,让多个线程分别竞争这些 Cell,从而降低单个变量的竞争线程数目,提高并发性能。

LongAdder 中,多个线程可以更新不同的被加数,当线程数量增加时会自动增加新的被加数。由于通常情况下都是直到所有工作完成后才需要总和值,所以这种方法效率很高。它所能带来的性能提升十分可观。如下图:

2fdfb1301fcb4ed35bb44a7cfd731478_1720582683768-bc6c11c9-98a7-44d7-ad8f-de2f152701f9.png

上图中,Cell 是 Java8 内部工具类,用于减少多线程间的缓存竞争和提高性能。当线程要对 LongAdder 进行加值/递增操作时,会根据线程的 hashCode 取模,获取到对应的 Cell,然后对该 Cell 进行操作。这样就可以避免多个线程竞争同一个资源,提高了并发(注意:如果线程过多,可能会导致多个线程竞争同一个 Cell 的情况)。

下面自定义一个 LongAdder 类,来模拟上面介绍的实现原理,方便读者理解。如下:

package com.hxstrive.jdk8.concurrent;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * AtomicLong 类
 * @author hxstrive.com
 */
public class ConcurrentDemo5 {
    private static final MyLongAdder myLongAdder = new MyLongAdder();

    // 模拟自己的 Adder
    static class MyLongAdder {
        // 存放线程 hashCode 和具体指的关系
        private static final Map<Integer,Long> value = new ConcurrentHashMap<>();

        // 递增1
        public void increment() {
            add(1);
        }

        // 递增N
        public void add(int i) {
            Integer hashCode = Thread.currentThread().hashCode();
            long old = value.getOrDefault(hashCode, 0L);
            value.put(hashCode, old + i);
        }

        // 获取和值,类似 AtomicLong 的 get 方法
        public long sum() {
            return value.values().stream().mapToLong(x -> x).sum();
        }
    }

    static class MyTask extends Thread {
        private CountDownLatch countDownLatch;
        private int max; // 最大值

        public MyTask(CountDownLatch countDownLatch, int max) {
            this.countDownLatch = countDownLatch;
            this.max = max;
        }

        @Override
        public void run() {
            try {
                StringBuilder stringBuilder = new StringBuilder();
                stringBuilder.append(Thread.currentThread().getName()).append(", max=").append(max)
                        .append(", before=").append(myLongAdder.sum());

                // 添加值
                myLongAdder.increment();
                myLongAdder.add(max);

                stringBuilder.append(", after=").append(myLongAdder.sum());
                System.out.println(stringBuilder.toString());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for(int i = 0; i < 5; i++) {
            new MyTask(countDownLatch, 100 * (i + 1)).start();
        }
        countDownLatch.await();
        System.out.println(myLongAdder.sum());
    }

}

上面示例,通过一个在内部维持一个 Map<Integer,Long> 数据结构来分别存放各个线程的值,来模拟实现 LongAdder 的工作原理。

如果你的环境中存在高度竞争,那么就应当用 LongAdder 来代替 AtomicLong。

注意:二者之间的方法命名稍有不同。Increment 方法用来将计数器自增1,add 方法用来加上某个数值,sum 方法用来获取总和值。例如:

package com.hxstrive.jdk8.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;

/**
 * LongAdder 示例
 * @author HuangXin
 * @since 1.0.0  2024/7/9 10:54
 */
public class ConcurrentDemo6 {

    public static void main(String[] args) {
        final LongAdder longAdder = new LongAdder();
        final ExecutorService executor = Executors.newFixedThreadPool(10);

        for(int i = 0; i < 10; i++ ) {
            executor.execute(() -> {
                for(int j = 0; j < 100; j++) {
                    longAdder.increment();
                }
            });
        }

        executor.shutdown();
        while (!executor.isTerminated()){
            // do nothing
        }
        System.out.println(longAdder.sum());
    }

}

注意:LongAdder 的 increment 方法不会返回原始值,如果返回原始值,则需要遍历所有的内部 Cell,然后将他们的值累加起来,这样就把我们提升的一点性能牺牲掉了。

下面是 LongAdder 的部分源码:

// java.util.concurrent.atomic.Striped64
// java.util.concurrent.atomic.Striped64 是 Java 8 引入的一个内部类,
// 用于在并发环境中实现高性能的计数和累加操作。
// 其设计思路主要是通过将竞争分散到多个单元格(Cell)来减少线程间的冲突,从而提高并发性能。
/**
 * Table of cells. When non-null, size is a power of 2.
 * 用来存放具体的值,数组所有 Cell 值的 “和” 才是最终的值
 */
transient volatile Cell[] cells;

// java.util.concurrent.atomic.LongAdder
public class LongAdder extends Striped64 implements Serializable {
    //...
    public void add(long x) {
        Cell[] cs; long b, v; int m; Cell c;
        if ((cs = cells) != null || !casBase(b = base, b + x)) {
            int index = getProbe();
            boolean uncontended = true;
            if (cs == null || (m = cs.length - 1) < 0 ||
                (c = cs[index & m]) == null ||
                !(uncontended = c.cas(v = c.value, v + x)))
                longAccumulate(x, null, uncontended, index);
        }
    }
    
    public long sum() {
        Cell[] cs = cells;
        long sum = base;
        if (cs != null) {
            for (Cell c : cs)
                if (c != null)
                    sum += c.value;
        }
        return sum;
    }
    //...
}

DoubleAdder 类

DoubleAdder 类也是在 Java8 引入的,放在 java.util.concurrent.atomic 包中的一个工具类,用于多线程环境下高效地执行 double 类型的累加操作。实现思路和 LongAdder 一样,采用了“分开计算最后汇总”的思路,将累加操作分散到多个 Cell 中进行,以减少线程间的竞争,提高并发性能。

DoubleAdder 类通常用于需要高并发地累加 double 类型数值的场景,如统计信息、计数器等。

DoubleAdder 类继承了 Number 类,并实现了 Serializable 接口,因此它可以被序列化。作为一个原子类,它提供了线程安全的 double 类型数值的累加功能。

DoubleAdder 类提供了一个无参数的构造方法 DoubleAdder(),用于创建一个初始总和为零的新加法器。如下:

/**
 * Creates a new adder with initial sum of zero.
 */
public DoubleAdder() {
}

DoubleAdder 类主要方法:

  • void add(double x):向加法器中添加给定的 double 值,这是 DoubleAdder 类最常用的方法。

  • double sum():返回当前累加的总和。这是一个最终一致性操作,即它返回的是加法器在某一时间点的近似值,而不是实时值。

  • double sumThenReset():返回当前累加的总和,并将加法器的总和重置为零。这个方法相当于先调用sum() 方法,然后调用 reset() 方法。

  • void reset():将加法器的总和重置为零。

此外,DoubleAdder 类还提供了其他一些方法,如 doubleValue()、floatValue()、intValue()、longValue() 等,用于将累加的总和转换为其他类型的数值。

简单示例:

package com.hxstrive.jdk8.concurrent.atomic;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.DoubleAdder;

/**
 * DoubleAdder 类
 * @author hxstrive.com
 */
public class DoubleAdderDemo {

    public static void main(String[] args) {
        final DoubleAdder doubleAdder = new DoubleAdder();
        final ExecutorService executor = Executors.newFixedThreadPool(10);

        for(int i = 0; i < 10; i++ ) {
            executor.execute(() -> {
                for(int j = 0; j < 100; j++) {
                    doubleAdder.add(1.0);
                }
            });
        }

        executor.shutdown();
        while (!executor.isTerminated()){
            // do nothing
        }
        System.out.println(doubleAdder.sum()); // 1000.0
    }

}

上面示例演示了在多线程环境下,使用 add() 方法对 DoubleAdder 进行累加操作。

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号