生产环境使用boost::fiber

这篇具有很好参考价值的文章主要介绍了生产环境使用boost::fiber。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。
同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对一些组件进行简单封装。

fiber封装

boost::fiber支持设置pthread和fiber的比例是1:n还是m:n,同时也支持设置调度方式是随机调度还是抢占调度。
本文中选择使用抢占式调度,并且是m:n的比例,这种选择适用面更加广。
既然pthread和fiber比例是m:n,那么这个m一般等于逻辑核数量,也就是需要设置fiber调度的线程控制在大小为固定的线程池中。fiber中抢占式调度方式也要求固定的线程池数量,外部前程加入时,可能会影响抢占式调度,即不能在外部线程中调用fiber,不然这个线程就加入到了fiber调度的pthread中了。

这时,需要一个设置一个队列,外部线程往这个队列中添加任务;内部线程池从队列中取任务,同时触发fiber,在fiber中可以继续触发fiber。触发队列、内部队列、工作线程、外部线程的关系如下图所示:

生产环境使用boost::fiber,存储专栏,c++,xcode,开发语言

运行逻辑被装箱到一个任务中,然后被添加到任务队列,这一步利用模板和上转型实现,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class IFiberTask {
 public:
  IFiberTask() = default;
  virtual ~IFiberTask() = default;

  IFiberTask(const IFiberTask& rhs) = delete;
  IFiberTask& operator=(const IFiberTask& rhs) = delete;
  IFiberTask(IFiberTask&& other) = default;
  IFiberTask& operator=(IFiberTask&& other) = default;

  virtual void execute() = 0;
 public:
  inline static std::atomic_size_t fibers_size {0};
};

template <typename Func>
class FiberTask: public IFiberTask {
 public:
  explicit FiberTask(Func&& func) :func_{std::move(func)} { }

  ~FiberTask() override = default;
  FiberTask(const FiberTask& rhs) = delete;
  FiberTask& operator=(const FiberTask& rhs) = delete;
  FiberTask(FiberTask&& other)  noexcept = default;
  FiberTask& operator=(FiberTask&& other)  noexcept = default;

  void execute() override {
    fibers_size.fetch_add(1);
    func_();
    fibers_size.fetch_sub(1);
  }

 private:
  Func func_;
};

IFiberTask是任务基类,不可拷贝;FiberTask是模板类,成员变量func_存储算子。使用IFiberTask类指针指向特化后的FiberTask对象,这时就实现的装箱操作,调用execute时,实际调用了子类的execute,触发封装的func_对象。

外部队列基于boost::fibers::buffered_channel实现,这是一个支持并发的队列,队列的元素类型为std::tuple<boost::fibers::launch, std::unique_ptr>,其中tuple第一元素存储任务的触发形式,进入队列还是立即触发。

接着是任务装箱,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template<typename Func, typename... Args>
auto Submit(boost::fibers::launch launch_policy, Func&& func, Args&&... args) {
  // 捕获lambda极其参数
  auto capture = [func = std::forward<Func>(func),
      args = std::make_tuple(std::forward<Args>(args)...)]() mutable {
    return std::apply(std::move(func), std::move(args));
  };

  // 任务的返回值类型
  using task_result_t = std::invoke_result_t<decltype(capture)>;
  // 该任务packaged_task的
  using packaged_task_t = boost::fibers::packaged_task<task_result_t()>;
  // 创建任务对象
  packaged_task_t task {std::move(capture)};
  // 装箱到FiberTask中
  using task_t = fiber::FiberTask<packaged_task_t>;
  // 获取packaged_task的future
  auto result_future = task.get_future();
  // 添加到buffered_channel中
  auto status = work_queue_.push(
      std::make_tuple(launch_policy, std::make_unique<task_t>(std::move(task))));

  if (status != boost::fibers::channel_op_status::success) {
    return std::optional<std::decay_t<decltype(result_future)>> {};
  }

  return std::make_optional(std::move(result_future));
}

代码中,先捕获lambda表达式及其参数,获取返回值类型并添加到packaged_task中,然后装箱到FiberTask中,使用packaged_task获取future并返回,FiberTask对象添加到队列中,使用IFiberTask的指针指向这个对象,实现装箱操作。

接着是内部任务触发的逻辑,首先创建一个线程池,每个线程注册调度器,接着从队列中获取任务,触发fiber。
工作线程的执行函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 注册调度算法为抢占式调度
boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(threads_size_, true);
// 创建task类型
auto task_tuple = typename decltype(work_queue_)::value_type {};

