SpringBoot集成RabbitMQ

RabbitMQ是一个基于Erlang语言实现了AMQP(Advanced Message Queuing Protocol 高级消息队列协议)的消息队列中间件中的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层保存这个数据。


正文

RabbitMQ相关概念

通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

这里写图片描述

  • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
  • 中间即是 RabbitMQ,其中包括了 交换机 和 队列。
  • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。

那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

交换机(Exchange)

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

  • Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  • Topic:按规则转发消息(最灵活)
  • Headers:设置header attribute参数类型的交换机
  • Fanout:转发消息到所有绑定队列

Direct Exchange
Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
这里写图片描述

X - Q1 有一个 binding key,名字为 orange; X - Q2 有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。

Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? - 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。

Topic Exchange

Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

在这种交换机模式下:

路由键必须是一串字符,用点(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
路由模式中 星号(),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements.\.a,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第三个单词是 a。 井号(#)表示一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。
具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示routing key,第三个参数即消息。如下:

1
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");

topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:

  • * 表示一个词.
  • # 表示零个或多个词.

Headers Exchange
headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

Fanout Exchange
Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。


安装RabbitMQ

本文介绍在CentOS 系统中如何安装RabbitMQ服务器,RabbitMQ是基于Erlang语言的,所以要先安装其运行环境erlang,通过yum安装:

1
yum install erlang

安装完erlang后就可以安装RabbitMQ了,使用如下命令:

1
2
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.5/rabbitmq-server-3.3.5-1.noarch.rpm
yum install rabbitmq-server-3.3.5-1.noarch.rpm

加入开机启动

1
chkconfig rabbitmq-server on

启动服务

1
service rabbitmq-server start

RabbitMQ的web管理默认是非开启的,而我们经常使用web管理来管理RabbitMQ,所以需要开启web管理,运行一下命令可以查看是否开启了web管理

1
rabbitmq-plugins list -e

如果打印的列表中没有rabbitmq_management,就需要开启插件才可以使用,运行命令

1
rabbitmq-plugins enable rabbitmq_management

rabbitmq默认账户是guest,处于安全考虑,该账户禁止远程登录,并且官方建议删除guest账户创建一个新账户,输入以下命令

1
2
3
4
rabbitmqctl delete_user  guest
rabbitmqctl add_user admin 123456
//设置admin账户支持远程访问
rabbitmqctl set_user_tags admin administrator

在配置文件/etc/rabbitmq/rabbitmq.config(不存在就新建一个)中添加(注意后面的点要加上):

1
[{rabbit, [{loopback_users, []}]}].

重启服务

1
service rabbitmq-service restart

RabbitMQ默认的web管理访问端口是15672,服务端口(应用程序访问的端口)是5672,在浏览器中输入http://{server_name}:15672/访问,输入刚才创建的账户密码admin/123456登录就可以进行管理了。
这时候RabbitMQ服务器就安装完成并且已经启动成功了,但是需要注意一点,如问题1中,要使springboot应用程序能够访问RabbitMQ服务器,还需要对admin账号进行授权

springboot集成RabbitMQ

简单使用

  • springboot集成RabbitMQ非常简单,引入依赖包:

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 配置文件中加入:

    1
    2
    3
    4
    5
    6
    7
    8
    ##写rabbitmq服务器的IP地址,如果在本地就是localhost
    spring.rabbitmq.host=
    ##rabbitmq服务器默认访问端口是5672
    spring.rabbitmq.port=5672
    ##登录名
    spring.rabbitmq.username=admin
    ##密码
    spring.rabbitmq.password=123456
  • 使用@Configuration@Bean 注解配置队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Configuration
    public class RabbitConfig {

    @Bean
    public Queue helloQueue() {
    return new Queue("hello");
    }

    }

如上所示表示向spring容器装配了一个名为helloQueue的Bean,@Bean 相当于XML 配置中的 标签, 其注解的方法的返回值识别为 Spring Bean,并注册到容器中,受 IoC 容器管理。
以上方法在等同于在xml中这样写:

1
2
3
4
5
<beans>
<bean id="helloQueue" class="org.springframework.amqp.core.Queue">
<constructor-arg value="hello"/>
</bean>
</beans>

这样,在spring容器里面就有了一个RabbitMQ队列,队列名(队列名并非bean的id,通过构造函数的参数传入的是队列名)叫做hello。

  • 创建一个发送消息者
    rabbitTemplate是springboot 提供的默认实现,通过convertAndSend()向队列中发送消息,converAndSend第一个参数传入队列名,前面创建的队列名是hello,所以传入hello,第二个传入的参数是要发送的消息,消息可以是String,也可以是对象,但是发送对象时,对象必须序列化,否则会出现问题2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
    String message = "i'm hello message";
    System.out.println("helloSender send: " + message);
    rabbitTemplate.convertAndSend("hello", message);
    }

    }
  • 接收者
    接收者通过@RabbitListener监听hello队列,当队列中有消息时,通过@RabbitHandler注解的方法处理消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Component
    @RabbitListener(queues = "hello")
    public class HelloReceiver {

    @RabbitHandler
    public void receive(String message) {
    System.out.println("HelloReceiver receive: " + message);
    }

    }
  • 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {

@Autowired
private HelloSender helloSender;

@Test
public void hello() throws Exception {
helloSender.send();
}

}

