supervisor is responsible for managing actors (workers) lifetime More...
#include <supervisor.h>
Public Member Functions | |
supervisor_t (supervisor_config_t &config) | |
constructs new supervisor with optional parent supervisor | |
supervisor_t (const supervisor_t &)=delete | |
supervisor_t (supervisor_t &&)=delete | |
virtual void | do_initialize (system_context_t *ctx) noexcept override |
early actor initialization (pre-initialization) | |
size_t | do_process () noexcept |
process queue of messages of locality leader | |
virtual address_ptr_t | make_address () noexcept |
creates new address_t linked with the supervisor | |
virtual void | commit_unsubscription (const subscription_info_ptr_t &info) noexcept |
removes the subscription point: local address and (foreign-or-local) handler pair | |
virtual void | start () noexcept=0 |
thread-safe version of do_process | |
void | on_start () noexcept override |
actor is fully initialized and its supervisor has sent signal to start | |
virtual void | shutdown () noexcept=0 |
thread-safe version of do_shutdown, i.e. sends shutdown request and lets it be processed by the supervisor | |
void | do_shutdown (const extended_error_ptr_t &reason={}) noexcept override |
convenient method to send actor's supervisor shutdown trigger message | |
void | shutdown_finish () noexcept override |
finalizes shutdown | |
virtual void | on_child_init (actor_base_t *actor, const extended_error_ptr_t &ec) noexcept |
supervisor hook for reaction on child actor init | |
virtual void | on_child_shutdown (actor_base_t *actor) noexcept |
supervisor hook for reaction on child actor shutdown | |
virtual void | enqueue (message_ptr_t message) noexcept=0 |
enqueues messages in a thread-safe way and triggers processing | |
void | put (message_ptr_t message) |
puts a message into internal supervisor queue for further processing | |
template<typename Handler > | |
void | subscribe (actor_base_t &actor, Handler &&handler) |
templated version of subscribe_actor | |
template<typename Handler > | |
void | unsubscribe_actor (const address_ptr_t &addr, Handler &&handler) noexcept |
convenient templated version of unsubscribe_actor | |
template<typename Actor > | |
auto | create_actor () |
creates child-actor builder | |
template<typename T , typename... Args> | |
request_builder_t< T > | do_request (actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to, Args &&...args) noexcept |
convenient method for request building | |
subscription_info_ptr_t | subscribe (const handler_ptr_t &handler, const address_ptr_t &addr, const actor_base_t *owner_ptr, owner_tag_t owner_tag) noexcept |
main subscription implementation | |
spawner_t | spawn (factory_t) noexcept |
returns an actor spawner | |
const address_ptr_t & | get_registry_address () const noexcept |
returns registry actor address (if it was defined or registry actor was created) | |
template<typename T > | |
auto & | access () noexcept |
generic non-public fields accessor | |
template<typename T , typename... Args> | |
auto | access (Args... args) noexcept |
generic non-public methods accessor | |
template<typename Handler > | |
subscription_info_ptr_t | subscribe (Handler &&h, const address_ptr_t &addr) noexcept |
subscribes actor's handler to process messages on the specified address | |
template<typename Handler > | |
subscription_info_ptr_t | subscribe (Handler &&h) noexcept |
subscribes actor's handler to process messages on the actor's "main" address | |
Public Member Functions inherited from rotor::actor_base_t | |
actor_base_t (config_t &cfg) | |
constructs actor and links its supervisor | |
template<typename M , typename... Args> | |
void | send (const address_ptr_t &addr, Args &&...args) |
sends message to the destination address | |
template<typename R , typename... Args> | |
request_builder_t< typename request_wrapper_t< R >::request_t > | request (const address_ptr_t &dest_addr, Args &&...args) |
returns request builder for destination address using the "main" actor address | |
template<typename R , typename... Args> | |
request_builder_t< typename request_wrapper_t< R >::request_t > | request_via (const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args) |
returns request builder for destination address using the specified address for reply | |
template<typename Request , typename... Args> | |
void | reply_to (Request &message, Args &&...args) |
convenient method for constructing and sending response to a request | |
template<typename Request > | |
void | reply_with_error (Request &message, const extended_error_ptr_t &ec) |
convenient method for constructing and sending error response to a request | |
template<typename Request , typename... Args> | |
auto | make_response (Request &message, Args &&...args) |
makes response to the request, but does not send it. | |
template<typename Request > | |
auto | make_response (Request &message, const extended_error_ptr_t &ec) |
makes error response to the request, but does not send it. | |
template<typename Handler > | |
subscription_info_ptr_t | subscribe (Handler &&h, const address_ptr_t &addr) noexcept |
subscribes actor's handler to process messages on the specified address | |
template<typename Handler > | |
subscription_info_ptr_t | subscribe (Handler &&h) noexcept |
subscribes actor's handler to process messages on the actor's "main" address | |
template<typename Handler , typename = is_handler<Handler>> | |
void | unsubscribe (Handler &&h, address_ptr_t &addr) noexcept |
unsubscribes actor's handler from processing messages on the specified address | |
template<typename Handler , typename = is_handler<Handler>> | |
void | unsubscribe (Handler &&h) noexcept |
unsubscribes actor's handler from processing messages on the actor's "main" address | |
void | unsubscribe (const handler_ptr_t &h) noexcept |
initiates handler unsubscription from the default actor address | |
void | activate_plugins () noexcept |
starts plugins activation | |
void | commit_plugin_activation (plugin::plugin_base_t &plugin, bool success) noexcept |
finishes plugin activation, successful or not | |
void | deactivate_plugins () noexcept |
starts plugins deactivation | |
void | commit_plugin_deactivation (plugin::plugin_base_t &plugin) noexcept |
finishes plugin deactivation | |
void | on_subscription (message::subscription_t &message) noexcept |
propagates subscription message to corresponding actors | |
void | on_unsubscription (message::unsubscription_t &message) noexcept |
propagates unsubscription message to corresponding actors | |
void | on_unsubscription_external (message::unsubscription_external_t &message) noexcept |
propagates external unsubscription message to corresponding actors | |
address_ptr_t | create_address () noexcept |
creates new unique address for an actor (via address_maker plugin) | |
virtual void | shutdown_start () noexcept |
starts shutdown procedure, e.g. upon receiving shutdown request | |
void | shutdown_continue () noexcept |
polls plugins for shutdown | |
virtual void | init_start () noexcept |
starts initialization procedure | |
void | init_continue () noexcept |
polls plugins whether they completed initialization. | |
virtual void | init_finish () noexcept |
finalizes initialization | |
virtual void | configure (plugin::plugin_base_t &plugin) noexcept |
main callback for plugin configuration when it's ready | |
template<typename T > | |
auto & | access () noexcept |
generic non-public fields accessor | |
template<typename T , typename... Args> | |
auto | access (Args... args) noexcept |
generic non-public methods accessor | |
template<typename T > | |
auto & | access () const noexcept |
generic non-public fields accessor | |
template<typename T , typename... Args> | |
auto | access (Args... args) const noexcept |
generic non-public methods accessor | |
const address_ptr_t & | get_address () const noexcept |
returns actor's main address | |
supervisor_t & | get_supervisor () const noexcept |
returns actor's supervisor | |
template<typename Delegate , typename Method , typename = std::enable_if_t<std::is_invocable_v<Method, Delegate *, request_id_t, bool>>> | |
request_id_t | start_timer (const pt::time_duration &interval, Delegate &delegate, Method method) noexcept |
spawns a new one-shot timer | |
void | cancel_timer (request_id_t request_id) noexcept |
cancels previously started timer | |
const extended_error_ptr_t & | get_shutdown_reason () const noexcept |
returns actor shutdown reason | |
const std::string & | get_identity () const noexcept |
returns human-readable actor identity | |
virtual bool | should_restart () const noexcept |
whether spawner should create a new instance of the actor | |
template<typename Request , typename... Args> | |
request_builder_t< typename request_wrapper_t< Request >::request_t > | request (const address_ptr_t &dest_addr, Args &&...args) |
makes a request to the destination address with the message constructed from args | |
template<typename Request , typename... Args> | |
request_builder_t< typename request_wrapper_t< Request >::request_t > | request_via (const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args) |
makes a request to the destination address with the message constructed from args | |
Protected Types | |
using | request_map_t = std::unordered_map< request_id_t, request_curry_t > |
map from request (timer) id to the procedure which responds with timeout (type) | |
Protected Types inherited from rotor::actor_base_t | |
using | timers_map_t = std::unordered_map< request_id_t, timer_handler_ptr_t > |
timer-id to timer-handler map (type) | |
using | requests_t = std::unordered_set< request_id_t > |
list of ids of active requests (type) | |
Protected Member Functions | |
virtual address_ptr_t | instantiate_address (const void *locality) noexcept |
creates new address with respect to supervisor locality mark | |
void | on_request_trigger (request_id_t timer_id, bool cancelled) noexcept |
invoked as timer callback; creates response or just clean up for previously set request | |
virtual void | do_start_timer (const pt::time_duration &interval, timer_handler_base_t &handler) noexcept=0 |
starts non-recurring timer (to be implemented in descendants) | |
virtual void | do_cancel_timer (request_id_t timer_id) noexcept=0 |
cancels timer (to be implemented in descendants) | |
virtual void | intercept (message_ptr_t &message, const void *tag, const continuation_t &continuation) noexcept |
intercepts message delivery for the tagged handler | |
Protected Member Functions inherited from rotor::actor_base_t | |
void | on_timer_trigger (request_id_t request_id, bool cancelled) noexcept |
triggers timer handler associated with the timer id | |
template<typename Delegate , typename Method > | |
void | start_timer (request_id_t request_id, const pt::time_duration &interval, Delegate &delegate, Method method) noexcept |
starts timer with pre-forged timer id (aka request-id) | |
void | assign_shutdown_reason (extended_error_ptr_t reason) noexcept |
helper-method, which assigns shutdown reason if it isn't set | |
extended_error_ptr_t | make_error (const std::error_code &ec, const extended_error_ptr_t &next={}, const message_ptr_t &request={}) const noexcept |
makes extended error within the context of the actor | |
virtual bool | on_unlink (const address_ptr_t &server_addr) noexcept |
notification, when actor has been unlinked from server actor | |
plugin::plugin_base_t * | get_plugin (const std::type_index &) const noexcept |
finds plugin by plugin class identity | |
Protected Attributes | |
system_context_t * | context |
non-owning pointer to system context. | |
messages_queue_t | queue |
queue of unprocessed messages | |
request_id_t | last_req_id |
counter for request/timer ids | |
request_map_t | request_map |
map from request (timer) id to the procedure which responds with timeout | |
subscription_t | subscription_map |
main subscription support class | |
supervisor_t * | parent |
non-owning pointer to parent supervisor, NULL for root supervisor | |
plugin::delivery_plugin_base_t * | delivery = nullptr |
delivery plugin pointer | |
plugin::child_manager_plugin_t * | manager = nullptr |
child manager plugin pointer | |
supervisor_t * | locality_leader |
root supervisor for the locality | |
inbound_queue_t | inbound_queue |
inbound queue for external messages | |
size_t | inbound_queue_size |
size of inbound queue | |
pt::time_duration | poll_duration |
how much time spend in active inbound queue polling | |
const std::atomic_bool * | shutdown_flag = nullptr |
when flag is set, the supervisor will shut self down | |
pt::time_duration | shutdown_poll_frequency = pt::millisec{100} |
frequency to check atomic shutdown flag | |
Protected Attributes inherited from rotor::actor_base_t | |
intrusive_ptr_t< message::init_request_t > | init_request |
suspended init request message | |
intrusive_ptr_t< message::shutdown_request_t > | shutdown_request |
suspended shutdown request message | |
address_ptr_t | address |
actor address | |
address_ptr_t | spawner_address |
actor spawner address | |
std::string | identity |
actor identity, which might have some meaning for developers | |
supervisor_t * | supervisor |
non-owning pointer to actor's execution / infrastructure context | |
plugin_storage_ptr_t | plugins_storage |
opaque plugins storage (owning) | |
plugins_t | plugins |
non-owning list of plugins | |
pt::time_duration | init_timeout |
timeout for actor initialization (used by supervisor) | |
pt::time_duration | shutdown_timeout |
timeout for actor shutdown (used by supervisor) | |
state_t | state |
current actor state | |
plugin::address_maker_plugin_t * | address_maker = nullptr |
non-owning pointer to address_maker plugin | |
plugin::lifetime_plugin_t * | lifetime = nullptr |
non-owning pointer to lifetime plugin | |
plugin::link_server_plugin_t * | link_server = nullptr |
non-owning pointer to link_server plugin | |
plugin::resources_plugin_t * | resources = nullptr |
non-owning pointer to resources plugin | |
std::set< const std::type_index * > | activating_plugins |
set of activating plugin identities | |
std::set< const std::type_index * > | deactivating_plugins |
set of deactivating plugin identities | |
timers_map_t | timers_map |
timer-id to timer-handler map | |
requests_t | active_requests |
list of ids of active requests | |
std::uint32_t | continuation_mask = 0 |
set of currently processing states, i.e. init or shutdown | |
extended_error_ptr_t | shutdown_reason |
explanation, why actor has been requested to shut down | |
Additional Inherited Members | |
Static Public Attributes inherited from rotor::actor_base_t | |
static const constexpr std::uint32_t | PROGRESS_INIT = 1 << 0 |
flag to mark, that actor is already executing initialization | |
static const constexpr std::uint32_t | PROGRESS_SHUTDOWN = 1 << 1 |
flag to mark, that actor is already executing shutdown | |
static const constexpr std::uint32_t | ESCALATE_FALIURE = 1 << 2 |
flag to mark, that actor failure is escalated to its supervisor | |
static const constexpr std::uint32_t | AUTOSHUTDOWN_SUPERVISOR = 1 << 3 |
flag to mark, that actor trigger supervisor shutdown | |
supervisor is responsible for managing actors (workers) lifetime
Supervisor starts and stops actors (children/workers) and processes messages. The message processing is basically sorting messages by their destination address_t: if an address belongs to the supervisor, then the message is dispatched locally, otherwise the message is forwarded to the supervisor which owns the address.
During message dispatching phase, supervisor examines handlers (handler_base_t); if they are local, then a message is immediately delivered to it (i.e. a local actor is invoked immediately), otherwise it is forwarded for delivery to the supervisor which owns the handler.
Supervisor is responsible for managing its local actors' lifetime, i.e. sending initialization, start, shutdown requests etc.
Supervisor is locality-aware: i.e. if two supervisors have the same locality (i.e. executed in the same thread/event loop), it takes advantage of this and immediately delivers message to the target supervisor without involving any synchronization mechanisms. In other words, a message is delivered to any actor of the locality, even if the actor is not child of the current supervisor.
As supervisor is special kind of actor, it should be possible to spawn other supervisors constructing tree-like organization of responsibilities.
Unlike Erlang's supervisor, rotor's supervisor does not spawn actors if they terminated. It should be possible, however, to implement it in derived classes with application-specific logic.
This supervisor class is abstract, and the concrete implementation is event-loop specific, i.e. it should know how to start/stop shutdown timers, how to trigger messages processing in thread-safe way, how to deliver message to a supervisor in a thread-safe way etc.
the default list of plugins for a supervisor
The order of plugins is very important, as they are initialized in the direct order and deinitialized in the reverse order.
|
protectedpure virtualnoexcept |
cancels timer (to be implemented in descendants)
Implemented in rotor::asio::supervisor_asio_t, rotor::ev::supervisor_ev_t, rotor::fltk::supervisor_fltk_t, rotor::thread::supervisor_thread_t, and rotor::wx::supervisor_wx_t.
|
overridevirtualnoexcept |
early actor initialization (pre-initialization)
Actor's plugins are activated, "main" address is created (via plugin::address_maker_plugin_t), state is set to INITIALIZING (via plugin::init_shutdown_plugin_t).
Reimplemented from rotor::actor_base_t.
Reimplemented in rotor::ev::supervisor_ev_t.
|
inlinenoexcept |
process queue of messages of locality leader
The locality leader's queue (queue) of messages is processed. The deliver_local method is used.
It is expected that derived classes invoke the do_process method whenever it is known that there are messages for processing. The invocation should be performed in safe thread/loop context.
The method should be invoked in event-loop context only.
The method returns amount of messages it enqueued for other locality leaders (i.e. to be processed externally).
|
inlinenoexcept |
convenient method for request building
The built request isn't sent immediately, but only after invoking send(timeout)
|
overridevirtualnoexcept |
convenient method to send actor's supervisor shutdown trigger message
If actor is already shutting down, the method will do nothing, otherwise it will send shutdown trigger to its supervisor.
The shutdown reason is forwarded "as is". If it is missing, then it will be constructed with the error code "normal shutdown".
Reimplemented from rotor::actor_base_t.
|
protectedpure virtualnoexcept |
starts non-recurring timer (to be implemented in descendants)
Implemented in rotor::asio::supervisor_asio_t, rotor::ev::supervisor_ev_t, rotor::fltk::supervisor_fltk_t, rotor::thread::supervisor_thread_t, and rotor::wx::supervisor_wx_t.
|
pure virtualnoexcept |
enqueues messages thread safe way and triggers processing
This is the only method to deliver a message from outside of the rotor context. Basically it is put and process in the event loop context.
The thread-safety should be guaranteed by derived class and/or used event-loop.
This method is used for messaging between supervisors with different localities, event loops or threads.
Implemented in rotor::asio::supervisor_asio_t, rotor::ev::supervisor_ev_t, rotor::fltk::supervisor_fltk_t, rotor::thread::supervisor_thread_t, and rotor::wx::supervisor_wx_t.
|
protectedvirtualnoexcept |
intercepts message delivery for the tagged handler
Reimplemented in rotor::thread::supervisor_thread_t.
|
virtualnoexcept |
creates new address_t
linked with the supervisor
Reimplemented in rotor::asio::supervisor_asio_t.
|
overridevirtualnoexcept |
actor is fully initialized and its supervisor has sent signal to start
The actor state is set to OPERATIONAL.
Reimplemented from rotor::actor_base_t.
|
inline |
puts a message into internal supervisor queue for further processing
This is a thread-unsafe method. The enqueue method should be used to put a new message from external context in a thread-safe way.
|
pure virtualnoexcept |
thread-safe version of do_shutdown, i.e. sends shutdown request and lets it be processed by the supervisor
Implemented in rotor::asio::supervisor_asio_t, rotor::ev::supervisor_ev_t, rotor::fltk::supervisor_fltk_t, rotor::thread::supervisor_thread_t, and rotor::wx::supervisor_wx_t.
|
overridevirtualnoexcept |
finalizes shutdown
The shutdown response is sent and actor state is set to SHUT_DOWN.
This is the last action in the shutdown sequence. No further methods will be invoked on the actor.
All unfinished requests and untriggered timers will be cancelled by force in the method.
Reimplemented from rotor::actor_base_t.
returns an actor spawner
Spawner allows to create a new actor instance, when the current actor instance is down. Different policies (reactions) can be applied.
|
pure virtualnoexcept |
thread-safe version of do_process
Starts the supervisor processing its messages queue in safe thread/loop context. Once the queue becomes empty, the method returns.
Implemented in rotor::asio::supervisor_asio_t, rotor::ev::supervisor_ev_t, rotor::fltk::supervisor_fltk_t, rotor::thread::supervisor_thread_t, and rotor::wx::supervisor_wx_t.
|
noexcept |
main subscription implementation
The subscription point is materialized into subscription info. If address is internal/local, then it is immediately confirmed to the source actor as payload::subscription_confirmation_t.
Otherwise, if the address is external (foreign), then subscription request is forwarded to the appropriate supervisor as payload::external_subscription_t request.
The materialized subscription info is returned.