目录
  1. 1. 基础概念
  2. 2. RabbitMQ部署
    1. 2.1. 安装依赖&hostname设置
    2. 2.2. 安装simplejson
    3. 2.3. 安装erlang
    4. 2.4. 安装rabbitmq
    5. 2.5. 配置rabbitmq
    6. 2.6. 启动rabbitmq
    7. 2.7. 启用插件
      1. 2.7.1. 插件相关操作
    8. 2.8. 用户设置
    9. 2.9. 日志rotate
  3. 3. RabbitMQ镜像集群
    1. 3.1. 集群hostname设置
    2. 3.2. 同步Erlang Cookie
    3. 3.3. 组成集群
    4. 3.4. 设置镜像队列策略
      1. 3.4.1. 镜像队列策略
  4. 4. RabbitMQ 常用操作
    1. 4.1. 用户操作
    2. 4.2. 修改集群节点类型
    3. 4.3. rabbitmqadmin 操作
    4. 4.4. 强制删除 Queue
    5. 4.5. Federation 插件
      1. 4.5.1. 安装
      2. 4.5.2. 创建Upstream
      3. 4.5.3. 创建Policy
      4. 4.5.4. 检测 Federation 状态
  5. 5. RabbitMQ Python API

基础概念

  • Exchange

    producer生产消息发送给RabbitMQ后并非直接到Queue,而是经由Exchange做筛选后再分发给Queue。Exchange又分4中类型(Exchange Type)

    1. direct: 将消息转发到指定routing key的Queue中。有个特殊Exchange,若Exchange名为空则为default Exchange
    2. topic: 将消息按照规则转发。对routing key进行通配符匹配,将消息转发到匹配的Queue中。*匹配一个词、#匹配一个或多个词,使用.分隔。
    3. fanout: 将消息转发到所有绑定的Queue,适用于广播。
    4. header: 使用消息头代替routing key作为判断规则。
  • Queue

    用于存储消息。相同属性的Queue可重复定义。Queue分2种类型:

    • Durable: 持久化,broker重启后Queue还存在
    • Transient: 临时队列(自删除),broker重启后自动删除

      如果消息持久化,则持久化的消息需要在持久化的Queue和Exchange中,即必须保证: exchange(指定 durable=1)、queue(指定 durable=1)和消息(delivery_mode=2)

      Queue属性

    • 持久性:如果启用,队列将会在server重启前都有效
    • 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身
    • 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明
    • 排他性:如果启用,队列只能被声明它的消费者使用
  • Binding

    将Exchange和Queue绑定在一起(会指定binding key),生产者将消息发送到Exchange时会指定routing key。routing key会和Exchange Type及binding key联合使用。

    • fanout型无视Binding-key,直接将消息发送到和该Exchange绑定的Queue中。
    • direct型则要求binding key和routing key完全匹配,将消息发送到完全匹配的Queue中。
    • topic型则要求binding key和routing key正则匹配
    • header型则不依赖binding key和routing key,而是根据消息中的header路由消息。
  • Virtual Host

    Virtual Host拥有一组Exchange、Queue、Binding,根据Virtual Host进行用户权限设置
    不同Virtual Host之间完全隔离,一般不能共享Exchange、Queue等

  • Channel
    消息通道,一个TCP连接中可建立多个channel,每个channel代表一个会话任务

  • Acknowledgment
    RabbitMQ为了保障消息的可靠性,在消费端和生产端都有相应的机制来确保

    • 消费端
      消费者在消费完Queue中的消息后会发送一个ack给RabbitMQ,RabbitMQ收到ack后确认消息被消费后从Queue中将消息移除。若RabbitMQ没有收到ack而消费者断开连接,RabbitMQ会将消息发送给其他消费者。RabbitMQ不会为没ack的消息做超时处理,会一直等待ack,除非消费者断开连接。
    • 生产端
      生产端消息的可靠性主要有两种方式
      • AMQP事务机制
        1. 生产者调用txSelect()方法将channel设置成transaction模式
        2. 生产者调用txCommit()方法发送消息,txCommit()成功则消息确认到达MQ;失败则调用txRollback()回滚事务。
      • channel confirm模式
        AMQP事务机制对生产端的性能会有影响,所以RabbitMQ提供了另外一种方式以确保可靠性,就是将channel设置成confirm模式。一旦channel进入confirm模式,每条消息会分配一个唯一ID(1开始),一旦消息到达MQ,MQ会发送携带ID的ack给生产者。为了提高性能RabbitMQ还提供了3中confirm模式:
        1. common: 发送一条消息确认一条
        2. batch: 批量发送一批消息,批量确认
        3. async: 提供回调方法,消息被确认后生产者调用该回调方法

