开源地址
https://github.com/jiejieTop/mqttclient
一个高性能、高稳定性的跨平台MQTT客户端
一个高性能、高稳定性的跨平台MQTT客户端,基于socket API之上开发,可以在嵌入式设备(FreeRTOS/LiteOS/RT-Thread/TencentOS tiny)、Linux、Windows、Mac上使用,拥有非常简洁的API接口,以极少的资源实现QOS2的服务质量,并且无缝衔接了mbedtls加密库。
拥有非常明确的分层框架。
整体框架
整体框架
目前已实现了Linux、TencentOS tiny、FreeRTOS、RT-Thread平台(已做成软件包,名字为kawaii-mqtt),除此之外TencentOS tiny的AT框架亦可以使用(RAM消耗不足15K),并且稳定性极好!
欢迎以 GitHub Issues 的形式提交问题和bug报告
mqttclient 遵循 Apache License v2.0 开源协议。鼓励代码共享和尊重原作者的著作权,可以自由的使用、修改源代码,也可以将修改后的代码作为开源或闭源软件发布,但必须保留原作者版权声明。
sudo apt-get install cmake
在mqttclient/test/test.c文件中修改以下内容:
init_params.connect_params.network_params.network_ssl_params.ca_crt = test_ca_get(); /* CA证书 */ init_params.connect_params.network_params.addr = "xxxxxxx"; /* 服务器域名 */ init_params.connect_params.network_params.port = "8883"; /* 服务器端口号 */ init_params.connect_params.user_name = "xxxxxxx"; /* 用户名 */ init_params.connect_params.password = "xxxxxxx"; /* 密码 */ init_params.connect_params.client_id = "xxxxxxx"; /* 客户端id */
默认打开mbedtls。
salof 全称是:Synchronous Asynchronous Log Output Framework(同步异步日志输出框架),它是一个同步异步日志输出框架,在空闲时候输出对应的日志信息,并且该库与mqttclient无缝衔接。
配置对应的日志输出级别:
#define BASE_LEVEL (0)#define ASSERT_LEVEL (BASE_LEVEL + 1) /* 日志输出级别:断言级别(非常高优先级) */#define ERR_LEVEL (ASSERT_LEVEL + 1) /* 日志输出级别:错误级别(高优先级) */#define WARN_LEVEL (ERR_LEVEL + 1) /* 日志输出级别:警告级别(中优先级) */#define INFO_LEVEL (WARN_LEVEL + 1) /* 日志输出级别:信息级别(低优先级) */#define DEBUG_LEVEL (INFO_LEVEL + 1) /* 日志输出级别:调试级别(更低优先级) */#define LOG_LEVEL WARN_LEVEL /* 日志输出级别 */
日志其他选项:
配置mqtt等待应答列表的最大值,对于qos1 qos2服务质量有要求的可以将其设置大一点,当然也必须资源跟得上,它主要是保证qos1 qos2的mqtt报文能准确到达服务器。
#define MQTT_ACK_HANDLER_NUM_MAX 64
选择MQTT协议的版本,默认为4,表示使用MQTT 3.1.1版本,而3则表示为MQTT 3.1版本。
#define MQTT_VERSION 4 // 4 is mqtt 3.1.1
设置默认的保活时间,它主要是保证MQTT客户端与服务器的保持活性连接,单位为 秒 ,比如MQTT客户端与服务器100S没有发送数据了,有没有接收到数据,此时MQTT客户端会发送一个ping包,确认一下这个会话是否存在,如果收到服务器的应答,那么说明这个会话还是存在的,可以随时收发数据,而如果不存在了,就清除会话。
#define MQTT_KEEP_ALIVE_INTERVAL 100 // unit: second
默认的命令超时,它主要是用于socket读写超时,在MQTT初始化时可以指定:
#define MQTT_DEFAULT_CMD_TIMEOUT 4000
默认主题的长度,主题是支持通配符的,如果主题太长则会被截断:
#define MQTT_TOPIC_LEN_MAX 64
默认的算法数据缓冲区的大小,如果要发送大量数据则修改大一些,在MQTT初始化时可以指定:
#define MQTT_DEFAULT_BUF_SIZE 1024
线程相关的配置,如线程栈,线程优先级,线程时间片等:在linux环境下可以是不需要理会这些参数的,而在RTOS平台则需要配置,如果不使用mbedtls,线程栈2048字节已足够,而使用mbedtls加密后,需要配置4096字节以上。
#define MQTT_THREAD_STACK_SIZE 2048 // 线程栈#define MQTT_THREAD_PRIO 5 // 线程优先级#define MQTT_THREAD_TICK 50 // 线程时间片
默认的重连时间间隔,当发生掉线时,会以这个时间间隔尝试重连:
#define MQTT_RECONNECT_DEFAULT_DURATION 1000
其他不需要怎么配置的东西:
#define MQTT_MAX_PACKET_ID (0xFFFF - 1) // mqtt报文id#define MQTT_MAX_CMD_TIMEOUT 20000 //最大的命令超时参数#define MQTT_MIN_CMD_TIMEOUT 1000 //最小的命令超时参数
ps:以上参数基本不需要怎么配置的,直接用即可~
./build.sh
运行build.sh脚本后会在 ./build/bin/目录下生成可执行文件mqtt-client,直接运行即可。
./make-libmqttclient.sh
运行make-libmqttclient.sh脚本后会在 ./libmqttclient/lib目录下生成一个动态库文件libmqttclient.so,并安装到系统的/usr/lib目录下,相关头文件已经拷贝到./libmqttclient/include目录下,编译应用程序的时候只需要链接动态库即可-lmqttclient,动态库的配置文件根据./test/mqtt_config.h配置的。
mqttclient拥有非常简洁的api接口
int mqtt_keep_alive(mqtt_client_t* c);int mqtt_init(mqtt_client_t* c, client_init_params_t* init);int mqtt_release(mqtt_client_t* c);int mqtt_connect(mqtt_client_t* c);int mqtt_disconnect(mqtt_client_t* c);int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);int mqtt_list_subscribe_topic(mqtt_client_t* c);int mqtt_set_interceptor_handler(mqtt_client_t* c, interceptor_handler_t handler);
mqtt_client_t 结构
typedef struct mqtt_client { unsigned short packet_id; unsigned char ping_outstanding; unsigned char ack_handler_number; unsigned char *read_buf; unsigned char *write_buf; unsigned int cmd_timeout; unsigned int read_buf_size; unsigned int write_buf_size; unsigned int reconnect_try_duration; void *reconnect_date; reconnect_handler_t reconnect_handler; client_state_t client_state; platform_mutex_t write_lock; platform_mutex_t global_lock; mqtt_list_t msg_handler_list; mqtt_list_t ack_handler_list; network_t *network; platform_thread_t *thread; platform_timer_t reconnect_timer; platform_timer_t last_sent; platform_timer_t last_received; connect_params_t *connect_params; interceptor_handler_t interceptor_handler;} mqtt_client_t;
该结构主要维护以下内容:
以下是整个框架的实现方式,方便大家更容易理解mqttclient的代码与设计思想,让大家能够修改源码与使用,还可以提交pr或者issues,开源的世界期待各位大神的参与,感谢!
除此之外以下代码的记录机制与其超时处理机制是非常好的编程思想,大家有兴趣一定要看源代码!
int mqtt_init(mqtt_client_t* c, client_init_params_t* init)
主要是配置mqtt_client_t结构的相关信息,如果没有指定初始化参数,则系统会提供默认的参数。但连接部分的参数则必须指定:
init_params.connect_params.network_params.addr = "[你的mqtt服务器IP地址或者是域名]"; init_params.connect_params.network_params.port = 1883; //端口号 init_params.connect_params.user_name = "jiejietop"; init_params.connect_params.password = "123456"; init_params.connect_params.client_id = "clientid"; mqtt_init(&client, &init_params);
int mqtt_connect(mqtt_client_t* c);
参数只有 mqtt_client_t 类型的指针,字符串类型的主题(支持通配符"#" "+"),主题的服务质量,以及收到报文的处理函数,如不指定则有默认处理函数。连接服务器则是使用非异步的方式设计,因为必须等待连接上服务器才能进行下一步操作。
过程如下:
c->network->connect(c->network);
MQTTSerialize_connect(c->write_buf, c->write_buf_size, &connect_data)mqtt_send_packet(c, len, &connect_timer)
mqtt_wait_packet(c, CONNACK, &connect_timer)
platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK)if (NULL != c->thread) { mqtt_set_client_state(c, CLIENT_STATE_CONNECTED); platform_thread_startup(c->thread); platform_thread_start(c->thread); /* start run mqtt thread */}
mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
订阅报文使用异步设计来实现的:过程如下:
MQTTSerialize_subscribe(c->write_buf, c->write_buf_size, 0, mqtt_get_next_packet_id(c), 1, &topic, (int*)&qos)mqtt_send_packet(c, len, &timer)
mqtt_msg_handler_create(topic_filter, qos, handler)
mqtt_ack_list_record(c, SUBACK, mqtt_get_next_packet_id(c), len, msg_handler)
与订阅报文的逻辑基本差不多的~
MQTTSerialize_unsubscribe(c->write_buf, c->write_buf_size, 0, packet_id, 1, &topic)mqtt_send_packet(c, len, &timer)
mqtt_msg_handler_create((const char*)topic_filter, QOS0, NULL)
mqtt_ack_list_record(c, UNSUBACK, packet_id, len, msg_handler)
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg)
参数只有 mqtt_client_t 类型的指针,字符串类型的主题(支持通配符),要发布的消息(包括服务质量、消息主体)。
mqtt_message_t msg; msg.qos = 2; msg.payload = (void *) buf; mqtt_publish(&client, "testtopic1", &msg);
核心思想都差不多,过程如下:
MQTTSerialize_publish(c->write_buf, c->write_buf_size, 0, msg->qos, msg->retained, msg->id, topic, (unsigned char*)msg->payload, msg->payloadlen);mqtt_send_packet(c, len, &timer)
if (QOS1 == msg->qos) { rc = mqtt_ack_list_record(c, PUBACK, mqtt_get_next_packet_id(c), len, NULL); } else if (QOS2 == msg->qos) { rc = mqtt_ack_list_record(c, PUBREC, mqtt_get_next_packet_id(c), len, NULL); }
mqtt_set_publish_dup(c,1); /* may resend this data, set the udp flag in advance */
static void mqtt_yield_thread(void *arg)
主要是对mqtt_yield函数的返回值做处理,比如在disconnect的时候销毁这个线程。
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
对不同的包使用不一样的处理:
switch (packet_type) { case 0: /* timed out reading packet */ break; case CONNACK: break; case PUBACK: case PUBCOMP: rc = mqtt_puback_and_pubcomp_packet_handle(c, timer); break; case SUBACK: rc = mqtt_suback_packet_handle(c, timer); break; case UNSUBACK: rc = mqtt_unsuback_packet_handle(c, timer); break; case PUBLISH: rc = mqtt_publish_packet_handle(c, timer); break; case PUBREC: case PUBREL: rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer); break; case PINGRESP: c->ping_outstanding = 0; break; default: goto exit; }
并且做保活的处理:
mqtt_keep_alive(c)
当发生超时后
if (platform_timer_is_expired(&c->last_sent) || platform_timer_is_expired(&c->last_received))
序列号一个心跳包并且发送给服务器
MQTTSerialize_pingreq(c->write_buf, c->write_buf_size);mqtt_send_packet(c, len, &timer);
当再次发生超时后,表示与服务器的连接已断开,需要重连的操作,设置客户端状态为断开连接
mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
mqtt_ack_list_scan(c);
当超时后就销毁ack链表节点:
mqtt_ack_handler_destroy(ack_handler);
当然下面这几种报文则需要重发操作:(PUBACK 、PUBREC、 PUBREL 、PUBCOMP,保证QOS1 QOS2的服务质量)
if ((ack_handler->type == PUBACK) || (ack_handler->type == PUBREC) || (ack_handler->type == PUBREL) || (ack_handler->type == PUBCOMP)) mqtt_ack_handler_resend(c, ack_handler);
mqtt_try_reconnect(c);
重连成功后尝试重新订阅报文,保证恢复原始状态~
mqtt_try_resubscribe(c)
static int mqtt_puback_and_pubcomp_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
MQTTDeserialize_suback(&packet_id, 1, &count, (int*)&granted_qos, c->read_buf, c->read_buf_size)
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
mqtt_msg_handlers_install(c, msg_handler);
static int mqtt_unsuback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
MQTTDeserialize_unsuback(&packet_id, c->read_buf, c->read_buf_size)
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)
mqtt_msg_handler_destory(msg_handler);
static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name, (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->read_buf, c->read_buf_size)
mqtt_deliver_message(c, &topic_name, &msg);
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBACK, 0, msg.id);
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBREC, 0, msg.id);mqtt_ack_list_record(c, PUBREL, msg.id + 1, len, NULL)mqtt_deliver_message(c, &topic_name, &msg);
说明:一旦注册到ack列表上的报文,当具有重复的报文是不会重新被注册的,它会通过mqtt_ack_list_node_is_exist函数判断这个节点是否存在,主要是依赖等待响应的消息类型与msgid。
static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
mqtt_publish_ack_packet(c, packet_id, packet_type);
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)
https://github.com/jiejieTop/mqttclient