Back to Taskflow

Taskflow: A General

docs/nonblocking__notifier_8hpp_source.html

4.1.029.7 KB
Original Source

| | Taskflow: A General-purpose Task-parallel Programming System |

Loading...

Searching...

No Matches

nonblocking_notifier.hpp

1#pragma once

2

3#include <iostream>

4#include <vector>

5#include <cstdlib>

6#include <cstdio>

7#include <atomic>

8#include <memory>

9#include <deque>

10#include <mutex>

11#include <condition_variable>

12#include <thread>

13#include <algorithm>

14#include <numeric>

15#include <cassert>

16#include "../utility/os.hpp"

17

22

23namespace tf {

24

84class NonblockingNotifier {

85

86friend class Executor;

87

88struct Waiter {

89alignas (TF_CACHELINE_SIZE) std::atomic<Waiter*> next;

90 uint64_t epoch;

91

92 enum : unsigned {

93 kNotSignaled = 0,

94 kWaiting,

95 kSignaled,

96 };

97 std::atomic<unsigned> state {0};

98

99//mutable std::mutex mu;

100//std::condition_variable cv;

101//unsigned state;

102 };

103

104public:

105

106// The state variable consists of the following three parts:

107// - low STACK_BITS is a stack of waiters committed wait.

108// - next PREWAITER_BITS is count of waiters in prewait state.

109// - next EPOCH_BITS is modification counter.

110// [32-bit epoch | 16-bit pre-waiter count | 16-bit pre-waiter stack]

111

113static const uint64_t STACK_BITS = 16;

114

116static const uint64_t STACK_MASK = (1ull << STACK_BITS) - 1;

117

119static const uint64_t PREWAITER_BITS = 16;

120

122static const uint64_t PREWAITER_SHIFT = 16;

123

125static const uint64_t PREWAITER_MASK = ((1ull << PREWAITER_BITS) - 1) << PREWAITER_SHIFT;

126

128static const uint64_t PREWAITER_INC = 1ull << PREWAITER_BITS;

129

131static const uint64_t EPOCH_BITS = 32;

132

134static const uint64_t EPOCH_SHIFT = 32;

135

137static const uint64_t EPOCH_MASK = ((1ull << EPOCH_BITS) - 1) << EPOCH_SHIFT;

138

140static const uint64_t EPOCH_INC = 1ull << EPOCH_SHIFT;

141

151explicit NonblockingNotifier(size_t N) : _state(STACK_MASK), _waiters(N) {

152if(_waiters.size() >= ((1 << PREWAITER_BITS) - 1)) {

153 TF_THROW("nonblocking waiter supports only up to ", (1<<PREWAITER_BITS)-1, " waiters");

154 }

155//assert(_waiters.size() < (1 << PREWAITER_BITS) - 1);

156// Initialize epoch to something close to overflow to test overflow.

157//_state = STACK_MASK | (EPOCH_MASK - EPOCH_INC * _waiters.size() * 2);

158 }

159

163~NonblockingNotifier() {

164// Ensure there are no waiters.

165 assert((_state.load() & (STACK_MASK | PREWAITER_MASK)) == STACK_MASK);

166 }

167

176size_t num_waiters() const {

177size_t n = 0;

178for(auto& w : _waiters) {

179 n += (w.state.load(std::memory_order_relaxed) == Waiter::kWaiting);

180//std::scoped_lock lock(w.mu);

181//n += (w.state == Waiter::kWaiting);

182 }

183return n;

184 }

185

192size_t capacity() const {

193return 1 << STACK_BITS;

194 }

195

212void prepare_wait(size_t wid) {

213 _waiters[wid].epoch = _state.fetch_add(PREWAITER_INC, std::memory_order_relaxed);

214 std::atomic_thread_fence(std::memory_order_seq_cst);

215 }

216

235void commit_wait(size_t wid) {

236

237auto w = &_waiters[wid];

238

239 w->state.store(Waiter::kNotSignaled, std::memory_order_relaxed);

240

241/*

242 Epoch and ticket semantics.

243

244 sepoch = _state & EPOCH_MASK

245 wepoch = w->epoch & EPOCH_MASK

246 ticket = w->epoch & PREWAITER_MASK

247

248 Each waiter entering the pre-waiting stage is assigned a monotonically

249 increasing ticket that determines the processing order (e.g.,

250 cancel_wait, commit_wait, notify). Ticket 0 is processed first, followed

251 by ticket 1, and so on.

252

253 The global epoch sepoch is incremented whenever a request is fulfilled.

254 Therefore, the difference sepoch - wepoch indicates which ticket is

255 currently ready to be handled:

256

257 - sepoch - wepoch == ticket : this waiter's turn

258 - sepoch - wepoch \> ticket : this waiter's ticket has expired

259 - sepoch - wepoch \< ticket : this waiter's ticket has not yet reached

260

261 Unsigned wraparound does not affect correctness. All epoch arithmetic is

262 performed using unsigned integers, which obey modulo-2^N arithmetic.

263 Converting the unsigned difference to a signed value yields the correct

264 result as long as the true difference lies within the signed range.

265

266 In general:

267 - Unsigned range: [0, 2^N − 1]

268 - Signed range : [−2^(N−1), 2^(N−1) − 1]

269

270 When overflow occurs, unsigned subtraction computes:

271

272 (sepoch − wepoch) mod 2^N

273

274 If the true value of sepoch − wepoch is within the signed range

275 [−2^(N−1), 2^(N−1) − 1], reinterpreting this result as a signed integer

276 produces the correct mathematical difference.

277

278 Example (3-bit arithmetic):

279

280 a b | true a−b | unsigned (bin / dec) | signed (dec)

281 ----------------------------------------------------

282 1 0 | 1 | 001 / 1 | +1

283 1 1 | 0 | 000 / 0 | 0

284 1 2 | -1 | 111 / 7 | -1

285 1 3 | -2 | 110 / 6 | -2

286 1 4 | -3 | 101 / 5 | -3

287 1 5 | -4 | 100 / 4 | -4

288 1 6 | -5 | 011 / 3 | +3 (wrap around)

289 1 7 | -6 | 010 / 2 | +2 (wrap around)

290

291 Signed interpretation is correct only when the true difference lies

292 within [−4, +3].

293

294 In this implementation, sepoch − wepoch is guaranteed not to exceed

295 2^16 in magnitude, which is far smaller than 2^(EPOCH_BITS − 1).

296 Consequently, the expression:

297

298 int64_t((state & EPOCH_MASK) - epoch)

299

300 remains correct even if sepoch and wepoch individually overflow.

301 */

302 uint64_t epoch =

303 (w->epoch & EPOCH_MASK) +

304 (((w->epoch & PREWAITER_MASK) >> PREWAITER_SHIFT) << EPOCH_SHIFT);

305 uint64_t state = _state.load(std::memory_order_seq_cst);

306for (;;) {

307if (int64_t((state & EPOCH_MASK) - epoch) < 0) {

308// The preceding waiter has not decided on its fate. Wait until it

309// calls either cancel_wait or commit_wait, or is notified.

310 std::this_thread::yield();

311 state = _state.load(std::memory_order_seq_cst);

312continue;

313 }

314// We've already been notified.

315if (int64_t((state & EPOCH_MASK) - epoch) > 0) {

316return;

317 }

318// Remove this thread from prewait counter and add it to the waiter stack.

319 assert((state & PREWAITER_MASK) != 0);

320 uint64_t newstate = state - PREWAITER_INC + EPOCH_INC;

321 newstate = (newstate & ~STACK_MASK) | wid;

322

323// stack is empty -> this waiter is at the top of the stack, pointing to nothing

324if ((state & STACK_MASK) == STACK_MASK) {

325 w->next.store(nullptr, std::memory_order_relaxed);

326 }

327// stack is non-empty -> this waiter is at the top of the stack, pointing to the origin top

328else {

329 w->next.store(&_waiters[state & STACK_MASK], std::memory_order_relaxed);

330 }

331if (_state.compare_exchange_weak(state, newstate, std::memory_order_release)) {

332break;

333 }

334 }

335 _park(w);

336 }

337

356void cancel_wait(size_t wid) {

357 uint64_t epoch =

358 (_waiters[wid].epoch & EPOCH_MASK) +

359 (((_waiters[wid].epoch & PREWAITER_MASK) >> PREWAITER_SHIFT) << EPOCH_SHIFT);

360 uint64_t state = _state.load(std::memory_order_relaxed);

361for (;;) {

362if (int64_t((state & EPOCH_MASK) - epoch) < 0) {

363// The preceding waiter has not decided on its fate. Wait until it

364// calls either cancel_wait or commit_wait, or is notified.

365 std::this_thread::yield();

366 state = _state.load(std::memory_order_relaxed);

367continue;

368 }

369// We've already been notified.

370if (int64_t((state & EPOCH_MASK) - epoch) > 0) {

371return;

372 }

373// Remove this thread from prewait counter.

374 assert((state & PREWAITER_MASK) != 0);

375if (_state.compare_exchange_weak(state, state - PREWAITER_INC + EPOCH_INC,

376 std::memory_order_relaxed)) {

377return;

378 }

379 }

380 }

381

389void notify_one() {

390 std::atomic_thread_fence(std::memory_order_seq_cst);

391 uint64_t state = _state.load(std::memory_order_acquire);

392for (;;) {

393// Easy case: no waiters.

394if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {

395return;

396 }

397 uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;

398 uint64_t newstate;

399if (num_prewaiters) {

400// There is a thread in pre-wait state, unblock it.

401 newstate = state + EPOCH_INC - PREWAITER_INC;

402 }

403else {

404// Pop a waiter from list and unpark it.

405 Waiter* w = &_waiters[state & STACK_MASK];

406 Waiter* wnext = w->next.load(std::memory_order_relaxed);

407 uint64_t next = STACK_MASK;

408//if (wnext != nullptr) next = wnext - &_waiters[0];

409if (wnext != nullptr) {

410 next = static_cast<uint64_t>(wnext - &_waiters[0]);

411 }

412// Note: we don't add EPOCH_INC here. ABA problem on the lock-free stack

413// can't happen because a waiter is re-pushed onto the stack only after

414// it was in the pre-wait state which inevitably leads to epoch increment.

415 newstate = (state & EPOCH_MASK) + next;

416 }

417if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {

418if(num_prewaiters) {

419return; // unblocked pre-wait thread

420 }

421// if there is no pre-waiters, the stack must have something

422 Waiter* w = &_waiters[state & STACK_MASK];

423 w->next.store(nullptr, std::memory_order_relaxed);

424 _unpark(w);

425return;

426 }

427 }

428 }

429

437void notify_all() {

438 std::atomic_thread_fence(std::memory_order_seq_cst);

439 uint64_t state = _state.load(std::memory_order_acquire);

440for (;;) {

441

442// Easy case: no waiters.

443if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {

444return;

445 }

446 uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;

447

448// Reset prewait counter and empty wait list.

449 uint64_t newstate = (state & EPOCH_MASK) + (EPOCH_INC * num_prewaiters) + STACK_MASK;

450

451if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {

452if ((state & STACK_MASK) == STACK_MASK) {

453return;

454 }

455 Waiter* w = &_waiters[state & STACK_MASK];

456 _unpark(w);

457return;

458 }

459 }

460 }

461

473void notify_n(size_t N) {

474

475// trivial case

476if(N == 0) {

477return;

478 }

479

480// if the target N is bigger than the waiter size, notify all waiters

481if(N >= _waiters.size()) {

482notify_all();

483return;

484 }

485

486 std::atomic_thread_fence(std::memory_order_seq_cst);

487 uint64_t state = _state.load(std::memory_order_acquire);

488do {

489// Easy case: no waiters.

490if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {

491return;

492 }

493 uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;

494 uint64_t newstate;

495size_t newN;

496

497// unblock waiters from pre-waiting list first.

498if(num_prewaiters) {

499size_t to_unblock = (N < num_prewaiters) ? N : num_prewaiters;

500 newstate = state + (EPOCH_INC * to_unblock) - (PREWAITER_INC * to_unblock);

501 newN = N - to_unblock;

502 }

503// pop one waiter from the stack

504else {

505 Waiter* w = &_waiters[state & STACK_MASK];

506 Waiter* wnext = w->next.load(std::memory_order_relaxed);

507 uint64_t next = STACK_MASK;

508//if (wnext != nullptr) next = wnext - &_waiters[0];

509if (wnext != nullptr) {

510 next = static_cast<uint64_t>(wnext - &_waiters[0]);

511 }

512// Note: we don't add EPOCH_INC here. ABA problem on the lock-free stack

513// can't happen because a waiter is re-pushed onto the stack only after

514// it was in the pre-wait state which inevitably leads to epoch increment.

515 newstate = (state & EPOCH_MASK) + next;

516 newN = N - 1;

517 }

518

519if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {

520 N = newN;

521if(num_prewaiters == 0) {

522 Waiter* w = &_waiters[state & STACK_MASK];

523 w->next.store(nullptr, std::memory_order_relaxed);

524 _unpark(w);

525 }

526 }

527 } while(N > 0);

528

529//if(n >= _waiters.size()) {

530// notify_all();

531//}

532//else {

533// for(size_t k=0; k<n; ++k) {

534// notify_one();

535// }

536//}

537 }

538

546size_t size() const {

547return _waiters.size();

548 }

549

550 private:

551

552 std::atomic<uint64_t> _state;

553 std::vector<Waiter> _waiters;

554

555// only this waiter can park itself, with the following two possible paths:

556// 1. kNotSignaled (this) -> in-stack -> kWaiting (this) -> wait

557// 2. kNotSignaled (this) -> in-stack -> kSignaled -> unwait

558void _park(Waiter* w) {

559unsigned target = Waiter::kNotSignaled;

560if(w->state.compare_exchange_strong(target, Waiter::kWaiting, std::memory_order_relaxed

561 , std::memory_order_relaxed)) {

562 w->state.wait(Waiter::kWaiting, std::memory_order_relaxed);

563 }

564//std::unique_lock<std::mutex> lock(w->mu);

565//while (w->state != Waiter::kSignaled) {

566// w->state = Waiter::kWaiting;

567// w->cv.wait(lock);

568//}

569 }

570

571// others can unpark

572void _unpark(Waiter* waiters) {

573 Waiter* next = nullptr;

574for (Waiter* w = waiters; w; w = next) {

575 next = w->next.load(std::memory_order_relaxed);

576// We only notify if the other is waiting - this is why we use tri-state

577// variable instead of binary-state variable (i.e., atomic_flag)

578// Performance is about 0.1% faster

579if(w->state.exchange(Waiter::kSignaled, std::memory_order_relaxed) == Waiter::kWaiting) {

580 w->state.notify_one();

581 }

582//unsigned state;

583//{

584// std::unique_lock<std::mutex> lock(w->mu);

585// state = w->state;

586// w->state = Waiter::kSignaled;

587//}

589//if (state == Waiter::kWaiting) w->cv.notify_one();

590 }

591 }

592

593// notify wakes one or all waiting threads.

594// Must be called after changing the associated wait predicate.

595//void _notify(bool all) {

596// std::atomic_thread_fence(std::memory_order_seq_cst);

597// uint64_t state = _state.load(std::memory_order_acquire);

598// for (;;) {

599// // Easy case: no waiters.

600// if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {

601// return;

602// }

603// uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;

604// uint64_t newstate;

605// if (all) {

606// // Reset prewait counter and empty wait list.

607// newstate = (state & EPOCH_MASK) + (EPOCH_INC * num_prewaiters) + STACK_MASK;

608// } else if (num_prewaiters) {

609// // There is a thread in pre-wait state, unblock it.

610// newstate = state + EPOCH_INC - PREWAITER_INC;

611// } else {

612// // Pop a waiter from list and unpark it.

613// Waiter* w = &_waiters[state & STACK_MASK];

614// Waiter* wnext = w->next.load(std::memory_order_relaxed);

615// uint64_t next = STACK_MASK;

616// //if (wnext != nullptr) next = wnext - &_waiters[0];

617// if (wnext != nullptr) {

618// next = static_cast<uint64_t>(wnext - &_waiters[0]);

619// }

620// // Note: we don't add EPOCH_INC here. ABA problem on the lock-free stack

621// // can't happen because a waiter is re-pushed onto the stack only after

622// // it was in the pre-wait state which inevitably leads to epoch increment.

623// newstate = (state & EPOCH_MASK) + next;

624// }

625// if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {

626// if(!all && num_prewaiters) return; // unblocked pre-wait thread

627// if ((state & STACK_MASK) == STACK_MASK) return;

628// Waiter* w = &_waiters[state & STACK_MASK];

629// if(!all) {

630// w->next.store(nullptr, std::memory_order_relaxed);

631// }

632// _unpark(w);

633// return;

634// }

635// }

636//}

637};

638

639

640} // namespace tf ------------------------------------------------------------

