EMQ

EMQ

八月 13, 2019

一、EMQ安装

1.安装包文件

1
2
3
4
5
6
7
8
9
10
11
[emqtt@lp ~]$ unzip emqttd-centos7-v2.3.11.zip
[emqtt@lp ~]$ cd emqttd/
[emqtt@lp emqttd]$ ls
bin data erts-9.0 etc hook_lua lib log releases

# 其中在bin下有很多命令;etc有配置文件;

[emqtt@lp emqttd]$ ./bin/emqttd start # 启动命令
emqttd 2.3.11 is started successfully!
[emqtt@lp emqttd]$ ./bin/emqttd restart # 重启命令
ok

2.检查情况

控制台调试模式启动,检查 EMQ 是否可正常启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
[emqtt@lp ~]$ cd emqttd && ./bin/emqttd console
Exec: /home/emqtt/emqttd/erts-9.0/bin/erlexec -boot
/home/emqtt/emqttd/releases/2.3.11/emqttd -mode embedded
-boot_var ERTS_LIB_DIR /home/emqtt/emqttd/erts-
9.0/../lib -mnesia dir
"/home/emqtt/emqttd/data/mnesia/emq@127.0.0.1" -config
/home/emqtt/emqttd/data/configs/app.2018.08.06.00.30.12.
config -args_file
/home/emqtt/emqttd/data/configs/vm.2018.08.06.00.30.12.a
rgs -vm_args
/home/emqtt/emqttd/data/configs/vm.2018.08.06.00.30.12.a
rgs -- console
Root: /home/emqtt/emqttd
/home/emqtt/emqttd
Erlang/OTP 20 [erts-9.0][source] [64-bit][smp:2:2]
[ds:2:2:10][async-threads:32] [hipe][kernel-poll:true]

=INFO REPORT==== 6-Aug-2018::00:30:16 ===
alarm_handler: {set,{system_memory_high_watermark,
[]}}
starting emqttd on node 'emq@127.0.0.1'
emqttd ctl is starting...[ok]
emqttd hook is starting...[ok]
emqttd router is starting...[ok]
emqttd pubsub is starting...[ok]
emqttd stats is starting...[ok]
emqttd metrics is starting...[ok]
emqttd pooler is starting...[ok]
emqttd trace is starting...[ok]
emqttd client manager is starting...[ok]
emqttd session manager is starting...[ok]
emqttd session supervisor is starting...[ok]
emqttd wsclient supervisor is starting...[ok]
emqttd broker is starting...[ok]
emqttd alarm is starting...[ok]
emqttd mod supervisor is starting...[ok]
emqttd bridge supervisor is starting...[ok]
emqttd access control is starting...[ok]
emqttd system monitor is starting...[ok]
emqttd 2.3.11 is running now
Eshell V9.0 (abort with ^G)
(emq@127.0.0.1)1> Load emq_mod_presence module
successfully.
dashboard:http listen on 0.0.0.0:18083 with 4 acceptors.
mqtt:tcp listen on 127.0.0.1:11883 with 4 acceptors.
mqtt:tcp listen on 0.0.0.0:1883 with 64 acceptors.
mqtt:ws listen on 0.0.0.0:8083 with 4 acceptors.
mqtt:ssl listen on 0.0.0.0:8883 with 16 acceptors.
mqtt:wss listen on 0.0.0.0:8084 with 4 acceptors.
mqtt:api listen on 0.0.0.0:8080 with 4 acceptors.
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
(v)ersion (k)ill (D)b-tables (d)istribution
^C[os_mon] memory supervisor port (memsup): Erlang has
closed
[os_mon] cpu supervisor port (cpu_sup): Erlang has
closed

二、压力测试工具

1.下载安装benchmark

1
2
3
4
5
6
7
8
9
10
[emqtt@lp ~]$ unzip emqtt_benchmark-master.zip # 这个下载
会比较难
[emqtt@lp ~]$ cd emqtt_benchmark-master/
[emqtt@lp emqtt_benchmark-master]$ make
/usr/bin/env: escript: No such file or directory
make: *** [get-deps] Error 127

# 可以看到编译会出现问题,这样的问题是没有安装Erlang,因为这项服务

是用这个Erlang来写的,因此必须要编译这个语言环境

2.部署Erlang 环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 这个包的地址我已经收藏了,需要找到这个版本务必记住R16B03,然后下

载下来解压
[emqtt@lp ~]$ tar xvf otp_src_20.3.tar.gz

# 这里我是用root用户进行操作,因为是安装一个语言环境,所以需要全局

都起作用,那么就选择这样了
[root@lp otp_src_20.3]# ./configure --
prefix=/usr/local/erlang
[root@lp otp_src_20.3]# make && make install
[root@lp otp_src_20.3]# vim /etc/profile
export PATH=$PATH:/usr/local/erlang/bin
[root@lp otp_src_20.3]# source /etc/profile

# 测试安装版本以及完整性

[root@lp emqtt_benchmark-master]# erl
Erlang R16B03-1 (erts-5.10.4) [source][64-bit]
[smp:2:2][async-threads:10] [kernel-poll:false]
Eshell V5.10.4 (abort with ^G)

3.编译安装benchmark又遇问题(大坑必看)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
[root@lp emqtt_benchmark-master]# make
Uncaught error in rebar_core: {'EXIT',
{undef,
[{crypto,start,[],[]},
{rebar,run_aux,2,
[{file,"src/rebar.erl"},{line,165}]},
{rebar,main,1,
[{file,"src/rebar.erl"},{line,58}]},
{escript,run,2,

[{file,"escript.erl"},
{line,747}]},
{escript,start,1,
[{file,"escript.erl"},
{line,277}]},
{init,start_it,1,[]},
{init,start_em,1,[]}]}}
make: *** [get-deps] Error 1

# 真是崩溃,这个问题....说明erlang的版本太低,需要升级!!!

