rotor
Event loop friendly C++ actor micro-framework
 
Loading...
Searching...
No Matches
rotor::supervisor_t Struct Referenceabstract

supervisor is responsible for managing actors (workers) lifetime More...

#include <supervisor.h>

Inheritance diagram for rotor::supervisor_t:
rotor::actor_base_t rotor::asio::supervisor_asio_t rotor::ev::supervisor_ev_t rotor::fltk::supervisor_fltk_t rotor::thread::supervisor_thread_t rotor::wx::supervisor_wx_t

Public Types

using plugins_list_t = std::tuple< plugin::address_maker_plugin_t, plugin::locality_plugin_t, plugin::delivery_plugin_t< plugin::default_local_delivery_t >, plugin::lifetime_plugin_t, plugin::init_shutdown_plugin_t, plugin::foreigners_support_plugin_t, plugin::child_manager_plugin_t, plugin::link_server_plugin_t, plugin::link_client_plugin_t, plugin::registry_plugin_t, plugin::resources_plugin_t, plugin::starter_plugin_t >
 the default list of plugins for an supervisor
 
using config_t = supervisor_config_t
 injects an alias for supervisor_config_t
 
template<typename Supervisor >
using config_builder_t = supervisor_config_builder_t< Supervisor >
 injects templated supervisor_config_builder_t
 
using inbound_queue_t = boost::lockfree::queue< message_base_t * >
 lock-free queue for inbound messages
 
- Public Types inherited from rotor::actor_base_t
using config_t = actor_config_t
 injects an alias for actor_config_t
 
template<typename Actor >
using config_builder_t = actor_config_builder_t< Actor >
 injects templated actor_config_builder_t
 
template<typename Handler >
using is_handler = std::enable_if_t< std::is_member_function_pointer_v< Handler >||std::is_base_of_v< handler_base_t, Handler > >
 SFINAE handler detector.
 
using plugins_list_t = std::tuple< plugin::address_maker_plugin_t, plugin::lifetime_plugin_t, plugin::init_shutdown_plugin_t, plugin::link_server_plugin_t, plugin::link_client_plugin_t, plugin::registry_plugin_t, plugin::resources_plugin_t, plugin::starter_plugin_t >
 the default list of plugins for an actor
 

Public Member Functions

 supervisor_t (supervisor_config_t &config)
 constructs new supervisor with optional parent supervisor
 
 supervisor_t (const supervisor_t &)=delete
 
 supervisor_t (supervisor_t &&)=delete
 
virtual void do_initialize (system_context_t *ctx) noexcept override
 early actor initialization (pre-initialization)
 
size_t do_process () noexcept
 process queue of messages of locality leader
 
virtual address_ptr_t make_address () noexcept
 creates new address_t linked with the supervisor
 
virtual void commit_unsubscription (const subscription_info_ptr_t &info) noexcept
 removes the subscription point: local address and (foreign-or-local) handler pair
 
virtual void start () noexcept=0
 thread-safe version of do_process
 
void on_start () noexcept override
 actor is fully initialized and it's supervisor has sent signal to start
 
virtual void shutdown () noexcept=0
 thread-safe version of do_shutdown, i.e. send shutdown request let it be processed by the supervisor
 
void do_shutdown (const extended_error_ptr_t &reason={}) noexcept override
 convenient method to send actor's supervisor shutdown trigger message
 
void shutdown_finish () noexcept override
 finalizes shutdown
 
virtual void on_child_init (actor_base_t *actor, const extended_error_ptr_t &ec) noexcept
 supervisor hook for reaction on child actor init
 
virtual void on_child_shutdown (actor_base_t *actor) noexcept
 supervisor hook for reaction on child actor shutdown
 
virtual void enqueue (message_ptr_t message) noexcept=0
 enqueues messages thread safe way and triggers processing
 
void put (message_ptr_t message)
 puts a message into internal supervisor queue for further processing
 
