12#include "subscription.h"
13#include "system_context.h"
14#include "supervisor_config.h"
15#include "address_mapping.h"
16#include "error_code.h"
20#include <unordered_map>
21#include <unordered_set>
23#include <boost/lockfree/queue.hpp>
27#pragma warning(disable : 4251)
131 inline size_t do_process() noexcept {
return locality_leader->delivery->process(); }
146 virtual
void start() noexcept = 0;
148 void on_start() noexcept override;
152 virtual
void shutdown() noexcept = 0;
183 inline
void put(
message_ptr_t message) { locality_leader->queue.emplace_back(std::move(message)); }
192 handler_ptr_t wrapped_handler(std::forward<Handler>(handler));
193 lifetime->unsubscribe(wrapped_handler, addr);
199 assert(manager &&
"child_manager_plugin_t should be already initialized");
200 return builder_t([
this](
auto &actor) { manager->create_child(actor); },
this);
208 template <
typename T,
typename... Args>
210 Args &&...args)
noexcept {
238 using actor_base_t::subscribe;
244 template <
typename T>
auto &
access() noexcept;
247 template <typename T, typename... Args> auto access(Args... args) noexcept;
260 void on_request_trigger(
request_id_t timer_id,
bool cancelled) noexcept;
290 plugin::delivery_plugin_base_t *delivery =
nullptr;
293 plugin::child_manager_plugin_t *manager =
nullptr;
302 size_t inbound_queue_size;
305 pt::time_duration poll_duration;
308 const std::atomic_bool *shutdown_flag =
nullptr;
311 pt::time_duration shutdown_poll_frequency = pt::millisec{100};
314 using actors_set_t = std::unordered_set<const actor_base_t *>;
316 bool create_registry;
317 bool synchronize_start;
319 actors_set_t alive_actors;
333 void uplift_last_message() noexcept;
335 void on_shutdown_check_timer(
request_id_t,
bool cancelled) noexcept;
339 auto &map = locality_leader->request_map;
340 auto it = map.find(++locality_leader->last_req_id);
341 if (it != map.end()) {
344 return locality_leader->last_req_id;
353 using builder_t =
typename Supervisor::template config_builder_t<Supervisor>;
355 [
this](
auto &actor) {
361 this->supervisor = actor;
362 actor->do_initialize(
this);
369 supervisor->
put(make_message<M>(addr, std::forward<Args>(args)...));
372template <
typename Delegate,
typename Method>
374 Method method)
noexcept {
376 auto handler = std::make_unique<final_handler_t>(
this, request_id, &delegate, std::forward<Method>(method));
377 supervisor->do_start_timer(interval, *handler);
378 timers_map.emplace(request_id, std::move(handler));
381template <
typename Delegate,
typename Method,
typename>
383 auto request_id = supervisor->next_request_id();
384 start_timer(request_id, interval, delegate, std::forward<Method>(method));
391 auto handler_raw =
new final_handler_t(actor, std::move(handler));
396 auto wrapped_handler =
wrap_handler(*
this, std::move(h));
397 return supervisor->subscribe(wrapped_handler, address,
this, owner_tag_t::ANONYMOUS);
400template <
typename Handler>
402 auto wrapped_handler =
wrap_handler(*
this, std::move(h));
403 return supervisor->subscribe(wrapped_handler, addr,
this, owner_tag_t::ANONYMOUS);
408template <
typename Handler>
411 handler_ptr_t wrapped_handler(
new final_handler_t(*
this, std::move(h)));
412 auto info = actor->supervisor->subscribe(wrapped_handler, addr, actor, owner_tag_t::PLUGIN);
413 own_subscriptions.emplace_back(info);
418 return subscribe(std::forward<Handler>(h), actor->address);
421template <>
inline auto &plugin_base_t::access<plugin::starter_plugin_t>() noexcept {
return own_subscriptions; }
424 auto &address = actor->get_address();
425 return subscribe_actor(std::forward<Handler>(handler), address);
428template <
typename Handler>
430 auto wrapped_handler =
wrap_handler(*actor, std::move(handler));
431 auto info = actor->get_supervisor().subscribe(wrapped_handler, addr, actor, owner_tag_t::PLUGIN);
432 assert(std::count_if(tracked.begin(), tracked.end(), [&](
auto &it) { return *it == *info; }) == 0 &&
433 "already subscribed");
434 tracked.emplace_back(info);
435 access<starter_plugin_t>().emplace_back(info);
440 size_t enqueued_messages{0};
441 while (queue->size()) {
442 auto ptr = queue->front().detach();
444 auto &dest = message->address;
446 auto internal = dest->same_locality(*address);
448 auto local_recipients = subscription_map->get_recipients(*message);
449 if (local_recipients) {
452 if (message->next_route && message->use_count() == 1) {
454 message->
address = std::move(message->next_route);
455 sup->put(std::move(message));
458 dest->supervisor.enqueue(std::move(message));
462 return enqueued_messages;
466 size_t enqueued_messages{0};
467 while (queue->size()) {
468 auto ptr = queue->front().detach();
470 auto &dest = message->address;
472 auto internal = dest->same_locality(*address);
475 local_recipients = subscription_map->get_recipients(*message);
476 if (local_recipients) {
479 if (ptr->next_route && ptr->use_count() == 1) {
481 message->
address = std::move(message->next_route);
482 sup->put(std::move(message));
483 }
else if (!local_recipients) {
487 dest->supervisor.enqueue(std::move(message));
491 return enqueued_messages;
497 supervisor->unsubscribe_actor(address,
wrap_handler(*
this, std::move(h)));
500template <
typename Handler,
typename Enabled>
502 supervisor->unsubscribe_actor(addr,
wrap_handler(*
this, std::move(h)));
506template <
typename... Args>
509 : sup{sup_}, actor{actor_}, request_id{sup.next_request_id()}, destination{destination_}, reply_to{reply_to_},
510 do_install_handler{false} {
511 auto addr = sup.address_mapping.
get_mapped_address(actor_, response_message_t::message_type);
513 imaginary_address = addr;
519 do_install_handler =
true;
522 new request_message_t{destination, request_id, imaginary_address, reply_to_, std::forward<Args>(args)...});
526 if (do_install_handler) {
530 sup.request_map.emplace(request_id,
request_curry_t{fn, reply_to, req, &actor});
533 actor.active_requests.emplace(request_id);
538 auto handler = lambda<response_message_t>([supervisor = &sup](response_message_t &msg) {
539 auto request_id = msg.payload.request_id();
540 auto &request_map = supervisor->request_map;
541 auto it = request_map.find(request_id);
547 if (it != request_map.end()) {
548 auto &curry = it->second;
549 auto &orig_addr = curry.origin;
550 supervisor->template send<wrapped_res_t>(orig_addr, msg.payload);
551 supervisor->discard_request(request_id);
553 supervisor->uplift_last_message();
556 auto wrapped_handler =
wrap_handler(sup, std::move(handler));
557 auto info = sup.subscribe(wrapped_handler, imaginary_address, &actor, owner_tag_t::SUPERVISOR);
558 sup.address_mapping.set(actor, info);
566template <
typename Request,
typename... Args>
580template <
typename Request,
typename... Args>
584 return supervisor->
do_request<request_t>(*
this, dest_addr, reply_addr, std::forward<Args>(args)...);
588 using payload_t =
typename Request::payload_t::request_t;
590 return traits_t::make_error_response(message.payload.reply_to, message, ec);
594 using payload_t =
typename Request::payload_t::request_t;
596 using response_t =
typename req_traits_t::response::wrapped_t;
597 using request_ptr_t =
typename req_traits_t::request::message_ptr_t;
598 return make_message<response_t>(message.payload.reply_to, request_ptr_t{&message}, std::forward<Args>(args)...);
602 supervisor->
put(make_response<Request>(message, std::forward<Args>(args)...));
609template <
typename Actor>
611 : install_action{std::move(action_)}, supervisor{supervisor_}, system_context{*supervisor_->context},
612 config{supervisor_} {
620 system_context.on_error(actor_ptr.get(),
make_error(system_context.identity(), ec));
622 auto &cfg =
static_cast<typename builder_t::config_t &
>(config);
623 auto actor =
new Actor(cfg);
624 actor_ptr.reset(actor);
625 install_action(actor_ptr);
Basic namespace for all rotor functionalities.
Definition rotor.hpp:21
intrusive_ptr_t< message_base_t > message_ptr_t
intrusive pointer for message
Definition message.h:118
intrusive_ptr_t< supervisor_t > supervisor_ptr_t
intrusive pointer for supervisor
Definition forward.hpp:29
ROTOR_API std::error_code make_error_code(const error_code_t e)
makes std::error_code from rotor error code enumerations
Definition error_code.h:78
intrusive_ptr_t< address_t > address_ptr_t
intrusive pointer for address
Definition address.hpp:57
owner_tag_t
who owns the subscription point
Definition subscription_point.h:40
intrusive_ptr_t< extended_error_t > extended_error_ptr_t
intrusive pointer to extended error type
Definition extended_error.h:25
std::deque< message_ptr_t > messages_queue_t
structure to hold messages (intrusive pointers)
Definition message.h:121
handler_ptr_t wrap_handler(actor_base_t &actor, Handler &&handler)
wraps handler (pointer to member function) and actor address into intrusive pointer
Definition supervisor.h:389
intrusive_ptr_t< handler_base_t > handler_ptr_t
intrusive pointer for handler
Definition forward.hpp:26
std::size_t request_id_t
timer identifier type in the scope of the actor
Definition forward.hpp:34
ROTOR_API extended_error_ptr_t make_error(const std::string &context_, const std::error_code &ec_, const extended_error_ptr_t &next_={}, const message_ptr_t &request_={}) noexcept
constructs smart pointer to the extened error
intrusive_ptr_t< subscription_info_t > subscription_info_ptr_t
intrusive pointer for subscription_info_t
Definition subscription_point.h:127
boost::intrusive_ptr< T > intrusive_ptr_t
alias for intrusive pointer
Definition arc.hpp:27
std::function< actor_ptr_t(supervisor_t &, const address_ptr_t &)> factory_t
factory which allows to create actors lazily or on demand
Definition forward.hpp:45
supervisor_policy_t
how to behave on child actor initialization failures
Definition policy.h:12
universal primitive of concurrent computation
Definition actor_base.h:47
request_id_t start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept
spawns a new one-shot timer
Definition supervisor.h:382
subscription_info_ptr_t subscribe(Handler &&h, const address_ptr_t &addr) noexcept
subscribes actor's handler to process messages on the specified address
Definition supervisor.h:401
auto make_response(Request &message, Args &&...args)
makes response to the request, but does not send it.
Definition supervisor.h:593
request_builder_t< typename request_wrapper_t< R >::request_t > request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args)
returns request builder for destination address using the specified address for reply
void send(const address_ptr_t &addr, Args &&...args)
sends message to the destination address
Definition supervisor.h:368
request_builder_t< typename request_wrapper_t< R >::request_t > request(const address_ptr_t &dest_addr, Args &&...args)
returns request builder for destination address using the "main" actor address
void reply_with_error(Request &message, const extended_error_ptr_t &ec)
convenient method for constructing and sending error response to a request
Definition supervisor.h:605
supervisor_t * supervisor
non-owning pointer to actor's execution / infrastructure context
Definition actor_base.h:432
void reply_to(Request &message, Args &&...args)
convenient method for constructing and sending response to a request
Definition supervisor.h:601
address_ptr_t address
actor address
Definition actor_base.h:423
void unsubscribe(Handler &&h, address_ptr_t &addr) noexcept
unsubscribes actor's handler from process messages on the specified address
Definition supervisor.h:501
CRTP actor config builder.
Definition actor_config.h:92
actor_config_builder_t(install_action_t &&action_, supervisor_t *supervisor_)
ctor with install action and raw pointer to supervisor
Definition supervisor.h:610
typename Actor::template config_builder_t< Supervisor > builder_t
final builder class
Definition actor_config.h:94
actor_ptr_t finish() &&
constructs actor from the current config
Definition supervisor.h:616
std::function< void(actor_ptr_t &)> install_action_t
actor post-constructor callback type
Definition actor_config.h:107
NAT mechanism for rotor
Definition address_mapping.h:33
address_ptr_t get_mapped_address(actor_base_t &actor, const void *) noexcept
returns temporal destination address for the actor/message type
continue handler invocation (used for intercepting)
Definition handler.h:175
Base class for rotor message.
Definition message.h:52
create actor's addresses
Definition address_maker.h:24
supervisor's plugin for child-actors housekeeping
Definition child_manager.h:30
base implementation for messages delivery plugin
Definition delivery.h:62
templated message delivery plugin, to allow local message delivery be customized
Definition delivery.h:84
size_t process() noexcept override
main messages dispatcher interface
allows non-local actors to subscribe on the local addresses of a supervisor.
Definition foreigners_support.h:17
manages actors init and shutdown procedures
Definition init_shutdown.h:22
static void discard(message_ptr_t &message, const message_stringifier_t *stringifier) noexcept
dumps discarded message
static void delivery(message_ptr_t &message, const subscription_t::joint_handlers_t &local_recipients, const message_stringifier_t *stringifier) noexcept
delivers the message to the recipients, possibly dumping it to console
manages all actor subscriptions (i.e. from plugins or actor itself).
Definition lifetime.h:21
allows actor to have active (client) role in linking
Definition link_client.h:34
allows actor to have passive (server) role in linking
Definition link_server.h:32
static void delivery(message_ptr_t &message, const subscription_t::joint_handlers_t &local_recipients) noexcept
delivers an message for self of one of child-actors (non-supervisors)
detects and assigns locality leader to the supervisor
Definition locality.h:22
subscription_info_ptr_t subscribe(Handler &&handler, const address_ptr_t &address) noexcept
subscribes plugin to the custom plugin handler on the specified address
Definition supervisor.h:409
handy access to registry_t, for name registration and discovery
Definition registry.h:35
"lock" for external resources
Definition resources.h:39
allows custom (actor) subscriptions and it is responsible for starting actor when it receives message...
Definition starter.h:19
subscription_info_ptr_t subscribe_actor(Handler &&handler) noexcept
subscribes actor handler on main actor address
Definition supervisor.h:423
builder pattern implementation for the original request
Definition request.hpp:387
request_builder_t(supervisor_t &sup_, actor_base_t &actor_, const address_ptr_t &destination_, const address_ptr_t &reply_to_, Args &&...args)
constructs request message but still does not dispatch it
Definition supervisor.h:507
request_id_t send(const pt::time_duration &send) noexcept
actually dispatches requests and spawns timeout timer
Definition supervisor.h:525
the recorded context, which is needed to produce error response to the original request
Definition request.hpp:301
type helper to deduce request/response messages from original (user-supplied) request type
Definition request.hpp:318
T request_t
an alias for the original request type
Definition request.hpp:45
allows automatically restart actors
Definition spawner.h:30
pair internal and external handler_t
Definition subscription.h:40
Holds and classifies message handlers on behalf of supervisor.
Definition subscription.h:30
CRTP supervisor config builder.
Definition supervisor_config.h:72
base supervisor config, which holds shutdown timeout value
Definition supervisor_config.h:23
supervisor is responsible for managing actors (workers) lifetime
Definition supervisor.h:69
size_t do_process() noexcept
process queue of messages of locality leader
Definition supervisor.h:131
std::unordered_map< request_id_t, request_curry_t > request_map_t
timer to response with timeout procedure type
Definition supervisor.h:257
virtual address_ptr_t make_address() noexcept
creates new address_t linked with the supervisor
const address_ptr_t & get_registry_address() const noexcept
returns registry actor address (if it was defined or registry actor was created)
Definition supervisor.h:241
void subscribe(actor_base_t &actor, Handler &&handler)
templated version of subscribe_actor
Definition supervisor.h:186
void shutdown_finish() noexcept override
finalizes shutdown
void on_request_trigger(request_id_t timer_id, bool cancelled) noexcept
invoked as timer callback; creates response or just clean up for previously set request
std::tuple< plugin::address_maker_plugin_t, plugin::locality_plugin_t, plugin::delivery_plugin_t< plugin::default_local_delivery_t >, plugin::lifetime_plugin_t, plugin::init_shutdown_plugin_t, plugin::foreigners_support_plugin_t, plugin::child_manager_plugin_t, plugin::link_server_plugin_t, plugin::link_client_plugin_t, plugin::registry_plugin_t, plugin::resources_plugin_t, plugin::starter_plugin_t > plugins_list_t
the default list of plugins for an supervisor
Definition supervisor.h:90
void unsubscribe_actor(const address_ptr_t &addr, Handler &&handler) noexcept
convenient templated version of `unsubscribe_actor
Definition supervisor.h:191
boost::lockfree::queue< message_base_t * > inbound_queue_t
lock-free queue for inbound messages
Definition supervisor.h:250
auto & access() noexcept
generic non-public fields accessor
auto create_actor()
creates child-actor builder
Definition supervisor.h:197
supervisor_t(supervisor_config_t &config)
constructs new supervisor with optional parent supervisor
subscription_info_ptr_t subscribe(const handler_ptr_t &handler, const address_ptr_t &addr, const actor_base_t *owner_ptr, owner_tag_t owner_tag) noexcept
main subscription implementation
void put(message_ptr_t message)
puts a message into internal supervisor queue for further processing
Definition supervisor.h:183
spawner_t spawn(factory_t) noexcept
returns an actor spawner
virtual void do_initialize(system_context_t *ctx) noexcept override
early actor initialization (pre-initialization)
request_builder_t< T > do_request(actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to, Args &&...args) noexcept
convenient method for request building
Definition supervisor.h:209
The system context holds root supervisor_t (intrusive pointer) and may be loop-related details in der...
Definition system_context.h:32
virtual void on_error(actor_base_t *actor, const extended_error_ptr_t &ec) noexcept
fatal error handler
virtual std::string identity() noexcept
identifies the context.
auto create_supervisor()
returns builder for root supervisor
Definition supervisor.h:352
Base class for timer handler.
Definition timer_handler.hpp:17
templated implementation of timer handler
Definition timer_handler.hpp:42