641

tf::NonblockingNotifier::num_waiters

size_t num_waiters() const

returns the number of committed waiters

Definition nonblocking_notifier.hpp:176

tf::NonblockingNotifier::PREWAITER_SHIFT

static const uint64_t PREWAITER_SHIFT

Bit shift of the pre-waiter ticket field.

Definition nonblocking_notifier.hpp:122

tf::NonblockingNotifier::NonblockingNotifier

NonblockingNotifier(size_t N)

constructs a notifier with N waiters

Definition nonblocking_notifier.hpp:151

tf::NonblockingNotifier::EPOCH_SHIFT

static const uint64_t EPOCH_SHIFT

Bit shift of the epoch field.

Definition nonblocking_notifier.hpp:134

tf::NonblockingNotifier::cancel_wait

void cancel_wait(size_t wid)

cancels a previously prepared wait operation

Definition nonblocking_notifier.hpp:356

tf::NonblockingNotifier::prepare_wait

void prepare_wait(size_t wid)

prepares the calling thread to enter the waiting set

Definition nonblocking_notifier.hpp:212

tf::NonblockingNotifier::PREWAITER_INC

static const uint64_t PREWAITER_INC

Increment value for advancing the pre-waiter ticket.

Definition nonblocking_notifier.hpp:128

tf::NonblockingNotifier::EPOCH_BITS

