Back to Taskflow

Taskflow: A General-purpose Task-parallel Programming System

docs/wsq_8hpp_source.html

4.1.026.4 KB
Original Source

| | Taskflow: A General-purpose Task-parallel Programming System |

Loading...

Searching...

No Matches

wsq.hpp

1#pragma once

2

3#include <bit>

4

5#include "../utility/macros.hpp"

6#include "../utility/traits.hpp"

7

12

13#ifndef TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE

21 #define TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE 8

22#endif

23

24#ifndef TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE

32 #define TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE 10

33#endif

34

35namespace tf {

36

37// ----------------------------------------------------------------------------

38// Work-stealing queue steal protocol sentinels

39//

40// These free functions define the sentinel value used by steal operations

41// across all work-stealing queue types (BoundedWSQ, UnboundedWSQ). They encode

42// the result of a steal attempt into the return value itself, avoiding any

43// out-parameter or separate status type.

44//

45// For pointer types T:

46// wsq_empty_value<T>() = nullptr — queue was empty (or CAS was lost)

47//

48// The sentinel nullptr is returned whenever steal() cannot deliver a task,

49// whether due to the queue being genuinely empty or losing a concurrent CAS

50// race to another thief.

51//

52// For non-pointer types T, std::nullopt is returned to indicate the absence

53// of a value.

54//

55// Queue classes expose this as a static member function (empty_value) that

56// delegates here, so callers can use either form.

57// ----------------------------------------------------------------------------

58

/**
@brief returns the empty sentinel shared by the work-stealing queue classes

@tparam T element type stored in the queue

When `T` is a pointer type the sentinel is a null pointer of type `T`.
For any other `T` it is a disengaged `std::optional<T>`.
*/
template <typename T>
constexpr auto wsq_empty_value() {
  if constexpr (!std::is_pointer_v<T>) {
    // non-pointer element: "no value" is an empty optional
    return std::optional<T>(std::nullopt);
  }
  else {
    // pointer element: "no value" is the null pointer itself
    return static_cast<T>(nullptr);
  }
}

74

75// ----------------------------------------------------------------------------

76// Unbounded Work-stealing Queue (WSQ)

77// ----------------------------------------------------------------------------

78

79

104template <typename T>

105class UnboundedWSQ {

106

107struct Array {

108

109size_t C;

110size_t M;

111 std::atomic<T>* S;

112

113explicit Array(size_t c) :

114 C {c},

115 M {c-1},

116 S {new std::atomic<T>[C]} {

117 }

118

119 ~Array() {

120delete [] S;

121 }

122

123size_t capacity() const noexcept {

124return C;

125 }

126

127void push(int64_t i, T o) noexcept {

128 S[i & M].store(o, std::memory_order_relaxed);

129 }

130

131 T pop(int64_t i) noexcept {

132return S[i & M].load(std::memory_order_relaxed);

133 }

134

135 Array* resize(int64_t b, int64_t t) {

136 Array* ptr = new Array(2*C);

137for(int64_t i=t; i!=b; ++i) {

138 ptr->push(i, pop(i));

139 }

140return ptr;

141 }

142

143 Array* resize(int64_t b, int64_t t, size_t N) {

144// assert(N>0);

145 Array* ptr = new Array(std::bit_ceil(C + N));

146for(int64_t i=t; i!=b; ++i) {

147 ptr->push(i, pop(i));

148 }

149return ptr;

150 }

151

152 };

153

154alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top;

155alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom;

156

157// Owner-private cached upper bound on _top. Never read by thieves.

158// Because _top is never decremented, the real occupancy can only be

159// smaller than what is computed using this cached value, so using it

160// for the overflow check is always safe.

161 int64_t _cached_top {0};

162

163// _array on its own cache line: avoids false-sharing with _bottom when

164// thieves load _array (consume) after reading _bottom (acquire).

165alignas(TF_CACHELINE_SIZE) std::atomic<Array*> _array;

166 std::vector<Array*> _garbage;

167

168public:

169

186using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;

187

198explicit UnboundedWSQ(int64_t LogSize = TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE);

199

203~UnboundedWSQ();

204

215bool empty() const noexcept;

216

227size_t size() const noexcept;

228

242size_t capacity() const noexcept;

243

264void push(T item);

265

293 template <typename I>

294void bulk_push(I& first, size_t N);

295

316value_type pop();

317

338value_type steal();

339

352 static constexpr auto empty_value() { return wsq_empty_value<T>(); }

353

354private:

355

356 Array* _resize_array(Array* a, int64_t b, int64_t t);

357 Array* _resize_array(Array* a, int64_t b, int64_t t, size_t N);

358};

359

360// Constructor

361template <typename T>

362UnboundedWSQ<T>::UnboundedWSQ(int64_t LogSize) {

363 _top.store(0, std::memory_order_relaxed);

364 _bottom.store(0, std::memory_order_relaxed);

365 _array.store(new Array{(size_t{1} << LogSize)}, std::memory_order_relaxed);

366 _garbage.reserve(32);

367}

368

369// Destructor

370template <typename T>

371UnboundedWSQ<T>::~UnboundedWSQ() {

372for(auto a : _garbage) {

373delete a;

374 }

375delete _array.load();

376}

377

378// Function: empty

379template <typename T>

380bool UnboundedWSQ<T>::empty() const noexcept {

381 int64_t t = _top.load(std::memory_order_relaxed);

382 int64_t b = _bottom.load(std::memory_order_relaxed);

383return (b <= t);

384}

385

386// Function: size

387template <typename T>

388size_t UnboundedWSQ<T>::size() const noexcept {

389 int64_t t = _top.load(std::memory_order_relaxed);

390 int64_t b = _bottom.load(std::memory_order_relaxed);

391return static_cast<size_t>(b >= t ? b - t : 0);

392}

393

394// Function: push

395template <typename T>

396void UnboundedWSQ<T>::push(T o) {

397

398 int64_t b = _bottom.load(std::memory_order_relaxed);

399 Array* a = _array.load(std::memory_order_relaxed);

400

401// queue is full with one additional item (b-t+1)

402if(a->capacity() < static_cast<size_t>(b - _cached_top + 1)) [[unlikely]] {

403 _cached_top = _top.load(std::memory_order_acquire);

404if(a->capacity() < static_cast<size_t>(b - _cached_top + 1)) [[unlikely]] {

405 a = _resize_array(a, b, _cached_top);

406 }

407 }

408

409 a->push(b, o);

410 std::atomic_thread_fence(std::memory_order_release);

411

412// original paper uses relaxed here but tsa complains

413 _bottom.store(b + 1, std::memory_order_release);

414}

415

416// Function: bulk_push

417template <typename T>

418template <typename I>

419void UnboundedWSQ<T>::bulk_push(I& first, size_t N) {

420

421if(N == 0) return;

422

423 int64_t b = _bottom.load(std::memory_order_relaxed);

424 Array* a = _array.load(std::memory_order_relaxed);

425

426// queue is full with N additional items

427if((b - _cached_top + N) > a->capacity()) [[unlikely]] {

428 _cached_top = _top.load(std::memory_order_acquire);

429if((b - _cached_top + N) > a->capacity()) [[unlikely]] {

430 a = _resize_array(a, b, _cached_top, N);

431 }

432 }

433

434for(size_t i=0; i<N; ++i) {

435 a->push(b++, *first++);

436 }

437 std::atomic_thread_fence(std::memory_order_release);

438

439// original paper uses relaxed here but tsa complains

440 _bottom.store(b, std::memory_order_release);

441}

442

443// Function: pop

444template <typename T>

445typename UnboundedWSQ<T>::value_type

446UnboundedWSQ<T>::pop() {

447

448 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;

449 Array* a = _array.load(std::memory_order_relaxed);

450 _bottom.store(b, std::memory_order_relaxed);

451 std::atomic_thread_fence(std::memory_order_seq_cst);

452 int64_t t = _top.load(std::memory_order_relaxed);

453

454//T item {nullptr};

455auto item = empty_value();

456

457if(t <= b) {

458 item = a->pop(b);

459if(t == b) {

460// the last item just got stolen

461if(!_top.compare_exchange_strong(t, t+1, std::memory_order_seq_cst,

462 std::memory_order_relaxed)) {

463//item = nullptr;

464 item = empty_value();

465 }

466 _bottom.store(b + 1, std::memory_order_relaxed);

467 }

468 }

469else {

470 _bottom.store(b + 1, std::memory_order_relaxed);

471 }

472

473return item;

474}

475

476// Function: steal

477template <typename T>

478typename UnboundedWSQ<T>::value_type

479UnboundedWSQ<T>::steal() {

480

481 int64_t t = _top.load(std::memory_order_acquire);

482 std::atomic_thread_fence(std::memory_order_seq_cst);

483 int64_t b = _bottom.load(std::memory_order_acquire);

484

485//T item {nullptr};

486auto item = empty_value();

487

488if(t < b) {

489 Array* a = _array.load(std::memory_order_consume);

490 item = a->pop(t);

491if(!_top.compare_exchange_strong(t, t+1,

492 std::memory_order_seq_cst,

493 std::memory_order_relaxed)) {

494//return nullptr;

495return empty_value();

496 }

497 }

498

499return item;

500}

501

502

503// Function: capacity

504template <typename T>

505size_t UnboundedWSQ<T>::capacity() const noexcept {

506return _array.load(std::memory_order_relaxed)->capacity();

507}

508

509template <typename T>

510typename UnboundedWSQ<T>::Array*

511UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t) {

512 Array* tmp = a->resize(b, t);

513 _garbage.push_back(a);

514// Note: the original paper using relaxed causes t-san to complain

515 _array.store(tmp, std::memory_order_release);

516return tmp;

517}