[emqtt@lp emqtt_benchmark-master]$ make
WARN: Expected /home/emqtt/emqtt_benchmarkmaster/
deps/emqttc to be an app dir (containing
ebin/*.app), but no .app found.
WARN: Expected /home/emqtt/emqtt_benchmarkmaster/
deps/getopt to be an app dir (containing
ebin/*.app), but no .app found.
==> emqtt_benchmark-master (get-deps)
WARN: Expected /home/emqtt/emqtt_benchmarkmaster/
deps/emqttc to be an app dir (containing
ebin/*.app), but no .app found.
WARN: Expected /home/emqtt/emqtt_benchmarkmaster/
deps/getopt to be an app dir (containing
ebin/*.app), but no .app found.
WARN: Expected /home/emqtt/emqtt_benchmarkmaster/
deps/emqttc to be an app dir (containing
ebin/*.app), but no .app found.
ERROR: Dependency dir /home/emqtt/emqtt_benchmarkmaster/
deps/emqttc failed application validation with
reason:
{missing_app_file,"/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc"}.
ERROR: 'get-deps' failed while processing
/home/emqtt/emqtt_benchmark-master: rebar_abort
make: *** [get-deps] Error 1

# 这个问题是由于make之后,按下crtl+C取消了,这样就导致deps当中有

文件emqttc,但是里面的东西却还没有下载完成,那么就会报这样的错误,
这种情况下就把deps文件下的东西清空,然后再次make

# 又遇到问题了,下载不动这些资源,

[emqtt@lp emqtt_benchmark-master]$ make
WARN: Expected /home/emqtt/emqtt_benchmarkmaster/
deps/getopt to be an app dir (containing
ebin/*.app), but no .app found.

WARN: Expected /home/emqtt/emqtt_benchmarkmaster/
deps/goldrush to be an app dir (containing
ebin/*.app), but no .app found.
==> lager (get-deps)
WARN: Expected /home/emqtt/emqtt_benchmarkmaster/
deps/goldrush to be an app dir (containing
ebin/*.app), but no .app found.
Pulling goldrush from
{git,"https://github.com/basho/goldrush.git",
{tag,"0.1.9"}}
Cloning into 'goldrush'...

# 比如卡在“goldrush”的下载上,但是有个曲线救国的方式就是自己把这

些依赖包下载下来,我最后算了一下,一共是有五个包,这里我通过浏览器的
方式下载,注意看当中
的“{git,"https://github.com/basho/goldrush.git",”,这个就
是web地址,https://github.com/basho/goldrush,然后进去吧zip
包下载下来,移动到deps当中,并且是创建好的goldrush目录
[emqtt@lp emqtt_benchmark-master]$ ls deps
emqttc gen_logger getopt goldrush lager

# 就是这5个包的文件夹,都要解压以后相应的把内容放在其中,例如:

[emqtt@lp ~]$ zip getopt-master.zip
[emqtt@lp ~]$ cp -rf getopt-master/* emqtt_benchmarkmaster/
deps/getopt/

# 后面的包都需要这么做。

4.编译benchmark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[emqtt@lp emqtt_benchmark-master]$ make
WARN: Expected /home/emqtt/emqtt_benchmarkmaster/
deps/getopt to be an app dir (containing
ebin/*.app), but no .app found.
==> goldrush (get-deps)
==> lager (get-deps)
==> gen_logger (get-deps)
==> emqttc (get-deps)
==> emqtt_benchmark-master (get-deps)
WARN: Expected /home/emqtt/emqtt_benchmarkmaster/
deps/getopt to be an app dir (containing
ebin/*.app), but no .app found.
Pulling getopt from
{git,"https://github.com/jcomellas/getopt.git",
{branch,"master"}}
Cloning into 'getopt'...
^Cmake: *** [get-deps] Interrupt
[emqtt@lp emqtt_benchmark-master]$ make
==> goldrush (get-deps)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
==> lager (get-deps)
==> gen_logger (get-deps)
==> emqttc (get-deps)
==> getopt (get-deps)
==> emqtt_benchmark-master (get-deps)
==> goldrush (compile)
Compiled src/gr_sup.erl
Compiled src/gr_param_sup.erl
Compiled src/gre.erl
Compiled src/gr_manager_sup.erl
Compiled src/gr_manager.erl
Compiled src/gr_counter_sup.erl
Compiled src/gr_context.erl
Compiled src/gr_param.erl
Compiled src/gr_app.erl
Compiled src/glc_run.erl
Compiled src/gr_counter.erl
Compiled src/glc_ops.erl
Compiled src/glc_lib.erl
Compiled src/glc.erl
Compiled src/glc_code.erl
==> lager (compile)
Compiled src/lager_util.erl
Compiled src/lager_transform.erl
Compiled src/lager_sup.erl
Compiled src/lager_msg.erl
Compiled src/lager_manager_killer.erl
Compiled src/lager_handler_watcher_sup.erl
Compiled src/lager_handler_watcher.erl
Compiled src/lager_trunc_io.erl
Compiled src/lager_stdlib.erl
Compiled src/lager_format.erl
Compiled src/lager_default_formatter.erl
Compiled src/lager_crash_log.erl
Compiled src/lager_file_backend.erl
Compiled src/lager_config.erl
Compiled src/lager_console_backend.erl
Compiled src/lager_common_test_backend.erl
Compiled src/lager_backend_throttle.erl
Compiled src/lager_app.erl
Compiled src/error_logger_lager_h.erl
Compiled src/lager.erl
==> gen_logger (compile)
Compiled src/gen_logger.erl
Compiled src/console_logger.erl
Compiled src/error_logger_logger.erl
Compiled src/lager_logger.erl
==> emqttc (compile)

Compiled src/emqttc_topic.erl
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc_socket.erl:204: Warning:
gen_fsm:send_all_state_event/2 is deprecated and will be
removed in a future release; use gen_statem:cast/2
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc_socket.erl:215: Warning:
gen_fsm:send_all_state_event/2 is deprecated and will be
removed in a future release; use gen_statem:cast/2
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc_socket.erl:233: Warning:
gen_fsm:send_event/2 is deprecated and will be removed
in a future release; use gen_statem:cast/2
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc_socket.erl:242: Warning:
gen_fsm:send_all_state_event/2 is deprecated and will be
removed in a future release; use gen_statem:cast/2
Compiled src/emqttc_socket.erl
Compiled src/emqttc_serialiser.erl
Compiled src/emqttc_reconnector.erl
Compiled src/emqttc_packet.erl
Compiled src/emqttc_parser.erl
Compiled src/emqttc_opts.erl
Compiled src/emqttc_protocol.erl
Compiled src/emqttc_keepalive.erl
Compiled src/emqttc_message.erl
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:170: Warning:
gen_fsm:start_link/3 is deprecated and will be removed
in a future release; use gen_statem:start_link/3
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:193: Warning:
gen_fsm:start_link/4 is deprecated and will be removed
in a future release; use gen_statem:start_link/4
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:201: Warning:
gen_fsm:sync_send_all_state_event/2 is deprecated and
will be removed in a future release; use
gen_statem:call/2
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:254: Warning:
gen_fsm:send_event/2 is deprecated and will be removed
in a future release; use gen_statem:cast/2
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:262: Warning:
gen_fsm:sync_send_event/2 is deprecated and will be
removed in a future release; use gen_statem:call/2

/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:335: Warning:
gen_fsm:send_event/2 is deprecated and will be removed
in a future release; use gen_statem:cast/2
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:338: Warning:
gen_fsm:sync_send_event/3 is deprecated and will be
removed in a future release; use gen_statem:call/3
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:350: Warning:
gen_fsm:send_event/2 is deprecated and will be removed
in a future release; use gen_statem:cast/2
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:358: Warning:
gen_fsm:sync_send_event/3 is deprecated and will be
removed in a future release; use gen_statem:call/3
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:366: Warning:
gen_fsm:send_event/2 is deprecated and will be removed
in a future release; use gen_statem:cast/2
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:488: Warning:
gen_fsm:cancel_timer/1 is deprecated and will be removed
in a future release; use erlang:cancel_timer/1
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:505: Warning:
gen_fsm:send_event/2 is deprecated and will be removed
in a future release; use gen_statem:cast/2
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:658: Warning: variable
'Logger' is unused
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:658: Warning: variable
'Name' is unused
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:770: Warning:
gen_fsm:cancel_timer/1 is deprecated and will be removed
in a future release; use erlang:cancel_timer/1
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:962: Warning:
gen_fsm:start_timer/2 is deprecated and will be removed
in a future release; use erlang:start_timer/3
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:1052: Warning:
gen_fsm:reply/2 is deprecated and will be removed in a
future release; use gen_statem:reply/2

/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:1065: Warning:
gen_fsm:reply/2 is deprecated and will be removed in a
future release; use gen_statem:reply/2
/home/emqtt/emqtt_benchmarkmaster/
deps/emqttc/src/emqttc.erl:1076: Warning:
gen_fsm:reply/2 is deprecated and will be removed in a
future release; use gen_statem:reply/2
Compiled src/emqttc.erl
==> getopt (compile)
Compiled src/getopt.erl
==> emqtt_benchmark-master (compile)
src/emqtt_benchmark.erl:95: Warning: random:seed/1: the
'random' module is deprecated; use the 'rand' module
instead
src/emqtt_benchmark.erl:196: Warning: random:uniform/1:
the 'random' module is deprecated; use the 'rand' module
instead
Compiled src/emqtt_benchmark.erl
==> emqtt_benchmark-master (xref)

# 注意前面的提示:

==> goldrush (get-deps)
==> lager (get-deps)
==> gen_logger (get-deps)
==> emqttc (get-deps)
==> emqtt_benchmark-master (get-deps)

# 说明5个包都加载好了

5.测试

为了保证测试的可行性,需要更改内核参数配置以及emq自身的配置文件

①更改内核参数

系统全局允许分配的最大文件句柄数:

1
2
3
4
5
[root@lp emqtt]# vim /etc/sysctl.conf
fs.file-max = 2097152 # 系统所有进程可打开的文件数量
fs.nr_open=2097152
net.core.somaxconn=65536 # backlog - Socket 监听队列长度
[root@lp emqtt]# sysctl -p

设置服务最大文件句柄数:

一般只有用到systemctl去启动的时候,这个才会生效

1
2
[root@lp emqtt]# vim /etc/systemd/system.conf
DefaultLimitNOFILE=1048576

持久化设置允许用户/进程打开文件句柄数:

    • 1
      2
      3
      [root@lp emqtt]# vim /etc/security/limits.conf

      - - nofile 1048576

TCP 协议栈网络参数

1
2
3
4
5
6
7
[root@lp emqtt]# vim /etc/sysctl.conf
sysctl -w net.core.somaxconn=32768 # 监听队列的最大长度
sysctl -w net.ipv4.tcp_max_syn_backlog=16384 # Tcp syn队
列的最大长度 ,但是有可能会造成syn攻击
sysctl -w net.core.netdev_max_backlog=16384 # 每个网络接口
接收数据包的速率比内核处理这些包的速率快时,允许送到队列的数据包的最
大数目

可用知名端口范围:

1
2
[root@lp emqtt]# vim /etc/sysctl.conf
net.ipv4.ip_local_port_range='1000 65535'

TCP Socket 读写 Buffer 设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[root@lp emqtt]# vim /etc/sysctl.conf
net.core.rmem_default=262144 # 接收套接字缓冲区大小的默认值
net.core.wmem_default=262144 # 发送套接字缓冲区大小的默认值
net.core.rmem_max=16777216 # 接收套接字缓冲区大小的最大值
net.core.wmem_max=16777216 # 发送套接字缓冲区大小的最大值
net.core.optmem_max=16777216 # socket buffer的最大初始化
值,默认10K
#sysctl -w net.ipv4.tcp_mem='16777216 16777216 16777216'
net.ipv4.tcp_rmem='1024 4096 16777216'
net.ipv4.tcp_wmem='1024 4096 16777216'

# 为自动调优定义每个 socket 使用的内存。

# 第一个值是为 socket 的发送缓冲区分配的最少字节数。

# 第二个值是默认值(该值会被 wmem_default 覆盖),缓冲区在系统负

载不重的情况下可以增长到这个值。

# 第三个值是发送缓冲区空间的最大字节数(该值会被 wmem_max 覆

盖)。

TCP 连接追踪设置: (和firewall的开启有关系)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
[root@lp emqtt]# vim /etc/sysctl.conf
net.nf_conntrack_max=1000000
net.netfilter.nf_conntrack_max=1000000
net.netfilter.nf_conntrack_tcp_timeout_time_wait=30

# nf_conntrack用1个哈希表记录已建立的连接,包括其他机器到本机、本

机到其他机器、本机到本机(例如 ping 127.0.0.1 也会被跟踪)。

# 如果连接进来比释放的快,把哈希表塞满了,新连接的数据包会被丢掉,

此时netfilter变成了一个黑洞,导致拒绝服务。 这发生在3层(网络
层),应用程序毫无办法。

# 默认值参考以下公式:(使用内存的 1/16384)

# CONNTRACK_MAX = RAMSIZE (in bytes) / 16384 / (ARCH /

32)
#(ARCH为你机器CPU的架构,64或32)

# HASHSIZE = CONNTRACK_MAX / 4

推荐bucket至少 262144,max至少 1048576,不够再继续加
调优的基本思路是先看 /proc/net/nf_conntrack ,哪种协议哪种状态
的连接最多,改小对应的超时参数
注意要充分测试,确保不影响业务。

# 一篇博客关于这方面的调优推荐

net.netfilter.nf_conntrack_max=1048576
net.nf_conntrack_max=1048576
net.netfilter.nf_conntrack_tcp_timeout_fin_wait=30
net.netfilter.nf_conntrack_tcp_timeout_time_wait=30
net.netfilter.nf_conntrack_tcp_timeout_close_wait=15
net.netfilter.nf_conntrack_tcp_timeout_established=300

TIME-WAIT Socket 最大数量、回收与重用设置:

1
2
3
4
5
6
7
8
[root@lp emqtt]# vim /etc/sysctl.conf
net.ipv4.tcp_max_tw_buckets=1048576

# 注意: 不建议开启該设置,NAT模式下可能引起连接RST

# net.ipv4.tcp_tw_recycle = 1

# net.ipv4.tcp_tw_reuse = 1

FIN-WAIT-2 Socket 超时设置:

1
2
[root@lp emqtt]# vim /etc/sysctl.conf
net.ipv4.tcp_fin_timeout = 15

②虚拟机Erlang 参数

1
2
3
[emqtt@lp ~]$ vim emqttd/etc/emq.conf
node.process_limit = 2097152
node.max_ports = 1048576

③EMQ 消息服务器参数

1
2
3
4
[emqtt@lp ~]$ vim emqttd/etc/emq.conf
listener.tcp.external = 0.0.0.0:1883
listener.tcp.external.acceptors = 64
listener.tcp.external.max_clients = 1000000

Sub Benchmark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[emqtt@lp emqtt_benchmark-master]$ ./emqtt_bench_sub --
help
Usage: emqtt_bench_sub [--help <help>][-h []] [-p
[<port>]][-c []][-i []][-t ]
[-q [<qos>]][-u ] [-P <password>][-k []][-C []][--ifaddr ]
--help help information
-h, --host mqtt server hostname or IP address
[default: localhost]
-p, --port mqtt server port number [default:
1883]
-c, --count max count of clients [default: 200]
-n, --startnumber start number [default: 0]
-i, --interval interval of connecting to the
broker [default: 10]
-t, --topic topic subscribe, support %u, %c, %i
variables
-q, --qos subscribe qos [default: 0]
-u, --username username for connecting to server
-P, --password password for connecting to server
-k, --keepalive keep alive in seconds [default:
300]
-C, --clean clean session [default: true]
--ifaddr local ipaddress or interface
address

Pub Benchmark

1
$ ./emqtt_bench_pub --help
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Usage: emqtt_bench_pub [--help <help>][-h []] [-p
[<port>]][-c []] [-i [<interval>]][-I []] [-u
<username>][-P ] [-t <topic>][-s []]
[-q [<qos>]][-r []] [-k
[<keepalive>]][-C []] [--ifaddr
<ifaddr>]
--help help information
-h, --host mqtt server hostname or IP
address [default:
localhost]
-p, --port mqtt server port number
[default: 1883]
-c, --count max count of clients [default:
200]
-n, --startnumber start number [default: 0]
-i, --interval interval of connecting to the
broker [default: 10]
-I, --interval_of_msg interval of publishing
message(ms) [default: 1000]
-u, --username username for connecting to
server
-P, --password password for connecting to
server
-t, --topic topic subscribe, support %u,
%c, %i variables
-s, --size payload size [default: 256]
-q, --qos subscribe qos [default: 0]
-r, --retain retain message [default: false]
-k, --keepalive keep alive in seconds [default:
300]
-C, --clean clean session [default: true]
--ifaddr local ipaddress or interface
address
1
2
3
4
[emqtt@lp emqtt_benchmark-master]$ ./emqtt_bench_pub -c
100 -I 10 -t bench/%i -s 256
21:08:18.085 [info] Application lager started on node
nonode@nohost
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
21:08:18.087 [info] Application emqttc started on node
nonode@nohost
21:08:18.088 [info] Application emqtt_benchmark started
on node nonode@nohost
conneted: 1
conneted: 2
conneted: 3
conneted: 4
conneted: 5
conneted: 6
conneted: 7
conneted: 8
conneted: 9
conneted: 10
conneted: 11
conneted: 12
conneted: 13
conneted: 14
conneted: 15
conneted: 16
conneted: 17
conneted: 18
conneted: 19
conneted: 20
conneted: 21
conneted: 22
conneted: 23
conneted: 24
conneted: 25
conneted: 26
conneted: 27
conneted: 28
conneted: 29
conneted: 30
conneted: 31
conneted: 32
conneted: 33
conneted: 34
conneted: 35
conneted: 36
conneted: 37
conneted: 38
conneted: 39
conneted: 40
conneted: 41
conneted: 42
conneted: 43
conneted: 44

conneted: 45
conneted: 46
conneted: 47
conneted: 48
conneted: 49
conneted: 50
conneted: 51
conneted: 52
conneted: 53
conneted: 54
conneted: 55
conneted: 56
conneted: 57
conneted: 58
conneted: 59
conneted: 60
conneted: 61
conneted: 62
conneted: 63
conneted: 64
conneted: 65
conneted: 66
conneted: 67
conneted: 68
conneted: 69
conneted: 70
conneted: 71
conneted: 72
conneted: 73
conneted: 74
conneted: 75
conneted: 76
conneted: 77
conneted: 78
conneted: 79
conneted: 80
conneted: 81
sent(1001): total=3899, rate=3899(msg/sec)
conneted: 82
conneted: 83
conneted: 84
conneted: 85
conneted: 86
conneted: 87
conneted: 88
conneted: 89
conneted: 90
conneted: 91

conneted: 92
conneted: 93
conneted: 94
conneted: 95
conneted: 96
conneted: 97
conneted: 98
conneted: 99
conneted: 100
sent(2001): total=13561, rate=9662(msg/sec)
sent(3001): total=23564, rate=10003(msg/sec)
sent(4001): total=33570, rate=10006(msg/sec)
sent(5001): total=43578, rate=10008(msg/sec)
sent(6002): total=53576, rate=9998(msg/sec)
sent(7002): total=63571, rate=9995(msg/sec)
sent(8002): total=73587, rate=10016(msg/sec)
sent(9006): total=83565, rate=9978(msg/sec)
sent(10002): total=93565, rate=10000(msg/sec)
sent(11001): total=103515, rate=9950(msg/sec)
sent(12001): total=113556, rate=10041(msg/sec)
sent(13004): total=123590, rate=10034(msg/sec)
sent(14003): total=133580, rate=9990(msg/sec)
sent(15000): total=143531, rate=9951(msg/sec)
sent(16006): total=153589, rate=10058(msg/sec)
sent(17004): total=163585, rate=9996(msg/sec)
sent(18002): total=173555, rate=9970(msg/sec)
sent(19003): total=183530, rate=9975(msg/sec)
sent(20003): total=193570, rate=10040(msg/sec)
sent(21003): total=203556, rate=9986(msg/sec)
sent(22004): total=213552, rate=9996(msg/sec)
sent(23001): total=223534, rate=9982(msg/sec)
sent(24002): total=233579, rate=10045(msg/sec)
sent(25004): total=243607, rate=10028(msg/sec)
sent(26002): total=253571, rate=9964(msg/sec)
sent(27002): total=263535, rate=9964(msg/sec)
sent(28003): total=273574, rate=10039(msg/sec)
sent(29004): total=283575, rate=10001(msg/sec)
sent(30004): total=293558, rate=9983(msg/sec)
sent(31000): total=303537, rate=9979(msg/sec)
sent(32004): total=313589, rate=10052(msg/sec)
sent(33004): total=323592, rate=10003(msg/sec)
sent(34004): total=333585, rate=9993(msg/sec)
sent(35004): total=343563, rate=9978(msg/sec)
sent(36002): total=353553, rate=9990(msg/sec)
sent(37008): total=363561, rate=10008(msg/sec)
sent(38004): total=373574, rate=10013(msg/sec)
sent(39004): total=383552, rate=9978(msg/sec)
sent(40005): total=393573, rate=10021(msg/sec)

sent(41005): total=403559, rate=9986(msg/sec)
sent(42015): total=413605, rate=10046(msg/sec)
sent(43003): total=423574, rate=9969(msg/sec)
sent(44002): total=433552, rate=9978(msg/sec)
sent(45005): total=443570, rate=10018(msg/sec)
sent(46000): total=453536, rate=9966(msg/sec)
sent(47001): total=463565, rate=10029(msg/sec)
sent(48004): total=473590, rate=10025(msg/sec)
sent(49012): total=483560, rate=9970(msg/sec)
sent(50002): total=493528, rate=9968(msg/sec)
sent(51004): total=503592, rate=10064(msg/sec)
sent(52003): total=513577, rate=9985(msg/sec)
sent(53002): total=523538, rate=9961(msg/sec)
sent(54008): total=533602, rate=10064(msg/sec)
sent(55004): total=543539, rate=9937(msg/sec)
sent(56004): total=553545, rate=10006(msg/sec)
sent(57002): total=563556, rate=10011(msg/sec)
sent(58004): total=573528, rate=9972(msg/sec)
sent(59006): total=583608, rate=10080(msg/sec)
sent(60004): total=593552, rate=9944(msg/sec)
sent(61002): total=603495, rate=9943(msg/sec)
sent(62003): total=613581, rate=10086(msg/sec)
sent(63005): total=623551, rate=9970(msg/sec)
sent(64007): total=633556, rate=10005(msg/sec)
sent(65006): total=643588, rate=10032(msg/sec)
sent(66010): total=653559, rate=9971(msg/sec)
sent(67006): total=663559, rate=10000(msg/sec)
sent(68005): total=673559, rate=10000(msg/sec)
sent(69006): total=683589, rate=10030(msg/sec)
sent(70018): total=693634, rate=10045(msg/sec)
sent(71008): total=703568, rate=9934(msg/sec)
sent(72016): total=713592, rate=10024(msg/sec)
sent(73006): total=723602, rate=10010(msg/sec)
sent(74004): total=733581, rate=9979(msg/sec)
sent(75015): total=743593, rate=10012(msg/sec)
sent(76006): total=753533, rate=9940(msg/sec)
sent(77016): total=763560, rate=10027(msg/sec)
sent(78007): total=773563, rate=10003(msg/sec)
sent(79007): total=783541, rate=9978(msg/sec)
sent(80014): total=793549, rate=10008(msg/sec)
sent(81018): total=803514, rate=9965(msg/sec)
sent(82006): total=813533, rate=10019(msg/sec)
sent(83013): total=823551, rate=10018(msg/sec)
sent(84021): total=833455, rate=9904(msg/sec)
sent(85007): total=843510, rate=10055(msg/sec)
sent(86018): total=853485, rate=9975(msg/sec)
sent(87016): total=863449, rate=9964(msg/sec)
sent(88009): total=873440, rate=9991(msg/sec)

sent(89015): total=883132, rate=9692(msg/sec)
sent(90035): total=892878, rate=9746(msg/sec)
sent(91161): total=902805, rate=9927(msg/sec)
sent(92453): total=912682, rate=9877(msg/sec)
sent(93755): total=922701, rate=10019(msg/sec)
sent(95034): total=932882, rate=10181(msg/sec)
sent(96240): total=942679, rate=9797(msg/sec)
sent(97483): total=952734, rate=10055(msg/sec)
sent(98772): total=962904, rate=10170(msg/sec)
sent(100124): total=972869, rate=9965(msg/sec)
sent(101529): total=982883, rate=10014(msg/sec)
sent(102938): total=992867, rate=9984(msg/sec)
sent(104714): total=1002897, rate=10030(msg/sec)
sent(106113): total=1012913, rate=10016(msg/sec)
sent(107381): total=1022744, rate=9831(msg/sec)
sent(108738): total=1032766, rate=10022(msg/sec)
sent(110016): total=1042760, rate=9994(msg/sec)
sent(111547): total=1052673, rate=9913(msg/sec)
sent(112924): total=1062830, rate=10157(msg/sec)

三、EMQ概念

1.部署架构

典型部署结构:

QQ截图20000.png

LB (负载均衡器) 负责分发设备的 MQTT 连接与消息到 EMQ 集群,LB 提高
EMQ 集群可用性、实现负载平衡以及动态扩容。
部署架构推荐在 LB 终结 SSL 连接。设备与 LB 之间 TLS 安全连接,LB 与
EMQ 之间普通 TCP 连接。这种部署模式下 EMQ 单集群可轻松支持100万设
备。
国内公有云部署推荐青云(EMQ 合作伙伴),国外部署推荐 AWS 。私有部署推
荐使用 HAProxy 作为 LB。
EMQ 默认开启的 MQTT 服务 TCP 端口:

1883 MQTT 协议端口
8883 MQTT/SSL 端口
8083 MQTT/WebSocket 端口
8084 MQTT/WebSocket/SSL 端口

防火墙根据使用的 MQTT 接入方式,开启上述端口的访问权限。

EMQ 节点集群使用的 TCP 端口:

4369 集群节点发现端口
6369 集群节点控制通道

集群节点间如有防护墙,需开启上述 TCP 端口互访权限。

  1. 创建 VPC 网络。
  2. VPC 网络内创建 EMQ 集群’私有网络’,例如: 192.168.0.0/24
  3. 私有网络内创建两台 EMQ 主机,例如:
EMQ2 192.168.0.2
emq2 192.168.0.3
  1. 安装并集群 EMQ 主机,具体配置请参考安装集群章节。

  2. 创建 LB(负载均衡器) 并指定公网 IP 地址。

  3. 在 LB 上创建 MQTT TCP 监听器:

    QQ图片20001.png

或创建 SSL 监听器,并终结 SSL 在 LB :

  1. MQTT 客户端连接 LB 公网地址测试。

    QQ图片20002.png

HAProxy -> EMQ 集群

HAProxy 作为 LB 部署 EMQ 集群,并终结 SSL 连接:

  1. 创建 EMQ 集群节点,例如:
节点 IP 地址
emq1 192.168.0.2
emq2 192.168.0.3
  1. 配置 /etc/haproxy/haproxy.cfg,示例:
    2.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
listen mqtt-ssl
bind *:8883 ssl crt /etc/ssl/emqttd/emq.pem
no-sslv3
mode tcp
maxconn 50000
timeout client 600s
default_backend emq_cluster
backend emq_cluster
mode tcp
balance source
timeout server 50s
timeout check 5000
server emq1 192.168.0.2:1883 check inter
10000 fall 2 rise 5 weight 1
server emq2 192.168.0.3:1883 check inter
10000 fall 2 rise 5 weight 1
source 0.0.0.0 usesrc clientip

2.配置文件详解

EMQ 2.0 消息服务器通过 etc/ 目录下配置文件进行设置,主要配置文件包括:

配置文件 说明
etc/emq.conf EMQ 2.0 消息服务器配置文件
etc/acl.conf EMQ 2.0 默认ACL规则配置文件
etc/plugins/*.conf EMQ 2.0 各类插件配置文件
①EMQ 集群设置

集群名称

1
2
3
## Cluster name

cluster.name = emqcl

自动发现策略

1
2
3
4
## Cluster dis-covery strategy: manual | static | mcast |

dns | etcd | k8s
cluster.dis-covery = manual

启用集群自愈

1
2
3
## Cluster Autoheal: on | off

cluster.autoheal = on

节点自动清除

自动清除宕机节点:

1
2
3
## Clean down node of the cluster

cluster.autoclean = 5m

EMQ 集群自动发现

EMQ R2.3 版本支持多种策略的节点自动发现与集群:

策略 说明
manual 手工命令创建集群
static 静态节点列表自动集群
mcast UDP 组播方式自动集群
dns DNS A 记录自动集群
etcd 通过 etcd 自动集群
k8s Kubernetes 服务自动集群

manual 手动创建集群

默认配置为手动创建集群,节点通过 ./bin/emqttd_ctl join 命令加入:

1
cluster.dis-covery = manual

基于 static 节点列表自动集群

配置固定的节点列表,自动发现并创建集群:

1
2
3
4
5
6
7
cluster.dis-covery = static

## ##------------------------------------------------------

## Cluster with static node list

cluster.static.seeds = emq1@127.0.0.1,ekka2@127.0.0.1

基于 mcast 组播自动集群

基于 UDP 组播自动发现并创建集群:

1
2
3
4
5
6
7
8
9
10
11
cluster.dis-covery = mcast

## ##------------------------------------------------------

## Cluster with multicast

cluster.mcast.addr = 239.192.0.1
cluster.mcast.ports = 4369,4370
cluster.mcast.iface = 0.0.0.0
cluster.mcast.ttl = 255
cluster.mcast.loop = on

基于 DNS A 记录自动集群

基于 DNS A 记录自动发现并创建集群:

1
2
3
4
5
6
7
8
cluster.dis-covery = dns

## ##------------------------------------------------------

## Cluster with DNS

cluster.dns.name = localhost
cluster.dns.app = ekka

基于 etcd 自动集群

基于 etcd _ 自动发现并创建集群:

1
2
3
4
5
6
7
8
9
cluster.dis-covery = etcd

## ##------------------------------------------------------

## Cluster with Etcd

cluster.etcd.server = http://127.0.0.1:2379
cluster.etcd.prefix = emqcl
cluster.etcd.node_ttl = 1m

基于 Kubernetes 自动集群

Kubernetes _ 下自动发现并创建集群:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cluster.dis-covery = k8s

## ##------------------------------------------------------

## Cluster with k8s

cluster.k8s.apiserver = http://10.110.111.204:8080
cluster.k8s.service_name = ekka

## Address Type: ip | dns

cluster.k8s.address_type = ip

## The Erlang application name

cluster.k8s.app_name = ekka

EMQ 节点与 Cookie

Erlang 节点名称、分布式节点间通信 Cookie:

1
2
3
4
5
6
7
## Node name

node.name = emqttd@127.0.0.1

## Cookie for distributed node

node.cookie = emq_dist_cookie

注解

Erlang/OTP 平台应用多由分布的 Erlang 节点(进程)组成,每个 Erlang 节点
(进程)需指配一个节点名,用于节点间通信互访。 所有互相通信的 Erlang 节
点(进程)间通过一个共用的 Cookie 进行安全认证。
EMQ 节点连接方式
EMQ 节点基于 Erlang/OTP 平台的 TCPv4, TCPv6 或 TLS 协议连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
## Specify the erlang distributed protocol.

##

## Value: Enum

## - inet_tcp: the default; handles TCP streams with

IPv4 addressing.

## - inet6_tcp: handles TCP with IPv6 addressing.

## - inet_tls: using TLS for Erlang Distribution.

##

## vm.args: -proto_dist inet_tcp

node.proto_dist = inet_tcp

## Specify SSL Options in the file if using SSL for

Erlang Distribution.
##

## Value: File

##

## vm.args: -ssl_dist_optfile <File>

## node.ssl_dist_optfile = {{ platform_etc_dir

}}/ssl_dist.conf
②Erlang 虚拟机参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
## SMP support: enable, auto, disable

node.smp = auto

## Enable kernel poll

node.kernel_poll = on

## async thread pool

node.async_threads = 32

## Erlang Process Limit

node.process_limit = 256000

## Sets the maximum number of simultaneously existing

ports for this system
node.max_ports = 65536
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
## Set the distribution buffer busy limit

(dist_buf_busy_limit)
node.dist_buffer_size = 32MB

## Max ETS Tables.

## Note that mnesia and SSL will create temporary ets

tables.
node.max_ets_tables = 256000

## Tweak GC to run more often

node.fullsweep_after = 1000

## Crash dump

node.crash_dump = log/crash.dump

## Distributed node ticktime

node.dist_net_ticktime = 60

## Distributed node port range

## node.dist_listen_min = 6000

## node.dist_listen_max = 6999

Erlang 虚拟机主要参数说明:

| NODE.PROCESS_LIMIT | ERLANG 虚拟机允许的最大进程数,一个 MQTT

连接会消耗2个 ERLANG 进程,所以参数值 > 最

大连接数 * 2 |
| ——————– | ———————————————————— |
| node.max_ports | Erlang 虚拟机允许的最大 Port 数量,一个 MQTT

连接消耗1个 Port,所以参数值 > 最大连接数 |
| node.dist_listen_min | Erlang 分布节点间通信使用 TCP 连接端口范围。

注: 节点间如有防火墙,需要配置该端口段 |
| node.dist_listen_max | Erlang 分布节点间通信使用 TCP 连接端口范围。

注: 节点间如有防火墙,需要配置该端口段 |

③日志参数配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
##### console 日志

## Console log. Enum: off, file, console, both

log.console = console

## Console log level. Enum: debug, info, notice,

warning, error, critical, alert, emergency
log.console.level = error

## Console log file

## log.console.file = log/console.log

error 日志

1
2
3
## Error log file

log.error.file = log/error.log

crash 日志

1
2
3
4
## Enable the crash log. Enum: on, off

log.crash = on
log.crash.file = log/crash.log

syslog 日志

1
2
3
4
5
6
7
8
## Syslog. Enum: on, off

log.syslog = on

## syslog level. Enum: debug, info, notice, warning,

error, critical, alert, emergency
log.syslog.level = error
④MQTT 协议参数配置

ClientId 最大允许长度

1
2
3
## Max ClientId Length Allowed.

mqtt.max_clientid_len = 1024

MQTT 最大报文尺寸

1
2
3
## Max Packet Size Allowed, 64K by default.

mqtt.max_packet_size = 64KB

客户端连接闲置时间

设置 MQTT 客户端最大允许闲置时间(Socket 连接建立,但未收到 CONNECT
报文):

1
2
3
## Client Idle Timeout (Second)

mqtt.client.idle_timeout = 30

启用客户端连接统计

1
2
## Enable client Stats: on | off
mqtt.client.enable_stats = off

强制 GC 设置

1
2
3
## Force GC: integer. Value 0 disabled the Force GC.

mqtt.conn.force_gc_count = 100
⑤匿名认证与 ACL 文件

是否开启匿名认证
默认开启,允许任意客户端登录:

1
2
3
## Allow Anonymous authentication

mqtt.allow_anonymous = true

默认访问控制(ACL)文件
EMQ 支持基于 etc/acl.conf 文件或 MySQL、 PostgreSQL 等插件的访问控制规
则。

1
2
3
4
5
6
7
## ACL nomatch

mqtt.acl_nomatch = allow

## Default ACL File

mqtt.acl_file = etc/acl.conf

etc/acl.conf 访问控制规则定义:
允许|拒绝 用户|IP地址|ClientID 发布|订阅 主题列表
访问控制规则采用 Erlang 元组格式,访问控制模块逐条匹配规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21


------

------

Client -> | Rule1 | --nomatch--> | Rule2 | --nomatch-->
| Rule3 | --> Default

------

------

| |
|
match match
match
\|/ \|/
\|/
allow | deny allow | deny
allow | deny
1
2
3
4
5
6
7
8
9
10
etc/acl.conf 默认访问规则设置:

%% 允许'dashboard'用户订阅 '$SYS/#'
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
%% 允许本机用户发布订阅全部主题
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
%% 拒绝用户订阅'$SYS#'与'#'主题
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
%% 上述规则无匹配,允许
{allow, all}.

注解
默认规则只允许本机用户订阅’$SYS/#’与’#’
EMQ 消息服务器接收到 MQTT 客户端发布(PUBLISH)或订阅(SUBSCRIBE)请
求时,会逐条匹配 ACL 访问控制规则,直到匹配成功返回 allow 或 deny。

⑥MQTT 会话参数设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
## Upgrade QoS?

mqtt.session.upgrade_qos = off

## Max number of QoS 1 and 2 messages that can be

“inflight” at one time.

## 0 means no limit

mqtt.session.max_inflight = 32

## Retry Interval for redelivering QoS1/2 messages.

mqtt.session.retry_interval = 20s

## Max Packets that Awaiting PUBREL, 0 means no limit

mqtt.session.max_awaiting_rel = 100

## Awaiting PUBREL Timeout

mqtt.session.await_rel_timeout = 20s

## Enable Statistics: on | off

mqtt.session.enable_stats = off

## Expired after 1 day:

## w - week

## d - day

## h - hour

## m - minute

## s - second

mqtt.session.expiry_interval = 2h
⑦MQTT 消息队列参数设置

EMQ 消息服务器会话通过队列缓存 Qos1/Qos2 消息:

  1. 持久会话(Session)的离线消息

  2. 飞行窗口满而延迟下发的消息
    队列参数设置:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    ## Type: simple | priority

    mqtt.mqueue.type = simple

    ## Topic Priority: 0~255, Default is 0

    ## mqtt.mqueue.priority = topic/1=10,topic/2=8

    ## Max queue length. Enqueued messages when persistent

    client disconnected,

    ## or inflight window is full. 0 means no limit.

    mqtt.mqueue.max_length = 0

    ## Low-water mark of queued messages

    mqtt.mqueue.low_watermark = 20%

    ## High-water mark of queued messages

    mqtt.mqueue.high_watermark = 60%

    ## Queue Qos0 messages?

    mqtt.mqueue.store_qos0 = true

    队列参数说明:

    | MQUEUE.TYPE | 队列类型。SIMPLE: 简单队列,PRIORITY: 优先

    级队列 |
    | ——————— | —————————————————– |
    | mqueue.priority | 主题(Topic)队列优先级设置 |
    | mqueue.max_length | 队列长度, infinity 表示不限制 |
    | mqueue.low_watermark | 解除告警水位线 |
    | mqueue.high_watermark | 队列满告警水位线 |
    | mqueue.qos0 | 是否缓存 QoS0 消息 |

    Broker 参数设置

    broker_sys_interval 设置系统发布 $SYS 消息周期:

    1
    2
    3
    1. System Interval of publishing broker $SYS Messages

    mqtt.broker.sys_interval = 60s

发布订阅(PubSub)参数设置

1
2
3
4
5
6
7
8
9
## PubSub Pool Size. Default should be scheduler

numbers.
mqtt.pubsub.pool_size = 8
mqtt.pubsub.by_clientid = true

## Subscribe Asynchronously

mqtt.pubsub.async = true

桥接(Bridge)参数设置

1
2
3
4
5
6
7
## Bridge Queue Size

mqtt.bridge.max_queue_len = 10000

## Ping Interval of bridge node. Unit: Second

mqtt.bridge.ping_down_interval = 1s

插件(Plugin) 配置目录设置

1
2
3
4
5
6
7
## Dir of plugins' config

mqtt.plugins.etc_dir = etc/plugins/

## File to store loaded plugin names.

mqtt.plugins.loaded_file = data/loaded_plugins
⑧MQTT Listeners 参数说明

EMQ 消息服务器支持 MQTT、MQTT/SSL、MQTT/WS 协议服务端,可通过
listener.tcp|ssl|ws|wss|.* 设置端口、最大允许连接数等参数。
EMQ 2.2 消息服务器默认开启的 TCP 服务端口包括:

1883 MQTT 协议端口
8883 MQTT/SSL 端口
8083 MQTT/WebSocket 端口
8080 HTTP 管理 API 端口
8084 MQTT/WebSocket/SSL 端口

Listener 参数说明:

LISTENER.TCP.${NAME}.ACCEPTORS TCP ACCEPTOR 池
listener.tcp.${name}.max_clients 最大允许 TCP 连接数
listener.tcp.${name}.rate_limit 连接限速配置,例如限速10KB/秒:

“100,10”

MQTT/TCP 监听器 - 1883
EMQ 2.2 版本支持配置多个 MQTT 协议监听器,例如配置 external、internal
两个监听器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
## ##------------------------------------------------------

## External TCP Listener

## External TCP Listener: 1883, 127.0.0.1:1883, ::1:1883

listener.tcp.external = 0.0.0.0:1883

## Size of acceptor pool

listener.tcp.external.acceptors = 16

## Maximum number of concurrent clients

listener.tcp.external.max_clients = 102400
#listener.tcp.external.mountpoint = external/

## Rate Limit. Format is 'burst,rate', Unit is KB/Sec

#listener.tcp.external.rate_limit = 100,10
#listener.tcp.external.access.1 = allow 192.168.0.0/24
listener.tcp.external.access.2 = allow all
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
## Proxy Protocol V1/2

## listener.tcp.external.proxy_protocol = on

## listener.tcp.external.proxy_protocol_timeout = 3s

## TCP Socket Options

listener.tcp.external.backlog = 1024
#listener.tcp.external.recbuf = 4KB
#listener.tcp.external.sndbuf = 4KB
listener.tcp.external.buffer = 4KB
listener.tcp.external.nodelay = true

## ##------------------------------------------------------

## Internal TCP Listener

## Internal TCP Listener: 11883, 127.0.0.1:11883,

::1:11883
listener.tcp.internal = 127.0.0.1:11883

## Size of acceptor pool

listener.tcp.internal.acceptors = 16

## Maximum number of concurrent clients

listener.tcp.internal.max_clients = 102400
#listener.tcp.external.mountpoint = internal/

## Rate Limit. Format is 'burst,rate', Unit is KB/Sec

## listener.tcp.internal.rate_limit = 1000,100

## TCP Socket Options

listener.tcp.internal.backlog = 512
listener.tcp.internal.tune_buffer = on
listener.tcp.internal.buffer = 1MB
listener.tcp.internal.recbuf = 4KB
listener.tcp.internal.sndbuf = 1MB
listener.tcp.internal.nodelay = true

MQTT/SSL 监听器 - 8883

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
## ##------------------------------------------------------

## External SSL Listener

listener.ssl.external = 8883

## Size of acceptor pool

listener.ssl.external.acceptors = 16

## Maximum number of concurrent clients

listener.ssl.external.max_clients = 1024

## listener.ssl.external.mountpoint = inbound/

## Rate Limit. Format is 'burst,rate', Unit is KB/Sec

## listener.ssl.external.rate_limit = 100,10

## Proxy Protocol V1/2

## listener.ssl.external.proxy_protocol = on

## listener.ssl.external.proxy_protocol_timeout = 3s

listener.ssl.external.access.1 = allow all

## SSL Options

listener.ssl.external.handshake_timeout = 15
listener.ssl.external.keyfile = etc/certs/key.pem
listener.ssl.external.certfile = etc/certs/cert.pem

## 开启双向认证

## listener.ssl.external.cacertfile =

etc/certs/cacert.pem

## listener.ssl.external.verify = verify_peer

## listener.ssl.external.fail_if_no_peer_cert = true

MQTT/WebSocket 监听器 - 8083

1
2
3
4
5
6
7
8
## ##------------------------------------------------------

## External MQTT/WebSocket Listener

listener.ws.external = 8083
listener.ws.external.acceptors = 4
listener.ws.external.max_clients = 64
listener.ws.external.access.1 = allow all

MQTT/WebSocket/SSL 监听器 - 8084

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
## ##------------------------------------------------------

## External MQTT/WebSocket/SSL Listener

listener.wss.external = 8084
listener.wss.external.acceptors = 4
listener.wss.external.max_clients = 64
listener.wss.external.access.1 = allow all

## SSL Options

listener.wss.external.handshake_timeout = 15s
listener.wss.external.keyfile = {{ platform_etc_dir
}}/certs/key.pem
listener.wss.external.certfile = {{ platform_etc_dir
}}/certs/cert.pem

## listener.wss.external.cacertfile = {{

platform_etc_dir }}/certs/cacert.pem

## listener.wss.external.verify = verify_peer

## listener.wss.external.fail_if_no_peer_cert = true

HTTP API 监听器 - 8080

1
2
3
4
5
6
7
8
## ##------------------------------------------------------

## HTTP Management API Listener

listener.api.mgmt = 127.0.0.1:8080
listener.api.mgmt.acceptors = 4
listener.api.mgmt.max_clients = 64
listener.api.mgmt.access.1 = allow all
⑨Erlang 虚拟机监控设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
## Long GC, don't monitor in production mode for:

sysmon.long_gc = false

## Long Schedule(ms)

sysmon.long_schedule = 240

## 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.

sysmon.large_heap = 8MB

## Busy Port

sysmon.busy_port = false

## Busy Dist Port

sysmon.busy_dist_port = true
⑩扩展插件配置文件

EMQ 2.2 插件配置文件,全部在 etc/plugins/ 目录:

配置文件 说明
etc/plugins/emq_mod_presence 客户端上下线状态消息发布
etc/plugins/emq_mod_retainer Retain 消息存储插件
etc/plugins/emq_mod_subscription 客户端上线自动主题订阅
etc/plugins/emq_auth_username.conf 用户名、密码认证插件
etc/plugins/emq_auth_clientid.conf ClientId 认证插件
etc/plugins/emq_auth_http.conf HTTP 认证插件配置
etc/plugins/emq_auth_mongo.conf MongoDB 认证插件配置
etc/plugins/emq_auth_mysql.conf MySQL 认证插件配置
etc/plugins/emq_auth_pgsql.conf Postgre 认证插件配置
etc/plugins/emq_auth_redis.conf Redis 认证插件配置
etc/plugins/emq_web_hook.conf Web Hook 插件配置
etc/plugins/emq_lua_hook.conf Lua Hook 插件配置
etc/plugins/emq_coap.conf CoAP 协议服务器配置
etc/plugins/emq_dashboard.conf Dashboard 控制台插件配置
etc/plugins/emq_plugin_template.conf 示例插件模版
etc/plugins/emq_recon.conf Recon 调试插件配置
etc/plugins/emq_reloader.conf 热加载插件配置
etc/plugins/emq_sn.conf MQTT-SN 协议插件配置
etc/plugins/emq_stomp.conf Stomp 协议插件配置

3.管理命令解析

EMQ 消息服务器提供了 ./bin/emqttd_ctl的管理命令行。

①启动服务器
1
2
3
4
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$ ./bin/emqttd
start
Node 'emq@127.0.0.1' is started
emqttd 2.3.11 is running
②status 命令

查询 EMQ 消息服务器运行状态:

1
2
3
4
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl status
Node 'emqttd@127.0.0.1' is started
emqttd 2.0 is running
③broker 命令

broker 命令查询服务器基本信息,启动时间,统计数据与性能数据。

BROKER 查询 EMQ 消息服务器描述、版本、启动时间
broker 查询核心的 Erlang PubSub 进程状态(调试)
pubsub
broker 查询连接(Client)、会话(Session)、主题(Topic)、 订阅

(Subscription)、路由(Route)统计信息
stats
broker 查询 MQTT 报文(Packet)、消息(Message)收发统计
metrics

查询 EMQ 消息服务器基本信息包括版本、启动时间等:

1
2
3
4
5
6
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl broker
sysdescr : Erlang MQTT Broker
version : 2.3.6
uptime : 25 days,3 hours, 38 minutes, 10 seconds
datetime : 2018-08-08 16:52:08

broker stats
查询服务器客户端连接(Client)、会话(Session)、主题(Topic)、订阅
(Subscription)、路由(Route)统计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl broker stats
clients/count : 519
clients/max : 810
retained/count : 3821
retained/max : 3821
routes/count : 557
routes/max : 814
sessions/count : 556
sessions/max : 814
subscribers/count : 1111
subscribers/max : 1625
subscriptions/count : 1111
subscriptions/max : 1625
topics/count : 557
topics/max : 814

broker metrics

查询服务器流量(Bytes)、MQTT报文(Packets)、消息(Messages)收发统计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl broker metrics
bytes/received : 1116429069
bytes/sent : 1114027032
messages/dropped : 50313
messages/qos0/received : 0
messages/qos0/sent : 0
messages/qos1/received : 7837778
messages/qos1/sent : 7954830
messages/qos2/dropped : 0
messages/qos2/received : 0
messages/qos2/sent : 0
messages/received : 7837778
messages/retained : 3821
messages/sent : 7954830
packets/connack : 64632
packets/connect : 67040
packets/disconnect : 0
packets/pingreq : 25769
packets/pingresp : 25769
packets/puback/missed : 402
packets/puback/received : 7931899
packets/puback/sent : 7837778
packets/pubcomp/missed : 0
packets/pubcomp/received: 0
packets/pubcomp/sent : 0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
packets/publish/received: 7837778
packets/publish/sent : 7954830
packets/pubrec/missed : 0
packets/pubrec/received : 0
packets/pubrec/sent : 0
packets/pubrel/missed : 0
packets/pubrel/received : 0
packets/pubrel/sent : 0
packets/received : 15985446
packets/sent : 16005957
packets/suback : 122948
packets/subscribe : 122960
packets/unsuback : 0
packets/unsubscribe : 0
④cluster 命令

cluster 命令集群多个 EMQ 消息服务器节点(进程):

CLUSTER JOIN 加入集群
cluster leave 离开集群
cluster remove 从集群删除节点
cluster status 查询集群状态

cluster 命令集群本机两个 EMQ 节点示例:

目录 节点名 MQTT 端口
emqttd1 emqttd1@127.0.0.1 1883
emqttd2 emqttd2@127.0.0.1 2883

启动 emqttd1

1
cd emqttd1 && ./bin/emqttd start

启动 emqttd2

1
cd emqttd2 && ./bin/emqttd start

emqttd2 节点与 emqttd1 集群,emqttd2 目录下:

1
2
3
4
$ ./bin/emqttd_ctl cluster join emqttd1@127.0.0.1
Join the cluster successfully.
Cluster status: [{running_nodes,
['emqttd1@127.0.0.1','emqttd2@127.0.0.1']}]

任意节点目录下查询集群状态:

1
2
3
$ ./bin/emqttd_ctl cluster status
Cluster status: [{running_nodes,
['emqttd2@127.0.0.1','emqttd1@127.0.0.1']}]

集群消息路由测试:

1
2
3
4
5
6
7
# emqttd1节点上订阅x

mosquitto_sub -t x -q 1 -p 1883

# emqttd2节点上向x发布消息

mosquitto_pub -t x -q 1 -p 2883 -m hello

emqttd2 节点离开集群:

1
cd emqttd2 && ./bin/emqttd_ctl cluster leave

emqttd1 节点下删除 emqttd2:

1
2
cd emqttd1 && ./bin/emqttd_ctl cluster remove
emqttd2@127.0.0.1
⑤clients 命令

clients 命令查询连接的 MQTT 客户端。

CLIENTS LIST 查询全部客户端连接
clients show 根据 ClientId 查询客户端
clients kick 根据 ClientId 踢出客户端

clients list
查询全部客户端连接:

1
2
3
4
5
6
7
8
9
10
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl clients list
Client(867967027640038, clean_sess=false,
username=tt867967027640038,
peername=223.104.255.3:11240, connected_at=1533698992)
Client(egege_web_1_1, clean_sess=true,
username=egege_web, peername=10.161.139.95:47564,
connected_at=1533022825)
Client(sc_vmims_01_2_1, clean_sess=true, username=vmims,
peername=10.161.139.95:39606, connected_at=1533559450)

返回 Client 对象的属性:

CLEAN_SESS 清除会话标记
username 用户名
peername 对端 TCP 地址
connected_at 客户端连接时间

clients show
根据 ClientId 查询客户端:

1
2
3
4
5
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl clients show "867967027640038"
Client(867967027640038, clean_sess=false,
username=tt867967027640038,
peername=223.104.255.3:11240, connected_at=1533698992)

clients kick
根据 ClientId 踢出客户端:

1
2
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl clients kick "867967027640038"
⑥sessions 命令

sessions 命令查询 MQTT 连接会话。EMQ 消息服务器会为每个连接创建会
话。创建临时(transient)会话 –> clean_session 标记 true;创建持久会话
(persistent) –> clean_session 标记为 false。

SESSIONS LIST 查询全部会话
sessions list persistent 查询全部持久会话
sessions list transient 查询全部临时会话
sessions show 根据 ClientID 查询会话

sessions list

查询全部会话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl sessions list
Session(867967027640038, clean_sess=false,
subscriptions=2, max_inflight=2, inflight=0,
mqueue_len=0, mqueue_dropped=0, awaiting_rel=0,
deliver_msg=2, enqueue_msg=0, created_at=1533619718)
Session(egege_web_1_1, clean_sess=true, subscriptions=2,
max_inflight=2, inflight=0, mqueue_len=0,
mqueue_dropped=800, awaiting_rel=0, deliver_msg=3157831,
enqueue_msg=40061, created_at=1533022825)
Session(sc_vmims_01_2_1, clean_sess=true,
subscriptions=1, max_inflight=2, inflight=0,
mqueue_len=0, mqueue_dropped=899, awaiting_rel=0,
deliver_msg=5115, enqueue_msg=1942,
created_at=1533559450)

返回 Session 对象属性:

CLEAN_SESS FALSE: 持久会话,TRUE: 临时会话
max_inflight 飞行窗口(最大允许同时下发消息数)
inflight_queue 当前正在下发的消息数
message_queue 当前缓存消息数
message_dropped 会话丢掉的消息数
awaiting_rel 等待客户端发送 PUBREL 的 QoS2 消息数
awaiting_ack 等待客户端响应 PUBACK 的 QoS1/2 消息数
awaiting_comp 等待客户端响应 PUBCOMP 的 QoS2 消息数
created_at 会话创建时间戳

sessions list persistent
查询全部持久会话:

1
2
3
4
5
6
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl sessions list persistent
Session(867967027640038, clean_sess=false,
subscriptions=2, max_inflight=2, inflight=0,
mqueue_len=0, mqueue_dropped=0, awaiting_rel=0,
deliver_msg=2, enqueue_msg=0, created_at=1533619718)

sessions list transient
查询全部临时会话:

1
2
3
4
5
$ ./bin/emqttd_ctl sessions list transient
Session(mosqsub/44101-airlee.lo, clean_sess=true,
max_inflight=100, inflight_queue=0, message_queue=0,
message_dropped=0, awaiting_rel=0, awaiting_ack=0,
awaiting_comp=0, created_at=1452935401)

sessions show
根据 ClientId 查询会话:

1
2
3
4
5
6
7
8
9
10
11
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl sessions list transient
Session(egege_web_1_1, clean_sess=true, subscriptions=2,
max_inflight=2, inflight=2, mqueue_len=0,
mqueue_dropped=800, awaiting_rel=0, deliver_msg=3159156,
enqueue_msg=40071, created_at=1533022825)
Session(sc_vmims_01_2_1, clean_sess=true,
subscriptions=1, max_inflight=2, inflight=0,
mqueue_len=0, mqueue_dropped=899, awaiting_rel=0,
deliver_msg=5126, enqueue_msg=1942,
created_at=1533559450)
⑦routes 命令

routes 命令查询路由表。
routes list
查询全部路由:

1
2
3
4
5
6
7
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl routes list
terminal/server/866855039562970 ->
emq_sc_01@10.24.253.192
terminal/+/health -> emq_sc_01@10.24.253.192
terminal/client/+ -> emq_sc_01@10.24.253.192
terminal/server/event/100 -> emq_sc_01@10.24.253.192
⑧topics 命令

topics 命令查询当前的主题(Topic)表。
topics list
查询全部主题(Topic):

1
2
3
4
5
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl topics list
terminal/server/866855039410527
terminal/client/+
terminal/+/health

topics show
查询某个主题(Topic):

1
2
$ ./bin/emqttd_ctl topics show '$SYS/brokers'
$SYS/brokers: static
⑨subscriptions 命令

subscriptions 命令查询消息服务器的订阅(Subscription)表。

SUBSCRIPTIONS LIST 查询全部订阅
subscriptions show 查询某个 ClientId 的订阅

subscriptions list
查询全部订阅:

1
2
3
4
5
6
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl subscriptions list
867187036659701<10186.9605.17> ->
terminal/server/event/100
867187036659701<10186.9605.17> ->
terminal/server/867187036659701

subscriptions show
查询某个 Client 的订阅:

1
2
3
4
5
6
7
8
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl subscriptions show 867967027640038
867967027640038<10186.17806.17> ->
terminal/server/867967027640038
qos : 1
867967027640038<10186.17806.17> ->
terminal/server/event/100
qos : 1
⑩plugins 命令

plugins 命令用于加载、卸载、查询插件应用。EMQ 消息服务器通过插件扩展
认证、定制功能,插件置于 plugins/ 目录下。

PLUGINS LIST 列出全部插件(PLUGIN)
plugins load 加载插件(Plugin)
plugins unload 卸载插件(Plugin)

plugins list
列出全部插件:

1
2
3
4
5
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl plugins list
Plugin(emq_auth_clientid, version=2.3.6,
description=Authentication with ClientId/Password,
active=false)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Plugin(emq_auth_http, version=2.3.6,
description=Authentication/ACL with HTTP API,
active=false)
Plugin(emq_auth_jwt, version=2.3.6,
description=Authentication with JWT, active=false)
Plugin(emq_auth_ldap, version=2.3.6,
description=Authentication/ACL with LDAP, active=false)
Plugin(emq_auth_mongo, version=2.3.6,
description=Authentication/ACL with MongoDB,
active=false)
Plugin(emq_auth_mysql, version=2.3.6,
description=Authentication/ACL with MySQL, active=true)
Plugin(emq_auth_pgsql, version=2.3.6,
description=Authentication/ACL with PostgreSQL,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
active=false)
Plugin(emq_auth_redis, version=2.3.6,
description=Authentication/ACL with Redis, active=false)
Plugin(emq_auth_username, version=2.3.6,
description=Authentication with Username/Password,
active=false)
Plugin(emq_coap, version=2.3.6, description=CoAP
Gateway, active=false)
Plugin(emq_dashboard, version=2.3.6, description=EMQ Web
Dashboard, active=true)
Plugin(emq_lua_hook, version=2.3.6, description=EMQ
Hooks in lua, active=false)
Plugin(emq_modules, version=2.3.6, description=EMQ
Modules, active=true)
Plugin(emq_plugin_template, version=2.3.6,
description=EMQ Plugin Template, active=false)
Plugin(emq_recon, version=2.3.6, description=Recon
Plugin, active=true)
Plugin(emq_reloader, version=2.3.6, description=Reloader
Plugin, active=false)
1
Plugin(emq_retainer, version=2.3.6, description=EMQ
1
2
3
4
5
6
7
Retainer, active=true)
Plugin(emq_sn, version=2.3.6, description=MQTT-SN
Gateway, active=false)
Plugin(emq_stomp, version=2.3.6, description=Stomp
Protocol Plugin, active=false)
Plugin(emq_web_hook, version=2.3.6, description=EMQ
Webhook Plugin, active=false)

插件属性:

VERSION 插件版本
description 插件描述
active 是否已加载

load
加载插件:

1
2
3
4
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl plugins load emq_lua_hook
Start apps: [emq_lua_hook]
Plugin emq_lua_hook loaded successfully.

unload
卸载插件:

1
2
3
[emqtt@lp emqttd]$ ./bin/emqttd_ctl plugins unload
emq_lua_hook
Plugin emq_lua_hook unloaded successfully.
⑩+①bridges 命令

bridges 命令用于在多台 EMQ 服务器节点间创建桥接:


Publisher –> | node1 | –Bridge Forward–> | node2 | –>
Subscriber


BRIDGES LIST 查询全部桥接
bridges options 查询创建桥接选项
bridges start 创建桥接
bridges start 创建桥接并带选项设置
bridges stop 删除桥接

创建一条 emqttd1 -> emqttd2 节点的桥接,转发传感器主题(Topic)消息到
emqttd2:

1
2
3
4
5
$ ./bin/emqttd_ctl bridges start emqttd2@127.0.0.1
sensor/#
bridge is started.
$ ./bin/emqttd_ctl bridges list
bridge: emqttd1@127.0.0.1--sensor/#-->emqttd2@127.0.0.1

测试 emqttd1–sensor/#–>emqttd2 的桥接:

1
2
3
4
#emqttd2节点上
mosquitto_sub -t sensor/# -p 2883 -d
#emqttd1节点上
mosquitto_pub -t sensor/1/temperature -m "37.5" -d

bridge options
查询 bridge 创建选项设置:

1
2
$ ./bin/emqttd_ctl bridges options
Options:
1
2
3
4
5
6
qos = 0 | 1 | 2
prefix = string
suffix = string
queue = integer
Example:
qos=2,prefix=abc/,suffix=/yxz,queue=1000

bridges stop
删除 emqttd1–sensor/#–>emqttd2 的桥接:

1
2
3
$ ./bin/emqttd_ctl bridges stop emqttd2@127.0.0.1
sensor/#
bridge is stopped.
⑩+②vm 命令

vm 命令用于查询 Erlang 虚拟机负载、内存、进程、IO 信息。

VM ALL 查询 VM 全部信息
vm load 查询 VM 负载
vm memory 查询 VM 内存
vm process 查询 VM Erlang 进程数量
vm io 查询 VM io 最大文件句柄

vm all

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl vm all
cpu/load1 : 0.00
cpu/load5 : 0.03
cpu/load15 : 0.05
memory/total : 61039832
memory/processes : 10484664
memory/processes_used : 10482256
memory/system : 50555168
memory/atom : 1213657
memory/atom_used : 1196893
memory/binary : 1478648
memory/code : 28024188
memory/ets : 7885264
1
2
3
4
5
6
process/limit : 262144
process/count : 1437
io/max_fds : 65535
io/active_fds : 1
ports/count : 571
ports/limit : 65536

vm load
查询 VM 负载:

1
2
3
4
5
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl vm load
cpu/load1 : 0.21
cpu/load5 : 0.07
cpu/load15 : 0.06

vm memory
查询 VM 内存:

1
2
3
4
5
6
7
8
9
10
11
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl vm memory
memory/total : 61099792
memory/processes : 10540696
memory/processes_used : 10539864
memory/system : 50559096
memory/atom : 1213657
memory/atom_used : 1196963
memory/binary : 1470992
memory/code : 28024188
memory/ets : 7891376

vm process
查询 Erlang 进程数量:

1
2
3
4
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl vm process
process/limit : 262144
process/count : 1443

vm io

查询 IO 最大句柄数:

1
2
3
4
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl vm io
io/max_fds : 65535
io/active_fds : 1
⑩+③trace 命令

trace 命令用于追踪某个客户端或 Topic,打印日志信息到文件。

TRACE LIST 查询全部开启的追踪
trace client 开启 Client 追踪,日志到文件
trace client off 关闭 Client 追踪
trace topic 开启 Topic 追踪,日志到文件
trace topic off 关闭 Topic 追踪

trace client
开启 Client 追踪:

1
2
3
4
5
6
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl trace client 867967027640038 demolog
trace client 867967027640038 successfully.
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl trace list
trace client 867967027640038 -> demolog

trace client off
关闭 Client 追踪:

1
2
3
4
5
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl trace client 867967027640038 off
stop tracing client 867967027640038 successfully.
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl trace list

trace topic
开启 Topic 追踪:

1
2
3
4
5
6
7
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl trace client terminal/client/+
demolog
trace client terminal/client/+ successfully.
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl trace list
trace client terminal/client/+ -> demolog

trace topic off
关闭 Topic 追踪:

1
2
3
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl trace client terminal/client/+ off
stop tracing client terminal/client/+ successfully.

trace list
列出全部开启的追踪:

1
2
3
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl trace list
trace client terminal/client/+ -> demolog
⑩+④listeners

listeners 命令用于查询开启的 TCP 服务监听器:

1
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
1
2
3
4
5
6
7
./bin/emqttd_ctl listeners
listener on mqtt:api:0.0.0.0:8080
acceptors : 4
max_clients : 64
current_clients : 0
shutdown_count : []
listener on mqtt:wss:8084
1
2
3
4
5
6
7
8
acceptors : 4
max_clients : 64
current_clients : 0
shutdown_count : []
listener on mqtt:ssl:8883
acceptors : 16
max_clients : 1024
current_clients : 0
1
shutdown_count : [{auth_failure,2},{closed,1}]
1
listener on mqtt:ws:8083
1
2
3
4
5
6
7
8
9
10
11
12
13
acceptors : 4
max_clients : 102400
current_clients : 0
shutdown_count : []
listener on mqtt:tcp:0.0.0.0:1883
acceptors : 16
max_clients : 102400
current_clients : 547
shutdown_count : [{idle_timeout,73588},
{keepalive_timeout,39659},{conflict,19126},
{protocol_bad_connect,2413},{closed,298},
{parser_error,5},{auth_failure,2386}]
listener on mqtt:tcp:127.0.0.1:11883
1
2
3
4
5
6
7
8
9
acceptors : 4
max_clients : 102400
current_clients : 0
shutdown_count : []
listener on dashboard:http:18083
acceptors : 4
max_clients : 512
current_clients : 0
shutdown_count : []

listener 参数说明:

ACCEPTORS TCP ACCEPTOR 池
max_clients 最大允许连接数
current_clients 当前连接数
shutdown_count Socket 关闭原因统计

重启监听端口:
listeners restart

1
2
3
4
5
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl listeners restart mqtt:tcp
127.0.0.1:11883
Restart mqtt:tcp listener on 127.0.0.1:11883
successfully.

停止监听端口:
listeners stop

1
2
3
4
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$ [emqtt@lp
emqttd]$ ./bin/emqttd_ctl listeners stop mqtt:tcp
127.0.0.1:11883
Stop mqtt:tcp listener on 127.0.0.1:11883 successfully.

一旦停止无法启动,需要重启emq

⑩+⑤mnesia 命令

查询 mnesia 数据库系统状态。

1
2
3
4
5
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl mnesia
===> System info in version "4.15", debug level = none
<===
opt_disc. Directory
1
2
"/home/emqtt/emqttd/data/mnesia/emq@127.0.0.1" is used.
use fallback at restart = false
1
2
3
running db nodes = ['emq@127.0.0.1']
stopped db nodes = []
master node tables = []
1
remote = []
1
2
3
ram_copies =
[mqtt_retained,mqtt_route,mqtt_session,mqtt_trie,
mqtt_trie_node]
1
2
3
4
5
6
7
8
disc_copies = [mqtt_admin,schema]
disc_only_copies = [][{'emq@127.0.0.1',disc_copies}] = [mqtt_admin,schema][{'emq@127.0.0.1',ram_copies}] =
[mqtt_retained,mqtt_route,mqtt_trie_node,
mqtt_trie,mqtt_session]
2 transactions committed, 8 aborted, 0 restarted, 0
logged to disc
0 held locks, 0 in queue; 0 local transactions, 0 remote
0 transactions waits for other nodes: []
⑩+⑥admins 命令

Dashboard 插件会自动注册 admins 命令,用于创建、删除管理员账号,重置
管理员密码。

ADMINS ADD 创建 ADMIN 账号
admins passwd 重置 admin 密码
ADMINS ADD 创建 ADMIN 账号
admins del 删除 admin 账号

admins add
创建 admin 账户:

1
2
3
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl admins add zhendong 666666
ok

admins passwd
重置 admin 账户密码:

1
2
3
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl admins passwd zhendong 666
ok

admins del
删除 admin 账户:

1
2
3
[sc_emq@iZbp1bpj58bdzdomgul98aZ emqttd]$
./bin/emqttd_ctl admins del zhendong
ok