static const uint64_t EPOCH_BITS

Number of bits used to encode the epoch counter.

Definition nonblocking_notifier.hpp:131

tf::NonblockingNotifier::capacity

size_t capacity() const

returns the maximum number of waiters supported by this notifier

Definition nonblocking_notifier.hpp:192

tf::NonblockingNotifier::EPOCH_MASK

static const uint64_t EPOCH_MASK

Bit mask for extracting the epoch field.

Definition nonblocking_notifier.hpp:137

tf::NonblockingNotifier::commit_wait

void commit_wait(size_t wid)

commits a previously prepared wait operation

Definition nonblocking_notifier.hpp:235

tf::NonblockingNotifier::EPOCH_INC

static const uint64_t EPOCH_INC

Increment value for advancing the epoch counter.

Definition nonblocking_notifier.hpp:140

tf::NonblockingNotifier::notify_one

void notify_one()

notifies one waiter from the waiting set

Definition nonblocking_notifier.hpp:389

tf::NonblockingNotifier::PREWAITER_MASK

static const uint64_t PREWAITER_MASK

Bit mask for extracting the pre-waiter ticket field.

Definition nonblocking_notifier.hpp:125

tf::NonblockingNotifier::PREWAITER_BITS

static const uint64_t PREWAITER_BITS

Number of bits used to encode the pre-waiter ticket.

Definition nonblocking_notifier.hpp:119

tf::NonblockingNotifier::STACK_MASK

static const uint64_t STACK_MASK

Bit mask for extracting the waiter stack index.

Definition nonblocking_notifier.hpp:116

tf::NonblockingNotifier::notify_n

void notify_n(size_t N)

notifies up to N waiters from the waiting set

Definition nonblocking_notifier.hpp:473

tf::NonblockingNotifier::notify_all

void notify_all()

notifies all waiter from the waiting set

Definition nonblocking_notifier.hpp:437

tf::NonblockingNotifier::~NonblockingNotifier

~NonblockingNotifier()

destructs the notifier

Definition nonblocking_notifier.hpp:163

tf::NonblockingNotifier::STACK_BITS

static const uint64_t STACK_BITS

Number of bits used to encode the waiter stack index.

Definition nonblocking_notifier.hpp:113

tf::NonblockingNotifier::size

size_t size() const

returns the number of waiters supported by this notifier

Definition nonblocking_notifier.hpp:546

tf

taskflow namespace

Definition small_vector.hpp:20