电子产业一站式赋能平台

PCB联盟网

搜索
查看: 43|回复: 0
收起左侧

分享一个嵌入式开发的轻量级消息库

[复制链接]

558

主题

558

帖子

6667

积分

高级会员

Rank: 5Rank: 5

积分
6667
发表于 前天 11:45 | 显示全部楼层 |阅读模式
关注+星标公众,不错过精彩内容

fosqabbiod064013373207.gif

fosqabbiod064013373207.gif

转自| 嵌入式情报局

做嵌入式开发,很多时候都会用到“消息通信”。今天分享一个嵌入式开发的轻量级消息库:ZeroMQ(简称ZMQ)
作为分布式系统通信的“隐藏王者”,凭借其独特设计理念在物联网、微服务、金融交易等领域广泛应用。本文将从核心特性剖析入手,带你掌握Linux C环境下的ZMQ开发全流程。
一、ZeroMQ是什么?ZeroMQ不是传统意义上的消息队列(如RabbitMQ),而是一个嵌入式网络通信库。它通过TCP/IPC等传输协议,提供类似Socket的API接口,却实现了异步消息传递、灵活路由、负载均衡等高级特性,被誉为“网络通信的瑞士军刀”。
二、ZeroMQ的七大核心特性异步非阻塞IO
基于事件驱动的消息处理,单线程即可处理数万并发连接。
多模式通信
/* 支持9种通信模式 */
ZMQ_REQ  // 同步请求
ZMQ_REP   // 同步响应
ZMQ_PUB   // 消息发布
ZMQ_SUB   // 消息订阅
ZMQ_PUSH  // 负载分发
ZMQ_PULL  // 负载收集
// ...其他模式
去中心化架构
无需中间代理服务器,节点间直接通信。
多传输协议支持
tcp://192.168.1.1:5555   # TCP网络传输
ipc:///tmp/feeds.sock     # 进程间通信
inproc://worker_pool      # 线程间通信
智能消息路由
自动处理连接重试、队列缓冲、负载均衡。
多语言支持
C/C++、Python、Java、Go等40+语言绑定。
轻量高效
核心库仅300KB,吞吐量可达百万级消息/秒。
[/ol]三、环境搭建(Linux C)下面手把手教大家在Linux C下使用ZMQ库:
3.1 安装开发包
# Ubuntu/Debian
sudo apt install libzmq3-dev build-essential
# 验证安装
pkg-config --modversion libzmq  # 输出类似4.3.4
3.2 基础编译命令
gcc -o demo demo.c -lzmq -lpthread
四、三大经典模式下面正对ZMQ库非常经典的三种模式进行一个简单的使用介绍:请求-响应模式(REQ-REP)服务端(rep_server.c)#include
#include
#include
#include
#include
#define CHECK_STATUS(call) if (call == -1) { printf("Error at %s:%d
", __FILE__, __LINE__); exit(1); }
int main() {
    // 创建上下文
    void *context = zmq_ctx_new();
    assert(context);
    // 创建REP套接字
    void *responder = zmq_socket(context, ZMQ_REP);
    assert(responder);
   
    // 绑定端口
    int rc = zmq_bind(responder, "tcp://*:5555");
    CHECK_STATUS(rc);
    printf("服务端启动,等待请求...
");
    while (1) {
        char buffer[256];
        
        // 接收请求(阻塞模式)
        int size = zmq_recv(responder, buffer, 255, 0);
        CHECK_STATUS(size);
        buffer[size] = '\0';
        printf("收到请求: %s
", buffer);
        // 处理业务逻辑
        sleep(1); // 模拟处理耗时
        
        // 发送响应
        constchar *response = "处理结果:OK";
        size = zmq_send(responder, response, strlen(response), 0);
        CHECK_STATUS(size);
    }
    // 清理资源(实际不会执行到此处)
    zmq_close(responder);
    zmq_ctx_destroy(context);
    return0;
}
客户端(req_client.c)#include
#include
#include
#include
int main() {
    void *context = zmq_ctx_new();
    assert(context);
    void *requester = zmq_socket(context, ZMQ_REQ);
    assert(requester);
   
    printf("连接服务端...
");
    int rc = zmq_connect(requester, "tcp://localhost:5555");
    assert(rc == 0);
    for (int i = 0; i 5; i++) {
        char request[50];
        snprintf(request, 50, "请求#%d", i+1);
        
        // 发送请求
        printf("发送: %s
", request);
        rc = zmq_send(requester, request, strlen(request), 0);
        assert(rc != -1);
        // 接收响应
        char buffer[256];
        rc = zmq_recv(requester, buffer, 255, 0);
        assert(rc != -1);
        buffer[rc] = '\0';
        printf("收到响应: %s
", buffer);
    }
    zmq_close(requester);
    zmq_ctx_destroy(context);
    return0;
}
编译运行
# 服务端
gcc rep_server.c -o server -lzmq
./server
# 客户端
gcc req_client.c -o client -lzmq
./client
发布-订阅模式(PUB-SUB)发布者(pub_server.c)#include
#include
#include
#include
#include
#include
int main() {
    void *context = zmq_ctx_new();
    void *publisher = zmq_socket(context, ZMQ_PUB);
    assert(publisher);
    int rc = zmq_bind(publisher, "tcp://*:6000");
    assert(rc == 0);
    printf("数据发布中...
");
    while (1) {
        // 生成两种消息类型
        time_t now = time(NULL);
        
        // 温度消息
        char temp_msg[50];
        snprintf(temp_msg, 50, "TEMPERATURE %.2f", 25.0 + (rand()%100)/100.0);
        zmq_send(publisher, temp_msg, strlen(temp_msg), 0);
        // 湿度消息
        char humi_msg[50];
        snprintf(humi_msg, 50, "HUMIDITY %.2f", 60.0 + (rand()%100)/100.0);
        zmq_send(publisher, humi_msg, strlen(humi_msg), 0);
        sleep(1); // 每秒更新
    }
    zmq_close(publisher);
    zmq_ctx_destroy(context);
    return0;
}
订阅者(sub_client.c)#include
#include
#include
#include
int main(int argc, char *argv[]) {
    if (argc 2) {
        printf("Usage: %s [TEMPERATURE|HUMIDITY]
", argv[0]);
        return1;
    }
    void *context = zmq_ctx_new();
    void *subscriber = zmq_socket(context, ZMQ_SUB);
    assert(subscriber);
    int rc = zmq_connect(subscriber, "tcp://localhost:6000");
    assert(rc == 0);
    // 设置订阅过滤器
    rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, argv[1], strlen(argv[1]));
    assert(rc == 0);
    printf("订阅 [%s] 数据...
", argv[1]);
    while (1) {
        char buffer[256];
        int size = zmq_recv(subscriber, buffer, 255, 0);
        buffer[size] = '\0';
        printf("收到数据: %s
", buffer);
    }
    zmq_close(subscriber);
    zmq_ctx_destroy(context);
    return0;
}
运行示例
# 发布者
gcc pub_server.c -o publisher -lzmq
./publisher
# 温度订阅者
gcc sub_client.c -o subscriber -lzmq
./subscriber TEMPERATURE
# 湿度订阅者
./subscriber HUMIDITY
流水线模式(PUSH-PULL)任务分发器(ventilator.c)#include
#include
#include
#include
int main() {
    void *context = zmq_ctx_new();
    void *sender = zmq_socket(context, ZMQ_PUSH);
    assert(sender);
    // 绑定推送端口
    int rc = zmq_bind(sender, "tcp://*:7000");
    assert(rc == 0);
    printf("任务分发准备就绪
");
    sleep(1); // 等待worker连接
    // 生成100个计算任务
    for (int task_id = 0; task_id 100; task_id++) {
        char msg[20];
        snprintf(msg, 20, "TASK:%04d", task_id);
        
        rc = zmq_send(sender, msg, strlen(msg), 0);
        assert(rc != -1);
        
        usleep(100000); // 0.1秒间隔
    }
    // 发送结束信号
    rc = zmq_send(sender, "EXIT", 4, 0);
    assert(rc != -1);
    zmq_close(sender);
    zmq_ctx_destroy(context);
    return0;
}
工作者(worker.c)#include
#include
#include
#include
#include
int main() {
    void *context = zmq_ctx_new();
    void *receiver = zmq_socket(context, ZMQ_PULL);
    assert(receiver);
    int rc = zmq_connect(receiver, "tcp://localhost:7000");
    assert(rc == 0);
    printf("工作者就绪
");
    while (1) {
        char buffer[256];
        int size = zmq_recv(receiver, buffer, 255, 0);
        buffer[size] = '\0';
        if (strcmp(buffer, "EXIT") == 0) break;
        printf("处理任务: %s
", buffer);
        usleep(500000); // 模拟处理耗时
    }
    printf("收到退出指令
");
    zmq_close(receiver);
    zmq_ctx_destroy(context);
    return0;
}
运行步骤
# 启动分发器
gcc ventilator.c -o ventilator -lzmq
./ventilator
# 启动多个工作者(另开终端)
gcc worker.c -o worker -lzmq
./worker
这三种经典的通信模型,也是我们在实际项目中经常使用,还有其他相关的特点,大家可以自行摸索~
------------ END ------------

gdt4hok3kcl64013373307.gif

gdt4hok3kcl64013373307.gif


●专栏《嵌入式工具
●专栏《嵌入式开发》
●专栏《Keil教程》
●嵌入式专栏精选教程

关注公众号回复“加群”按规则加入技术交流群,回复“1024”查看更多内容。
点击“阅读原文”查看更多分享。
回复

使用道具 举报

发表回复

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则


联系客服 关注微信 下载APP 返回顶部 返回列表