多对多使用
多对多使用和一对多、多对一使用都和一对一使用一样,再新建一个Receiver2同样监听hello队列,在发送方发送多条消息,两个接收者可以均匀取得hello队列中的消息,当多个发送者时,多个发送者发送的消息依次进入队列,接收方依次从队列中取出消息。RabbitMQ队列遵循FIFO(先进先出规则),并且不能设置优先级

高级使用
前面说到RabbitMQ在队列前加了一层交换器,交换器的作用是将消息发送者发送来的消息根据调度策略分配给相应的队列,上面的RabbitMQ简单使用 其实使用的就是第一种交换器Direct,direct交换器是RabbitMQ默认的交换器,所以在上面我们看似将消息直接发送给了队列,实际是通过direct交换器直接根据队列名匹配将消息分配给队列,接下来说说其他交换器:

  • Topic Exchange
    topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列,首先向spring容器里面注册queueMessage和queueMessages队列,这两个队列的队列名分别是topic.message和topic.messages,然后向spring容器注册一个topicExchange交换机,该交换机的name为exchange,然后给这个交换机绑定路由规则,其中第一个规则是:当 routing_key完全匹配topic.message时,将消息分配给queueMessage队列,第二个规则是:当routing_key的第一个单词是topic时,将消息分配给queueMessages队列,路由规则就配好了。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    @Configuration
    public class RabbitConfig {

    final static String message = "topic.message";
    final static String messages = "topic.messages";

    @Bean
    public Queue queueMessage(){
    return new Queue(RabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
    return new Queue(RabbitConfig.messages);
    }

    @Bean
    public TopicExchange topicExchange(){
    return new TopicExchange("exchange");
    }

    @Bean
    public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) {
    return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message");
    }

    @Bean
    public Binding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) {
    return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.#");
    }
    }

消息发送者:
接下来定义一个消息发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class TopicSender {

@Autowired
private AmqpTemplate amqpTemplate;

public void send1(){
String context = "i'm 1";
System.out.println("Send1 :" + context);
amqpTemplate.convertAndSend("exchange", "topic.1", context);
}

public void send2(){
String context = "i'm message";
System.out.println("Send2 :" + context);
amqpTemplate.convertAndSend("exchange", "topic.message", context);
}

public void send3(){
String context = "i'm messages";
System.out.println("Send3 :" + context);
amqpTemplate.convertAndSend("exchange", "topic.messages", context);
}
}

convertAndSend方法在传入三个参数时,第一个参数表示消息要发送给的交换器,第二个参数就是routing_key,交换机会根据该routing_key匹配要将消息分配给那个队列,第三个参数就是要发送的消息。
通过调用TopicSender 的三个send方法会向exchange交换机发送三个消息,其中第一个消息的routing_key是topic.1,由于第一个单词是topic所以根据路由规则,该消息会被分配给queueMessages队列,第二个消息的routing_key是topic.message,由于第一个单词是topic所以消息会被分配给queueMessages队列,同时由于该routing_key与topic.message完全匹配,所以该消息还会分配给queueMessage队列,第三个消息同理于第一个消息会被分配给queueMessages队列,所以queueMessages能接受到这三个消息,而queueMessage只能接受到第二个消息