// 从队列中获取任务
while(boost::fibers::channel_op_status::success == work_queue_.pop(task_tuple)) {
  // 解包
  auto& [launch_policy, task_to_run] = task_tuple;
  // 触发 fiber并detach
  boost::fibers::fiber(launch_policy, [task = std::move(task_to_run)]() {
    task->execute();
  }).detach();
}

抢占式调度在注册时需要指定线程池大小,这时不能在外部线程中调用fiber,因为调用fiber的时候会把该线程添加到fiber调度的线程中,也就调整了fiber的worker线程数量。

以上代码实现了fiber触发器、任务队列、工作线程池等逻辑。
理论上可以创建多个fiber调度组件对象,每个组件根据自己的需要设置资源情况。
但实际应用中,还是建议使用一个全局调度组件,因为当A调度器中的任务依赖B调度器的任务的同时,就会出现阻塞工作线程,影响实际性能。

下面封装一个全局调度器,提供递交任务的接口和结束调度的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DefaultPool {
 private:
  static auto* Pool() {
    const static size_t size = std::thread::hardware_concurrency();
    static fiber::FiberPool pool(size, size*8);
    return &pool;
  }

 public:
  template<typename Func, typename... Args>
  static auto SubmitJob(boost::fibers::launch launch_policy, Func &&func, Args &&... args) {
    return Pool()->Submit(launch_policy, std::forward<Func>(func), std::forward<Args>(args)...);
  }

  template<typename Func, typename... Args>
  static auto SubmitJob(Func &&func, Args &&... args) {
    return Pool()->Submit(std::forward<Func>(func), std::forward<Args>(args)...);
  }

  static void Close() {
    Pool()->CloseQueue();
  }

 private:
  DefaultPool() = default;
};

其他组件封装

上面对boost::fiber进行封装,得到一个能投入生产环境的调度器。
但是仅仅是这些是不够的,毕竟对于生产环境中的服务而言,外部服务、中间件的依赖是不能少的。
纤程是具有传染性的,对于外部组件提供的sdk,发送请求并进行同步等待会阻塞纤程对应的工作线程,影响整套机制。
为此,需要对现有的组件进行封装,对于同步接口,需要使用线程池配合fiber::promise;对于异步接口,可以改造成fiber::promise、future机制。下面介绍几种常用组件的fiber封装。

redis客户端封装

同步接口加线程池的方式将同步接口改造成异步接口的方案,存在较大的安全隐患。
线程池的容量不可控,当流量突然增加时,需要大量线程去等待,从而耗尽线程池资源,造成任务大量积压,服务崩溃。
而扩大线程池数量,又消耗了大量的资源。

综上,对于fiber化封装,还是建议采用异步接口。hiredis库支持异步接口,redis_plus_plus库对hiredis进行了c++封装,同时也提供了异步接口,本节将面向这个接口进行改造。

redis提供了挺多的接口,这里只对del、get、set三个接口做个示范:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
template<typename Type>
using Promise = boost::fibers::promise<Type>;

template<typename Type>
using Future = boost::fibers::future<Type>;

Future<long long > Del(const StringView &key) {
  auto promise = std::make_unique<Promise<long long >>();
  auto future = promise->get_future();
  // 在回调函数中对promise赋值
  redis_.del(key, [promise =promise.release()](sw::redis::Future<long long > &&fut) mutable {
    try {
      promise->set_value(fut.get());
    } catch (...) {
      promise->set_exception(std::current_exception());
    }
    delete promise;
  });
  return future;
}

Future<OptionalString> Get(const StringView &key) {
  auto promise = std::make_unique<Promise<OptionalString>>();
  auto future = promise->get_future();
  // 在回调函数中对promise赋值
  redis_.get(key, [promise = promise.release()](sw::redis::Future<OptionalString> &&fut) mutable {
    try {
      promise->set_value(fut.get());
    } catch (...) {
      promise->set_exception(std::current_exception());
    }
    delete promise;
  });
  return future;
}

Future<bool> Set(const StringView &key, const StringView &val) {
  auto promise = std::make_unique<Promise<bool>>();
  auto future = promise->get_future();
  // 在回调函数中对promise赋值
  redis_.set(key, val, [promise = promise.release()](sw::redis::Future<bool> &&fut) mutable {
    try {
      promise->set_value(fut.get());
    } catch (...) {
      promise->set_exception(std::current_exception());
    }
    delete promise;
  });
  return future;
}

