| 
 | 
 
[size=13.3333px]0 前言 
[size=13.3333px]最近在学习MQTT,发现MQTT还是挺好用的,于是花了点时间做了一个简单的应用示例,希望能给需要做这方面的人一些参考。 
[size=13.3333px]相关背景知识:[size=13.3333px]http://www.embed-net.com/thread-224-1-1.html 
[size=13.3333px]具体功能为: 
[size=13.3333px]1,STM32F405为主控芯片,它通过传感器采集环境数据,比如温度,湿度,光照度,大气压强等; 
[size=13.3333px]2,主控芯片通过W5500模块将测量的数据通过MQTT协议方式发布到MQTT服务器(服务器域名和IP见固件程序); 
[size=13.3333px]3,主控订阅LED灯控制的消息,当接收到对应的控制指令后点亮或者熄灭对应的LED灯; 
[size=13.3333px]4,安卓手机端订阅传感器数据的消息,当接收到消息后将传感器数据在界面显示; 
[size=13.3333px]5,安卓手机可发送点亮或者熄灭LED灯的指令到服务器,然后服务器会将该指令转发给STM32主控,然后STM32主控解析该指令并执行指令。 
 
[size=13.3333px]1 单片机端实现 
[size=13.3333px]MQTT协议是基于TCP的协议,所以我们只需要在单片机端实现TCP客户端代码之后就很容易移植MQTT了,STM32F4+W5500实现TCP客户端的代码我们以前已经实现过,代码下载地址为: 
[size=13.3333px]http://www.embed-net.com/thread-87-1-1.html 
[size=13.3333px]当然,如果你想在代码里面直接使用服务器域名方式进行连接,我们还得在TCP客户端代码里面集成DNS的代码,当然在上面这个连接里面也有相关的代码。 
[size=13.3333px]MQTT代码源码下载地址: 
[size=13.3333px]http://www.eclipse.org/paho/ 
[size=13.3333px]在STM32这边我们使用的是C/C++ MQTT Embedded clients代码。 
[size=13.3333px]硬件连接如下图所示: 
 
[size=13.3333px]1.1 MQTT的移植 
[size=13.3333px]MQTT的移植非常简单,将C/C++ MQTT Embedded clients的代码添加到工程中,然后我们只需要再次封装4个函数即可: 
[size=13.3333px]int transport_sendPacketBuffer(unsigned char* buf, int buflen); 
[size=13.3333px]int transport_getdata(unsigned char* buf, int count); 
[size=13.3333px]int transport_open(void); 
[size=13.3333px]int transport_close(void); 
 
