Skip to content

File in_order_output_stream_handler.cc

File List > framework > stream_handler > in_order_output_stream_handler.cc

Go to the documentation of this file

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "mediapipe/framework/stream_handler/in_order_output_stream_handler.h"

#include "absl/log/absl_check.h"
#include "mediapipe/framework/collection.h"
#include "mediapipe/framework/collection_item_id.h"
#include "mediapipe/framework/output_stream_shard.h"

namespace mediapipe {

REGISTER_OUTPUT_STREAM_HANDLER(InOrderOutputStreamHandler);

void InOrderOutputStreamHandler::PropagationLoop() {
  ABSL_CHECK_EQ(propagation_state_, kIdle);
  Timestamp context_timestamp;
  CalculatorContext* calculator_context;
  if (!calculator_context_manager_->HasActiveContexts()) {
    propagation_state_ = kPropagatingBound;
  } else {
    calculator_context = calculator_context_manager_->GetFrontCalculatorContext(
        &context_timestamp);
    if (!completed_input_timestamps_.empty()) {
      Timestamp completed_timestamp = *completed_input_timestamps_.begin();
      if (context_timestamp != completed_timestamp) {
        ABSL_CHECK_LT(context_timestamp, completed_timestamp);
        return;
      }
      propagation_state_ = kPropagatingPackets;
    }
  }

  while (propagation_state_ != kIdle) {
    if (propagation_state_ == kPropagatingPackets) {
      PropagatePackets(&calculator_context, &context_timestamp);
    } else {
      ABSL_CHECK_EQ(kPropagatingBound, propagation_state_);
      PropagationBound(&calculator_context, &context_timestamp);
    }
  }
}

void InOrderOutputStreamHandler::PropagatePackets(
    CalculatorContext** calculator_context, Timestamp* context_timestamp) {
  timestamp_mutex_.Unlock();
  // Propagates packets without holding timestamp_mutex_.
  PropagateOutputPackets(*context_timestamp, &(*calculator_context)->Outputs());
  calculator_context_manager_->RecycleCalculatorContext();
  timestamp_mutex_.Lock();
  completed_input_timestamps_.erase(completed_input_timestamps_.begin());
  // The first check is for performance reasons (it's cheaper).
  // Note that completed_input_timestamps_ is a subset of the input
  // timestamps of the active contexts. Therefore, the second check
  // covers the first check.
  if (completed_input_timestamps_.empty() &&
      !calculator_context_manager_->HasActiveContexts()) {
    // If task_timestamp_bound_ is not greater than context_timestamp + 1,
    // timestamp propagation isn't necessary since the bound of the
    // downstream input streams has been updated to a larger value
    // already. Timestamp propagation will be skipped, and the
    // propagation process is completed.
    if (task_timestamp_bound_ <= context_timestamp->NextAllowedInStream()) {
      propagation_state_ = kIdle;
      return;
    }
    propagation_state_ = kPropagatingBound;
    return;
  }
  *calculator_context =
      calculator_context_manager_->GetFrontCalculatorContext(context_timestamp);
  if (!completed_input_timestamps_.empty() &&
      *context_timestamp == *completed_input_timestamps_.begin()) {
    // Continues propagating output packets if the smallest completed
    // input timestamp is equal to the input timestamp of the earliest
    // active calculator context.
    return;
  }
  propagation_state_ = kIdle;
}

void InOrderOutputStreamHandler::PropagationBound(
    CalculatorContext** calculator_context, Timestamp* context_timestamp) {
  Timestamp bound_to_propagate = task_timestamp_bound_;
  timestamp_mutex_.Unlock();
  // Timestamp bound propagation without holding timestamp_mutex_.
  TryPropagateTimestampBound(bound_to_propagate);
  timestamp_mutex_.Lock();
  if (propagation_state_ == kPropagatingBound) {
    // There is no invocation completed and no newly arrived timestamp
    // bound during the timestamp bound propagation. So the propagation
    // process is completed.
    propagation_state_ = kIdle;
    return;
  }
  // Some recent changes require the propagation thread to recheck if any
  // new packets can be propagated.
  ABSL_CHECK_EQ(propagation_state_, kPropagationPending);
  // task_timestamp_bound_ was updated while the propagation thread was
  // doing timestamp propagation. This thread will redo timestamp
  // propagation for the new task_timestamp_bound_.
  if (!calculator_context_manager_->HasActiveContexts()) {
    ABSL_CHECK_LT(bound_to_propagate, task_timestamp_bound_);
    propagation_state_ = kPropagatingBound;
    return;
  }
  *calculator_context =
      calculator_context_manager_->GetFrontCalculatorContext(context_timestamp);
  if (completed_input_timestamps_.empty() ||
      *context_timestamp != *completed_input_timestamps_.begin()) {
    // If there is no newly completed invocation or the newly arrived packets
    // are not ready for propagation, the propagation process is completed.
    propagation_state_ = kIdle;
    return;
  } else {
    // Found new packets to be propagated, and will redo packets
    // propagation.
    propagation_state_ = kPropagatingPackets;
    return;
  }
}

}  // namespace mediapipe