docs/nonblocking__notifier_8hpp_source.html
| | Taskflow: A General-purpose Task-parallel Programming System |
Loading...
Searching...
No Matches
nonblocking_notifier.hpp
1#pragma once
2
3#include <iostream>
4#include <vector>
5#include <cstdlib>
6#include <cstdio>
7#include <atomic>
8#include <memory>
9#include <deque>
10#include <mutex>
11#include <condition_variable>
12#include <thread>
13#include <algorithm>
14#include <numeric>
15#include <cassert>
16#include "../utility/os.hpp"
17
22
23namespace tf {
24
84class NonblockingNotifier {
85
86friend class Executor;
87
// Per-slot wait record; the notifier owns one Waiter for each entry of _waiters.
88struct Waiter {
// Link to the next waiter on the committed-waiter stack (nullptr at the bottom).
// Aligned to TF_CACHELINE_SIZE - presumably to keep waiters on separate cache
// lines; TODO confirm against the macro's definition in os.hpp.
89alignas (TF_CACHELINE_SIZE) std::atomic<Waiter*> next;
// Snapshot of the notifier state returned by the fetch_add in prepare_wait.
90 uint64_t epoch;
91
// Parking state of this waiter:
//  - kNotSignaled: stored by commit_wait before the waiter is pushed on the stack
//  - kWaiting    : set by _park right before blocking
//  - kSignaled   : set by _unpark when the waiter is released
92 enum : unsigned {
93 kNotSignaled = 0,
94 kWaiting,
95 kSignaled,
96 };
// Starts at 0, i.e. kNotSignaled.
97 std::atomic<unsigned> state {0};
98
99//mutable std::mutex mu;
100//std::condition_variable cv;
101//unsigned state;
102 };
103
104public:
105
106// The 64-bit state variable is split into three fields:
107// - the low STACK_BITS hold the index of the top of the stack of committed waiters (STACK_MASK means the stack is empty).
108// - the next PREWAITER_BITS hold the count of waiters in the pre-wait state.
109// - the high EPOCH_BITS hold the modification counter (epoch).
110// [32-bit epoch | 16-bit pre-waiter count | 16-bit waiter stack index]
111
// Number of bits used to encode the waiter stack index.
113static const uint64_t STACK_BITS = 16;
114
// Bit mask for extracting the waiter stack index; also the "empty stack" sentinel.
116static const uint64_t STACK_MASK = (1ull << STACK_BITS) - 1;
117
// Number of bits used to encode the pre-waiter ticket.
119static const uint64_t PREWAITER_BITS = 16;
120
// Bit shift of the pre-waiter ticket field.
122static const uint64_t PREWAITER_SHIFT = 16;
123
// Bit mask for extracting the pre-waiter ticket field.
125static const uint64_t PREWAITER_MASK = ((1ull << PREWAITER_BITS) - 1) << PREWAITER_SHIFT;
126
// Increment value for advancing the pre-waiter ticket.
// NOTE(review): shifts by PREWAITER_BITS rather than PREWAITER_SHIFT (EPOCH_INC
// uses EPOCH_SHIFT); both are 16 here, so the value is the same.
128static const uint64_t PREWAITER_INC = 1ull << PREWAITER_BITS;
129
// Number of bits used to encode the epoch counter.
131static const uint64_t EPOCH_BITS = 32;
132
// Bit shift of the epoch field.
134static const uint64_t EPOCH_SHIFT = 32;
135
// Bit mask for extracting the epoch field.
137static const uint64_t EPOCH_MASK = ((1ull << EPOCH_BITS) - 1) << EPOCH_SHIFT;
138
// Increment value for advancing the epoch counter.
140static const uint64_t EPOCH_INC = 1ull << EPOCH_SHIFT;
141
// Constructs a notifier with N waiters.
// The state starts as STACK_MASK: empty waiter stack, zero pre-waiters, epoch 0.
// Invokes TF_THROW when N is too large for the bit fields of the state word.
151explicit NonblockingNotifier(size_t N) : _state(STACK_MASK), _waiters(N) {
// NOTE(review): the check rejects N >= 65535, so the largest accepted N is 65534,
// while the message below reports 65535 as the limit - confirm which is intended.
152if(_waiters.size() >= ((1 << PREWAITER_BITS) - 1)) {
153 TF_THROW("nonblocking waiter supports only up to ", (1<<PREWAITER_BITS)-1, " waiters");
154 }
155//assert(_waiters.size() < (1 << PREWAITER_BITS) - 1);
156// Initialize epoch to something close to overflow to test overflow.
157//_state = STACK_MASK | (EPOCH_MASK - EPOCH_INC * _waiters.size() * 2);
158 }
159
164// Ensure there are no waiters.
165 assert((_state.load() & (STACK_MASK | PREWAITER_MASK)) == STACK_MASK);
166 }
167
// Returns the number of waiters whose state currently reads kWaiting.
// Each state is read with a relaxed load and the waiters are visited one by one,
// so the result is only a snapshot and may be stale under concurrent changes.
176size_t num_waiters() const {
177size_t n = 0;
178for(auto& w : _waiters) {
// The comparison yields 0 or 1, which is added to the running count.
179 n += (w.state.load(std::memory_order_relaxed) == Waiter::kWaiting);
180//std::scoped_lock lock(w.mu);
181//n += (w.state == Waiter::kWaiting);
182 }
183return n;
184 }
185
193return 1 << STACK_BITS;
194 }
195
// Prepares waiter `wid` to enter the waiting set.
// Must be followed by either commit_wait(wid) or cancel_wait(wid), which consume
// the ticket recorded here.
212void prepare_wait(size_t wid) {
// Bump the pre-waiter count and remember the state as it was BEFORE the bump:
// its epoch field and pre-waiter count together form this waiter's ticket.
213 _waiters[wid].epoch = _state.fetch_add(PREWAITER_INC, std::memory_order_relaxed);
// Sequentially consistent fence; the notify functions issue a matching fence
// before they load the state.
214 std::atomic_thread_fence(std::memory_order_seq_cst);
215 }
216
// Commits the wait previously announced by prepare_wait(wid).
// Yields until every earlier ticket has been resolved. Returns immediately if a
// notification has already consumed this waiter's ticket. Otherwise it moves the
// waiter from the pre-wait count onto the waiter stack and parks it.
235void commit_wait(size_t wid) {
236
237auto w = &_waiters[wid];
238
// Reset the parking state before the waiter becomes visible on the stack.
239 w->state.store(Waiter::kNotSignaled, std::memory_order_relaxed);
240
241/*
242 Epoch and ticket semantics.
243
244 sepoch = _state & EPOCH_MASK
245 wepoch = w->epoch & EPOCH_MASK
246 ticket = w->epoch & PREWAITER_MASK
247
248 Each waiter entering the pre-waiting stage is assigned a monotonically
249 increasing ticket that determines the processing order (e.g.,
250 cancel_wait, commit_wait, notify). Ticket 0 is processed first, followed
251 by ticket 1, and so on.
252
253 The global epoch sepoch is incremented whenever a request is fulfilled.
254 Therefore, the difference sepoch - wepoch indicates which ticket is
255 currently ready to be handled:
256
257 - sepoch - wepoch == ticket : this waiter's turn
258 - sepoch - wepoch > ticket : this waiter's ticket has expired
259 - sepoch - wepoch < ticket : this waiter's turn has not yet been reached
260
261 Unsigned wraparound does not affect correctness. All epoch arithmetic is
262 performed using unsigned integers, which obey modulo-2^N arithmetic.
263 Converting the unsigned difference to a signed value yields the correct
264 result as long as the true difference lies within the signed range.
265
266 In general:
267 - Unsigned range: [0, 2^N − 1]
268 - Signed range : [−2^(N−1), 2^(N−1) − 1]
269
270 When overflow occurs, unsigned subtraction computes:
271
272 (sepoch − wepoch) mod 2^N
273
274 If the true value of sepoch − wepoch is within the signed range
275 [−2^(N−1), 2^(N−1) − 1], reinterpreting this result as a signed integer
276 produces the correct mathematical difference.
277
278 Example (3-bit arithmetic):
279
280 a b | true a−b | unsigned (bin / dec) | signed (dec)
281 ----------------------------------------------------
282 1 0 | 1 | 001 / 1 | +1
283 1 1 | 0 | 000 / 0 | 0
284 1 2 | -1 | 111 / 7 | -1
285 1 3 | -2 | 110 / 6 | -2
286 1 4 | -3 | 101 / 5 | -3
287 1 5 | -4 | 100 / 4 | -4
288 1 6 | -5 | 011 / 3 | +3 (wrap around)
289 1 7 | -6 | 010 / 2 | +2 (wrap around)
290
291 Signed interpretation is correct only when the true difference lies
292 within [−4, +3].
293
294 In this implementation, sepoch − wepoch is guaranteed not to exceed
295 2^16 in magnitude, which is far smaller than 2^(EPOCH_BITS − 1).
296 Consequently, the expression:
297
298 int64_t((state & EPOCH_MASK) - epoch)
299
300 remains correct even if sepoch and wepoch individually overflow.
301 */
// Target epoch of this waiter: the epoch recorded by prepare_wait plus the
// ticket (the pre-waiter count recorded at that time), moved into the epoch field.
302 uint64_t epoch =
303 (w->epoch & EPOCH_MASK) +
304 (((w->epoch & PREWAITER_MASK) >> PREWAITER_SHIFT) << EPOCH_SHIFT);
305 uint64_t state = _state.load(std::memory_order_seq_cst);
306for (;;) {
307if (int64_t((state & EPOCH_MASK) - epoch) < 0) {
308// The preceding waiter has not decided on its fate. Wait until it
309// calls either cancel_wait or commit_wait, or is notified.
310 std::this_thread::yield();
311 state = _state.load(std::memory_order_seq_cst);
312continue;
313 }
314// We've already been notified.
315if (int64_t((state & EPOCH_MASK) - epoch) > 0) {
316return;
317 }
318// Remove this thread from prewait counter and add it to the waiter stack.
319 assert((state & PREWAITER_MASK) != 0);
320 uint64_t newstate = state - PREWAITER_INC + EPOCH_INC;
// Replace the stack field with this waiter's index: it becomes the new top.
321 newstate = (newstate & ~STACK_MASK) | wid;
322
323// stack is empty -> this waiter is at the top of the stack, pointing to nothing
324if ((state & STACK_MASK) == STACK_MASK) {
325 w->next.store(nullptr, std::memory_order_relaxed);
326 }
327// stack is non-empty -> this waiter is at the top of the stack, pointing to the origin top
328else {
329 w->next.store(&_waiters[state & STACK_MASK], std::memory_order_relaxed);
330 }
// On failure compare_exchange_weak reloads `state`, and the loop re-evaluates
// the epoch checks against the fresh value.
331if (_state.compare_exchange_weak(state, newstate, std::memory_order_release)) {
332break;
333 }
334 }
// The waiter is now on the stack; block until _unpark signals it.
335 _park(w);
336 }
337
// Cancels the wait previously announced by prepare_wait(wid).
// Yields until every earlier ticket has been resolved. Returns immediately if a
// notification has already consumed this waiter's ticket. Otherwise it removes
// the waiter from the pre-wait count and advances the epoch by one.
356void cancel_wait(size_t wid) {
// Target epoch of this waiter: the epoch recorded by prepare_wait plus the
// ticket (the pre-waiter count recorded at that time), moved into the epoch field.
357 uint64_t epoch =
358 (_waiters[wid].epoch & EPOCH_MASK) +
359 (((_waiters[wid].epoch & PREWAITER_MASK) >> PREWAITER_SHIFT) << EPOCH_SHIFT);
360 uint64_t state = _state.load(std::memory_order_relaxed);
361for (;;) {
362if (int64_t((state & EPOCH_MASK) - epoch) < 0) {
363// The preceding waiter has not decided on its fate. Wait until it
364// calls either cancel_wait or commit_wait, or is notified.
365 std::this_thread::yield();
366 state = _state.load(std::memory_order_relaxed);
367continue;
368 }
369// We've already been notified.
370if (int64_t((state & EPOCH_MASK) - epoch) > 0) {
371return;
372 }
373// Remove this thread from prewait counter.
374 assert((state & PREWAITER_MASK) != 0);
// On failure compare_exchange_weak reloads `state`, and the loop re-evaluates
// the epoch checks against the fresh value.
375if (_state.compare_exchange_weak(state, state - PREWAITER_INC + EPOCH_INC,
376 std::memory_order_relaxed)) {
377return;
378 }
379 }
380 }
381
// Notifies one waiter from the waiting set.
// A thread in the pre-wait state is preferred: it is released by advancing the
// epoch past its ticket. If there is none, the top of the committed-waiter stack
// is popped and unparked. Does nothing when there are no waiters at all.
389void notify_one() {
// Sequentially consistent fence matching the one issued by prepare_wait.
390 std::atomic_thread_fence(std::memory_order_seq_cst);
391 uint64_t state = _state.load(std::memory_order_acquire);
392for (;;) {
393// Easy case: no waiters.
394if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {
395return;
396 }
397 uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;
398 uint64_t newstate;
399if (num_prewaiters) {
400// There is a thread in pre-wait state, unblock it.
401 newstate = state + EPOCH_INC - PREWAITER_INC;
402 }
403else {
404// Pop a waiter from list and unpark it.
405 Waiter* w = &_waiters[state & STACK_MASK];
406 Waiter* wnext = w->next.load(std::memory_order_relaxed);
// New top of the stack: index of the next waiter, or STACK_MASK if none remains.
407 uint64_t next = STACK_MASK;
408//if (wnext != nullptr) next = wnext - &_waiters[0];
409if (wnext != nullptr) {
410 next = static_cast<uint64_t>(wnext - &_waiters[0]);
411 }
412// Note: we don't add EPOCH_INC here. ABA problem on the lock-free stack
413// can't happen because a waiter is re-pushed onto the stack only after
414// it was in the pre-wait state which inevitably leads to epoch increment.
415 newstate = (state & EPOCH_MASK) + next;
416 }
// On failure compare_exchange_weak reloads `state` and the loop retries.
417if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
418if(num_prewaiters) {
419return; // unblocked pre-wait thread
420 }
421// if there is no pre-waiters, the stack must have something
422 Waiter* w = &_waiters[state & STACK_MASK];
// Detach the popped waiter so that _unpark wakes only this one.
423 w->next.store(nullptr, std::memory_order_relaxed);
424 _unpark(w);
425return;
426 }
427 }
428 }
429
// Notifies all waiters in the waiting set.
// In a single state update it releases every pre-waiter (by advancing the epoch
// by their count), clears the pre-wait count and empties the waiter stack. It
// then unparks the whole list of previously committed waiters.
437void notify_all() {
// Sequentially consistent fence matching the one issued by prepare_wait.
438 std::atomic_thread_fence(std::memory_order_seq_cst);
439 uint64_t state = _state.load(std::memory_order_acquire);
440for (;;) {
441
442// Easy case: no waiters.
443if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {
444return;
445 }
446 uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;
447
448// Reset prewait counter and empty wait list.
449 uint64_t newstate = (state & EPOCH_MASK) + (EPOCH_INC * num_prewaiters) + STACK_MASK;
450
// On failure compare_exchange_weak reloads `state` and the loop retries.
451if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
// Only pre-waiters were present: nothing is parked, so nothing to unpark.
452if ((state & STACK_MASK) == STACK_MASK) {
453return;
454 }
// `state` still holds the old value; _unpark follows the `next` links from the
// old top of the stack and signals every waiter on the list.
455 Waiter* w = &_waiters[state & STACK_MASK];
456 _unpark(w);
457return;
458 }
459 }
460 }
461
474
475// trivial case
476if(N == 0) {
477return;
478 }
479
480// if the target N is bigger than the waiter size, notify all waiters
481if(N >= _waiters.size()) {
482notify_all();
483return;
484 }
485
486 std::atomic_thread_fence(std::memory_order_seq_cst);
487 uint64_t state = _state.load(std::memory_order_acquire);
488do {
489// Easy case: no waiters.
490if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {
491return;
492 }
493 uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;
494 uint64_t newstate;
495size_t newN;
496
497// unblock waiters from pre-waiting list first.
498if(num_prewaiters) {
499size_t to_unblock = (N < num_prewaiters) ? N : num_prewaiters;
500 newstate = state + (EPOCH_INC * to_unblock) - (PREWAITER_INC * to_unblock);
501 newN = N - to_unblock;
502 }
503// pop one waiter from the stack
504else {
505 Waiter* w = &_waiters[state & STACK_MASK];
506 Waiter* wnext = w->next.load(std::memory_order_relaxed);
507 uint64_t next = STACK_MASK;
508//if (wnext != nullptr) next = wnext - &_waiters[0];
509if (wnext != nullptr) {
510 next = static_cast<uint64_t>(wnext - &_waiters[0]);
511 }
512// Note: we don't add EPOCH_INC here. ABA problem on the lock-free stack
513// can't happen because a waiter is re-pushed onto the stack only after
514// it was in the pre-wait state which inevitably leads to epoch increment.
515 newstate = (state & EPOCH_MASK) + next;
516 newN = N - 1;
517 }
518
519if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
520 N = newN;
521if(num_prewaiters == 0) {
522 Waiter* w = &_waiters[state & STACK_MASK];
523 w->next.store(nullptr, std::memory_order_relaxed);
524 _unpark(w);
525 }
526 }
527 } while(N > 0);
528
529//if(n >= _waiters.size()) {
530// notify_all();
531//}
532//else {
533// for(size_t k=0; k<n; ++k) {
534// notify_one();
535// }
536//}
537 }
538
547return _waiters.size();
548 }
549
550 private:
551
// Packed notifier state: [epoch | pre-waiter count | index of the waiter stack top].
552 std::atomic<uint64_t> _state;
// One Waiter record per waiter id; sized once in the constructor and indexed by wid.
553 std::vector<Waiter> _waiters;
554
555// only this waiter can park itself, with the following two possible paths:
556// 1. kNotSignaled (this) -> in-stack -> kWaiting (this) -> wait
557// 2. kNotSignaled (this) -> in-stack -> kSignaled -> unwait
558void _park(Waiter* w) {
// Blocks only if the waiter has not been signaled yet. The CAS moves
// kNotSignaled -> kWaiting; if it fails, the state is no longer kNotSignaled
// (_unpark stores kSignaled), so the function returns without blocking.
559unsigned target = Waiter::kNotSignaled;
560if(w->state.compare_exchange_strong(target, Waiter::kWaiting, std::memory_order_relaxed
561 , std::memory_order_relaxed)) {
// Atomic wait: returns once the state is observed to differ from kWaiting.
562 w->state.wait(Waiter::kWaiting, std::memory_order_relaxed);
563 }
564//std::unique_lock<std::mutex> lock(w->mu);
565//while (w->state != Waiter::kSignaled) {
566// w->state = Waiter::kWaiting;
567// w->cv.wait(lock);
568//}
569 }
570
571// others can unpark
572void _unpark(Waiter* waiters) {
// Signals every waiter on the list that starts at `waiters`, following the
// `next` links until a null link is reached.
573 Waiter* next = nullptr;
574for (Waiter* w = waiters; w; w = next) {
// The link is read before the waiter is signaled - presumably because a signaled
// waiter may be reused and relinked right away; TODO confirm.
575 next = w->next.load(std::memory_order_relaxed);
576// We only notify if the other is waiting - this is why we use tri-state
577// variable instead of binary-state variable (i.e., atomic_flag)
578// Performance is about 0.1% faster
579if(w->state.exchange(Waiter::kSignaled, std::memory_order_relaxed) == Waiter::kWaiting) {
580 w->state.notify_one();
581 }
582//unsigned state;
583//{
584// std::unique_lock<std::mutex> lock(w->mu);
585// state = w->state;
586// w->state = Waiter::kSignaled;
587//}
589//if (state == Waiter::kWaiting) w->cv.notify_one();
590 }
591 }
592
593// notify wakes one or all waiting threads.
594// Must be called after changing the associated wait predicate.
595//void _notify(bool all) {
596// std::atomic_thread_fence(std::memory_order_seq_cst);
597// uint64_t state = _state.load(std::memory_order_acquire);
598// for (;;) {
599// // Easy case: no waiters.
600// if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {
601// return;
602// }
603// uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;
604// uint64_t newstate;
605// if (all) {
606// // Reset prewait counter and empty wait list.
607// newstate = (state & EPOCH_MASK) + (EPOCH_INC * num_prewaiters) + STACK_MASK;
608// } else if (num_prewaiters) {
609// // There is a thread in pre-wait state, unblock it.
610// newstate = state + EPOCH_INC - PREWAITER_INC;
611// } else {
612// // Pop a waiter from list and unpark it.
613// Waiter* w = &_waiters[state & STACK_MASK];
614// Waiter* wnext = w->next.load(std::memory_order_relaxed);
615// uint64_t next = STACK_MASK;
616// //if (wnext != nullptr) next = wnext - &_waiters[0];
617// if (wnext != nullptr) {
618// next = static_cast<uint64_t>(wnext - &_waiters[0]);
619// }
620// // Note: we don't add EPOCH_INC here. ABA problem on the lock-free stack
621// // can't happen because a waiter is re-pushed onto the stack only after
622// // it was in the pre-wait state which inevitably leads to epoch increment.
623// newstate = (state & EPOCH_MASK) + next;
624// }
625// if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
626// if(!all && num_prewaiters) return; // unblocked pre-wait thread
627// if ((state & STACK_MASK) == STACK_MASK) return;
628// Waiter* w = &_waiters[state & STACK_MASK];
629// if(!all) {
630// w->next.store(nullptr, std::memory_order_relaxed);
631// }
632// _unpark(w);
633// return;
634// }
635// }
636//}
637};
638
639
640} // namespace tf ------------------------------------------------------------
641
tf::NonblockingNotifier::num_waiters
size_t num_waiters() const
returns the number of committed waiters
Definition nonblocking_notifier.hpp:176
tf::NonblockingNotifier::PREWAITER_SHIFT
static const uint64_t PREWAITER_SHIFT
Bit shift of the pre-waiter ticket field.
Definition nonblocking_notifier.hpp:122
tf::NonblockingNotifier::NonblockingNotifier
NonblockingNotifier(size_t N)
constructs a notifier with N waiters
Definition nonblocking_notifier.hpp:151
tf::NonblockingNotifier::EPOCH_SHIFT
static const uint64_t EPOCH_SHIFT
Bit shift of the epoch field.
Definition nonblocking_notifier.hpp:134
tf::NonblockingNotifier::cancel_wait
void cancel_wait(size_t wid)
cancels a previously prepared wait operation
Definition nonblocking_notifier.hpp:356
tf::NonblockingNotifier::prepare_wait
void prepare_wait(size_t wid)
prepares the calling thread to enter the waiting set
Definition nonblocking_notifier.hpp:212
tf::NonblockingNotifier::PREWAITER_INC
static const uint64_t PREWAITER_INC
Increment value for advancing the pre-waiter ticket.
Definition nonblocking_notifier.hpp:128
tf::NonblockingNotifier::EPOCH_BITS
static const uint64_t EPOCH_BITS
Number of bits used to encode the epoch counter.
Definition nonblocking_notifier.hpp:131
tf::NonblockingNotifier::capacity
size_t capacity() const
returns the maximum number of waiters supported by this notifier
Definition nonblocking_notifier.hpp:192
tf::NonblockingNotifier::EPOCH_MASK
static const uint64_t EPOCH_MASK
Bit mask for extracting the epoch field.
Definition nonblocking_notifier.hpp:137
tf::NonblockingNotifier::commit_wait
void commit_wait(size_t wid)
commits a previously prepared wait operation
Definition nonblocking_notifier.hpp:235
tf::NonblockingNotifier::EPOCH_INC
static const uint64_t EPOCH_INC
Increment value for advancing the epoch counter.
Definition nonblocking_notifier.hpp:140
tf::NonblockingNotifier::notify_one
void notify_one()
notifies one waiter from the waiting set
Definition nonblocking_notifier.hpp:389
tf::NonblockingNotifier::PREWAITER_MASK
static const uint64_t PREWAITER_MASK
Bit mask for extracting the pre-waiter ticket field.
Definition nonblocking_notifier.hpp:125
tf::NonblockingNotifier::PREWAITER_BITS
static const uint64_t PREWAITER_BITS
Number of bits used to encode the pre-waiter ticket.
Definition nonblocking_notifier.hpp:119
tf::NonblockingNotifier::STACK_MASK
static const uint64_t STACK_MASK
Bit mask for extracting the waiter stack index.
Definition nonblocking_notifier.hpp:116
tf::NonblockingNotifier::notify_n
void notify_n(size_t N)
notifies up to N waiters from the waiting set
Definition nonblocking_notifier.hpp:473
tf::NonblockingNotifier::notify_all
void notify_all()
notifies all waiters in the waiting set
Definition nonblocking_notifier.hpp:437
tf::NonblockingNotifier::~NonblockingNotifier
~NonblockingNotifier()
destructs the notifier
Definition nonblocking_notifier.hpp:163
tf::NonblockingNotifier::STACK_BITS
static const uint64_t STACK_BITS
Number of bits used to encode the waiter stack index.
Definition nonblocking_notifier.hpp:113
size_t size() const
returns the number of waiters supported by this notifier
Definition nonblocking_notifier.hpp:546
taskflow namespace
Definition small_vector.hpp:20