EMQX转发WEB服务

  3 分钟   12506 字    |    

EMQX转发WEB服务

EMQX开源版并不支持持久化到数据库,但支持webhook,可将数据通过POST方式发送到WEB服务。

此次实践,EMQX为MQTT Broker,SpringBoot为WEB服务框架,InfluxDB为数据库。

EMQX

安装

# 获取镜像
docker pull emqx/emqx:5.0.0
# 启动容器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:5.0.0

配置

  1. Dashboard - 数据集成 - 数据桥接image-20220915110557032
  2. 资源请求配置image-20220915110715897

如果是docker容器运行EMQX,URL不能写localhost或127.0.0.1,容器无法直接访问宿主机,需配置相关网络,在mac中,docker提供了容器直接访问宿主机的ip:docker.for.mac.host.internal

  1. 规则

image-20220915111101652

从mqtt消息中通过规则筛选有用的消息

测试

EMQX Dashboard提供了websocket客户端,用于直接发送mqtt消息image-20220915111400386

InfluxDB

# 安装
docker pull influxdb:2.4.0
# 启动
docker run \
    --name influxdb \
    -p 8086:8086 \
    --volume $PWD:/var/lib/influxdb2 \ # PWD为当前文件夹,用于持久化
    influxdb:2.4.0

WEB服务

  1. 初始化项目

采用阿里脚手架初始化SpringBoot项目image-20220915112514163

  1. Controller.java
@RequestMapping(value = "/mqtt",method = RequestMethod.POST)
    public void getMessages(@RequestBody() Map<String,Object> params) throws Exception
    {
        logger.info("参数列表 {}",params);
        String username = (String)params.get("username");
        String payload = (String)params.get("payload");
        //从json字符串里获取json对象
        JSONObject json = JSONObject.parseObject(payload);
        String msg = json.getString("msg");
        String value = json.getString("value");
    }

取输入流字符串总是为undefined原因

经排查,是请求体问题,置空就可以了image-20220915135954809

MQTTX输入除payload以外的值在web控制台输出失败

客户端MQTTX 对话窗口中输入的仅仅是payload,但mqtt协议中还包含其他的值,比如消息发布报文

{
  "clientid": "c_emqx",
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "payload": "{\"msg\": \"hello\"}"
}

除payload以外的字段是固定字段,都是系统自动读取的,MQTTX仅限于输入payload

集成InfluxDB

InfluxDB OSS2.0

引入依赖

<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <version>1.15.0</version>
</dependency>

配置参数

spring.influx.url=数据库地址(http://localhost:8086)
spring.influx.token=数据库api凭证
spring.influx.org=组织名
spring.influx.bucket=桶名

配置类

@Configuration
public class InfluxdbConfig {

    @Value("${spring.influx.url:''}")
    private String influxDBUrl;
    @Value("${spring.influx.token:''}")
    private String token;
  	@Value("${spring.influx.org:''}")
    private String org;

    @Bean
    public InfluxDBClient influxDBClient() {
        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxDBUrl, token.toCharArray(),org);
        influxDBClient.setLogLevel(LogLevel.BASIC);
        return influxDBClient;
    }
}

数据操作

@Repository
public class TimeSeriesRepository {
    @Autowired
    InfluxDBClient influxDBClient;
    @Value("${spring.influx.org:''}")
    private String org;

    @Value("${spring.influx.bucket:''}")
    private String bucket;

    public void save(String measurement,Map<String,String> tags, Map<String,Object> fields){
        WriteOptions writeOptions = WriteOptions.builder()
                .batchSize(5000)
                .flushInterval(1000)
                .bufferLimit(10000)
                .jitterInterval(1000)
                .retryInterval(5000)
                .build();
        try (WriteApi writeApi = influxDBClient.getWriteApi(writeOptions)) {
            Point point = Point
                    .measurement(measurement)
                    .addTags(tags)
                    .addFields(fields)
                    .time(Instant.now(), WritePrecision.NS);
            writeApi.writePoint(bucket, org, point);
        }
    }

