rotor
Event loop friendly C++ actor micro-framework
 
Introduction

rotor is an event-loop-friendly C++ actor micro-framework.

That means rotor is probably useless without an event loop, e.g. the boost-asio or wx-widgets event loop. As a framework, rotor imposes some usage patterns of the actor model; however, they are kept to a bare minimum, hence it is a micro-framework.

As the actor model and event loops are asynchronous by nature, the underlying intuition is to uplift the low-level events of an event loop into high-level messages between actors, so that the messages are abstracted from the event loop. Hence rotor should provide message-passing facilities between actors independently of the event loop(s) in use.
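To make the "uplifting" idea concrete, here is a minimal standard-library sketch. It is not rotor code: `message_t`, `actor_t`, `dispatcher_t` and `fake_loop_t` are hypothetical names invented for this illustration, and the "loop" is only a list of callbacks. It shows low-level callbacks being turned into queued messages that an actor later receives without knowing which loop produced them.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical high-level message (not a rotor type).
struct message_t {
    std::string text;
};

// Hypothetical actor: it only sees messages, never the loop behind them.
struct actor_t {
    std::vector<std::string> received;
    void on_message(const message_t &msg) { received.push_back(msg.text); }
};

// Hypothetical dispatcher: a FIFO queue of (destination, message) pairs.
struct dispatcher_t {
    std::deque<std::pair<actor_t *, message_t>> queue;

    void send(actor_t &dest, message_t msg) { queue.emplace_back(&dest, std::move(msg)); }

    // Deliver everything queued so far, in order.
    void process() {
        while (!queue.empty()) {
            auto item = std::move(queue.front());
            queue.pop_front();
            assert(item.first != nullptr);
            item.first->on_message(item.second);
        }
    }
};

// Stand-in for a real event loop: it stores low-level callbacks and runs them.
struct fake_loop_t {
    std::vector<std::function<void()>> callbacks;
    void post(std::function<void()> cb) { callbacks.push_back(std::move(cb)); }
    void run() {
        for (auto &cb : callbacks) cb();
        callbacks.clear();
    }
};

// Bridges two low-level loop events into two high-level messages.
inline std::vector<std::string> run_uplift_demo() {
    actor_t actor;
    dispatcher_t dispatcher;
    fake_loop_t loop;
    loop.post([&] { dispatcher.send(actor, message_t{"timer fired"}); });
    loop.post([&] { dispatcher.send(actor, message_t{"socket readable"}); });
    loop.run();           // low-level events happen here
    dispatcher.process(); // high-level messages are delivered here
    return actor.received;
}
```

Calling `run_uplift_demo()` yields the two message texts in the order the events fired. Swapping `fake_loop_t` for another callback source would leave `actor_t` untouched, which is the loop-agnostic property the paragraph above describes.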

rotor can be used in applications where different loop engines are used together and it is desirable to write loop-agnostic logic that still has a message-passing interface. The library can also be used as a lightweight loop abstraction with an actor-like flavor.

rotor should be considered an experimental project, i.e. no API stability is guaranteed.

rotor is licensed under the MIT license.

rotor is influenced by sobjectizer.

Hello World (loop-less)

This example is very artificial and will probably never be met in real life, as the supervisor does not use any event loop:

#include "rotor.hpp"
#include "dummy_supervisor.h"
#include <iostream>

struct hello_actor : public rotor::actor_base_t {
    // inherit the config-taking constructor used by create_actor<>()
    using rotor::actor_base_t::actor_base_t;

    void on_start() noexcept override {
        rotor::actor_base_t::on_start();
        std::cout << "hello world\n";
        supervisor->do_shutdown();
    }
};

int main() {
    rotor::system_context_t ctx{};
    auto timeout = boost::posix_time::milliseconds{500}; /* does not matter */
    auto sup = ctx.create_supervisor<dummy_supervisor_t>().timeout(timeout).finish();
    sup->create_actor<hello_actor>().timeout(timeout).finish();
    sup->do_process();
    return 0;
}

It prints "hello world" and exits. The example uses dummy_supervisor_t (sources omitted), which does almost nothing but gives you an idea of what a supervisor should do. The code with a "real" supervisor is shown below; for pure messaging, however, the kind of supervisor does not matter.

Hello World (boost::asio loop)

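#include "rotor.hpp"
#include "rotor/asio.hpp"
#include <iostream>

namespace asio = boost::asio;
namespace pt = boost::posix_time;

struct server_actor : public rotor::actor_base_t {
    using rotor::actor_base_t::actor_base_t;

    void on_start() noexcept override {
        rotor::actor_base_t::on_start();
        std::cout << "hello world\n";
        do_shutdown();
    }
};

int main() {
    asio::io_context io_context;
    // the asio system context holds a reference to io_context and the root supervisor
    rotor::asio::system_context_asio_t::ptr_t system_context{new rotor::asio::system_context_asio_t(io_context)};
    auto strand = std::make_shared<asio::io_context::strand>(io_context);
    auto timeout = boost::posix_time::milliseconds{500};
    auto sup = system_context->create_supervisor<rotor::asio::supervisor_asio_t>()
                   .strand(strand)
                   .timeout(timeout)
                   .finish();
    sup->create_actor<server_actor>()
        .timeout(timeout)
        .autoshutdown_supervisor()
        .finish();
    sup->start();
    io_context.run();
    return 0;
}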

The actor code is almost the same in both cases; only the system environment and the supervisors differ. The last example shows an important property of rotor: it is not intrusive to event loops, i.e. an event loop runs by itself, and rotor does not introduce an additional environment or thread. As a consequence, rotor actors can be seamlessly integrated with loops.

The supervisor->do_shutdown() call from the first example just sends a message to the supervisor asking it to perform the shutdown procedure. However, it might be better to use .autoshutdown_supervisor() during actor setup, as it shuts down the supervisor whenever the actor shuts down (i.e. not only from on_start()). After that, io_context.run() returns as soon as there are no pending events left; rotor does not make the loop run endlessly.

The timeout variable is used to spawn timers for actor initialization and shutdown requests. As the actor does not do any I/O, the operations are executed immediately, and the timeout values do not matter.

Ping-pong example

The following example demonstrates how to send messages between actors.

#include "rotor.hpp"
#include "dummy_supervisor.h"
#include <iostream>

struct ping_t {};
struct pong_t {};

struct pinger_t : public rotor::actor_base_t {
    using rotor::actor_base_t::actor_base_t;

    void set_ponger_addr(const rotor::address_ptr_t &addr) { ponger_addr = addr; }

    void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
        rotor::actor_base_t::configure(plugin);
        plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&pinger_t::on_pong); });
    }

    void on_start() noexcept override {
        rotor::actor_base_t::on_start();
        send<ping_t>(ponger_addr);
    }

    void on_pong(rotor::message_t<pong_t> &) noexcept {
        std::cout << "pong\n";
    }

    rotor::address_ptr_t ponger_addr;
};

struct ponger_t : public rotor::actor_base_t {
    using rotor::actor_base_t::actor_base_t;

    void set_pinger_addr(const rotor::address_ptr_t &addr) { pinger_addr = addr; }

    void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
        rotor::actor_base_t::configure(plugin);
        plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&ponger_t::on_ping); });
    }

    void on_ping(rotor::message_t<ping_t> &) noexcept {
        std::cout << "ping\n";
        send<pong_t>(pinger_addr);
    }

  private:
    rotor::address_ptr_t pinger_addr;
};

int main() {
    rotor::system_context_t ctx{};
    auto timeout = boost::posix_time::milliseconds{500}; /* does not matter */
    auto sup = ctx.create_supervisor<dummy_supervisor_t>().timeout(timeout).finish();
    auto pinger = sup->create_actor<pinger_t>()
                      .init_timeout(timeout)
                      .shutdown_timeout(timeout)
                      .autoshutdown_supervisor()
                      .finish();
    auto ponger = sup->create_actor<ponger_t>()
                      .timeout(timeout) // shortcut for init/shutdown
                      .finish();
    pinger->set_ponger_addr(ponger->get_address());
    ponger->set_pinger_addr(pinger->get_address());
    sup->do_process();
    return 0;
}

Without loss of generality, the system context/supervisor can be switched to any other one and the example will still work, because the actors in the sample, as well as the messages, are loop-agnostic.

In real-world scenarios actors might not be so pure, i.e. they will interact with timers and other system events. The interface for sending messages nevertheless stays the same (i.e. loop-agnostic), which makes it a convenient way to send messages to actors running on different loops.

pub-sub example

Any message can be sent to any address; any actor, including actors running on different supervisors and loops, can subscribe to any kind of message on any address.

#include "rotor.hpp"
#include "dummy_supervisor.h"
#include <iostream>

namespace r = rotor;

struct payload_t {};
using sample_message_t = r::message_t<payload_t>;

struct pub_t : public r::actor_base_t {
    using r::actor_base_t::actor_base_t;

    void set_pub_addr(const r::address_ptr_t &addr) { pub_addr = addr; }

    void on_start() noexcept override {
        r::actor_base_t::on_start();
        send<payload_t>(pub_addr);
    }

    r::address_ptr_t pub_addr;
};

struct sub_t : public r::actor_base_t {
    using r::actor_base_t::actor_base_t;

    void set_pub_addr(const r::address_ptr_t &addr) { pub_addr = addr; }

    void configure(r::plugin::plugin_base_t &plugin) noexcept override {
        r::actor_base_t::configure(plugin);
        // subscribe to payload messages on the shared address, not on the actor's own one
        plugin.with_casted<r::plugin::starter_plugin_t>(
            [&](auto &p) { p.subscribe_actor(&sub_t::on_payload, pub_addr); });
    }

    void on_payload(sample_message_t &) noexcept {
        std::cout << "received on " << static_cast<void *>(this) << "\n";
    }

    r::address_ptr_t pub_addr;
};

int main() {
    r::system_context_t ctx{};
    auto timeout = boost::posix_time::milliseconds{500}; /* does not matter */
    auto sup = ctx.create_supervisor<dummy_supervisor_t>().timeout(timeout).finish();
    auto pub_addr = sup->create_address(); // (1)
    auto pub = sup->create_actor<pub_t>().timeout(timeout).finish();
    auto sub1 = sup->create_actor<sub_t>().timeout(timeout).finish();
    auto sub2 = sup->create_actor<sub_t>().timeout(timeout).finish();
    pub->set_pub_addr(pub_addr);
    sub1->set_pub_addr(pub_addr);
    sub2->set_pub_addr(pub_addr);
    sup->do_process();
    sup->do_shutdown();
    sup->do_process();
    return 0;
}

Output sample:

received on 0x55d159ea36c0
received on 0x55d159ea3dc0

The address created in line (1) is arbitrary: the address of the pub actor itself or the address of the supervisor could be used instead, and even the address of a different supervisor would work.

Please note that since v0.09 there is a new way of subscribing an actor to messages: it is now done via starter_plugin_t, by overriding the configure method of the actor.
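The decoupling between addresses and actors can be pictured with a small standard-library sketch. It is only an illustration under simplifying assumptions, not how rotor implements subscriptions: `router_t`, `handler_t` and the string payload are hypothetical, and delivery is synchronous. An address here is just an opaque shared token, so anything holding it can publish or subscribe.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical address: an opaque, shareable token with no owner baked in.
struct address_t {};
using address_ptr_t = std::shared_ptr<address_t>;

// Hypothetical subscriber callback taking a string payload.
using handler_t = std::function<void(const std::string &)>;

// Hypothetical router: maps an address to every handler subscribed to it.
struct router_t {
    std::map<address_ptr_t, std::vector<handler_t>> subscriptions;

    address_ptr_t create_address() { return std::make_shared<address_t>(); }

    void subscribe(const address_ptr_t &addr, handler_t handler) {
        subscriptions[addr].push_back(std::move(handler));
    }

    // Delivers the payload to all subscribers; returns how many were reached.
    std::size_t publish(const address_ptr_t &addr, const std::string &payload) {
        auto it = subscriptions.find(addr);
        if (it == subscriptions.end()) return 0;
        for (auto &handler : it->second) handler(payload);
        return it->second.size();
    }
};

// Two subscribers on one address, plus a publish to an address nobody watches.
inline std::vector<std::string> run_pubsub_demo() {
    router_t router;
    std::vector<std::string> log;
    auto topic = router.create_address();
    auto unused = router.create_address();
    router.subscribe(topic, [&](const std::string &p) { log.push_back("sub1:" + p); });
    router.subscribe(topic, [&](const std::string &p) { log.push_back("sub2:" + p); });
    router.publish(topic, "hello");
    router.publish(unused, "nobody listens");
    return log;
}
```

In this sketch both subscribers receive the payload published to `topic`, while the publish to `unused` reaches nobody, mirroring the two "received on" lines in the output sample.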

request-response example (boost::asio)

The example below demonstrates the request-response pattern. The "server" actor takes the number from the request and replies to the "client" actor with its square root if the value is >= 0; otherwise it replies with an error.

Compared to regular messages, request-response is a slightly more complex pattern: the response must include the full request (message) so that it can be matched to that request, and the response might not arrive in time; in the latter case a response message carrying an error is delivered instead.

That is why the example below does not use pure user-defined payloads directly, but slightly modified ones that support the request/reply machinery.

#include "rotor.hpp"
#include "rotor/asio.hpp"
#include <iostream>
#include <cmath>
#include <system_error>

namespace asio = boost::asio;
namespace pt = boost::posix_time;

namespace payload {
struct sample_res_t {
    double value;
};
struct sample_req_t {
    using response_t = sample_res_t;
    double value;
};
} // namespace payload

namespace message {
// request/response messages wrapping the user-defined payloads
using request_t = rotor::request_traits_t<payload::sample_req_t>::request::message_t;
using response_t = rotor::request_traits_t<payload::sample_req_t>::response::message_t;
} // namespace message

struct server_actor : public rotor::actor_base_t {
    using rotor::actor_base_t::actor_base_t;

    void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
        rotor::actor_base_t::configure(plugin);
        plugin.with_casted<rotor::plugin::starter_plugin_t>(
            [](auto &p) { p.subscribe_actor(&server_actor::on_request); });
    }

    void on_request(message::request_t &req) noexcept {
        auto in = req.payload.request_payload.value;
        if (in >= 0) {
            auto value = std::sqrt(in);
            reply_to(req, value);
        } else {
            // IRL, it should be your custom error codes
            auto ec = std::make_error_code(std::errc::invalid_argument);
            reply_with_error(req, make_error(ec));
        }
    }
};

struct client_actor : public rotor::actor_base_t {
    using rotor::actor_base_t::actor_base_t;

    rotor::address_ptr_t server_addr;

    void set_server(const rotor::address_ptr_t &addr) { server_addr = addr; }

    void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
        rotor::actor_base_t::configure(plugin);
        plugin.with_casted<rotor::plugin::starter_plugin_t>(
            [](auto &p) { p.subscribe_actor(&client_actor::on_response); });
    }

    void on_response(message::response_t &res) noexcept {
        if (!res.payload.ee) { // check for possible error
            auto &in = res.payload.req->payload.request_payload.value;
            auto &out = res.payload.res.value;
            std::cout << " in = " << in << ", out = " << out << "\n";
        }
    }

    void on_start() noexcept override {
        rotor::actor_base_t::on_start();
        auto timeout = rotor::pt::milliseconds{1};
        request<payload::sample_req_t>(server_addr, 25.0).send(timeout);
    }
};

int main() {
    asio::io_context io_context;
    rotor::asio::system_context_asio_t::ptr_t system_context{new rotor::asio::system_context_asio_t(io_context)};
    auto strand = std::make_shared<asio::io_context::strand>(io_context);
    auto timeout = boost::posix_time::milliseconds{500};
    auto sup =
        system_context->create_supervisor<rotor::asio::supervisor_asio_t>().strand(strand).timeout(timeout).finish();
    auto server = sup->create_actor<server_actor>().timeout(timeout).finish();
    auto client = sup->create_actor<client_actor>().timeout(timeout).autoshutdown_supervisor().finish();
    client->set_server(server->get_address());
    sup->do_process();
    return 0;
}

Output sample:

in = 25, out = 5
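The bookkeeping behind this pattern (a response carries its request, and a missing response turns into an error) can be sketched with the standard library alone. This is a simplified illustration, not rotor's implementation: `pending_requests_t`, the numeric request id and the use of `std::errc::timed_out` are assumptions made for the sketch, and the timer is simulated by calling `on_timeout` by hand.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <map>
#include <system_error>

// Hypothetical request: an id for matching plus the user payload.
struct request_t {
    std::uint64_t id;
    double value;
};

// Hypothetical response: it carries the original request back to the client.
struct response_t {
    request_t request;
    std::error_code ec; // set on failure or timeout
    double value;
};

// Hypothetical client-side bookkeeping of requests still awaiting a reply.
struct pending_requests_t {
    std::uint64_t next_id = 1;
    std::map<std::uint64_t, request_t> pending;

    request_t make_request(double value) {
        request_t req{next_id++, value};
        pending.emplace(req.id, req);
        return req;
    }

    // A reply is accepted only while its request is still pending.
    bool on_reply(const response_t &res) {
        return pending.erase(res.request.id) == 1;
    }

    // Simulated timer expiry: synthesizes an error response for the request.
    // Returns false if the reply has already arrived.
    bool on_timeout(std::uint64_t id, response_t &out) {
        auto it = pending.find(id);
        if (it == pending.end()) return false;
        out = response_t{it->second, std::make_error_code(std::errc::timed_out), 0.0};
        pending.erase(it);
        return true;
    }
};

// Hypothetical server logic mirroring the square-root example above.
inline response_t serve(const request_t &req) {
    if (req.value >= 0) return response_t{req, std::error_code{}, std::sqrt(req.value)};
    return response_t{req, std::make_error_code(std::errc::invalid_argument), 0.0};
}
```

A request answered in time is accepted exactly once; if `on_timeout` runs first, the client gets an error response and a late reply for the same id is dropped. Whichever of the two events happens first removes the request from `pending`, which is what keeps the client from seeing two responses.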

registry example (boost::asio)

There is a minor drawback in the last example: the client and server actors are bound manually, the server address is injected directly into the client actor, and there is no guarantee that the server actor will "outlive" the client actor.

As usual in computer science, any problem can be solved with an additional layer of indirection. We'll create a special registry actor. When the server actor is ready, it registers itself there under the symbolic name "server"; the client actor looks up the server address in the registry by the same symbolic name and "links" to the server actor, i.e. makes sure that the server actor has a longer lifetime than the client actor.

#include "rotor.hpp"
#include "rotor/asio.hpp"
#include <iostream>
#include <cmath>
#include <system_error>
#include <random>

namespace asio = boost::asio;
namespace pt = boost::posix_time;

namespace payload {
struct sample_res_t {
    double value;
};
struct sample_req_t {
    using response_t = sample_res_t;
    double value;
};
} // namespace payload

namespace message {
// request/response messages wrapping the user-defined payloads
using request_t = rotor::request_traits_t<payload::sample_req_t>::request::message_t;
using response_t = rotor::request_traits_t<payload::sample_req_t>::response::message_t;
} // namespace message

struct server_actor : public rotor::actor_base_t {
    using rotor::actor_base_t::actor_base_t;

    void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
        rotor::actor_base_t::configure(plugin);
        plugin.with_casted<rotor::plugin::starter_plugin_t>(
            [](auto &p) { p.subscribe_actor(&server_actor::on_request); });
        // publish the server address under the symbolic name "server"
        plugin.with_casted<rotor::plugin::registry_plugin_t>(
            [&](auto &p) { p.register_name("server", get_address()); });
    }

    void on_request(message::request_t &req) noexcept {
        auto in = req.payload.request_payload.value;
        if (in >= 0) {
            auto value = std::sqrt(in);
            reply_to(req, value);
        } else {
            // IRL, it should be your custom error codes
            auto ec = std::make_error_code(std::errc::invalid_argument);
            reply_with_error(req, make_error(ec));
        }
    }
};

struct client_actor : public rotor::actor_base_t {
    using rotor::actor_base_t::actor_base_t;

    rotor::address_ptr_t server_addr;

    void set_server(const rotor::address_ptr_t &addr) { server_addr = addr; }

    void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
        rotor::actor_base_t::configure(plugin);
        plugin.with_casted<rotor::plugin::starter_plugin_t>(
            [](auto &p) { p.subscribe_actor(&client_actor::on_response); });
        // look up "server" in the registry and link to it
        plugin.with_casted<rotor::plugin::registry_plugin_t>(
            [&](auto &p) { p.discover_name("server", server_addr, true).link(); });
    }

    void on_response(message::response_t &res) noexcept {
        if (!res.payload.ee) { // check for possible error
            auto &in = res.payload.req->payload.request_payload.value;
            auto &out = res.payload.res.value;
            std::cout << " in = " << in << ", out = " << out << "\n";
        }
        do_shutdown(); // optional
    }

    void on_start() noexcept override {
        rotor::actor_base_t::on_start();
        auto timeout = rotor::pt::milliseconds{1};
        request<payload::sample_req_t>(server_addr, 25.0).send(timeout);
    }
};

int main() {
    asio::io_context io_context;
    rotor::asio::system_context_asio_t::ptr_t system_context{new rotor::asio::system_context_asio_t(io_context)};
    auto strand = std::make_shared<asio::io_context::strand>(io_context);
    auto timeout = boost::posix_time::milliseconds{500};
    auto sup = system_context->create_supervisor<rotor::asio::supervisor_asio_t>()
                   .strand(strand)
                   .create_registry()
                   .timeout(timeout)
                   .finish();
    auto server = sup->create_actor<server_actor>().timeout(timeout).finish();
    auto client = sup->create_actor<client_actor>().timeout(timeout).autoshutdown_supervisor().finish();
    sup->do_process();
    return 0;
}
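The "extra layer of indirection" can be illustrated without rotor at all. The sketch below is a hypothetical, synchronous model built only on the standard library: `endpoint_t`, `registry_t` and `client_t` are invented names, and "linking" is reduced to a counter that the server would consult before shutting down. It is meant to show the idea of name-based discovery plus lifetime coupling, not rotor's actual registry protocol.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Hypothetical server endpoint: it may shut down only when no client is linked.
struct endpoint_t {
    int linked_clients = 0;
    bool can_shutdown() const { return linked_clients == 0; }
};
using endpoint_ptr_t = std::shared_ptr<endpoint_t>;

// Hypothetical registry: symbolic name -> endpoint.
struct registry_t {
    std::map<std::string, endpoint_ptr_t> names;

    // Returns false if the name is already taken.
    bool register_name(const std::string &name, const endpoint_ptr_t &endpoint) {
        return names.emplace(name, endpoint).second;
    }

    // Returns a null pointer when nothing is registered under the name.
    endpoint_ptr_t discover_name(const std::string &name) const {
        auto it = names.find(name);
        return it == names.end() ? endpoint_ptr_t{} : it->second;
    }

    void unregister_name(const std::string &name) { names.erase(name); }
};

// Hypothetical client: it knows only the symbolic name, never the server object.
struct client_t {
    endpoint_ptr_t server;

    bool discover_and_link(const registry_t &registry, const std::string &name) {
        server = registry.discover_name(name);
        if (!server) return false;
        ++server->linked_clients; // the server must now outlive this client
        return true;
    }

    void unlink() {
        if (server) {
            --server->linked_clients;
            server.reset();
        }
    }
};
```

In this model a client that has linked keeps `can_shutdown()` false on the server until it calls `unlink()`, which is one simple way to express "the server has a longer lifetime than the client".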

there is more...

There are more rotor capabilities, such as request cancellation and a generalized timer-spawning interface. Read on in the Advanced Examples section.