SpringBoot集成MQTT

  4 分钟   6420 字    |    

SpringBoot集成MQTT

Spring Integration 是一种轻量级消息传递模块,支持通过声明式适配器与外部系统集成,Spring Integration 抽象了用于消息传递的一套规范,并且基于这套规范提供多种企业级的中间件的集成,支持基于 AMQP 的消息队列、MQTT、RMI 等等中间件。

Spring Integration

消息通信模式

image-20230331220051176

核心组件

  • Message:Message是对消息的包装,在Spring 系统中传递的任何消息都会被包装为Message。Message是Spring Integration消息传递的基本单位

  • Message Channel:Message 在Message Channel中进行传递,生产者向管道中投递消息,消费者从管道中取出消息。Spring Integration 支持两种消息传递模型,point-to-point(点对点模型),Publish-subscribe(发布订阅模型)有多种管道类型。

  • Message Endpoint:消息在管道中流动那必定会有某些流入或流出的点亦或是在某个位置(即特定函数)需要对消息进行处理,过滤,格式转换等。这些点即为Message Endpoint(实际为某些处理函数)

  • Message Transformer:将消息进行特定转换例如将一个 Object 序列化为 Json 字符串

  • Message Router:向管道投递消息时可由消息路由根据路由规则选择投递给那个管道

  • Service Activator:将系统服务实例接入到消息系统的泛型切入点,该切入点必须配置输入管道。其返回值可是消息类型也可以是一个消息处理器,当返回值为消息类型时需要指定输出管道,即在该切入点对消息加工处理后再发送到指定的输出管道,如果返回值为消息处理器。那么消息交由消息处理器进行处理。

  • Channel Adapter:因为外部协议有无数种,消息适配器则用于连接不同协议的外部系统。从外部系统读入数据并对数据进行处理最终与Spring Integration 内部的消息系统适配。

集成

  1. 依赖
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
  1. MQTT配置类
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
    //mqtt Broker 地址
    private String[] uris;
    //连接用户名
    private String username;
    //连接密码
    private String password;
    //入站Client ID
    private String inClientId;
    //出站Client ID
    private String outClientId;
    //默认订阅主题
    private String defaultTopic;

		//MQTT客户端工厂
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(uris);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
  	//管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(inClientId,factory,defaultTopic);
    }
		//消息生产者
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //入站投递给入站管道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setErrorChannel(errorChannel());
        adapter.setQos(0);
        return adapter;
    }
		//出站处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler =
                new MqttPahoMessageHandler(outClientId,factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(defaultTopic);
        return handler;
    }
		//三个管道,用于处理入站消息,出站消息,错误消息
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
    @Bean
    public MessageChannel errorChannel(){
        return new DirectChannel();
    }
}
  1. 配置文件
mqtt.uris=tcp://119.3.209.222:1883
mqtt.username=pi
mqtt.password=raspberry
mqtt.in-client-id=${random.value}
mqtt.out-client-id=${random.value}
mqtt.default-topic=application/analysis/#
mqtt.timeout=1000
mqtt.keepalive=2000
  1. 消息接收器
@Component
public class Receiver {
    @Bean
    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        return message -> {
            System.out.println(message.getPayload());
        };
    }
}

通过前文的配置,当MQTT 订阅主题产生消息时会通过 MessageProducer(本例中是一个管道适配器)将消息投递到入站管道中,所以当需要接收并处理Mqtt消息时只需要从入站管道中取出消息即可。取出消息即可使用前文的 Endpoint 本例使用Service Activator

  1. 消息发送器
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSender {
    void sendToMqtt(String text);
    void sendWithTopic(@Header(MqttHeaders.TOPIC) String topic, String text);
    void sendWithTopicAndQos(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String text);
}

此接口内函数参数将作为消息的负载被包装成为消息并投递到出站管道中

调用MqttSender

@Test
void send(){
    String payload = "hello";
    mqttSender.sendWithTopic("topic/1",payload);
}

参考

  1. https://www.jianshu.com/p/da84fbcd4c5b
  2. https://juejin.cn/post/7008123266023817253
  3. https://juejin.cn/post/7175328199402848312
~  ~  The   End  ~  ~


 赏 
感谢您的支持,我会继续努力哒!
支付宝收款码
tips
文章二维码 分类标签:技术springbootmqtt
文章标题:SpringBoot集成MQTT
文章链接:http://120.46.217.131:82/archives/65/
最后编辑:2023 年 4 月 6 日 09:34 By Yang
许可协议: 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)

相关推荐

热门推荐

(*) 5 + 4 =
快来做第一个评论的人吧~