← 返回库列表

🎭 SObjectizer 使用指南

C++ Actor、发布/订阅和 CSP 框架

项目地址:https://github.com/Stiffstream/sobjectizer

官方文档:https://stiffstream.github.io/sobjectizer/

📖 什么是 SObjectizer?

SObjectizer 是一个相当小巧的 C++ 框架,实现了 Actor、发布/订阅和 CSP 三种并发模型。其性能、质量和稳定性已在多年的生产环境中得到验证。

SObjectizer 允许将并发应用程序创建为一组代理对象,这些代理对象通过异步消息相互交互。它负责消息分发,并为消息处理提供工作上下文。

🚀 安装方式

从源码编译

# Fetch the sources and create an out-of-tree build directory.
git clone https://github.com/Stiffstream/sobjectizer.git
cd sobjectizer
mkdir build && cd build
# The project's top-level CMakeLists.txt is in the dev/ subdirectory,
# not in the repository root, so the source path must be ../dev.
cmake ../dev -DCMAKE_BUILD_TYPE=Release
cmake --build . --config Release --parallel
# Installs into the default prefix; sudo is only needed when that
# prefix is not writable by the current user.
sudo cmake --install .

使用 vcpkg

vcpkg install sobjectizer

💡 基础使用

第一个 Actor - Hello World

#include <iostream>
#include <so_5/all.hpp>

class hello_actor final : public so_5::agent_t {
public:
    using so_5::agent_t::agent_t;

    void so_evt_start() override {
        std::cout << "Hello, World!" << std::endl;
        // 结束示例工作
        so_deregister_agent_coop_normally();
    }
};

/// Entry point of the hello-world example.
int main() {
    // Launch SObjectizer; the lambda fills the environment.
    so_5::launch([](so_5::environment_t & env) {
        // Build the agent first, then hand it over as a cooperation
        // consisting of that single agent.
        auto greeter = env.make_agent<hello_actor>();
        env.register_agent_as_coop(std::move(greeter));
    });

    return 0;
}

Actor 之间发送消息

#include <so_5/all.hpp>

// 定义消息类型
/// Message that carries a greeting text from the sender to the receiver.
struct msg_hello final : public so_5::message_t {
    /// Text printed by the receiver.
    std::string greeting;

    /// Takes the text by value and moves it into the message.
    explicit msg_hello(std::string text)
        : greeting{std::move(text)}
    {}
};

// 接收者 Actor
/// Agent that waits for a msg_hello on its own mbox, prints the greeting
/// and then stops the whole environment.
class receiver_actor final : public so_5::agent_t {
public:
    using so_5::agent_t::agent_t;

    /// Subscribes on_hello() to msg_hello arriving at the agent's own mbox.
    void so_define_agent() override {
        so_subscribe_self()
            .event(&receiver_actor::on_hello);
    }

private:
    /// Handler for msg_hello: print the text and shut the example down.
    void on_hello(const msg_hello & incoming) {
        const auto & text = incoming.greeting;
        std::cout << "收到消息: " << text << std::endl;

        // The demo is over once the greeting has been shown.
        so_environment().stop();
    }
};

// 发送者 Actor
class sender_actor final : public so_5::agent_t {
public:
    sender_actor(context_t ctx, so_5::mbox_t dest)
        : so_5::agent_t(ctx)
        , dest_(std::move(dest)) {}

    void so_evt_start() override {
        // 发送消息
        so_5::send<msg_hello>(dest_, "Hello from Sender!");
    }

private:
    so_5::mbox_t dest_;
};

int main() {
    so_5::launch([](so_5::environment_t & env) {
        // 创建 cooperation
        auto coop = env.create_coop("demo");
        
        auto receiver = env.make_agent<receiver_actor>();
        auto sender = env.make_agent<sender_actor>(receiver->so_direct_mbox());
        
        env.register_agent_as_coop(std::move(coop), std::move(receiver));
    });

    return 0;
}

定时器 - Periodic Message

class time_actor final : public so_5::agent_t {
public:
    using so_5::agent_t::agent_t;

