← 返回库列表

cppzmq 使用指南

cppzmq 是 ZeroMQ 的 C++ 绑定库,是一个轻量级的、仅包含头文件的绑定,提供了一套现代 C++ 接口,用于在 C++ 应用程序中使用 ZeroMQ。它基于 RAII 模式,自动管理资源,使 ZeroMQ 的使用更加安全和便捷。

📦 安装

前置要求

cppzmq 需要先安装 ZeroMQ 库:

# Ubuntu/Debian
sudo apt-get install libzmq3-dev

# macOS
brew install zeromq

# Windows (vcpkg)
vcpkg install zeromq:x64-windows

通过 vcpkg 安装

vcpkg install cppzmq

通过 CMake 安装

git clone https://github.com/zeromq/cppzmq.git
cd cppzmq
mkdir build && cd build
cmake ..
cmake --install .

头文件方式使用

#include <zmq.hpp>  // 仅需一个头文件
// 链接时需要 -lzmq

🚀 基础使用

Hello World(REQ-REP 模式)

服务器端(REP)

#include <zmq.hpp>
#include <iostream>
#include <string>

int main() {
    // 创建 ZeroMQ 上下文
    zmq::context_t context(1);

    // 创建 REP 套接字
    zmq::socket_t socket(context, ZMQ_REP);
    socket.bind("tcp://*:5555");

    std::cout << "Server started, waiting for requests...\n";

    while (true) {
        // 接收请求
        zmq::message_t request;
        socket.recv(request, zmq::recv_flags::none);
        std::string req_str(static_cast<char*>(request.data()), request.size());
        std::cout << "Received: " << req_str << "\n";

        // 发送响应
        std::string reply = "World";
        zmq::message_t response(reply.begin(), reply.end());
        socket.send(response, zmq::send_flags::none);
    }

    return 0;
}

客户端(REQ)

#include <zmq.hpp>
#include <iostream>
#include <string>

int main() {
    // 创建 ZeroMQ 上下文
    zmq::context_t context(1);

    // 创建 REQ 套接字
    zmq::socket_t socket(context, ZMQ_REQ);
    socket.connect("tcp://localhost:5555");

    // 发送请求
    std::string request_str = "Hello";
    zmq::message_t request(request_str.begin(), request_str.end());
    std::cout << "Sending: " << request_str << "\n";
    socket.send(request, zmq::send_flags::none);

    // 接收响应
    zmq::message_t reply;
    socket.recv(reply, zmq::recv_flags::none);
    std::string reply_str(static_cast<char*>(reply.data()), reply.size());
    std::cout << "Received: " << reply_str << "\n";

    return 0;
}

📤📥 PUB-SUB 模式(发布-订阅)

发布者(PUB)

#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>