template<typename Handler >
void subscribe (actor_base_t &actor, Handler &&handler)
 templated version of subscribe_actor
 
template<typename Handler >
void unsubscribe_actor (const address_ptr_t &addr, Handler &&handler) noexcept
 convenient templated version of `unsubscribe_actor
 
template<typename Actor >
auto create_actor ()
 creates child-actor builder
 
template<typename T , typename... Args>
request_builder_t< T > do_request (actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to, Args &&...args) noexcept
 convenient method for request building
 
subscription_info_ptr_t subscribe (const handler_ptr_t &handler, const address_ptr_t &addr, const actor_base_t *owner_ptr, owner_tag_t owner_tag) noexcept
 main subscription implementation
 
spawner_t spawn (factory_t) noexcept
 returns an actor spawner
 
const address_ptr_tget_registry_address () const noexcept
 returns registry actor address (if it was defined or registry actor was created)
 
template<typename T >
auto & access () noexcept
 generic non-public fields accessor
 
template<typename T , typename... Args>
auto access (Args... args) noexcept
 generic non-public methods accessor
 
template<typename Handler >
subscription_info_ptr_t subscribe (Handler &&h, const address_ptr_t &addr) noexcept
 subscribes actor's handler to process messages on the specified address
 
template<typename Handler >
subscription_info_ptr_t subscribe (Handler &&h) noexcept
 subscribes actor's handler to process messages on the actor's "main" address
 
- Public Member Functions inherited from rotor::actor_base_t
 actor_base_t (config_t &cfg)
 constructs actor and links it's supervisor
 
template<typename M , typename... Args>
void send (const address_ptr_t &addr, Args &&...args)
 sends message to the destination address
 
template<typename R , typename... Args>
request_builder_t< typename request_wrapper_t< R >::request_t > request (const address_ptr_t &dest_addr, Args &&...args)
 returns request builder for destination address using the "main" actor address
 
template<typename R , typename... Args>
request_builder_t< typename request_wrapper_t< R >::request_t > request_via (const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args)
 returns request builder for destination address using the specified address for reply
 
template<typename Request , typename... Args>
void reply_to (Request &message, Args &&...args)
 convenient method for constructing and sending response to a request
 
template<typename Request >
void reply_with_error (Request &message, const extended_error_ptr_t &ec)
 convenient method for constructing and sending error response to a request
 
template<typename Request , typename... Args>
auto make_response (Request &message, Args &&...args)
 makes response to the request, but does not send it.
 
template<typename Request >
auto make_response (Request &message, const extended_error_ptr_t &ec)
 makes error response to the request, but does not send it.
 
template<typename Handler >
subscription_info_ptr_t subscribe (Handler &&h, const address_ptr_t &addr) noexcept
 subscribes actor's handler to process messages on the specified address
 
template<typename Handler >
subscription_info_ptr_t subscribe (Handler &&h) noexcept
 subscribes actor's handler to process messages on the actor's "main" address
 
template<typename Handler , typename = is_handler<Handler>>
void unsubscribe (Handler &&h, address_ptr_t &addr) noexcept
 unsubscribes actor's handler from process messages on the specified address
 
template<typename Handler , typename = is_handler<Handler>>
void unsubscribe (Handler &&h) noexcept
 unsubscribes actor's handler from processing messages on the actor's "main" address
 
void unsubscribe (const handler_ptr_t &h) noexcept
 initiates handler unsubscription from the default actor address
 
void activate_plugins () noexcept
 starts plugins activation
 
void commit_plugin_activation (plugin::plugin_base_t &plugin, bool success) noexcept
 finishes plugin activation, successful or not
 
void deactivate_plugins () noexcept
 starts plugins deactivation
 
void commit_plugin_deactivation (plugin::plugin_base_t &plugin) noexcept
 finishes plugin deactivation
 
void on_subscription (message::subscription_t &message) noexcept
 propagates subscription message to corresponding actors
 
void on_unsubscription (message::unsubscription_t &message) noexcept
 propagates unsubscription message to corresponding actors
 
void on_unsubscription_external (message::unsubscription_external_t &message) noexcept
 propagates external unsubscription message to corresponding actors
 
address_ptr_t create_address () noexcept
 creates new unique address for an actor (via address_maker plugin)
 
virtual void shutdown_start () noexcept
 starts shutdown procedure, e.g. upon receiving shutdown request
 
void shutdown_continue () noexcept
 polls plugins for shutdown
 
virtual void init_start () noexcept
 starts initialization procedure
 
void init_continue () noexcept
 polls plugins whether they completed initialization.
 
virtual void init_finish () noexcept
 finalizes initialization
 
virtual void configure (plugin::plugin_base_t &plugin) noexcept
 main callback for plugin configuration when it's ready
 
template<typename T >
auto & access () noexcept
 generic non-public fields accessor
 
template<typename T , typename... Args>
auto access (Args... args) noexcept
 generic non-public methods accessor
 
template<typename T >
auto & access () const noexcept
 generic non-public fields accessor
 
template<typename T , typename... Args>
auto access (Args... args) const noexcept
 generic non-public methods accessor
 
const address_ptr_tget_address () const noexcept
 returns actor's main address
 
supervisor_tget_supervisor () const noexcept
 returns actor's supervisor
 
template<typename Delegate , typename Method , typename = std::enable_if_t<std::is_invocable_v<Method, Delegate *, request_id_t, bool>>>
request_id_t start_timer (const pt::time_duration &interval, Delegate &delegate, Method method) noexcept
 spawns a new one-shot timer
 
void cancel_timer (request_id_t request_id) noexcept
 cancels previously started timer
 
const extended_error_ptr_tget_shutdown_reason () const noexcept
 returns actor shutdown reason
 
const std::string & get_identity () const noexcept
 retuns human-readable actor identity
 
virtual bool should_restart () const noexcept
 whether spawner should create a new instance of the actor
 
template<typename Request , typename... Args>
request_builder_t< typename request_wrapper_t< Request >::request_t > request (const address_ptr_t &dest_addr, Args &&...args)
 makes an request to the destination address with the message constructed from args
 
template<typename Request , typename... Args>
request_builder_t< typename request_wrapper_t< Request >::request_t > request_via (const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args)
 makes an request to the destination address with the message constructed from args
 

Protected Types

using request_map_t = std::unordered_map< request_id_t, request_curry_t >
 timer to response with timeout procedure type
 
- Protected Types inherited from rotor::actor_base_t
using timers_map_t = std::unordered_map< request_id_t, timer_handler_ptr_t >
 timer-id to timer-handler map (type)
 
using requests_t = std::unordered_set< request_id_t >
 list of ids of active requests (type)
 

Protected Member Functions

virtual address_ptr_t instantiate_address (const void *locality) noexcept
 creates new address with respect to supervisor locality mark
 
void on_request_trigger (request_id_t timer_id, bool cancelled) noexcept
 invoked as timer callback; creates response or just clean up for previously set request
 
virtual void do_start_timer (const pt::time_duration &interval, timer_handler_base_t &handler) noexcept=0
 starts non-recurring timer (to be implemented in descendants)
 
virtual void do_cancel_timer (request_id_t timer_id) noexcept=0
 cancels timer (to be implemented in descendants)
 
virtual void intercept (message_ptr_t &message, const void *tag, const continuation_t &continuation) noexcept
 intercepts message delivery for the tagged handler
 
- Protected Member Functions inherited from rotor::actor_base_t
void on_timer_trigger (request_id_t request_id, bool cancelled) noexcept
 triggers timer handler associated with the timer id
 
template<typename Delegate , typename Method >
void start_timer (request_id_t request_id, const pt::time_duration &interval, Delegate &delegate, Method method) noexcept
 starts timer with pre-forged timer id (aka request-id
 
void assign_shutdown_reason (extended_error_ptr_t reason) noexcept
 helper-method, which assigns shutdown reason if it isn't set
 
extended_error_ptr_t make_error (const std::error_code &ec, const extended_error_ptr_t &next={}, const message_ptr_t &request={}) const noexcept
 makes extended error within the context of the actor
 
virtual bool on_unlink (const address_ptr_t &server_addr) noexcept
 notification, when actor has been unlinked from server actor
 
plugin::plugin_base_tget_plugin (const std::type_index &) const noexcept
 finds plugin by plugin class identity
 

Protected Attributes

system_context_tcontext
 non-owning pointer to system context.
 
messages_queue_t queue
 queue of unprocessed messages
 
request_id_t last_req_id
 counter for request/timer ids
 
request_map_t request_map
 timer to response with timeout procedure
 
subscription_t subscription_map
 main subscription support class

 
supervisor_tparent
 non-owning pointer to parent supervisor, NULL for root supervisor
 
plugin::delivery_plugin_base_tdelivery = nullptr
 delivery plugin pointer
 
plugin::child_manager_plugin_tmanager = nullptr
 child manager plugin pointer
 
supervisor_tlocality_leader
 root supervisor for the locality
 
inbound_queue_t inbound_queue
 inbound queue for external messages
 
size_t inbound_queue_size
 size of inbound queue
 
pt::time_duration poll_duration
 how much time spend in active inbound queue polling
 
const std::atomic_bool * shutdown_flag = nullptr
 when flag is set, the supervisor will shut self down
 
pt::time_duration shutdown_poll_frequency = pt::millisec{100}
 frequency to check atomic shutdown flag
 
- Protected Attributes inherited from rotor::actor_base_t
intrusive_ptr_t< message::init_request_tinit_request
 suspended init request message
 
intrusive_ptr_t< message::shutdown_request_tshutdown_request
 suspended shutdown request message
 
address_ptr_t address
 actor address
 
address_ptr_t spawner_address
 actor spawner address
 
std::string identity
 actor identity, which might have some meaning for developers
 
supervisor_tsupervisor
 non-owning pointer to actor's execution / infrastructure context
 
plugin_storage_ptr_t plugins_storage
 opaque plugins storage (owning)
 
plugins_t plugins
 non-owning list of plugins
 
pt::time_duration init_timeout
 timeout for actor initialization (used by supervisor)
 
pt::time_duration shutdown_timeout
 timeout for actor shutdown (used by supervisor)
 
state_t state
 current actor state
 
plugin::address_maker_plugin_taddress_maker = nullptr
 non-owning pointer to address_maker plugin
 
plugin::lifetime_plugin_tlifetime = nullptr
 non-owning pointer to lifetime plugin
 
plugin::link_server_plugin_tlink_server = nullptr
 non-owning pointer to link_server plugin
 
plugin::resources_plugin_tresources = nullptr
 non-owning pointer to resources plugin
 
std::set< const std::type_index * > activating_plugins
 set of activating plugin identities
 
std::set< const std::type_index * > deactivating_plugins
 set of deactivating plugin identities
 
timers_map_t timers_map
 timer-id to timer-handler map
 
requests_t active_requests
 list of ids of active requests
 
std::uint32_t continuation_mask = 0
 set of currently processing states, i.e. init or shutdown
 
extended_error_ptr_t shutdown_reason
 explanation, why actor is been requested for shut down
 

Friends

template<typename T >
struct request_builder_t
 
template<typename Supervisor >
struct actor_config_builder_t
 
struct plugin::delivery_plugin_base_t
 
struct actor_base_t
 
template<typename T >
struct plugin::delivery_plugin_t
 

Additional Inherited Members

- Static Public Attributes inherited from rotor::actor_base_t
static const constexpr std::uint32_t PROGRESS_INIT = 1 << 0
 flag to mark, that actor is already executing initialization
 
static const constexpr std::uint32_t PROGRESS_SHUTDOWN = 1 << 1
 flag to mark, that actor is already executing shutdown
 
static const constexpr std::uint32_t ESCALATE_FALIURE = 1 << 2
 flag to mark, that actor is already executing shutdown
 
static const constexpr std::uint32_t AUTOSHUTDOWN_SUPERVISOR = 1 << 3
 flag to mark, that actor trigger supervisor shutdown
 

Detailed Description

supervisor is responsible for managing actors (workers) lifetime

Supervisor starts, stops actors (children/workers) and process messages. The message processing is basically sorting messages by their destination address_t: if an address belongs to the supervisor, then message is dispatched locally, otherwise the message is forwarded to supervisor, which owns address.

During message dispatching phase, supervisor examines handlers (handler_base_t), if they are local, then a message in immediately delivered to it (i.e. a local actor is invoked immediately), otherwise is is forwarded for delivery to the supervisor, which owns the handler.

Supervisor is responsible for managing it's local actors lifetime, i.e. sending initialization, start, shutdown requests etc.

Supervisor is locality-aware: i.e. if two supervisors have the same locality (i.e. executed in the same thread/event loop), it takes advantage of this and immediately delivers message to the target supervisor without involving any synchronization mechanisms. In other words, a message is delivered to any actor of the locality, even if the actor is not child of the current supervisor.

As supervisor is special kind of actor, it should be possible to spawn other supervisors constructing tree-like organization of responsibilities.

Unlike Erlang's supervisor, rotor's supervisor does not spawn actors if they terminated. It should be possible, however, to implement it in derived classes with application-specific logic.

This supervisor class is abstract, and the concrete implementation is is event-loop specific, i.e. it should know how to start/stop shutdown timers, how to trigger messages processing in thread-safe way, how to deliver message to a supervisor in a thread-safe way etc.

Member Typedef Documentation

◆ plugins_list_t

Member Function Documentation

◆ do_cancel_timer()

virtual void rotor::supervisor_t::do_cancel_timer ( request_id_t  timer_id)
protectedpure virtualnoexcept

◆ do_initialize()

virtual void rotor::supervisor_t::do_initialize ( system_context_t ctx)
overridevirtualnoexcept

early actor initialization (pre-initialization)

Actor's plugins are activated, "main" address is created (via plugin::address_maker_plugin_t), state is set to INITIALIZING (via plugin::init_shutdown_plugin_t).

Reimplemented from rotor::actor_base_t.

Reimplemented in rotor::ev::supervisor_ev_t.

◆ do_process()

size_t rotor::supervisor_t::do_process ( )
inlinenoexcept

process queue of messages of locality leader

The locality leaders queue queue of messages is processed.

  1. It takes message from the queue
  2. If the message destination address belongs to the foreign the supervisor, then it is forwarded to it immediately.
  3. Otherwise, the message is local, i.e. either for the supervisor or one of its non-supervisor children (internal), or to other supervisor within the same locality.
  4. in the former case the message is immediately delivered locally in the context of current supervisor; in the latter case in the context of other supervisor. In the both cases deliver_local method is used.

It is expected, that derived classes should invoke do_process message, whenever it is known that there are messages for processing. The invocation should be performed in safe thread/loop context.

The method should be invoked in event-loop context only.

The method returns amount of messages it enqueued for other locality leaders (i.e. to be processed externally).

◆ do_request()

template<typename T , typename... Args>
request_builder_t< T > rotor::supervisor_t::do_request ( actor_base_t actor,
const address_ptr_t dest_addr,
const address_ptr_t reply_to,
Args &&...  args 
)
inlinenoexcept

convenient method for request building

The built request isn't sent immediately, but only after invoking send(timeout)

◆ do_shutdown()

void rotor::supervisor_t::do_shutdown ( const extended_error_ptr_t reason = {})
overridevirtualnoexcept

convenient method to send actor's supervisor shutdown trigger message

If actor is already shutting down, the method will do nothing, otherwise it will send shutdown trigger to its supervisor.

The shutdown reason is forwarded "as is". If it is missing, than it will be constructed with the error code "normal shutdown".

Reimplemented from rotor::actor_base_t.

◆ do_start_timer()

virtual void rotor::supervisor_t::do_start_timer ( const pt::time_duration &  interval,
timer_handler_base_t handler 
)
protectedpure virtualnoexcept

◆ enqueue()

virtual void rotor::supervisor_t::enqueue ( message_ptr_t  message)
pure virtualnoexcept

enqueues messages thread safe way and triggers processing

This is the only method for deliver message outside of rotor context. Basically it is put and process in the event loop context.

The thread-safety should be guaranteed by derived class and/or used event-loop.

This method is used for messaging between supervisors with different localities, event loops or threads.

Implemented in rotor::asio::supervisor_asio_t, rotor::ev::supervisor_ev_t, rotor::fltk::supervisor_fltk_t, rotor::thread::supervisor_thread_t, and rotor::wx::supervisor_wx_t.

◆ intercept()

virtual void rotor::supervisor_t::intercept ( message_ptr_t message,
const void *  tag,
const continuation_t continuation 
)
protectedvirtualnoexcept

intercepts message delivery for the tagged handler

Reimplemented in rotor::thread::supervisor_thread_t.

◆ make_address()

virtual address_ptr_t rotor::supervisor_t::make_address ( )
virtualnoexcept

creates new address_t linked with the supervisor

Reimplemented in rotor::asio::supervisor_asio_t.

◆ on_start()

void rotor::supervisor_t::on_start ( )
overridevirtualnoexcept

actor is fully initialized and it's supervisor has sent signal to start

The actor state is set to OPERATIONAL.

Reimplemented from rotor::actor_base_t.

◆ put()

void rotor::supervisor_t::put ( message_ptr_t  message)
inline

puts a message into internal supervisor queue for further processing

This is thread-unsafe method. The enqueue method should be used to put a new message from external context in thread-safe way.

◆ shutdown()

virtual void rotor::supervisor_t::shutdown ( )
pure virtualnoexcept

thread-safe version of do_shutdown, i.e. send shutdown request let it be processed by the supervisor

Implemented in rotor::asio::supervisor_asio_t, rotor::ev::supervisor_ev_t, rotor::fltk::supervisor_fltk_t, rotor::thread::supervisor_thread_t, and rotor::wx::supervisor_wx_t.

◆ shutdown_finish()

void rotor::supervisor_t::shutdown_finish ( )
overridevirtualnoexcept

finalizes shutdown

The shutdown response is sent and actor state is set to SHUT_DOWN.

This is the last action in the shutdown sequence. No further methods will be invoked on the actor.

All unfinished requests and untriggered timers will be cancelled by force in the method.

Reimplemented from rotor::actor_base_t.

◆ spawn()

spawner_t rotor::supervisor_t::spawn ( factory_t  )
noexcept

returns an actor spawner

Spawner allows to create a new actor instance, when the current actor instance is down. Different policies (reactions) can be applied.

◆ start()

virtual void rotor::supervisor_t::start ( )
pure virtualnoexcept

thread-safe version of do_process

Starts supervisor to processing messages queue in safe thread/loop context. Once it becomes empty, the method returns

Implemented in rotor::asio::supervisor_asio_t, rotor::ev::supervisor_ev_t, rotor::fltk::supervisor_fltk_t, rotor::thread::supervisor_thread_t, and rotor::wx::supervisor_wx_t.

◆ subscribe()

subscription_info_ptr_t rotor::supervisor_t::subscribe ( const handler_ptr_t handler,
const address_ptr_t addr,
const actor_base_t owner_ptr,
owner_tag_t  owner_tag 
)
noexcept

main subscription implementation

The subscription point is materialized into subscription info. If address is internal/local, then it is immediately confirmed to the source actor as payload::subscription_confirmation_t.

Otherwise, if the address is external (foreign), then subscription request is forwarded to appropiate supervisor as payload::external_subscription_t request.

The materialized subscription info is returned.


The documentation for this struct was generated from the following file: