openresty 学习笔记番外篇:rabbitmq

之所以要捣鼓这个,一方面是规划lua与redis交互后把redis与mysql的同步事务通过MQ发布出去。其他程序订阅该频道后去处理写入mysql的操作。另一方面,在实际情况中进程会遇到需要通过消息队列来处理各个环节间的延迟和阻塞带来的影响。

MQ的集群可扩展性、跨语言开发也确实很吸引。

在对比各个MQ平台后,对可靠性和成熟程度、各种语言的支持库和各种MQ协议的支持。最终选择rabbitMQ。

Centos7 安装 rabbitmq

yum安装法

安装erlang

wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
yum localinstall erlang-19.0.4-1.el7.centos.x86_64.rpm
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.9/rabbitmq-server-3.6.9-1.el7.noarch.rpm
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc ###需要先导入key
yum install rabbitmq-server-3.6.9-1.noarch.rpm

启动rabbitmq

systemctl daemon-reload
systemctl enable  rabbitmq-server
systemctl start rabbitmq-server.service

若启动失败检测端口是否被占用

4369 (epmd), 25672 (Erlang distribution)
5672, 5671 (AMQP 0-9-1 without and with TLS)
15672 (if management plugin is enabled)
61613, 61614 (if STOMP is enabled)
1883, 8883 (if MQTT is enabled)

参考:http://www.rabbitmq.com/install-rpm.html

RabbitMQ Server 配置

复制一份完整的示例配置,只要把其中想要配置的部分注释去掉并根据自己的情况进行更改就可以了

cp /usr/share/doc/rabbitmq-server-3.6.2/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
vim /etc/rabbitmq/rabbitmq.config

参考:
http://www.rabbitmq.com/configure.html

添加web管理插件

rabbitmq-plugins enable rabbitmq_management

配置主要在配置文件的rabbitmq_management部分进行添加,比如对web界面的端口和允许访问的IP进行限制

{rabbitmq_management,
 [
   {listener, [{port,     15671},
               {ip,       "127.0.0.1"}
              ]
   }
 ]
},

添加STOMP适配器

rabbitmq-plugins enable rabbitmq_stomp

配置方面可以修改,地址限制和端口还有默认用户等等

{rabbitmq_stomp,
 [
  {tcp_listeners, [{"127.0.0.1", 61613}]
  }
 ]
},

RabbitMQ 日志、用户、client

日志目录 /var/log/rabbitmq

默认virtual host与默认user
virtual host: /
user:guest
passwd:guest

默认用户只可在localhost上连至/且有全部权限

添加用户命令

rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator

STOMP v1.2

Stomp是一个简单的消息文本协议,它的设计核心理念就是简单与可用性,官方文档:
http://stomp.github.io/stomp-specification-1.2.html

rabbitmq安装了stomp适配器后支持该协议,下面先了解该协议的基本流程框架。先采用 centos 的netcat 进行调试

yum install nc

1.CONNECT 建立连接

当 Client 向 Server 发送一个CONNECT Frame,就向服务器发起了一个连接请求,此时服务器端返回一个CONNECTED Frame表示建立连接成功,还可以发起连接可以指定版本和登陆用户


CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/

^@

#返回结果

^@

CONNECTED
server:RabbitMQ/3.6.2
session:session-lln_3mVKRWkDovu0CUQGxQ
heart-beat:0,0
version:1.2

2.SEND 消息传递

receipt 服务端收到返回receipt-id
persistent 是否持久化

SEND
destination:/topic/sync_mysql
receipt:luaid1
app-id:luaresty
delivery-mode:2
persistent:true
content-type:json/application

hello
^@

#返回结果

RECEIPT
receipt-id:luaid1

3.DISCONNECT 断开连接

receipt 服务端会返回同样的字符串进行确认

DISCONNECT
receipt:100

4.SUBSCRIBE 订阅

SUBSCRIBE
id:python
destination:/amq/queue/python_sync_mysql
ack:client

^@

5.ACK 确认

