基础概念
- Exchange - producer生产消息发送给RabbitMQ后并非直接到Queue,而是经由Exchange做筛选后再分发给Queue。Exchange又分4中类型(Exchange Type) - direct: 将消息转发到指定routing key的Queue中。有个特殊Exchange,若Exchange名为空则为default Exchange
- topic: 将消息按照规则转发。对routing key进行通配符匹配,将消息转发到匹配的Queue中。*匹配一个词、#匹配一个或多个词,使用.分隔。
- fanout: 将消息转发到所有绑定的Queue,适用于广播。
- header: 使用消息头代替routing key作为判断规则。
 
- Queue - 用于存储消息。相同属性的Queue可重复定义。Queue分2种类型: - Durable: 持久化,broker重启后Queue还存在
- Transient: 临时队列(自删除),broker重启后自动删除 - 如果消息持久化,则持久化的消息需要在持久化的Queue和Exchange中,即必须保证: exchange(指定 durable=1)、queue(指定 durable=1)和消息(delivery_mode=2) - Queue属性 
- 持久性:如果启用,队列将会在server重启前都有效
- 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身
- 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明
- 排他性:如果启用,队列只能被声明它的消费者使用
 
- Binding - 将Exchange和Queue绑定在一起(会指定binding key),生产者将消息发送到Exchange时会指定routing key。routing key会和Exchange Type及binding key联合使用。 - fanout型无视Binding-key,直接将消息发送到和该Exchange绑定的Queue中。
- direct型则要求binding key和routing key完全匹配,将消息发送到完全匹配的Queue中。
- topic型则要求binding key和routing key正则匹配
- header型则不依赖binding key和routing key,而是根据消息中的header路由消息。
 
- Virtual Host - Virtual Host拥有一组Exchange、Queue、Binding,根据Virtual Host进行用户权限设置 
 不同Virtual Host之间完全隔离,一般不能共享Exchange、Queue等
- Channel 
 消息通道,一个TCP连接中可建立多个channel,每个channel代表一个会话任务
- Acknowledgment 
 RabbitMQ为了保障消息的可靠性,在消费端和生产端都有相应的机制来确保- 消费端
 消费者在消费完Queue中的消息后会发送一个ack给RabbitMQ,RabbitMQ收到ack后确认消息被消费后从Queue中将消息移除。若RabbitMQ没有收到ack而消费者断开连接,RabbitMQ会将消息发送给其他消费者。RabbitMQ不会为没ack的消息做超时处理,会一直等待ack,除非消费者断开连接。
- 生产端
 生产端消息的可靠性主要有两种方式- AMQP事务机制- 生产者调用txSelect()方法将channel设置成transaction模式
- 生产者调用txCommit()方法发送消息,txCommit()成功则消息确认到达MQ;失败则调用txRollback()回滚事务。
 
- 生产者调用
- channel confirm模式
 AMQP事务机制对生产端的性能会有影响,所以RabbitMQ提供了另外一种方式以确保可靠性,就是将channel设置成confirm模式。一旦channel进入confirm模式,每条消息会分配一个唯一ID(1开始),一旦消息到达MQ,MQ会发送携带ID的ack给生产者。为了提高性能RabbitMQ还提供了3中confirm模式:- common: 发送一条消息确认一条
- batch: 批量发送一批消息,批量确认
- async: 提供回调方法,消息被确认后生产者调用该回调方法
 
 
- AMQP事务机制
 
- 消费端
RabbitMQ部署
安装依赖&hostname设置
| 1 | yum -y install unixODBC unixODBC-devel libxslt xmlto ncurses-devel | 
安装simplejson
| 1 | wget 'https://pypi.python.org/packages/source/s/simplejson/simplejson-3.8.1.tar.gz' | 
安装erlang
| 1 | wget 'http://www.erlang.org/download/otp_src_18.1.tar.gz' | 
安装rabbitmq
| 1 | wget 'http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-3.5.6.tar.gz' | 
配置rabbitmq
| 1 | mkdir -p /etc/rabbitmq /usr/local/rabbitmq/data /usr/local/rabbitmq/log /usr/local/rabbitmq/plugins | 
| 1 | # /etc/rabbitmq/rabbitmq-env.conf | 
启动rabbitmq
| 1 | . /etc/profile | 
启用插件
| 1 | /usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management | 
插件相关操作
- 查询插件 - 1 
 2
 3
 4- # 显示所有插件 
 rabbitmq-plugins list -v
 # 显示已使用插件
 rabbitmq-plugins list -E
