← 返回库列表

📡 ZeroMQ 使用指南

高性能异步消息传输库

项目地址:https://github.com/zeromq/libzmq

官方文档:https://zeromq.org/

📖 什么是 ZeroMQ?

ZeroMQ(简称 ZMQ)是一个高性能、异步的消息传输库,提供简单的 API 让开发者在应用程序中构建快速、可扩展的网络应用。它不是传统的消息队列(如 RabbitMQ、Kafka),而是一个轻量级的消息传输层。

🚀 安装方式

Linux (Ubuntu/Debian)

sudo apt-get install libzmq3-dev

macOS (Homebrew)

brew install zeromq

Windows (vcpkg)

vcpkg install zeromq

💡 基础使用

请求-响应模式 (REQ-REP)

服务端代码

#include <zmq.hpp>
#include <iostream>
#include <thread>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REP);
    socket.bind("tcp://*:5555");

    while (true) {
        zmq::message_t request;
        socket.recv(&request);
        std::cout << "收到: " << std::string(static_cast<char*>(request.data()), request.size()) << std::endl;

        std::this_thread::sleep_for(std::chrono::milliseconds(1000));

        zmq::message_t reply(5);
        memcpy(reply.data(), "World", 5);
        socket.send(reply);
    }
    return 0;
}

客户端代码

#include <zmq.hpp>
#include <iostream>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REQ);
    socket.connect("tcp://localhost:5555");

    for (int i = 0; i < 5; i++) {
        zmq::message_t request(5);
        memcpy(request.data(), "Hello", 5);
        socket.send(request);

        zmq::message_t reply;
        socket.recv(&reply);
        std::cout << "收到: " << std::string(static_cast<char*>(reply.data()), reply.size()) << std::endl;
    }
    return 0;
}

发布-订阅模式 (PUB-SUB)

发布者代码

#include <zmq.hpp>
#include <iostream>
#include <thread>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_PUB);
    socket.bind("tcp://*:5556");

    int counter = 0;
    while (true) {
        std::string message = "Update " + std::to_string(counter++);
        zmq::message_t msg(message.size());
        memcpy(msg.data(), message.c_str(), message.size());
        socket.send(msg);

        std::cout << "已发送: " << message << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    return 0;
}

订阅者代码

#include <zmq.hpp>
#include <iostream>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_SUB);
    socket.connect("tcp://localhost:5556");
    
    // 订阅所有消息
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);

    while (true) {
        zmq::message_t msg;
        socket.recv(&msg);
        std::cout << "收到: " << std::string(static_cast<char*>(msg.data()), msg.size()) << std::endl;
    }
    return 0;
}

推送-拉取模式 (PUSH-PULL)

Worker (Pull 端)

#include <zmq.hpp>
#include <iostream>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_PULL);
    socket.connect("tcp://localhost:5557");

    while (true) {
        zmq::message_t msg;
        socket.recv(&msg);
        std::cout << "处理任务: " << std::string(static_cast<char*>(msg.data()), msg.size()) << std::endl;
    }
    return 0;
}

任务分发器 (Push 端)

#include <zmq.hpp>
#include <iostream>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_PUSH);
    socket.bind("tcp://*:5557");

    for (int i = 0; i < 10; i++) {
        std::string task = "Task " + std::to_string(i);
        zmq::message_t msg(task.size());
        memcpy(msg.data(), task.c_str(), task.size());
        socket.send(msg);
        std::cout << "发送: " << task << std::endl;
    }
    return 0;
}

🎯 常见模式速查

模式 描述 用途
REQ-REP 请求-响应 远程调用服务
PUB-SUB 发布-订阅 消息广播
PUSH-PULL 推送-拉取 任务分发
DEALER-ROUTER 高级请求-响应 异步服务
PAIR 一对一 进程间通信

📚 更多资源