518

519template <typename T>

520typename UnboundedWSQ<T>::Array*

521UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t, size_t N) {

522 Array* tmp = a->resize(b, t, N);

523 _garbage.push_back(a);

524// Note: the original paper using relaxed causes t-san to complain

525 _array.store(tmp, std::memory_order_release);

526return tmp;

527}

528

529// ----------------------------------------------------------------------------

530// Bounded Work-stealing Queue (WSQ)

531// ----------------------------------------------------------------------------

532

558template <typename T, size_t LogSize = TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE>

559class BoundedWSQ {

560

561constexpr static size_t BufferSize = size_t{1} << LogSize;

562constexpr static size_t BufferMask = (BufferSize - 1);

563

564static_assert((BufferSize >= 2) && ((BufferSize & (BufferSize - 1)) == 0));

565

566alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top {0};

567alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom {0};

568alignas(TF_CACHELINE_SIZE) std::atomic<T> _buffer[BufferSize];

569

570public:

571

588using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;

589

598BoundedWSQ() = default;

599

603~BoundedWSQ() = default;

604

615bool empty() const noexcept;

616

627size_t size() const noexcept;

628

639 constexpr size_t capacity() const;

640

663 template <typename O>

664bool try_push(O&& item);

665

692 template <typename I>

693size_t try_bulk_push(I& first, size_t N);

694

714value_type pop();

715

736value_type steal();

737

738

751 static constexpr auto empty_value() { return wsq_empty_value<T>(); }

752};