当ack:client时,客户端收到服务端信息之后必须回复ACK帧。如果在收到客户端回复的ACK之前连接断开,服务端会认为这个消息没有被处理而改发给其他客户端。客户端回复的ACK会被当做累加的处理。这意味着对信息的确认操作不仅仅是确认了这单个的消息,还确认了这个订阅之前发送的所有消息(即接收到一个确认消息就会把之前的消息一起确认掉,批量操作)。
如果客户端没有处理收到的消息,它应该回复一个NACK告诉服务端他没有处理该消息。

ACK       
id:T_python@@session-RkbWBHO9WAlr_f1MoI58kw@@1

^@

6.UNSCUBSCRIBE 取消订阅

UNSUBSCRIBE
id:python

^@

lua-resty-rabbitmqstomp

环境部署好了,协议流程也了解了,可以使用lua的rabbitmq库来实验一下stomp协议。
https://github.com/wingify/lua-resty-rabbitmqstomp

分析其提供共的例子:

local cjson = require "cjson"
local rabbitmq = require "resty.rabbitmqstomp"

--------CONNECT--------
local opts = { username = "guest",
               password = "guest",
               vhost = "/" }

local mq, err = rabbitmq:new(opts)    //输入用户名密码和虚拟主机名称创建一个对象

if not mq then
      return
end

mq:set_timeout(10000)

local ok, err = mq:connect("127.0.0.1",61613) //建立连接

if not ok then
    return
end
--------CONNECT END--------

--------SEND--------
local msg = {key="value1", key2="value2"}
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"

local ok, err = mq:send(cjson.encode(msg), headers)
if not ok then
    return
end
ngx.log(ngx.INFO, "Published: " .. msg)
--------SEND END--------

--------SUBSCRIBE--------
local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:subscribe(headers)
if not ok then
    return
end
--------SUBSCRIBE END--------

--------RECEIVE--------
local data, err = mq:receive()
if not data then
    return
end
ngx.log(ngx.INFO, "Consumed: " .. data)
--------RECEIVE END--------

--------UNSUBSCRIBE--------
local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:unsubscribe(headers)
--------UNSUBSCRIBE END--------

--------DISCONNECT--------
local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
    return
end
--------DISCONNECT END--------

初步做了下二次封装,这里只是做了send的方法,订阅的目前lua还没用到

local rabbitmq = require "resty.rabbitmqstomp"

local _M = {}
_M._VERSION = '0.01'

local mt = { __index = _M }
local msg_no = 1

function _M.new(self, opts)
    opts = opts or {}
    return setmetatable({
            mq_host = opts.host or '127.0.0.1',
            mq_port = opts.port or 61613,
            mq_timeout = opts.timeout or 10000,
            mq_user = opts.username or 'guest',
            mq_password = opts.password or 'guest',
            mq_vhost = opts.vhost or "/"}, mt)
end

function _M.get_connect(self)

    local mq, err = rabbitmq:new({ username = self.mq_user,
                                   password = self.mq_password,
                                   vhost = self.mq_vhost })

    if not mq then
        return false,err
    end

    mq:set_timeout(self.mq_timeout)

    local ok, err = mq:connect(self.mq_host,self.mq_port) 

    if not ok then
        return false,err
    end

    return true,mq

end

function _M.send(self , destination, msg)

    local ret, client = self:get_connect()
    if not ret then
        return false,client
    end

    local send_receipt_id = "msgid"..msg_no

    local headers = {}
    headers["destination"] = destination
    headers["receipt"] = send_receipt_id
    headers["app-id"] = "luaresty"
    headers["persistent"] = "true"
    headers["content-type"] = "application/json"

    msg_no = msg_no + 1

    local ok, err = client:send(msg, headers)
    if not ok then
        return false,err
    end

    local _,str_start = string.find(ok, "receipt%-id", 1)
    local str_end = string.find(ok, "\n\n", 1)
    if str_start == nil or str_end == nil then
        return false,"send receipt not     receive"
    end

    local receipt_id = string.sub(ok, str_start + 2 ,str_end - 1)
    if receipt_id ~= send_receipt_id then
        return false,"receipt id not right"
    end

    local ok, err = client:set_keepalive(10000, 10000)

    return true,send_receipt_id
end


return _M

仅有一条评论

  1. Hello. http://jakshgy773733.us

添加新评论