EMQX转发WEB服务
3 分钟 12506 字 |
EMQX转发WEB服务
EMQX开源版并不支持持久化到数据库,但支持webhook,可将数据通过POST方式发送到WEB服务。
此次实践,EMQX为MQTT Broker,SpringBoot为WEB服务框架,InfluxDB为数据库。
EMQX
安装
# 获取镜像
docker pull emqx/emqx:5.0.0
# 启动容器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:5.0.0
配置
- Dashboard - 数据集成 - 数据桥接
- 资源请求配置
如果是docker容器运行EMQX,URL不能写localhost或127.0.0.1,容器无法直接访问宿主机,需配置相关网络,在mac中,docker提供了容器直接访问宿主机的ip:docker.for.mac.host.internal
- 规则
从mqtt消息中通过规则筛选有用的消息
测试
EMQX Dashboard提供了websocket客户端,用于直接发送mqtt消息
InfluxDB
# 安装
docker pull influxdb:2.4.0
# 启动
docker run \
--name influxdb \
-p 8086:8086 \
--volume $PWD:/var/lib/influxdb2 \ # PWD为当前文件夹,用于持久化
influxdb:2.4.0
WEB服务
- 初始化项目
采用阿里脚手架初始化SpringBoot项目
- Controller.java
@RequestMapping(value = "/mqtt",method = RequestMethod.POST)
public void getMessages(@RequestBody() Map<String,Object> params) throws Exception
{
logger.info("参数列表 {}",params);
String username = (String)params.get("username");
String payload = (String)params.get("payload");
//从json字符串里获取json对象
JSONObject json = JSONObject.parseObject(payload);
String msg = json.getString("msg");
String value = json.getString("value");
}
取输入流字符串总是为undefined原因
经排查,是请求体问题,置空就可以了
MQTTX输入除payload以外的值在web控制台输出失败
客户端MQTTX 对话窗口中输入的仅仅是payload,但mqtt协议中还包含其他的值,比如消息发布报文
{
"clientid": "c_emqx",
"username": "u_emqx",
"topic": "t/a",
"qos": 1,
"payload": "{\"msg\": \"hello\"}"
}
除payload以外的字段是固定字段,都是系统自动读取的,MQTTX仅限于输入payload
集成InfluxDB
InfluxDB OSS2.0
引入依赖
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>1.15.0</version>
</dependency>
配置参数
spring.influx.url=数据库地址(http://localhost:8086)
spring.influx.token=数据库api凭证
spring.influx.org=组织名
spring.influx.bucket=桶名
配置类
@Configuration
public class InfluxdbConfig {
@Value("${spring.influx.url:''}")
private String influxDBUrl;
@Value("${spring.influx.token:''}")
private String token;
@Value("${spring.influx.org:''}")
private String org;
@Bean
public InfluxDBClient influxDBClient() {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxDBUrl, token.toCharArray(),org);
influxDBClient.setLogLevel(LogLevel.BASIC);
return influxDBClient;
}
}
数据操作
@Repository
public class TimeSeriesRepository {
@Autowired
InfluxDBClient influxDBClient;
@Value("${spring.influx.org:''}")
private String org;
@Value("${spring.influx.bucket:''}")
private String bucket;
public void save(String measurement,Map<String,String> tags, Map<String,Object> fields){
WriteOptions writeOptions = WriteOptions.builder()
.batchSize(5000)
.flushInterval(1000)
.bufferLimit(10000)
.jitterInterval(1000)
.retryInterval(5000)
.build();
try (WriteApi writeApi = influxDBClient.getWriteApi(writeOptions)) {
Point point = Point
.measurement(measurement)
.addTags(tags)
.addFields(fields)
.time(Instant.now(), WritePrecision.NS);
writeApi.writePoint(bucket, org, point);
}
}
public List<Message> findAll(){
String flux = "from(bucket: \"test\")\n" +
" |> range(start: 0)\n" +
" |> filter(fn: (r) => r[\"_measurement\"] == \"mqtt\")\n" +
" |> yield(name: \"last\")";
QueryApi queryApi = influxDBClient.getQueryApi();
// Query data
List<Message> list = new ArrayList<>();
List<FluxTable> tables = queryApi.query(flux);
for (FluxTable fluxTable : tables) {
List<FluxRecord> records = fluxTable.getRecords();
for (FluxRecord fluxRecord : records) {/
//fluxRecord.getTime()为Instant类,格式化为标准时间
Date tmpDate = Date.from(fluxRecord.getTime());
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = formatter.format(tmpDate);
Message message = new Message(time, fluxRecord.getValueByKey("_field"), fluxRecord.getValueByKey("_value"));
list.add(message);
}
}
return list;
}
}
Controller改写
@RestController
public class Controller {
private final static Logger logger = LoggerFactory.getLogger(Controller.class);
@Autowired
TimeSeriesRepository timeSeriesRepository;
//mqtt报文存入InfluxDB
@RequestMapping(value = "/mqtt",method = RequestMethod.POST)
public void saveMessages(@RequestBody() Map<String,Object> params){
logger.info("参数列表 {}",params);
String username = (String)params.get("username");
String payload = (String)params.get("payload");
//从json字符串里获取json对象
JSONObject json = JSONObject.parseObject(payload);
String msg = json.getString("msg");
Integer value = Integer.parseInt(json.getString("value"));
//创建tags字典和fields字典
Map<String,String> tags = new HashMap<>();
tags.put("username",username);
Map<String,Object> fields = new HashMap<>();
fields.put("msg",msg);
fields.put("value",value);
String measurement = "mqtt";
timeSeriesRepository.save(measurement,tags,fields);
}
//在InfluxDB查询指定表的所有字段值
@RequestMapping(value = "/query/{measurement}",method = RequestMethod.GET)
public List<Message> getMessages(@PathVariable("measurement") String measurement){
return timeSeriesRepository.findAll(measurement);
}
//设备上下线
@RequestMapping(value = "/connect",method = RequestMethod.POST)
public void getStatus(@RequestBody() Map<String,Object> params){
logger.info("参数列表 {}",params);
String event = (String)params.get("event");
//去除event中不可见字符
event = event.replaceAll("\\p{C}", "");
String clientid = (String)params.get("clientid");
if(event.equals("client.connected")) {
logger.info("client:{} 上线",clientid);
}else {
logger.info("client:{} 下线",clientid);
}
}
}
一键部署到服务器
IDEA 安装Alibaba Cloud Toolkit
工具-Alibaba Cloud
设置
- 选择maven build:将代码打包为jar
- 选择Target Host:目标服务器
- Target Directory:放置jar包的文件夹 相当于cd xxx
- After deploy: 执行的命令
netstat -anp|grep 9999|awk '{printf $7}'|cut -d/ -f1 |xargs kill -9 || true & nohup java -jar /root/VR/iot-1.0.0-SNAPSHOT.jar >> nohup.log 2>&1 &
参考
[ 1 ] 数据桥接快速入门
[ 2 ] docker容器访问宿主机
[ 3 ] 规则语法
[ 4 ] InfluxDB
[ 5 ] Springboot集成Influxdb2
[ 6 ] influxdb-client-java
[ 7 ] EMQX WebHook及案例
~ ~ The End ~ ~
分类标签:技术,springboot,influxdb,mqtt,数据库
文章标题:EMQX转发WEB服务
文章链接:http://120.46.217.131:82/archives/30/
最后编辑:2022 年 9 月 20 日 15:05 By Yang
许可协议: 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)
文章标题:EMQX转发WEB服务
文章链接:http://120.46.217.131:82/archives/30/
最后编辑:2022 年 9 月 20 日 15:05 By Yang
许可协议: 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)