753

754// Function: empty

755template <typename T, size_t LogSize>

756bool BoundedWSQ<T, LogSize>::empty() const noexcept {

757 int64_t t = _top.load(std::memory_order_relaxed);

758 int64_t b = _bottom.load(std::memory_order_relaxed);

759return b <= t;

760}

761

762// Function: size

763template <typename T, size_t LogSize>

764size_t BoundedWSQ<T, LogSize>::size() const noexcept {

765 int64_t t = _top.load(std::memory_order_relaxed);

766 int64_t b = _bottom.load(std::memory_order_relaxed);

767return static_cast<size_t>(b >= t ? b - t : 0);

768}

769

770// Function: try_push

771template <typename T, size_t LogSize>

772template <typename O>

773bool BoundedWSQ<T, LogSize>::try_push(O&& o) {

774

775 int64_t b = _bottom.load(std::memory_order_relaxed);

776 int64_t t = _top.load(std::memory_order_acquire);

777

778// queue is full with one additional item (b-t+1)

779if(static_cast<size_t>(b - t + 1) > BufferSize) [[unlikely]] {

780return false;

781 }

782

783 _buffer[b & BufferMask].store(std::forward<O>(o), std::memory_order_relaxed);

784

785 std::atomic_thread_fence(std::memory_order_release);

786

787// original paper uses relaxed here but tsa complains

788 _bottom.store(b + 1, std::memory_order_release);

789

790return true;

791}

792

793// Function: try_bulk_push

794template <typename T, size_t LogSize>

795template <typename I>

796size_t BoundedWSQ<T, LogSize>::try_bulk_push(I& first, size_t N) {

797

798if(N == 0) return 0;

799

800 int64_t b = _bottom.load(std::memory_order_relaxed);

801 int64_t t = _top.load(std::memory_order_acquire);

802

803size_t r = BufferSize - (b - t); // remaining capacity

804size_t n = std::min(N, r); // number of pushable elements

805

806if(n > 0) {

807// push n elements into the queue

808for(size_t i=0; i<n; ++i) {

809 _buffer[b++ & BufferMask].store(*first++, std::memory_order_relaxed);

810 }

811 std::atomic_thread_fence(std::memory_order_release);

812// original paper uses relaxed here but tsa complains

813 _bottom.store(b, std::memory_order_release);

814 }

815

816return n;

817}

818

819// Function: pop

820template <typename T, size_t LogSize>

821typename BoundedWSQ<T, LogSize>::value_type

