|
关注+星标公众号,不错过精彩内容
fosqabbiod064013373207.gif
转自| 嵌入式情报局
做嵌入式开发,很多时候都会用到“消息通信”。今天分享一个嵌入式开发的轻量级消息库:ZeroMQ(简称ZMQ)
作为分布式系统通信的“隐藏王者”,凭借其独特设计理念在物联网、微服务、金融交易等领域广泛应用。本文将从核心特性剖析入手,带你掌握Linux C环境下的ZMQ开发全流程。
一、ZeroMQ是什么?ZeroMQ不是传统意义上的消息队列(如RabbitMQ),而是一个嵌入式网络通信库。它通过TCP/IPC等传输协议,提供类似Socket的API接口,却实现了异步消息传递、灵活路由、负载均衡等高级特性,被誉为“网络通信的瑞士军刀”。
二、ZeroMQ的七大核心特性异步非阻塞IO
基于事件驱动的消息处理,单线程即可处理数万并发连接。
多模式通信
/* 支持9种通信模式 */
ZMQ_REQ // 同步请求
ZMQ_REP // 同步响应
ZMQ_PUB // 消息发布
ZMQ_SUB // 消息订阅
ZMQ_PUSH // 负载分发
ZMQ_PULL // 负载收集
// ...其他模式
去中心化架构
无需中间代理服务器,节点间直接通信。
多传输协议支持
tcp://192.168.1.1:5555 # TCP网络传输
ipc:///tmp/feeds.sock # 进程间通信
inproc://worker_pool # 线程间通信
智能消息路由
自动处理连接重试、队列缓冲、负载均衡。
多语言支持
C/C++、Python、Java、Go等40+语言绑定。
轻量高效
核心库仅300KB,吞吐量可达百万级消息/秒。
[/ol]三、环境搭建(Linux C)下面手把手教大家在Linux C下使用ZMQ库:
3.1 安装开发包
# Ubuntu/Debian
sudo apt install libzmq3-dev build-essential
# 验证安装
pkg-config --modversion libzmq # 输出类似4.3.4
3.2 基础编译命令
gcc -o demo demo.c -lzmq -lpthread
四、三大经典模式下面正对ZMQ库非常经典的三种模式进行一个简单的使用介绍:请求-响应模式(REQ-REP)服务端(rep_server.c)#include
#include
#include
#include
#include
#define CHECK_STATUS(call) if (call == -1) { printf("Error at %s:%d
", __FILE__, __LINE__); exit(1); }
int main() {
// 创建上下文
void *context = zmq_ctx_new();
assert(context);
// 创建REP套接字
void *responder = zmq_socket(context, ZMQ_REP);
assert(responder);
// 绑定端口
int rc = zmq_bind(responder, "tcp://*:5555");
CHECK_STATUS(rc);
printf("服务端启动,等待请求...
");
while (1) {
char buffer[256];
// 接收请求(阻塞模式)
int size = zmq_recv(responder, buffer, 255, 0);
CHECK_STATUS(size);
buffer[size] = '\0';
printf("收到请求: %s
", buffer);
// 处理业务逻辑
sleep(1); // 模拟处理耗时
// 发送响应
constchar *response = "处理结果:OK";
size = zmq_send(responder, response, strlen(response), 0);
CHECK_STATUS(size);
}
// 清理资源(实际不会执行到此处)
zmq_close(responder);
zmq_ctx_destroy(context);
return0;
}
客户端(req_client.c)#include
#include
#include
#include
int main() {
void *context = zmq_ctx_new();
assert(context);
void *requester = zmq_socket(context, ZMQ_REQ);
assert(requester);
printf("连接服务端...
");
int rc = zmq_connect(requester, "tcp://localhost:5555");
assert(rc == 0);
for (int i = 0; i 5; i++) {
char request[50];
snprintf(request, 50, "请求#%d", i+1);
// 发送请求
printf("发送: %s
", request);
rc = zmq_send(requester, request, strlen(request), 0);
assert(rc != -1);
// 接收响应
char buffer[256];
rc = zmq_recv(requester, buffer, 255, 0);
assert(rc != -1);
buffer[rc] = '\0';
printf("收到响应: %s
", buffer);
}
zmq_close(requester);
zmq_ctx_destroy(context);
return0;
}
编译运行:
# 服务端
gcc rep_server.c -o server -lzmq
./server
# 客户端
gcc req_client.c -o client -lzmq
./client
发布-订阅模式(PUB-SUB)发布者(pub_server.c)#include
#include
#include
#include
#include
#include
int main() {
void *context = zmq_ctx_new();
void *publisher = zmq_socket(context, ZMQ_PUB);
assert(publisher);
int rc = zmq_bind(publisher, "tcp://*:6000");
assert(rc == 0);
printf("数据发布中...
");
while (1) {
// 生成两种消息类型
time_t now = time(NULL);
// 温度消息
char temp_msg[50];
snprintf(temp_msg, 50, "TEMPERATURE %.2f", 25.0 + (rand()%100)/100.0);
zmq_send(publisher, temp_msg, strlen(temp_msg), 0);
// 湿度消息
char humi_msg[50];
snprintf(humi_msg, 50, "HUMIDITY %.2f", 60.0 + (rand()%100)/100.0);
zmq_send(publisher, humi_msg, strlen(humi_msg), 0);
sleep(1); // 每秒更新
}
zmq_close(publisher);
zmq_ctx_destroy(context);
return0;
}
订阅者(sub_client.c)#include
#include
#include
#include
int main(int argc, char *argv[]) {
if (argc 2) {
printf("Usage: %s [TEMPERATURE|HUMIDITY]
", argv[0]);
return1;
}
void *context = zmq_ctx_new();
void *subscriber = zmq_socket(context, ZMQ_SUB);
assert(subscriber);
int rc = zmq_connect(subscriber, "tcp://localhost:6000");
assert(rc == 0);
// 设置订阅过滤器
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, argv[1], strlen(argv[1]));
assert(rc == 0);
printf("订阅 [%s] 数据...
", argv[1]);
while (1) {
char buffer[256];
int size = zmq_recv(subscriber, buffer, 255, 0);
buffer[size] = '\0';
printf("收到数据: %s
", buffer);
}
zmq_close(subscriber);
zmq_ctx_destroy(context);
return0;
}
运行示例:
# 发布者
gcc pub_server.c -o publisher -lzmq
./publisher
# 温度订阅者
gcc sub_client.c -o subscriber -lzmq
./subscriber TEMPERATURE
# 湿度订阅者
./subscriber HUMIDITY
流水线模式(PUSH-PULL)任务分发器(ventilator.c)#include
#include
#include
#include
int main() {
void *context = zmq_ctx_new();
void *sender = zmq_socket(context, ZMQ_PUSH);
assert(sender);
// 绑定推送端口
int rc = zmq_bind(sender, "tcp://*:7000");
assert(rc == 0);
printf("任务分发准备就绪
");
sleep(1); // 等待worker连接
// 生成100个计算任务
for (int task_id = 0; task_id 100; task_id++) {
char msg[20];
snprintf(msg, 20, "TASK:%04d", task_id);
rc = zmq_send(sender, msg, strlen(msg), 0);
assert(rc != -1);
usleep(100000); // 0.1秒间隔
}
// 发送结束信号
rc = zmq_send(sender, "EXIT", 4, 0);
assert(rc != -1);
zmq_close(sender);
zmq_ctx_destroy(context);
return0;
}
工作者(worker.c)#include
#include
#include
#include
#include
int main() {
void *context = zmq_ctx_new();
void *receiver = zmq_socket(context, ZMQ_PULL);
assert(receiver);
int rc = zmq_connect(receiver, "tcp://localhost:7000");
assert(rc == 0);
printf("工作者就绪
");
while (1) {
char buffer[256];
int size = zmq_recv(receiver, buffer, 255, 0);
buffer[size] = '\0';
if (strcmp(buffer, "EXIT") == 0) break;
printf("处理任务: %s
", buffer);
usleep(500000); // 模拟处理耗时
}
printf("收到退出指令
");
zmq_close(receiver);
zmq_ctx_destroy(context);
return0;
}
运行步骤:
# 启动分发器
gcc ventilator.c -o ventilator -lzmq
./ventilator
# 启动多个工作者(另开终端)
gcc worker.c -o worker -lzmq
./worker
这三种经典的通信模型,也是我们在实际项目中经常使用,还有其他相关的特点,大家可以自行摸索~
------------ END ------------
gdt4hok3kcl64013373307.gif
●专栏《嵌入式工具》
●专栏《嵌入式开发》
●专栏《Keil教程》
●嵌入式专栏精选教程
关注公众号回复“加群”按规则加入技术交流群,回复“1024”查看更多内容。
点击“阅读原文”查看更多分享。 |
|