読者です 読者をやめる 読者になる 読者になる

Boost.Context 非同期処理を逐次的にする

C++

継続が使えるようになることでうれしいのは、やはり非同期処理のコールバック地獄の解消です。
非同期処理は、処理が終わった時に呼ばれるコールバック関数を登録しますが、連続的な非同期処理をしようと思ったら非常に多くの関数を行ったり来たりして書かないといけません。ラムダ式を使うとネストの深い関数をどんどん定義していくことになるので、状況はさらに悪化します。


継続を使うとこれを解消することができます。
以下のプログラムは、Boost.ContextのサンプルにあるcontinuationクラスをBoost.Asioと組み合わせて非同期処理をその場で書き下しています。

#include <iostream>
#include <boost/asio.hpp>
#include "continuation.hpp"

namespace asio = boost::asio;

class Client {
    asio::io_service& io_service_;
    asio::ip::tcp::socket socket_;

    continuation cont_;
    boost::system::error_code ec_;

public:
    Client(asio::io_service& io_service)
        : io_service_(io_service),
          socket_(io_service)
    {
        cont_ = continuation(boost::bind(&Client::do_, this));
    }

    void start()
    {
        ec_ = boost::system::error_code();
        cont_.resume();
    }

private:
    void do_()
    {
        namespace ip = asio::ip;
        socket_.async_connect(
            ip::tcp::endpoint(ip::address::from_string("127.0.0.1"), 31400),
            boost::bind(&Client::completion_handler, this, asio::placeholders::error));
        cont_.suspend();

        while (!ec_) {
            const std::string req = "ping\n";
            asio::async_write(socket_, asio::buffer(req),
                boost::bind(&Client::completion_handler, this, asio::placeholders::error));
            cont_.suspend();

            if (ec_) break;

            asio::streambuf rep;
            asio::async_read_until(socket_, rep, '\n',
                boost::bind(&Client::completion_handler, this, asio::placeholders::error));
            cont_.suspend();

            if (ec_) break;

            std::cout << asio::buffer_cast<const char*>(rep.data());
        }

        std::cout << ec_.message() << std::endl;
    }

    void completion_handler(const boost::system::error_code& ec)
    {
        ec_ = ec;
        cont_.resume();
    }
};

int main()
{
    asio::io_service io_service;
    Client client(io_service);

    client.start();

    io_service.run();
}

流れとしては以下のようになります:

  1. do_関数内でasync_xxx関数で非同期処理を実行したらsuspendを実行してその場で待機
  2. 非同期処理が完了したらcompletion_handler関数が呼ばれ、suspend状態を解除して処理を復帰させる
  3. do_関数のsuspendしたところに戻って処理を続行
  4. 1.に戻る

非同期処理の完了ハンドラは、エラー状態の更新とスタックの復帰のみを行います。
非同期処理を実行する側は、非同期処理を開始したら完了するまでブロッキングするのではなく、処理を中断して一旦処理を抜けます。これで処理が固まることはありません。完了ハンドラ内でresumeが呼ばれたら中断したところから再び処理が開始されます。これで、ブロッキングする同期処理を行うのと大差ないコードで非同期処理を行うことができるようになりました。


それぞれの処理のためのハンドラを用意したほうが管理しやすい場面もあると思うので、継続を使用したスタイルとは状況に応じて使い分けていくといいんじゃないかと思います。


ちなみに次期C++にも継続(コルーチン)の仕組みが提案されています。
N3360 Networking Library Status Report

1. C++11 Lambdas
...

2. Promises and Futures
...

3. "Stackless" Coroutines
Duffによるデバイスライクなアプローチ、それといくつかの適切に定義されたマクロを採用することで、非同期処理の複雑なチェーンが簡潔に書かれたことがある:

while (!ec)
{
  yield my_socket->async_read_some(my_buffer, *this);
  if (ec) break;
  yield async_write(*my_socket, buffer(my_buffer, length), *this);
}

4. "Stackful" Coroutines
同様に、最近採択されたBoost.Contextのようなライブラリを使用することで、非同期操作を開始した時点でのコールスタックの維持と、それを元に戻すという方法もある:

while (!ec_)
{
  my_socket->async_read_some(my_buffer, *this);
  my_continuation->suspend();
  if (ec_) break;
  async_write(*my_socket, buffer(my_buffer, length_), *this);
  my_continuation->suspend();
}