cppzmq 是 ZeroMQ 的 C++ 绑定库,是一个轻量级的、仅包含头文件的绑定,提供了一套现代 C++ 接口,用于在 C++ 应用程序中使用 ZeroMQ。它基于 RAII 模式,自动管理资源,使 ZeroMQ 的使用更加安全和便捷。
cppzmq 需要先安装 ZeroMQ 库:
# Ubuntu/Debian
sudo apt-get install libzmq3-dev
# macOS
brew install zeromq
# Windows (vcpkg)
vcpkg install zeromq:x64-windows
vcpkg install cppzmq
git clone https://github.com/zeromq/cppzmq.git
cd cppzmq
mkdir build && cd build
cmake ..
cmake --install .
#include <zmq.hpp> // 仅需一个头文件
// 链接时需要 -lzmq
#include <zmq.hpp>
#include <iostream>
#include <string>
// Request-reply server example: binds a REP socket on tcp://*:5555 and
// answers every request it receives with the text "World".
// Runs until the process is terminated; zmq errors propagate as exceptions.
int main() {
    zmq::context_t context(1);

    zmq::socket_t socket(context, ZMQ_REP);
    socket.bind("tcp://*:5555");
    std::cout << "Server started, waiting for requests...\n";

    while (true) {
        // recv() returns an empty result when no message was obtained.
        // Inspect it instead of discarding it, so an empty/stale message
        // is never treated as a request.
        zmq::message_t request;
        const auto received = socket.recv(request, zmq::recv_flags::none);
        if (!received) {
            continue;
        }
        std::cout << "Received: " << request.to_string() << "\n";

        // Send the fixed reply back to the requester.
        const std::string reply = "World";
        zmq::message_t response(reply.begin(), reply.end());
        socket.send(response, zmq::send_flags::none);
    }
}
#include <zmq.hpp>
#include <iostream>
#include <string>
// Request-reply client example: connects a REQ socket to
// tcp://localhost:5555, sends "Hello" and prints the reply.
// Returns 0 on success, 1 if no reply message was obtained.
int main() {
    zmq::context_t context(1);

    zmq::socket_t socket(context, ZMQ_REQ);
    socket.connect("tcp://localhost:5555");

    // Send the request.
    const std::string request_str = "Hello";
    zmq::message_t request(request_str.begin(), request_str.end());
    std::cout << "Sending: " << request_str << "\n";
    socket.send(request, zmq::send_flags::none);

    // Receive the reply; only print it when recv() actually produced one.
    zmq::message_t reply;
    const auto received = socket.recv(reply, zmq::recv_flags::none);
    if (!received) {
        std::cerr << "No reply received\n";
        return 1;
    }
    std::cout << "Received: " << reply.to_string() << "\n";
    return 0;
}
#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>
int main() {
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5556");
std::cout << "Publisher started\n";
int count = 0;
while (count < 10) {
// 发布消息
std::string topic = "news";
std::string message = "Update #" + std::to_string(count++);
// 先发送主题
zmq::message_t topic_msg(topic.begin(), topic.end());
publisher.send(topic_msg, zmq::send_flags::sndmore);
// 再发送消息内容
zmq::message_t content_msg(message.begin(), message.end());
publisher.send(content_msg, zmq::send_flags::none);
std::cout << "Published: " << topic << " - " << message << "\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return 0;
}
#include <zmq.hpp>
#include <iostream>
#include <string>
int main() {
zmq::context_t context(1);
zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
// 设置过滤器:只订阅 "news" 主题
std::string filter = "news";
subscriber.set(zmq::sockopt::subscribe, filter);
std::cout << "Subscriber started\n";
while (true) {
// 接收主题
zmq::message_t topic_msg;
subscriber.recv(topic_msg, zmq::recv_flags::none);
std::string topic(static_cast<char*>(topic_msg.data()), topic_msg.size());
// 接收消息内容
zmq::message_t content_msg;
auto recv_result = subscriber.recv(content_msg, zmq::recv_flags::none);
std::string content(static_cast<char*>(content_msg.data()), content_msg.size());
std::cout << "Received: [" << topic << "] " << content << "\n";
}
return 0;
}
#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>
int main() {
zmq::context_t context(1);
zmq::socket_t pusher(context, ZMQ_PUSH);
pusher.bind("tcp://*:5557");
std::cout << "Pusher started\n";
for (int i = 0; i < 10; i++) {
std::string message = "Task #" + std::to_string(i);
zmq::message_t msg(message.begin(), message.end());
pusher.send(msg, zmq::send_flags::none);
std::cout << "Sent: " << message << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
return 0;
}
#include <zmq.hpp>
#include <iostream>
#include <thread>
// Pipeline consumer example: connects a PULL socket to tcp://localhost:5557
// and prints each received message together with the current thread id,
// sleeping one second after each message. Runs until the process is terminated.
int main() {
    zmq::context_t context(1);
    zmq::socket_t puller(context, ZMQ_PULL);
    puller.connect("tcp://localhost:5557");
    std::cout << "Puller started\n";

    while (true) {
        zmq::message_t msg;
        // Act on the recv() result rather than storing it in an unused local:
        // nothing is printed when no message was obtained.
        if (!puller.recv(msg, zmq::recv_flags::none)) {
            continue;
        }
        std::cout << "Received: " << msg.to_string()
                  << " on thread " << std::this_thread::get_id() << "\n";
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
#include <zmq.hpp>
#include <iostream>
int main() {
zmq::context_t context(1);
zmq::socket_t sender(context, ZMQ_PAIR);
sender.bind("inproc://test");
zmq::socket_t receiver(context, ZMQ_PAIR);
receiver.connect("inproc://test");
// 发送多部分消息
std::string part1 = "Header";
std::string part2 = "Body";
std::string part3 = "Footer";
zmq::message_t msg1(part1.begin(), part1.end());
zmq::message_t msg2(part2.begin(), part2.end());
zmq::message_t msg3(part3.begin(), part3.end());
// 使用 sndmore 标志发送多部分消息
sender.send(msg1, zmq::send_flags::sndmore);
sender.send(msg2, zmq::send_flags::sndmore);
sender.send(msg3, zmq::send_flags::none);
std::cout << "Sent 3-part message\n";
// 接收多部分消息
zmq::message_t recv1, recv2, recv3;
receiver.recv(recv1, zmq::recv_flags::none);
receiver.recv(recv2, zmq::recv_flags::none);
receiver.recv(recv3, zmq::recv_flags::none);
std::cout << "Received: "
<< std::string(static_cast<char*>(recv1.data()), recv1.size()) << " "
<< std::string(static_cast<char*>(recv2.data()), recv2.size()) << " "
<< std::string(static_cast<char*>(recv3.data()), recv3.size()) << "\n";
return 0;
}
#include <zmq.hpp>
#include <iostream>
int main() {
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_SUB);
socket.connect("tcp://localhost:5556");
socket.set(zmq::sockopt::subscribe, "");
// 设置接收超时(毫秒)
int timeout_ms = 1000;
socket.set(zmq::sockopt::rcvtimeo, timeout_ms);
std::cout << "Waiting for messages with 1s timeout...\n";
while (true) {
zmq::message_t msg;
auto result = socket.recv(msg, zmq::recv_flags::none);
if (result) {
std::cout << "Received: "
<< std::string(static_cast<char*>(msg.data()), msg.size()) << "\n";
} else {
std::cout << "Timeout, no message received\n";
}
}
return 0;
}
// PUB socket whose queue limits are configured below
// (`context` is expected to be defined by the surrounding code).
zmq::socket_t socket(context, ZMQ_PUB);

// Send-side high-water mark, counted in messages.
int sndhwm{1000};
socket.set(zmq::sockopt::sndhwm, sndhwm);

// Receive-side high-water mark, counted in messages.
int rcvhwm{1000};
socket.set(zmq::sockopt::rcvhwm, rcvhwm);
// zmq::poll and zmq::pollitem_t are declared by zmq.hpp itself; there is no
// separate poll header to include.
#include <zmq.hpp>
#include <chrono>
#include <iostream>
#include <string>

// Polling example: waits on two SUB sockets at once and prints whatever
// arrives on either of them. Runs until the process is terminated.
int main() {
    zmq::context_t context(1);

    // Two subscribers on different endpoints with different topic prefixes.
    zmq::socket_t sub1(context, ZMQ_SUB);
    sub1.connect("tcp://localhost:5556");
    sub1.set(zmq::sockopt::subscribe, "topic1");
    zmq::socket_t sub2(context, ZMQ_SUB);
    sub2.connect("tcp://localhost:5557");
    sub2.set(zmq::sockopt::subscribe, "topic2");

    // Poll items: handle() exposes the underlying socket pointer that
    // zmq_pollitem_t expects (replaces the deprecated void* conversion).
    zmq::pollitem_t items[] = {
        { sub1.handle(), 0, ZMQ_POLLIN, 0 },
        { sub2.handle(), 0, ZMQ_POLLIN, 0 }
    };
    std::cout << "Listening on both sockets...\n";

    while (true) {
        // Block until at least one socket is readable; -1 ms means no timeout.
        zmq::poll(items, 2, std::chrono::milliseconds{-1});

        // Check sub1.
        if (items[0].revents & ZMQ_POLLIN) {
            zmq::message_t msg;
            if (sub1.recv(msg, zmq::recv_flags::none)) {
                std::cout << "From sub1: " << msg.to_string() << "\n";
            }
        }
        // Check sub2.
        if (items[1].revents & ZMQ_POLLIN) {
            zmq::message_t msg;
            if (sub2.recv(msg, zmq::recv_flags::none)) {
                std::cout << "From sub2: " << msg.to_string() << "\n";
            }
        }
    }
}
#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <vector>
int main() {
zmq::context_t context(1);
// 接收客户端的请求
zmq::socket_t receiver(context, ZMQ_PULL);
receiver.bind("tcp://*:5555");
// 发送任务给工作节点
zmq::socket_t sender(context, ZMQ_PUSH);
sender.bind("tcp://*:5557");
std::cout << "Task dispatcher started\n";
// 等待客户端准备就绪
zmq::message_t msg;
receiver.recv(msg, zmq::recv_flags::none);
std::cout << "Client ready, starting to send tasks\n";
// 发送100个任务
for (int i = 0; i < 100; i++) {
std::string task = "Task_" + std::to_string(i);
zmq::message_t task_msg(task.begin(), task.end());
sender.send(task_msg, zmq::send_flags::none);
}
// 发送结束信号
std::string done = "DONE";
zmq::message_t done_msg(done.begin(), done.end());
sender.send(done_msg, zmq::send_flags::none);
std::cout << "All tasks sent\n";
return 0;
}
#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <random>
// Worker example: pulls tasks from tcp://localhost:5557, simulates
// processing with a random 100-500 ms sleep, and pushes each task back as
// its result to tcp://localhost:5558. Stops when it receives "DONE".
int main() {
    zmq::context_t context(1);

    // Receives tasks.
    zmq::socket_t receiver(context, ZMQ_PULL);
    receiver.connect("tcp://localhost:5557");

    // Sends results.
    // NOTE(review): the peer that binds port 5558 is not visible in this
    // section - confirm a result sink exists.
    zmq::socket_t sender(context, ZMQ_PUSH);
    sender.connect("tcp://localhost:5558");

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(100, 500);

    while (true) {
        zmq::message_t msg;
        // Skip the iteration when recv() produced no message, so an empty
        // frame is never processed as a task.
        if (!receiver.recv(msg, zmq::recv_flags::none)) {
            continue;
        }
        const std::string task = msg.to_string();
        if (task == "DONE") {
            std::cout << "Worker finished\n";
            break;
        }

        // Simulated processing.
        const int process_time = dis(gen);
        std::cout << "Processing: " << task
                  << " (" << process_time << "ms)\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(process_time));

        // The result is a copy of the task payload.
        zmq::message_t result(msg.data(), msg.size());
        sender.send(result, zmq::send_flags::none);
    }
    return 0;
}
#include <zmq.hpp>
#include <iostream>
#include <fstream>
#include <chrono>
#include <ctime>
/// @brief Formats the current local time as "YYYY-MM-DD HH:MM:SS".
/// @return The formatted timestamp, or an empty string when the local time
///         cannot be obtained or formatted.
std::string get_timestamp() {
    const auto now = std::chrono::system_clock::now();
    const std::time_t time = std::chrono::system_clock::to_time_t(now);

    // std::localtime can return a null pointer on failure; handing that to
    // std::strftime would be undefined behaviour.
    const std::tm* local = std::localtime(&time);
    if (local == nullptr) {
        return {};
    }

    char buf[100];
    // strftime returns the number of characters written (0 on failure, in
    // which case the buffer contents are unspecified), so build the string
    // from that length instead of scanning for a terminator.
    const std::size_t written =
        std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", local);
    return std::string(buf, written);
}
int main() {
zmq::context_t context(1);
zmq::socket_t collector(context, ZMQ_SUB);
collector.bind("tcp://*:5559");
// 订阅所有主题
collector.set(zmq::sockopt::subscribe, "");
std::ofstream logfile("application.log", std::ios::app);
std::cout << "Log collector started\n";
while (true) {
// 接收日志级别(主题)
zmq::message_t level_msg;
collector.recv(level_msg, zmq::recv_flags::none);
std::string level(static_cast<char*>(level_msg.data()), level_msg.size());
// 接收日志消息
zmq::message_t msg;
collector.recv(msg, zmq::recv_flags::none);
std::string log_msg(static_cast<char*>(msg.data()), msg.size());
// 格式化日志
std::string formatted = "[" + get_timestamp() + "] [" + level + "] " + log_msg;
// 输出到控制台
std::cout << formatted << "\n";
// 写入文件
logfile << formatted << "\n";
logfile.flush();
}
return 0;
}
#include <zmq.hpp>
#include <iostream>
#include <string>
#include <thread>
// Log client example: connects a PUB socket to tcp://localhost:5559 and
// publishes (level, message) pairs read interactively from standard input.
// Exits with 0 when standard input ends or fails.
int main() {
    zmq::context_t context(1);
    zmq::socket_t logger(context, ZMQ_PUB);
    logger.connect("tcp://localhost:5559");
    // Pause before publishing to give the connection time to be set up.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    while (true) {
        std::string level;
        std::string message;

        std::cout << "Enter level (INFO/WARN/ERROR): ";
        // Stop on EOF or input failure; otherwise the loop would spin
        // forever publishing empty frames.
        if (!(std::cin >> level)) {
            break;
        }

        std::cout << "Enter message: ";
        // Skip the single separator character left after the level token.
        std::cin.ignore();
        if (!std::getline(std::cin, message)) {
            break;
        }

        // Two-frame message: level (topic) followed by the text.
        zmq::message_t level_msg(level.begin(), level.end());
        logger.send(level_msg, zmq::send_flags::sndmore);
        zmq::message_t msg(message.begin(), message.end());
        logger.send(msg, zmq::send_flags::none);
        std::cout << "Log sent\n";
    }
    return 0;
}
| 类型 | 方向 | 说明 |
|---|---|---|
| ZMQ_REQ | 客户端 | 请求-响应客户端 |
| ZMQ_REP | 服务端 | 请求-响应服务端 |
| ZMQ_DEALER | 客户端 | 高级请求-响应客户端 |
| ZMQ_ROUTER | 服务端 | 高级请求-响应服务端 |
| ZMQ_PUB | 发布者 | 发布-订阅模式发布者 |
| ZMQ_SUB | 订阅者 | 发布-订阅模式订阅者 |
| ZMQ_PUSH | 推送者 | 管道模式推送端 |
| ZMQ_PULL | 拉取者 | 管道模式拉取端 |
| ZMQ_PAIR | 对等 | 对等通信模式 |