Spring Data Redis 教程

Redis Streams 序列化

Spring Data Redis 中,所有发送到流(Stream)中的任何记录都需要序列化为二进制格式。

由于流(Stream)与哈希(Hash)的数据结构非常接近,因此流键(Key)、字段名(Field Name)和值(Field Value)均使用了 RedisTemplate 上配置的相应序列化器。下面列出了对应的序列化和反序列化器:

  • key  使用 keySerializer 的序列化器,用于 Record.getStream() 方法

  • field  使用 hashKeySerializer 的序列化器,用于有效载荷 Map 中的每个键

  • value  使用 hashValueSerializer 的序列化器,用于有效载荷 Map 中的每个值

注意:请确保检查正在使用的RedisSerializers,并注意,如果您决定不使用任何序列化程序,则需要确保这些值已经是二进制的。

对象映射

简单值(Simple Values)

StreamOperations 允许通过 ObjectRecord 将简单的值直接追加到流中,而不需要将这些值放到 Map 结构中。然后,该值将被分配到一个有效载荷字段,在读回该值时可以提取。例如:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.List;

/**
 * Spring Data Redis 的 Redis Stream 示例(简单值 Simple Value)
 * @author hxstrive.com 2022/2/26
 */
@SpringBootTest
public class RedisStreamForSimpleValue {

    @Autowired
    private RedisTemplate<String,String> redisTemplate;

    @Test
    public void contextLoads() {
        ObjectRecord<String, String> record = StreamRecords.newRecord()
                .in("my-stream")
                .ofObject("my-value");

        // 向流添加一个记录,对应的 Redis 命令如下:
        // XADD my-stream * "_class" "java.lang.String" "_raw" "my-value"
        redisTemplate.opsForStream().add(record);

        List<ObjectRecord<String, String>> records = redisTemplate.opsForStream()
                .read(String.class, StreamOffset.fromStart("my-stream"));
        for(ObjectRecord<String, String> rec : records) {
            System.out.println("id=" + rec.getId());
            System.out.println("stream=" + rec.getStream());
            System.out.println("value=" + rec.getValue());
        }
    }

}

运行示例,输出结果如下:

id=1667279087680-0
stream=my-stream
value=my-value

注意,ObjectRecord 通过与所有其他记录完全相同的序列化过程,因此也可以使用返回 MapRecord 的非类型化读取操作来获取 Record。

复杂值(Complex Values)

向流(Stream)中添加一个复杂值可以通过三种方式完成:

(1)将复杂值转换成简单值,如:使用 JSON 字符串表示

(2)使用合适的 RedisSerializer 序列化值

(3)使用 HashMapper 将值转换为适合序列化的 Map

第一个变体是最直接的变体,但忽略了流结构所提供的字段值功能,流中的值对其他消费者来说仍然是可读的。

第二种方案的好处与第一种方案相同,但可能会导致非常特殊的消费者限制,因为所有的消费者都必须实现非常相似的序列化机制。

第三种方案,HashMapper 方法是一种更复杂的方法,它使用了 Steams 的哈希结构,但对源代码进行了扁平化。只要选择了合适的序列化程序组合,其他用户仍然能够读取记录。

HashMappers 将有效载荷转换为具有特定类型的Map。请确保使用能够(去/反)序列化哈希的 Hash-Key 和 Hash-Value 序列化器。例如:

(1)用户实体类,代码如下:

/**
 * 用户实体
 * @author hxstrive.com 2022/7/5
 */
public class User {
    /** 用户ID */
    private int id;
    /** 用户名 */
    private String name;

    public User() {}
    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }
    //...忽略 getter 和 setter 方法...
}

(2)测试类,代码如下:

import com.hxstrive.redis.entity.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.List;

/**
 * Spring Data Redis 的 Redis Stream 示例(复杂值 Complex Value)
 * @author hxstrive.com 2022/2/26
 */
@SpringBootTest
public class RedisStreamForComplexValue {

    @Autowired
    private RedisTemplate<String,String> redisTemplate;

    @Test
    public void contextLoads() {
        ObjectRecord<String, User> record = StreamRecords.newRecord()
                .in("user-logon")
                .ofObject(new User(1000, "Tom"));

        // 向流添加一个记录,对应的 Redis 命令如下:
        // XADD user-logon * "_class" "com.hxstrive.redis.entity.User" "firstname" "night" "lastname" "angel"
        redisTemplate.opsForStream()
                .add(record);

        List<ObjectRecord<String, User>> records = redisTemplate.opsForStream()
                .read(User.class, StreamOffset.fromStart("user-logon"));
        for(ObjectRecord<String, User> rec : records) {
            System.out.println("id=" + rec.getId());
            System.out.println("stream=" + rec.getStream());
            System.out.println("value=" + rec.getValue());
        }
    }

}

运行示例,输出结果如下:

id=1667279949643-0
stream=user-logon
value=com.hxstrive.redis.entity.User@36224f93

默认情况下,StreamOperations 使用 ObjectHashMapper。获取 StreamOperations 时,您可以提供适合您要求的 HashMapper。例如:

redisTemplate.opsForStream(new Jackson2HashMapper(true))
    // 对应的 Redis 命令如下:
    // XADD user-logon * "firstname" "night" "@class" "com.example.User" "lastname" "angel"
    .add(record);

StreamMessageListenerContainer 可能不知道域类型上使用的任何 @TypeAlias,因为这些类型需要通过 MappingContext 进行解析。确保使用 initialEntitySet() 初始化 RedisMappingContext。例如:

@Bean
RedisMappingContext redisMappingContext() {
    RedisMappingContext ctx = new RedisMappingContext();
    ctx.setInitialEntitySet(Collections.singleton(Person.class));
    return ctx;
}

@Bean
RedisConverter redisConverter(RedisMappingContext mappingContext) {
    return new MappingRedisConverter(mappingContext);
}

@Bean
ObjectHashMapper hashMapper(RedisConverter converter) {
    return new ObjectHashMapper(converter);
}

@Bean
StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory connectionFactory, ObjectHashMapper hashMapper) {
    StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainerOptions.builder()
            .objectMapper(hashMapper)
            .build();
    return StreamMessageListenerContainer.create(connectionFactory, options);
}
说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号