RabbitMQ部署

安装依赖&hostname设置

1
2
yum -y install unixODBC unixODBC-devel libxslt xmlto ncurses-devel
echo '10.201.3.100 sd-3-centos100' >> /etc/hosts

安装simplejson

1
2
3
wget 'https://pypi.python.org/packages/source/s/simplejson/simplejson-3.8.1.tar.gz'
tar -zxf simplejson-3.8.1.tar.gz && cd simplejson-3.8.1
python setup.py build && python setup.py install

安装erlang

1
2
3
4
5
6
7
wget 'http://www.erlang.org/download/otp_src_18.1.tar.gz'
tar -zxf otp_src_18.1.tar.gz && otp_src_18.1
./configure --without-javac --prefix=/usr/local/erlang
make && make install
echo 'PATH=/usr/local/rabbitmq/sbin:/usr/local/erlang/bin:$PATH' >> /etc/profile
. /etc/profile
ln -s /usr/local/erlang/bin/escript /usr/sbin/escript

安装rabbitmq

1
2
3
wget 'http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-3.5.6.tar.gz'
tar -zxf rabbitmq-server-3.5.6.tar.gz && cd rabbitmq-server-3.5.6
make TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man DOC_INSTALL_DIR=/usr/local/rabbitmq/doc install

配置rabbitmq

1
2
mkdir -p /etc/rabbitmq /usr/local/rabbitmq/data /usr/local/rabbitmq/log /usr/local/rabbitmq/plugins
cp /usr/local/rabbitmq/doc/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
1
2
3
4
5
6
# /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_MNESIA_BASE=/usr/local/rabbitmq/data
RABBITMQ_LOG_BASE=/usr/local/rabbitmq/log
RABBITMQ_PLUGINS_DIR=/usr/local/rabbitmq/plugins
#RABBITMQ_NODE_PORT=5673
#RABBITMQ_NODENAME=rabbitmq-master

启动rabbitmq

1
2
3
4
5
6
. /etc/profile
/usr/local/rabbitmq/sbin/rabbitmq-server -detached
#/usr/local/rabbitmq/sbin/rabbitmq-server &

#停止rabbitqm
#/usr/local/rabbitmq/sbin/rabbitmqctl stop

启用插件

1
2
/usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management
/usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_mqtt

插件相关操作

  • 查询插件

    1
    2
    3
    4
    # 显示所有插件
    rabbitmq-plugins list -v
    # 显示已使用插件
    rabbitmq-plugins list -E
  • 启用插件

    1
    rabbitmq-plugins enable rabbitmq_management
  • 停用插件

    1
    rabbitmq-plugins disable rabbitmq_management

用户设置

1
2
3
/usr/local/rabbitmq/sbin/rabbitmqctl add_user <user_name> <user_passwd>
/usr/local/rabbitmq/sbin/rabbitmqctl set_user_tags <user_name> administrator
/usr/local/rabbitmq/sbin/rabbitmqctl delete_user guest

日志rotate

1
2
# crontab
/usr/local/rabbitmq/sbin/rabbitmqctl rotate_logs ."`date '+%Y-%m-%d'`"

RabbitMQ镜像集群

由于Erlang原因RaggitMQ集群不支持跨网段,若需要跨网段则使用shovel/federation等插件

安装RabbitMQ后,先停止RabbitMQ。/usr/local/rabbitmq/sbin/rabbitmqctl stop

集群hostname设置

集群所有节点拥有所有主机IP+hostname

