Spring Data Redis 中,所有发送到流(Stream)中的任何记录都需要序列化为二进制格式。
由于流(Stream)与哈希(Hash)的数据结构非常接近,因此流键(Key)、字段名(Field Name)和值(Field Value)均使用了 RedisTemplate 上配置的相应序列化器。下面列出了对应的序列化和反序列化器:
key 使用 keySerializer 的序列化器,用于 Record.getStream() 方法
field 使用 hashKeySerializer 的序列化器,用于有效载荷 Map 中的每个键
value 使用 hashValueSerializer 的序列化器,用于有效载荷 Map 中的每个值
注意:请确保检查正在使用的RedisSerializers,并注意,如果您决定不使用任何序列化程序,则需要确保这些值已经是二进制的。
StreamOperations 允许通过 ObjectRecord 将简单的值直接追加到流中,而不需要将这些值放到 Map 结构中。然后,该值将被分配到一个有效载荷字段,在读回该值时可以提取。例如:
import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.core.RedisTemplate; import java.util.List; /** * Spring Data Redis 的 Redis Stream 示例(简单值 Simple Value) * @author hxstrive.com 2022/2/26 */ @SpringBootTest public class RedisStreamForSimpleValue { @Autowired private RedisTemplate<String,String> redisTemplate; @Test public void contextLoads() { ObjectRecord<String, String> record = StreamRecords.newRecord() .in("my-stream") .ofObject("my-value"); // 向流添加一个记录,对应的 Redis 命令如下: // XADD my-stream * "_class" "java.lang.String" "_raw" "my-value" redisTemplate.opsForStream().add(record); List<ObjectRecord<String, String>> records = redisTemplate.opsForStream() .read(String.class, StreamOffset.fromStart("my-stream")); for(ObjectRecord<String, String> rec : records) { System.out.println("id=" + rec.getId()); System.out.println("stream=" + rec.getStream()); System.out.println("value=" + rec.getValue()); } } }
运行示例,输出结果如下:
id=1667279087680-0 stream=my-stream value=my-value
注意,ObjectRecord 通过与所有其他记录完全相同的序列化过程,因此也可以使用返回 MapRecord 的非类型化读取操作来获取 Record。
向流(Stream)中添加一个复杂值可以通过三种方式完成:
(1)将复杂值转换成简单值,如:使用 JSON 字符串表示
(2)使用合适的 RedisSerializer 序列化值
(3)使用 HashMapper 将值转换为适合序列化的 Map
第一个变体是最直接的变体,但忽略了流结构所提供的字段值功能,流中的值对其他消费者来说仍然是可读的。
第二种方案的好处与第一种方案相同,但可能会导致非常特殊的消费者限制,因为所有的消费者都必须实现非常相似的序列化机制。
第三种方案,HashMapper 方法是一种更复杂的方法,它使用了 Steams 的哈希结构,但对源代码进行了扁平化。只要选择了合适的序列化程序组合,其他用户仍然能够读取记录。
HashMappers 将有效载荷转换为具有特定类型的Map。请确保使用能够(去/反)序列化哈希的 Hash-Key 和 Hash-Value 序列化器。例如:
(1)用户实体类,代码如下:
/** * 用户实体 * @author hxstrive.com 2022/7/5 */ public class User { /** 用户ID */ private int id; /** 用户名 */ private String name; public User() {} public User(int id, String name) { this.id = id; this.name = name; } //...忽略 getter 和 setter 方法... }
(2)测试类,代码如下:
import com.hxstrive.redis.entity.User; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.core.RedisTemplate; import java.util.List; /** * Spring Data Redis 的 Redis Stream 示例(复杂值 Complex Value) * @author hxstrive.com 2022/2/26 */ @SpringBootTest public class RedisStreamForComplexValue { @Autowired private RedisTemplate<String,String> redisTemplate; @Test public void contextLoads() { ObjectRecord<String, User> record = StreamRecords.newRecord() .in("user-logon") .ofObject(new User(1000, "Tom")); // 向流添加一个记录,对应的 Redis 命令如下: // XADD user-logon * "_class" "com.hxstrive.redis.entity.User" "firstname" "night" "lastname" "angel" redisTemplate.opsForStream() .add(record); List<ObjectRecord<String, User>> records = redisTemplate.opsForStream() .read(User.class, StreamOffset.fromStart("user-logon")); for(ObjectRecord<String, User> rec : records) { System.out.println("id=" + rec.getId()); System.out.println("stream=" + rec.getStream()); System.out.println("value=" + rec.getValue()); } } }
运行示例,输出结果如下:
id=1667279949643-0 stream=user-logon value=com.hxstrive.redis.entity.User@36224f93
默认情况下,StreamOperations 使用 ObjectHashMapper。获取 StreamOperations 时,您可以提供适合您要求的 HashMapper。例如:
redisTemplate.opsForStream(new Jackson2HashMapper(true)) // 对应的 Redis 命令如下: // XADD user-logon * "firstname" "night" "@class" "com.example.User" "lastname" "angel" .add(record);
StreamMessageListenerContainer 可能不知道域类型上使用的任何 @TypeAlias,因为这些类型需要通过 MappingContext 进行解析。确保使用 initialEntitySet() 初始化 RedisMappingContext。例如:
@Bean RedisMappingContext redisMappingContext() { RedisMappingContext ctx = new RedisMappingContext(); ctx.setInitialEntitySet(Collections.singleton(Person.class)); return ctx; } @Bean RedisConverter redisConverter(RedisMappingContext mappingContext) { return new MappingRedisConverter(mappingContext); } @Bean ObjectHashMapper hashMapper(RedisConverter converter) { return new ObjectHashMapper(converter); } @Bean StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory connectionFactory, ObjectHashMapper hashMapper) { StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainerOptions.builder() .objectMapper(hashMapper) .build(); return StreamMessageListenerContainer.create(connectionFactory, options); }