注意,redis_plus_plus对每个回调函数通过模板进行判断,因此无法使用mutable+移动捕获promise,只能使用指针赋值的方式实现。redis_plus_plus在1.3.6以后的版本才有回调函数机制,之前的版本不支持。
上面原理是,创建fiber的promise和future,然后让redis的回调函数中捕获promise,并在promise中对数据进行赋值。而外部使用fiber的future进行等待,并不会阻塞工作线程。

grpc客户端封装

跟上面的redis客户端类似,这里也建议对grpc的异步客户端进行改造,支持fiber的promise、future机制。
grpc的异步客户端需要牵扯到grpc::CompletionQueue,里面实现了一套poll engine,需要绑定一个线程去进行epoll_wait操作。首先定义一个GrpcClient类,包含四个成员变量、两个成员函数,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class GrpcClient {
 public:
  explicit GrpcClient(const ClientOption& option);
  ~GrpcClient();

  // 对外提供的接口
  Future<meta::HelloResponse> Call(const meta::HelloRequest& request);

 private:
  // worker线程执行的逻辑
  void Work();

 private:
  std::unique_ptr<grpc::CompletionQueue> completion_queue_;
  std::thread worker_;
  std::shared_ptr<grpc::Channel> channel_;
  gpr_timespec timespec_{};
};

异步客户端分为三个部分逻辑,第一个是请求发送(Call函数),第二个是io线程批量处理,第三个是外部等待Future。
为了能够让io线程里给Promise进行赋值,需要Call函数中将Promise及其相关上下文传递到io线程中,这里定义一个上下文结构体:

1
2
3
4
5
6
struct CallData {
  grpc::ClientContext context;          // grpc上下文
  Promise<meta::HelloResponse> promise; // Promise对象
  grpc::Status status;                  // grpc调用状态
  meta::HelloResponse response;         // 相应包
};

Call函数中的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
// 创建上下文对象
auto data = new CallData;
// 设置超时时间
data->context.set_deadline(timespec_);
// 创建桩
meta::HelloService::Stub stub(channel_);
auto future = data->promise.get_future();
// 异步调用,添加到完成队列中
auto rpc = stub.Asynchello(&data->context, request, completion_queue_.get());
// 绑定response、status,并将上下文对象作为tag传下去
rpc->Finish(&data->response, &data->status, reinterpret_cast<void*>(data));
return future;

data对象在该函数中创建,在Work函数中释放,不存在内存泄漏问题。
grpc的异步稍微有点麻烦,发送之后,还要绑定数据。
接着是Work线程中的逻辑了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CallData* data = nullptr;
bool ok = false;
// 获取状态完毕的数据
while (completion_queue_->Next((void**)&data, &ok)) {
  // 判断队列是否已经结束
  if (!ok) {
    break;
  }
  // 如果grpc状态ok,则赋值
  if (data->status.ok()) {
    data->promise.set_value(std::move(data->response));
  } else {
    // 否则设置异常
    data->promise.set_exception(std::make_exception_ptr(
        std::runtime_error(data->status.error_message())));
  }
  // 删除数据
  delete data;
  data = nullptr;
}

调用完成队列的Next函数会阻塞,如果队列中存在状态达到最终状态的数据,则返回一条。从完成对于中取到的数据的顺序与入队顺序不同。

上面两个函数组合实现了Future获取和Promise赋值的操作,使得grpc客户端能在fiber中使用。文章来源地址https://www.toymoban.com/news/detail-734141.html

参考

  • fiberpool代码
  • 生产环境使用fiber
  • grpc异步客户端
  • hiredis
  • 生产环境使用boost::fiber

