MQTT简介

  categories:mq  tags:,   author:

1. MQTT简介

MQTT(Message Queue Telemetry Transport),遥测传输协议,提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:

1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2. 对负载内容屏蔽的消息传输。
3. 使用 TCP/IP 提供网络连接。
4. 有三种消息发布服务质量:
    “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    “至少一次”,确保消息到达,但消息重复可能会发生。
    “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品 和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

早在1999年,IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士发明了MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)技术[1] 。据Andy Stanford-Clark博士称,MQTT将在今年和明年呈现爆炸式增长。
MQTT的话题是我俩谈论开源物联网平台Pachube时提到的。Stanford-Clark认为Pachube很酷,其不足之处是不具备真正的推送功能。你需要不断不断轮询才能得到即时数据。这正是MQTT能够实现的,他提到了使用推送通信系统的石油管道检测系统。

2. MQTT应用

IBM 和St. Jude医疗中心通过MQTT开发了一套Merlin系统,该系统使用了用于家庭保健的传感器。St. Jude医疗中心设计了一个叫做Merlin@home的心脏装置,这种无线发射器可以用来监控那些已经植入复律-除颤器和起搏器(两者都是基本的传感 器)的心脏病人。
该产品利用MQTT把病人的即时更新信息传给医生/医院,然后医院进行保存。这样的话,病人就不用亲自去医院检查心脏仪器了,医生可以随时查看病人的数据,给出建议,病人在家里就可以自行检查。
IBM称该发射器包括一个大型触摸屏,一个嵌入式键盘平台,以及一个Linux操作系统。
在未来几年,MQTT的应用会越来越广,值得关注。
通过MQTT协议,目前已经扩展出了数十个MQTT服务器端程序,可以通过PHP,JAVA,Python,C,C#等系统语言来向MQTT发送相关消息。
此外,国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议。其中Sohu,Cmstop手机客户端中均有使用到MQTT作为消息推送消息。据Cmstop主要负责消息推送的高级研发工程师李文凯称,随着移动互联网的发展,MQTT由于开放源代码,耗电量小等特点,将会在移动消息推送领域会有更多的贡献,在物联网领域,传感器与服务器的通信,信息的收集,MQTT都可以作为考虑的方案之一。在未来MQTT会进入到我们生活的各各方面。
如果需要下载MQTT服务器端,可以直接去MQTT官方网站点击software进行下载MQTT协议衍生出来的各个不同版本。

3. MQTT协议头部信息

前言

MQTT 3.1协议在线版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html

官方下载地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf PDF版本,42页,不算多。

另外,目前MQTT大家都用在了手机推送,可能还有很多的使用方式,有待进一步的探索。

协议方面,以前曾简单实现过一点HTTP协议,基于HTTP上构建若干种通信管道的socket.io协议,不过socket.io 0.9版本的协议才两三页而已。面对领域不同,自然解决的方式也不一样。

阅读完毕MQTT协议,有一个想法,其实可以基于MQTT协议,打造更加私有、精简(协议一些地方,略显多余)的传输协议,比如一个字节的传输开销。有时间,会详细说一下。

固定头部

固定头部,使用两个字节,共16位:

bit76543210
byte 1Message TypeDUP flagQoS levelRETAIN
byte 2Remaining Length

第一个字节(byte 1)

消息类型(4-7),使用4位二进制表示,可代表16种消息类型:

MnemonicEnumerationDescription
Reserved0Reserved
CONNECT1Client request to connect to Server
CONNACK2Connect Acknowledgment
PUBLISH3Publish message
PUBACK4Publish Acknowledgment
PUBREC5Publish Received (assured delivery part 1)
PUBREL6Publish Release (assured delivery part 2)
PUBCOMP7Publish Complete (assured delivery part 3)
SUBSCRIBE8Client Subscribe request
SUBACK9Subscribe Acknowledgment
UNSUBSCRIBE10Client Unsubscribe request
UNSUBACK11Unsubscribe Acknowledgment
PINGREQ12PING Request
PINGRESP13PING Response
DISCONNECT14Client is Disconnecting
Reserved15Reserved

除去0和15位置属于保留待用,共14种消息事件类型。

DUP flag(打开标志)

保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要满足以下条件:

 当QoS > 0
 消息需要回复确认

此时,在可变头部需要包含消息ID。当值为1时,表示当前消息先前已经被传送过。

