6 #include <shared_mutex>
8 #include <spdlog/spdlog.h>
10 #if __has_include("cpu_timer.hpp")
11 #include "cpu_timer.hpp"
13 static std::chrono::nanoseconds thread_cpu_time() {
17 #include "concurrentqueue/blockingconcurrentqueue.hpp"
18 #include "managed_thread.hpp"
19 #include "phonebook.hpp"
20 #include "record_logger.hpp"
24 using plugin_id_t = std::size_t;
30 "switchboard_callback",
32 {
"plugin_id",
typeid(plugin_id_t)},
33 {
"topic_name",
typeid(std::string)},
34 {
"iteration_no",
typeid(std::size_t)},
35 {
"cpu_time_start",
typeid(std::chrono::nanoseconds)},
36 {
"cpu_time_stop",
typeid(std::chrono::nanoseconds)},
37 {
"wall_time_start",
typeid(std::chrono::high_resolution_clock::time_point)},
38 {
"wall_time_stop",
typeid(std::chrono::high_resolution_clock::time_point)},
46 {
"plugin_id",
typeid(plugin_id_t)},
47 {
"topic_name",
typeid(std::string)},
48 {
"enqueued",
typeid(std::size_t)},
49 {
"dequeued",
typeid(std::size_t)},
50 {
"idle_cycles",
typeid(std::size_t)},
// Alias template for the smart-pointer type used throughout this listing:
// ptr<specific_event> is exactly std::shared_ptr<specific_event>.
102 template<
typename specific_event>
103 using ptr = std::shared_ptr<specific_event>;
// Defaulted virtual destructor of the event type. Being virtual makes event a
// polymorphic type, which the std::dynamic_pointer_cast calls from
// ptr<const event> to ptr<const specific_event> later in this listing rely on.
118 virtual ~
event() =
default;
131 template<
typename underlying_type>
134 underlying_type underlying_data;
140 : underlying_data{underlying_data_} { }
142 operator underlying_type()
const {
143 return underlying_data;
146 underlying_type& operator*() {
147 return underlying_data;
150 const underlying_type& operator*()
const {
151 return underlying_data;
164 class topic_subscription {
166 const std::string& _m_topic_name;
167 plugin_id_t _m_plugin_id;
169 const std::shared_ptr<record_logger> _m_record_logger;
171 moodycamel::BlockingConcurrentQueue<ptr<const event>> _m_queue{8 };
172 moodycamel::ConsumerToken _m_ctok{_m_queue};
173 static constexpr std::chrono::milliseconds _m_queue_timeout{100};
174 std::size_t _m_enqueued{0};
175 std::size_t _m_dequeued{0};
176 std::size_t _m_idle_cycles{0};
180 managed_thread _m_thread;
182 void thread_on_start() {
184 spdlog::get(
"illixr")->set_pattern(
"[%Y-%m-%d %H:%M:%S.%e] [%n] [%^%l%$] [switchboard] thread %t %v");
185 spdlog::get(
"illixr")->debug(
"start");
186 spdlog::get(
"illixr")->set_pattern(
"%+");
192 ptr<const event> this_event;
193 std::int64_t timeout_usecs = std::chrono::duration_cast<std::chrono::microseconds>(_m_queue_timeout).count();
195 if (_m_queue.wait_dequeue_timed(_m_ctok, this_event, timeout_usecs)) {
199 auto cb_start_cpu_time = thread_cpu_time();
200 auto cb_start_wall_time = std::chrono::high_resolution_clock::now();
203 _m_callback(std::move(this_event), _m_dequeued);
212 {cb_start_wall_time},
213 {std::chrono::high_resolution_clock::now()},
222 void thread_on_stop() {
224 std::size_t unprocessed = _m_enqueued - _m_dequeued;
226 ptr<const event> this_event;
227 for (std::size_t i = 0; i < unprocessed; ++i) {
228 [[maybe_unused]]
bool ret = _m_queue.try_dequeue(_m_ctok, this_event);
237 if (_m_record_logger) {
250 topic_subscription(
const std::string& topic_name, plugin_id_t plugin_id,
251 std::function<
void(ptr<const event>&&, std::size_t)> callback,
252 std::shared_ptr<record_logger> record_logger_)
253 : _m_topic_name{topic_name}
254 , _m_plugin_id{plugin_id}
255 , _m_callback{callback}
256 , _m_record_logger{record_logger_}
257 , _m_cb_log{record_logger_}
262 this->thread_on_start();
265 this->thread_on_stop();
275 void enqueue(ptr<const event>&& this_event) {
276 if (_m_thread.get_state() == managed_thread::state::running) {
277 [[maybe_unused]]
bool ret = _m_queue.enqueue(std::move(this_event));
286 moodycamel::BlockingConcurrentQueue<ptr<const event>> _m_queue{8 };
287 moodycamel::ConsumerToken _m_ctok{_m_queue};
288 std::atomic<size_t> _m_queue_size{0};
293 spdlog::get(
"illixr")->info(
"[switchboard] topic buffer created");
297 void enqueue(ptr<const event>&& this_event) {
299 [[maybe_unused]]
bool ret = _m_queue.enqueue(std::move(this_event));
303 size_t size()
const {
304 return _m_queue_size;
307 ptr<const event> dequeue() {
308 ptr<const event> obj;
310 _m_queue.wait_dequeue(_m_ctok, obj);
331 const std::string _m_name;
332 const std::type_info& _m_ty;
333 const std::shared_ptr<record_logger> _m_record_logger;
334 std::atomic<size_t> _m_latest_index;
335 static constexpr std::size_t _m_latest_buffer_size = 256;
336 std::array<ptr<const event>, _m_latest_buffer_size> _m_latest_buffer;
337 std::list<topic_subscription> _m_subscriptions;
338 std::list<topic_buffer> _m_buffers;
339 std::shared_mutex _m_subscriptions_lock;
342 topic(std::string name,
const std::type_info& ty, std::shared_ptr<record_logger> record_logger_)
345 , _m_record_logger{record_logger_}
346 , _m_latest_index{0} { }
348 const std::string& name() {
352 const std::type_info& ty() {
359 ptr<const event> get()
const {
360 size_t idx = _m_latest_index.load() % _m_latest_buffer_size;
361 ptr<const event> this_event = _m_latest_buffer[idx];
386 void put(ptr<const event>&& this_event) {
387 assert(this_event !=
nullptr);
388 assert(this_event.unique() ||
389 this_event.use_count() <= 2);
392 size_t index = (_m_latest_index.load() + 1) % _m_latest_buffer_size;
393 _m_latest_buffer[index] = this_event;
398 std::unique_lock lock{_m_subscriptions_lock};
399 for (topic_subscription& ts : _m_subscriptions) {
402 ptr<const event> event_ptr_copy{this_event};
403 ts.enqueue(std::move(event_ptr_copy));
406 for (topic_buffer& ts : _m_buffers) {
409 ptr<const event> event_ptr_copy{this_event};
410 ts.enqueue(std::move(event_ptr_copy));
421 void schedule(plugin_id_t plugin_id, std::function<
void(ptr<const event>&&, std::size_t)> callback) {
424 const std::unique_lock lock{_m_subscriptions_lock};
425 _m_subscriptions.emplace_back(_m_name, plugin_id, callback, _m_record_logger);
428 topic_buffer& get_buffer() {
429 const std::unique_lock lock{_m_subscriptions_lock};
430 _m_buffers.emplace_back();
431 return _m_buffers.back();
442 const std::unique_lock lock{_m_subscriptions_lock};
443 _m_subscriptions.clear();
451 template<
typename specific_event>
461 if (
typeid(specific_event) != _m_topic.ty()) {
462 spdlog::get(
"illixr")->error(
"[switchboard] topic '{}' holds type {}, but caller used type {}", _m_topic.name(),
463 _m_topic.ty().name(),
typeid(specific_event).name());
478 if (this_event !=
nullptr) {
479 assert(this_specific_event );
480 return this_specific_event;
493 if (this_specific_event !=
nullptr) {
494 return this_specific_event;
497 throw std::runtime_error(
"No event on topic");
512 return std::make_shared<specific_event>(*this_specific_event);
516 template<
typename specific_event>
517 class buffered_reader {
520 size_t serial_no = 0;
524 buffered_reader(topic& topic)
526 , _m_tb{_m_topic.get_buffer()} { }
528 size_t size()
const {
532 ptr<const specific_event> dequeue() {
536 ptr<const event> this_event = _m_tb.dequeue();
537 ptr<const specific_event> this_specific_event = std::dynamic_pointer_cast<const specific_event>(this_event);
538 return this_specific_event;
545 template<
typename specific_event>
553 : _m_topic{topic_} { }
566 template<
class... Args>
568 return std::make_shared<specific_event>(std::forward<Args>(args)...);
575 assert(
typeid(specific_event) == _m_topic.ty());
576 assert(this_specific_event !=
nullptr);
577 assert(this_specific_event.unique());
579 std::const_pointer_cast<const event>(std::static_pointer_cast<event>(std::move(this_specific_event)));
580 assert(this_event.unique() ||
581 this_event.use_count() <= 2);
582 _m_topic.put(std::move(this_event));
587 std::unordered_map<std::string, topic> _m_registry;
588 std::shared_mutex _m_registry_lock;
589 std::shared_ptr<record_logger> _m_record_logger;
591 template<
typename specific_event>
592 topic& try_register_topic(
const std::string& topic_name) {
594 const std::shared_lock lock{_m_registry_lock};
595 auto found = _m_registry.find(topic_name);
596 if (found != _m_registry.end()) {
597 topic& topic_ = found->second;
599 if (
typeid(specific_event) != topic_.ty()) {
600 spdlog::get(
"illixr")->error(
"[switchboard] topic '{}' holds type {}, but caller used type {}", topic_name,
601 topic_.ty().name(),
typeid(specific_event).name());
610 spdlog::get(
"illixr")->debug(
"[switchboard] Creating: {} for {}", topic_name,
typeid(specific_event).name());
613 const std::unique_lock lock{_m_registry_lock};
614 return _m_registry.try_emplace(topic_name, topic_name,
typeid(specific_event), _m_record_logger).first->second;
622 : _m_record_logger{pb ? pb->lookup_impl<
record_logger>() : nullptr} { }
633 template<
typename specific_event>
634 void schedule(plugin_id_t plugin_id, std::string topic_name,
636 try_register_topic<specific_event>(topic_name)
637 .schedule(plugin_id, [=](
ptr<const event>&& this_event, std::size_t it_no) {
640 std::dynamic_pointer_cast<const specific_event>(std::move(this_event));
641 assert(this_specific_event);
642 fn(std::move(this_specific_event), it_no);
653 template<
typename specific_event>
665 template<
typename specific_event>
670 template<
typename specific_event>
671 buffered_reader<specific_event> get_buffered_reader(
const std::string& topic_name) {
672 return buffered_reader<specific_event>{try_register_topic<specific_event>(topic_name)};
683 const std::shared_lock lock{_m_registry_lock};
684 for (
auto& pair : _m_registry) {
A 'service' that can be registered in the phonebook.
Definition: phonebook.hpp:86
A service locator for ILLIXR.
Definition: phonebook.hpp:68
Coalesces logs of the same type to be written back as a single transaction.
Definition: record_logger.hpp:307
void log(const record &r)
Appends a log to the buffer, which will eventually be written.
Definition: record_logger.hpp:325
The ILLIXR logging service for structured records.
Definition: record_logger.hpp:226
Helper class for making event types.
Definition: switchboard.hpp:132
Virtual class for event types.
Definition: switchboard.hpp:116
A handle which can read the latest event on a topic.
Definition: switchboard.hpp:452
ptr< const specific_event > get_ro() const
Gets a non-null "read-only" copy of the latest value.
Definition: switchboard.hpp:491
ptr< specific_event > get_rw() const
Gets a non-null mutable copy of the latest value.
Definition: switchboard.hpp:506
ptr< const specific_event > get_ro_nullable() const noexcept
Gets a "read-only" copy of the latest value.
Definition: switchboard.hpp:474
A handle which can publish events to a topic.
Definition: switchboard.hpp:546
void put(ptr< specific_event > &&this_specific_event)
Publish this_specific_event to this topic.
Definition: switchboard.hpp:574
ptr< specific_event > allocate(Args &&... args)
Like new/malloc but more efficient for this specific case.
Definition: switchboard.hpp:567
A manager for typesafe, threadsafe, named event-streams (called topics).
Definition: switchboard.hpp:94
std::shared_ptr< specific_event > ptr
The type of shared pointer returned by switchboard.
Definition: switchboard.hpp:103
writer< specific_event > get_writer(const std::string &topic_name)
Gets a handle to publish to the topic topic_name.
Definition: switchboard.hpp:654
void stop()
Stops calling switchboard callbacks.
Definition: switchboard.hpp:682
switchboard(const phonebook *pb)
Definition: switchboard.hpp:621
void schedule(plugin_id_t plugin_id, std::string topic_name, std::function< void(ptr< const specific_event > &&, std::size_t)> fn)
Schedules the callback fn every time an event is published to topic_name.
Definition: switchboard.hpp:634
reader< specific_event > get_reader(const std::string &topic_name)
Gets a handle to read the latest value from the topic topic_name.
Definition: switchboard.hpp:666
RAC_ERRNO_MSG.
Definition: data_format.hpp:15
void abort(const std::string &msg="", [[maybe_unused]] const int error_val=1)
Exits the application during a fatal error.
Definition: error_util.hpp:61
const record_header __switchboard_callback_header
Definition: switchboard.hpp:29
const record_header __switchboard_topic_stop_header
Definition: switchboard.hpp:44