822BoundedWSQ<T, LogSize>::pop() {

823

824 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;

825 _bottom.store(b, std::memory_order_relaxed);

826 std::atomic_thread_fence(std::memory_order_seq_cst);

827 int64_t t = _top.load(std::memory_order_relaxed);

828

829//T item {nullptr};

830auto item = empty_value();

831

832if(t <= b) {

833 item = _buffer[b & BufferMask].load(std::memory_order_relaxed);

834if(t == b) {

835// the last item just got stolen

836if(!_top.compare_exchange_strong(t, t+1,

837 std::memory_order_seq_cst,

838 std::memory_order_relaxed)) {

839//item = nullptr;

840 item = empty_value();

841 }

842 _bottom.store(b + 1, std::memory_order_relaxed);

843 }

844 }

845else {

846 _bottom.store(b + 1, std::memory_order_relaxed);

847 }

848

849return item;

850}

851

852// Function: steal

853template <typename T, size_t LogSize>

854typename BoundedWSQ<T, LogSize>::value_type

855BoundedWSQ<T, LogSize>::steal() {

856 int64_t t = _top.load(std::memory_order_acquire);

857 std::atomic_thread_fence(std::memory_order_seq_cst);

858 int64_t b = _bottom.load(std::memory_order_acquire);

859

860//T item{nullptr};

861auto item = empty_value();

862

863if(t < b) {

864 item = _buffer[t & BufferMask].load(std::memory_order_relaxed);

865if(!_top.compare_exchange_strong(t, t+1,

866 std::memory_order_seq_cst,

867 std::memory_order_relaxed)) {

868//return nullptr;

869return empty_value();

870 }

871 }

872

873return item;

874}

875

876

877// Function: capacity

878template <typename T, size_t LogSize>

879constexpr size_t BoundedWSQ<T, LogSize>::capacity() const {

880return BufferSize;

881}

882

883

884} // end of namespace tf -----------------------------------------------------

tf::BoundedWSQ< Node * >::pop

value_type pop()

tf::BoundedWSQ< Node * >::try_bulk_push

size_t try_bulk_push(I &first, size_t N)

tf::BoundedWSQ::~BoundedWSQ

~BoundedWSQ()=default

destructs the queue

tf::BoundedWSQ< Node * >::steal

value_type steal()

tf::BoundedWSQ< Node * >::capacity

constexpr size_t capacity() const

tf::BoundedWSQ::BoundedWSQ

BoundedWSQ()=default

constructs the queue with a given capacity

tf::BoundedWSQ< Node * >::try_push

bool try_push(O &&item)

tf::BoundedWSQ::value_type

std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type

the return type of queue operations

Definition wsq.hpp:588

tf::BoundedWSQ< Node * >::size

size_t size() const noexcept

tf::BoundedWSQ< Node * >::empty_value

static constexpr auto empty_value()

Definition wsq.hpp:751

tf::BoundedWSQ::empty

bool empty() const noexcept

queries if the queue is empty at the time of this call

Definition wsq.hpp:756

tf::UnboundedWSQ::empty

bool empty() const noexcept

queries if the queue is empty at the time of this call

Definition wsq.hpp:380

tf::UnboundedWSQ::pop

value_type pop()

pops out an item from the queue

Definition wsq.hpp:446

tf::UnboundedWSQ< Node * >::steal

value_type steal()

Definition wsq.hpp:479

tf::UnboundedWSQ::UnboundedWSQ

UnboundedWSQ(int64_t LogSize=TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE)

constructs the queue with the given size in the base-2 logarithm

Definition wsq.hpp:362

tf::UnboundedWSQ::push

void push(T item)

inserts an item to the queue

Definition wsq.hpp:396

tf::UnboundedWSQ< Node * >::size

size_t size() const noexcept

Definition wsq.hpp:388

tf::UnboundedWSQ< Node * >::bulk_push

void bulk_push(I &first, size_t N)

Definition wsq.hpp:419

tf::UnboundedWSQ::value_type

std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type

the return type of queue operations

Definition wsq.hpp:186

tf::UnboundedWSQ::capacity

size_t capacity() const noexcept

queries the capacity of the queue

Definition wsq.hpp:505

tf::UnboundedWSQ::empty_value

static constexpr auto empty_value()

returns the empty sentinel value for the queue element type

Definition wsq.hpp:352

tf::UnboundedWSQ::~UnboundedWSQ

~UnboundedWSQ()

destructs the queue

Definition wsq.hpp:371

tf

taskflow namespace

Definition small_vector.hpp:20

tf::wsq_empty_value

constexpr auto wsq_empty_value()

returns the empty sentinel for work-stealing steal operations

Definition wsq.hpp:67