Skip to content

File threadpool.h

File List > deps > threadpool.h

Go to the documentation of this file

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef MEDIAPIPE_DEPS_THREADPOOL_H_
#define MEDIAPIPE_DEPS_THREADPOOL_H_

#include <deque>
#include <functional>
#include <string>
#include <vector>

#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/deps/thread_options.h"

namespace mediapipe {

// A thread pool consists of a set of threads that sit around waiting
// for callbacks to appear on a queue.  When that happens, one of the
// threads pulls a callback off the queue and runs it.
//
// The thread pool is shut down when the pool is destroyed.
//
// Sample usage:
//
// {
//   ThreadPool pool("testpool", num_workers);
//   pool.StartWorkers();
//   for (int i = 0; i < N; ++i) {
//     pool.Schedule([i]() { DoWork(i); });
//   }
// }
//
class ThreadPool {
 public:
  // Create a thread pool that provides a concurrency of "num_threads"
  // threads. I.e., if "num_threads" items are added, they are all
  // guaranteed to run concurrently without excessive delay.
  // It has an effectively infinite maximum queue length.
  // If num_threads is 1, the callbacks are run in FIFO order.
  explicit ThreadPool(int num_threads);
  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;

  // Like the ThreadPool(int num_threads) constructor, except that
  // it also associates "name_prefix" with each of the threads
  // in the thread pool.
  ThreadPool(const std::string& name_prefix, int num_threads);

  // Create a thread pool that creates and can use up to "num_threads"
  // threads.  Any standard thread options, such as stack size, should
  // be passed via "thread_options".  "name_prefix" specifies the
  // thread name prefix.
  ThreadPool(const ThreadOptions& thread_options,
             const std::string& name_prefix, int num_threads);

  // Waits for closures (if any) to complete. May be called without
  // having called StartWorkers().
  ~ThreadPool();

  // REQUIRES: StartWorkers has not been called
  // Actually start the worker threads.
  void StartWorkers();

  // REQUIRES: StartWorkers has been called
  // Add specified callback to queue of pending callbacks.  Eventually a
  // thread will pull this callback off the queue and execute it.
  void Schedule(std::function<void()> callback);

  // Provided for debugging and testing only.
  int num_threads() const;

  // Standard thread options.  Use this accessor to get them.
  const ThreadOptions& thread_options() const;

 private:
  class WorkerThread;
  void RunWorker();

  std::string name_prefix_;
  std::vector<WorkerThread*> threads_;
  int num_threads_;

  absl::Mutex mutex_;
  absl::CondVar condition_;
  bool stopped_ ABSL_GUARDED_BY(mutex_) = false;
  std::deque<std::function<void()>> tasks_ ABSL_GUARDED_BY(mutex_);

  ThreadOptions thread_options_;
};

namespace internal {

// Creates name for thread in a thread pool based on provided prefix and
// thread id. Length of the resulting name is guaranteed to be less or equal
// to 15. Name or thread id can be truncated to achieve that, see truncation
// samples below:
// name_prefix, 1234       -> name_prefix/123
// name_prefix, 1234567    -> name_prefix/123
// name_prefix_long, 1234  -> name_prefix_lon
std::string CreateThreadName(const std::string& prefix, int thread_id);

}  // namespace internal

}  // namespace mediapipe

#endif  // MEDIAPIPE_DEPS_THREADPOOL_H_