- 启用插件 - 1 - rabbitmq-plugins enable rabbitmq_management 
- 停用插件 - 1 - rabbitmq-plugins disable rabbitmq_management 
用户设置
| 1 | /usr/local/rabbitmq/sbin/rabbitmqctl add_user <user_name> <user_passwd> | 
日志rotate
| 1 | # crontab | 
RabbitMQ镜像集群
由于Erlang原因RaggitMQ集群不支持跨网段,若需要跨网段则使用shovel/federation等插件
安装RabbitMQ后,先停止RabbitMQ。/usr/local/rabbitmq/sbin/rabbitmqctl stop
集群hostname设置
集群所有节点拥有所有主机IP+hostname
| 1 | # /etc/hosts | 
同步Erlang Cookie
将其中一个机器(10.201.3.49)的$HOME/.erlang.cookie同步至所有机器。$HOME/.erlang.cookie权限必须400且ower、group必须为启动用户(此处以root用户启动)
| 1 | # 所有节点执行 | 
组成集群
在其余所有节点执行加入集群命令
| 1 | # 10.201.3.100 执行 | 
默认使用disk模式加入集群,可使用ram模式加入集群,但集群中必须有一个节点是disk模式。
| 1 | /usr/local/rabbitmq/sbin/rabbitmqctl join_cluster --ram rabbit@sd-3-centos49 | 
设置镜像队列策略
| 1 | # 将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一直 | 
镜像队列策略
| 1 | set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition> | 
- rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'- ha-all: 策略名(- ) 
- "^": 匹配Queue队列名表达式,- "^"表示所有Queue
- {"ha-mode":"all"}: 定义HA类型- all: 镜像队列将应用在整个集群。当一个新的节点加入后,该节点会获得所有数据的一份完全副本
- exactly: 镜像队列在集群中复制- count份副本。当集群节点数目少于- count时,队列会复制到所有节点。- '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
- nodes: 镜像队列会在- node name中存在副本。- '{"ha-mode":"nodes","ha-params":["<node_name>", "<node_name>"]}'
 
 
- -p <vhostpath>,默认- /
- --priority <priority>,设置优先级,越高越优先
- --apply-to <apply-to>,作用对象,queue、exchange、all
RabbitMQ 常用操作
用户操作
- 查看用户 - 1 - rabbitmqctl list_users 
- 添加用户 - 1 - rabbitmqctl add_user <user_name> <user_password> 
- 修改密码 - 1 - rabbitmqctl change_password <user_name> <new_password> 
- 授权管理员权限 - 1 - rabbitmqctl set_user_tags <user_name> administrator 
- 删除用户 - 1 - rabbitmqctl delete_user <user_name> 
修改集群节点类型
将集群节点由disk类型改成ram类型,在该节点中执行:
| 1 | rabbitmqctl stop_app | 
rabbitmqadmin 操作
rabbitmqadmin为额外命令,开启rabbitmq_management插件后可用
- 列出所有 Queue - 1 - rabbitmqadmin list queues name 
- 删除 Queue - 1 - rabbitmqadmin delete queue name=<queue_name> 
- 列出操作 - 1 - rabbitmqadmin list [users|vhosts|connections|exchanges|bindings|permissions|channels|consumers|queues|policies|nodes] 
强制删除 Queue
| 1 | rabbitmqctl eval 'Q = {resource, <<"VHOST NAME">>, queue, <<"QUEUE NAME">>}, rabbit_amqqueue:internal_delete(Q).' | 
| 1 | # 删除crash queue | 
Federation 插件
安装
| 1 | rabbitmq-plugins enable rabbitmq_federation | 
创建Upstream
| 1 | # rabbitmqctl set_parameter federation-upstream <upstream_name> '{"uri":"amqp://server-name","expires":3600000}' | 
创建Policy
| 1 | # rabbitmqctl set_policy --apply-to exchanges <policy_name> "<exchange_name>" '{"federation-upstream-set":"all"}' | 
检测 Federation 状态
| 1 | rabbitmqctl eval 'rabbit_federation_status:status().' | 
| 1 | rabbitmqctl list_exchanges name policy | 
RabbitMQ Python API
| 1 | #!/usr/bin/env python |