← 返回主页

Cpp-TaskFlow 使用指南

Cpp-TaskFlow 是一个现代 C++ 并发编程库,用于简化并行任务的定义和执行。它提供声明式的任务图定义、高效的并行执行、丰富的任务依赖控制等功能。

📦 安装

通过 vcpkg 安装

vcpkg install taskflow

通过 Conan 安装

conan install taskflow/3.6.0@

从源码编译

git clone https://github.com/cpp-taskflow/cpp-taskflow.git
cd cpp-taskflow
mkdir build && cd build
cmake ..
cmake --build . --parallel

头文件方式使用

#include <taskflow/taskflow.hpp>  // 仅需一个头文件

🚀 基础使用

Hello World 示例

#include <taskflow/taskflow.hpp>
#include <iostream>

int main() {
    // 创建任务流
    tf::Taskflow taskflow;

    // 创建一个简单任务
    auto [A, B, C] = taskflow.emplace(
        [] () { std::cout << "Task A\n"; },
        [] () { std::cout << "Task B\n"; },
        [] () { std::cout << "Task C\n"; }
    );

    // 设置任务依赖:A -> B -> C
    A.precede(B);  // A 在 B 之前执行
    B.precede(C);  // B 在 C 之前执行

    // 创建执行器并运行任务流
    tf::Executor executor;
    executor.run(taskflow).wait();

    return 0;
}

并行执行任务

#include <taskflow/taskflow.hpp>
#include <iostream>
#include <thread>

