rotor
Event loop friendly C++ actor micro-framework
 
Loading...
Searching...
No Matches
supervisor.h
1#pragma once
2
3//
4// Copyright (c) 2019-2024 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
5//
6// Distributed under the MIT Software License
7//
8
9#include "actor_base.h"
10#include "handler.h"
11#include "message.h"
12#include "subscription.h"
13#include "system_context.h"
14#include "supervisor_config.h"
15#include "address_mapping.h"
16#include "error_code.h"
17#include "spawner.h"
18
19#include <functional>
20#include <unordered_map>
21#include <unordered_set>
22
23#include <boost/lockfree/queue.hpp>
24
25#if defined(_MSC_VER)
26#pragma warning(push)
27#pragma warning(disable : 4251)
28#endif
29
30namespace rotor {
31
69struct ROTOR_API supervisor_t : public actor_base_t {
70
71 // clang-format off
78 using plugins_list_t = std::tuple<
91 // clang-format on
92
95
97 template <typename Supervisor> using config_builder_t = supervisor_config_builder_t<Supervisor>;
98
101 supervisor_t(const supervisor_t &) = delete;
102 supervisor_t(supervisor_t &&) = delete;
104
105 virtual void do_initialize(system_context_t *ctx) noexcept override;
106
131 inline size_t do_process() noexcept { return locality_leader->delivery->process(); }
132
134 virtual address_ptr_t make_address() noexcept;
135
139 virtual void commit_unsubscription(const subscription_info_ptr_t &info) noexcept;
140
146 virtual void start() noexcept = 0;
147
148 void on_start() noexcept override;
149
152 virtual void shutdown() noexcept = 0;
153
154 void do_shutdown(const extended_error_ptr_t &reason = {}) noexcept override;
155
156 void shutdown_finish() noexcept override;
157
159 virtual void on_child_init(actor_base_t *actor, const extended_error_ptr_t &ec) noexcept;
160
162 virtual void on_child_shutdown(actor_base_t *actor) noexcept;
163
175 virtual void enqueue(message_ptr_t message) noexcept = 0;
176
183 inline void put(message_ptr_t message) { locality_leader->queue.emplace_back(std::move(message)); }
184
186 template <typename Handler> void subscribe(actor_base_t &actor, Handler &&handler) {
187 supervisor->subscribe(actor.address, wrap_handler(actor, std::move(handler)));
188 }
189
191 template <typename Handler> inline void unsubscribe_actor(const address_ptr_t &addr, Handler &&handler) noexcept {
192 handler_ptr_t wrapped_handler(std::forward<Handler>(handler));
193 lifetime->unsubscribe(wrapped_handler, addr);
194 }
195
197 template <typename Actor> auto create_actor() {
198 using builder_t = typename Actor::template config_builder_t<Actor>;
199 assert(manager && "child_manager_plugin_t should be already initialized");
200 return builder_t([this](auto &actor) { manager->create_child(actor); }, this);
201 }
202
208 template <typename T, typename... Args>
209 request_builder_t<T> do_request(actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to,
210 Args &&...args) noexcept {
211 return request_builder_t<T>(*this, actor, dest_addr, reply_to, std::forward<Args>(args)...);
212 }
228 const actor_base_t *owner_ptr, owner_tag_t owner_tag) noexcept;
229
237
238 using actor_base_t::subscribe;
239
241 inline const address_ptr_t &get_registry_address() const noexcept { return registry_address; }
242
244 template <typename T> auto &access() noexcept;
245
247 template <typename T, typename... Args> auto access(Args... args) noexcept;
248
250 using inbound_queue_t = boost::lockfree::queue<message_base_t *>;
251
252 protected:
254 virtual address_ptr_t instantiate_address(const void *locality) noexcept;
255
257 using request_map_t = std::unordered_map<request_id_t, request_curry_t>;
258
260 void on_request_trigger(request_id_t timer_id, bool cancelled) noexcept;
261
263 virtual void do_start_timer(const pt::time_duration &interval, timer_handler_base_t &handler) noexcept = 0;
264
266 virtual void do_cancel_timer(request_id_t timer_id) noexcept = 0;
267
269 virtual void intercept(message_ptr_t &message, const void *tag, const continuation_t &continuation) noexcept;
270
273
276
278 request_id_t last_req_id;
279
281 request_map_t request_map;
282
284 subscription_t subscription_map;
285
288
290 plugin::delivery_plugin_base_t *delivery = nullptr;
291
293 plugin::child_manager_plugin_t *manager = nullptr;
294
296 supervisor_t *locality_leader;
297
299 inbound_queue_t inbound_queue;
300
302 size_t inbound_queue_size;
303
305 pt::time_duration poll_duration;
306
308 const std::atomic_bool *shutdown_flag = nullptr;
309
311 pt::time_duration shutdown_poll_frequency = pt::millisec{100};
312
313 private:
314 using actors_set_t = std::unordered_set<const actor_base_t *>;
315
316 bool create_registry;
317 bool synchronize_start;
318 address_ptr_t registry_address;
319 actors_set_t alive_actors;
320
321 supervisor_policy_t policy;
322
324 address_mapping_t address_mapping;
325
326 template <typename T> friend struct request_builder_t;
327 template <typename Supervisor> friend struct actor_config_builder_t;
328 friend struct plugin::delivery_plugin_base_t;
329 friend struct actor_base_t;
330 template <typename T> friend struct plugin::delivery_plugin_t;
331
332 void discard_request(request_id_t request_id) noexcept;
333 void uplift_last_message() noexcept;
334
335 void on_shutdown_check_timer(request_id_t, bool cancelled) noexcept;
336
337 inline request_id_t next_request_id() noexcept {
338 AGAIN:
339 auto &map = locality_leader->request_map;
340 auto it = map.find(++locality_leader->last_req_id);
341 if (it != map.end()) {
342 goto AGAIN;
343 }
344 return locality_leader->last_req_id;
345 }
346};
347
348using supervisor_ptr_t = intrusive_ptr_t<supervisor_t>;
349
350/* third-party classes implementations */
351
352template <typename Supervisor> auto system_context_t::create_supervisor() {
353 using builder_t = typename Supervisor::template config_builder_t<Supervisor>;
354 return builder_t(
355 [this](auto &actor) {
356 if (supervisor) {
357 auto ec = make_error_code(error_code_t::supervisor_defined);
358 on_error(actor.get(), make_error(identity(), ec));
359 actor.reset();
360 } else {
361 this->supervisor = actor;
362 actor->do_initialize(this);
363 }
364 },
365 *this);
366}
367
368template <typename M, typename... Args> void actor_base_t::send(const address_ptr_t &addr, Args &&...args) {
369 supervisor->put(make_message<M>(addr, std::forward<Args>(args)...));
370}
371
372template <typename Delegate, typename Method>
373void actor_base_t::start_timer(request_id_t request_id, const pt::time_duration &interval, Delegate &delegate,
374 Method method) noexcept {
375 using final_handler_t = timer_handler_t<Delegate, Method>;
376 auto handler = std::make_unique<final_handler_t>(this, request_id, &delegate, std::forward<Method>(method));
377 supervisor->do_start_timer(interval, *handler);
378 timers_map.emplace(request_id, std::move(handler));
379}
380
381template <typename Delegate, typename Method, typename>
382request_id_t actor_base_t::start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept {
383 auto request_id = supervisor->next_request_id();
384 start_timer(request_id, interval, delegate, std::forward<Method>(method));
385 return request_id;
386}
387
389template <typename Handler> handler_ptr_t wrap_handler(actor_base_t &actor, Handler &&handler) {
390 using final_handler_t = handler_t<Handler>;
391 auto handler_raw = new final_handler_t(actor, std::move(handler));
392 return handler_ptr_t{handler_raw};
393}
394
395template <typename Handler> subscription_info_ptr_t actor_base_t::subscribe(Handler &&h) noexcept {
396 auto wrapped_handler = wrap_handler(*this, std::move(h));
397 return supervisor->subscribe(wrapped_handler, address, this, owner_tag_t::ANONYMOUS);
398}
399
400template <typename Handler>
402 auto wrapped_handler = wrap_handler(*this, std::move(h));
403 return supervisor->subscribe(wrapped_handler, addr, this, owner_tag_t::ANONYMOUS);
404}
405
406namespace plugin {
407
408template <typename Handler>
410 using final_handler_t = handler_t<Handler>;
411 handler_ptr_t wrapped_handler(new final_handler_t(*this, std::move(h)));
412 auto info = actor->supervisor->subscribe(wrapped_handler, addr, actor, owner_tag_t::PLUGIN);
413 own_subscriptions.emplace_back(info);
414 return info;
415}
416
417template <typename Handler> subscription_info_ptr_t plugin_base_t::subscribe(Handler &&h) noexcept {
418 return subscribe(std::forward<Handler>(h), actor->address);
419}
420
421template <> inline auto &plugin_base_t::access<plugin::starter_plugin_t>() noexcept { return own_subscriptions; }
422
423template <typename Handler> subscription_info_ptr_t starter_plugin_t::subscribe_actor(Handler &&handler) noexcept {
424 auto &address = actor->get_address();
425 return subscribe_actor(std::forward<Handler>(handler), address);
426}
427
428template <typename Handler>
430 auto wrapped_handler = wrap_handler(*actor, std::move(handler));
431 auto info = actor->get_supervisor().subscribe(wrapped_handler, addr, actor, owner_tag_t::PLUGIN);
432 assert(std::count_if(tracked.begin(), tracked.end(), [&](auto &it) { return *it == *info; }) == 0 &&
433 "already subscribed");
434 tracked.emplace_back(info);
435 access<starter_plugin_t>().emplace_back(info);
436 return info;
437}
438
439template <> inline size_t delivery_plugin_t<plugin::local_delivery_t>::process() noexcept {
440 size_t enqueued_messages{0};
441 while (queue->size()) {
442 auto message = message_ptr_t(queue->front().detach(), false);
443 auto &dest = message->address;
444 queue->pop_front();
445 auto internal = dest->same_locality(*address);
446 if (internal) { /* subscriptions are handled by me */
447 auto local_recipients = subscription_map->get_recipients(*message);
448 if (local_recipients) {
449 plugin::local_delivery_t::delivery(message, *local_recipients);
450 }
451 } else {
452 dest->supervisor.enqueue(std::move(message));
453 ++enqueued_messages;
454 }
455 }
456 return enqueued_messages;
457}
458
460 size_t enqueued_messages{0};
461 while (queue->size()) {
462 auto message = message_ptr_t(queue->front().detach(), false);
463 auto &dest = message->address;
464 queue->pop_front();
465 auto internal = dest->same_locality(*address);
466 const subscription_t::joint_handlers_t *local_recipients = nullptr;
467 bool delivery_attempt = false;
468 if (internal) { /* subscriptions are handled by me */
469 local_recipients = subscription_map->get_recipients(*message);
470 delivery_attempt = true;
471 } else {
472 dest->supervisor.enqueue(std::move(message));
473 ++enqueued_messages;
474 }
475 if (local_recipients) {
476 plugin::inspected_local_delivery_t::delivery(message, *local_recipients, stringifier);
477 } else {
478 if (delivery_attempt) {
480 }
481 }
482 }
483 return enqueued_messages;
484}
485
486} // namespace plugin
487
488template <typename Handler, typename Enabled> void actor_base_t::unsubscribe(Handler &&h) noexcept {
489 supervisor->unsubscribe_actor(address, wrap_handler(*this, std::move(h)));
490}
491
492template <typename Handler, typename Enabled>
493void actor_base_t::unsubscribe(Handler &&h, address_ptr_t &addr) noexcept {
494 supervisor->unsubscribe_actor(addr, wrap_handler(*this, std::move(h)));
495}
496
497template <typename T>
498template <typename... Args>
500 const address_ptr_t &reply_to_, Args &&...args)
501 : sup{sup_}, actor{actor_}, request_id{sup.next_request_id()}, destination{destination_}, reply_to{reply_to_},
502 do_install_handler{false} {
503 auto addr = sup.address_mapping.get_mapped_address(actor_, response_message_t::message_type);
504 if (addr) {
505 imaginary_address = addr;
506 } else {
507 // subscribe to imaginary address instead of real one because of
508 // 1. faster dispatching
509 // 2. need to distinguish between "timeout guarded responses" and "responses to own requests"
510 imaginary_address = sup.make_address();
511 do_install_handler = true;
512 }
513 req.reset(
514 new request_message_t{destination, request_id, imaginary_address, reply_to_, std::forward<Args>(args)...});
515}
516
517template <typename T> request_id_t request_builder_t<T>::send(const pt::time_duration &timeout_) noexcept {
518 if (do_install_handler) {
519 install_handler();
520 }
522 sup.request_map.emplace(request_id, request_curry_t{fn, reply_to, req, &actor});
523 sup.put(req);
524 sup.start_timer(request_id, timeout_, sup, &supervisor_t::on_request_trigger);
525 actor.active_requests.emplace(request_id);
526 return request_id;
527}
528
529template <typename T> void request_builder_t<T>::install_handler() noexcept {
530 auto handler = lambda<response_message_t>([supervisor = &sup](response_message_t &msg) {
531 auto request_id = msg.payload.request_id();
532 auto &request_map = supervisor->request_map;
533 auto it = request_map.find(request_id);
534
535 // if a response to request has arrived and no timer can be found
536 // that means that either timeout timer already triggered
537 // and error-message already delivered or response is not expected.
538 // just silently drop it anyway
539 if (it != request_map.end()) {
540 auto &curry = it->second;
541 auto &orig_addr = curry.origin;
542 supervisor->template send<wrapped_res_t>(orig_addr, msg.payload);
543 supervisor->discard_request(request_id);
544 // keep order, i.e. deliver response immediately
545 supervisor->uplift_last_message();
546 }
547 });
548 auto wrapped_handler = wrap_handler(sup, std::move(handler));
549 auto info = sup.subscribe(wrapped_handler, imaginary_address, &actor, owner_tag_t::SUPERVISOR);
550 sup.address_mapping.set(actor, info);
551}
552
558template <typename Request, typename... Args>
560 Args &&...args) {
561 using request_t = typename request_wrapper_t<Request>::request_t;
562 return supervisor->do_request<request_t>(*this, dest_addr, address, std::forward<Args>(args)...);
563}
564
571template <typename Request, typename... Args>
573actor_base_t::request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args) {
574 using request_t = typename request_wrapper_t<Request>::request_t;
575 return supervisor->do_request<request_t>(*this, dest_addr, reply_addr, std::forward<Args>(args)...);
576}
577
578template <typename Request> auto actor_base_t::make_response(Request &message, const extended_error_ptr_t &ec) {
579 using payload_t = typename Request::payload_t::request_t;
580 using traits_t = request_traits_t<payload_t>;
581 return traits_t::make_error_response(message.payload.reply_to, message, ec);
582}
583
584template <typename Request, typename... Args> auto actor_base_t::make_response(Request &message, Args &&...args) {
585 using payload_t = typename Request::payload_t::request_t;
586 using req_traits_t = request_traits_t<payload_t>;
587 using response_t = typename req_traits_t::response::wrapped_t;
588 using request_ptr_t = typename req_traits_t::request::message_ptr_t;
589 return make_message<response_t>(message.payload.reply_to, request_ptr_t{&message}, std::forward<Args>(args)...);
590}
591
592template <typename Request, typename... Args> void actor_base_t::reply_to(Request &message, Args &&...args) {
593 supervisor->put(make_response<Request>(message, std::forward<Args>(args)...));
594}
595
596template <typename Request> void actor_base_t::reply_with_error(Request &message, const extended_error_ptr_t &ec) {
597 supervisor->put(make_response<Request>(message, ec));
598}
599
600template <typename Actor>
602 : install_action{std::move(action_)}, supervisor{supervisor_}, system_context{*supervisor_->context},
603 config{supervisor_} {
604 init_ctor();
605}
606
608 intrusive_ptr_t<Actor> actor_ptr;
609 if (!validate()) {
610 auto ec = make_error_code(error_code_t::actor_misconfigured);
611 system_context.on_error(actor_ptr.get(), make_error(system_context.identity(), ec));
612 } else {
613 auto &cfg = static_cast<typename builder_t::config_t &>(config);
614 auto actor = new Actor(cfg);
615 actor_ptr.reset(actor);
616 install_action(actor_ptr);
617 }
618 return actor_ptr;
619}
620
621} // namespace rotor
622
623#if defined(_MSC_VER)
624#pragma warning(pop)
625#endif
Basic namespace for all rotor functionalities.
Definition rotor.hpp:21
intrusive_ptr_t< message_base_t > message_ptr_t
intrusive pointer for message
Definition message.h:115
intrusive_ptr_t< supervisor_t > supervisor_ptr_t
intrusive pointer for supervisor
Definition forward.hpp:29
ROTOR_API std::error_code make_error_code(const error_code_t e)
makes std::error_code from rotor error code enumerations
Definition error_code.h:78
intrusive_ptr_t< address_t > address_ptr_t
intrusive pointer for address
Definition address.hpp:57
owner_tag_t
who owns the subscription point
Definition subscription_point.h:40
intrusive_ptr_t< extended_error_t > extended_error_ptr_t
intrusive pointer to extended error type
Definition extended_error.h:25
std::deque< message_ptr_t > messages_queue_t
structure to hold messages (intrusive pointers)
Definition message.h:118
handler_ptr_t wrap_handler(actor_base_t &actor, Handler &&handler)
wraps handler (pointer to member function) and actor address into intrusive pointer
Definition supervisor.h:389
intrusive_ptr_t< handler_base_t > handler_ptr_t
intrusive pointer for handler
Definition forward.hpp:26
std::size_t request_id_t
timer identifier type in the scope of the actor
Definition forward.hpp:34
ROTOR_API extended_error_ptr_t make_error(const std::string &context_, const std::error_code &ec_, const extended_error_ptr_t &next_={}, const message_ptr_t &request_={}) noexcept
constructs smart pointer to the extened error
intrusive_ptr_t< subscription_info_t > subscription_info_ptr_t
intrusive pointer for subscription_info_t
Definition subscription_point.h:127
boost::intrusive_ptr< T > intrusive_ptr_t
alias for intrusive pointer
Definition arc.hpp:27
std::function< actor_ptr_t(supervisor_t &, const address_ptr_t &)> factory_t
factory which allows to create actors lazily or on demand
Definition forward.hpp:45
supervisor_policy_t
how to behave on child actor initialization failures
Definition policy.h:12
universal primitive of concurrent computation
Definition actor_base.h:47
request_id_t start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept
spawns a new one-shot timer
Definition supervisor.h:382
subscription_info_ptr_t subscribe(Handler &&h, const address_ptr_t &addr) noexcept
subscribes actor's handler to process messages on the specified address
Definition supervisor.h:401
auto make_response(Request &message, Args &&...args)
makes response to the request, but does not send it.
Definition supervisor.h:584
request_builder_t< typename request_wrapper_t< R >::request_t > request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args)
returns request builder for destination address using the specified address for reply
void send(const address_ptr_t &addr, Args &&...args)
sends message to the destination address
Definition supervisor.h:368
request_builder_t< typename request_wrapper_t< R >::request_t > request(const address_ptr_t &dest_addr, Args &&...args)
returns request builder for destination address using the "main" actor address
void reply_with_error(Request &message, const extended_error_ptr_t &ec)
convenient method for constructing and sending error response to a request
Definition supervisor.h:596
supervisor_t * supervisor
non-owning pointer to actor's execution / infrastructure context
Definition actor_base.h:432
void reply_to(Request &message, Args &&...args)
convenient method for constructing and sending response to a request
Definition supervisor.h:592
address_ptr_t address
actor address
Definition actor_base.h:423
void unsubscribe(Handler &&h, address_ptr_t &addr) noexcept
unsubscribes actor's handler from process messages on the specified address
Definition supervisor.h:493
CRTP actor config builder.
Definition actor_config.h:92
actor_config_builder_t(install_action_t &&action_, supervisor_t *supervisor_)
ctor with install action and raw pointer to supervisor
Definition supervisor.h:601
typename Actor::template config_builder_t< Supervisor > builder_t
final builder class
Definition actor_config.h:94
actor_ptr_t finish() &&
constructs actor from the current config
Definition supervisor.h:607
std::function< void(actor_ptr_t &)> install_action_t
actor post-constructor callback type
Definition actor_config.h:107
NAT mechanism for rotor
Definition address_mapping.h:33
address_ptr_t get_mapped_address(actor_base_t &actor, const void *) noexcept
returns temporal destination address for the actor/message type
continue handler invocation (used for intercepting)
Definition handler.h:175
Definition handler.h:224
Base class for rotor message.
Definition message.h:52
create actor's addresses
Definition address_maker.h:24
supervisor's plugin for child-actors housekeeping
Definition child_manager.h:30
base implementation for messages delivery plugin
Definition delivery.h:62
templated message delivery plugin, to allow local message delivery be customized
Definition delivery.h:84
size_t process() noexcept override
main messages dispatcher interface
allows non-local actors to subscribe on the local addresses of a supervisor.
Definition foreigners_support.h:17
manages actors init and shutdown procedures
Definition init_shutdown.h:22
static void discard(message_ptr_t &message, const message_stringifier_t *stringifier) noexcept
dumps discarded message
static void delivery(message_ptr_t &message, const subscription_t::joint_handlers_t &local_recipients, const message_stringifier_t *stringifier) noexcept
delivers the message to the recipients, possibly dumping it to console
manages all actor subscriptions (i.e. from plugins or actor itself).
Definition lifetime.h:21
static void delivery(message_ptr_t &message, const subscription_t::joint_handlers_t &local_recipients) noexcept
delivers an message for self of one of child-actors (non-supervisors)
detects and assigns locality leader to the supervisor
Definition locality.h:22
subscription_info_ptr_t subscribe(Handler &&handler, const address_ptr_t &address) noexcept
subscribes plugin to the custom plugin handler on the specified address
Definition supervisor.h:409
handy access to registry_t, for name registration and discovery
Definition registry.h:35
"lock" for external resources
Definition resources.h:39
allows custom (actor) subscriptions and it is responsible for starting actor when it receives message...
Definition starter.h:19
subscription_info_ptr_t subscribe_actor(Handler &&handler) noexcept
subscribes actor handler on main actor address
Definition supervisor.h:423
builder pattern implementation for the original request
Definition request.hpp:387
request_builder_t(supervisor_t &sup_, actor_base_t &actor_, const address_ptr_t &destination_, const address_ptr_t &reply_to_, Args &&...args)
constructs request message but still does not dispatch it
Definition supervisor.h:499
request_id_t send(const pt::time_duration &send) noexcept
actually dispatches requests and spawns timeout timer
Definition supervisor.h:517
the recorded context, which is needed to produce error response to the original request
Definition request.hpp:301
type helper to deduce request/response messages from original (user-supplied) request type
Definition request.hpp:318
T request_t
an alias for the original request type
Definition request.hpp:45
allows automatically restart actors
Definition spawner.h:30
pair internal and external handler_t
Definition subscription.h:40
Holds and classifies message handlers on behalf of supervisor.
Definition subscription.h:30
CRTP supervisor config builder.
Definition supervisor_config.h:72
base supervisor config, which holds shutdown timeout value
Definition supervisor_config.h:23
supervisor is responsible for managing actors (workers) lifetime
Definition supervisor.h:69
size_t do_process() noexcept
process queue of messages of locality leader
Definition supervisor.h:131
std::unordered_map< request_id_t, request_curry_t > request_map_t
timer to response with timeout procedure type
Definition supervisor.h:257
virtual address_ptr_t make_address() noexcept
creates new address_t linked with the supervisor
const address_ptr_t & get_registry_address() const noexcept
returns registry actor address (if it was defined or registry actor was created)
Definition supervisor.h:241
void subscribe(actor_base_t &actor, Handler &&handler)
templated version of subscribe_actor
Definition supervisor.h:186
void shutdown_finish() noexcept override
finalizes shutdown
void on_request_trigger(request_id_t timer_id, bool cancelled) noexcept
invoked as timer callback; creates response or just clean up for previously set request
std::tuple< plugin::address_maker_plugin_t, plugin::locality_plugin_t, plugin::delivery_plugin_t< plugin::default_local_delivery_t >, plugin::lifetime_plugin_t, plugin::init_shutdown_plugin_t, plugin::foreigners_support_plugin_t, plugin::child_manager_plugin_t, plugin::link_server_plugin_t, plugin::link_client_plugin_t, plugin::registry_plugin_t, plugin::resources_plugin_t, plugin::starter_plugin_t > plugins_list_t
the default list of plugins for an supervisor
Definition supervisor.h:90
void unsubscribe_actor(const address_ptr_t &addr, Handler &&handler) noexcept
convenient templated version of `unsubscribe_actor
Definition supervisor.h:191
boost::lockfree::queue< message_base_t * > inbound_queue_t
lock-free queue for inbound messages
Definition supervisor.h:250
auto & access() noexcept
generic non-public fields accessor
auto create_actor()
creates child-actor builder
Definition supervisor.h:197
supervisor_t(supervisor_config_t &config)
constructs new supervisor with optional parent supervisor
subscription_info_ptr_t subscribe(const handler_ptr_t &handler, const address_ptr_t &addr, const actor_base_t *owner_ptr, owner_tag_t owner_tag) noexcept
main subscription implementation
void put(message_ptr_t message)
puts a message into internal supervisor queue for further processing
Definition supervisor.h:183
spawner_t spawn(factory_t) noexcept
returns an actor spawner
virtual void do_initialize(system_context_t *ctx) noexcept override
early actor initialization (pre-initialization)
request_builder_t< T > do_request(actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to, Args &&...args) noexcept
convenient method for request building
Definition supervisor.h:209
The system context holds root supervisor_t (intrusive pointer) and may be loop-related details in der...
Definition system_context.h:32
virtual void on_error(actor_base_t *actor, const extended_error_ptr_t &ec) noexcept
fatal error handler
virtual std::string identity() noexcept
identifies the context.
auto create_supervisor()
returns builder for root supervisor
Definition supervisor.h:352
Base class for timer handler.
Definition timer_handler.hpp:17
templated implementation of timer handler
Definition timer_handler.hpp:42