C++ Actor、发布/订阅和 CSP 框架
SObjectizer 是一个在相当小巧的 C++ 框架中实现 Actor、发布/订阅和 CSP 模型的框架。其性能、质量和稳定性已在多年的生产环境中得到验证。
SObjectizer 允许将并发应用程序创建为一组代理对象,这些代理对象通过异步消息相互交互。它负责消息分发,并为消息处理提供工作上下文。
git clone https://github.com/Stiffstream/sobjectizer.git
cd sobjectizer
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
cmake --build . --config Release --parallel
sudo cmake --install .
vcpkg install sobjectizer
#include <iostream>
#include <so_5/all.hpp>
class hello_actor final : public so_5::agent_t {
public:
using so_5::agent_t::agent_t;
void so_evt_start() override {
std::cout << "Hello, World!" << std::endl;
// 结束示例工作
so_deregister_agent_coop_normally();
}
};
int main() {
// 启动 SObjectizer
so_5::launch([](so_5::environment_t & env) {
// 在新的 cooperation 中添加 hello_actor 实例
env.register_agent_as_coop(env.make_agent<hello_actor>());
});
return 0;
}
#include <so_5/all.hpp>
// 定义消息类型
struct msg_hello final : public so_5::message_t {
std::string greeting;
explicit msg_hello(std::string g) : greeting(std::move(g)) {}
};
// 接收者 Actor
class receiver_actor final : public so_5::agent_t {
public:
using so_5::agent_t::agent_t;
void so_define_agent() override {
// 定义消息处理
so_subscribe_self().event(&receiver_actor::on_hello);
}
private:
void on_hello(const msg_hello & msg) {
std::cout << "收到消息: " << msg.greeting << std::endl;
so_environment().stop();
}
};
// 发送者 Actor
class sender_actor final : public so_5::agent_t {
public:
sender_actor(context_t ctx, so_5::mbox_t dest)
: so_5::agent_t(ctx)
, dest_(std::move(dest)) {}
void so_evt_start() override {
// 发送消息
so_5::send<msg_hello>(dest_, "Hello from Sender!");
}
private:
so_5::mbox_t dest_;
};
int main() {
so_5::launch([](so_5::environment_t & env) {
// 创建 cooperation
auto coop = env.create_coop("demo");
auto receiver = env.make_agent<receiver_actor>();
auto sender = env.make_agent<sender_actor>(receiver->so_direct_mbox());
env.register_agent_as_coop(std::move(coop), std::move(receiver));
});
return 0;
}
class time_actor final : public so_5::agent_t {
public:
using so_5::agent_t::agent_t;
void so_evt_start() override {
// 每 2 秒发送一次消息给自己
so_subscribe_self()
.event([this](mhood_t<msg_timer>) {
std::cout << "定时器触发!" << std::endl;
// 继续定时
so_5::send_delayed<msg_timer>(
*this,
std::chrono::seconds(2)
);
});
// 启动第一个定时器
so_5::send_delayed<msg_timer>(*this, std::chrono::seconds(2));
}
};
#include <so_5/all.hpp>
// 定义消息
struct msg_news final : public so_5::message_t {
std::string headline;
explicit msg_news(std::string h) : headline(std::move(h)) {}
};
// 订阅者 Actor
class subscriber_actor final : public so_5::agent_t {
public:
subscriber_actor(context_t ctx, so_5::mbox_t pub_channel)
: so_5::agent_t(ctx)
, pub_channel_(std::move(pub_channel)) {}
void so_define_agent() override {
// 订阅发布通道
so_subscribe(pub_channel_).event(&subscriber_actor::on_news);
}
private:
void on_news(const msg_news & msg) {
std::cout << "订阅者: " << msg.headline << std::endl;
}
so_5::mbox_t pub_channel_;
};
// 发布者 Actor
class publisher_actor final : public so_5::agent_t {
public:
publisher_actor(context_t ctx, so_5::mbox_t pub_channel)
: so_5::agent_t(ctx)
, pub_channel_(std::move(pub_channel)) {}
void so_evt_start() override {
// 发布消息
so_5::send<msg_news>(
pub_channel_,
"SObjectizer 5.8 发布了!"
);
so_environment().stop();
}
private:
so_5::mbox_t pub_channel_;
};
int main() {
so_5::launch([](so_5::environment_t & env) {
// 创建发布通道
auto pub_channel = env.create_mbox("news_channel");
auto coop = env.create_coop("pubsub");
auto publisher = env.make_agent<publisher_actor>(pub_channel);
auto sub1 = env.make_agent<subscriber_actor>(pub_channel);
auto sub2 = env.make_agent<subscriber_actor>(pub_channel);
env.register_agent_as_coop(std::move(coop),
std::move(publisher),
std::move(sub1),
std::move(sub2));
});
return 0;
}
#include <so_5/all.hpp>
// 生产者
class producer_actor final : public so_5::agent_t {
public:
producer_actor(context_t ctx, so_5::mchain_t<int> chain)
: so_5::agent_t(ctx)
, chain_(std::move(chain)) {}
void so_evt_start() override {
// 生产数据并发送到通道
for (int i = 0; i < 10; ++i) {
so_5::send<int>(chain_, i);
std::cout << "生产: " << i << std::endl;
}
}
private:
so_5::mchain_t<int> chain_;
};
// 消费者
class consumer_actor final : public so_5::agent_t {
public:
consumer_actor(context_t ctx, so_5::mchain_t<int> chain)
: so_5::agent_t(ctx)
, chain_(std::move(chain)) {}
void so_evt_start() override {
// 从通道消费数据
receive(from(chain_).handle_all([&](int value) {
std::cout << "消费: " << value << std::endl;
}));
so_environment().stop();
}
private:
so_5::mchain_t<int> chain_;
};
int main() {
so_5::launch([](so_5::environment_t & env) {
// 创建通道(容量 10)
auto chain = env.create_mchain<int>(
so_5::mchain_props_t{10},
"demo_chain"
);
auto coop = env.create_coop("csp_demo");
auto producer = env.make_agent<producer_actor>(chain);
auto consumer = env.make_agent<consumer_actor>(chain);
env.register_agent_as_coop(std::move(coop),
std::move(producer),
std::move(consumer));
});
return 0;
}