1
2
3
# /etc/hosts
10.201.3.100 sd-3-centos100
10.201.3.49 sd-3-centos49

将其中一个机器(10.201.3.49)的$HOME/.erlang.cookie同步至所有机器。$HOME/.erlang.cookie权限必须400owergroup必须为启动用户(此处以root用户启动)

1
2
3
4
# 所有节点执行
scp $HOME/.erlang.cookie 10.201.3.100:$HOME
chmod 400 $HOME/.erlang.cookie
chown root:root $HOME/.erlang.cookie

组成集群

在其余所有节点执行加入集群命令

1
2
3
4
5
6
7
8
# 10.201.3.100 执行
# 启动rabbitmq
/usr/local/rabbitmq/sbin/rabbitmq-server -detached
# 加入集群
/usr/local/rabbitmq/sbin/rabbitmqctl stop_app
#/usr/local/rabbitmq/sbin/rabbitmqctl reset
/usr/local/rabbitmq/sbin/rabbitmqctl join_cluster rabbit@sd-3-centos49
/usr/local/rabbitmq/sbin/rabbitmqctl start_app

默认使用disk模式加入集群,可使用ram模式加入集群,但集群中必须有一个节点是disk模式。

1
/usr/local/rabbitmq/sbin/rabbitmqctl join_cluster --ram rabbit@sd-3-centos49

设置镜像队列策略

1
2
3
4
# 将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一直
/usr/local/rabbitmq/sbin/rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
# 查看集群状态
/usr/local/rabbitmq/sbin/rabbitmqctl cluster_status

镜像队列策略

1
set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern>  <definition>
  • rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
    • ha-all: 策略名()
    • "^": 匹配Queue队列名表达式,"^"表示所有Queue
    • {"ha-mode":"all"}: 定义HA类型
      • all: 镜像队列将应用在整个集群。当一个新的节点加入后,该节点会获得所有数据的一份完全副本
      • exactly: 镜像队列在集群中复制count份副本。当集群节点数目少于count时,队列会复制到所有节点。 '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
      • nodes: 镜像队列会在node name中存在副本。'{"ha-mode":"nodes","ha-params":["<node_name>", "<node_name>"]}'
  • -p <vhostpath>,默认/
  • --priority <priority>,设置优先级,越高越优先
  • --apply-to <apply-to>,作用对象,queue、exchange、all

RabbitMQ 常用操作

用户操作

  • 查看用户

    1
    rabbitmqctl list_users
  • 添加用户

    1
    rabbitmqctl add_user <user_name> <user_password>
  • 修改密码

    1
    rabbitmqctl change_password <user_name> <new_password>
  • 授权管理员权限

    1
    rabbitmqctl set_user_tags <user_name> administrator
  • 删除用户

    1
    rabbitmqctl delete_user <user_name>

修改集群节点类型

将集群节点由disk类型改成ram类型,在该节点中执行:

1
2
3
4
5
6
rabbitmqctl stop_app
rabbitmqctl change_cluster_node_type ram
rabbitmqctl start_app

# 查看集群状态,确定节点类型
rabbitmqctl cluster_status

rabbitmqadmin 操作

rabbitmqadmin为额外命令,开启rabbitmq_management插件后可用

  • 列出所有 Queue

    1
    rabbitmqadmin list queues name
  • 删除 Queue

    1
    rabbitmqadmin delete queue name=<queue_name>
  • 列出操作

    1
    rabbitmqadmin list [users|vhosts|connections|exchanges|bindings|permissions|channels|consumers|queues|policies|nodes]

强制删除 Queue

1
2
3
rabbitmqctl eval 'Q = {resource, <<"VHOST NAME">>, queue, <<"QUEUE NAME">>}, rabbit_amqqueue:internal_delete(Q).'

rabbitmqctl eval 'rabbit_amqqueue:internal_delete({resource,<<"VHOST NAME">>,queue,<<"QUEUE NAME">>}).'
1
2
# 删除crash queue
rabbitmqctl eval '{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"VHOST">>, queue, <<"QUEUE">>)), rabbit_amqqueue:delete_crashed(Q).'

Federation 插件

安装