int main() {
    zmq::context_t context(1);
    zmq::socket_t publisher(context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");

    std::cout << "Publisher started\n";

    int count = 0;
    while (count < 10) {
        // 发布消息
        std::string topic = "news";
        std::string message = "Update #" + std::to_string(count++);

        // 先发送主题
        zmq::message_t topic_msg(topic.begin(), topic.end());
        publisher.send(topic_msg, zmq::send_flags::sndmore);

        // 再发送消息内容
        zmq::message_t content_msg(message.begin(), message.end());
        publisher.send(content_msg, zmq::send_flags::none);

        std::cout << "Published: " << topic << " - " << message << "\n";

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    return 0;
}

订阅者(SUB)

#include <zmq.hpp>
#include <iostream>
#include <string>

int main() {
    zmq::context_t context(1);
    zmq::socket_t subscriber(context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5556");

    // 设置过滤器:只订阅 "news" 主题
    std::string filter = "news";
    subscriber.set(zmq::sockopt::subscribe, filter);

    std::cout << "Subscriber started\n";

    while (true) {
        // 接收主题
        zmq::message_t topic_msg;
        subscriber.recv(topic_msg, zmq::recv_flags::none);
        std::string topic(static_cast<char*>(topic_msg.data()), topic_msg.size());

        // 接收消息内容
        zmq::message_t content_msg;
        auto recv_result = subscriber.recv(content_msg, zmq::recv_flags::none);
        std::string content(static_cast<char*>(content_msg.data()), content_msg.size());

        std::cout << "Received: [" << topic << "] " << content << "\n";
    }

    return 0;
}

⚡ PUSH-PULL 模式(管道)

推拉者(PUSH)

#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>

int main() {
    zmq::context_t context(1);
    zmq::socket_t pusher(context, ZMQ_PUSH);
    pusher.bind("tcp://*:5557");

    std::cout << "Pusher started\n";

    for (int i = 0; i < 10; i++) {
        std::string message = "Task #" + std::to_string(i);
        zmq::message_t msg(message.begin(), message.end());
        
        pusher.send(msg, zmq::send_flags::none);
        std::cout << "Sent: " << message << "\n";

        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }

    return 0;
}

拉取者(PULL)

#include <zmq.hpp>
#include <iostream>
#include <thread>

int main() {
    zmq::context_t context(1);
    zmq::socket_t puller(context, ZMQ_PULL);
    puller.connect("tcp://localhost:5557");

    std::cout << "Puller started\n";

    while (true) {
        zmq::message_t msg;
        auto recv_result = puller.recv(msg, zmq::recv_flags::none);
        
        std::string message(static_cast<char*>(msg.data()), msg.size());
        std::cout << "Received: " << message 
                  << " on thread " << std::this_thread::get_id() << "\n";
        
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    return 0;
}

🔄 高级功能

多部分消息

#include <zmq.hpp>
#include <iostream>

int main() {
    zmq::context_t context(1);
    zmq::socket_t sender(context, ZMQ_PAIR);
    sender.bind("inproc://test");

    zmq::socket_t receiver(context, ZMQ_PAIR);
    receiver.connect("inproc://test");

    // 发送多部分消息
    std::string part1 = "Header";
    std::string part2 = "Body";
    std::string part3 = "Footer";

    zmq::message_t msg1(part1.begin(), part1.end());
    zmq::message_t msg2(part2.begin(), part2.end());
    zmq::message_t msg3(part3.begin(), part3.end());

    // 使用 sndmore 标志发送多部分消息
    sender.send(msg1, zmq::send_flags::sndmore);
    sender.send(msg2, zmq::send_flags::sndmore);
    sender.send(msg3, zmq::send_flags::none);

    std::cout << "Sent 3-part message\n";

    // 接收多部分消息
    zmq::message_t recv1, recv2, recv3;
    receiver.recv(recv1, zmq::recv_flags::none);
    receiver.recv(recv2, zmq::recv_flags::none);
    receiver.recv(recv3, zmq::recv_flags::none);

    std::cout << "Received: " 
              << std::string(static_cast<char*>(recv1.data()), recv1.size()) << " "
              << std::string(static_cast<char*>(recv2.data()), recv2.size()) << " "
              << std::string(static_cast<char*>(recv3.data()), recv3.size()) << "\n";

    return 0;
}

设置超时

#include <zmq.hpp>
#include <iostream>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_SUB);
    socket.connect("tcp://localhost:5556");
    socket.set(zmq::sockopt::subscribe, "");

    // 设置接收超时(毫秒)
    int timeout_ms = 1000;
    socket.set(zmq::sockopt::rcvtimeo, timeout_ms);

    std::cout << "Waiting for messages with 1s timeout...\n";

    while (true) {
        zmq::message_t msg;
        auto result = socket.recv(msg, zmq::recv_flags::none);

        if (result) {
            std::cout << "Received: " 
                      << std::string(static_cast<char*>(msg.data()), msg.size()) << "\n";
        } else {
            std::cout << "Timeout, no message received\n";
        }
    }

    return 0;
}

设置发送/接收缓冲区大小

zmq::socket_t socket(context, ZMQ_PUB);

// 设置发送缓冲区大小(消息数)
int sndhwm = 1000;
socket.set(zmq::sockopt::sndhwm, sndhwm);

// 设置接收缓冲区大小
int rcvhwm = 1000;
socket.set(zmq::sockopt::rcvhwm, rcvhwm);

使用 poll 监听多个套接字

#include <zmq.hpp>
#include <zmq_poll.h>
#include <iostream>

int main() {
    zmq::context_t context(1);

    // 创建两个订阅者
    zmq::socket_t sub1(context, ZMQ_SUB);
    sub1.connect("tcp://localhost:5556");
    sub1.set(zmq::sockopt::subscribe, "topic1");

    zmq::socket_t sub2(context, ZMQ_SUB);
    sub2.connect("tcp://localhost:5557");
    sub2.set(zmq::sockopt::subscribe, "topic2");

    // 设置 poll 项
    zmq::pollitem_t items[] = {
        { static_cast<void*>(sub1), 0, ZMQ_POLLIN, 0 },
        { static_cast<void*>(sub2), 0, ZMQ_POLLIN, 0 }
    };

    std::cout << "Listening on both sockets...\n";

    while (true) {
        // 等待任一套接字有消息(超时 -1 表示无限等待)
        zmq::poll(items, 2, -1);

        // 检查 sub1
        if (items[0].revents & ZMQ_POLLIN) {
            zmq::message_t msg;
            sub1.recv(msg, zmq::recv_flags::none);
            std::cout << "From sub1: " 
                      << std::string(static_cast<char*>(msg.data()), msg.size()) << "\n";
        }

        // 检查 sub2
        if (items[1].revents & ZMQ_POLLIN) {
            zmq::message_t msg;
            sub2.recv(msg, zmq::recv_flags::none);
            std::cout << "From sub2: " 
                      << std::string(static_cast<char*>(msg.data()), msg.size()) << "\n";
        }
    }

    return 0;
}

💼 实际应用示例

任务分发系统

任务分发器(Ventilator)

#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <vector>

int main() {
    zmq::context_t context(1);

    // 接收客户端的请求
    zmq::socket_t receiver(context, ZMQ_PULL);
    receiver.bind("tcp://*:5555");

    // 发送任务给工作节点
    zmq::socket_t sender(context, ZMQ_PUSH);
    sender.bind("tcp://*:5557");

    std::cout << "Task dispatcher started\n";

    // 等待客户端准备就绪
    zmq::message_t msg;
    receiver.recv(msg, zmq::recv_flags::none);
    std::cout << "Client ready, starting to send tasks\n";

    // 发送100个任务
    for (int i = 0; i < 100; i++) {
        std::string task = "Task_" + std::to_string(i);
        zmq::message_t task_msg(task.begin(), task.end());
        sender.send(task_msg, zmq::send_flags::none);
    }

    // 发送结束信号
    std::string done = "DONE";
    zmq::message_t done_msg(done.begin(), done.end());
    sender.send(done_msg, zmq::send_flags::none);

    std::cout << "All tasks sent\n";

    return 0;
}

工作节点(Worker)

#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <random>

int main() {
    zmq::context_t context(1);

    // 接收任务
    zmq::socket_t receiver(context, ZMQ_PULL);
    receiver.connect("tcp://localhost:5557");

    // 发送结果
    zmq::socket_t sender(context, ZMQ_PUSH);
    sender.connect("tcp://localhost:5558");

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(100, 500);

    while (true) {
        zmq::message_t msg;
        receiver.recv(msg, zmq::recv_flags::none);

        std::string task(static_cast<char*>(msg.data()), msg.size());

        if (task == "DONE") {
            std::cout << "Worker finished\n";
            break;
        }

        // 处理任务(模拟)
        int process_time = dis(gen);
        std::cout << "Processing: " << task 
                  << " (" << process_time << "ms)\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(process_time));

        // 发送结果
        zmq::message_t result(msg.data(), msg.size());
        sender.send(result, zmq::send_flags::none);
    }

    return 0;
}

实时日志系统

日志收集器

#include <zmq.hpp>
#include <iostream>
#include <fstream>
#include <chrono>
#include <ctime>

std::string get_timestamp() {
    auto now = std::chrono::system_clock::now();
    auto time = std::chrono::system_clock::to_time_t(now);
    char buf[100];
    std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", std::localtime(&time));
    return buf;
}

int main() {
    zmq::context_t context(1);
    zmq::socket_t collector(context, ZMQ_SUB);
    collector.bind("tcp://*:5559");

    // 订阅所有主题
    collector.set(zmq::sockopt::subscribe, "");

    std::ofstream logfile("application.log", std::ios::app);

    std::cout << "Log collector started\n";

    while (true) {
        // 接收日志级别(主题)
        zmq::message_t level_msg;
        collector.recv(level_msg, zmq::recv_flags::none);
        std::string level(static_cast<char*>(level_msg.data()), level_msg.size());

        // 接收日志消息
        zmq::message_t msg;
        collector.recv(msg, zmq::recv_flags::none);
        std::string log_msg(static_cast<char*>(msg.data()), msg.size());

        // 格式化日志
        std::string formatted = "[" + get_timestamp() + "] [" + level + "] " + log_msg;

        // 输出到控制台
        std::cout << formatted << "\n";

        // 写入文件
        logfile << formatted << "\n";
        logfile.flush();
    }

    return 0;
}

日志生成器

#include <zmq.hpp>
#include <iostream>
#include <string>
#include <thread>

int main() {
    zmq::context_t context(1);
    zmq::socket_t logger(context, ZMQ_PUB);
    logger.connect("tcp://localhost:5559");

    std::this_thread::sleep_for(std::chrono::seconds(1));  // 等待连接

    while (true) {
        std::string level;
        std::string message;

        std::cout << "Enter level (INFO/WARN/ERROR): ";
        std::cin >> level;

        std::cout << "Enter message: ";
        std::cin.ignore();
        std::getline(std::cin, message);

        // 发送多部分消息:主题 + 内容
        zmq::message_t level_msg(level.begin(), level.end());
        logger.send(level_msg, zmq::send_flags::sndmore);

        zmq::message_t msg(message.begin(), message.end());
        logger.send(msg, zmq::send_flags::none);

        std::cout << "Log sent\n";
    }

    return 0;
}

📋 ZeroMQ 套接字类型速查

类型 方向 说明
ZMQ_REQ 客户端 请求-响应客户端
ZMQ_REP 服务端 请求-响应服务端
ZMQ_DEALER 客户端 高级请求-响应客户端
ZMQ_ROUTER 服务端 高级请求-响应服务端
ZMQ_PUB 发布者 发布-订阅模式发布者
ZMQ_SUB 订阅者 发布-订阅模式订阅者
ZMQ_PUSH 推拉者 管道模式推送端
ZMQ_PULL 拉取者 管道模式拉取端
ZMQ_PAIR 对等 对等通信模式
💡 提示: cppzmq 使用 RAII 模式自动管理资源,套接字在析构时会自动关闭,上下文在析构时会自动终止。这比原生的 C API 更安全。
⚠️ 注意:

🔗 参考资料