|
[size=13.3333px]0 前言
[size=13.3333px]最近在学习MQTT,发现MQTT还是挺好用的,于是花了点时间做了一个简单的应用示例,希望能给需要做这方面的人一些参考。
[size=13.3333px]相关背景知识:[size=13.3333px]http://www.embed-net.com/thread-224-1-1.html
[size=13.3333px]具体功能为:
[size=13.3333px]1,STM32F405为主控芯片,它通过传感器采集环境数据,比如温度,湿度,光照度,大气压强等;
[size=13.3333px]2,主控芯片通过W5500模块将测量的数据通过MQTT协议方式发布到MQTT服务器(服务器域名和IP见固件程序);
[size=13.3333px]3,主控订阅LED灯控制的消息,当接收到对应的控制指令后点亮或者熄灭对应的LED灯;
[size=13.3333px]4,安卓手机端订阅传感器数据的消息,当接收到消息后将传感器数据在界面显示;
[size=13.3333px]5,安卓手机可发送点亮或者熄灭LED灯的指令到服务器,然后服务器会将该指令转发给STM32主控,然后STM32主控解析该指令并执行指令。
[size=13.3333px]1 单片机端实现
[size=13.3333px]MQTT协议是基于TCP的协议,所以我们只需要在单片机端实现TCP客户端代码之后就很容易移植MQTT了,STM32F4+W5500实现TCP客户端的代码我们以前已经实现过,代码下载地址为:
[size=13.3333px]http://www.embed-net.com/thread-87-1-1.html
[size=13.3333px]当然,如果你想在代码里面直接使用服务器域名方式进行连接,我们还得在TCP客户端代码里面集成DNS的代码,当然在上面这个连接里面也有相关的代码。
[size=13.3333px]MQTT代码源码下载地址:
[size=13.3333px]http://www.eclipse.org/paho/
[size=13.3333px]在STM32这边我们使用的是C/C++ MQTT Embedded clients代码。
[size=13.3333px]硬件连接如下图所示:
[size=13.3333px]1.1 MQTT的移植
[size=13.3333px]MQTT的移植非常简单,将C/C++ MQTT Embedded clients的代码添加到工程中,然后我们只需要再次封装4个函数即可:
[size=13.3333px]int transport_sendPacketBuffer(unsigned char* buf, int buflen);
[size=13.3333px]int transport_getdata(unsigned char* buf, int count);
[size=13.3333px]int transport_open(void);
[size=13.3333px]int transport_close(void);
[size=13.3333px]transport_sendPacketBuffer:通过网络以TCP的方式发送数据;
[size=13.3333px]transport_getdata:TCP方式从服务器端读取数据,该函数目前属于阻塞函数;
[size=13.3333px]transport_open:打开一个网络接口,其实就是和服务器建立一个TCP连接;
[size=13.3333px]transport_close:关闭网络接口。
[size=13.3333px]如果已经移植好了socket方式的TCP客户端的程序,那么这几个函数的封装也是非常简单的,程序代码如下所示:
[size=13.3333px]/**
[size=13.3333px] * @brief 通过TCP方式发送数据到TCP服务器
[size=13.3333px] * @param buf 数据首地址
[size=13.3333px] * @param buflen 数据长度
[size=13.3333px] * @retval 小于0表示发送失败
[size=13.3333px] */
[size=13.3333px]int transport_sendPacketBuffer(unsigned char* buf, int buflen)
[size=13.3333px]{
[size=13.3333px] return send(SOCK_TCPS,buf,buflen);
[size=13.3333px]}
[size=13.3333px]/**
[size=13.3333px] * @brief 阻塞方式接收TCP服务器发送的数据
[size=13.3333px] * @param buf 数据存储首地址
[size=13.3333px] * @param count 数据缓冲区长度
[size=13.3333px] * @retval 小于0表示接收数据失败
[size=13.3333px] */
[size=13.3333px]int transport_getdata(unsigned char* buf, int count)
[size=13.3333px]{
[size=13.3333px] return recv(SOCK_TCPS,buf,count);
[size=13.3333px]}
[size=13.3333px]/**
[size=13.3333px] * @brief 打开一个socket并连接到服务器
[size=13.3333px] * @param 无
[size=13.3333px] * @retval 小于0表示打开失败
[size=13.3333px] */
[size=13.3333px]int transport_open(void)
[size=13.3333px]{
[size=13.3333px] int32_t ret;
[size=13.3333px] //新建一个Socket并绑定本地端口5000
[size=13.3333px] ret = socket(SOCK_TCPS,Sn_MR_TCP,5000,0x00);
[size=13.3333px] if(ret != SOCK_TCPS){
[size=13.3333px] printf("%d:Socket Error\r\n",SOCK_TCPS);
[size=13.3333px] while(1);
[size=13.3333px] }else{
[size=13.3333px] printf("%d:Opened\r\n",SOCK_TCPS);
[size=13.3333px] }
[size=13.3333px] //连接TCP服务器
[size=13.3333px] ret = connect(SOCK_TCPS,domain_ip,1883);//端口必须为1883
[size=13.3333px] if(ret != SOCK_OK){
[size=13.3333px] printf("%d:Socket Connect Error\r\n",SOCK_TCPS);
[size=13.3333px] while(1);
[size=13.3333px] }else{
[size=13.3333px] printf("%d:Connected\r\n",SOCK_TCPS);
[size=13.3333px] }
[size=13.3333px] return 0;
[size=13.3333px]}
[size=13.3333px]/**
[size=13.3333px] * @brief 关闭socket
[size=13.3333px] * @param 无
[size=13.3333px] * @retval 小于0表示关闭失败
[size=13.3333px] */
[size=13.3333px]int transport_close(void)
[size=13.3333px]{
[size=13.3333px] close(SOCK_TCPS);
[size=13.3333px] return 0;
[size=13.3333px]}
[size=13.3333px]完成了这几个函数,然后我们就可以根据官方提供的示例代码实现我们自己的代码了,比如我们向代理服务器发送一个消息的代码如下所示:
[size=13.3333px]/**
[size=13.3333px] * @brief 向代理(服务器)发送一个消息
[size=13.3333px] * @param pTopic 消息主题
[size=13.3333px] * @param pMessage 消息内容
[size=13.3333px] * @retval 小于0表示发送失败
[size=13.3333px] */
[size=13.3333px]int mqtt_publish(char *pTopic,char *pMessage)
[size=13.3333px]{
[size=13.3333px] int32_t len,rc;
[size=13.3333px] MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
[size=13.3333px] unsigned char buf[200];
[size=13.3333px] MQTTString topicString = MQTTString_initializer;
[size=13.3333px] int msglen = strlen(pMessage);
[size=13.3333px] int buflen = sizeof(buf);
[size=13.3333px] data.clientID.cstring = "me";
[size=13.3333px] data.keepAliveInterval = 5;
[size=13.3333px] data.cleansession = 1;
[size=13.3333px] len = MQTTSerialize_connect(buf, buflen, &data); /* 1 */
[size=13.3333px] topicString.cstring = pTopic;
[size=13.3333px] len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, (unsigned char*)pMessage, msglen); /* 2 */
[size=13.3333px] len += MQTTSerialize_disconnect(buf + len, buflen - len); /* 3 */
[size=13.3333px] transport_open();
[size=13.3333px] rc = transport_sendPacketBuffer(buf,len);
[size=13.3333px] transport_close();
[size=13.3333px] if (rc == len)
[size=13.3333px] printf("Successfully published\n\r");
[size=13.3333px] else
[size=13.3333px] printf("Publish failed\n\r");
[size=13.3333px] return 0;
[size=13.3333px]}
[size=13.3333px]下面我们看下主函数的代码,思路也比较清晰:
[size=13.3333px]int main(void)
[size=13.3333px]{
[size=13.3333px] static char meassage[200];
[size=13.3333px] int rc;
[size=13.3333px] char *led;
[size=13.3333px] char led_value;
[size=13.3333px] float temperature,humidity,light,pressure;
[size=13.3333px] srand(0);
[size=13.3333px] //配置LED灯引脚
[size=13.3333px] LED_Config();
[size=13.3333px] //初始化配置网络
[size=13.3333px] network_init();
[size=13.3333px] while(1){
[size=13.3333px] memset(meassage,0,sizeof(meassage));
[size=13.3333px] //订阅消息
[size=13.3333px] rc = mqtt_subscrib("pyboard_led",meassage);
[size=13.3333px] printf("rc = %d\n\r",rc);
[size=13.3333px] if(rc >= 0){
[size=13.3333px] printf("meassage = %s\n\r",meassage);
[size=13.3333px] //解析JSON格式字符串并点亮相应的LED灯
[size=13.3333px] cJSON *root = cJSON_Parse(meassage);
[size=13.3333px] if(root != NULL){
[size=13.3333px] led = cJSON_GetObjectItem(root,"led")->valuestring;
[size=13.3333px] printf("led = %s\n\r",led);
[size=13.3333px] led_value = cJSON_GetObjectItem(root,"value")->valueint;
[size=13.3333px] if(!strcmp(led,"red")){
[size=13.3333px] if(led_value){
[size=13.3333px] LED_On(LED_RED);
[size=13.3333px] }else{
[size=13.3333px] LED_Off(LED_RED);
[size=13.3333px] }
[size=13.3333px] }else if(!strcmp(led,"green")){
[size=13.3333px] if(led_value){
[size=13.3333px] LED_On(LED_GREEN);
[size=13.3333px] }else{
[size=13.3333px] LED_Off(LED_GREEN);
[size=13.3333px] }
[size=13.3333px] }else if(!strcmp(led,"blue")){
[size=13.3333px] if(led_value){
[size=13.3333px] LED_On(LED_BLUE);
[size=13.3333px] }else{
[size=13.3333px] LED_Off(LED_BLUE);
[size=13.3333px] }
[size=13.3333px] }else if(!strcmp(led,"yellow")){
[size=13.3333px] if(led_value){
[size=13.3333px] LED_On(LED_YELLOW);
[size=13.3333px] printf("Yellow On\n\r");
[size=13.3333px] }else{
[size=13.3333px] LED_Off(LED_YELLOW);
[size=13.3333px] printf("Yellow Off\n\r");
[size=13.3333px] }
[size=13.3333px] }
[size=13.3333px] // 释放内存空间
[size=13.3333px] cJSON_Delete(root);
[size=13.3333px] }else{
[size=13.3333px] printf("Error before: [%s]\n\r",cJSON_GetErrorPtr());
[size=13.3333px] }
[size=13.3333px] }
[size=13.3333px] delay_ms(500);
[size=13.3333px] //获取传感器测量数据,该示例使用随机数
[size=13.3333px] temperature = rand()%50;
[size=13.3333px] humidity = rand()%100;
[size=13.3333px] light = rand()%1000;
[size=13.3333px] pressure = rand()%1000;
[size=13.3333px] //将数据合成为JSON格式数据
[size=13.3333px] sprintf(meassage,"{\"temperature\":%.1f,\"humidity\":%.1f,\"light\":%.1f,\"pressure\":%.1f}",temperature,humidity,light,pressure);
[size=13.3333px] //将数据发送出去
[size=13.3333px] mqtt_publish("pyboard_value",meassage);
[size=13.3333px] }
[size=13.3333px]}
[size=13.3333px]完整工程代码可在后面的附件下载。
[size=13.3333px]2 手机端代码实现
[size=13.3333px]手机端我们也使用官方提供的Java库Java client and utilities,下载地址:
[size=13.3333px]http://www.eclipse.org/paho/
[size=13.3333px]将jar文件添加到工程中即可,程序界面如下所示:
[size=13.3333px]上面4个条目分别显示STM32单片机通过W5500发送到服务器端的传感器测量数据;
[size=13.3333px]下面4个图片分别控制板子上的4个LED灯;
[size=13.3333px]消息发送我们采用线程的方式发送,接收采用回调函数方式接收消息。
2.1 实现消息发送
[size=13.3333px]发送消息的代码如下所示:
[size=13.3333px]/**
[size=13.3333px] * send message
[size=13.3333px] */
[size=13.3333px] class PublishThread extends Thread {
[size=13.3333px] String topic;
[size=13.3333px] MqttMessage message;
[size=13.3333px] int qos = 0;
[size=13.3333px] MemoryPersistence persistence = new MemoryPersistence();
[size=13.3333px] PublishThread(String topic,String message){
[size=13.3333px] this.topic = topic;
[size=13.3333px] this.message = new MqttMessage(message.getBytes());
[size=13.3333px] }
[size=13.3333px] public void sendMessage(String topic,String message){
[size=13.3333px] this.topic = topic;
[size=13.3333px] this.message = new MqttMessage(message.getBytes());
[size=13.3333px] run();
[size=13.3333px] }
[size=13.3333px] @Override
[size=13.3333px] public void run() {
[size=13.3333px] try {
[size=13.3333px] MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
[size=13.3333px] MqttConnectOptions connOpts = new MqttConnectOptions();
[size=13.3333px] connOpts.setCleanSession(true);
[size=13.3333px] connOpts.setKeepAliveInterval(1);
[size=13.3333px] System.out.println("Connecting to broker: " + broker);
[size=13.3333px] sampleClient.connect(connOpts);
[size=13.3333px] System.out.println("Connected");
[size=13.3333px] System.out.println("Publishing message: " + message.toString());
[size=13.3333px] message.setQos(qos);
[size=13.3333px] sampleClient.publish(topic, message);
[size=13.3333px] System.out.println("Message published");
[size=13.3333px] sampleClient.disconnect();
[size=13.3333px] System.out.println("Disconnected");
[size=13.3333px] }catch(MqttException me) {
[size=13.3333px] System.out.println("reason "+me.getReasonCode());
[size=13.3333px] System.out.println("msg "+me.getMessage());
[size=13.3333px] System.out.println("loc "+me.getLocalizedMessage());
[size=13.3333px] System.out.println("cause "+me.getCause());
[size=13.3333px] System.out.println("excep "+me);
[size=13.3333px] me.printStackTrace();
[size=13.3333px] }
[size=13.3333px] }
[size=13.3333px] }
2.2 实现消息接收
[size=13.3333px]接收消息的代码如下所示:
[size=13.3333px]/**
[size=13.3333px] * receive message
[size=13.3333px] */
[size=13.3333px]class SubscribeThread extends Thread{
[size=13.3333px] final String topic;
[size=13.3333px] MemoryPersistence persistence = new MemoryPersistence();
[size=13.3333px] SubscribeThread(String topic){
[size=13.3333px] this.topic = topic;
[size=13.3333px] }
[size=13.3333px] @Override
[size=13.3333px] public void run(){
[size=13.3333px] try {
[size=13.3333px] final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
[size=13.3333px] final MqttConnectOptions connOpts = new MqttConnectOptions();
[size=13.3333px] connOpts.setCleanSession(true);
[size=13.3333px] System.out.println("Connecting to broker: " + broker);
[size=13.3333px] connOpts.setKeepAliveInterval(5);
[size=13.3333px] sampleClient.setCallback(new MqttCallback() {
[size=13.3333px] @Override
[size=13.3333px] public void connectionLost(Throwable throwable) {
[size=13.3333px] System.out.println("connectionLost");
[size=13.3333px] try {
[size=13.3333px] sampleClient.connect(connOpts);
[size=13.3333px] sampleClient.subscribe(topic);
[size=13.3333px] }catch (MqttException e){
[size=13.3333px] e.printStackTrace();
[size=13.3333px] }
[size=13.3333px] }
[size=13.3333px] @Override
[size=13.3333px] public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
[size=13.3333px] System.out.println("messageArrived:"+mqttMessage.toString());
[size=13.3333px] System.out.println(topic);
[size=13.3333px] System.out.println(mqttMessage.toString());
[size=13.3333px] try {
[size=13.3333px] JSONTokener jsonParser = new JSONTokener(mqttMessage.toString());
[size=13.3333px] JSONObject person = (JSONObject) jsonParser.nextValue();
[size=13.3333px] temperature = person.getDouble("temperature");
[size=13.3333px] humidity = person.getDouble("humidity");
[size=13.3333px] light = person.getDouble("light");
[size=13.3333px] pressure = person.getDouble("pressure");
[size=13.3333px] System.out.println("temperature = " + temperature);
[size=13.3333px] System.out.println("humidity = " + humidity);
[size=13.3333px] runOnUiThread(new Runnable() {
[size=13.3333px] @Override
[size=13.3333px] public void run() {
[size=13.3333px] temperatureTextView.setText(String.format("%.1f", temperature));
[size=13.3333px] humidityTextView.setText(String.format("%.1f", humidity));
[size=13.3333px] lightTextView.setText(String.format("%.1f", light));
[size=13.3333px] pressureTextView.setText(String.format("%.1f", pressure));
[size=13.3333px] }
[size=13.3333px] });
[size=13.3333px] } catch (JSONException ex) {
[size=13.3333px] ex.printStackTrace();
[size=13.3333px] }
[size=13.3333px] }
[size=13.3333px] @Override
[size=13.3333px] public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
[size=13.3333px] System.out.println("deliveryComplete");
[size=13.3333px] }
[size=13.3333px] });
[size=13.3333px] sampleClient.connect(connOpts);
[size=13.3333px] sampleClient.subscribe(topic);
[size=13.3333px] } catch(MqttException me) {
[size=13.3333px] System.out.println("reason "+me.getReasonCode());
[size=13.3333px] System.out.println("msg "+me.getMessage());
[size=13.3333px] System.out.println("loc "+me.getLocalizedMessage());
[size=13.3333px] System.out.println("cause "+me.getCause());
[size=13.3333px] System.out.println("excep "+me);
[size=13.3333px] me.printStackTrace();
[size=13.3333px] }
[size=13.3333px] }
[size=13.3333px]}
3 实测效果
[size=13.3333px]1,单片机端定时更新传感器数据,手机端也会同步更新;
[size=13.3333px]2,手机端点击4个LED控制的按钮,板子上也会点亮或者熄灭对应的LED;
4 源码下载
[size=13.3333px]http://www.embed-net.com/thread-230-1-1.html
|
|