[size=13.3333px]transport_sendPacketBuffer:通过网络以TCP的方式发送数据; 
[size=13.3333px]transport_getdata:TCP方式从服务器端读取数据,该函数目前属于阻塞函数; 
[size=13.3333px]transport_open:打开一个网络接口,其实就是和服务器建立一个TCP连接; 
[size=13.3333px]transport_close:关闭网络接口。 
[size=13.3333px]如果已经移植好了socket方式的TCP客户端的程序,那么这几个函数的封装也是非常简单的,程序代码如下所示: 
[size=13.3333px]/** 
[size=13.3333px]  * @brief  通过TCP方式发送数据到TCP服务器 
[size=13.3333px]  * @param  buf 数据首地址 
[size=13.3333px]  * @param  buflen 数据长度 
[size=13.3333px]  * @retval 小于0表示发送失败 
[size=13.3333px]  */ 
[size=13.3333px]int transport_sendPacketBuffer(unsigned char* buf, int buflen) 
[size=13.3333px]{ 
[size=13.3333px]  return send(SOCK_TCPS,buf,buflen); 
[size=13.3333px]} 
[size=13.3333px]/** 
[size=13.3333px]  * @brief  阻塞方式接收TCP服务器发送的数据 
[size=13.3333px]  * @param  buf 数据存储首地址 
[size=13.3333px]  * @param  count 数据缓冲区长度 
[size=13.3333px]  * @retval 小于0表示接收数据失败 
[size=13.3333px]  */ 
[size=13.3333px]int transport_getdata(unsigned char* buf, int count) 
[size=13.3333px]{ 
[size=13.3333px]  return recv(SOCK_TCPS,buf,count); 
[size=13.3333px]} 
 
 
[size=13.3333px]/** 
[size=13.3333px]  * @brief  打开一个socket并连接到服务器 
[size=13.3333px]  * @param  无 
[size=13.3333px]  * @retval 小于0表示打开失败 
[size=13.3333px]  */ 
[size=13.3333px]int transport_open(void) 
[size=13.3333px]{ 
[size=13.3333px]  int32_t ret; 
[size=13.3333px]  //新建一个Socket并绑定本地端口5000 
[size=13.3333px]  ret = socket(SOCK_TCPS,Sn_MR_TCP,5000,0x00); 
[size=13.3333px]  if(ret != SOCK_TCPS){ 
[size=13.3333px]    printf("%d:Socket Error\r\n",SOCK_TCPS); 
[size=13.3333px]    while(1); 
[size=13.3333px]  }else{ 
[size=13.3333px]    printf("%d:Opened\r\n",SOCK_TCPS); 
[size=13.3333px]  } 
 
[size=13.3333px]  //连接TCP服务器 
[size=13.3333px]  ret = connect(SOCK_TCPS,domain_ip,1883);//端口必须为1883 
[size=13.3333px]  if(ret != SOCK_OK){ 
[size=13.3333px]    printf("%d:Socket Connect Error\r\n",SOCK_TCPS); 
[size=13.3333px]    while(1); 
[size=13.3333px]  }else{ 
[size=13.3333px]    printf("%d:Connected\r\n",SOCK_TCPS); 
[size=13.3333px]  }      
[size=13.3333px]    return 0; 
[size=13.3333px]} 
[size=13.3333px]/** 
[size=13.3333px]  * @brief  关闭socket 
[size=13.3333px]  * @param  无 
[size=13.3333px]  * @retval 小于0表示关闭失败 
[size=13.3333px]  */ 
[size=13.3333px]int transport_close(void) 
[size=13.3333px]{ 
[size=13.3333px]  close(SOCK_TCPS); 
[size=13.3333px]  return 0; 
[size=13.3333px]} 
 
 
[size=13.3333px]完成了这几个函数,然后我们就可以根据官方提供的示例代码实现我们自己的代码了,比如我们向代理服务器发送一个消息的代码如下所示: 
[size=13.3333px]/** 
[size=13.3333px]  * @brief  向代理(服务器)发送一个消息 
[size=13.3333px]  * @param  pTopic 消息主题 
[size=13.3333px]  * @param  pMessage 消息内容 
[size=13.3333px]  * @retval 小于0表示发送失败 
[size=13.3333px]  */ 
[size=13.3333px]int mqtt_publish(char *pTopic,char *pMessage) 
[size=13.3333px]{ 
[size=13.3333px]  int32_t len,rc; 
[size=13.3333px]  MQTTPacket_connectData data = MQTTPacket_connectData_initializer; 
[size=13.3333px]  unsigned char buf[200]; 
[size=13.3333px]  MQTTString topicString = MQTTString_initializer; 
[size=13.3333px]  int msglen = strlen(pMessage); 
[size=13.3333px]  int buflen = sizeof(buf); 
 
[size=13.3333px]  data.clientID.cstring = "me"; 
[size=13.3333px]  data.keepAliveInterval = 5; 
[size=13.3333px]  data.cleansession = 1; 
[size=13.3333px]  len = MQTTSerialize_connect(buf, buflen, &data); /* 1 */ 
 
[size=13.3333px]  topicString.cstring = pTopic; 
[size=13.3333px]  len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, (unsigned char*)pMessage, msglen); /* 2 */ 
 
[size=13.3333px]  len += MQTTSerialize_disconnect(buf + len, buflen - len); /* 3 */ 
[size=13.3333px]  transport_open();  
[size=13.3333px]  rc = transport_sendPacketBuffer(buf,len); 
[size=13.3333px]  transport_close(); 
[size=13.3333px]    if (rc == len) 
[size=13.3333px]        printf("Successfully published\n\r"); 
[size=13.3333px]    else 
[size=13.3333px]        printf("Publish failed\n\r"); 
[size=13.3333px]  return 0; 
[size=13.3333px]} 
 
