Back to Taskflow

Taskflow: A General

docs/object__pool_8hpp_source.html

4.1.0 · 20.3 KB
Original Source

| | Taskflow: A General-purpose Task-parallel Programming System |

Loading...

Searching...

No Matches

object_pool.hpp

#pragma once

#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <memory_resource>
#include <new>
#include <thread>
#include <utility>

#include "os.hpp"

12

17

18namespace tf {

19

20// ----------------------------------------------------------------------------

21// TaggedHead128

22// ----------------------------------------------------------------------------

23

/**
@struct TaggedHead128

@brief free-list head that keeps the block address and the version tag in
       two separate full-width words

Neither field is packed or masked: the address and the tag each occupy a
whole `uintptr_t`.
*/
struct TaggedHead128 {

  /** @brief block address representation */
  using pointer_type = std::uintptr_t;

  /** @brief ABA version counter representation */
  using tag_type = std::uintptr_t;

  /** @brief block address, stored as an integer; zero means "no block" */
  pointer_type ptr = 0;

  /** @brief ABA version counter */
  tag_type tag = 0;

  /** @brief constructs a null, zero-tagged head */
  TaggedHead128() = default;

  /**
  @brief constructs a head from an explicit block address and version counter

  @param address block address as an integer
  @param version version counter to store alongside the address
  */
  TaggedHead128(pointer_type address, tag_type version) noexcept
    : ptr(address), tag(version) {
  }

  /** @brief returns the stored block address */
  pointer_type get_ptr() const noexcept {
    return ptr;
  }

  /** @brief returns the stored version counter */
  tag_type get_tag() const noexcept {
    return tag;
  }
};

104

105// ----------------------------------------------------------------------------

106// TaggedHead64

107// ----------------------------------------------------------------------------

108

151template <int PtrBits = TF_POINTER_BITS>

152struct TaggedHead64 {

153static_assert(64 - PtrBits >= 16,

154"TaggedHead64 requires at least 16 tag bits for ABA safety "

155"(PtrBits must be <= 48); use TaggedHead128 instead");

156

160using pointer_type = uintptr_t;

161

165using tag_type = uint16_t;

166

170static constexpr int PTR_BITS = PtrBits;

171

175static constexpr int TAG_BITS = 64 - PtrBits;

176

180static constexpr pointer_type PTR_MASK = (pointer_type{1} << PTR_BITS) - 1;

181

186 uintptr_t bits {0};

187

191TaggedHead64() = default;

192

203TaggedHead64(pointer_type p, tag_type t) noexcept

204 : bits{ (p & PTR_MASK) | (static_cast<uintptr_t>(t) << PTR_BITS) } {}

205

214pointer_type get_ptr() const noexcept { return bits & PTR_MASK; }

215

224tag_type get_tag() const noexcept { return static_cast<tag_type>(bits >> PTR_BITS); }

225};

226

227// ----------------------------------------------------------------------------

228// ObjectBlock

229// ----------------------------------------------------------------------------

230

/**
@struct ObjectBlock

@brief storage cell for one object of type T, preceded by a small header

@tparam T type of the object held in the cell

The header consists of a 16-bit pool identifier and an intrusive free-list
link; the payload is a suitably aligned byte array of `sizeof(T)` bytes.
*/
template <typename T>
struct ObjectBlock {

  /** @brief 16-bit pool identifier; not initialized by this struct */
  uint16_t pool_id;

  // Intrusive free-list link.
  //
  // The link is atomic on purpose. A thread popping from a lock-free stack
  // can still be holding a pointer to a block that another thread has already
  // popped and that a third thread is pushing back:
  //
  //   A (pop) : loads head = {b, tag}
  //   B (pop) : loads the same head, wins its CAS, hands b to its caller
  //   C (push): b is recycled; C writes b->next_free
  //   A (pop) : reads b->next_free through its stale pointer
  //
  // A's CAS subsequently fails because the tag has moved on, so the list is
  // never corrupted, but the overlapping read and write of next_free would be
  // a data race (undefined behavior) if the member were a plain pointer.
  // Two atomic accesses to the same object never constitute a data race.
  std::atomic<ObjectBlock*> next_free {nullptr};

  /** @brief raw, T-aligned bytes in which the object lives */
  alignas(T) std::byte storage[sizeof(T)];

  /**
  @brief returns a pointer to the object held in the payload

  The pointer is passed through `std::launder`, so an object of type T must
  already be alive in the payload when this is called.
  */
  T* object() noexcept {
    T* typed = reinterpret_cast<T*>(storage);
    return std::launder(typed);
  }

  /** @brief const overload of object() */
  const T* object() const noexcept {
    const T* typed = reinterpret_cast<const T*>(storage);
    return std::launder(typed);
  }

  /**
  @brief recovers the enclosing block from a pointer to its payload

  @param obj pointer previously obtained from the payload of an ObjectBlock
  @return pointer to the block whose `storage` member begins at `obj`
  */
  static ObjectBlock* from_object(T* obj) noexcept {
    char* raw = reinterpret_cast<char*>(obj);
    raw -= offsetof(ObjectBlock, storage);
    return reinterpret_cast<ObjectBlock*>(raw);
  }
};

312

313// ----------------------------------------------------------------------------

314// ObjectPool

315// ----------------------------------------------------------------------------

316

407template <typename T, typename H = TaggedHead128, size_t LogSize = 5>

408class ObjectPool {

409

410static_assert(LogSize >= 1 && LogSize <= 15,

411"LogSize must be in [1, 15]");

412

413using Block = ObjectBlock<T>;

414

415static constexpr size_t NumPools = 1u << LogSize;

416

417struct alignas(TF_CACHELINE_SIZE) Shard {

418

419// Hot path: lock-free Treiber stack of recycled blocks.

420// _free_head sits on its own cache line (via the Shard alignas) so that

421// hot-path CAS does not invalidate the line holding _backing's mutex.

422 std::atomic<H> _free_head {H{}};

423

424// Cold path: backing allocator for fresh block memory.

425// alignas pushes _backing to the next cache line, separating it from the

426// hot _free_head above and preventing hot/cold false sharing.

427alignas(TF_CACHELINE_SIZE) std::pmr::synchronized_pool_resource _backing {

428 std::pmr::pool_options {

429 .max_blocks_per_chunk = 1024,

430 .largest_required_pool_block = sizeof(Block)

431 }

432 };

433

434void push_free(Block* b) noexcept {

435 H cur = _free_head.load(std::memory_order_relaxed);

436 H next;

437do {

438// relaxed: the release CAS below synchronises-with pop_free's acquire,

439// making this store visible to any thread that subsequently observes b

440// at the head of the list.

441 b->next_free.store(

442reinterpret_cast<Block*>(cur.get_ptr()), std::memory_order_relaxed);

443 next = H(

444reinterpret_cast<typename H::pointer_type>(b),

445static_cast<typename H::tag_type>(cur.get_tag() + 1)

446 );

447 } while (!_free_head.compare_exchange_weak(

448 cur, next,

449 std::memory_order_release, // publish next_free write to pop_free

450 std::memory_order_relaxed

451 ));

452 }

453

454 Block* pop_free() noexcept {

455 H cur = _free_head.load(std::memory_order_acquire);

456while (cur.get_ptr()) {

457auto* p = reinterpret_cast<Block*>(cur.get_ptr());

458// relaxed on next_free: the acquire on _free_head (either the load

459// above or the acquire failure ordering below) synchronises-with the

460// release CAS in push_free, so the next_free store that preceded it

461// is already visible to this thread.

462 Block* nx = p->next_free.load(std::memory_order_relaxed);

463 H next(

464reinterpret_cast<typename H::pointer_type>(nx),

465static_cast<typename H::tag_type>(cur.get_tag() + 1)

466 );

467if (_free_head.compare_exchange_weak(

468 cur, next,

469 std::memory_order_acquire, // success: synchronise with push_free

470 std::memory_order_acquire // failure: fresh cur must also synchronise

471 )) { // before the next next_free read

472return p;

473 }

474 }

475return nullptr;

476 }

477

478 Block* allocate_from_backing() {

479return static_cast<Block*>(

480 _backing.allocate(sizeof(Block), alignof(Block))

481 );

482 }

483

484void deallocate_to_backing(Block* b) {

485 _backing.deallocate(b, sizeof(Block), alignof(Block));

486 }

487 };

488

489 std::array<Shard, NumPools> _shards;

490

491// Returns the next shard index for this thread. The thread_local counter

492// is seeded once from the calling thread's ID hash, spreading different

493// threads across different starting shards with zero shared state.

494// Subsequent calls are a bare local increment — no atomic, no cache-line

495// traffic.

496//

497// If thread_local is broken (e.g. MSVC DLL with improper TLS), the counter

498// degrades to a single shared value and causes contention, but shard

499// selection remains correct.

500static size_t _next_shard() noexcept {

501thread_local size_t counter =

502 std::hash<std::thread::id>{}(std::this_thread::get_id());

503return counter++ & (NumPools - 1);

504 }

505

506public:

507

515ObjectPool() = default;

516

524ObjectPool(const ObjectPool&) = delete;

525

529ObjectPool& operator=(const ObjectPool&) = delete;

530

540~ObjectPool() = default;

541

578template <typename... Args>

579 [[nodiscard]] T* animate(Args&&... args) {

580auto sid = _next_shard();

581auto& shard = _shards[sid];

582

583 Block* block = shard.pop_free(); // hot path: lock-free

584if (!block) block = shard.allocate_from_backing(); // cold path: mutex, amortized

585

586 block->pool_id = static_cast<uint16_t>(sid);

587return std::construct_at(block->object(), std::forward<Args>(args)...);

588 }

589

621void recycle(T* obj) {

622if (!obj) return;

623auto* block = Block::from_object(obj);

624 std::destroy_at(block->object());

625 _shards[block->pool_id].push_free(block); // hot path: lock-free

626 }

627

670void release() {

671for (auto& shard : _shards) {

672// Release all backing chunks to upstream first — this covers both blocks

673// on the free stack and any that were never recycled, since the backing

674// pool owns memory at the chunk level, not per block.

675 shard._backing.release();

676// Reset the free stack to null in O(1). Pointers it held are now

677// dangling (their backing chunks were just freed), so they must be

678// cleared before the allocator is used again.

679 shard._free_head.store(H{}, std::memory_order_relaxed);

680 }

681 }

682};

683

684} // namespace tf

