集群架构
虚拟机规划
IP |
hostname |
节点说明 |
端口 |
控制台地址 |
192.168.247.150 |
rabbitmq.master |
rabbitmq master |
5672 |
http://192.168.247.150:15672 |
192.168.247.151 |
rabbitmq.s.o |
rabbitmq slave |
5672 |
http://192.168.247.151:15672 |
192.168.247.152 |
rabbitmq.s.t |
rabbitmq slave |
5672 |
http://192.168.247.152:15672 |
192.168.247.153 |
haproxy.k.m |
haproxy+keepalived |
8100 |
http://192.168.247.153:8100/rabbitmq-stats |
192.168.247.154 |
haproxy.k.s |
haproxy+keepalived |
8100 |
http://192.168.247.154:8100/rabbitmq-stats |
镜像模式
- 镜像模式: 集群模式非常经典的就是Mirror镜像模式, 保证100%数据不丢失, 在实际工作中也是用的最多的, 并且实现集群非常的简单, 一般互联网大厂都会构建这种镜像集群模式
- Mirror镜像队列, 目的是为了保证RabbitMQ数据的高可用性解决方案, 主要就是实现数据的同步, 一般来讲是2-3个节点实现数据同步[一般都是3节点+(奇数个节点)](对于100%数据可靠性解决方案一般都是3节点)集群架构如下
服务器规划
- 架构: RabbitMQ Cluster + Queue HA + Haproxy + Keepalived
- 解释: 3台rabbitMQ服务器构建broker集群,允许任意2台服务器故障而服务不受影响,在此基础上,通过Queue HA (queue mirror)实现队列的高可用,在本例中镜像到所有服务器节点(即1个master,2个slave);为保证客户端访问入口地址的唯一性,通过haproxy做4层代理来提供MQ服务,并通过简单的轮询方式来进行负载均衡,设置健康检查来屏蔽故障节点对客户端的影响;使用2台haproxy并且通过keepalived实现客户端访问入口的高可用机制。
服务架构设计
参考资料
官方文档手册:
集群配置文档:Clustering Guide — RabbitMQ
镜像队列文档:Classic Queue Mirroring — RabbitMQ
集群操作文档:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
中文版AMQP用户手册:
Spring AMQP文档:http://www.blogjava.net/qbna350816/category/55056.html?Show=All
事务文档:http://www.blogjava.net/qbna350816/archive/2016/08/13/431567.html
集群架构搭建
配置HOST[5台]
5台服务器配置Host, 参考虚拟机规划
vi /etc/hostname 150改为rabbitmqmaster 151改为rabbitmqso 152改为rabbitmqst 153改为haproxykm 154改为haproxyks vi /etc/hosts 192.168.247.150 rabbitmqmaster 192.168.247.151 rabbitmqso 192.168.247.152 rabbitmqst 192.168.247.153 haproxykm 192.168.247.154 haproxyks
都修改完成后重启: 一定要重启, 我在这里就碰到了个大坑, 应为没有重启, 所以导致hostname没有生效 添加集群节点一致报错,我丢 重启后关闭防火墙
安装依赖环境[5台]
yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
安装RabbitMQ[三台]
150,151,152
# 我用的直接就是3.10.7版本, 我没有找到老版本的, 而且现在应该也都是新版本的了 1: 安装参考安装与启动, 建议三台同时进行 2: 关闭5台电脑的防火墙
官方的部署文档, 应为是英文的, 看起来优点蒙圈, 哎, 探索新版本总是困难的, 但是也总要有人前行
访问控制台[三台]
在虚拟机规划中有控制台地址
不知道为什么, 150,151,可以访问, 152又提示不是私密连接, 我就又创建了一个账户就可以了
150 toor 123456 151 toor 123456 152 toor 123456
重启所有的RabbitMQ[三台]
# 停止 rabbitmqctl stop # 检测端口 lsof -i:5672 lsof -i:15672 # 启动 rabbitmq-server -detached
文件同步步骤[150->151,152]
因为我是最新版本安装的所以.erlang.cookie并没有在var/lib/rabbitmq下面, 而是在root下
scp /root/.erlang.cookie 192.168.247.151:/root/ scp /root/.erlang.cookie 192.168.247.152:/root/
选择150,151,152任意一个节点为Master, 我选择150, 也就是说需要把150的cookie文件同步到151,152节点上, 进入/root目录, ll -a才可以看到隐藏文件, 使用上面的命令发送就可以
注意: 需要在三台服务器RabbitMQ都启动的情况下, 去同步
节点加入集群[151,152]
150直接启动即可
# 启动 rabbitmq-server -detached
151,152, 将自身加入集群
# 停止app rabbitmqctl stop_app # 将节点加入集群 --ram是采用内存的方式 151,采用内存, 152不加, 默认使用磁盘 rabbitmqctl join_cluster --ram rabbit@rabbitmqmaster # 启动app rabbitmqctl start_app
控制台查看集群
查看集群状态
# 任意节点 rabbitmqctl cluster_status
配置镜像队列
设置镜像队列策略
# 任意节点 rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态一致,RabbitMQ高可用集群就已经搭建好了,我们可以重启服务,查看其队列是否在从节点同步
移除集群节点[扩展]
// 在要移除的节点上执行 rabbitmqctl forget_cluster_node rabbit@bhz71
修改集群名称(默认为第一个node名称)[扩展]
# 任意节点 rabbitmqctl set_cluster_name rabbitmq_cluster1
RabbitMQ配置位置
我一般不修改, 正式环境有运维, 开发的话, 能跑就行
/usr/local/rabbitmq/plugins/rabbit-3.10.7/ebin/rabbit.app
测试集群
测试队列同步
在150上新建队列
队列开始同步
同步镜像队列两个
151,152都已经同步
安装Ha-Proxy[153,154]
简介
- HAProxy是一款提供高可用性、负载均衡以及基于TCP和HTTP应用的代理软件,HAProxy是完全免费的、借助HAProxy可以快速并且可靠的提供基于TCP和HTTP应用的代理解决方案。
- HAProxy适用于那些负载较大的web站点,这些站点通常又需要会话保持或七层处理。
- HAProxy可以支持数以万计的并发连接,并且HAProxy的运行模式使得它可以很简单安全的整合进架构中,同时可以保护web服务器不被暴露到网络上。
安装
# 下载 wget http://www.haproxy.org/download/1.6/src/haproxy-1.6.6.tar.gz # 解压 tar -zxvf haproxy-1.6.6.tar.gz # 进入文件夹 cd haproxy-1.6.6/ # 编译 make target=linux31 prefix=/usr/local/haproxy # 安装 make install prefix=/usr/local/haproxy # 创建文件夹 mkdir /etc/haproxy # 权限 groupadd -r -g 149 haproxy useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy # 创建配置文件 touch /etc/haproxy/haproxy.cfg # haproxy 配置文件haproxy.cfg详解 # 编辑配置文件 vi /etc/haproxy/haproxy.cfg
配置文件
#logging options global log 127.0.0.1 local0 info maxconn 5120 chroot /usr/local/haproxy uid 99 gid 99 daemon quiet nbproc 20 pidfile /var/run/haproxy.pid defaults log global #使用4层代理模式,”mode http”为7层代理模式 mode tcp #if you set mode to tcp,then you nust change tcplog into httplog option tcplog option dontlognull retries 3 option redispatch maxconn 2000 contimeout 10s ##客户端空闲超时时间为 60秒 则HA 发起重连机制 clitimeout 10s ##服务器端链接超时时间为 15秒 则HA 发起重连机制 srvtimeout 10s #front-end IP for consumers and producters listen rabbitmq_cluster bind 0.0.0.0:5672 #配置TCP模式 mode tcp #balance url_param userid #balance url_param session_id check_post 64 #balance hdr(User-Agent) #balance hdr(host) #balance hdr(Host) use_domain_only #balance rdp-cookie #balance leastconn #balance source //ip #简单的轮询 balance roundrobin #rabbitmq集群节点配置 #inter 每隔五秒对mq集群做健康检查, 2次正确证明服务器可用,2次失败证明服务器不可用,并且配置主备机制 server rabbitmqmaster 192.168.247.150:5672 check inter 5000 rise 2 fall 2 server rabbitmqso 192.168.247.151:5672 check inter 5000 rise 2 fall 2 server rabbitmqst 192.168.247.152:5672 check inter 5000 rise 2 fall 2 #配置haproxy web监控,查看统计信息 listen stats # 154改为154 bind 192.168.247.153:8100 mode http option httplog stats enable #设置haproxy监控地址为http://localhost:8100/rabbitmq-stats stats uri /rabbitmq-stats stats refresh 5s
启动
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
查看服务
http://192.168.247.153:8100/rabbitmq-stats http://192.168.247.154:8100/rabbitmq-stats
停止[扩展]
killall haproxy ps -ef | grep haproxy netstat -tunpl | grep haproxy ps -ef |grep haproxy |awk '{print $2}'|xargs kill -9
安装Keepalived[153,154]
简介
- 之前在Nginx已经用过Keepalived了
- Keepalived,它是一个高性能的服务器高可用或热备解决方案,Keepalived主要来防止服务器单点故障的发生问题,可以通过其与Nginx、Haproxy等反向代理的负载均衡服务器配合实现web服务端的高可用。Keepalived以VRRP协议为实现基础,用VRRP协议来实现高可用性(HA).VRRP(Virtual Router Redundancy Protocol)协议是用于实现路由器冗余的协议,VRRP协议将两台或多台路由器设备虚拟成一个设备,对外提供虚拟路由器IP(一个或多个)。
安装
我直接采用2.0.18版本, 上传到linux
tar -zxvf keepalived-2.0.18.tar.gz cd keepalived-2.0.18/ yum install libnl libnl-devel ./configure --prefix=/usr/local/keepalived --sysconf=/etc make && make install cd /etc/keepalived/ vi keepalived.conf
153配置
! Configuration File for keepalived global_defs { router_id haproxykm ##标识节点的字符串,通常为hostname } vrrp_script chk_haproxy { script "/etc/keepalived/haproxy_check.sh" ##执行脚本位置 interval 2 ##检测时间间隔 weight -20 ##如果条件成立则权重减20 } vrrp_instance VI_1 { state MASTER ## 主节点为MASTER,备份节点为BACKUP interface ens33 ## 绑定虚拟IP的网络接口(网卡),与本机IP地址所在的网络接口相同(我这里是eth0) virtual_router_id 74 ## 虚拟路由ID号(主备节点一定要相同) mcast_src_ip 192.168.247.153 ## 本机ip地址 priority 100 ##优先级配置(0-254的值) nopreempt advert_int 1 ## 组播信息发送间隔,俩个节点必须配置一致,默认1s authentication { ## 认证匹配 auth_type PASS auth_pass bhz } track_script { chk_haproxy } virtual_ipaddress { 192.168.247.160 ## 虚拟ip,可以指定多个 } }
154配置
! Configuration File for keepalived global_defs { router_id haproxyks ##标识节点的字符串,通常为hostname } vrrp_script chk_haproxy { script "/etc/keepalived/haproxy_check.sh" ##执行脚本位置 interval 2 ##检测时间间隔 weight -20 ##如果条件成立则权重减20 } vrrp_instance VI_1 { state BACKUP ## 主节点为MASTER,备份节点为BACKUP interface ens33 ## 绑定虚拟IP的网络接口(网卡),与本机IP地址所在的网络接口相同(我这里是eth0) virtual_router_id 74 ## 虚拟路由ID号(主备节点一定要相同) mcast_src_ip 192.168.247.154 ## 本机ip地址 priority 80 ##优先级配置(0-254的值) nopreempt advert_int 1 ## 组播信息发送间隔,俩个节点必须配置一致,默认1s authentication { ## 认证匹配 auth_type PASS auth_pass bhz } track_script { chk_haproxy } virtual_ipaddress { 192.168.247.160 ## 虚拟ip,可以指定多个 } }
编写自动检测脚本
touch /etc/keepalived/haproxy_check.sh chmod +x /etc/keepalived/haproxy_check.sh vi /etc/keepalived/haproxy_check.sh
脚本内容
#!/bin/bash COUNT=`ps -C haproxy --no-header |wc -l` if [ $COUNT -eq 0 ];then /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg sleep 2 if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then killall keepalived fi fi
启动
# 记得先启动haproxy, 再启动Keepalived /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg # 查看进程 ps -ef | grep haproxy # 将Keepalived注册为系统服务 cd /home/software/keepalived-2.0.18/keepalived/etc/ cp init.d/keepalived /etc/init.d/ cp sysconfig/keepalived /etc/sysconfig/ systemctl daemon-reload # 启动Keepalived systemctl [start | stop | status | restart] keepalived systemctl start keepalived.service systemctl stop keepalived.service # 查看服务 ps -ef|grep keepalived
高可用测试
160默认在153主Keepalived上
节点宕机测试
停止153上的keepalived
查看153 ip
160的vip已经不再了, 查看154
已经自动绑定154了, 重启153
重启后160重新漂移回153上, 高可用测试ok
队列功能代码测试
直接使用最简单的hello world程序测试, IP使用虚拟的VIP 160
消费者
package com.dance.redis.mq.rabbit.helloworld; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class Receiver { private static final String QUEUE_NAME = "queue-test"; private static final String IP_ADDRESS = "192.168.247.160"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Address[] address = new Address[]{ new Address(IP_ADDRESS, PORT) }; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("toor"); factory.setPassword("123456"); // 这里的连接方式与生产者的demo略有不同,注意区别。 Connection connection = factory.newConnection(address); //创建连接 final Channel channel = connection.createChannel();//创建信道 channel.basicQos(64);//设置客户端最多接收未被ack的消息个数 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("recvive message:" + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); //等待回调函数执行完毕之后,关闭资源。 TimeUnit.SECONDS.sleep(50); channel.close(); connection.close(); } }
生产者
package com.dance.redis.mq.rabbit.helloworld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange-test"; private static final String ROUTING_KEY = "text.*"; private static final String QUEUE_NAME = "queue-test"; private static final String IP_ADDRESS = "192.168.247.160"; private static final int PORT = 5672; //RabbitMQ服务默认端口为5672 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setVirtualHost("/"); factory.setUsername("toor"); factory.setPassword("123456"); Connection connection = factory.newConnection();//创建连接 Channel channel = connection.createChannel();//创建信道 //创建一个type="topic"、持久化的、非自动删除的交换器。 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null); //创建一个持久化、非排他的、非自动删除的队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //将交换机与队列通过路由键绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); //发送一条持久化消息:Hello World! String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //关闭资源 channel.close(); connection.close(); } }
我的账户和密码是toor和123456, 请改为自己的
启动消费者
启动生产者
查看消费者
消费成功, ip也是160
查看控制台 exchange
多了exchange-test 功能是ha-all
队列也是, 到此镜像队列+高可用已经实现
也可以在HA的控制台上查看统计
集群架构回顾
我们实现的是RabbitMQ Cluster + Mirror Queue + Haproxy + Keepalived
RabbitMQ Cluster 3台
Mirror Queue RabbitMQ集群方式
Haproxy 反向代理
Keepalived Haproxy集群检测, 虚拟VIP, 实现统一IP对外提供
架构图手绘
emm, 就是这样一个架构, 我应该李姐的挺到位的