[size=13.3333px]下面我们看下主函数的代码,思路也比较清晰: 
[size=13.3333px]int main(void) 
[size=13.3333px]{ 
[size=13.3333px]  static char meassage[200]; 
[size=13.3333px]  int rc; 
[size=13.3333px]  char *led; 
[size=13.3333px]  char led_value; 
[size=13.3333px]  float temperature,humidity,light,pressure; 
[size=13.3333px]  srand(0); 
[size=13.3333px]  //配置LED灯引脚 
[size=13.3333px]  LED_Config(); 
[size=13.3333px]    //初始化配置网络 
[size=13.3333px]    network_init(); 
[size=13.3333px]  while(1){ 
[size=13.3333px]    memset(meassage,0,sizeof(meassage)); 
[size=13.3333px]    //订阅消息 
[size=13.3333px]    rc = mqtt_subscrib("pyboard_led",meassage); 
[size=13.3333px]    printf("rc = %d\n\r",rc); 
[size=13.3333px]    if(rc >= 0){ 
[size=13.3333px]      printf("meassage = %s\n\r",meassage); 
[size=13.3333px]      //解析JSON格式字符串并点亮相应的LED灯 
[size=13.3333px]      cJSON *root = cJSON_Parse(meassage);  
[size=13.3333px]      if(root != NULL){ 
[size=13.3333px]        led = cJSON_GetObjectItem(root,"led")->valuestring; 
[size=13.3333px]        printf("led = %s\n\r",led); 
[size=13.3333px]        led_value = cJSON_GetObjectItem(root,"value")->valueint; 
[size=13.3333px]        if(!strcmp(led,"red")){ 
[size=13.3333px]          if(led_value){ 
[size=13.3333px]            LED_On(LED_RED); 
[size=13.3333px]          }else{ 
[size=13.3333px]            LED_Off(LED_RED); 
[size=13.3333px]          } 
[size=13.3333px]        }else if(!strcmp(led,"green")){ 
[size=13.3333px]          if(led_value){ 
[size=13.3333px]            LED_On(LED_GREEN); 
[size=13.3333px]          }else{ 
[size=13.3333px]            LED_Off(LED_GREEN); 
[size=13.3333px]          } 
[size=13.3333px]        }else if(!strcmp(led,"blue")){ 
[size=13.3333px]          if(led_value){ 
[size=13.3333px]            LED_On(LED_BLUE); 
[size=13.3333px]          }else{ 
[size=13.3333px]            LED_Off(LED_BLUE); 
[size=13.3333px]          } 
[size=13.3333px]        }else if(!strcmp(led,"yellow")){ 
[size=13.3333px]          if(led_value){ 
[size=13.3333px]            LED_On(LED_YELLOW); 
[size=13.3333px]            printf("Yellow On\n\r"); 
[size=13.3333px]          }else{ 
[size=13.3333px]            LED_Off(LED_YELLOW); 
[size=13.3333px]            printf("Yellow Off\n\r"); 
[size=13.3333px]          } 
[size=13.3333px]        } 
[size=13.3333px]        // 释放内存空间   
[size=13.3333px]        cJSON_Delete(root);  
[size=13.3333px]      }else{ 
[size=13.3333px]        printf("Error before: [%s]\n\r",cJSON_GetErrorPtr());   
[size=13.3333px]      } 
[size=13.3333px]    } 
[size=13.3333px]    delay_ms(500); 
[size=13.3333px]    //获取传感器测量数据,该示例使用随机数 
[size=13.3333px]    temperature = rand()%50; 
[size=13.3333px]    humidity = rand()%100; 
[size=13.3333px]    light = rand()%1000; 
[size=13.3333px]    pressure = rand()%1000; 
[size=13.3333px]    //将数据合成为JSON格式数据 
[size=13.3333px]    sprintf(meassage,"{\"temperature\":%.1f,\"humidity\":%.1f,\"light\":%.1f,\"pressure\":%.1f}",temperature,humidity,light,pressure); 
[size=13.3333px]    //将数据发送出去 
[size=13.3333px]    mqtt_publish("pyboard_value",meassage); 
[size=13.3333px]  } 
[size=13.3333px]} 
 
