zhaofengze
作者zhaofengze·2018-03-20 00:43
项目经理·ca

rabbit mq

字数 8039阅读 1293评论 0赞 1

RabbitMQ
一个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在Ubuntu、FreeBSD、Centos中的实现
4.RabbitMQ是什么
RabbitMQ是一个遵循AMQP协议的消息中间件,它从生产者接收消息并递送给消费者,在这个过程中,根据规则进行路由,缓存与持久化。
5.RabbitMQ能为你做些什么?
消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接。消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶。
或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步处理,或者工作队列。所有这些都可以通过消息系统实现。
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
6.RabbitMQ的结构图

几个概念说明:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序。
Consumer:消息消费者,就是接受消息的程序。
Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
消息队列的使用过程大概如下:
客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。
客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号” ”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc. ”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
exchange持久化,在声明时指定durable => 1
queue持久化,在声明时指定durable => 1
消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
7.RabbitMQ之交换机
消息(Message)由Client发送,RabbitMQ接收到消息之后通过交换机转发到对应的队列上面。Worker会从队列中获取未被读取的数据处理。
RabbitMQ包含四种不同的交换机类型:
Direct exchange:直连交换机,转发消息到routigKey指定的队列,如果消息的routigKey和binding的routigKey直接匹配的话,消息将会路由到该队列
Fanout exchange:扇形交换机,转发消息到所有绑定队列(速度最快),不管消息的routigKey息和binding的参数表头部信息和值是什么,消息将会路由到所有的队列
Topic exchange:主题交换机,按规则转发消息(最灵活),如果消息的routigKey和binding的routigKey符合通配符匹配的话,消息将会路由到该队列
Headers exchange:首部交换机 ,如果消息的头部信息和binding的参数表中匹配的话,消息将会路由到该队列。

openstack
1.rabbitmq基础应用
消息型中间件
AMQP 协议:高级消息队列协议
路由模型:
direct
topic
fan-out
headers
broker:
exchange
binding
queue

yum install rabiitmq-server -y
rabiitmq-plugins enable rabbitmq_managment
rabiitmq-plugins list
systemctl restart rabbitmq-server.services
配置方式:
环境变量:网络参数及配置文件路径
配置文件:服务器各组件访问权限、资源限制、插件及集群限制
运行时参数:集群的运行时参数设定
rabbitmqctl add_user test pass

./rabbitmqctl add_user admin admin123

./rabbitmqctl set_user_tags admin administrator

./rabbitmqctl add_vhost /myhost1

./rabbitmqctl delete /myhost1

Commands:

stop [<pid_file>]

shutdown
stop_app
start_app
wait <pid_file>
reset
force_reset
rotate_logs <suffix>
hipe_compile <directory>

join_cluster <clusternode> [--ram]
cluster_status
change_cluster_node_type disc | ram
forget_cluster_node [--offline]
rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2 ...]
update_cluster_nodes clusternode
force_boot
sync_queue [-p <vhost>] queue
cancel_sync_queue [-p <vhost>] queue
purge_queue [-p <vhost>] queue
set_cluster_name name

用户管理:

add_user <username> <password>
delete_user <username>
change_password <username> <newpassword>
clear_password <username>
authenticate_user <username> <password>
set_user_tags <username> <tag> ...
list_users

权限管理:

add_vhost <vhost>
delete_vhost <vhost>
list_vhosts [<vhostinfoitem> ...]
set_permissions [-p <vhost>] <user> <conf> <write> <read>
clear_permissions [-p <vhost>] <username>
list_permissions [-p <vhost>]
list_user_permissions <username>

set_parameter [-p <vhost>] <component_name> <name> <value>
clear_parameter [-p <vhost>] <component_name> <key>
list_parameters [-p <vhost>]
set_global_parameter <name> <value>
clear_global_parameter <name>
list_global_parameters

set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] 

<name> <pattern> <definition>

clear_policy [-p <vhost>] <name>
list_policies [-p <vhost>]

list_queues [-p <vhost>] [--offline|--online|--local] [<queueinfoitem> ...]
list_exchanges [-p <vhost>] [<exchangeinfoitem> ...]
list_bindings [-p <vhost>] [<bindinginfoitem> ...]
list_connections [<connectioninfoitem> ...]
list_channels [<channelinfoitem> ...]
list_consumers [-p <vhost>]
status
node_health_check
environment
report
eval <expr>

close_connection <connectionpid> <explanation>
trace_on [-p <vhost>]
trace_off [-p <vhost>]
set_vm_memory_high_watermark <fraction>
set_vm_memory_high_watermark absolute <memory_limit>
set_disk_free_limit <disk_limit>
set_disk_free_limit mem_relative <fraction>
encode [--decode] [<value>] [<passphrase>] [--list-ciphers] [--list-hashes] 

RabbitMQ主要是有这么几个部分,这张图能说明RabbitMQ:

Publisher:

      是Message的生产者,Publisher这个Clients产生了一些Message。

