rotor
Event loop friendly C++ actor micro-framework
Loading...
Searching...
No Matches
supervisor.h
1#pragma once
2
3//
4// Copyright (c) 2019-2025 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
5//
6// Distributed under the MIT Software License
7//
8
9#include "actor_base.h"
10#include "handler.h"
11#include "message.h"
12#include "subscription.h"
13#include "system_context.h"
14#include "supervisor_config.h"
15#include "address_mapping.h"
16#include "error_code.h"
17#include "spawner.h"
18
19#include <unordered_map>
20#include <unordered_set>
21
22#include <boost/lockfree/queue.hpp>
23
24#if defined(_MSC_VER)
25#pragma warning(push)
26#pragma warning(disable : 4251)
27#endif
28
29namespace rotor {
30
68struct ROTOR_API supervisor_t : public actor_base_t {
69
70 // clang-format off
77 using plugins_list_t = std::tuple<
90 // clang-format on
91
94
96 template <typename Supervisor> using config_builder_t = supervisor_config_builder_t<Supervisor>;
97
100 supervisor_t(const supervisor_t &) = delete;
101 supervisor_t(supervisor_t &&) = delete;
103
104 virtual void do_initialize(system_context_t *ctx) noexcept override;
105
130 inline size_t do_process() noexcept { return locality_leader->delivery->process(); }
131
133 virtual address_ptr_t make_address() noexcept;
134
138 virtual void commit_unsubscription(const subscription_info_ptr_t &info) noexcept;
139
145 virtual void start() noexcept = 0;
146
147 void on_start() noexcept override;
148
151 virtual void shutdown() noexcept = 0;
152
153 void do_shutdown(const extended_error_ptr_t &reason = {}) noexcept override;
154
155 void shutdown_finish() noexcept override;
156
158 virtual void on_child_init(actor_base_t *actor, const extended_error_ptr_t &ec) noexcept;
159
161 virtual void on_child_shutdown(actor_base_t *actor) noexcept;
162
174 virtual void enqueue(message_ptr_t message) noexcept = 0;
175
182 inline void put(message_ptr_t message) { locality_leader->queue.emplace_back(std::move(message)); }
183
185 template <typename Handler> void subscribe(actor_base_t &actor, Handler &&handler) {
186 supervisor->subscribe(actor.address, wrap_handler(actor, std::move(handler)));
187 }
188
190 template <typename Handler> inline void unsubscribe_actor(const address_ptr_t &addr, Handler &&handler) noexcept {
191 handler_ptr_t wrapped_handler(std::forward<Handler>(handler));
192 lifetime->unsubscribe(wrapped_handler, addr);
193 }
194
196 template <typename Actor> auto create_actor() {
197 using builder_t = typename Actor::template config_builder_t<Actor>;
198 assert(manager && "child_manager_plugin_t should be already initialized");
199 return builder_t([this](auto &actor) { manager->create_child(actor); }, this);
200 }
201
206 */
207 template <typename T, typename... Args>
208 request_builder_t<T> do_request(actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to,
209 Args &&...args) noexcept {
210 return request_builder_t<T>(*this, actor, dest_addr, reply_to, std::forward<Args>(args)...);
211 }
212
227 const actor_base_t *owner_ptr, owner_tag_t owner_tag) noexcept;
228
236
238
240 inline const address_ptr_t &get_registry_address() const noexcept { return registry_address; }
241
243 template <typename T> auto &access() noexcept;
244
246 template <typename T, typename... Args> auto access(Args... args) noexcept;
247
249 using inbound_queue_t = boost::lockfree::queue<message_base_t *>;
250
251 protected:
253 virtual address_ptr_t instantiate_address(const void *locality) noexcept;
254
256 using request_map_t = std::unordered_map<request_id_t, request_curry_t>;
257
259 void on_request_trigger(request_id_t timer_id, bool cancelled) noexcept;
260
262 virtual void do_start_timer(const pt::time_duration &interval, timer_handler_base_t &handler) noexcept = 0;
263
265 virtual void do_cancel_timer(request_id_t timer_id) noexcept = 0;
266
268 virtual void intercept(message_ptr_t &message, const void *tag, const continuation_t &continuation) noexcept;
269
272
275
278
281
284
287
289 plugin::delivery_plugin_base_t *delivery = nullptr;
290
292 plugin::child_manager_plugin_t *manager = nullptr;
293
296
299
302
304 pt::time_duration poll_duration;
305
307 const std::atomic_bool *shutdown_flag = nullptr;
308
310 pt::time_duration shutdown_poll_frequency = pt::millisec{100};
311
312 private:
313 using actors_set_t = std::unordered_set<const actor_base_t *>;
314
315 bool create_registry;
316 bool synchronize_start;
317 address_ptr_t registry_address;
318 actors_set_t alive_actors;
319
320 supervisor_policy_t policy;
321
323 address_mapping_t address_mapping;
324
325 template <typename T> friend struct request_builder_t;
326 template <typename Supervisor> friend struct actor_config_builder_t;
327 friend struct plugin::delivery_plugin_base_t;
328 friend struct actor_base_t;
329 template <typename T> friend struct plugin::delivery_plugin_t;
330
331 void discard_request(request_id_t request_id) noexcept;
332 void uplift_last_message() noexcept;
333
334 void on_shutdown_check_timer(request_id_t, bool cancelled) noexcept;
335
336 inline request_id_t next_request_id() noexcept {
337 AGAIN:
338 auto &map = locality_leader->request_map;
339 auto it = map.find(++locality_leader->last_req_id);
340 if (it != map.end()) {
341 goto AGAIN;
342 }
343 return locality_leader->last_req_id;
344 }
345};
346
348
349/* third-party classes implementations */
350
351template <typename Supervisor> auto system_context_t::create_supervisor() {
352 using builder_t = typename Supervisor::template config_builder_t<Supervisor>;
353 return builder_t(
354 [this](auto &actor) {
355 if (supervisor) {
356 auto ec = make_error_code(error_code_t::supervisor_defined);
357 on_error(actor.get(), make_error(identity(), ec));
358 actor.reset();
359 } else {
360 this->supervisor = actor;
361 actor->do_initialize(this);
362 }
363 },
364 *this);
365}
366
367template <typename M, typename... Args> void actor_base_t::send(const address_ptr_t &addr, Args &&...args) {
368 supervisor->put(make_message<M>(addr, std::forward<Args>(args)...));
369}
370
371template <typename M, typename... Args>
372void actor_base_t::route(const address_ptr_t &addr, const address_ptr_t &next_addr, Args &&...args) {
373 supervisor->put(make_routed_message<M>(addr, next_addr, std::forward<Args>(args)...));
374}
375
376template <typename M> void redirect(M message, const address_ptr_t &addr, const address_ptr_t &next_addr) {}
377
378template <typename Delegate, typename Method>
379void actor_base_t::start_timer(request_id_t request_id, const pt::time_duration &interval, Delegate &delegate,
380 Method method) noexcept {
381 using final_handler_t = timer_handler_t<Delegate, Method>;
382 auto handler = std::make_unique<final_handler_t>(this, request_id, &delegate, std::forward<Method>(method));
383 supervisor->do_start_timer(interval, *handler);
384 timers_map.emplace(request_id, std::move(handler));
385}
386
387template <typename Delegate, typename Method, typename>
388request_id_t actor_base_t::start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept {
389 auto request_id = supervisor->next_request_id();
390 start_timer(request_id, interval, delegate, std::forward<Method>(method));
391 return request_id;
392}
393
395template <typename Handler> handler_ptr_t wrap_handler(actor_base_t &actor, Handler &&handler) {
396 using final_handler_t = handler_t<Handler>;
397 auto handler_raw = new final_handler_t(actor, std::move(handler));
398 return handler_ptr_t{handler_raw};
399}
400
401template <typename Handler> subscription_info_ptr_t actor_base_t::subscribe(Handler &&h) noexcept {
402 auto wrapped_handler = wrap_handler(*this, std::move(h));
403 return supervisor->subscribe(wrapped_handler, address, this, owner_tag_t::ANONYMOUS);
404}
405
406template <typename Handler>
408 auto wrapped_handler = wrap_handler(*this, std::move(h));
409 return supervisor->subscribe(wrapped_handler, addr, this, owner_tag_t::ANONYMOUS);
410}
411
412namespace plugin {
413
414template <typename Handler>
416 using final_handler_t = handler_t<Handler>;
417 handler_ptr_t wrapped_handler(new final_handler_t(*this, std::move(h)));
418 auto info = actor->supervisor->subscribe(wrapped_handler, addr, actor, owner_tag_t::PLUGIN);
419 own_subscriptions.emplace_back(info);
420 return info;
421}
422
423template <typename Handler> subscription_info_ptr_t plugin_base_t::subscribe(Handler &&h) noexcept {
424 return subscribe(std::forward<Handler>(h), actor->address);
425}
426
427template <> inline auto &plugin_base_t::access<plugin::starter_plugin_t>() noexcept { return own_subscriptions; }
428
429template <typename Handler> subscription_info_ptr_t starter_plugin_t::subscribe_actor(Handler &&handler) noexcept {
430 auto &address = actor->get_address();
431 return subscribe_actor(std::forward<Handler>(handler), address);
432}
433
434template <typename Handler>
436 auto wrapped_handler = wrap_handler(*actor, std::move(handler));
437 auto info = actor->get_supervisor().subscribe(wrapped_handler, addr, actor, owner_tag_t::PLUGIN);
438 assert(std::count_if(tracked.begin(), tracked.end(), [&](auto &it) { return *it == *info; }) == 0 &&
439 "already subscribed");
440 tracked.emplace_back(info);
441 access<starter_plugin_t>().emplace_back(info);
442 return info;
443}
444
445template <> inline size_t delivery_plugin_t<plugin::local_delivery_t>::process() noexcept {
446 size_t enqueued_messages{0};
447 while (queue->size()) {
448 auto ptr = queue->front().detach();
449 auto message = message_ptr_t(ptr, false);
450 auto &dest = message->address;
451 queue->pop_front();
452 auto internal = dest->same_locality(*address);
453 if (internal) { /* subscriptions are handled by me */
454 auto local_recipients = subscription_map->get_recipients(*message);
455 if (local_recipients) {
457 }
458 if (message->next_route && message->use_count() == 1) {
459 auto sup = static_cast<supervisor_t *>(actor);
460 message->address = std::move(message->next_route);
461 sup->put(std::move(message));
462 }
463 } else {
464 dest->supervisor.enqueue(std::move(message));
465 ++enqueued_messages;
466 }
467 }
468 return enqueued_messages;
469}
470
472 size_t enqueued_messages{0};
473 while (queue->size()) {
474 auto ptr = queue->front().detach();
475 auto message = message_ptr_t(ptr, false);
476 auto &dest = message->address;
477 queue->pop_front();
478 auto internal = dest->same_locality(*address);
479 const subscription_t::joint_handlers_t *local_recipients = nullptr;
480 if (internal) { /* subscriptions are handled by me */
481 local_recipients = subscription_map->get_recipients(*message);
482 if (local_recipients) {
484 }
485 if (ptr->next_route && ptr->use_count() == 1) {
486 auto sup = static_cast<supervisor_t *>(actor);
487 message->address = std::move(message->next_route);
488 sup->put(std::move(message));
489 } else if (!local_recipients) {
491 }
492 } else {
493 dest->supervisor.enqueue(std::move(message));
494 ++enqueued_messages;
495 }
496 }
497 return enqueued_messages;
498}
499
500} // namespace plugin
501
502template <typename Handler, typename Enabled> void actor_base_t::unsubscribe(Handler &&h) noexcept {
503 supervisor->unsubscribe_actor(address, wrap_handler(*this, std::move(h)));
504}
505
506template <typename Handler, typename Enabled>
507void actor_base_t::unsubscribe(Handler &&h, address_ptr_t &addr) noexcept {
508 supervisor->unsubscribe_actor(addr, wrap_handler(*this, std::move(h)));
509}
510
511template <typename T>
512template <typename... Args>
514 const address_ptr_t &reply_to_, Args &&...args)
515 : sup{sup_}, actor{actor_}, request_id{sup.next_request_id()}, destination{destination_}, reply_to{reply_to_},
516 do_install_handler{false} {
517 auto addr = sup.address_mapping.get_mapped_address(actor_, response_message_t::message_type);
518 if (addr) {
519 imaginary_address = addr;
520 } else {
521 // subscribe to imaginary address instead of real one because of
522 // 1. faster dispatching
523 // 2. need to distinguish between "timeout guarded responses" and "responses to own requests"
524 imaginary_address = sup.make_address();
525 do_install_handler = true;
526 }
527 req.reset(
528 new request_message_t{destination, request_id, imaginary_address, reply_to_, std::forward<Args>(args)...});
529}
530
531template <typename T> request_id_t request_builder_t<T>::send(const pt::time_duration &timeout_) noexcept {
532 if (do_install_handler) {
533 install_handler();
534 }
536 sup.request_map.emplace(request_id, request_curry_t{fn, reply_to, req, &actor});
537 sup.put(req);
538 sup.start_timer(request_id, timeout_, sup, &supervisor_t::on_request_trigger);
539 actor.active_requests.emplace(request_id);
540 return request_id;
541}
542
543template <typename T> void request_builder_t<T>::install_handler() noexcept {
544 auto handler = lambda<response_message_t>([supervisor = &sup](response_message_t &msg) {
545 auto request_id = msg.payload.request_id();
546 auto &request_map = supervisor->request_map;
547 auto it = request_map.find(request_id);
548
549 // if a response to request has arrived and no timer can be found
550 // that means that either timeout timer already triggered
551 // and error-message already delivered or response is not expected.
552 // just silently drop it anyway
553 if (it != request_map.end()) {
554 auto &curry = it->second;
555 auto &orig_addr = curry.origin;
556 supervisor->template send<wrapped_res_t>(orig_addr, msg.payload);
557 supervisor->discard_request(request_id);
558 // keep order, i.e. deliver response immediately
559 supervisor->uplift_last_message();
560 }
561 });
562 auto wrapped_handler = wrap_handler(sup, std::move(handler));
563 auto info = sup.subscribe(wrapped_handler, imaginary_address, &actor, owner_tag_t::SUPERVISOR);
564 sup.address_mapping.set(actor, info);
565}
566
572template <typename Request, typename... Args>
573request_builder_t<typename request_wrapper_t<Request>::request_t> actor_base_t::request(const address_ptr_t &dest_addr,
574 Args &&...args) {
575 using request_t = typename request_wrapper_t<Request>::request_t;
576 assert(dest_addr);
577 return supervisor->do_request<request_t>(*this, dest_addr, address, std::forward<Args>(args)...);
578}
579
586template <typename Request, typename... Args>
588actor_base_t::request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args) {
589 using request_t = typename request_wrapper_t<Request>::request_t;
590 return supervisor->do_request<request_t>(*this, dest_addr, reply_addr, std::forward<Args>(args)...);
591}
592
593template <typename Request> auto actor_base_t::make_response(Request &message, const extended_error_ptr_t &ec) {
594 using payload_t = typename Request::payload_t::request_t;
595 using traits_t = request_traits_t<payload_t>;
596 return traits_t::make_error_response(message.payload.reply_to, message, ec);
597}
598
599template <typename Request, typename... Args> auto actor_base_t::make_response(Request &message, Args &&...args) {
600 using payload_t = typename Request::payload_t::request_t;
601 using req_traits_t = request_traits_t<payload_t>;
602 using response_t = typename req_traits_t::response::wrapped_t;
603 using request_ptr_t = typename req_traits_t::request::message_ptr_t;
604 return make_message<response_t>(message.payload.reply_to, request_ptr_t{&message}, std::forward<Args>(args)...);
605}
606
607template <typename Request, typename... Args> void actor_base_t::reply_to(Request &message, Args &&...args) {
608 supervisor->put(make_response<Request>(message, std::forward<Args>(args)...));
609}
610
611template <typename Request> void actor_base_t::reply_with_error(Request &message, const extended_error_ptr_t &ec) {
613}
614
615template <typename Actor>
617 : install_action{std::move(action_)}, supervisor{supervisor_}, system_context{*supervisor_->context},
618 config{supervisor_} {
619 init_ctor();
620}
621
623 intrusive_ptr_t<Actor> actor_ptr;
624 if (!validate()) {
625 auto ec = make_error_code(error_code_t::actor_misconfigured);
626 system_context.on_error(actor_ptr.get(), make_error(system_context.identity(), ec));
627 } else {
628 auto &cfg = static_cast<typename builder_t::config_t &>(config);
629 auto actor = new Actor(cfg);
630 actor_ptr.reset(actor);
631 install_action(actor_ptr);
632 }
633 return actor_ptr;
634}
635
636} // namespace rotor
637
638#if defined(_MSC_VER)
639#pragma warning(pop)
640#endif
namespace for rotor core messages (which just transform payloads)
Definition messages.hpp:317
Basic namespace for all rotor functionalities.
Definition rotor.hpp:21
intrusive_ptr_t< extended_error_t > extended_error_ptr_t
intrusive pointer to extended error type
Definition extended_error.h:25
ROTOR_API std::error_code make_error_code(const error_code_t e)
makes std::error_code from rotor error code enumerations
Definition error_code.h:78
intrusive_ptr_t< message_base_t > message_ptr_t
intrusive pointer for message
Definition message.h:118
owner_tag_t
who owns the subscription point
Definition subscription_point.h:39
std::function< actor_ptr_t(supervisor_t &, const address_ptr_t &)> factory_t
factory which allows to create actors lazily or on demand
Definition forward.hpp:45
intrusive_ptr_t< handler_base_t > handler_ptr_t
intrusive pointer for handler
Definition forward.hpp:26
constexpr lambda_holder_t< M, F > lambda(F &&fn)
helper function for lambda holder constructing
Definition handler.h:48
intrusive_ptr_t< address_t > address_ptr_t
intrusive pointer for address
Definition address.hpp:57
std::deque< message_ptr_t > messages_queue_t
structure to hold messages (intrusive pointers)
Definition message.h:121
handler_ptr_t wrap_handler(actor_base_t &actor, Handler &&handler)
wraps handler (pointer to member function) and actor address into intrusive pointer
Definition supervisor.h:395
ROTOR_API extended_error_ptr_t make_error(const std::string &context_, const std::error_code &ec_, const extended_error_ptr_t &next_={}, const message_ptr_t &request_={}) noexcept
constructs smart pointer to the extened error
auto make_routed_message(const address_ptr_t &addr, const address_ptr_t &route_addr, Args &&...args) -> message_ptr_t
constructs message by constructing it's payload; after delivery to destination address (to all subscr...
Definition message.h:146
auto make_message(const address_ptr_t &addr, Args &&...args) -> message_ptr_t
constructs message by constructing it's payload; intrusive pointer for the message is returned
Definition message.h:124
boost::intrusive_ptr< T > intrusive_ptr_t
alias for intrusive pointer
Definition arc.hpp:27
intrusive_ptr_t< supervisor_t > supervisor_ptr_t
intrusive pointer for supervisor
Definition forward.hpp:29
std::size_t request_id_t
timer identifier type in the scope of the actor
Definition forward.hpp:34
intrusive_ptr_t< subscription_info_t > subscription_info_ptr_t
intrusive pointer for subscription_info_t
Definition subscription_point.h:121
supervisor_policy_t
how to behave on child actor initialization failures
Definition policy.h:12
universal primitive of concurrent computation
Definition actor_base.h:47
request_id_t start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept
spawns a new one-shot timer
Definition supervisor.h:388
timers_map_t timers_map
timer-id to timer-handler map
Definition actor_base.h:497
plugin::lifetime_plugin_t * lifetime
non-owning pointer to lifetime plugin
Definition actor_base.h:476
subscription_info_ptr_t subscribe(Handler &&h, const address_ptr_t &addr) noexcept
subscribes actor's handler to process messages on the specified address
Definition supervisor.h:407
auto make_response(Request &message, Args &&...args)
makes response to the request, but does not send it.
Definition supervisor.h:599
void route(const address_ptr_t &addr, const address_ptr_t &next_addr, Args &&...args)
routes message to the destination address, then to the next address
Definition supervisor.h:372
request_builder_t< typename request_wrapper_t< R >::request_t > request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args)
returns request builder for destination address using the specified address for reply
void send(const address_ptr_t &addr, Args &&...args)
sends message to the destination address
Definition supervisor.h:367
request_builder_t< typename request_wrapper_t< R >::request_t > request(const address_ptr_t &dest_addr, Args &&...args)
returns request builder for destination address using the "main" actor address
void reply_with_error(Request &message, const extended_error_ptr_t &ec)
convenient method for constructing and sending error response to a request
Definition supervisor.h:611
supervisor_t * supervisor
non-owning pointer to actor's execution / infrastructure context
Definition actor_base.h:455
void reply_to(Request &message, Args &&...args)
convenient method for constructing and sending response to a request
Definition supervisor.h:607
address_ptr_t address
actor address
Definition actor_base.h:446
void unsubscribe(Handler &&h, address_ptr_t &addr) noexcept
unsubscribes actor's handler from process messages on the specified address
Definition supervisor.h:507
CRTP actor config builder.
Definition actor_config.h:92
system_context_t & system_context
reference to system_context_t
Definition actor_config.h:125
virtual bool validate() noexcept
checks whether config is valid, i.e. all necessary fields are set
Definition actor_config.h:197
actor_config_builder_t(install_action_t &&action_, supervisor_t *supervisor_)
ctor with install action and raw pointer to supervisor
Definition supervisor.h:616
config_t config
the currently build config
Definition actor_config.h:128
actor_ptr_t finish() &&
constructs actor from the current config
Definition supervisor.h:622
supervisor_t * supervisor
raw pointer to supervisor_t (is null for top-level supervisors)
Definition actor_config.h:122
std::function< void(actor_ptr_t &)> install_action_t
actor post-constructor callback type
Definition actor_config.h:107
install_action_t install_action
post-construction callback
Definition actor_config.h:119
NAT mechanism for rotor
Definition address_mapping.h:33
continue handler invocation (used for intercepting)
Definition handler.h:175
Definition handler.h:224
Base class for rotor message.
Definition message.h:52
create actor's addresses
Definition address_maker.h:24
supervisor's plugin for child-actors housekeeping
Definition child_manager.h:30
base implementation for messages delivery plugin
Definition delivery.h:62
const message_stringifier_t * stringifier
non-owning raw pointer to system's stringifier
Definition delivery.h:80
messages_queue_t * queue
non-owning raw pointer of supervisor's messages queue
Definition delivery.h:71
subscription_t * subscription_map
non-owning raw pointer to supervisor's subscriptions map
Definition delivery.h:77
address_t * address
non-owning raw pointer to supervisor's main address
Definition delivery.h:74
templated message delivery plugin, to allow local message delivery be customized
Definition delivery.h:84
size_t process() noexcept override
main messages dispatcher interface
allows non-local actors to subscribe on the local addresses of a supervisor.
Definition foreigners_support.h:17
manages actors init and shutdown procedures
Definition init_shutdown.h:22
static void discard(message_ptr_t &message, const message_stringifier_t *stringifier) noexcept
dumps discarded message
static void delivery(message_ptr_t &message, const subscription_t::joint_handlers_t &local_recipients, const message_stringifier_t *stringifier) noexcept
delivers the message to the recipients, possibly dumping it to console
manages all actor subscriptions (i.e. from plugins or actor itself).
Definition lifetime.h:21
static void delivery(message_ptr_t &message, const subscription_t::joint_handlers_t &local_recipients) noexcept
delivers an message for self of one of child-actors (non-supervisors)
detects and assigns locality leader to the supervisor
Definition locality.h:22
auto & access() noexcept
generic non-public fields accessor
actor_base_t * actor
non-owning actor pointer
Definition plugin_base.h:179
subscription_info_ptr_t subscribe(Handler &&handler, const address_ptr_t &address) noexcept
subscribes plugin to the custom plugin handler on the specified address
Definition supervisor.h:415
handy access to registry_t, for name registration and discovery
Definition registry.h:35
"lock" for external resources
Definition resources.h:39
allows custom (actor) subscriptions and it is responsible for starting actor when it receives message...
Definition starter.h:19
subscription_info_ptr_t subscribe_actor(Handler &&handler) noexcept
subscribes actor handler on main actor address
Definition supervisor.h:429
builder pattern implementation for the original request
Definition request.hpp:387
request_builder_t(supervisor_t &sup_, actor_base_t &actor_, const address_ptr_t &destination_, const address_ptr_t &reply_to_, Args &&...args)
constructs request message but still does not dispatch it
Definition supervisor.h:513
request_id_t send(const pt::time_duration &send) noexcept
actually dispatches requests and spawns timeout timer
Definition supervisor.h:531
the recorded context, which is needed to produce error response to the original request
Definition request.hpp:301
type helper to deduce request/response messages from original (user-supplied) request type
Definition request.hpp:318
static message_ptr_t make_error_response(const address_ptr_t &reply_to, message_base_t &message, const extended_error_ptr_t &ee) noexcept
helper free function to produce error reply to the original request
Definition request.hpp:373
T request_t
an alias for the original request type
Definition request.hpp:45
allows automatically restart actors
Definition spawner.h:30
pair internal and external handler_t
Definition subscription.h:40
Holds and classifies message handlers on behalf of supervisor.
Definition subscription.h:30
CRTP supervisor config builder.
Definition supervisor_config.h:72
base supervisor config, which holds shutdown timeout value
Definition supervisor_config.h:23
supervisor is responsible for managing actors (workers) lifetime
Definition supervisor.h:68
virtual address_ptr_t instantiate_address(const void *locality) noexcept
creates new address with respect to supervisor locality mark
plugin::delivery_plugin_base_t * delivery
delivery plugin pointer
Definition supervisor.h:289
size_t do_process() noexcept
process queue of messages of locality leader
Definition supervisor.h:130
inbound_queue_t inbound_queue
inbound queue for external messages
Definition supervisor.h:298
const std::atomic_bool * shutdown_flag
when flag is set, the supervisor will shut self down
Definition supervisor.h:307
subscription_t subscription_map
main subscription support class
Definition supervisor.h:283
supervisor_config_builder_t< Supervisor > config_builder_t
injects templated supervisor_config_builder_t
Definition supervisor.h:96
std::unordered_map< request_id_t, request_curry_t > request_map_t
timer to response with timeout procedure type
Definition supervisor.h:256
void do_shutdown(const extended_error_ptr_t &reason={}) noexcept override
convenient method to send actor's supervisor shutdown trigger message
pt::time_duration shutdown_poll_frequency
frequency to check atomic shutdown flag
Definition supervisor.h:310
virtual address_ptr_t make_address() noexcept
creates new address_t linked with the supervisor
virtual void start() noexcept=0
thread-safe version of do_process
const address_ptr_t & get_registry_address() const noexcept
returns registry actor address (if it was defined or registry actor was created)
Definition supervisor.h:240
void subscribe(actor_base_t &actor, Handler &&handler)
templated version of subscribe_actor
Definition supervisor.h:185
void shutdown_finish() noexcept override
finalizes shutdown
void on_request_trigger(request_id_t timer_id, bool cancelled) noexcept
invoked as timer callback; creates response or just clean up for previously set request
virtual void do_start_timer(const pt::time_duration &interval, timer_handler_base_t &handler) noexcept=0
starts non-recurring timer (to be implemented in descendants)
virtual void do_cancel_timer(request_id_t timer_id) noexcept=0
cancels timer (to be implemented in descendants)
supervisor_config_t config_t
injects an alias for supervisor_config_t
Definition supervisor.h:93
virtual void shutdown() noexcept=0
thread-safe version of do_shutdown, i.e. send shutdown request let it be processed by the supervisor
void unsubscribe_actor(const address_ptr_t &addr, Handler &&handler) noexcept
convenient templated version of `unsubscribe_actor
Definition supervisor.h:190
plugin::child_manager_plugin_t * manager
child manager plugin pointer
Definition supervisor.h:292
request_id_t last_req_id
counter for request/timer ids
Definition supervisor.h:277
boost::lockfree::queue< message_base_t * > inbound_queue_t
lock-free queue for inbound messages
Definition supervisor.h:249
auto & access() noexcept
generic non-public fields accessor
auto create_actor()
creates child-actor builder
Definition supervisor.h:196
virtual void on_child_init(actor_base_t *actor, const extended_error_ptr_t &ec) noexcept
supervisor hook for reaction on child actor init
virtual void enqueue(message_ptr_t message) noexcept=0
enqueues messages thread safe way and triggers processing
supervisor_t(supervisor_config_t &config)
constructs new supervisor with optional parent supervisor
virtual void intercept(message_ptr_t &message, const void *tag, const continuation_t &continuation) noexcept
intercepts message delivery for the tagged handler
subscription_info_ptr_t subscribe(const handler_ptr_t &handler, const address_ptr_t &addr, const actor_base_t *owner_ptr, owner_tag_t owner_tag) noexcept
main subscription implementation
pt::time_duration poll_duration
how much time spend in active inbound queue polling
Definition supervisor.h:304
void put(message_ptr_t message)
puts a message into internal supervisor queue for further processing
Definition supervisor.h:182
size_t inbound_queue_size
size of inbound queue
Definition supervisor.h:301
std::tuple< plugin::address_maker_plugin_t, plugin::locality_plugin_t, plugin::delivery_plugin_t< plugin::default_local_delivery_t >, plugin::lifetime_plugin_t, plugin::init_shutdown_plugin_t, plugin::foreigners_support_plugin_t, plugin::child_manager_plugin_t, plugin::link_server_plugin_t, plugin::link_client_plugin_t, plugin::registry_plugin_t, plugin::resources_plugin_t, plugin::starter_plugin_t > plugins_list_t
the default list of plugins for an supervisor
Definition supervisor.h:77
spawner_t spawn(factory_t) noexcept
returns an actor spawner
virtual void on_child_shutdown(actor_base_t *actor) noexcept
supervisor hook for reaction on child actor shutdown
virtual void do_initialize(system_context_t *ctx) noexcept override
early actor initialization (pre-initialization)
request_map_t request_map
timer to response with timeout procedure
Definition supervisor.h:280
request_builder_t< T > do_request(actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to, Args &&...args) noexcept
convenient method for request building
Definition supervisor.h:208
system_context_t * context
non-owning pointer to system context.
Definition supervisor.h:271
void on_start() noexcept override
actor is fully initialized and it's supervisor has sent signal to start
supervisor_t * locality_leader
root supervisor for the locality
Definition supervisor.h:295
supervisor_t * parent
non-owning pointer to parent supervisor, NULL for root supervisor
Definition supervisor.h:286
virtual void commit_unsubscription(const subscription_info_ptr_t &info) noexcept
removes the subscription point: local address and (foreign-or-local) handler pair
messages_queue_t queue
queue of unprocessed messages
Definition supervisor.h:274
The system context holds root supervisor_t (intrusive pointer) and may be loop-related details in der...
Definition system_context.h:32
virtual void on_error(actor_base_t *actor, const extended_error_ptr_t &ec) noexcept
fatal error handler
virtual std::string identity() noexcept
identifies the context.
auto create_supervisor()
returns builder for root supervisor
Definition supervisor.h:351
Base class for timer handler.
Definition timer_handler.hpp:17
templated implementation of timer handler
Definition timer_handler.hpp:42