int main() {
    tf::Taskflow taskflow;
    tf::Executor executor(4);  // 使用 4 个工作线程

    // 创建三个并行任务
    auto [A, B, C] = taskflow.emplace(
        [] () { 
            std::cout << "Task A on thread " << std::this_thread::get_id() << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        },
        [] () { 
            std::cout << "Task B on thread " << std::this_thread::get_id() << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        },
        [] () { 
            std::cout << "Task C on thread " << std::this_thread::get_id() << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    );

    // 执行并等待完成
    executor.run(taskflow).wait();

    return 0;
}

🔗 任务依赖

线性依赖链

auto [A, B, C, D] = taskflow.emplace(
    [] () { std::cout << "A\n"; },
    [] () { std::cout << "B\n"; },
    [] () { std::cout << "C\n"; },
    [] () { std::cout << "D\n"; }
);

// A -> B -> C -> D
A.precede(B).precede(C).precede(D);

分支依赖

auto [A, B, C, D] = taskflow.emplace(
    [] () { std::cout << "A\n"; },
    [] () { std::cout << "B\n"; },
    [] () { std::cout << "C\n"; },
    [] () { std::cout << "D\n"; }
);

// A -> B, A -> C, B, C -> D
A.precede(B, C);  // A 完成后 B 和 C 可以并行执行
B.precede(D);
C.precede(D);

复杂任务图

tf::Taskflow taskflow;

auto [A, B, C, D, E, F] = taskflow.emplace(
    [] () { std::cout << "A\n"; },
    [] () { std::cout << "B\n"; },
    [] () { std::cout << "C\n"; },
    [] () { std::cout << "D\n"; },
    [] () { std::cout << "E\n"; },
    [] () { std::cout << "F\n"; }
);

//     A
//    / \
//   B   C
//  / \ / \
// D   E   F
A.precede(B, C);
B.precede(D, E);
C.precede(E, F);

executor.run(taskflow).wait();

📊 高级功能

条件任务

#include <taskflow/taskflow.hpp>
#include <iostream>
#include <random>

int main() {
    tf::Taskflow taskflow;
    tf::Executor executor;

    // 创建任务
    auto init = taskflow.emplace([] () { std::cout << "Initialize\n"; });
    auto condition = taskflow.emplace(
        [] () {
            // 返回 0 或 1,决定走哪个分支
            std::random_device rd;
            std::mt19937 gen(rd());
            std::uniform_int_distribution<> dis(0, 1);
            int result = dis(gen);
            std::cout << "Condition: " << result << "\n";
            return result;
        }
    );
    auto taskA = taskflow.emplace([] () { std::cout << "Task A\n"; });
    auto taskB = taskflow.emplace([] () { std::cout << "Task B\n"; });
    auto finish = taskflow.emplace([] () { std::cout << "Finish\n"; });

    // 设置条件依赖
    init.precede(condition);
    condition.precede(taskA, taskB);
    condition.succeed(taskA, taskB);
    taskA.precede(finish);
    taskB.precede(finish);

    executor.run(taskflow).wait();

    return 0;
}

异步任务

tf::Executor executor;

// 异步运行任务
auto future = executor.async([] () {
    std::cout << "Async task running\n";
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Async task completed\n";
    return 42;
});

// 等待结果
int result = future.get();
std::cout << "Result: " << result << "\n";

循环任务

#include <taskflow/taskflow.hpp>
#include <iostream>

int main() {
    tf::Taskflow taskflow;
    tf::Executor executor;

    int counter = 0;

    auto [init, cond, body, done] = taskflow.emplace(
        [&counter] () { 
            counter = 0;
            std::cout << "Init: counter = " << counter << "\n"; 
        },
        [&counter] () {
            // 条件函数:返回 true 继续循环,false 退出
            return counter < 3;
        },
        [&counter] () {
            counter++;
            std::cout << "Body: counter = " << counter << "\n";
        },
        [] () { 
            std::cout << "Done\n"; 
        }
    );

    // 构建循环结构
    init.precede(cond);
    cond.precede(body, done);
    body.precede(cond);

    executor.run(taskflow).wait();

    return 0;
}

并行for循环

#include <taskflow/taskflow.hpp>
#include <iostream>
#include <vector>

int main() {
    tf::Taskflow taskflow;
    tf::Executor executor;

    std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8};

    // 并行处理每个元素
    taskflow.for_each(data.begin(), data.end(), [] (int i) {
        std::cout << "Processing " << i 
                  << " on thread " << std::this_thread::get_id() << "\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    });

    executor.run(taskflow).wait();

    return 0;
}

并行reduce操作

#include <taskflow/taskflow.hpp>
#include <vector>
#include <iostream>

int main() {
    tf::Taskflow taskflow;
    tf::Executor executor;

    std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    int sum = 0;

    // 并行求和
    taskflow.reduce(data.begin(), data.end(), sum,
        [] (int& s, int item) {
            s += item;
        },
        std::plus<<int>()  // 合并子结果
    );

    executor.run(taskflow).wait();

    std::cout << "Sum: " << sum << "\n";  // 输出: Sum: 55

    return 0;
}

🔧 子任务流

组合多个任务流

#include <taskflow/taskflow.hpp>
#include <iostream>

int main() {
    tf::Executor executor;

    // 创建子任务流 1
    tf::Taskflow taskflow1("Flow1");
    auto [A1, B1, C1] = taskflow1.emplace(
        [] () { std::cout << "Flow1: A\n"; },
        [] () { std::cout << "Flow1: B\n"; },
        [] () { std::cout << "Flow1: C\n"; }
    );
    A1.precede(B1).precede(C1);

    // 创建子任务流 2
    tf::Taskflow taskflow2("Flow2");
    auto [A2, B2, C2] = taskflow2.emplace(
        [] () { std::cout << "Flow2: A\n"; },
        [] () { std::cout << "Flow2: B\n"; },
        [] () { std::cout << "Flow2: C\n"; }
    );
    A2.precede(B2).precede(C2);

    // 创建主任务流并组合
    tf::Taskflow main_taskflow;
    auto init = main_taskflow.emplace([] () { 
        std::cout << "Main: Init\n"; 
    });
    auto module1 = main_taskflow.composed_of(taskflow1);
    auto module2 = main_taskflow.composed_of(taskflow2);
    auto done = main_taskflow.emplace([] () { 
        std::cout << "Main: Done\n"; 
    });

    init.precede(module1);
    module1.precede(module2);
    module2.precede(done);

    executor.run(main_taskflow).wait();

    return 0;
}

💼 实际应用示例

图像处理流水线

#include <taskflow/taskflow.hpp>
#include <vector>
#include <iostream>

struct Image {
    int id;
    std::string data;
};

std::vector<Image> load_images() {
    std::cout << "Loading images...\n";
    return {{1, "data1"}, {2, "data2"}, {3, "data3"}};
}

void preprocess(Image& img) {
    std::cout << "Preprocessing image " << img.id << "\n";
    img.data = "pre_" + img.data;
}

void process(Image& img) {
    std::cout << "Processing image " << img.id << "\n";
    img.data = "proc_" + img.data;
}

void save(const Image& img) {
    std::cout << "Saving image " << img.id << ": " << img.data << "\n";
}

int main() {
    tf::Taskflow taskflow;
    tf::Executor executor;

    // 加载图像
    auto load = taskflow.emplace([] () {
        return load_images();
    }).name("load");

    // 预处理(并行)
    auto preprocess = taskflow.emplace([&] () {
        auto images = loadImages;
        for (auto& img : images) {
            preprocess(img);
        }
    }).name("preprocess");

    // 处理(并行)
    auto process_stage = taskflow.emplace([&] () {
        auto images = loadImages;
        for (auto& img : images) {
            process(img);
        }
    }).name("process");

    // 保存(并行)
    auto save_stage = taskflow.emplace([&] () {
        auto images = loadImages;
        for (const auto& img : images) {
            save(img);
        }
    }).name("save");

    // 设置依赖
    load.precede(preprocess);
    preprocess.precede(process_stage);
    process_stage.precede(save_stage);

    executor.run(taskflow).wait();

    return 0;
}

数据并行计算

#include <taskflow/taskflow.hpp>
#include <vector>
#include <algorithm>
#include <random>
#include <iostream>

int main() {
    tf::Taskflow taskflow;
    tf::Executor executor;

    // 生成大量数据
    std::vector<int> data(1000000);
    std::generate(data.begin(), data.end(), [] () { return rand() % 100; });

    int min_val = INT_MAX;
    int max_val = INT_MIN;
    long long sum = 0;

    // 并行查找最小值
    taskflow.min_element(data.begin(), data.end(), min_val);

    // 并行查找最大值
    taskflow.max_element(data.begin(), data.end(), max_val);

    // 并行求和
    taskflow.reduce(data.begin(), data.end(), sum,
        [] (long long& s, int v) { s += v; },
        std::plus<<long long>()
    );

    // 并行排序
    taskflow.sort(data.begin(), data.end(), std::less<int>());

    executor.run(taskflow).wait();

    std::cout << "Min: " << min_val << "\n";
    std::cout << "Max: " << max_val << "\n";
    std::cout << "Sum: " << sum << "\n";
    std::cout << "First 10 sorted: ";
    for (int i = 0; i < 10; i++) {
        std::cout << data[i] << " ";
    }
    std::cout << "\n";

    return 0;
}

📋 常用API速查

API 说明
tf::Taskflow 任务流对象
tf::Executor 执行器对象,负责执行任务流
taskflow.emplace(funcs...) 创建一个或多个任务
task.precede(tasks...) 设置任务在指定任务之前执行
task.succeed(tasks...) 设置任务在指定任务之后执行
executor.run(taskflow) 执行任务流(异步)
executor.run(taskflow).wait() 执行任务流并等待完成
executor.async(func) 异步执行单个任务
taskflow.for_each 并行for循环
taskflow.reduce 并行reduce操作
💡 提示: Cpp-TaskFlow 采用 任务图 模型,任务以图结构组织,节点表示任务,边表示依赖关系。执行器根据任务图自动调度并行执行。
⚠️ 注意:

🔗 参考资料

← 返回主页