项目中有个需求,需要使用Google的protobuf作为压缩协议,然后使用rabbitmq AMQP来发送和接收消息,在研究使用这两个工具中,遇到了有一些坑,之前有写了两篇来介绍分别使用,《 在 lua 中使用 protobuf》和《Lua OpenResty 使用rabbitmq AMQP协议发送和接收消息》 ,这里我们来结合使用一下,然后顺便解决一下lua的相关库的使用问题。
protobuf 相关步骤
1.安装 protoc
首先安装依赖库
1
| sudo apt-get install autoconf automake libtool curl make g++ unzip
|
下载、解压安装包
1 2
| curl -L -o protobuf-all-3.6.1.tar.gz https: tar -xzvf protobuf-all-3.6.1.tar.gz
|
进入安装包安装
1 2 3 4 5 6
| cd protobuf-all-3.6.1 ./configure make make check sudo make install sudo ldconfig
|
最后检查是否安装成功
1 2
| $ protoc --version libprotoc 3.6.1
|
2.生成pb文件
需要定义你自己的标准proto文件 xxx.proto
文件(可能不止一个文件,不止一个目录),定义好了之后,使用protoc
命令生成.pb
文件,命令如下:
1
| protoc --proto_path=proto --descriptor_set_out=common.pb proto/xxxx/*.proto proto/xxxxx/*.proto
|
其中 --proto_path
参数是你的proto文件目录,--descriptor_set_out
是你需要输出.pb
文件的目录,后面几个参数就是具体要引入的.proto
文件。这里需要注意的是,有时候会有多个proto文件并且多目录import的情况,这个时候,就需要在参数中都体现出来(命令的最后两个参数),这条命令是把所有的 .proto
文件生成了一个 common.pb
文件方便引入。注意这个common.pb
的路径,后面需要用到。
3.安装lua的protobuf库
这里我们使用的是lua-protobuf库。 lua-protobuf实际上是一个纯C的protobuf协议实现,和对应的Lua绑定。
项目地址:https://github.com/starwing/lua-protobuf
可以使用 luarocks 安装lua-protobuf
1
| luarocks install lua-protobuf
|
如果没有安装 luarocks 可以安装一下luarocks。
rabbitmq AMQP 相关步骤
1.安装第三方库
如果想要在openresty中使用AMQP协议发送和接收消息的话,需要使用到一个第三方库。地址为:https://github.com/4mig4/lua-amqp 。这里你会发现,这个库和《Lua OpenResty 使用rabbitmq AMQP协议发送和接收消息》文章中介绍使用的库不一样,确实是的,因为在使用过程中我发现,之前的那个库还有一些功能不完善,这里使用的这个库是作者fork了之前的那个项目,并改进了里面许多功能。所以最终我选用了这个库来开发。
安装方式为,使用luarocks安装,由于这个库没有push到https://luarocks.org/ 仓库当中,所以我们需要使用到其中的声明文件安装。把项目clone下来之后,找到这个文件 amqp-1.0-4.rockspec 这个文件,然后执行:
1
| luarocks build amqp-1.0-4.rockspec
|
这里直接执行,可能会报错,报错信息为unrecognized filename extension
,意思是识别不了文件格式之类的,这里需要把这个文件中的
1 2 3 4
| source = { url = "https://github.com/4mig4/lua-amqp.git", tag = "", }
|
修改为:
1 2 3 4
| source = { url = "https://github.com/4mig4/lua-amqp", tag = "", }
|
然后继续执行,执行完成之后,就可以在/usr/local/share/lua/5.1
目录中看到这个第三方库了,如果整理的文件夹名字不是amqp
,而是一个amqp加版本号,可以手动直接把文件夹修改为amqp
即可。到此第三方库安装完成。
在使用过程中,我发现一个问题,作者在fork了项目之后,增加了CQUEUES, NGX.SOCKET, SOCKET一些通讯协议功能,但是我本地环境直接运行会报错,查看了一些源码,发现是我的本地环境的cqueues
支持有点问题,而作者把这个作为最优先的协议,那么我就把源码文件https://github.com/4mig4/lua-amqp/blob/master/amqp/init.lua 中的local use_cqueues = true
改为local use_cqueues = false
,即关闭cqueues来使用了。
发送和接收消息
假设你的proto结构体是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| syntax = "proto3"; package aa.bb; message EventEnvelope { string id = 1; int64 created_ts = 2; string server_hash = 3; int64 happened_ts = 4; oneof body { aa.bb.LoggedOut logged_out = 1501; } message LoggedOut { int64 union_id = 1; string app_key = 2; }
|
发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| local function send_pb() local pb = require "pb" assert(pb.loadfile("xxxxx/common.pb")) local data = { created_ts = ngx.now() * 1000, server_hash = 'localhost', happened_ts = ngx.now() * 1000, id = 8376548368364, logged_out = { union_id = 123455666, app_key = xxxxxxxx } } local messages = assert(pb.encode("aa.bb.EventEnvelope", data)) local amqp = require "amqp" local ctx = amqp.new({ role = "publisher", exchange = "exchangexxxx", ssl = false, user = "guest", password = "guest", auto_delete = false, routing_key = "routing_keyxxxxx", passive = true, no_ack = true, no_wait = false, }) ctx:connect("127.0.0.1", port) ctx:setup() local ok, err = ctx:publish(messages) if not ok then ngx.log(ngx.ERR, "[ -- rabbitmq send failed : -- ] " .. err) else ngx.log(ngx.ERR, "[ -- rabbitmq send success ]") end end send_pb("this is a message")
|
接收消息
可以新建一个consume_queue.lua
文件,然后如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| -- 依旧是引入 local amqp = require "amqp" local pb = require "pb" -- 这里是回调函数 local function consume_local(f) print(f) -- 这里就是消息的所有信息,里面包含了properties ,body,frame等信息 print(f.body) -- 这里就是消息的主体 -- 加载pb文件 assert(pb.loadfile("xxxxx/common.pb")) -- 解析消息 local data = assert(pb.decode("aa.bb.EventEnvelope", f.body)) print(data) end local amqp = require "amqp" local ctx = amqp.new({ role = "consumer", queue = "eventbus1", -- 这里可以自定义 exchange = "exchangexxxx", ssl = false, user = "guest", password = "guest", no_wait = false, routing_key = "routing_keyxxxxx", auto_delete = false, -- 是否自动删除消息 no_ack = true, exclusive = false, -- 是否为排他队列 callback = consume_local, -- 回调函数 durable = true, passive = false, type = "topic" }) ctx:connect("127.0.0.1", port) local ok, err = ctx:consume()
|
执行命令开始消费消息
1
| /usr/local/openresty/bin/resty consume_queue.lua
|
本文标题:Lua OpenResty 使用 protobuf 和 rabbitmq AMQP 发送和接收消息
文章作者:qianyugang
发布时间:2019-06-26
最后更新:2020-04-21
原始链接:https://102no.com/2019/06/26/lua-openresty-rabbitmq-protobuf/
版权声明:本网站发表的全部原创内容(不仅限于文章、图片,包含文章评论),著作权均归其发表者所有,均采用 CC BY-NC-SA 4.0 CN 许可协议。转载请注明作者以及原文链接,商业授权请联系作者。