Cpp-TaskFlow 是一个现代 C++ 并发编程库,用于简化并行任务的定义和执行。它提供声明式的任务图定义、高效的并行执行、丰富的任务依赖控制等功能。
vcpkg install taskflow
conan install taskflow/3.6.0@
git clone https://github.com/cpp-taskflow/cpp-taskflow.git
cd cpp-taskflow
mkdir build && cd build
cmake ..
cmake --build . --parallel
#include <taskflow/taskflow.hpp> // 仅需一个头文件
#include <taskflow/taskflow.hpp>
#include <iostream>
int main() {
// 创建任务流
tf::Taskflow taskflow;
// 创建一个简单任务
auto [A, B, C] = taskflow.emplace(
[] () { std::cout << "Task A\n"; },
[] () { std::cout << "Task B\n"; },
[] () { std::cout << "Task C\n"; }
);
// 设置任务依赖:A -> B -> C
A.precede(B); // A 在 B 之前执行
B.precede(C); // B 在 C 之前执行
// 创建执行器并运行任务流
tf::Executor executor;
executor.run(taskflow).wait();
return 0;
}
#include <taskflow/taskflow.hpp>
#include <iostream>
#include <thread>
int main() {
tf::Taskflow taskflow;
tf::Executor executor(4); // 使用 4 个工作线程
// 创建三个并行任务
auto [A, B, C] = taskflow.emplace(
[] () {
std::cout << "Task A on thread " << std::this_thread::get_id() << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
},
[] () {
std::cout << "Task B on thread " << std::this_thread::get_id() << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
},
[] () {
std::cout << "Task C on thread " << std::this_thread::get_id() << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
);
// 执行并等待完成
executor.run(taskflow).wait();
return 0;
}
auto [A, B, C, D] = taskflow.emplace(
[] () { std::cout << "A\n"; },
[] () { std::cout << "B\n"; },
[] () { std::cout << "C\n"; },
[] () { std::cout << "D\n"; }
);
// A -> B -> C -> D
A.precede(B).precede(C).precede(D);
auto [A, B, C, D] = taskflow.emplace(
[] () { std::cout << "A\n"; },
[] () { std::cout << "B\n"; },
[] () { std::cout << "C\n"; },
[] () { std::cout << "D\n"; }
);
// A -> B, A -> C, B, C -> D
A.precede(B, C); // A 完成后 B 和 C 可以并行执行
B.precede(D);
C.precede(D);
tf::Taskflow taskflow;
auto [A, B, C, D, E, F] = taskflow.emplace(
[] () { std::cout << "A\n"; },
[] () { std::cout << "B\n"; },
[] () { std::cout << "C\n"; },
[] () { std::cout << "D\n"; },
[] () { std::cout << "E\n"; },
[] () { std::cout << "F\n"; }
);
// A
// / \
// B C
// / \ / \
// D E F
A.precede(B, C);
B.precede(D, E);
C.precede(E, F);
executor.run(taskflow).wait();
#include <taskflow/taskflow.hpp>
#include <iostream>
#include <random>
int main() {
tf::Taskflow taskflow;
tf::Executor executor;
// 创建任务
auto init = taskflow.emplace([] () { std::cout << "Initialize\n"; });
auto condition = taskflow.emplace(
[] () {
// 返回 0 或 1,决定走哪个分支
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, 1);
int result = dis(gen);
std::cout << "Condition: " << result << "\n";
return result;
}
);
auto taskA = taskflow.emplace([] () { std::cout << "Task A\n"; });
auto taskB = taskflow.emplace([] () { std::cout << "Task B\n"; });
auto finish = taskflow.emplace([] () { std::cout << "Finish\n"; });
// 设置条件依赖
init.precede(condition);
condition.precede(taskA, taskB);
condition.succeed(taskA, taskB);
taskA.precede(finish);
taskB.precede(finish);
executor.run(taskflow).wait();
return 0;
}
tf::Executor executor;
// 异步运行任务
auto future = executor.async([] () {
std::cout << "Async task running\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Async task completed\n";
return 42;
});
// 等待结果
int result = future.get();
std::cout << "Result: " << result << "\n";
#include <taskflow/taskflow.hpp>
#include <iostream>
int main() {
tf::Taskflow taskflow;
tf::Executor executor;
int counter = 0;
auto [init, cond, body, done] = taskflow.emplace(
[&counter] () {
counter = 0;
std::cout << "Init: counter = " << counter << "\n";
},
[&counter] () {
// 条件函数:返回 true 继续循环,false 退出
return counter < 3;
},
[&counter] () {
counter++;
std::cout << "Body: counter = " << counter << "\n";
},
[] () {
std::cout << "Done\n";
}
);
// 构建循环结构
init.precede(cond);
cond.precede(body, done);
body.precede(cond);
executor.run(taskflow).wait();
return 0;
}
#include <taskflow/taskflow.hpp>
#include <iostream>
#include <vector>
int main() {
tf::Taskflow taskflow;
tf::Executor executor;
std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8};
// 并行处理每个元素
taskflow.for_each(data.begin(), data.end(), [] (int i) {
std::cout << "Processing " << i
<< " on thread " << std::this_thread::get_id() << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});
executor.run(taskflow).wait();
return 0;
}
#include <taskflow/taskflow.hpp>
#include <vector>
#include <iostream>
int main() {
tf::Taskflow taskflow;
tf::Executor executor;
std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int sum = 0;
// 并行求和
taskflow.reduce(data.begin(), data.end(), sum,
[] (int& s, int item) {
s += item;
},
std::plus<<int>() // 合并子结果
);
executor.run(taskflow).wait();
std::cout << "Sum: " << sum << "\n"; // 输出: Sum: 55
return 0;
}
#include <taskflow/taskflow.hpp>
#include <iostream>
int main() {
tf::Executor executor;
// 创建子任务流 1
tf::Taskflow taskflow1("Flow1");
auto [A1, B1, C1] = taskflow1.emplace(
[] () { std::cout << "Flow1: A\n"; },
[] () { std::cout << "Flow1: B\n"; },
[] () { std::cout << "Flow1: C\n"; }
);
A1.precede(B1).precede(C1);
// 创建子任务流 2
tf::Taskflow taskflow2("Flow2");
auto [A2, B2, C2] = taskflow2.emplace(
[] () { std::cout << "Flow2: A\n"; },
[] () { std::cout << "Flow2: B\n"; },
[] () { std::cout << "Flow2: C\n"; }
);
A2.precede(B2).precede(C2);
// 创建主任务流并组合
tf::Taskflow main_taskflow;
auto init = main_taskflow.emplace([] () {
std::cout << "Main: Init\n";
});
auto module1 = main_taskflow.composed_of(taskflow1);
auto module2 = main_taskflow.composed_of(taskflow2);
auto done = main_taskflow.emplace([] () {
std::cout << "Main: Done\n";
});
init.precede(module1);
module1.precede(module2);
module2.precede(done);
executor.run(main_taskflow).wait();
return 0;
}
#include <taskflow/taskflow.hpp>
#include <vector>
#include <iostream>
struct Image {
int id;
std::string data;
};
std::vector<Image> load_images() {
std::cout << "Loading images...\n";
return {{1, "data1"}, {2, "data2"}, {3, "data3"}};
}
void preprocess(Image& img) {
std::cout << "Preprocessing image " << img.id << "\n";
img.data = "pre_" + img.data;
}
void process(Image& img) {
std::cout << "Processing image " << img.id << "\n";
img.data = "proc_" + img.data;
}
void save(const Image& img) {
std::cout << "Saving image " << img.id << ": " << img.data << "\n";
}
int main() {
tf::Taskflow taskflow;
tf::Executor executor;
// 加载图像
auto load = taskflow.emplace([] () {
return load_images();
}).name("load");
// 预处理(并行)
auto preprocess = taskflow.emplace([&] () {
auto images = loadImages;
for (auto& img : images) {
preprocess(img);
}
}).name("preprocess");
// 处理(并行)
auto process_stage = taskflow.emplace([&] () {
auto images = loadImages;
for (auto& img : images) {
process(img);
}
}).name("process");
// 保存(并行)
auto save_stage = taskflow.emplace([&] () {
auto images = loadImages;
for (const auto& img : images) {
save(img);
}
}).name("save");
// 设置依赖
load.precede(preprocess);
preprocess.precede(process_stage);
process_stage.precede(save_stage);
executor.run(taskflow).wait();
return 0;
}
#include <taskflow/taskflow.hpp>
#include <vector>
#include <algorithm>
#include <random>
#include <iostream>
int main() {
tf::Taskflow taskflow;
tf::Executor executor;
// 生成大量数据
std::vector<int> data(1000000);
std::generate(data.begin(), data.end(), [] () { return rand() % 100; });
int min_val = INT_MAX;
int max_val = INT_MIN;
long long sum = 0;
// 并行查找最小值
taskflow.min_element(data.begin(), data.end(), min_val);
// 并行查找最大值
taskflow.max_element(data.begin(), data.end(), max_val);
// 并行求和
taskflow.reduce(data.begin(), data.end(), sum,
[] (long long& s, int v) { s += v; },
std::plus<<long long>()
);
// 并行排序
taskflow.sort(data.begin(), data.end(), std::less<int>());
executor.run(taskflow).wait();
std::cout << "Min: " << min_val << "\n";
std::cout << "Max: " << max_val << "\n";
std::cout << "Sum: " << sum << "\n";
std::cout << "First 10 sorted: ";
for (int i = 0; i < 10; i++) {
std::cout << data[i] << " ";
}
std::cout << "\n";
return 0;
}
| API | 说明 |
|---|---|
tf::Taskflow |
任务流对象 |
tf::Executor |
执行器对象,负责执行任务流 |
taskflow.emplace(funcs...) |
创建一个或多个任务 |
task.precede(tasks...) |
设置任务在指定任务之前执行 |
task.succeed(tasks...) |
设置任务在指定任务之后执行 |
executor.run(taskflow) |
执行任务流(异步) |
executor.run(taskflow).wait() |
执行任务流并等待完成 |
executor.async(func) |
异步执行单个任务 |
taskflow.for_each |
并行for循环 |
taskflow.reduce |
并行reduce操作 |