在 Linux 下使用 C 语言和 Mosquitto 库实现 MQTT 通信是一种轻量级且高效的方式。Mosquitto 提供了 C/C++ 的客户端库 libmosquitto,可以直接操作 MQTT 协议。
以下是详细的实现步骤
1. 安装 Mosquitto 开发库
在 Ubuntu/Debian 系统中,安装 Mosquitto 客户端和开发库:
sudo apt-get update
sudo apt-get install libmosquitto-dev mosquitto-clients
2. 编写 MQTT 客户端代码
以下是两个示例代码:一个用于发布消息,另一个用于订阅消息。
示例 1:发布消息(Publisher)
#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
#include <string.h>
#define MQTT_HOST "localhost"
#define MQTT_PORT 1883
#define MQTT_TOPIC "test/topic"
#define MQTT_MESSAGE "Hello, MQTT from C!"
#define QOS 1 // 服务质量等级
int main() {
struct mosquitto *mosq = NULL;
int rc;
// 初始化 Mosquitto 库
mosquitto_lib_init();
// 创建客户端实例
mosq = mosquitto_new(NULL, true, NULL);
if (!mosq) {
fprintf(stderr, "Error: 创建客户端失败\n");
return -1;
}
// 连接到 MQTT 代理
rc = mosquitto_connect(mosq, MQTT_HOST, MQTT_PORT, 60);
if (rc != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "连接失败: %s\n", mosquitto_strerror(rc));
mosquitto_destroy(mosq);
return -1;
}
// 发布消息
rc = mosquitto_publish(mosq, NULL, MQTT_TOPIC, strlen(MQTT_MESSAGE), MQTT_MESSAGE, QOS, false);
if (rc != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "发布失败: %s\n", mosquitto_strerror(rc));
} else {
printf("消息已发布到主题: %s\n", MQTT_TOPIC);
}
// 断开连接并清理资源
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
}
示例 2:订阅消息(Subscriber)
#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
#include <string.h>
#define MQTT_HOST "localhost"
#define MQTT_PORT 1883
#define MQTT_TOPIC "test/topic"
#define QOS 1
// 收到消息的回调函数
void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) {
if (msg->payloadlen) {
printf("收到消息 [%s]: %s\n", msg->topic, (char *)msg->payload);
} else {
printf("收到空消息\n");
}
}
int main() {
struct mosquitto *mosq = NULL;
int rc;
mosquitto_lib_init();
mosq = mosquitto_new(NULL, true, NULL);
if (!mosq) {
fprintf(stderr, "Error: 创建客户端失败\n");
return -1;
}
// 设置消息回调函数
mosquitto_message_callback_set(mosq, on_message);
rc = mosquitto_connect(mosq, MQTT_HOST, MQTT_PORT, 60);
if (rc != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "连接失败: %s\n", mosquitto_strerror(rc));
mosquitto_destroy(mosq);
return -1;
}
// 订阅主题
rc = mosquitto_subscribe(mosq, NULL, MQTT_TOPIC, QOS);
if (rc != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "订阅失败: %s\n", mosquitto_strerror(rc));
mosquitto_destroy(mosq);
return -1;
}
printf("等待消息...\n");
// 进入事件循环(阻塞式)
mosquitto_loop_forever(mosq, -1, 1);
// 清理资源(实际不会执行到这里,需手动终止程序)
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
}
3. 编译代码
使用以下命令编译代码(需链接 libmosquitto):
编译发布者
gcc publisher.c -o publisher -lmosquitto
编译订阅者
gcc subscriber.c -o subscriber -lmosquitto
4. 运行程序
步骤 1:启动 Mosquitto 代理
确保 Mosquitto 代理正在运行:
mosquitto -v
步骤 2:运行订阅者
./subscriber
步骤 3:运行发布者
./publisher
5. 输出结果
订阅者输出:
复制
等待消息…
收到消息 [test/topic]: Hello, MQTT from C!
发布者输出:
复制
消息已发布到主题: test/topic
6.关键代码解释
初始化与清理:
mosquitto_lib_init():初始化 Mosquitto 库。
mosquitto_lib_cleanup():清理资源。
客户端对象:
mosquitto_new():创建客户端实例。
mosquitto_connect():连接到 MQTT 代理。
发布消息:
mosquitto_publish():发布消息到指定主题。
订阅消息:
mosquitto_subscribe():订阅主题。
mosquitto_message_callback_set():设置消息到达时的回调函数。
事件循环:
mosquitto_loop_forever():进入阻塞式事件循环,等待消息。
7. 扩展功能
添加认证:
mosquitto_username_pw_set(mosq, “username”, “password”);
使用 TLS 加密:
mosquitto_tls_set(mosq, “ca.crt”, NULL, “client.crt”, “client.key”, NULL);
保留消息(Retained Message):
mosquitto_publish(mosq, NULL, MQTT_TOPIC, strlen(MQTT_MESSAGE), MQTT_MESSAGE, QOS, true);
8.参考文档
Mosquitto C 客户端库文档
MQTT 协议规范
通过上述代码,你可以快速实现基于 C 语言的 MQTT 通信。如果需要异步操作,可以结合多线程或 mosquitto_loop_start() 实现非阻塞行为。
发表回复