openresty 学习笔记番外篇:rabbitmq
之所以要捣鼓这个,一方面是规划lua与redis交互后把redis与mysql的同步事务通过MQ发布出去。其他程序订阅该频道后去处理写入mysql的操作。另一方面,在实际情况中进程会遇到需要通过消息队列来处理各个环节间的延迟和阻塞带来的影响。
MQ的集群可扩展性、跨语言开发也确实很吸引。
在对比各个MQ平台后,对可靠性和成熟程度、各种语言的支持库和各种MQ协议的支持。最终选择rabbitMQ。
Centos7 安装 rabbitmq
yum安装法
安装erlang
wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
yum localinstall erlang-19.0.4-1.el7.centos.x86_64.rpm
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.9/rabbitmq-server-3.6.9-1.el7.noarch.rpm
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc ###需要先导入key
yum install rabbitmq-server-3.6.9-1.noarch.rpm
启动rabbitmq
systemctl daemon-reload
systemctl enable rabbitmq-server
systemctl start rabbitmq-server.service
若启动失败检测端口是否被占用
4369 (epmd), 25672 (Erlang distribution)
5672, 5671 (AMQP 0-9-1 without and with TLS)
15672 (if management plugin is enabled)
61613, 61614 (if STOMP is enabled)
1883, 8883 (if MQTT is enabled)
参考:http://www.rabbitmq.com/install-rpm.html
RabbitMQ Server 配置
复制一份完整的示例配置,只要把其中想要配置的部分注释去掉并根据自己的情况进行更改就可以了
cp /usr/share/doc/rabbitmq-server-3.6.2/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
vim /etc/rabbitmq/rabbitmq.config
参考:
http://www.rabbitmq.com/configure.html
添加web管理插件
rabbitmq-plugins enable rabbitmq_management
配置主要在配置文件的rabbitmq_management部分进行添加,比如对web界面的端口和允许访问的IP进行限制
{rabbitmq_management,
[
{listener, [{port, 15671},
{ip, "127.0.0.1"}
]
}
]
},
添加STOMP适配器
rabbitmq-plugins enable rabbitmq_stomp
配置方面可以修改,地址限制和端口还有默认用户等等
{rabbitmq_stomp,
[
{tcp_listeners, [{"127.0.0.1", 61613}]
}
]
},
RabbitMQ 日志、用户、client
日志目录 /var/log/rabbitmq
默认virtual host与默认user
virtual host: /
user:guest
passwd:guest
默认用户只可在localhost上连至/且有全部权限
添加用户命令
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
STOMP v1.2
Stomp是一个简单的消息文本协议,它的设计核心理念就是简单与可用性,官方文档:
http://stomp.github.io/stomp-specification-1.2.html
rabbitmq安装了stomp适配器后支持该协议,下面先了解该协议的基本流程框架。先采用 centos 的netcat 进行调试
yum install nc
1.CONNECT 建立连接
当 Client 向 Server 发送一个CONNECT Frame,就向服务器发起了一个连接请求,此时服务器端返回一个CONNECTED Frame表示建立连接成功,还可以发起连接可以指定版本和登陆用户
CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/
^@
#返回结果
^@
CONNECTED
server:RabbitMQ/3.6.2
session:session-lln_3mVKRWkDovu0CUQGxQ
heart-beat:0,0
version:1.2
2.SEND 消息传递
receipt 服务端收到返回receipt-id
persistent 是否持久化
SEND
destination:/topic/sync_mysql
receipt:luaid1
app-id:luaresty
delivery-mode:2
persistent:true
content-type:json/application
hello
^@
#返回结果
RECEIPT
receipt-id:luaid1
3.DISCONNECT 断开连接
receipt 服务端会返回同样的字符串进行确认
DISCONNECT
receipt:100
4.SUBSCRIBE 订阅
SUBSCRIBE
id:python
destination:/amq/queue/python_sync_mysql
ack:client
^@
5.ACK 确认
当ack:client时,客户端收到服务端信息之后必须回复ACK帧。如果在收到客户端回复的ACK之前连接断开,服务端会认为这个消息没有被处理而改发给其他客户端。客户端回复的ACK会被当做累加的处理。这意味着对信息的确认操作不仅仅是确认了这单个的消息,还确认了这个订阅之前发送的所有消息(即接收到一个确认消息就会把之前的消息一起确认掉,批量操作)。
如果客户端没有处理收到的消息,它应该回复一个NACK告诉服务端他没有处理该消息。
ACK
id:T_python@@session-RkbWBHO9WAlr_f1MoI58kw@@1
^@
6.UNSCUBSCRIBE 取消订阅
UNSUBSCRIBE
id:python
^@
lua-resty-rabbitmqstomp
环境部署好了,协议流程也了解了,可以使用lua的rabbitmq库来实验一下stomp协议。
https://github.com/wingify/lua-resty-rabbitmqstomp
分析其提供共的例子:
local cjson = require "cjson"
local rabbitmq = require "resty.rabbitmqstomp"
--------CONNECT--------
local opts = { username = "guest",
password = "guest",
vhost = "/" }
local mq, err = rabbitmq:new(opts) //输入用户名密码和虚拟主机名称创建一个对象
if not mq then
return
end
mq:set_timeout(10000)
local ok, err = mq:connect("127.0.0.1",61613) //建立连接
if not ok then
return
end
--------CONNECT END--------
--------SEND--------
local msg = {key="value1", key2="value2"}
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"
local ok, err = mq:send(cjson.encode(msg), headers)
if not ok then
return
end
ngx.log(ngx.INFO, "Published: " .. msg)
--------SEND END--------
--------SUBSCRIBE--------
local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:subscribe(headers)
if not ok then
return
end
--------SUBSCRIBE END--------
--------RECEIVE--------
local data, err = mq:receive()
if not data then
return
end
ngx.log(ngx.INFO, "Consumed: " .. data)
--------RECEIVE END--------
--------UNSUBSCRIBE--------
local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:unsubscribe(headers)
--------UNSUBSCRIBE END--------
--------DISCONNECT--------
local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
return
end
--------DISCONNECT END--------
初步做了下二次封装,这里只是做了send的方法,订阅的目前lua还没用到
local rabbitmq = require "resty.rabbitmqstomp"
local _M = {}
_M._VERSION = '0.01'
local mt = { __index = _M }
local msg_no = 1
function _M.new(self, opts)
opts = opts or {}
return setmetatable({
mq_host = opts.host or '127.0.0.1',
mq_port = opts.port or 61613,
mq_timeout = opts.timeout or 10000,
mq_user = opts.username or 'guest',
mq_password = opts.password or 'guest',
mq_vhost = opts.vhost or "/"}, mt)
end
function _M.get_connect(self)
local mq, err = rabbitmq:new({ username = self.mq_user,
password = self.mq_password,
vhost = self.mq_vhost })
if not mq then
return false,err
end
mq:set_timeout(self.mq_timeout)
local ok, err = mq:connect(self.mq_host,self.mq_port)
if not ok then
return false,err
end
return true,mq
end
function _M.send(self , destination, msg)
local ret, client = self:get_connect()
if not ret then
return false,client
end
local send_receipt_id = "msgid"..msg_no
local headers = {}
headers["destination"] = destination
headers["receipt"] = send_receipt_id
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"
msg_no = msg_no + 1
local ok, err = client:send(msg, headers)
if not ok then
return false,err
end
local _,str_start = string.find(ok, "receipt%-id", 1)
local str_end = string.find(ok, "\n\n", 1)
if str_start == nil or str_end == nil then
return false,"send receipt not receive"
end
local receipt_id = string.sub(ok, str_start + 2 ,str_end - 1)
if receipt_id ~= send_receipt_id then
return false,"receipt id not right"
end
local ok, err = client:set_keepalive(10000, 10000)
return true,send_receipt_id
end
return _M
Hello. http://jakshgy773733.us