Consumer:

      Message的消费者,Publisher产生的Message,最终要到达Consumer这个Clients,进行消费。

Exchange:

     指定消息按什么规则,路由到哪个Queue,Message消息先要到达Exchange,在Server中承担着从Produce接收Message的责任。

Queue:

     到达Exchange的消息,根据制定的规则(Routing key)到达对应    的Queue,在Server中承担着装载Message,是Message的容器,等待被消费出去。

Routing key:

     在Exchange和Queue之间隐藏有一条黑线,可以将这条黑线看成是Routing key,Exchange就是根据这些定义好的Routing key将Message送到对应的Queue中去,是Exchange和Queue之间的桥梁。

Broker:

    之前一直不理解这个Broker,其实Broker就是接收和分发消息的应用,也就是说RabbitMQ Server就是Message Broker。

VirtualHost:

    虚拟主机,一个Broker里可以开有多个VirtualHost,它的作用是用作不同用户的权限分离。 

Connection:

   是Publisher/Consumer和Broker之间的TCP连接。断开连接的操作只会在Publisher/Consumer端进行,Broker不会断开连接,除非出现网络故障或者Broker服务出现问题,Broker服务宕了。

Connection: Channel:

    如果每一次访问RabbitMQ就建立一个Connection,那在消息量大的时候建立TCP Connection的开销就会很大,导致的后果就是效率低下。

左边的Client向右边的Client发送消息,流程:

   第一:获取Conection
   第二:获取Channel
   第三:定义Exchange,Queue
   第四:使用一个RoutingKey将Queue Binding到一个Exchange上
   第五:通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上,
   第六:Consumer在接收时也是获取connection,接着获取channel,然后指定一个Queue,到Queue上取消息,它对Exchange,RoutingKey及如何Binding都不关心,到对应的Queue上去取消息就行了。       
    一个Publisher Client发送消息,哪些Consumer Client可以收到消息,在于Exchange,RoutingKey,Queue的关系上。

CLUSTER
实现:apache(qpid,activemq)

    RabbitMQ: erlang语言研发
    kafka:一般在facebook级别的公司使用

    0MQ,ZMQ,ZeroMQ:基于库的调用,没有中间件,性能极好
安装:
    rabbitmq-server (epel源)
        插件:rabbitmq-plugins {enable|disable|list}

        rabbit-server有多个监听端口,rabbitmq_management监听于16672端口

    配置方式:

        环境变量:/etc/rabbitmq/rabbitmq-env.conf
        RABBITMQ_BASE: 数据库和日志文件
        RABBITMA_CONFIGFILE:
        RABBITMA_LOGS
        RABBITMQ_NODE_IP_ADDRESS
        RABBITMQ_NODE_PORT
        RABBITMQ_PLUGIN_DIR

    配置文件:
        auth_mechanisms: 认机机制    
        default_user: guest
        default_pass: guest
        default_permission
        dis_free_limit
        heartbeat
        hipe_compile
        log_levels {none| error|waring|info}
        tcp_listeners: 监听的地址和端口
        ssl_listeners:基于ssl通信协议鉴定的地址和端口
        vm_memory_hight_watermark

    运行时参数:
        rabbitmqctl命令:
        命令行语法风格:使用下划线    
        rabbitctl status 查看服务状态
        rabbitctl stop_app 不指定则停止所有应用
        rabbitctl start_app    
        rabbitctl add_user <username> <password>
        rabbitctl set_user_tags <username> <tags>
        list_users

    权限管理:

    组件查看命令:

    broker状态查看

    环境变量产看

    执行erlang底层命令

        eval <expr>

    关闭指定连接            
        rabbitctl list_permissions -p VHOST 指定虚拟主机,不指定则显示根(以主机为中心)
        rabbitctl list_user_permissions    (以用户为中心的参数查询)

rabbitmq ha :
rabbitmq 作为整个系统非常重要的节点。所以必须做HA及负载均衡
负载均衡通常通过LB的方式haproxy
HA功能用软件自带功能去实现

rabbitmq cluster:
    (1)设置各节点主机名
        hostnamectl ste-hostname HOSTNAME

        确保对方的都能解析节点名称

    (2)将cookie复制到各节点,保持cokkie一致

    (3)停止rabbitmq服务后手动启动
        # service rabbitmq stop

        # rabbitmq-server -detached

        可使用rabbitmq cluster_status查看集群状态


    (4)停止其中一个节点的应用
        rabbitctl stop_app (如停止node2)

        rabbitmqctl join_cluster rabbit@NODE_NAME(在node2执行命令)     


    容易出现的错误:集群名称需要和节点名一致,负责将无法添加节点


基于haproxy的LB集群:
    listen rabbitmq:5672
        mod tcp
        status enable
        banlance roundrobin
        server rabbit01 IP:PORT check inter 5000
        server rabbit02 IP:PORT check inter 5000

如果觉得我的文章对您有用,请点赞。您的支持将鼓励我继续创作!

1

添加新评论0 条评论

Ctrl+Enter 发表

相关问题

相关资料

X社区推广