高性能异步消息传输库
ZeroMQ(简称 ZMQ)是一个高性能、异步的消息传输库,提供简单的 API 让开发者在应用程序中构建快速、可扩展的网络应用。它不是传统的消息队列(如 RabbitMQ、Kafka),而是一个轻量级的消息传输层。
sudo apt-get install libzmq3-dev
brew install zeromq
vcpkg install zeromq
#include <zmq.hpp>
#include <iostream>
#include <thread>
int main() {
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP);
socket.bind("tcp://*:5555");
while (true) {
zmq::message_t request;
socket.recv(&request);
std::cout << "收到: " << std::string(static_cast<char*>(request.data()), request.size()) << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
zmq::message_t reply(5);
memcpy(reply.data(), "World", 5);
socket.send(reply);
}
return 0;
}
#include <zmq.hpp>
#include <iostream>
int main() {
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REQ);
socket.connect("tcp://localhost:5555");
for (int i = 0; i < 5; i++) {
zmq::message_t request(5);
memcpy(request.data(), "Hello", 5);
socket.send(request);
zmq::message_t reply;
socket.recv(&reply);
std::cout << "收到: " << std::string(static_cast<char*>(reply.data()), reply.size()) << std::endl;
}
return 0;
}
#include <zmq.hpp>
#include <iostream>
#include <thread>
int main() {
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_PUB);
socket.bind("tcp://*:5556");
int counter = 0;
while (true) {
std::string message = "Update " + std::to_string(counter++);
zmq::message_t msg(message.size());
memcpy(msg.data(), message.c_str(), message.size());
socket.send(msg);
std::cout << "已发送: " << message << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return 0;
}
#include <zmq.hpp>
#include <iostream>
int main() {
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_SUB);
socket.connect("tcp://localhost:5556");
// 订阅所有消息
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
while (true) {
zmq::message_t msg;
socket.recv(&msg);
std::cout << "收到: " << std::string(static_cast<char*>(msg.data()), msg.size()) << std::endl;
}
return 0;
}
#include <zmq.hpp>
#include <iostream>
int main() {
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_PULL);
socket.connect("tcp://localhost:5557");
while (true) {
zmq::message_t msg;
socket.recv(&msg);
std::cout << "处理任务: " << std::string(static_cast<char*>(msg.data()), msg.size()) << std::endl;
}
return 0;
}
#include <zmq.hpp>
#include <iostream>
int main() {
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_PUSH);
socket.bind("tcp://*:5557");
for (int i = 0; i < 10; i++) {
std::string task = "Task " + std::to_string(i);
zmq::message_t msg(task.size());
memcpy(msg.data(), task.c_str(), task.size());
socket.send(msg);
std::cout << "发送: " << task << std::endl;
}
return 0;
}
| 模式 | 描述 | 用途 |
|---|---|---|
| REQ-REP | 请求-响应 | 远程调用服务 |
| PUB-SUB | 发布-订阅 | 消息广播 |
| PUSH-PULL | 推送-拉取 | 任务分发 |
| DEALER-ROUTER | 高级请求-响应 | 异步服务 |
| PAIR | 一对一 | 进程间通信 |