到了这里,关于生产环境使用boost::fiber的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • springboot--多环境配置快速切换开发、测试、生产环境

    springboot--多环境配置快速切换开发、测试、生产环境

    环境隔离能力,快速切换开发、测试、生产环境 步骤: 1、标识环境:指定那些组件、配置在那个生效 2、切换环境:这个环境对应的所有组件和配置就应该生效 区分出几个环境:dev(开发环境)、test(测试i环境)、prod(生产环境)、default(默认环境) 指定每个组件在那个环境

    2024年02月06日
    浏览(10)
  • 生产环境部署与协同开发 Git

    生产环境部署与协同开发 Git

    目录 一、前言——Git概述 1.1 Git是什么  1.2 为什么要使用Git  什么是版本控制系统 1.3 Git和SVN对比 SVN集中式 Git分布式 1.4 Git工作流程  四个工作区域 工作流程  1.5 Git下载安装 1.6 环境配置  设置用户信息 查看配置信息 二、git基础 2.1 本地初始化仓库 ​编辑 2.2 文件的两种

    2024年02月06日
    浏览(12)
  • 苹果开发初学者指南:Xcode 如何为运行的 App 添加环境变量(Environmental Variable)

    苹果开发初学者指南:Xcode 如何为运行的 App 添加环境变量(Environmental Variable)

    Xcode 15 在运行 SwiftUI 代码时突然报告如下警告: Error: this application, or a library it uses, has passed an invalid numeric value (NaN, or not-a-number) to CoreGraphics API and this value is being ignored. Please fix this problem. 不仅如此,Xcode 调试控制台中还提示我们需要添加特定的环境变量以进一步与该错误“

    2024年04月24日
    浏览(8)
  • C++_开发_Boost开源库_介绍_使用

    Boost库 是一个功能强大 , 构造精良 , 跨越平台 , 代码开源 , 完全免费的 C ++ 开源程序库。它使得C++编程更优雅、更有活力、更高产,C++11的标准有三分之二来自boost库。在boost1.57版本时,就一共包含了129个组件,分为25个大类,涵盖了文本处理,容器,迭代器,算法,图像处理

    2024年02月17日
    浏览(7)
  • 微信小程序获取环境变量,对生产、测试、开发环境做区分

    微信小程序获取环境变量,对生产、测试、开发环境做区分

    前不久偶然发现微信里有一个变量叫做  __wxConfig ,解决了这个问题,但是微信真的坑,你甚至在官方搜不到这个变量 = =,今天和大家分享一下 经过测试得到 envVersion 的具体键值有: develop(开发版)trial(体验版)release(正式版)   获取开发状态,判断获取请求url

    2024年02月12日
    浏览(7)
  • 做法一: vue-cli(webpack)配置开发环境、测试环境、生产环境

    做法一: vue-cli(webpack)配置开发环境、测试环境、生产环境

            由于开发环境、测试环境、生产环境三者是放在不同的服务器导致请求的接口URL地址不同,所有需要配置根据不同的环境使用不同的服务器地址。 请先简单阅读一下官方文档,了解一下概念 1、根目录创建 .env.development 、 .env.test 、 .env.production 文件(开发、测试、生

    2024年02月07日
    浏览(48)
  • JAVA开发与运维(web生产环境部署)

    JAVA开发与运维(web生产环境部署)

    web生产环境部署,往往是分布式,和开发环境或者测试环境我们一般使用单机不同。 一、部署内容 1、后端服务 2、后台管理系统vue 3、小程序 二、所需要服务器 5台前端服务器  8台后端服务 三、所需要的第三方组件 redis mysql clb OSS CDN WAF RocketMQ redis用来缓存应用的数据 mysq

    2024年02月04日
    浏览(12)
  • 获取小程序生产、开发、体验等环境、版本信息、appid等信息

    获取小程序生产、开发、体验等环境、版本信息、appid等信息

    可以把环境设置单独写的一个js里方便接口统一控制环境,完整代码如下 1、判断环境 env.js(该例子是在utils目录下创建的env.js,如果没有先创建个utils目录,也可以在其他目录创建env.js)   2、app.js里引用env以及使用 这样其他页面在写接口时就可以直接这一个地方切换环境就

    2024年02月17日
    浏览(9)
  • vue项目(vue-cli)配置环境变量和打包时区分开发、测试、生产环境

    在自定义配置Vue-cli 的过程中,想分别通过.env.development .env.test .env.production 来代表开发、测试、生产环境。 本来想使用上面三种配置来区分三个环境,但是发现使用test来打包后在测试环境会报错,报错信息: Uncaught ReferenceError: exports is not defined 本来以为真的是程序出现什么

    2023年04月08日
    浏览(114)
  • 虚拟现实开发在工厂生产环境模拟与培训中的应用

    虚拟现实开发在工厂生产环境模拟与培训中的应用

    hello老铁们...本人熟悉html5,vue对bootsrap,uniapp,layui,element,vite,antd,echarts,jq响应式尤其擅长,ui设计等技能,如果ui前端工作中有遇到烦恼可私信关注评论我们共同交流进步!谢谢 随着科技的飞速发展,虚拟现实(VR)技术已经成为当今世界的热门话题。虚拟现实技术为

    2024年03月26日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包