    void so_evt_start() override {
        // 每 2 秒发送一次消息给自己
        so_subscribe_self()
            .event([this](mhood_t<msg_timer>) {
                std::cout << "定时器触发!" << std::endl;
                // 继续定时
                so_5::send_delayed<msg_timer>(
                    *this,
                    std::chrono::seconds(2)
                );
            });
        
        // 启动第一个定时器
        so_5::send_delayed<msg_timer>(*this, std::chrono::seconds(2));
    }
};

发布-订阅模式

#include <so_5/all.hpp>

// 定义消息
/// Message published on the news channel.
struct msg_news final : public so_5::message_t {
    /// Headline text shown by every subscriber.
    std::string headline;

    /// Takes the headline by value and moves it into the message.
    explicit msg_news(std::string text)
        : headline{std::move(text)}
    {}
};

// 订阅者 Actor
class subscriber_actor final : public so_5::agent_t {
public:
    subscriber_actor(context_t ctx, so_5::mbox_t pub_channel)
        : so_5::agent_t(ctx)
        , pub_channel_(std::move(pub_channel)) {}

    void so_define_agent() override {
        // 订阅发布通道
        so_subscribe(pub_channel_).event(&subscriber_actor::on_news);
    }

private:
    void on_news(const msg_news & msg) {
        std::cout << "订阅者: " << msg.headline << std::endl;
    }

    so_5::mbox_t pub_channel_;
};

// 发布者 Actor
class publisher_actor final : public so_5::agent_t {
public:
    publisher_actor(context_t ctx, so_5::mbox_t pub_channel)
        : so_5::agent_t(ctx)
        , pub_channel_(std::move(pub_channel)) {}

    void so_evt_start() override {
        // 发布消息
        so_5::send<msg_news>(
            pub_channel_,
            "SObjectizer 5.8 发布了!"
        );
        so_environment().stop();
    }

private:
    so_5::mbox_t pub_channel_;
};

int main() {
    so_5::launch([](so_5::environment_t & env) {
        // 创建发布通道
        auto pub_channel = env.create_mbox("news_channel");
        
        auto coop = env.create_coop("pubsub");
        
        auto publisher = env.make_agent<publisher_actor>(pub_channel);
        auto sub1 = env.make_agent<subscriber_actor>(pub_channel);
        auto sub2 = env.make_agent<subscriber_actor>(pub_channel);
        
        env.register_agent_as_coop(std::move(coop), 
                                   std::move(publisher),
                                   std::move(sub1),
                                   std::move(sub2));
    });

    return 0;
}

CSP 风格的通道

#include <so_5/all.hpp>

// 生产者
class producer_actor final : public so_5::agent_t {
public:
    producer_actor(context_t ctx, so_5::mchain_t<int> chain)
        : so_5::agent_t(ctx)
        , chain_(std::move(chain)) {}

    void so_evt_start() override {
        // 生产数据并发送到通道
        for (int i = 0; i < 10; ++i) {
            so_5::send<int>(chain_, i);
            std::cout << "生产: " << i << std::endl;
        }
    }

private:
    so_5::mchain_t<int> chain_;
};

// 消费者
class consumer_actor final : public so_5::agent_t {
public:
    consumer_actor(context_t ctx, so_5::mchain_t<int> chain)
        : so_5::agent_t(ctx)
        , chain_(std::move(chain)) {}

    void so_evt_start() override {
        // 从通道消费数据
        receive(from(chain_).handle_all([&](int value) {
            std::cout << "消费: " << value << std::endl;
        }));
        
        so_environment().stop();
    }

private:
    so_5::mchain_t<int> chain_;
};

int main() {
    so_5::launch([](so_5::environment_t & env) {
        // 创建通道(容量 10)
        auto chain = env.create_mchain<int>(
            so_5::mchain_props_t{10},
            "demo_chain"
        );
        
        auto coop = env.create_coop("csp_demo");
        auto producer = env.make_agent<producer_actor>(chain);
        auto consumer = env.make_agent<consumer_actor>(chain);
        
        env.register_agent_as_coop(std::move(coop),
                                   std::move(producer),
                                   std::move(consumer));
    });

    return 0;
}

🎯 核心概念

📚 更多资源