QoS(Quality of Service,服务质量)

使用两个二进制表示PUBLISH类型消息:

QoS valuebit 2bit 1Description
000至多一次发完即丢弃<=1
101至少一次需要确认回复>=1
210只有一次需要确认回复=1
311待用,保留位置

RETAIN(保持)

仅针对PUBLISH消息。不同值,不同含义: 1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。

备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。 0:仅仅为当前订阅者推送此消息。

假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。

如何解析

因为java使用有符号(最高位为符号位)数据表示,byte范围:-128-127。该字节的最高位(左边第一位),可能为1。若直接转换为 byte类型,会出现负数,这是一个雷区。DataInputStream提供了int readUnsignedByte()读取方式,请注意。下面演示了,如何从一个字节中,获取到所有定义的信息,同时绕过雷区:

public static void main(String[] args) {
    byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0

    doGetBit(publishFixHeader);
    int ori = 224;//1110000,DISCONNECT ,Message Type (14)
    byte flag = (byte) ori; //有符号byte       
    doGetBit(flag);
    doGetBit_v2(ori);
}


public static void doGetBit(byte flags) {
    boolean retain = (flags & 1) > 0;
    int qosLevel = (flags & 0x06) >> 1;
    boolean dupFlag = (flags & 8) > 0;
    int messageType = (flags >> 4) & 0x0f;

    System.out.format(
            "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
            messageType, dupFlag, qosLevel, retain);
}

public static void doGetBit_v2(int flags) {
    boolean retain = (flags & 1) > 0;
    int qosLevel = (flags & 0x06) >> 1;
    boolean dupFlag = (flags & 8) > 0;
    int messageType = flags >> 4;

    System.out.format(
            "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
            messageType, dupFlag, qosLevel, retain);
}

处理Remaining Length(剩余长度)

在当前消息中剩余的byte(字节)数,包含可变头部和负荷(称之为内容/body,更为合适)。单个字节最大值:01111111,16进 制:0x7F,10进制为127。单个字节为什么不能是11111111(0xFF)呢?因为MQTT协议规定,第八位(最高位)若为1,则表示还有后续 字节存在。同时MQTT协议最多允许4个字节表示剩余长度。那么最大长度为:0xFF,0xFF,0xFF,0x7F,二进制表示 为:11111111,11111111,11111111,01111111,十进制:268435455 byte=261120KB=256MB=0.25GB 四个字节之间值的范围:

DigitsFromTo
10 (0x00)127 (0x7F)
2128 (0x80, 0x01)16 383 (0xFF, 0x7F)
316 384 (0x80, 0x80, 0x01)2 097 151 (0xFF, 0xFF, 0x7F)
42 097 152 (0x80, 0x80, 0x80, 0x01)268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

如何换算成十进制呢 ? 使用java语言表示如下:

public static void main(String[] args) throws IOException {
    // 模拟客户端写入
   ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
   DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
   dataOutputStream.write(0xff);
   dataOutputStream.write(0xff);
   dataOutputStream.write(0xff);
   dataOutputStream.write(0x7f);

   InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());

    // 模拟服务器/客户端解析
   System. out.println( "result is " + bytes2Length(arrayInputStream));
}

/**
* 转化字节为 int类型长度
* @param in
* @return
* @throws IOException
*/
private static int bytes2Length(InputStream in) throws IOException {
    int multiplier = 1;
    int length = 0;
    int digit = 0;
    do {
        digit = in.read(); //一个字节的有符号或者无符号,转换转换为四个字节有符号 int类型
        length += (digit & 0x7f) * multiplier;
        multiplier *= 128;
   } while ((digit & 0x80) != 0);

    return length;
}

一般最后一个字节小于127(01111111),和0x80(10000000)进行&操作,最终结果都为0,因此计算会终止。代理中间件和请求者,中间传递的是字节流Stream,自然要从流中读取,逐一解析出来。

那么如何将int类型长度解析为不确定的字节值呢?

public static void main(String[] args) throws IOException {
    // 模拟服务器/客户端写入
   ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
   DataOutputStream dataOutputStream = new DataOutputStream(
             arrayOutputStream);

    // 模拟服务器/客户端解析
    length2Bytes(dataOutputStream, 128);
}