1
2
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

创建Upstream

1
2
3
4
# rabbitmqctl set_parameter federation-upstream <upstream_name> '{"uri":"amqp://server-name","expires":3600000}'
# 两两互连需要设置max-hops=1
rabbitmqctl set_parameter federation-upstream <upstream_name> '{"uri":"amqp://server-name","expires":3600000,"max-hops":1}'
# rabbitmqctl set_parameter federation-upstream pytest '{"uri":"amqp://admin:feideeb907888@sz-7-centos225","expires":3600000,"max-hops":1}'

创建Policy

1
2
3
# rabbitmqctl set_policy --apply-to exchanges <policy_name> "<exchange_name>" '{"federation-upstream-set":"all"}'
rabbitmqctl set_policy --apply-to exchanges <policy_name> "^amq\." '{"federation-upstream-set":"all"}'
# rabbitmqctl set_policy --apply-to exchanges pytest_policy "^pytest.direct" '{"federation-upstream-set":"all"}'

检测 Federation 状态

1
rabbitmqctl eval 'rabbit_federation_status:status().'
1
rabbitmqctl list_exchanges name policy

RabbitMQ Python API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python

import pika


class RabbitMQAPI(object):
"""RabbitMQAPI"""

def __init__(self,
rabbitmq_host,
rabbitmq_queue,
rabbitmq_user,
rabbitmq_password,
rabbitmq_msg,
rabbitmq_port=5672,
rabbitmq_virtual_host='/'):

self.rabbitmq_host = rabbitmq_host
self.rabbitmq_port = rabbitmq_port
self.rabbitmq_queue = rabbitmq_queue
self.rabbitmq_virtual_host = rabbitmq_virtual_host
self.rabbitmq_user = rabbitmq_user
self.rabbitmq_password = rabbitmq_password
self.rabbitmq_msg = rabbitmq_msg
self.rabbitmq_credentials = pika.PlainCredentials(
self.rabbitmq_user, self.rabbitmq_password)

def connect(self):
"""Connect RabbitMQ"""
try:
rabbitmq_connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.rabbitmq_host,
port=self.rabbitmq_port,
virtual_host=self.rabbitmq_virtual_host,
credentials=self.rabbitmq_credentials))
except (Exception) as e:
raise e
return rabbitmq_connection

def create_channel(self):
"""create rabbitmq channel"""
self.rabbitmq_connection = self.connect()
self.rabbitmq_channel = self.rabbitmq_connection.channel()
self.rabbitmq_channel.queue_declare(
queue=self.rabbitmq_queue, durable=True)

def producer(self):
"""
producer message to rabbitmq
delivery_mode 2 make message persistent
"""

self.create_channel()
self.rabbitmq_channel.basic_publish(
exchange='',
routing_key=self.rabbitmq_queue,
body=self.rabbitmq_msg,
properties=pika.BasicProperties(delivery_mode=2))

def close_connect(self):
"""close rabbitmq connect"""
self.rabbitmq_connection.close()

def callback(self, ch, method, properties, body):
"""consumer callback function"""
print("receive: %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)

def consumer(self):
"""rabbitmq consumer"""
self.create_channel()
self.rabbitmq_channel.basic_consume(
self.callback, queue=self.rabbitmq_queue, no_ack=False)
try:
self.rabbitmq_channel.start_consuming()
except (KeyboardInterrupt):
print("stop consuming and exit")
self.rabbitmq_channel.stop_consuming()
finally:
self.close_connect()


if __name__ == '__main__':
rabbitmq_host = '10.201.3.49'
rabbitmq_port = 5672
rabbitmq_virtual_host = '/'
rabbitmq_queue = 'pytest'
rabbitmq_user = 'admin'
rabbitmq_password = ''
rabbitmq_msg = "Hello!"

rabbitmq = RabbitMQAPI(rabbitmq_host, rabbitmq_queue,
rabbitmq_user, rabbitmq_password,
rabbitmq_msg)
rabbitmq.producer()
rabbitmq.close_connect()
rabbitmq.consumer()

Powered: Hexo, Theme: Nadya remastered from NadyMain