[size=13.3333px]完整工程代码可在后面的附件下载。 
 
[size=13.3333px]2 手机端代码实现 
[size=13.3333px]手机端我们也使用官方提供的Java库Java client and utilities,下载地址: 
[size=13.3333px]http://www.eclipse.org/paho/ 
[size=13.3333px]将jar文件添加到工程中即可,程序界面如下所示: 
 
[size=13.3333px]上面4个条目分别显示STM32单片机通过W5500发送到服务器端的传感器测量数据; 
[size=13.3333px]下面4个图片分别控制板子上的4个LED灯; 
[size=13.3333px]消息发送我们采用线程的方式发送,接收采用回调函数方式接收消息。 
 
2.1 实现消息发送 
[size=13.3333px]发送消息的代码如下所示: 
[size=13.3333px]/** 
[size=13.3333px]   * send message 
[size=13.3333px]   */ 
[size=13.3333px]  class PublishThread extends Thread { 
[size=13.3333px]      String topic; 
[size=13.3333px]      MqttMessage message; 
[size=13.3333px]      int qos = 0; 
[size=13.3333px]      MemoryPersistence persistence = new MemoryPersistence(); 
[size=13.3333px]      PublishThread(String topic,String message){ 
[size=13.3333px]          this.topic = topic; 
[size=13.3333px]          this.message = new MqttMessage(message.getBytes()); 
[size=13.3333px]      } 
[size=13.3333px]      public void sendMessage(String topic,String message){ 
[size=13.3333px]          this.topic = topic; 
[size=13.3333px]          this.message = new MqttMessage(message.getBytes()); 
[size=13.3333px]          run(); 
[size=13.3333px]      } 
[size=13.3333px]      @Override 
[size=13.3333px]      public void run() { 
[size=13.3333px]          try { 
[size=13.3333px]              MqttClient sampleClient = new MqttClient(broker, clientId, persistence); 
[size=13.3333px]              MqttConnectOptions connOpts = new MqttConnectOptions(); 
[size=13.3333px]              connOpts.setCleanSession(true); 
[size=13.3333px]              connOpts.setKeepAliveInterval(1); 
[size=13.3333px]              System.out.println("Connecting to broker: " + broker); 
[size=13.3333px]              sampleClient.connect(connOpts); 
[size=13.3333px]              System.out.println("Connected"); 
[size=13.3333px]              System.out.println("Publishing message: " + message.toString()); 
[size=13.3333px]              message.setQos(qos); 
[size=13.3333px]              sampleClient.publish(topic, message); 
[size=13.3333px]              System.out.println("Message published"); 
[size=13.3333px]              sampleClient.disconnect(); 
[size=13.3333px]              System.out.println("Disconnected"); 
[size=13.3333px]          }catch(MqttException me) { 
[size=13.3333px]              System.out.println("reason "+me.getReasonCode()); 
[size=13.3333px]              System.out.println("msg "+me.getMessage()); 
[size=13.3333px]              System.out.println("loc "+me.getLocalizedMessage()); 
[size=13.3333px]              System.out.println("cause "+me.getCause()); 
[size=13.3333px]              System.out.println("excep "+me); 
[size=13.3333px]              me.printStackTrace(); 
[size=13.3333px]          } 
[size=13.3333px]      } 
[size=13.3333px]  } 
 
 
2.2 实现消息接收 
[size=13.3333px]接收消息的代码如下所示: 
[size=13.3333px]/** 
[size=13.3333px]  * receive message 
[size=13.3333px]  */ 
[size=13.3333px]class SubscribeThread extends Thread{ 
[size=13.3333px]     final String topic; 
[size=13.3333px]     MemoryPersistence persistence = new MemoryPersistence(); 
[size=13.3333px]     SubscribeThread(String topic){ 
[size=13.3333px]         this.topic = topic; 
[size=13.3333px]     } 
[size=13.3333px]     @Override 
[size=13.3333px]     public void run(){ 
[size=13.3333px]         try { 
[size=13.3333px]             final MqttClient sampleClient = new MqttClient(broker, clientId, persistence); 
[size=13.3333px]             final MqttConnectOptions connOpts = new MqttConnectOptions(); 
[size=13.3333px]             connOpts.setCleanSession(true); 
[size=13.3333px]             System.out.println("Connecting to broker: " + broker); 
[size=13.3333px]             connOpts.setKeepAliveInterval(5); 
[size=13.3333px]             sampleClient.setCallback(new MqttCallback() { 
[size=13.3333px]                 @Override 
[size=13.3333px]                 public void connectionLost(Throwable throwable) { 
[size=13.3333px]                     System.out.println("connectionLost"); 
[size=13.3333px]                     try { 
[size=13.3333px]                         sampleClient.connect(connOpts); 
[size=13.3333px]                         sampleClient.subscribe(topic); 
[size=13.3333px]                     }catch (MqttException e){ 
[size=13.3333px]                         e.printStackTrace(); 
[size=13.3333px]                     } 
[size=13.3333px]                 } 
 
[size=13.3333px]                 @Override 
[size=13.3333px]                 public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { 
[size=13.3333px]                     System.out.println("messageArrived:"+mqttMessage.toString()); 
[size=13.3333px]                     System.out.println(topic); 
[size=13.3333px]                     System.out.println(mqttMessage.toString()); 
[size=13.3333px]                     try { 
[size=13.3333px]                         JSONTokener jsonParser = new JSONTokener(mqttMessage.toString()); 
[size=13.3333px]                         JSONObject person = (JSONObject) jsonParser.nextValue(); 
[size=13.3333px]                         temperature = person.getDouble("temperature"); 
[size=13.3333px]                         humidity = person.getDouble("humidity"); 
[size=13.3333px]                         light = person.getDouble("light"); 
[size=13.3333px]                         pressure = person.getDouble("pressure"); 
[size=13.3333px]                         System.out.println("temperature = " + temperature); 
[size=13.3333px]                         System.out.println("humidity = " + humidity); 
[size=13.3333px]                         runOnUiThread(new Runnable() { 
[size=13.3333px]                             @Override 
[size=13.3333px]                             public void run() { 
[size=13.3333px]                                 temperatureTextView.setText(String.format("%.1f", temperature)); 
[size=13.3333px]                                 humidityTextView.setText(String.format("%.1f", humidity)); 
[size=13.3333px]                                 lightTextView.setText(String.format("%.1f", light)); 
[size=13.3333px]                                 pressureTextView.setText(String.format("%.1f", pressure)); 
[size=13.3333px]                             } 
[size=13.3333px]                         }); 
[size=13.3333px]                     } catch (JSONException ex) { 
[size=13.3333px]                         ex.printStackTrace(); 
[size=13.3333px]                     } 
[size=13.3333px]                 } 
 
[size=13.3333px]                 @Override 
[size=13.3333px]                 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 
[size=13.3333px]                     System.out.println("deliveryComplete"); 
[size=13.3333px]                 } 
[size=13.3333px]             }); 
[size=13.3333px]             sampleClient.connect(connOpts); 
[size=13.3333px]             sampleClient.subscribe(topic); 
[size=13.3333px]         } catch(MqttException me) { 
[size=13.3333px]             System.out.println("reason "+me.getReasonCode()); 
[size=13.3333px]             System.out.println("msg "+me.getMessage()); 
[size=13.3333px]             System.out.println("loc "+me.getLocalizedMessage()); 
[size=13.3333px]             System.out.println("cause "+me.getCause()); 
[size=13.3333px]             System.out.println("excep "+me); 
[size=13.3333px]             me.printStackTrace(); 
[size=13.3333px]         } 
[size=13.3333px]     } 
[size=13.3333px]} 
 
 
3 实测效果 
[size=13.3333px]1,单片机端定时更新传感器数据,手机端也会同步更新; 
[size=13.3333px]2,手机端点击4个LED控制的按钮,板子上也会点亮或者熄灭对应的LED; 
 
4 源码下载 
[size=13.3333px]http://www.embed-net.com/thread-230-1-1.html 
 |   
 
 
 
 |