/**
* int类型长度解析为1-4个字节
* @param out
* @param length
* @throws IOException
*/
private static void length2Bytes(OutputStream out, int length)
         throws IOException {
    int val = length;
    do {
         int digit = val % 128;
        val = val / 128;
         if (val > 0)
             digit = digit | 0x80;

        out.write(digit);
   } while (val > 0);
}

digit对val求模,最大值可能是127,一旦127 | 10000000 = 11111111 = 0xff = 255 请注意:剩余长度,只在固定头部中,无论是一个字节,还是四个字节,不能被算作可变头部中。

可变头部

固定头部仅定义了消息类型和一些标志位,一些消息的元数据,需要放入可变头部中。可变头部内容字节长度 + Playload/负荷字节长度 = 剩余长度,这个是需要牢记的。可变头部,包含了协议名称,版本号,连接标志,用户授权,心跳时间等内容,这部分和后面要讲到的CONNECT消息类型,有 重复,暂时略过。

Playload/消息体/负荷

消息体主要是为配合固定/可变头部命令(比如CONNECT可变头部User name标记若为1则需要在消息体中附加用户名称字符串)而存在。

CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息体。PUBLISH的消息体以二进制形式对待。

请记住,MQTT协议只允许在PUBLISH类型消息体中使用自定义特性,在固定/可变头部想加入自定义私有特性,就免了吧。这也是为了协议免于流 于形式,变得很分裂也为了兼顾现有客户端等。比如支持压缩等,那就可以在Playload中定义数据支持,在应用中进行读取处理。

这部分会在后面详细论述。

消息标识符/消息ID

固定头中的QoS level标志值为1或2时才会在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可变头中出现。

一个16位无符号位的short类型值(值不能为 0,0做保留作为无效的消息ID),仅仅要求在一个特定方向(服务器发往客户端为一个方向,客户端发送到服务器端为另一个方向)的通信消息中必须唯一。比 如客户端发往服务器,有可能存在服务器发往客户端会同时存在重复,但不碍事。

可变头部中,需要两个字节的顺序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻译成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左边/上面,表示这是一个大端字节/网络字节序,符合人的阅读习惯,高位在最 左边。

bit76543210
Message Identifier MSB
Message Identifier LSB

但凡如此表示的,都可以视为一个16位无符号short类型整数,两个字节表示。在JAVA中处理比较简单:

DataInputStream.readUnsignedShort

或者

in.read() * 0xFF + in.read();

最大长度可为: 65535

UTF-8编码

有关字符串,MQTT采用的是修改版的UTF-8编码,一般形式为如下,需要牢记:

bit76543210
byte 1String Length MSB
byte 2String Length LSB
bytes 3 …Encoded Character Data

比如AVA,使用writeUTF()方法写入一串文字“OTWP”,头两个字节为一个完整的无符号数字,代表字符串字节长度,后面四个字节才是字符串真正的长度,共六个字节:

bit76543210
byte 1Message Length MSB (0x00)
00000000
byte 2Message Length LSB (0x04)
00000100
byte 3‘O’ (0x4F)
01001111
byte 4‘T’ (0x54)
01010100
byte 5‘W’ (0x57)
01010111
byte 6‘P’ (0x50)
01010000

这点,在程序中,可不用单独处理默认,直接使用readUTF()方法,可自动省去了处理字符串长度的麻烦。当然,可以手动读取字符串:

// 模拟写入
dataOutputStream.writeUTF( "abcd");// 2 + 4 = 6 byte
......
// 模拟读取 
int decodedLength = dataInputStream.readUnsignedShort();//2 byte
byte[] decodedString = new byte[decodedLength]; // 4 bytes
dataInputStream.read(decodedString);
String target = new String(decodedString, "UTF-8");

等同于:

String target = dataInputStream.readUTF();

MQTT无论是可变头部还是消息体中,只要是字符串部分,都是采用了修改版的UTF-8编码,读取和写入,借助DataInputStream/DataOutputStream的帮助,一行语句,略去了手动处理的麻烦。

总之,掌握固定头部的QoS level、RETAIN标记、可变头部的Connect flags作用和意义,对总体理解MQTT作用很大。

 

4. 采用MQTT协议实现Android消息推送

1.官方的C2DM,但是只支持android2.2及以上平台的,而且使用的google的服务器。
对于google服务器的问题,网友应该都清楚

2.第三方的androidpn,和C2DM一样,都是基于XMPP扩展的,是一个开源的项目,据说不错。