定义两个接收者分别监听queueMessages队列和queueMessage队列,这两个队列的name分别是topic.message和topic.messages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
@RabbitListener(queues = "topic.message")
public class Receiver1 {

@RabbitHandler
public void receive(String message){
System.out.println("message receiver :" + message);
}
}

@Component
@RabbitListener(queues = "topic.messages")
public class Receiver2 {

@RabbitHandler
public void receive(String message){
System.out.println("messages receiver :" + message);
}
}

调用TopicSender的三个send方法发送消息,结果如下:

1
2
3
4
5
6
7
Send1 :i'm 1
Send2 :i'm message
Send3 :i'm messages
messages receiver :i'm 1
message receiver :i'm message
messages receiver :i'm message
messages receiver :i'm messages

  • Fanout Exchange
    Fanout交换器理解起来就比较简单了,Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换器发送消息,绑定了这个交换器的所有队列都能收到消息。
    注册三个队列和一个Fanout交换器,并且将三个队列都与该交换器绑定
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    @Configuration
    public class FanoutRabbitConfig {

    @Bean
    public Queue AMessageQueue(){
    return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessageQueue(){
    return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessageQueue(){
    return new Queue("fanout.C");
    }

    @Bean
    public FanoutExchange fanoutExchange(){
    return new FanoutExchange("fanoutExchange");
    }

    @Bean
    public Binding bindingExchangeA(Queue AMessageQueue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(AMessageQueue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeB(Queue BMessageQueue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(BMessageQueue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeC(Queue CMessageQueue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(CMessageQueue).to(fanoutExchange);
    }
    }

前面讲Topic交换器的时候已经讲的很详细了,这里就不再赘述

定义一个消息发送者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class FanoutSender {

@Autowired
private AmqpTemplate amqpTemplate;

public void send(){
String context = "i'm a fanout message!";
System.out.println("fanout Send: " + context);
amqpTemplate.convertAndSend("fanoutExchange","", context);
}
}

当使用convertAndSend向Fanout交换器发送消息时,第二个参数routing_key无论传入什么都无效

定义三个消息接收者分别监听A、B、C三个队列,在测试类中调用FanoutSender的send方法,结果如下

1
2
3
4
fanout Send: i'm a fanout message!
fanout.A receive: i'm a fanout message!
fanout.B receive: i'm a fanout message!
fanout.C receive: i'm a fanout message!


问题

问题1

记录一个白痴问题,在我使用springboot集成RabbitMQ的时候出现了一个错误,百度&google了许久,终于知道原因所在,故记录下来,希望帮到RabbitMQ的初学者
问题是这样的:
在springboot中集成RabbitMQ,运行时报了如下异常,google了一下,Connection reset 的意思是:服务器在TCP三次握手建立连接的时候调用了Socket.close()方法关闭了Connection,并返回了’RST’标志,’RST’标志的意思是’我不发送数据也不接受数据了’,如此看来应该是服务器拒绝了连接,那么服务器为什么会拒绝连接呢?难道是没有权限?于是又百度之,最后发现问题真的是没有访问权限所致

1
2
3
4
5
6
7
8
9
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:209)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:571)

解决

使用要授权的用户登录web管理控制台之后点击上方的Admin,然后在All users里面点击要授权的用户admin
这里写图片描述
点击admin用户之后发现进入如下页面,发现上面有一排黄色区域提示该用户没有访问权限,直接点击下方的Set permission按钮授权即可
这里写图片描述
授权完成之后再运行程序,发现可以使用RabbitMQ发送接收消息了

###问题2
在使用RabbitMQ发送对象的时候,对象必须要序列化,否者会出现如下错误:

1
2
[cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
Caused by: org.springframework.amqp.AmqpException: No method found for class [B

并导致死循环,如果出现了该错误可以在RabbitMQ的web管理上删除此队列,点击Queues选中需要删除的Queue并点击下方的Delete按钮
这里写图片描述


参考

RabbitMQ详解 —-by纯洁的微笑

centos下安装RabbitMQ —-by熊猫不是猫