tf::ObjectPool::ObjectPool

ObjectPool(const ObjectPool &)=delete

disabled copy constructor

tf::ObjectPool::ObjectPool

ObjectPool()=default

constructs the allocator with 2^LogSize empty shards

tf::ObjectPool::animate

T * animate(Args &&... args)

constructs an object of type T in the pool and returns a pointer

Definition object_pool.hpp:579

tf::ObjectPool::~ObjectPool

~ObjectPool()=default

destroys the allocator and releases all backing memory to upstream

tf::ObjectPool::recycle

void recycle(T *obj)

destructs the object and returns its storage to the pool

Definition object_pool.hpp:621

tf::ObjectPool::operator=

ObjectPool & operator=(const ObjectPool &)=delete

disabled copy assignment operator

tf::ObjectPool::release

void release()

returns all recycled blocks and backing memory to the system allocator

Definition object_pool.hpp:670

tf

taskflow namespace

Definition small_vector.hpp:20

tf::TaggedHead128::get_ptr

pointer_type get_ptr() const noexcept

returns the stored block address

Definition object_pool.hpp:92

tf::TaggedHead128::TaggedHead128

TaggedHead128()=default

constructs a null, zero-tagged head

tf::TaggedHead128::ptr

pointer_type ptr

block address (reinterpret-cast to/from ObjectBlock*)

