docs/object__pool_8hpp_source.html
Taskflow: A General-purpose Task-parallel Programming System
Loading...
Searching...
No Matches
object_pool.hpp
1#pragma once
2
3#include <array>
4#include <cstddef>
5#include <cstdint>
6#include <memory_resource>
7#include <memory>
8#include <atomic>
9#include <thread>
10#include <utility>
11#include "os.hpp"
12
17
18namespace tf {
19
20// ----------------------------------------------------------------------------
21// TaggedHead128
22// ----------------------------------------------------------------------------
23
// Free-list head that keeps a block address and an ABA version counter in
// two separate fields (`ptr` and `tag`).
//
// NOTE(review): this rendered listing omits source lines 55 and 65. The
// member index at the end of this page lists them as `uintptr_t tag_type`
// (object_pool.hpp:55) and `tag_type tag` (object_pool.hpp:65); both names
// are used by the two-argument constructor and by get_tag() below.
45struct TaggedHead128 {
46
// Integer representation of a block address.
50using pointer_type = uintptr_t;
51
56
// Block address; zero represents the null (empty) head.
60pointer_type ptr {0};
61
66
// Constructs a null head (ptr is zero-initialized by its default member
// initializer).
70TaggedHead128() = default;
71
// Constructs a head from an explicit block address `p` and version
// counter `t`; both values are stored unmodified.
82TaggedHead128(pointer_type p, tag_type t) noexcept : ptr{p}, tag{t} {}
83
// Returns the stored block address.
92pointer_type get_ptr() const noexcept { return ptr; }
93
// Returns the stored version counter.
102tag_type get_tag() const noexcept { return tag; }
103};
104
105// ----------------------------------------------------------------------------
106// TaggedHead64
107// ----------------------------------------------------------------------------
108
// Free-list head that packs a block address and a version tag into a single
// word: the low PtrBits bits hold the address and the bits above them hold
// the tag.
//
// Template parameter:
//   PtrBits - number of low bits reserved for the address
//             (defaults to TF_POINTER_BITS, defined outside this file).
//
// NOTE(review): this rendered listing omits source lines 165 and 186. The
// member index at the end of this page lists them as `uint16_t tag_type`
// (object_pool.hpp:165) and `uintptr_t bits` (object_pool.hpp:186); both
// names are used by the members below.
151template <int PtrBits = TF_POINTER_BITS>
152struct TaggedHead64 {
// Reject configurations that would leave fewer than 16 bits for the tag.
153static_assert(64 - PtrBits >= 16,
154"TaggedHead64 requires at least 16 tag bits for ABA safety "
155"(PtrBits must be <= 48); use TaggedHead128 instead");
156
// Integer representation of a block address.
160using pointer_type = uintptr_t;
161
166
// Number of low bits that hold the address.
170static constexpr int PTR_BITS = PtrBits;
171
// Number of remaining high bits available for the version tag.
175static constexpr int TAG_BITS = 64 - PtrBits;
176
// Mask with the low PTR_BITS bits set; isolates the address field.
180static constexpr pointer_type PTR_MASK = (pointer_type{1} << PTR_BITS) - 1;
181
187
// Constructs a null, zero-tagged head.
191TaggedHead64() = default;
192
// Packs address `p` and tag `t` into one word. Any bits of `p` at or above
// PTR_BITS are discarded by the mask; the tag is shifted into the bits
// above the address field.
203TaggedHead64(pointer_type p, tag_type t) noexcept
204 : bits{ (p & PTR_MASK) | (static_cast<uintptr_t>(t) << PTR_BITS) } {}
205
// Returns the address field (the low PTR_BITS bits).
214pointer_type get_ptr() const noexcept { return bits & PTR_MASK; }
215
// Returns the tag field: the bits above PTR_BITS, narrowed to tag_type.
224tag_type get_tag() const noexcept { return static_cast<tag_type>(bits >> PTR_BITS); }
225};
226
227// ----------------------------------------------------------------------------
228// ObjectBlock
229// ----------------------------------------------------------------------------
230
// One pool slot: bookkeeping fields followed by raw, suitably aligned
// storage for a single T. The T itself is constructed into and destroyed
// from `storage` by ObjectPool, not by this struct.
251template <typename T>
252struct ObjectBlock {
253
// Index of the owning shard. ObjectPool::animate writes it before
// constructing the object and the recycle path reads it to select the shard
// whose free list receives the block.
257 uint16_t pool_id;
258
259// Intrusive free-list link. Must be atomic to avoid a formal data race
260// between push_free writing next_free and a concurrent pop_free reading it.
261//
262// The race arises from a *stale pointer* held by a thread that loses a CAS.
263// Consider this interleaving (free list: [b -> null]):
264//
265// Thread A (pop_free): loads _free_head = {b, 5} <-- cur.ptr = b
266// Thread B (pop_free): also loads {b, 5}, wins CAS first,
267// pops b, returns it to the caller
268// Thread C (push_free): caller recycles b;
269// C writes b->next_free <-- non-atomic WRITE
270// Thread A (pop_free): reads cur.ptr->next_free <-- non-atomic READ
271// (cur.ptr is the stale b!)
272//
273// Thread A's CAS will ultimately fail (the version tag changed), so there
274// is no algorithmic corruption — but the concurrent non-atomic read + write
275// on the same memory location is a formal data race and undefined behavior
276// per the C++ memory model. Making next_free atomic eliminates the race by
277// definition: two concurrent atomic accesses are never a data race.
278 std::atomic<ObjectBlock*> next_free {nullptr};
279
// Uninitialized bytes sized and aligned for exactly one T.
283alignas(T) std::byte storage[sizeof(T)];
284
// Returns `storage` viewed as a T*. The pointer is only usable as a T once
// an object has been constructed there.
288 T* object() noexcept {
289return std::launder(reinterpret_cast<T*>(storage));
290 }
291
// Const overload of object().
295const T* object() const noexcept {
296return std::launder(reinterpret_cast<const T*>(storage));
297 }
298
// Recovers the enclosing block from a pointer to its payload by subtracting
// the byte offset of `storage` within ObjectBlock. The result is only
// meaningful when `obj` points at the `storage` member of an ObjectBlock<T>.
306static ObjectBlock* from_object(T* obj) noexcept {
307return reinterpret_cast<ObjectBlock*>(
308reinterpret_cast<char*>(obj) - offsetof(ObjectBlock, storage)
309 );
310 }
311};
312
313// ----------------------------------------------------------------------------
314// ObjectPool
315// ----------------------------------------------------------------------------
316
// Sharded object pool for T.
//
// The pool owns 2^LogSize shards. Each shard pairs a lock-free stack of
// recycled blocks (headed by an atomic H) with a
// std::pmr::synchronized_pool_resource that supplies fresh block memory.
//
// Template parameters:
//   T       - object type stored in the pool
//   H       - free-list head type; must provide pointer_type, tag_type,
//             get_ptr(), get_tag() and a (pointer_type, tag_type) constructor
//   LogSize - log2 of the number of shards
407template <typename T, typename H = TaggedHead128, size_t LogSize = 5>
408class ObjectPool {
409
// With LogSize <= 15 every shard index fits in ObjectBlock::pool_id
// (uint16_t).
410static_assert(LogSize >= 1 && LogSize <= 15,
411"LogSize must be in [1, 15]");
412
413using Block = ObjectBlock<T>;
414
// Number of shards; a power of two so _next_shard() can mask instead of
// taking a modulo.
415static constexpr size_t NumPools = 1u << LogSize;
416
// One independent sub-pool: a free stack plus its backing memory resource.
417struct alignas(TF_CACHELINE_SIZE) Shard {
418
419// Hot path: lock-free Treiber stack of recycled blocks.
420// _free_head sits on its own cache line (via the Shard alignas) so that
421// hot-path CAS does not invalidate the line holding _backing's mutex.
422 std::atomic<H> _free_head {H{}};
423
424// Cold path: backing allocator for fresh block memory.
425// alignas pushes _backing to the next cache line, separating it from the
426// hot _free_head above and preventing hot/cold false sharing.
427alignas(TF_CACHELINE_SIZE) std::pmr::synchronized_pool_resource _backing {
428 std::pmr::pool_options {
429 .max_blocks_per_chunk = 1024,
430 .largest_required_pool_block = sizeof(Block)
431 }
432 };
433
// Pushes block `b` onto this shard's free stack.
// Each attempt links b to the currently observed head and tries to install
// {b, tag + 1}; on CAS failure `cur` is refreshed with the current head and
// the link is rewritten before retrying.
434void push_free(Block* b) noexcept {
435 H cur = _free_head.load(std::memory_order_relaxed);
436 H next;
437do {
438// relaxed: the release CAS below synchronises-with pop_free's acquire,
439// making this store visible to any thread that subsequently observes b
440// at the head of the list.
441 b->next_free.store(
442reinterpret_cast<Block*>(cur.get_ptr()), std::memory_order_relaxed);
443 next = H(
444reinterpret_cast<typename H::pointer_type>(b),
445static_cast<typename H::tag_type>(cur.get_tag() + 1)
446 );
447 } while (!_free_head.compare_exchange_weak(
448 cur, next,
449 std::memory_order_release, // publish next_free write to pop_free
450 std::memory_order_relaxed
451 ));
452 }
453
// Pops one block from this shard's free stack.
// Returns the popped block, or nullptr when the observed head is null.
// Each attempt tries to replace the head with {head->next_free, tag + 1};
// on CAS failure `cur` holds the refreshed head and the loop re-evaluates.
454 Block* pop_free() noexcept {
455 H cur = _free_head.load(std::memory_order_acquire);
456while (cur.get_ptr()) {
457auto* p = reinterpret_cast<Block*>(cur.get_ptr());
458// relaxed on next_free: the acquire on _free_head (either the load
459// above or the acquire failure ordering below) synchronises-with the
460// release CAS in push_free, so the next_free store that preceded it
461// is already visible to this thread.
462 Block* nx = p->next_free.load(std::memory_order_relaxed);
463 H next(
464reinterpret_cast<typename H::pointer_type>(nx),
465static_cast<typename H::tag_type>(cur.get_tag() + 1)
466 );
467if (_free_head.compare_exchange_weak(
468 cur, next,
469 std::memory_order_acquire, // success: synchronise with push_free
470 std::memory_order_acquire // failure: fresh cur must also synchronise
471 )) { // before the next next_free read
472return p;
473 }
474 }
475return nullptr;
476 }
477
// Obtains memory for one Block from the backing resource.
// NOTE(review): the result is raw memory cast to Block*; no Block
// constructor runs here, so pool_id and next_free hold no defined value
// until they are written by the caller / push_free.
478 Block* allocate_from_backing() {
479return static_cast<Block*>(
480 _backing.allocate(sizeof(Block), alignof(Block))
481 );
482 }
483
// Returns one Block's memory to the backing resource.
// Not called anywhere in the code visible in this listing.
484void deallocate_to_backing(Block* b) {
485 _backing.deallocate(b, sizeof(Block), alignof(Block));
486 }
487 };
488
// The shards, indexed by the value stored in ObjectBlock::pool_id.
489 std::array<Shard, NumPools> _shards;
490
491// Returns the next shard index for this thread. The thread_local counter
492// is seeded once from the calling thread's ID hash, spreading different
493// threads across different starting shards with zero shared state.
494// Subsequent calls are a bare local increment — no atomic, no cache-line
495// traffic.
496//
497// If thread_local is broken (e.g. MSVC DLL with improper TLS), the counter
498// degrades to a single shared value and causes contention, but shard
499// selection remains correct.
500static size_t _next_shard() noexcept {
501thread_local size_t counter =
502 std::hash<std::thread::id>{}(std::this_thread::get_id());
503return counter++ & (NumPools - 1);
504 }
505
506public:
507
// Constructs the pool with 2^LogSize empty shards.
515ObjectPool() = default;
516
// Copying is disabled.
524ObjectPool(const ObjectPool&) = delete;
525
// Copy assignment is disabled.
529ObjectPool& operator=(const ObjectPool&) = delete;
530
// Destroys the pool; member destruction tears down every shard's backing
// resource.
540~ObjectPool() = default;
541
// Constructs a T in the pool from `args` and returns a pointer to it.
//
// Picks a shard with _next_shard(), reuses a recycled block from that
// shard's free stack if one is available, otherwise allocates a fresh block
// from the shard's backing resource. The shard index is recorded in the
// block so the recycle path can find the owning shard.
//
// NOTE(review): if T's constructor throws, this function does not push the
// block back onto the free stack — confirm whether that is intended.
578template <typename... Args>
579 [[nodiscard]] T* animate(Args&&... args) {
580auto sid = _next_shard();
581auto& shard = _shards[sid];
582
583 Block* block = shard.pop_free(); // hot path: lock-free
584if (!block) block = shard.allocate_from_backing(); // cold path: mutex, amortized
585
586 block->pool_id = static_cast<uint16_t>(sid);
587return std::construct_at(block->object(), std::forward<Args>(args)...);
588 }
589
// NOTE(review): this rendered listing omits source lines 590-621. The
// member index at the end of this page gives the signature defined at
// object_pool.hpp:621 as `void recycle(T *obj)`; the statements below are
// its body.
//
// recycle: destroys *obj and returns its block to the free stack of the
// shard recorded in the block's pool_id. A null `obj` is a no-op.
622if (!obj) return;
623auto* block = Block::from_object(obj);
624 std::destroy_at(block->object());
625 _shards[block->pool_id].push_free(block); // hot path: lock-free
626 }
627
// NOTE(review): this rendered listing omits source lines 628-670. The
// member index at the end of this page gives the signature defined at
// object_pool.hpp:670 as `void release()`; the statements below are its
// body.
//
// release: for every shard, releases the backing resource's memory and then
// resets the free stack to a null, zero-tagged head.
// NOTE(review): the reset is a relaxed store and nothing here synchronizes
// with concurrent animate/recycle calls — presumably callers must ensure no
// other thread is using the pool and no pooled object is still live; confirm
// against the elided documentation.
671for (auto& shard : _shards) {
672// Release all backing chunks to upstream first — this covers both blocks
673// on the free stack and any that were never recycled, since the backing
674// pool owns memory at the chunk level, not per block.
675 shard._backing.release();
676// Reset the free stack to null in O(1). Pointers it held are now
677// dangling (their backing chunks were just freed), so they must be
678// cleared before the allocator is used again.
679 shard._free_head.store(H{}, std::memory_order_relaxed);
680 }
681 }
682};
683
684} // namespace tf
ObjectPool(const ObjectPool &)=delete
disabled copy constructor
ObjectPool()=default
constructs the allocator with 2^LogSize empty shards
T * animate(Args &&... args)
constructs an object of type T in the pool and returns a pointer
Definition object_pool.hpp:579
~ObjectPool()=default
destroys the allocator and releases all backing memory to upstream
void recycle(T *obj)
destructs the object and returns its storage to the pool
Definition object_pool.hpp:621
ObjectPool & operator=(const ObjectPool &)=delete
disabled copy assignment operator
void release()
returns all recycled blocks and backing memory to the system allocator
Definition object_pool.hpp:670
taskflow namespace
Definition small_vector.hpp:20
pointer_type get_ptr() const noexcept
returns the stored block address
Definition object_pool.hpp:92
tf::TaggedHead128::TaggedHead128
TaggedHead128()=default
constructs a null, zero-tagged head
pointer_type ptr
block address (reinterpret-cast to/from ObjectBlock*)
Definition object_pool.hpp:60
tag_type tag
ABA version counter; incremented on every push and pop.
Definition object_pool.hpp:65
tf::TaggedHead128::TaggedHead128
TaggedHead128(pointer_type p, tag_type t) noexcept
constructs a head with an explicit block address and version counter
Definition object_pool.hpp:82
tf::TaggedHead128::pointer_type
uintptr_t pointer_type
block address representation
Definition object_pool.hpp:50
uintptr_t tag_type
ABA version counter representation.
Definition object_pool.hpp:55
tag_type get_tag() const noexcept
returns the ABA version counter
Definition object_pool.hpp:102
uint16_t tag_type
ABA version counter representation.
Definition object_pool.hpp:165
tf::TaggedHead64::pointer_type
uintptr_t pointer_type
block address representation
Definition object_pool.hpp:160
tf::TaggedHead64::TaggedHead64
TaggedHead64()=default
constructs a null, zero-tagged head
tag_type get_tag() const noexcept
returns the 16-bit ABA version counter
Definition object_pool.hpp:224
pointer_type get_ptr() const noexcept
returns the block address
Definition object_pool.hpp:214
static constexpr int PTR_BITS
bits reserved for the block address
Definition object_pool.hpp:170
tf::TaggedHead64::TaggedHead64
TaggedHead64(pointer_type p, tag_type t) noexcept
constructs a head with an explicit block address and version counter
Definition object_pool.hpp:203
uintptr_t bits
packed word: high TAG_BITS bits hold the version tag, low PTR_BITS bits hold the address
Definition object_pool.hpp:186
static constexpr int TAG_BITS
bits reserved for the version counter
Definition object_pool.hpp:175
static constexpr pointer_type PTR_MASK
mask isolating the address field
Definition object_pool.hpp:180