0%

Spring使用redis的订阅和发布

文章字数:215,阅读全文大约需要1分钟

订阅

  • Spring提供了一个统一订阅监听的工具类,可以将多个监听复用一个连接
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class RedisListenerConfig {
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MyListener mylistener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 添加监听
container.addMessageListener(mylistener, new ChannelTopic("topicName"));
return container;
}
}
1
2
3
4
5
6
7
8
9
10
@Component
public class MyListener implements MessageListener {
// 如何处理消息
@Override
public void onMessage(Message message, byte[] pattern) {
final byte[] messageBody = message.getBody();
String messageStr = new String(messageBody, StandardCharsets.UTF_8);
System.out.println("messageStr = " + messageStr);
}
}

使用适配器订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MyService myService) {
// 初始化
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 添加适配器,使用反射调用myService的dealMess方法处理
// dealMess方法只有一个入参,参数类型和通道发送时的一致即可
MessageListenerAdapter transactionAdapter =
new MessageListenerAdapter(myService, "dealMess");
// 适配器初始化
transactionAdapter.afterPropertiesSet();
// 监听'myChannel'通道,并用上面的适配器处理
container.addMessageListener(transactionAdapter, new ChannelTopic("myChannel"));
return container;
}

发布

1
2
String mess = "发送消息";
redisTemplate.convertAndSend("topicName", mess.getBytes(StandardCharsets.UTF_8));