RabbitMQ是一个消息代理:它接收并转发消息。你可以将它想象成一个邮局:你可以把邮件放在邮箱里,然后邮递员会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ是一个邮箱,一个邮局和一个邮递员。
RabbitMQ与邮局之间的主要区别在于:它不处理纸张,而是接收,存储和转发二进制数据消息。
RabbitMQ和消息传递通常使用一些术语:
生产者
Producer:生产者,就是投递消息的一方。
生产者创建消息,然后发布到RabbitMQ中。消息一般可以包含2个部分:消息体和标签(Lable)。消息体也可以称为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可以进一步对这个消息体进行序列化操作,消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由RabbitMQ,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者(Consumer)
消费者
Consumer:消费者,就是接收消息的一方。
消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也不知道消息的生产者是谁,当然消费者也不需要知道。
队列
Queue:队列,是RabbitMQ的内部对象,用来存储消息,队列可以用下图表示。
RabbitMQ中消息都只能存储在队列中。RabbitMQ的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,如下图所示。
交换器
Exchange:交换器。在图2-4中我们暂时可以理解成生产者将消息投递到队列中,实际上这个在RabbitMQ中不会发生。真实情况是:生产者将消息发送到Exchange(交换器,通常也可以用大写的“X”表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或者会返给生产者,或许直接丢失。我们可以将RabbitMQ中的交换器看成一个简单的实体。
交换器的具体示意图如下图所示:
RabbitMQ中的常用的交换器类型有fanout,direct,topic,headers这四种。下面我们来简单了解一下:
1:fanout
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
2:direct
direct类型的交换器路由规则也很简单,它会把消息路由到BindinKey和RoutingKey完全匹配的队列中。
以下图为例,交换器的类型为direct,如果我们发送一条消息,并在发送消息的时候设置路由键为”warning“,则消息会路由到Queue1和Queue2。
如果在发送消息的时候设置路由键为”info“或者”debug“,则消息只会路由到Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。
3:topic
从上面我们可以得知direct类型的交换器路由规则是完全匹配BindingKey和RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic类型的交换器在匹配规则上进行了扩展。它与direct类型相似,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,但这里的匹配规则有些不同,它约定:
- RoutingKey 为一个点号” “分隔的字符串(被点号” “分隔开的每 一 段独立的字符 串称为一个单词 )如“com.rabbitmg.client”“java.util.concurrent”、“com.hidden.client””;
- BindingKey 和 RoutingKey 一样也是点号” “分隔的字符串;
- BindingKey 中可以存在两种特殊 字符串”*”和”#”,用于做模糊匹配,其中”#”用于匹配一个单词,吁”用于匹配多规格单词(可以是零个)。
以图 2-8 中的配置为例:
· 路由键为” com.rabbitmq.client” 的消息会同时路由到 Queuel 和 Queue2; · 路由键为” com.hidden.client” 的消息只会路由到 Queue2 中:
· 路由键为” com.hidden.demo” 的消息只会路由到 Queue2 中:
· 路由键为 “java.rabbitmq.demo” 的消息只会路由到 Queuel 中:
· 路由键为” java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置
mandatory 参数) ,因为它没有匹配任何路由键。
- 路由键为” com.rabbitmq.client” 的消息会同时路由到 Queuel 和 Queue2;
- 路由键为” com.hidden.client” 的消息只会路由到 Queue2 中:
- 路由键为” com.hidden.demo” 的消息只会路由到 Queue2 中:
- 路由键为 “java.rabbitmq.demo” 的消息只会路由到 Queuel 中:
- 路由键为” java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置mandatory 参数) ,因为它没有匹配任何路由键。
4:headers
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时制定一组键值对 , 当发送消息到交换器时, RabbitMQ 会获取到该消息的 headers (也是一个键值对的形式) ,对比其中的键值对 是否完全 匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由 到该队列 。 headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
路由键
RoutingKey:路由键。生产者将消息发送交换器的时候,一般会制定一个RoutingKey,用来指定这个消息的路由规则。
而这个RoutingKey需要交换器类型和绑定键(BindingKey)联合使用才能最终生效。
在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。
绑定:
Binding:绑定。RabbitMQ通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键。这样RabbitMQ就知道如何正确地将消息路由到队列了。如下图所示:
生产者将消息发送给交换器时, 需要一个 RoutingKey, 当 BindingKey和 RoutingKey相匹 配时, 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候, 这些绑定允许 使用 相 同的 BindingKey0 BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型 , 比 如 fanout类型的交换器就会无视 BindingKey,而是将消息路由到所有绑定到该交换器的队列中 。
案例:
下面我们将使用PHP语言开始我们的第一个案例:
首先我们安装客户端库,首先我们先查一下哪个包更流行
ok,那么我们就选择php-amqplib/php-amqplib
接着执行以下命令:
$ composer require php-amqplib/php-amqplib
定义一个生产者
/**
* 发送【生产者】
* @throws \Exception
*/
public function actionSend()
{
//链接RabbitMQ服务器
$connection = new AMQPStreamConnection(self::HOST, self::PORT, self::USER, self::PASSWD);
//创建一个通道
$channel = $connection->channel();
//要想发送,我们必须声明一个队列,接着我们发布一条消息到队列,声明队列是幂等的-仅当队列不存在时才创建
$channel->queue_declare("hello", false, false, false, false);
$msg = new AmqpMessage("Hello world");
$channel->basic_publish($msg, "", 'hello');
echo "[x] Sent 'hello world\n'";
//最后,我们关闭通道跟链接
$channel->close();
$connection->close();
}
再来定义一个消费者
/**
* 接收【消费者】
* @throws \ErrorException
*/
public function actionReceive()
{
//链接RabbitMQ服务器
$connection = new AMQPStreamConnection(self::HOST, self::PORT, self::USER, self::PASSWD);
//创建一个通道
$channel = $connection->channel();
//声明要消耗的队列。请注意,这与发布者的队列匹配。
$channel->queue_declare("hello", false, false, false, false);
echo "* Waiting for messages. To exit press CTRL+C\n";
//我们将告诉服务器将队列中的消息发送给我们。我们将定义一个可调用的PHP,它将接收服务器发送的消息。请记住,消息是从服务器异步发送到客户端的。
//当$channel有回调时,我们的代码将阻塞。每当我们收到一条消息,我们的$callback函数将传递收到的消息。
$callback = function ($msg) {
echo '[x] Received' . $msg->body . "\n";
};
$channel->basic_consume("hello", "", false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
}
我们接着在终端,运行两个命令:
root@4fab6becb32d:/var/www/yii-basic# php yii queue/receive
root@4fab6becb32d:/var/www/yii-basic# php yii queue/send
[x] Sent 'hello world
这时候我们可以看到消费者脚本也打印了一条信息
[x] ReceivedHello world
至此,我们也算简单的入门了,下面我们再接着讲RabbitMQ的工作队列。
参考资料
《RabbitMQ 实战指南》