基础概念
Exchange
producer生产消息发送给RabbitMQ后并非直接到Queue,而是经由Exchange做筛选后再分发给Queue。Exchange又分4中类型(Exchange Type)
- direct: 将消息转发到指定routing key的Queue中。有个特殊Exchange,若Exchange名为空则为default Exchange
- topic: 将消息按照规则转发。对routing key进行通配符匹配,将消息转发到匹配的Queue中。
*
匹配一个词、#
匹配一个或多个词,使用.
分隔。 - fanout: 将消息转发到所有绑定的Queue,适用于广播。
- header: 使用消息头代替routing key作为判断规则。
Queue
用于存储消息。相同属性的Queue可重复定义。Queue分2种类型:
- Durable: 持久化,broker重启后Queue还存在
Transient: 临时队列(自删除),broker重启后自动删除
如果消息持久化,则持久化的消息需要在持久化的Queue和Exchange中,即必须保证: exchange(指定 durable=1)、queue(指定 durable=1)和消息(delivery_mode=2)
Queue属性
- 持久性:如果启用,队列将会在server重启前都有效
- 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身
- 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明
- 排他性:如果启用,队列只能被声明它的消费者使用
Binding
将Exchange和Queue绑定在一起(会指定binding key),生产者将消息发送到Exchange时会指定routing key。routing key会和Exchange Type及binding key联合使用。
- fanout型无视Binding-key,直接将消息发送到和该Exchange绑定的Queue中。
- direct型则要求binding key和routing key完全匹配,将消息发送到完全匹配的Queue中。
- topic型则要求binding key和routing key正则匹配
- header型则不依赖binding key和routing key,而是根据消息中的header路由消息。
Virtual Host
Virtual Host拥有一组Exchange、Queue、Binding,根据Virtual Host进行用户权限设置
不同Virtual Host之间完全隔离,一般不能共享Exchange、Queue等Channel
消息通道,一个TCP连接中可建立多个channel,每个channel代表一个会话任务Acknowledgment
RabbitMQ为了保障消息的可靠性,在消费端和生产端都有相应的机制来确保- 消费端
消费者在消费完Queue中的消息后会发送一个ack给RabbitMQ,RabbitMQ收到ack后确认消息被消费后从Queue中将消息移除。若RabbitMQ没有收到ack而消费者断开连接,RabbitMQ会将消息发送给其他消费者。RabbitMQ不会为没ack的消息做超时处理,会一直等待ack,除非消费者断开连接。 - 生产端
生产端消息的可靠性主要有两种方式- AMQP事务机制
- 生产者调用
txSelect()
方法将channel设置成transaction模式 - 生产者调用
txCommit()
方法发送消息,txCommit()
成功则消息确认到达MQ;失败则调用txRollback()
回滚事务。
- 生产者调用
- channel confirm模式
AMQP事务机制对生产端的性能会有影响,所以RabbitMQ提供了另外一种方式以确保可靠性,就是将channel设置成confirm模式。一旦channel进入confirm模式,每条消息会分配一个唯一ID(1
开始),一旦消息到达MQ,MQ会发送携带ID的ack给生产者。为了提高性能RabbitMQ还提供了3中confirm模式:- common: 发送一条消息确认一条
- batch: 批量发送一批消息,批量确认
- async: 提供回调方法,消息被确认后生产者调用该回调方法
- AMQP事务机制
- 消费端
RabbitMQ部署
安装依赖&hostname设置
1 | yum -y install unixODBC unixODBC-devel libxslt xmlto ncurses-devel |
安装simplejson
1 | wget 'https://pypi.python.org/packages/source/s/simplejson/simplejson-3.8.1.tar.gz' |
安装erlang
1 | wget 'http://www.erlang.org/download/otp_src_18.1.tar.gz' |
安装rabbitmq
1 | wget 'http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-3.5.6.tar.gz' |
配置rabbitmq
1 | mkdir -p /etc/rabbitmq /usr/local/rabbitmq/data /usr/local/rabbitmq/log /usr/local/rabbitmq/plugins |
1 | # /etc/rabbitmq/rabbitmq-env.conf |
启动rabbitmq
1 | . /etc/profile |
启用插件
1 | /usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management |
插件相关操作
查询插件
1
2
3
4# 显示所有插件
rabbitmq-plugins list -v
# 显示已使用插件
rabbitmq-plugins list -E启用插件
1
rabbitmq-plugins enable rabbitmq_management
停用插件
1
rabbitmq-plugins disable rabbitmq_management
用户设置
1 | /usr/local/rabbitmq/sbin/rabbitmqctl add_user <user_name> <user_passwd> |
日志rotate
1 | # crontab |
RabbitMQ镜像集群
由于Erlang原因RaggitMQ集群不支持跨网段,若需要跨网段则使用shovel/federation等插件
安装RabbitMQ后,先停止RabbitMQ。/usr/local/rabbitmq/sbin/rabbitmqctl stop
集群hostname设置
集群所有节点拥有所有主机IP+hostname
1 | # /etc/hosts |
同步Erlang Cookie
将其中一个机器(10.201.3.49)的$HOME/.erlang.cookie
同步至所有机器。$HOME/.erlang.cookie
权限必须400
且ower
、group
必须为启动用户(此处以root
用户启动)
1 | # 所有节点执行 |
组成集群
在其余所有节点执行加入集群命令
1 | # 10.201.3.100 执行 |
默认使用disk模式加入集群,可使用ram模式加入集群,但集群中必须有一个节点是disk模式。
1 | /usr/local/rabbitmq/sbin/rabbitmqctl join_cluster --ram rabbit@sd-3-centos49 |
设置镜像队列策略
1 | # 将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一直 |
镜像队列策略
1 | set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition> |
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
ha-all
: 策略名() "^"
: 匹配Queue队列名表达式,"^"
表示所有Queue{"ha-mode":"all"}
: 定义HA类型all
: 镜像队列将应用在整个集群。当一个新的节点加入后,该节点会获得所有数据的一份完全副本exactly
: 镜像队列在集群中复制count
份副本。当集群节点数目少于count
时,队列会复制到所有节点。'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
nodes
: 镜像队列会在node name
中存在副本。'{"ha-mode":"nodes","ha-params":["<node_name>", "<node_name>"]}'
-p <vhostpath>
,默认/
--priority <priority>
,设置优先级,越高越优先--apply-to <apply-to>
,作用对象,queue、exchange、all
RabbitMQ 常用操作
用户操作
查看用户
1
rabbitmqctl list_users
添加用户
1
rabbitmqctl add_user <user_name> <user_password>
修改密码
1
rabbitmqctl change_password <user_name> <new_password>
授权管理员权限
1
rabbitmqctl set_user_tags <user_name> administrator
删除用户
1
rabbitmqctl delete_user <user_name>
修改集群节点类型
将集群节点由disk类型改成ram类型,在该节点中执行:
1 | rabbitmqctl stop_app |
rabbitmqadmin 操作
rabbitmqadmin
为额外命令,开启rabbitmq_management
插件后可用
列出所有 Queue
1
rabbitmqadmin list queues name
删除 Queue
1
rabbitmqadmin delete queue name=<queue_name>
列出操作
1
rabbitmqadmin list [users|vhosts|connections|exchanges|bindings|permissions|channels|consumers|queues|policies|nodes]
强制删除 Queue
1 | rabbitmqctl eval 'Q = {resource, <<"VHOST NAME">>, queue, <<"QUEUE NAME">>}, rabbit_amqqueue:internal_delete(Q).' |
1 | # 删除crash queue |
Federation 插件
安装
1 | rabbitmq-plugins enable rabbitmq_federation |
创建Upstream
1 | # rabbitmqctl set_parameter federation-upstream <upstream_name> '{"uri":"amqp://server-name","expires":3600000}' |
创建Policy
1 | # rabbitmqctl set_policy --apply-to exchanges <policy_name> "<exchange_name>" '{"federation-upstream-set":"all"}' |
检测 Federation 状态
1 | rabbitmqctl eval 'rabbit_federation_status:status().' |
1 | rabbitmqctl list_exchanges name policy |
RabbitMQ Python API
1 | #!/usr/bin/env python |