    public List<Message> findAll(){
        String flux = "from(bucket: \"test\")\n" +
                "  |> range(start: 0)\n" +
                "  |> filter(fn: (r) => r[\"_measurement\"] == \"mqtt\")\n" +
                "  |> yield(name: \"last\")";
        QueryApi queryApi = influxDBClient.getQueryApi();
        // Query data
        List<Message> list = new ArrayList<>();
        List<FluxTable> tables = queryApi.query(flux);
        for (FluxTable fluxTable : tables) {
            List<FluxRecord> records = fluxTable.getRecords();
            for (FluxRecord fluxRecord : records) {/
                //fluxRecord.getTime()为Instant类,格式化为标准时间
                Date tmpDate = Date.from(fluxRecord.getTime());
                SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String time = formatter.format(tmpDate);
                Message message = new Message(time, fluxRecord.getValueByKey("_field"), fluxRecord.getValueByKey("_value"));
                list.add(message);
            }
        }
        return list;
    }
}

Controller改写

@RestController
public class Controller {
    private final static Logger logger = LoggerFactory.getLogger(Controller.class);

    @Autowired
    TimeSeriesRepository timeSeriesRepository;
    
    //mqtt报文存入InfluxDB
    @RequestMapping(value = "/mqtt",method = RequestMethod.POST)
    public void saveMessages(@RequestBody() Map<String,Object> params){
        logger.info("参数列表 {}",params);
        String username = (String)params.get("username");
        String payload = (String)params.get("payload");
        //从json字符串里获取json对象
        JSONObject json = JSONObject.parseObject(payload);
        String msg = json.getString("msg");
        Integer value = Integer.parseInt(json.getString("value"));
        //创建tags字典和fields字典
        Map<String,String> tags = new HashMap<>();
        tags.put("username",username);
        Map<String,Object> fields = new HashMap<>();
        fields.put("msg",msg);
        fields.put("value",value);

        String measurement = "mqtt";
        timeSeriesRepository.save(measurement,tags,fields);

    }
   //在InfluxDB查询指定表的所有字段值
  	@RequestMapping(value = "/query/{measurement}",method = RequestMethod.GET)
    public List<Message> getMessages(@PathVariable("measurement") String measurement){
        return timeSeriesRepository.findAll(measurement);
    }
  	//设备上下线
  	@RequestMapping(value = "/connect",method = RequestMethod.POST)
    public void getStatus(@RequestBody() Map<String,Object> params){
        logger.info("参数列表 {}",params);
        String event = (String)params.get("event");
        //去除event中不可见字符
        event = event.replaceAll("\\p{C}", "");
        String clientid = (String)params.get("clientid");
        if(event.equals("client.connected")) {
            logger.info("client:{} 上线",clientid);
        }else {
            logger.info("client:{} 下线",clientid);
        }
    }
}

一键部署到服务器

  1. IDEA 安装Alibaba Cloud Toolkitimage-20220915185248186

  2. 工具-Alibaba Cloud

    image-20220915185456406

  3. 设置

  • 选择maven build:将代码打包为jar
  • 选择Target Host:目标服务器
  • Target Directory:放置jar包的文件夹 相当于cd xxx
  • After deploy: 执行的命令
netstat -anp|grep 9999|awk '{printf $7}'|cut -d/ -f1 |xargs kill -9 || true & nohup java -jar /root/VR/iot-1.0.0-SNAPSHOT.jar >> nohup.log 2>&1 &

参考

[ 1 ] 数据桥接快速入门

[ 2 ] docker容器访问宿主机

[ 3 ] 规则语法

[ 4 ] InfluxDB

[ 5 ] Springboot集成Influxdb2

[ 6 ] influxdb-client-java
[ 7 ] EMQX WebHook及案例

~  ~  The   End  ~  ~


 赏 
感谢您的支持,我会继续努力哒!
支付宝收款码
tips
文章二维码 分类标签:技术springbootinfluxdbmqtt数据库
文章标题:EMQX转发WEB服务
文章链接:http://120.46.217.131:82/archives/30/
最后编辑:2022 年 9 月 20 日 15:05 By Yang
许可协议: 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)

相关推荐

热门推荐

(*) 5 + 8 =
快来做第一个评论的人吧~