Definition object_pool.hpp:60

tf::TaggedHead128::tag

tag_type tag

ABA version counter; incremented on every push and pop.

Definition object_pool.hpp:65

tf::TaggedHead128::TaggedHead128

TaggedHead128(pointer_type p, tag_type t) noexcept

constructs a head with an explicit block address and version counter

Definition object_pool.hpp:82

tf::TaggedHead128::pointer_type

uintptr_t pointer_type

block address representation

Definition object_pool.hpp:50

tf::TaggedHead128::tag_type

uintptr_t tag_type

ABA version counter representation.

Definition object_pool.hpp:55

tf::TaggedHead128::get_tag

tag_type get_tag() const noexcept

returns the ABA version counter

Definition object_pool.hpp:102

tf::TaggedHead64::tag_type

uint16_t tag_type

ABA version counter representation.

Definition object_pool.hpp:165

tf::TaggedHead64::pointer_type

uintptr_t pointer_type

block address representation

Definition object_pool.hpp:160

tf::TaggedHead64::TaggedHead64

TaggedHead64()=default

constructs a null, zero-tagged head

tf::TaggedHead64::get_tag

tag_type get_tag() const noexcept

returns the 16-bit ABA version counter

Definition object_pool.hpp:224

tf::TaggedHead64::get_ptr

pointer_type get_ptr() const noexcept

returns the block address

Definition object_pool.hpp:214

tf::TaggedHead64::PTR_BITS

static constexpr int PTR_BITS

bits reserved for the block address

Definition object_pool.hpp:170

tf::TaggedHead64::TaggedHead64

TaggedHead64(pointer_type p, tag_type t) noexcept

constructs a head with an explicit block address and version counter

Definition object_pool.hpp:203

tf::TaggedHead64::bits

uintptr_t bits

packed word: high TAG_BITS bits hold the version tag, low PTR_BITS bits hold the address

Definition object_pool.hpp:186

tf::TaggedHead64::TAG_BITS

static constexpr int TAG_BITS

bits reserved for the version counter

Definition object_pool.hpp:175

tf::TaggedHead64::PTR_MASK

static constexpr pointer_type PTR_MASK

mask isolating the address field

Definition object_pool.hpp:180