Skip to content

File scheduler.cc

File List > framework > scheduler.cc

Go to the documentation of this file

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "mediapipe/framework/scheduler.h"

#include <cstddef>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

#include "absl/log/absl_check.h"
#include "absl/memory/memory.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/executor.h"
#include "mediapipe/framework/port.h"
#include "mediapipe/framework/port/canonical_errors.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/timestamp.h"
#include "mediapipe/framework/tool/status_util.h"

namespace mediapipe {

namespace internal {

// Builds a scheduler for `graph`. The default queue is constructed on top of
// the shared scheduler state and registered as the first scheduler queue.
Scheduler::Scheduler(CalculatorGraph* graph)
    : graph_(graph), shared_(), default_queue_(&shared_) {
  // Errors reported through the shared state are forwarded to the graph.
  shared_.error_callback = [graph](const absl::Status& error) {
    graph->RecordError(error);
  };
  // Idle-state transitions of the default queue are reported back to this
  // scheduler.
  default_queue_.SetIdleCallback(
      [this](bool idle) { QueueIdleStateChanged(idle); });
  scheduler_queues_.push_back(&default_queue_);
}

// Tears the scheduler down. A scheduler that was never started needs no
// further work; otherwise the run is cancelled and waited for.
Scheduler::~Scheduler() {
  bool was_started = false;
  {
    absl::MutexLock lock(&state_mutex_);
    was_started = !(state_ == STATE_NOT_STARTED);
  }
  if (was_started) {
    // If the application does not call WaitUntilDone, we should.
    // WaitUntilDone ensures that all queues are done and will no longer access
    // the scheduler.
    Cancel();
    WaitUntilDone().IgnoreError();
  }
}

void Scheduler::Reset() {
  {
    absl::MutexLock lock(&state_mutex_);
    state_ = STATE_NOT_STARTED;
    graph_input_streams_closed_ = graph_->GraphInputStreamsClosed();
    throttled_graph_input_stream_count_ = 0;
    unthrottle_seq_num_ = 0;
    observed_output_signal_ = false;
  }
  for (auto queue : scheduler_queues_) {
    queue->Reset();
  }
  shared_.stopping = false;
  shared_.has_error = false;
}

// Raises the `stopping` flag in the shared scheduler state.
void Scheduler::CloseAllSourceNodes() {
  shared_.stopping = true;
}

// Installs `executor` on the default scheduler queue.
// Check-fails unless the scheduler is still in STATE_NOT_STARTED.
void Scheduler::SetExecutor(Executor* executor) {
  ABSL_CHECK_EQ(state_, STATE_NOT_STARTED)
      << "SetExecutor must not be called after the scheduler has started";
  default_queue_.SetExecutor(executor);
}

// TODO: Consider renaming this method CreateNonDefaultQueue.
//
// Creates a scheduler queue registered under `name`, hooks up its idle
// callback, assigns `executor` to it and appends it to scheduler_queues_.
// The RET_CHECKs fail if the scheduler is no longer in STATE_NOT_STARTED or if
// a queue was already registered under `name`.
absl::Status Scheduler::SetNonDefaultExecutor(const std::string& name,
                                              Executor* executor) {
  RET_CHECK_EQ(state_, STATE_NOT_STARTED)
      << "SetNonDefaultExecutor must not be called after the scheduler has "
         "started";

  auto inserted = non_default_queues_.emplace(
      name, absl::make_unique<SchedulerQueue>(&shared_));
  RET_CHECK(inserted.second)
      << "SetNonDefaultExecutor must be called only once for the executor \""
      << name << "\"";

  SchedulerQueue* queue = inserted.first->second.get();
  queue->SetIdleCallback([this](bool idle) { QueueIdleStateChanged(idle); });
  queue->SetExecutor(executor);
  scheduler_queues_.push_back(queue);
  return absl::OkStatus();
}

void Scheduler::SetQueuesRunning(bool running) {
  for (auto queue : scheduler_queues_) {
    queue->SetRunning(running);
  }
}

void Scheduler::SubmitWaitingTasksOnQueues() {
  for (auto queue : scheduler_queues_) {
    queue->SubmitWaitingTasksToExecutor();
  }
}

// Decides what to do when the scheduler is idle. While all queues are idle
// and the state is STATE_RUNNING or STATE_CANCELLING, each loop iteration
// either quits, activates the next layer of source nodes, or asks the graph
// to unthrottle sources.
//
// Note: state_mutex_ is held when this function is entered or
// exited.
// The mutex is released temporarily around graph_->UnthrottleSources() and,
// per the note in the loop, inside TryToScheduleNextSourceLayer().
//
// handling_idle_ serves as a re-entrancy counter: only the first caller runs
// the loop. Calls arriving while it is in progress increment the counter and
// return; the running loop sees the increment and iterates again.
void Scheduler::HandleIdle() {
  if (++handling_idle_ > 1) {
    // Someone is already inside this method.
    // Note: This can happen in the sections below where we unlock the mutex
    // and make more nodes runnable: the nodes can run and become idle again
    // while this method is in progress. In that case, the resulting calls to
    // HandleIdle are ignored, which is ok because the original method will
    // run the loop again.
    VLOG(2) << "HandleIdle: already in progress";
    return;
  }

  while (IsIdle() && (state_ == STATE_RUNNING || state_ == STATE_CANCELLING)) {
    // Remove active sources that are closed.
    CleanupActiveSources();

    // Quit if we have errors, or if there are no more packet sources.
    if (shared_.has_error ||
        (active_sources_.empty() && sources_queue_.empty() &&
         graph_input_streams_closed_)) {
      VLOG(2) << "HandleIdle: quitting";
      Quit();
      break;
    }

    // See if we can schedule the next layer of source nodes.
    if (active_sources_.empty() && !sources_queue_.empty()) {
      VLOG(2) << "HandleIdle: activating sources";
      // Note: TryToScheduleNextSourceLayer unlocks and locks state_mutex_
      // internally.
      bool did_activate = TryToScheduleNextSourceLayer();
      // If nothing was activated, active_sources_ must still be empty.
      ABSL_CHECK(did_activate || active_sources_.empty());
      continue;
    }

    // See if we can unthrottle some source nodes or graph input streams to
    // break deadlock. If we are still idle and there are active source nodes,
    // they must be throttled.
    if (!active_sources_.empty() || throttled_graph_input_stream_count_ > 0) {
      VLOG(2) << "HandleIdle: unthrottling";
      // The graph is called without holding state_mutex_.
      state_mutex_.Unlock();
      bool did_unthrottle = graph_->UnthrottleSources();
      state_mutex_.Lock();
      if (did_unthrottle) {
        continue;
      }
    }

    // If HandleIdle has been called again, then continue scheduling.
    if (handling_idle_ > 1) {
      // Collapse the ignored re-entrant calls into this one and loop again.
      handling_idle_ = 1;
      continue;
    }

    // Nothing left to do.
    break;
  }

  // Mark that nobody is inside HandleIdle anymore.
  handling_idle_ = 0;
}

// Terminates the current run: stops all queues, ends the timer run, moves the
// scheduler to STATE_TERMINATED and wakes every waiter on state_cond_var_.
// Check-fails unless the state is STATE_RUNNING or STATE_CANCELLING.
//
// Note: state_mutex_ is held when this function is entered or exited.
// Once this function returns, the scheduler may be destructed as soon as
// state_mutex_ is unlocked.
void Scheduler::Quit() {
  // All calls to Calculator::Process() have returned (even if we had an
  // error).
  ABSL_CHECK(state_ == STATE_RUNNING || state_ == STATE_CANCELLING);
  SetQueuesRunning(false);
  shared_.timer.EndRun();

  VLOG(2) << "Signaling scheduler termination";
  // Let other threads know that scheduler terminated.
  state_ = STATE_TERMINATED;
  state_cond_var_.SignalAll();
}

// Starts a run: begins the timer run, moves the scheduler from
// STATE_NOT_STARTED (check-enforced) to STATE_RUNNING, marks all queues as
// running and performs an initial HandleIdle() pass under state_mutex_.
// Waiting tasks are submitted to the executors after the mutex is released.
void Scheduler::Start() {
  VLOG(2) << "Starting scheduler";
  shared_.timer.StartRun();
  {
    absl::MutexLock lock(&state_mutex_);
    ABSL_CHECK_EQ(state_, STATE_NOT_STARTED);
    state_ = STATE_RUNNING;
    SetQueuesRunning(true);

    // Get the ball rolling.
    HandleIdle();
  }
  // Done outside the scope that holds state_mutex_.
  SubmitWaitingTasksOnQueues();
}

// Appends `task` to the list of tasks that ApplicationThreadAwait() runs.
// Waiters are signalled only when the list goes from empty to non-empty.
void Scheduler::AddApplicationThreadTask(std::function<void()> task) {
  absl::MutexLock lock(&state_mutex_);
  const bool was_empty = app_thread_tasks_.empty();
  app_thread_tasks_.push_back(std::move(task));
  if (was_empty) {
    state_cond_var_.SignalAll();
  }
}

// Records, under state_mutex_, that one more graph input stream is throttled.
void Scheduler::ThrottledGraphInputStream() {
  absl::MutexLock lock(&state_mutex_);
  throttled_graph_input_stream_count_ += 1;
}

// Records that a graph input stream is no longer throttled, advances the
// unthrottle sequence number and wakes all waiters on state_cond_var_.
void Scheduler::UnthrottledGraphInputStream() {
  absl::MutexLock lock(&state_mutex_);
  throttled_graph_input_stream_count_ -= 1;
  unthrottle_seq_num_ += 1;
  state_cond_var_.SignalAll();
}

// Blocks until an unthrottle event happens after this call began, or until
// the scheduler reaches STATE_TERMINATED. `secondary_mutex` is unlocked for
// the duration of the wait and locked again before returning.
void Scheduler::WaitUntilGraphInputStreamUnthrottled(
    absl::Mutex* secondary_mutex) {
  // Since we want to support multiple concurrent calls to this method, we
  // cannot use a simple boolean flag like in WaitForObservedOutput: when one
  // invocation sees and erases the flag, it would make it invisible to the
  // others. Instead, we use a sequence number. Each call records the current
  // sequence number before unlocking. If an unthrottle event occurred after
  // that point, the sequence number will differ.
  int seq_num;
  {
    absl::MutexLock lock(&state_mutex_);
    seq_num = unthrottle_seq_num_;
  }

  const auto unthrottled_or_terminated =
      [this, seq_num]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(state_mutex_) {
        return (unthrottle_seq_num_ != seq_num) || state_ == STATE_TERMINATED;
      };

  secondary_mutex->Unlock();
  ApplicationThreadAwait(unthrottled_or_terminated);
  secondary_mutex->Lock();
}

// Sets the observed-output signal and, if a WaitForObservedOutput() call has
// flagged itself as waiting, wakes the waiters.
void Scheduler::EmittedObservedOutput() {
  absl::MutexLock lock(&state_mutex_);
  observed_output_signal_ = true;
  if (!waiting_for_observed_output_) {
    return;
  }
  state_cond_var_.SignalAll();
}

// Waits until an observed output has been signalled or the scheduler has
// terminated. Returns OK when an output was observed, and an OutOfRange error
// otherwise.
absl::Status Scheduler::WaitForObservedOutput() {
  bool observed = false;
  const auto consume_signal =
      [this, &observed]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(state_mutex_) {
        // Read the signal and clear it in the same step.
        observed = observed_output_signal_;
        observed_output_signal_ = false;
        waiting_for_observed_output_ = !observed && state_ != STATE_TERMINATED;
        // Wait until the member waiting_for_observed_output_ becomes false.
        return !waiting_for_observed_output_;
      };
  ApplicationThreadAwait(consume_signal);

  if (observed) {
    return absl::OkStatus();
  }
  return absl::OutOfRangeError("Graph is done.");
}

// Blocks until IsIdle() reports true. The RET_CHECK fails if the scheduler
// has not been started.
absl::Status Scheduler::WaitUntilIdle() {
  RET_CHECK_NE(state_, STATE_NOT_STARTED);
  ApplicationThreadAwait([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(state_mutex_) {
    return IsIdle();
  });
  return absl::OkStatus();
}

// Blocks until the scheduler reaches STATE_TERMINATED. The RET_CHECK fails if
// the scheduler has not been started.
absl::Status Scheduler::WaitUntilDone() {
  RET_CHECK_NE(state_, STATE_NOT_STARTED);
  const auto has_terminated =
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(state_mutex_) {
        return state_ == STATE_TERMINATED;
      };
  ApplicationThreadAwait(has_terminated);
  return absl::OkStatus();
}

// Blocks the calling thread until `stop_condition` returns true.
// `stop_condition` is always evaluated with state_mutex_ held. While waiting,
// queued application-thread tasks are run one at a time with the mutex
// released; when no task is queued the thread waits on state_cond_var_.
void Scheduler::ApplicationThreadAwait(
    const std::function<bool()>& stop_condition) {
  absl::MutexLock lock(&state_mutex_);
  while (!stop_condition()) {
    if (!app_thread_tasks_.empty()) {
      // Take the oldest task and run it without holding the mutex.
      std::function<void()> pending = std::move(app_thread_tasks_.front());
      app_thread_tasks_.pop_front();
      state_mutex_.Unlock();
      pending();
      state_mutex_.Lock();
      continue;
    }
    state_cond_var_.Wait(&state_mutex_);
  }
}

// The scheduler counts as idle when no queue is currently non-idle.
bool Scheduler::IsIdle() {
  return non_idle_queue_count_ == 0;
}

// Re-runs the idle handling after a packet was added to a graph input stream,
// unless the scheduler has already terminated.
void Scheduler::AddedPacketToGraphInputStream() {
  if (!(state_ == STATE_TERMINATED)) {
    absl::MutexLock lock(&state_mutex_);
    // It seems that the only thing it really needs to do is to check if more
    // unthrottling needs to be done.
    HandleIdle();
  }
}

// Records, under state_mutex_, that all graph input streams are closed and
// then re-runs the idle handling.
// Note: This may be called while we are already in STATE_TERMINATED.
void Scheduler::ClosedAllGraphInputStreams() {
  absl::MutexLock lock(&state_mutex_);
  graph_input_streams_closed_ = true;
  // This is called to check whether we should quit.
  HandleIdle();
}

// TODO: If the node isn't successfully scheduled, we must properly
// handle the pending calculator context. For example, the caller should dispose
// of the calculator context and put it into a pending calculator context
// container.
//
// Adds `node` with `calculator_context` to the node's scheduler queue, unless
// the graph reports the node as throttled, in which case nothing happens.
void Scheduler::ScheduleNodeIfNotThrottled(
    CalculatorNode* node, CalculatorContext* calculator_context) {
  ABSL_DCHECK(node);
  ABSL_DCHECK(calculator_context);
  if (graph_->IsNodeThrottled(node->Id())) {
    return;
  }
  node->GetSchedulerQueue()->AddNode(node, calculator_context);
}

// Adds `node` to its scheduler queue so that it gets opened.
void Scheduler::ScheduleNodeForOpen(CalculatorNode* node) {
  ABSL_DCHECK(node);
  VLOG(1) << "Scheduling OpenNode of calculator " << node->DebugName();
  SchedulerQueue* const target_queue = node->GetSchedulerQueue();
  target_queue->AddNodeForOpen(node);
}

// Adds every node in `nodes_to_schedule` to its scheduler queue. Each node
// must be a source node (check-enforced).
void Scheduler::ScheduleUnthrottledReadyNodes(
    const std::vector<CalculatorNode*>& nodes_to_schedule) {
  for (auto it = nodes_to_schedule.begin(); it != nodes_to_schedule.end();
       ++it) {
    CalculatorNode* node = *it;
    ABSL_CHECK(node->IsSource());
    // Source nodes always reuse the default calculator context because they
    // can't be executed in parallel.
    CalculatorContext* const context = node->GetDefaultCalculatorContext();
    SchedulerQueue* const target_queue = node->GetSchedulerQueue();
    target_queue->AddNode(node, context);
  }
}

// Drops closed sources from the tail of active_sources_, stopping at the
// first one (counted from the back) that is still open.
void Scheduler::CleanupActiveSources() {
  // Remove sources from the back of the active sources vector if they have
  // been closed. We only remove from the back because it is cheap to remove
  // elements at the end of a std::vector.
  while (!active_sources_.empty() && active_sources_.back()->Closed()) {
    active_sources_.pop_back();
  }
}

bool Scheduler::TryToScheduleNextSourceLayer() {
  VLOG(3) << "TryToScheduleNextSourceLayer";

  ABSL_CHECK(active_sources_.empty());
  ABSL_CHECK(!sources_queue_.empty());

  if (!unopened_sources_.empty() &&
      (*unopened_sources_.begin())->source_layer() <
          sources_queue_.top().Node()->source_layer()) {
    // If no graph input streams are open, then there are no packet sources in
    // the graph. It's a deadlock.
    if (graph_input_streams_closed_) {
      graph_->RecordError(absl::UnknownError(
          "Detected a deadlock because source nodes cannot be activated when a "
          "source node at a lower layer is still not opened."));
    }
    return false;
  }

  // contexts[i] stores the CalculatorContext to be used with
  // active_sources_[i].
  std::vector<CalculatorContext*> contexts;
  bool activated_any = false;
  while (!sources_queue_.empty()) {
    CalculatorNode* node = sources_queue_.top().Node();
    // Only add sources with the same layer number.
    if (activated_any &&
        (node->source_layer() != active_sources_.back()->source_layer())) {
      break;
    }
    active_sources_.emplace_back(node);
    contexts.emplace_back(sources_queue_.top().Context());
    activated_any = true;
    sources_queue_.pop();
  }
  if (!activated_any) {
    return false;
  }

  state_mutex_.Unlock();
  // Add all the sources in a layer to the scheduler queue at once to
  // guarantee they are scheduled in a round-robin fashion. Pause the
  // scheduler queue until all the sources have been added.
  SetQueuesRunning(false);
  for (int i = 0; i < (int)active_sources_.size(); ++i) {
    CalculatorNode* node = active_sources_[i];
    node->ActivateNode();
    ScheduleNodeIfNotThrottled(node, contexts[i]);
  }
  SetQueuesRunning(true);
  SubmitWaitingTasksOnQueues();
  state_mutex_.Lock();
  return true;
}

// Registers `node` as a source node that has not been opened yet.
// Check-fails unless the scheduler is still in STATE_NOT_STARTED.
void Scheduler::AddUnopenedSourceNode(CalculatorNode* node) {
  ABSL_CHECK_EQ(state_, STATE_NOT_STARTED)
      << "AddUnopenedSourceNode can only be called before starting the "
         "scheduler";
  unopened_sources_.insert(node);
}

// Queues `node` in sources_queue_ and removes it from the set of unopened
// sources; both updates happen under state_mutex_.
void Scheduler::AddNodeToSourcesQueue(CalculatorNode* node) {
  // Source nodes always reuse the default calculator context because they
  // can't be executed in parallel. The context is fetched before the mutex
  // is taken.
  CalculatorContext* const source_context =
      node->GetDefaultCalculatorContext();

  absl::MutexLock lock(&state_mutex_);
  sources_queue_.push(SchedulerQueue::Item(node, source_context));
  unopened_sources_.erase(node);
}

// Picks the scheduler queue for `node`: the queue registered under the node's
// executor name when that name is non-empty (it must exist, check-enforced),
// and the default queue otherwise.
void Scheduler::AssignNodeToSchedulerQueue(CalculatorNode* node) {
  // Start from the default queue and override it for named executors.
  SchedulerQueue* queue = &default_queue_;
  if (!node->Executor().empty()) {
    auto iter = non_default_queues_.find(node->Executor());
    ABSL_CHECK(iter != non_default_queues_.end());
    queue = iter->second.get();
  }
  node->SetSchedulerQueue(queue);
}

// Idle callback installed on every scheduler queue. Updates the count of
// non-idle queues and, when that count reaches zero, wakes the waiters and
// runs the idle handling.
void Scheduler::QueueIdleStateChanged(bool idle) {
  absl::MutexLock lock(&state_mutex_);
  if (idle) {
    --non_idle_queue_count_;
  } else {
    ++non_idle_queue_count_;
  }
  VLOG(2) << "active queues: " << non_idle_queue_count_;
  if (non_idle_queue_count_ != 0) {
    return;
  }

  state_cond_var_.SignalAll();
  // Here we need to check if we should activate sources, unthrottle, or
  // quit.
  // Note: when non_idle_queue_count_ == 0, we know that we are the last
  // queue remaining active. However, the application thread may still end
  // up calling HandleIdle, e.g. via the Cancel method, and that call may
  // quit the graph. Therefore, we should not unlock the mutex between
  // decrementing non_idle_queue_count_ and calling HandleIdle.
  HandleIdle();
}

// Moves a running scheduler to STATE_PAUSED and stops its queues. Has no
// effect in any other state.
void Scheduler::Pause() {
  absl::MutexLock lock(&state_mutex_);
  if (state_ == STATE_RUNNING) {
    state_ = STATE_PAUSED;
    SetQueuesRunning(false);
  }
}

// Moves a paused scheduler back to STATE_RUNNING, restarts its queues and
// submits their waiting tasks. Has no effect in any other state.
void Scheduler::Resume() {
  bool resumed = false;
  {
    absl::MutexLock lock(&state_mutex_);
    if (state_ == STATE_PAUSED) {
      state_ = STATE_RUNNING;
      SetQueuesRunning(true);
      // If HandleIdle was called while graph was paused, it did nothing. So
      // call it now.
      HandleIdle();
      resumed = true;
    }
  }
  // Submission happens after state_mutex_ has been released.
  if (resumed) {
    SubmitWaitingTasksOnQueues();
  }
}

// Cancels a running or paused scheduler: records a Cancelled error on the
// graph, switches to STATE_CANCELLING and runs the idle handling. Has no
// effect in any other state.
void Scheduler::Cancel() {
  bool cancelled = false;
  {
    absl::MutexLock lock(&state_mutex_);
    if (state_ == STATE_RUNNING || state_ == STATE_PAUSED) {
      graph_->RecordError(absl::CancelledError());
      if (state_ == STATE_PAUSED) {
        // Keep the scheduler queue running, since we need to exhaust it.
        SetQueuesRunning(true);
      }
      state_ = STATE_CANCELLING;
      // Because we have recorded an error, this will cause the graph to quit.
      HandleIdle();
      cancelled = true;
    }
  }
  // Submission happens after state_mutex_ has been released.
  if (cancelled) {
    SubmitWaitingTasksOnQueues();
  }
}

// Reports, under state_mutex_, whether the scheduler is in STATE_PAUSED.
bool Scheduler::IsPaused() {
  absl::MutexLock lock(&state_mutex_);
  const bool paused = (state_ == STATE_PAUSED);
  return paused;
}

// Reports, under state_mutex_, whether the scheduler is in STATE_TERMINATED.
bool Scheduler::IsTerminated() {
  absl::MutexLock lock(&state_mutex_);
  const bool terminated = (state_ == STATE_TERMINATED);
  return terminated;
}

void Scheduler::CleanupAfterRun() {
  {
    absl::MutexLock lock(&state_mutex_);
    while (!sources_queue_.empty()) {
      sources_queue_.pop();
    }
    ABSL_CHECK(app_thread_tasks_.empty());
  }
  for (auto queue : scheduler_queues_) {
    queue->CleanupAfterRun();
  }
  unopened_sources_.clear();
  active_sources_.clear();
  shared_.has_error = false;
}

// Returns the scheduler times collected by the shared timer.
// Check-fails unless the scheduler is in STATE_TERMINATED.
internal::SchedulerTimes Scheduler::GetSchedulerTimes() {
  ABSL_CHECK_EQ(state_, STATE_TERMINATED);
  return shared_.timer.GetSchedulerTimes();
}

}  // namespace internal
}  // namespace mediapipe