ILLIXR: Illinois Extended Reality Testbed
switchboard.hpp
1 #pragma once
2 
3 #include <iostream>
4 #include <list>
5 #include <mutex>
6 #include <shared_mutex>
7 #ifndef NDEBUG
8  #include <spdlog/spdlog.h>
9 #endif
10 #if __has_include("cpu_timer.hpp")
11  #include "cpu_timer.hpp"
12 #else
/**
 * @brief Stand-in used when "cpu_timer.hpp" cannot be included.
 *
 * @return Always a zero-length duration, so callers that record CPU time still compile and run.
 */
static std::chrono::nanoseconds thread_cpu_time() {
    return std::chrono::nanoseconds::zero();
}
16 #endif
17 #include "concurrentqueue/blockingconcurrentqueue.hpp"
18 #include "managed_thread.hpp"
19 #include "phonebook.hpp"
20 #include "record_logger.hpp"
21 
22 namespace ILLIXR {
23 
24 using plugin_id_t = std::size_t;
25 
30  "switchboard_callback",
31  {
32  {"plugin_id", typeid(plugin_id_t)},
33  {"topic_name", typeid(std::string)},
34  {"iteration_no", typeid(std::size_t)},
35  {"cpu_time_start", typeid(std::chrono::nanoseconds)},
36  {"cpu_time_stop", typeid(std::chrono::nanoseconds)},
37  {"wall_time_start", typeid(std::chrono::high_resolution_clock::time_point)},
38  {"wall_time_stop", typeid(std::chrono::high_resolution_clock::time_point)},
39  }};
40 
44 const record_header __switchboard_topic_stop_header{"switchboard_topic_stop",
45  {
46  {"plugin_id", typeid(plugin_id_t)},
47  {"topic_name", typeid(std::string)},
48  {"enqueued", typeid(std::size_t)},
49  {"dequeued", typeid(std::size_t)},
50  {"idle_cycles", typeid(std::size_t)},
51  }};
52 
95 public:
102  template<typename specific_event>
103  using ptr = std::shared_ptr<specific_event>;
104 
116  class event {
117  public:
118  virtual ~event() = default;
119  };
120 
131  template<typename underlying_type>
132  class event_wrapper : public event {
133  private:
134  underlying_type underlying_data;
135 
136  public:
137  event_wrapper() { }
138 
139  event_wrapper(underlying_type underlying_data_)
140  : underlying_data{underlying_data_} { }
141 
142  operator underlying_type() const {
143  return underlying_data;
144  }
145 
146  underlying_type& operator*() {
147  return underlying_data;
148  }
149 
150  const underlying_type& operator*() const {
151  return underlying_data;
152  }
153  };
154 
155 private:
164  class topic_subscription {
165  private:
166  const std::string& _m_topic_name;
167  plugin_id_t _m_plugin_id;
168  std::function<void(ptr<const event>&&, std::size_t)> _m_callback;
169  const std::shared_ptr<record_logger> _m_record_logger;
170  record_coalescer _m_cb_log;
171  moodycamel::BlockingConcurrentQueue<ptr<const event>> _m_queue{8 /*max size estimate*/};
172  moodycamel::ConsumerToken _m_ctok{_m_queue};
173  static constexpr std::chrono::milliseconds _m_queue_timeout{100};
174  std::size_t _m_enqueued{0};
175  std::size_t _m_dequeued{0};
176  std::size_t _m_idle_cycles{0};
177 
178  // This needs to be last,
179  // so it is destructed before the data it uses.
180  managed_thread _m_thread;
181 
182  void thread_on_start() {
183 #ifndef NDEBUG
184  spdlog::get("illixr")->set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%n] [%^%l%$] [switchboard] thread %t %v");
185  spdlog::get("illixr")->debug("start");
186  spdlog::get("illixr")->set_pattern("%+");
187 #endif
188  }
189 
190  void thread_body() {
191  // Try to pull event off of queue
192  ptr<const event> this_event;
193  std::int64_t timeout_usecs = std::chrono::duration_cast<std::chrono::microseconds>(_m_queue_timeout).count();
194  // Note the use of timed blocking wait
195  if (_m_queue.wait_dequeue_timed(_m_ctok, this_event, timeout_usecs)) {
196  // Process event
197  // Also, record and log the time
198  _m_dequeued++;
199  auto cb_start_cpu_time = thread_cpu_time();
200  auto cb_start_wall_time = std::chrono::high_resolution_clock::now();
201  // std::cerr << "deq " << ptr_to_str(reinterpret_cast<const void*>(this_event.get_ro())) << " " <<
202  // this_event.use_count() << " v\n";
203  _m_callback(std::move(this_event), _m_dequeued);
204  if (_m_cb_log) {
205  _m_cb_log.log(record{__switchboard_callback_header,
206  {
207  {_m_plugin_id},
208  {_m_topic_name},
209  {_m_dequeued},
210  {cb_start_cpu_time},
211  {thread_cpu_time()},
212  {cb_start_wall_time},
213  {std::chrono::high_resolution_clock::now()},
214  }});
215  }
216  } else {
217  // Nothing to do.
218  _m_idle_cycles++;
219  }
220  }
221 
222  void thread_on_stop() {
223  // Drain queue
224  std::size_t unprocessed = _m_enqueued - _m_dequeued;
225  {
226  ptr<const event> this_event;
227  for (std::size_t i = 0; i < unprocessed; ++i) {
228  [[maybe_unused]] bool ret = _m_queue.try_dequeue(_m_ctok, this_event);
229  assert(ret);
230  // std::cerr << "deq (stopping) " << ptr_to_str(reinterpret_cast<const void*>(this_event.get_ro())) << " "
231  // << this_event.use_count() << " v\n";
232  this_event.reset();
233  }
234  }
235 
236  // Log stats
237  if (_m_record_logger) {
238  _m_record_logger->log(record{__switchboard_topic_stop_header,
239  {
240  {_m_plugin_id},
241  {_m_topic_name},
242  {_m_dequeued},
243  {unprocessed},
244  {_m_idle_cycles},
245  }});
246  }
247  }
248 
249  public:
250  topic_subscription(const std::string& topic_name, plugin_id_t plugin_id,
251  std::function<void(ptr<const event>&&, std::size_t)> callback,
252  std::shared_ptr<record_logger> record_logger_)
253  : _m_topic_name{topic_name}
254  , _m_plugin_id{plugin_id}
255  , _m_callback{callback}
256  , _m_record_logger{record_logger_}
257  , _m_cb_log{record_logger_}
258  , _m_thread{[this] {
259  this->thread_body();
260  },
261  [this] {
262  this->thread_on_start();
263  },
264  [this] {
265  this->thread_on_stop();
266  }} {
267  _m_thread.start();
268  }
269 
275  void enqueue(ptr<const event>&& this_event) {
276  if (_m_thread.get_state() == managed_thread::state::running) {
277  [[maybe_unused]] bool ret = _m_queue.enqueue(std::move(this_event));
278  assert(ret);
279  _m_enqueued++;
280  }
281  }
282  };
283 
284  class topic_buffer {
285  private:
286  moodycamel::BlockingConcurrentQueue<ptr<const event>> _m_queue{8 /*max size estimate*/};
287  moodycamel::ConsumerToken _m_ctok{_m_queue};
288  std::atomic<size_t> _m_queue_size{0};
289 
290  public:
291  topic_buffer() {
292 #ifndef NDEBUG
293  spdlog::get("illixr")->info("[switchboard] topic buffer created");
294 #endif
295  }
296 
297  void enqueue(ptr<const event>&& this_event) {
298  _m_queue_size++;
299  [[maybe_unused]] bool ret = _m_queue.enqueue(std::move(this_event));
300  assert(ret);
301  }
302 
303  size_t size() const {
304  return _m_queue_size;
305  }
306 
307  ptr<const event> dequeue() {
308  ptr<const event> obj;
309  _m_queue_size--;
310  _m_queue.wait_dequeue(_m_ctok, obj);
311  return obj;
312  }
313  };
314 
329  class topic {
330  private:
331  const std::string _m_name;
332  const std::type_info& _m_ty;
333  const std::shared_ptr<record_logger> _m_record_logger;
334  std::atomic<size_t> _m_latest_index;
335  static constexpr std::size_t _m_latest_buffer_size = 256;
336  std::array<ptr<const event>, _m_latest_buffer_size> _m_latest_buffer;
337  std::list<topic_subscription> _m_subscriptions;
338  std::list<topic_buffer> _m_buffers;
339  std::shared_mutex _m_subscriptions_lock;
340 
341  public:
342  topic(std::string name, const std::type_info& ty, std::shared_ptr<record_logger> record_logger_)
343  : _m_name{name}
344  , _m_ty{ty}
345  , _m_record_logger{record_logger_}
346  , _m_latest_index{0} { }
347 
348  const std::string& name() {
349  return _m_name;
350  }
351 
352  const std::type_info& ty() {
353  return _m_ty;
354  }
355 
359  ptr<const event> get() const {
360  size_t idx = _m_latest_index.load() % _m_latest_buffer_size;
361  ptr<const event> this_event = _m_latest_buffer[idx];
362  // if (this_event) {
363  // std::cerr << "get " << ptr_to_str(reinterpret_cast<const void*>(this_event.get())) << " " <<
364  // this_event.use_count() << "v \n";
365  // }
366  return this_event;
367  }
368 
386  void put(ptr<const event>&& this_event) {
387  assert(this_event != nullptr);
388  assert(this_event.unique() ||
389  this_event.use_count() <= 2);
390 
391  /* The pointer that this gets exchanged with needs to get dropped. */
392  size_t index = (_m_latest_index.load() + 1) % _m_latest_buffer_size;
393  _m_latest_buffer[index] = this_event;
394  _m_latest_index++;
395 
396  // Read/write on _m_subscriptions.
397  // Must acquire shared state on _m_subscriptions_lock
398  std::unique_lock lock{_m_subscriptions_lock};
399  for (topic_subscription& ts : _m_subscriptions) {
400  // std::cerr << "enq " << ptr_to_str(reinterpret_cast<const void*>(this_event->get())) << " " <<
401  // this_event->use_count() << " ^\n";
402  ptr<const event> event_ptr_copy{this_event};
403  ts.enqueue(std::move(event_ptr_copy));
404  }
405 
406  for (topic_buffer& ts : _m_buffers) {
407  // std::cerr << "enq " << ptr_to_str(reinterpret_cast<const void*>(this_event->get())) << " " <<
408  // this_event->use_count() << " ^\n";
409  ptr<const event> event_ptr_copy{this_event};
410  ts.enqueue(std::move(event_ptr_copy));
411  }
412  // std::cerr << "put done " << ptr_to_str(reinterpret_cast<const void*>(this_event->get())) << " " <<
413  // this_event->use_count() << " (= 1 + len(sub)) \n";
414  }
415 
421  void schedule(plugin_id_t plugin_id, std::function<void(ptr<const event>&&, std::size_t)> callback) {
422  // Write on _m_subscriptions.
423  // Must acquire unique state on _m_subscriptions_lock
424  const std::unique_lock lock{_m_subscriptions_lock};
425  _m_subscriptions.emplace_back(_m_name, plugin_id, callback, _m_record_logger);
426  }
427 
428  topic_buffer& get_buffer() {
429  const std::unique_lock lock{_m_subscriptions_lock};
430  _m_buffers.emplace_back();
431  return _m_buffers.back();
432  }
433 
439  void stop() {
440  // Write on _m_subscriptions.
441  // Must acquire unique state on _m_subscriptions_lock
442  const std::unique_lock lock{_m_subscriptions_lock};
443  _m_subscriptions.clear();
444  }
445  };
446 
447 public:
451  template<typename specific_event>
452  class reader {
453  private:
455  topic& _m_topic;
456 
457  public:
458  reader(topic& topic_)
459  : _m_topic{topic_} {
460 #ifndef NDEBUG
461  if (typeid(specific_event) != _m_topic.ty()) {
462  spdlog::get("illixr")->error("[switchboard] topic '{}' holds type {}, but caller used type {}", _m_topic.name(),
463  _m_topic.ty().name(), typeid(specific_event).name());
464  abort();
465  }
466 #endif
467  }
468 
475  ptr<const event> this_event = _m_topic.get();
476  ptr<const specific_event> this_specific_event = std::dynamic_pointer_cast<const specific_event>(this_event);
477 
478  if (this_event != nullptr) {
479  assert(this_specific_event /* Otherwise, dynamic cast failed; dynamic type information could be wrong*/);
480  return this_specific_event;
481  } else {
482  return ptr<const specific_event>{nullptr};
483  }
484  }
485 
492  ptr<const specific_event> this_specific_event = get_ro_nullable();
493  if (this_specific_event != nullptr) {
494  return this_specific_event;
495  } else {
497  throw std::runtime_error("No event on topic");
498  }
499  }
500 
507  /*
508  This method is currently not more efficient than calling get_ro() and making a copy,
509  but in the future it could be.
510  */
511  ptr<const specific_event> this_specific_event = get_ro();
512  return std::make_shared<specific_event>(*this_specific_event);
513  }
514  };
515 
516  template<typename specific_event>
517  class buffered_reader {
518  private:
519  topic& _m_topic;
520  size_t serial_no = 0;
521  topic_buffer& _m_tb;
522 
523  public:
524  buffered_reader(topic& topic)
525  : _m_topic{topic}
526  , _m_tb{_m_topic.get_buffer()} { }
527 
528  size_t size() const {
529  return _m_tb.size();
530  }
531 
532  ptr<const specific_event> dequeue() {
533  // CPU_TIMER_TIME_EVENT_INFO(true, false, "callback", cpu_timer::make_type_eraser<FrameInfo>("", _m_topic.name(),
534  // serial_no));
535  serial_no++;
536  ptr<const event> this_event = _m_tb.dequeue();
537  ptr<const specific_event> this_specific_event = std::dynamic_pointer_cast<const specific_event>(this_event);
538  return this_specific_event;
539  }
540  };
541 
545  template<typename specific_event>
546  class writer {
547  private:
548  // Reference to the underlying topic
549  topic& _m_topic;
550 
551  public:
552  writer(topic& topic_)
553  : _m_topic{topic_} { }
554 
566  template<class... Args>
567  ptr<specific_event> allocate(Args&&... args) {
568  return std::make_shared<specific_event>(std::forward<Args>(args)...);
569  }
570 
574  void put(ptr<specific_event>&& this_specific_event) {
575  assert(typeid(specific_event) == _m_topic.ty());
576  assert(this_specific_event != nullptr);
577  assert(this_specific_event.unique());
578  ptr<const event> this_event =
579  std::const_pointer_cast<const event>(std::static_pointer_cast<event>(std::move(this_specific_event)));
580  assert(this_event.unique() ||
581  this_event.use_count() <= 2);
582  _m_topic.put(std::move(this_event));
583  }
584  };
585 
586 private:
587  std::unordered_map<std::string, topic> _m_registry;
588  std::shared_mutex _m_registry_lock;
589  std::shared_ptr<record_logger> _m_record_logger;
590 
591  template<typename specific_event>
592  topic& try_register_topic(const std::string& topic_name) {
593  {
594  const std::shared_lock lock{_m_registry_lock};
595  auto found = _m_registry.find(topic_name);
596  if (found != _m_registry.end()) {
597  topic& topic_ = found->second;
598 #ifndef NDEBUG
599  if (typeid(specific_event) != topic_.ty()) {
600  spdlog::get("illixr")->error("[switchboard] topic '{}' holds type {}, but caller used type {}", topic_name,
601  topic_.ty().name(), typeid(specific_event).name());
602  abort();
603  }
604 #endif
605  return topic_;
606  }
607  }
608 
609 #ifndef NDEBUG
610  spdlog::get("illixr")->debug("[switchboard] Creating: {} for {}", topic_name, typeid(specific_event).name());
611 #endif
612  // Topic not found. Need to create it here.
613  const std::unique_lock lock{_m_registry_lock};
614  return _m_registry.try_emplace(topic_name, topic_name, typeid(specific_event), _m_record_logger).first->second;
615  }
616 
617 public:
622  : _m_record_logger{pb ? pb->lookup_impl<record_logger>() : nullptr} { }
623 
633  template<typename specific_event>
634  void schedule(plugin_id_t plugin_id, std::string topic_name,
635  std::function<void(ptr<const specific_event>&&, std::size_t)> fn) {
636  try_register_topic<specific_event>(topic_name)
637  .schedule(plugin_id, [=](ptr<const event>&& this_event, std::size_t it_no) {
638  assert(this_event);
639  ptr<const specific_event> this_specific_event =
640  std::dynamic_pointer_cast<const specific_event>(std::move(this_event));
641  assert(this_specific_event);
642  fn(std::move(this_specific_event), it_no);
643  });
644  }
645 
653  template<typename specific_event>
654  writer<specific_event> get_writer(const std::string& topic_name) {
655  return writer<specific_event>{try_register_topic<specific_event>(topic_name)};
656  }
657 
665  template<typename specific_event>
666  reader<specific_event> get_reader(const std::string& topic_name) {
667  return reader<specific_event>{try_register_topic<specific_event>(topic_name)};
668  }
669 
670  template<typename specific_event>
671  buffered_reader<specific_event> get_buffered_reader(const std::string& topic_name) {
672  return buffered_reader<specific_event>{try_register_topic<specific_event>(topic_name)};
673  }
674 
682  void stop() {
683  const std::shared_lock lock{_m_registry_lock};
684  for (auto& pair : _m_registry) {
685  pair.second.stop();
686  }
687  }
688 };
689 
690 } // namespace ILLIXR
A 'service' that can be registered in the phonebook.
Definition: phonebook.hpp:86
A service locator for ILLIXR.
Definition: phonebook.hpp:68
Coalesces logs of the same type to be written back as a single-transaction.
Definition: record_logger.hpp:307
void log(const record &r)
Appends a log to the buffer, which will eventually be written.
Definition: record_logger.hpp:325
Schema of each record.
Definition: record_logger.hpp:29
The ILLIXR logging service for structured records.
Definition: record_logger.hpp:226
Helper class for making event types.
Definition: switchboard.hpp:132
Virtual class for event types.
Definition: switchboard.hpp:116
A handle which can read the latest event on a topic.
Definition: switchboard.hpp:452
ptr< const specific_event > get_ro() const
Gets a non-null "read-only" copy of the latest value.
Definition: switchboard.hpp:491
ptr< specific_event > get_rw() const
Gets a non-null mutable copy of the latest value.
Definition: switchboard.hpp:506
ptr< const specific_event > get_ro_nullable() const noexcept
Gets a "read-only" copy of the latest value.
Definition: switchboard.hpp:474
A handle which can publish events to a topic.
Definition: switchboard.hpp:546
void put(ptr< specific_event > &&this_specific_event)
Publish this_specific_event to this topic.
Definition: switchboard.hpp:574
ptr< specific_event > allocate(Args &&... args)
Like new/malloc but more efficient for this specific case.
Definition: switchboard.hpp:567
A manager for typesafe, threadsafe, named event-streams (called topics).
Definition: switchboard.hpp:94
std::shared_ptr< specific_event > ptr
The type of shared pointer returned by switchboard.
Definition: switchboard.hpp:103
writer< specific_event > get_writer(const std::string &topic_name)
Gets a handle to publish to the topic topic_name.
Definition: switchboard.hpp:654
void stop()
Stops calling switchboard callbacks.
Definition: switchboard.hpp:682
switchboard(const phonebook *pb)
Definition: switchboard.hpp:621
void schedule(plugin_id_t plugin_id, std::string topic_name, std::function< void(ptr< const specific_event > &&, std::size_t)> fn)
Schedules the callback fn every time an event is published to topic_name.
Definition: switchboard.hpp:634
reader< specific_event > get_reader(const std::string &topic_name)
Gets a handle to read the latest value from the topic topic_name.
Definition: switchboard.hpp:666
RAC_ERRNO_MSG.
Definition: data_format.hpp:15
void abort(const std::string &msg="", [[maybe_unused]] const int error_val=1)
Exits the application during a fatal error.
Definition: error_util.hpp:61
const record_header __switchboard_callback_header
Definition: switchboard.hpp:29
const record_header __switchboard_topic_stop_header
Definition: switchboard.hpp:44