http://sourceforge.net/projects/androidpn/

但是是基于长连接的,如果客户端数量大,特别像手机这种都是长期在线的设备,
会有两个问题,(1)服务器压力,(2)手机的电池不够用啊,电量卡卡卡的被你耗光了(需要优化网络机制)。

3.使用IBM 的MQTT协议实现push消息
地址:http://tokudu.com/2010/how-to-implement-push-notifications-for-android/
这是一个非常理想的解决方案,是基于tcp协议的,低带宽通信,而且国外友人已经测试,耗电量很多哦~

都是E文的,不习惯看E文的,也没关系,有一前辈给整理了一个中文的:
《Android推送通知指南》http://blog.csdn.net/joshua_yu/article/details/6563587
看了上面这些文章的内容,完成上面的例子,
然后看看源码,应该明白一些了。

MQTT是一项消息传递技术, 机制就是使用一个代理服务器message broker,
客户端client连接上这个服务器,然后告诉服务器说,我可以接收哪些类型的消息,
同时,client也可以发布自己的消息,这些消息根据协议的内容,可以被其他client获取。

只要手机客户端,连上服务器,然后就可以接收和发布消息了,不用自己写socket什么了,

低带宽,低耗电量,代码量也少,很简单吧。

package com.pig.test.mqtt;

import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttSimpleCallback;

public class SubscribeClient {
private final static String CONNECTION_STRING = “tcp://192.168.1.60:1883″;
private final static boolean CLEAN_START = true;
private final static short KEEP_ALIVE = 30;//低耗网络,但是又需要及时获取数据,心跳30s
private final static String CLIENT_ID = “client1″;
private final static String[] TOPICS = {
“Test/TestTopics/Topic1″,
“Test/TestTopics/Topic2″,
“Test/TestTopics/Topic3″,
“tokudu/client1″
};
private final static int[] QOS_VALUES = {0, 0, 2, 0};

//////////////////
private MqttClient mqttClient = null;

public SubscribeClient(String i){
try {
mqttClient = new MqttClient(CONNECTION_STRING);
SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
mqttClient.registerSimpleHandler(simpleCallbackHandler);//注册接收消息方法
mqttClient.connect(CLIENT_ID+i, CLEAN_START, KEEP_ALIVE);
mqttClient.subscribe(TOPICS, QOS_VALUES);//订阅接主题

/**
* 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息
*/

mqttClient.publish(PUBLISH_TOPICS, “keepalive”.getBytes(), QOS_VALUES[0], true);

} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

/**
* 简单回调函数,处理client接收到的主题消息
* @author pig
*
*/
class SimpleCallbackHandler implements MqttSimpleCallback{

/**
* 当客户机和broker意外断开时触发
* 可以再此处理重新订阅
*/
@Override
public void connectionLost() throws Exception {
// TODO Auto-generated method stub
System.out.println( “客户机和broker已经断开”);
}

/**
* 客户端订阅消息后,该方法负责回调接收处理消息
*/
@Override
public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception {
// TODO Auto-generated method stub
System.out.println( “订阅主题: ” + topicName);
System.out.println( “消息数据: ” + new String(payload));
System.out.println( “消息级别(0,1,2): ” + Qos);
System.out.println( “是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): ” + retained);
}

}

/**
* 高级回调
* @author pig
*
*/
class AdvancedCallbackHandler implements MqttSimpleCallback{

@Override
public void connectionLost() throws Exception {
// TODO Auto-generated method stub

}

@Override
public void publishArrived(String arg0, byte[] arg1, int arg2,
boolean arg3) throws Exception {
// TODO Auto-generated method stub

}

}

/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
new SubscribeClient( “” + i);

}

}

broker服务器,MQTT的jar包,记得下载啊,没有就消息我咯~

如果完成IBM 的MQTT协议实现push消息的实例的,都会有个问题,好像没考虑到安全问题,如果客户端连上来作乱怎么办呢?

上面用的broker时rsmb的,mqtt的简单服务器。
IBM已经推出了MQTT V3.1版本,已经加入了安全验证机制,不要怕啦。
据国外网友说,facebook在2011年8月就是用的mqtt v3.1做的应用哦。

来源:http://www.blogjava.net/yongboy/archive/2014/02/07/409587.html



快乐成长 每天进步一点点