12#include "subscription.h"
13#include "system_context.h"
14#include "supervisor_config.h"
15#include "address_mapping.h"
16#include "error_code.h"
19#include <unordered_map>
20#include <unordered_set>
22#include <boost/lockfree/queue.hpp>
26#pragma warning(disable : 4251)
185 template <
typename Handler>
void subscribe(actor_base_t &actor, Handler &&handler) {
191 handler_ptr_t wrapped_handler(std::forward<Handler>(handler));
192 lifetime->unsubscribe(wrapped_handler, addr);
198 assert(
manager &&
"child_manager_plugin_t should be already initialized");
199 return builder_t([
this](
auto &actor) {
manager->create_child(actor); },
this);
207 template <
typename T,
typename... Args>
209 Args &&...args)
noexcept {
210 return request_builder_t<T>(*
this, actor, dest_addr,
reply_to, std::forward<Args>(args)...);
227 const actor_base_t *owner_ptr,
owner_tag_t owner_tag)
noexcept;
243 template <
typename T>
auto &
access() noexcept;
246 template <typename T, typename... Args> auto
access(Args... args) noexcept;
289 plugin::delivery_plugin_base_t *
delivery =
nullptr;
292 plugin::child_manager_plugin_t *
manager =
nullptr;
313 using actors_set_t = std::unordered_set<const actor_base_t *>;
315 bool create_registry;
316 bool synchronize_start;
318 actors_set_t alive_actors;
332 void uplift_last_message() noexcept;
334 void on_shutdown_check_timer(
request_id_t,
bool cancelled) noexcept;
338 auto &map = locality_leader->request_map;
339 auto it = map.find(++locality_leader->last_req_id);
340 if (it != map.end()) {
343 return locality_leader->last_req_id;
352 using builder_t =
typename Supervisor::template config_builder_t<Supervisor>;
354 [
this](
auto &actor) {
360 this->supervisor = actor;
361 actor->do_initialize(
this);
371template <
typename M,
typename... Args>
378template <
typename Delegate,
typename Method>
380 Method method)
noexcept {
382 auto handler = std::make_unique<final_handler_t>(
this, request_id, &delegate, std::forward<Method>(method));
383 supervisor->do_start_timer(interval, *handler);
384 timers_map.emplace(request_id, std::move(handler));
387template <
typename Delegate,
typename Method,
typename>
389 auto request_id =
supervisor->next_request_id();
390 start_timer(request_id, interval, delegate, std::forward<Method>(method));
397 auto handler_raw =
new final_handler_t(actor, std::move(handler));
402 auto wrapped_handler =
wrap_handler(*
this, std::move(h));
403 return supervisor->subscribe(wrapped_handler,
address,
this, owner_tag_t::ANONYMOUS);
406template <
typename Handler>
408 auto wrapped_handler =
wrap_handler(*
this, std::move(h));
409 return supervisor->subscribe(wrapped_handler, addr,
this, owner_tag_t::ANONYMOUS);
414template <
typename Handler>
417 handler_ptr_t wrapped_handler(
new final_handler_t(*
this, std::move(h)));
418 auto info =
actor->supervisor->subscribe(wrapped_handler, addr,
actor, owner_tag_t::PLUGIN);
419 own_subscriptions.emplace_back(info);
430 auto &address =
actor->get_address();
434template <
typename Handler>
437 auto info =
actor->get_supervisor().subscribe(wrapped_handler, addr,
actor, owner_tag_t::PLUGIN);
438 assert(std::count_if(tracked.begin(), tracked.end(), [&](
auto &it) { return *it == *info; }) == 0 &&
439 "already subscribed");
440 tracked.emplace_back(info);
446 size_t enqueued_messages{0};
447 while (
queue->size()) {
448 auto ptr =
queue->front().detach();
452 auto internal = dest->same_locality(*
address);
455 if (local_recipients) {
464 dest->supervisor.enqueue(std::move(
message));
468 return enqueued_messages;
472 size_t enqueued_messages{0};
473 while (
queue->size()) {
474 auto ptr =
queue->front().detach();
478 auto internal = dest->same_locality(*
address);
482 if (local_recipients) {
485 if (ptr->next_route && ptr->use_count() == 1) {
489 }
else if (!local_recipients) {
493 dest->supervisor.enqueue(std::move(
message));
497 return enqueued_messages;
506template <
typename Handler,
typename Enabled>
512template <
typename... Args>
515 : sup{sup_}, actor{actor_}, request_id{sup.next_request_id()}, destination{destination_}, reply_to{reply_to_},
516 do_install_handler{false} {
517 auto addr = sup.address_mapping.get_mapped_address(actor_, response_message_t::message_type);
519 imaginary_address = addr;
524 imaginary_address = sup.make_address();
525 do_install_handler =
true;
528 new request_message_t{destination, request_id, imaginary_address, reply_to_, std::forward<Args>(args)...});
532 if (do_install_handler) {
536 sup.request_map.emplace(request_id,
request_curry_t{fn, reply_to, req, &actor});
539 actor.active_requests.emplace(request_id);
543template <
typename T>
void request_builder_t<T>::install_handler() noexcept {
545 auto request_id = msg.payload.request_id();
546 auto &request_map = supervisor->request_map;
547 auto it = request_map.find(request_id);
553 if (it != request_map.end()) {
554 auto &curry = it->second;
555 auto &orig_addr = curry.origin;
556 supervisor->template send<wrapped_res_t>(orig_addr, msg.payload);
557 supervisor->discard_request(request_id);
559 supervisor->uplift_last_message();
562 auto wrapped_handler =
wrap_handler(sup, std::move(handler));
563 auto info = sup.subscribe(wrapped_handler, imaginary_address, &actor, owner_tag_t::SUPERVISOR);
564 sup.address_mapping.set(actor, info);
572template <
typename Request,
typename... Args>
577 return supervisor->do_request<request_t>(*
this, dest_addr,
address, std::forward<Args>(args)...);
586template <
typename Request,
typename... Args>
590 return supervisor->do_request<request_t>(*
this, dest_addr, reply_addr, std::forward<Args>(args)...);
594 using payload_t =
typename Request::payload_t::request_t;
596 return traits_t::make_error_response(
message.payload.reply_to,
message, ec);
600 using payload_t =
typename Request::payload_t::request_t;
602 using response_t =
typename req_traits_t::response::wrapped_t;
603 using request_ptr_t =
typename req_traits_t::request::message_ptr_t;
615template <
typename Actor>
628 auto &cfg =
static_cast<typename builder_t::config_t &
>(
config);
629 auto actor =
new Actor(cfg);
630 actor_ptr.reset(actor);
namespace for rotor core messages (which just transform payloads)
Definition messages.hpp:317
Basic namespace for all rotor functionalities.
Definition rotor.hpp:21
intrusive_ptr_t< extended_error_t > extended_error_ptr_t
intrusive pointer to extended error type
Definition extended_error.h:25
ROTOR_API std::error_code make_error_code(const error_code_t e)
makes std::error_code from rotor error code enumerations
Definition error_code.h:78
intrusive_ptr_t< message_base_t > message_ptr_t
intrusive pointer for message
Definition message.h:118
owner_tag_t
who owns the subscription point
Definition subscription_point.h:39
std::function< actor_ptr_t(supervisor_t &, const address_ptr_t &)> factory_t
factory which allows to create actors lazily or on demand
Definition forward.hpp:45
intrusive_ptr_t< handler_base_t > handler_ptr_t
intrusive pointer for handler
Definition forward.hpp:26
constexpr lambda_holder_t< M, F > lambda(F &&fn)
helper function for lambda holder constructing
Definition handler.h:48
intrusive_ptr_t< address_t > address_ptr_t
intrusive pointer for address
Definition address.hpp:57
std::deque< message_ptr_t > messages_queue_t
structure to hold messages (intrusive pointers)
Definition message.h:121
handler_ptr_t wrap_handler(actor_base_t &actor, Handler &&handler)
wraps handler (pointer to member function) and actor address into intrusive pointer
Definition supervisor.h:395
ROTOR_API extended_error_ptr_t make_error(const std::string &context_, const std::error_code &ec_, const extended_error_ptr_t &next_={}, const message_ptr_t &request_={}) noexcept
constructs smart pointer to the extened error
auto make_routed_message(const address_ptr_t &addr, const address_ptr_t &route_addr, Args &&...args) -> message_ptr_t
constructs message by constructing it's payload; after delivery to destination address (to all subscr...
Definition message.h:146
auto make_message(const address_ptr_t &addr, Args &&...args) -> message_ptr_t
constructs message by constructing it's payload; intrusive pointer for the message is returned
Definition message.h:124
boost::intrusive_ptr< T > intrusive_ptr_t
alias for intrusive pointer
Definition arc.hpp:27
intrusive_ptr_t< supervisor_t > supervisor_ptr_t
intrusive pointer for supervisor
Definition forward.hpp:29
std::size_t request_id_t
timer identifier type in the scope of the actor
Definition forward.hpp:34
intrusive_ptr_t< subscription_info_t > subscription_info_ptr_t
intrusive pointer for subscription_info_t
Definition subscription_point.h:121
supervisor_policy_t
how to behave on child actor initialization failures
Definition policy.h:12
universal primitive of concurrent computation
Definition actor_base.h:47
request_id_t start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept
spawns a new one-shot timer
Definition supervisor.h:388
timers_map_t timers_map
timer-id to timer-handler map
Definition actor_base.h:497
plugin::lifetime_plugin_t * lifetime
non-owning pointer to lifetime plugin
Definition actor_base.h:476
subscription_info_ptr_t subscribe(Handler &&h, const address_ptr_t &addr) noexcept
subscribes actor's handler to process messages on the specified address
Definition supervisor.h:407
auto make_response(Request &message, Args &&...args)
makes response to the request, but does not send it.
Definition supervisor.h:599
void route(const address_ptr_t &addr, const address_ptr_t &next_addr, Args &&...args)
routes message to the destination address, then to the next address
Definition supervisor.h:372
request_builder_t< typename request_wrapper_t< R >::request_t > request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args)
returns request builder for destination address using the specified address for reply
void send(const address_ptr_t &addr, Args &&...args)
sends message to the destination address
Definition supervisor.h:367
request_builder_t< typename request_wrapper_t< R >::request_t > request(const address_ptr_t &dest_addr, Args &&...args)
returns request builder for destination address using the "main" actor address
void reply_with_error(Request &message, const extended_error_ptr_t &ec)
convenient method for constructing and sending error response to a request
Definition supervisor.h:611
supervisor_t * supervisor
non-owning pointer to actor's execution / infrastructure context
Definition actor_base.h:455
void reply_to(Request &message, Args &&...args)
convenient method for constructing and sending response to a request
Definition supervisor.h:607
address_ptr_t address
actor address
Definition actor_base.h:446
void unsubscribe(Handler &&h, address_ptr_t &addr) noexcept
unsubscribes actor's handler from process messages on the specified address
Definition supervisor.h:507
CRTP actor config builder.
Definition actor_config.h:92
system_context_t & system_context
reference to system_context_t
Definition actor_config.h:125
virtual bool validate() noexcept
checks whether config is valid, i.e. all necessary fields are set
Definition actor_config.h:197
actor_config_builder_t(install_action_t &&action_, supervisor_t *supervisor_)
ctor with install action and raw pointer to supervisor
Definition supervisor.h:616
config_t config
the currently build config
Definition actor_config.h:128
actor_ptr_t finish() &&
constructs actor from the current config
Definition supervisor.h:622
supervisor_t * supervisor
raw pointer to supervisor_t (is null for top-level supervisors)
Definition actor_config.h:122
std::function< void(actor_ptr_t &)> install_action_t
actor post-constructor callback type
Definition actor_config.h:107
install_action_t install_action
post-construction callback
Definition actor_config.h:119
NAT mechanism for rotor
Definition address_mapping.h:33
continue handler invocation (used for intercepting)
Definition handler.h:175
Base class for rotor message.
Definition message.h:52
create actor's addresses
Definition address_maker.h:24
supervisor's plugin for child-actors housekeeping
Definition child_manager.h:30
base implementation for messages delivery plugin
Definition delivery.h:62
const message_stringifier_t * stringifier
non-owning raw pointer to system's stringifier
Definition delivery.h:80
messages_queue_t * queue
non-owning raw pointer of supervisor's messages queue
Definition delivery.h:71
subscription_t * subscription_map
non-owning raw pointer to supervisor's subscriptions map
Definition delivery.h:77
address_t * address
non-owning raw pointer to supervisor's main address
Definition delivery.h:74
templated message delivery plugin, to allow local message delivery be customized
Definition delivery.h:84
size_t process() noexcept override
main messages dispatcher interface
allows non-local actors to subscribe on the local addresses of a supervisor.
Definition foreigners_support.h:17
manages actors init and shutdown procedures
Definition init_shutdown.h:22
static void discard(message_ptr_t &message, const message_stringifier_t *stringifier) noexcept
dumps discarded message
static void delivery(message_ptr_t &message, const subscription_t::joint_handlers_t &local_recipients, const message_stringifier_t *stringifier) noexcept
delivers the message to the recipients, possibly dumping it to console
manages all actor subscriptions (i.e. from plugins or actor itself).
Definition lifetime.h:21
allows actor to have active (client) role in linking
Definition link_client.h:34
allows actor to have passive (server) role in linking
Definition link_server.h:32
static void delivery(message_ptr_t &message, const subscription_t::joint_handlers_t &local_recipients) noexcept
delivers an message for self of one of child-actors (non-supervisors)
detects and assigns locality leader to the supervisor
Definition locality.h:22
auto & access() noexcept
generic non-public fields accessor
actor_base_t * actor
non-owning actor pointer
Definition plugin_base.h:179
subscription_info_ptr_t subscribe(Handler &&handler, const address_ptr_t &address) noexcept
subscribes plugin to the custom plugin handler on the specified address
Definition supervisor.h:415
handy access to registry_t, for name registration and discovery
Definition registry.h:35
"lock" for external resources
Definition resources.h:39
allows custom (actor) subscriptions and it is responsible for starting actor when it receives message...
Definition starter.h:19
subscription_info_ptr_t subscribe_actor(Handler &&handler) noexcept
subscribes actor handler on main actor address
Definition supervisor.h:429
builder pattern implementation for the original request
Definition request.hpp:387
request_builder_t(supervisor_t &sup_, actor_base_t &actor_, const address_ptr_t &destination_, const address_ptr_t &reply_to_, Args &&...args)
constructs request message but still does not dispatch it
Definition supervisor.h:513
request_id_t send(const pt::time_duration &send) noexcept
actually dispatches requests and spawns timeout timer
Definition supervisor.h:531
the recorded context, which is needed to produce error response to the original request
Definition request.hpp:301
type helper to deduce request/response messages from original (user-supplied) request type
Definition request.hpp:318
static message_ptr_t make_error_response(const address_ptr_t &reply_to, message_base_t &message, const extended_error_ptr_t &ee) noexcept
helper free function to produce error reply to the original request
Definition request.hpp:373
T request_t
an alias for the original request type
Definition request.hpp:45
allows automatically restart actors
Definition spawner.h:30
pair internal and external handler_t
Definition subscription.h:40
Holds and classifies message handlers on behalf of supervisor.
Definition subscription.h:30
CRTP supervisor config builder.
Definition supervisor_config.h:72
base supervisor config, which holds shutdown timeout value
Definition supervisor_config.h:23
supervisor is responsible for managing actors (workers) lifetime
Definition supervisor.h:68
virtual address_ptr_t instantiate_address(const void *locality) noexcept
creates new address with respect to supervisor locality mark
plugin::delivery_plugin_base_t * delivery
delivery plugin pointer
Definition supervisor.h:289
size_t do_process() noexcept
process queue of messages of locality leader
Definition supervisor.h:130
inbound_queue_t inbound_queue
inbound queue for external messages
Definition supervisor.h:298
const std::atomic_bool * shutdown_flag
when flag is set, the supervisor will shut self down
Definition supervisor.h:307
subscription_t subscription_map
main subscription support class
Definition supervisor.h:283
supervisor_config_builder_t< Supervisor > config_builder_t
injects templated supervisor_config_builder_t
Definition supervisor.h:96
std::unordered_map< request_id_t, request_curry_t > request_map_t
timer to response with timeout procedure type
Definition supervisor.h:256
void do_shutdown(const extended_error_ptr_t &reason={}) noexcept override
convenient method to send actor's supervisor shutdown trigger message
pt::time_duration shutdown_poll_frequency
frequency to check atomic shutdown flag
Definition supervisor.h:310
virtual address_ptr_t make_address() noexcept
creates new address_t linked with the supervisor
virtual void start() noexcept=0
thread-safe version of do_process
const address_ptr_t & get_registry_address() const noexcept
returns registry actor address (if it was defined or registry actor was created)
Definition supervisor.h:240
void subscribe(actor_base_t &actor, Handler &&handler)
templated version of subscribe_actor
Definition supervisor.h:185
void shutdown_finish() noexcept override
finalizes shutdown
void on_request_trigger(request_id_t timer_id, bool cancelled) noexcept
invoked as timer callback; creates response or just clean up for previously set request
virtual void do_start_timer(const pt::time_duration &interval, timer_handler_base_t &handler) noexcept=0
starts non-recurring timer (to be implemented in descendants)
virtual void do_cancel_timer(request_id_t timer_id) noexcept=0
cancels timer (to be implemented in descendants)
supervisor_config_t config_t
injects an alias for supervisor_config_t
Definition supervisor.h:93
virtual void shutdown() noexcept=0
thread-safe version of do_shutdown, i.e. send shutdown request let it be processed by the supervisor
void unsubscribe_actor(const address_ptr_t &addr, Handler &&handler) noexcept
convenient templated version of `unsubscribe_actor
Definition supervisor.h:190
plugin::child_manager_plugin_t * manager
child manager plugin pointer
Definition supervisor.h:292
request_id_t last_req_id
counter for request/timer ids
Definition supervisor.h:277
boost::lockfree::queue< message_base_t * > inbound_queue_t
lock-free queue for inbound messages
Definition supervisor.h:249
auto & access() noexcept
generic non-public fields accessor
auto create_actor()
creates child-actor builder
Definition supervisor.h:196
virtual void on_child_init(actor_base_t *actor, const extended_error_ptr_t &ec) noexcept
supervisor hook for reaction on child actor init
virtual void enqueue(message_ptr_t message) noexcept=0
enqueues messages thread safe way and triggers processing
supervisor_t(supervisor_config_t &config)
constructs new supervisor with optional parent supervisor
virtual void intercept(message_ptr_t &message, const void *tag, const continuation_t &continuation) noexcept
intercepts message delivery for the tagged handler
subscription_info_ptr_t subscribe(const handler_ptr_t &handler, const address_ptr_t &addr, const actor_base_t *owner_ptr, owner_tag_t owner_tag) noexcept
main subscription implementation
pt::time_duration poll_duration
how much time spend in active inbound queue polling
Definition supervisor.h:304
void put(message_ptr_t message)
puts a message into internal supervisor queue for further processing
Definition supervisor.h:182
size_t inbound_queue_size
size of inbound queue
Definition supervisor.h:301
std::tuple< plugin::address_maker_plugin_t, plugin::locality_plugin_t, plugin::delivery_plugin_t< plugin::default_local_delivery_t >, plugin::lifetime_plugin_t, plugin::init_shutdown_plugin_t, plugin::foreigners_support_plugin_t, plugin::child_manager_plugin_t, plugin::link_server_plugin_t, plugin::link_client_plugin_t, plugin::registry_plugin_t, plugin::resources_plugin_t, plugin::starter_plugin_t > plugins_list_t
the default list of plugins for an supervisor
Definition supervisor.h:77
spawner_t spawn(factory_t) noexcept
returns an actor spawner
virtual void on_child_shutdown(actor_base_t *actor) noexcept
supervisor hook for reaction on child actor shutdown
virtual void do_initialize(system_context_t *ctx) noexcept override
early actor initialization (pre-initialization)
request_map_t request_map
timer to response with timeout procedure
Definition supervisor.h:280
request_builder_t< T > do_request(actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to, Args &&...args) noexcept
convenient method for request building
Definition supervisor.h:208
system_context_t * context
non-owning pointer to system context.
Definition supervisor.h:271
void on_start() noexcept override
actor is fully initialized and it's supervisor has sent signal to start
supervisor_t * locality_leader
root supervisor for the locality
Definition supervisor.h:295
supervisor_t * parent
non-owning pointer to parent supervisor, NULL for root supervisor
Definition supervisor.h:286
virtual void commit_unsubscription(const subscription_info_ptr_t &info) noexcept
removes the subscription point: local address and (foreign-or-local) handler pair
messages_queue_t queue
queue of unprocessed messages
Definition supervisor.h:274
The system context holds root supervisor_t (intrusive pointer) and may be loop-related details in der...
Definition system_context.h:32
virtual void on_error(actor_base_t *actor, const extended_error_ptr_t &ec) noexcept
fatal error handler
virtual std::string identity() noexcept
identifies the context.
auto create_supervisor()
returns builder for root supervisor
Definition supervisor.h:351
Base class for timer handler.
Definition timer_handler.hpp:17
templated implementation of timer handler
Definition timer_handler.hpp:42