基于mqtt的消息推送(三)客户端实现

MQTT简介

mqtt基于订阅者模型架构,客户端如果互相通信,必须在同一订阅主题下,即都订阅了同一个topic,客户端之间是没办法直接通讯的。订阅模型显而易见的好处是群发消息的话只需要发布到topic,所有订阅了这个topic的客户端就可以接收到消息了。

发送消息必须发送到某个topic,重点说明的是不管客户端是否订阅了该topic都可以向topic发送了消息,还有如果客户端订阅了该主题,那么自己发送的消息也会接收到。

另外需要重点说明的是QoS,这个代表消息的传输方式,QoS说明如下:

  • 0代表“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
  • 1代表“至少一次”,确保消息到达,但消息重复可能会发生。
  • 2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。 备注:由于服务端采用Mosca实现,Mosca目前只支持到QoS 1

简单说明下,如果发送的是临时的消息,例如给某topic所有在线的设备发送一条消息,丢失的话也无所谓,0就可以了(客户端登录的时候要指明支持的QoS级别,同时发送消息的时候也要指明这条消息支持的QoS级别),如果需要客户端保证能接收消息,需要指定QoS为1,如果同时需要加入客户端不在线也要能接收到消息,那么客户端登录的时候要指定session的有效性,接收离线消息需要指定服务端要保留客户端的session状态。

基于MQTT-Client-Framework的IOS客户端实现

现有客户端sdk分析,基本分为两大类:一类移植自C类库,如Mosquitto,一类是用objc或者swift原生实现。
各种sdk对比如下,我选用的是MQTT-Client,使用swift的同学可以使用CocoaMQTT,这个sdk的作者同时也是服务端实现emqtt的作者。

Name Type Programming Language Code
Paho Original C Open-Source. Eclipse project
IBM Original C Close Source. IBM SDK
Mosquitto Original C Open-Source. Eclipse project
MQTTKit Wrapper (Mosquitto) Objective-C Open-Source. Github
Marquette Wrapper (Mosquitto) Objective-C Open-Source. Github
Moscapsule Wrapper (Mosquitto) Swift Open-Source. Github
Musqueteer Wrapper (Mosquitto) Objective-C
MQTT-Client Native Objective-C Open-Source. Github
MQTTSDK Native Objective-C
CocoaMQTT Native Swift Open-Source. Github

可能的问题如下所述,苹果禁止第三方网络库使用移动网络。

If you are trying to use MQTT for an iOS application I will highly recommend you to use a native (Objective-C/Swift) iOS library. Using C or wrapper libraries usually means you are using POSIX networking calls at some point. Apple forbids the use of third party networking libraries from using the mobile internet antenna. Thus if you use Paho or something similar, you can only use MQTT when you are connected to a WiFi network.
源引自https://github.com/relayr/apple-mqtt-example

集成步骤

MQTT-Client支持pod,方便快速集成到工程中。简单说下应用场景,app在每部安装的终端上会产生不同的业务数据,而业务数据需要在各终端app上都要看到,所以需要同步所有终端的业务数据。mqtt的消息传输都是通过topic进行的,topic需要创建,按照mqtt实现没有单独的创建topic的方法,topic是和订阅绑定的。也就是说只要订阅了一个topic,服务端首先判断是否有topic存在,如果存在的话把当前客户端加入到订阅列表中,如果不在的话就先创建一个topic,同时把自己添加到订阅列表中。

建立连接

如果app需要在多个页面传输数据建议使用单例模式,建立一个全局的连接,复用连接。因为每次连接到服务端也是很消耗资源的。建立连接的代码如下:

1
MQTTSessionManager *sessionManager = [[MQTTSessionManager alloc] init];
 [sessionManager connectTo:@“192.168.1.4” //服务器地址
                      port:1883  //服务端端口号
                       tls:false //是否使用tls协议,mosca是支持tls的,如果使用了要设置成true
                 keepalive:60    //心跳时间,单位秒,每隔固定时间发送心跳包
                     clean:false //session是否清除,这个需要注意,如果味false,代表保持登录,如果客户端离线了再次登录就可以接收到离线消息
                      auth:true //是否使用登录验证,和下面的user和pass参数组合使用
                      user:_userName //用户名
                      pass:_passwd //密码
                 willTopic:@""  //下面四个参数用来设置如果客户端离线发送给其它客户端消息,当前参数是哪个topic用来传输离线消息,这里的离线消息都指的是客户端掉线后发送的掉线消息
                      will:@"" //自定义的离线消息,约定好格式就可以了
                   willQos:0  //接收离线消息的级别
            willRetainFlag:false 
              withClientId:]; //客户端id,需要特别指出的是这个id需要全局唯一,因为服务端是根据这个来区分不同的客户端的,默认情况下一个id登录后,假如有另外的连接以这个id登录,上一个连接会被踢下线

订阅和发送消息

连接一旦建立以后就可以订阅topic和发送消息了,订阅和发送消息代码如下:

1
//订阅topic
sessionManager.subscriptions = [[NSMutableDictionary alloc] init];
[sessionManager.subscriptions setObject:[NSNumber numberWithInt:1]
                                                                     forKey:_topic];
//发送消息,返回值msgid大于0代表发送成功
UInt16 msgid = [sessionManager sendData:[msg dataUsingEncoding:NSUTF8StringEncoding] //要发送的消息体
                                                                          topic:topic  //要往哪个topic发送消息                                                                        
                                                                            qos:1 //消息级别
                                                                         retain:false];

接收消息

接收消息有委托实现,实现如下委托MQTTSessionManagerDelegate接收消息

1
- (void)handleMessage:(NSData *)data onTopic:(NSString *)topic retained:(BOOL)retained;

根据topic可以区分不同的消息

监控连接状态

注册一个观察者,判断state获取不同的连接状态

1
//注册观察者,记得在离开页面时移除观察者
- (void)viewWillAppear:(BOOL)animated {
    [super viewWillAppear:animated];
    
    [sessionManager addObserver:self
                    forKeyPath:@"state"
                       options:NSKeyValueObservingOptionInitial | NSKeyValueObservingOptionNew
                       context:nil];
}


- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
    switch (sessionManager.state) {
        case MQTTSessionManagerStateClosed: //连接已经关闭
            break;
        case MQTTSessionManagerStateClosing: //连接正在关闭
            break;
        case MQTTSessionManagerStateConnected: //已经连接
            break;
        case MQTTSessionManagerStateConnecting: //正在连接中
            break;
        case MQTTSessionManagerStateError: //异常
            break;
        case MQTTSessionManagerStateStarting: //开始连接
        default:
            break;
    }
}

mqtt协议本身支持断线重连,另外单独说明此sdk在app退出到后台后自动断开连接,当回到前台时会自动重新连接

Android客户端和js集成

Android客户端和js可以使用polo项目下的客户端,地址http://www.eclipse.org/paho/
我测试使用的sdk:
Android https://www.eclipse.org/paho/clients/android/
js https://www.eclipse.org/paho/clients/js/

具体集成步骤同IOS,只需要注意IOS上面提到的参数设置就可以了

写在最后

最近北方的天气继续恶化,雾霾严重到不能出门的地步了,心情也非常的低落,无力感越来越强烈,我也没什么心思来写了,准备离开这个地方了,等心静下来再写其它的东西吧。预计要写的内容是树莓派和Docker相关的东西。