[QUEUED scylladb next] Merge 'dirty_memory_manager: simplify region_group' from Avi Kivity


Commit Bot

<bot@cloudius-systems.com>
Oct 3, 2022, 6:26:55 AM
to scylladb-dev@googlegroups.com, Botond Dénes
From: Botond Dénes <bde...@scylladb.com>
Committer: Botond Dénes <bde...@scylladb.com>
Branch: next

Merge 'dirty_memory_manager: simplify region_group' from Avi Kivity

region_group evolved as a tree, each node of which contains some
regions (memtables). Each node has some constraints on memory, and
can start flushing and/or stop allocation into its memtables and those
below it when those constraints are violated.

Today, the tree has exactly two nodes, only one of which can hold memtables.
However, all the complexity of the tree remains.

This series applies some mechanical code transformations that remove
the tree structure and all the excess functionality, leaving a much simpler
structure behind.

Before:
- a tree of region_group objects
- each with two parameters: soft limit and hard limit
- but only two instances ever instantiated
After:
- a single region_group object
- with three parameters: two from the bottom instance, one from the top instance
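To make the before/after shape concrete, here is a minimal, self-contained C++ sketch (not ScyllaDB code; the toy_* names are made up for illustration) modeling the three-threshold behaviour the series converges on. It loosely follows reclaim_config, region_group::update()/update_hard() and execution_permitted() from the diff below.

#include <cstddef>
#include <functional>
#include <limits>
#include <utility>

// Hypothetical stand-in for dirty_memory_manager_logalloc::reclaim_config;
// the field names mirror the struct added in the diff below.
struct toy_reclaim_config {
    size_t hard_limit = std::numeric_limits<size_t>::max();           // throttle threshold (from the bottom instance)
    size_t soft_limit = hard_limit;                                    // reclaim starts here (from the bottom instance)
    size_t absolute_hard_limit = std::numeric_limits<size_t>::max();  // real-dirty limit (from the top instance)
    std::function<void()> start_reclaiming = [] {};
    std::function<void()> stop_reclaiming = [] {};
};

// Toy single-node region group: one object carries both the virtual-dirty and
// the real ("hard") dirty counters that used to live in two tree nodes.
class toy_region_group {
    toy_reclaim_config _cfg;
    size_t _total_memory = 0;       // virtual dirty
    size_t _hard_total_memory = 0;  // real dirty
    bool _under_pressure = false;
    bool _under_soft_pressure = false;
    bool _under_hard_pressure = false;
public:
    explicit toy_region_group(toy_reclaim_config cfg) : _cfg(std::move(cfg)) {}

    // Analogue of region_group::update(): adjust virtual dirty memory and
    // recompute the soft/hard pressure flags against the first two thresholds.
    void update(std::ptrdiff_t delta) {
        _total_memory += delta;
        if (_total_memory > _cfg.soft_limit) {
            if (!_under_soft_pressure) {
                _under_soft_pressure = true;
                _cfg.start_reclaiming();
            }
        } else if (_under_soft_pressure) {
            _under_soft_pressure = false;
            _cfg.stop_reclaiming();
        }
        _under_pressure = _total_memory > _cfg.hard_limit;
    }

    // Analogue of region_group::update_hard(): real dirty memory checked
    // against the third threshold, which came from the former top node.
    void update_hard(std::ptrdiff_t delta) {
        _hard_total_memory += delta;
        _under_hard_pressure = _hard_total_memory > _cfg.absolute_hard_limit;
    }

    // Allocation may proceed only when neither limit is exceeded
    // (cf. region_group::execution_permitted() in the diff).
    bool execution_permitted() const {
        return !_under_pressure && !_under_hard_pressure;
    }
};

In the real series the two counters that used to be split between the parent ("memtable") group and its child ("memtable (virtual)") group now sit side by side in the single remaining region_group, which is what the dirty_memory_manager changes in the diff below carry out.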

Closes #11570

* github.com:scylladb/scylladb:
dirty_memory_manager: move third memory threshold parameter of region_group constructor to reclaim_config
dirty_memory_manager: simplify region_group::update()
dirty_memory_manager: fold region_group::notify_hard_pressure_relieved into its callers
dirty_memory_manager: clean up region_group::do_update_hard_and_check_relief()
dirty_memory_manager: make do_update_hard_and_check_relief() a member of region_group
dirty_memory_manager: remove accessors around region_group::_under_hard_pressure
dirty_memory_manager: merge memory_hard_limit into region_group
dirty_memory_manager: rename members in memory_hard_limit
dirty_memory_manager: fold do_update() into region_group::update()
dirty_memory_manager: simplify memory_hard_limit's do_update
dirty_memory_manager: drop soft limit / soft pressure members in memory_hard_limit
dirty_memory_manager: de-template do_update(region_group_or_memory_hard_limit)
dirty_memory_manager: adjust soft_limit threshold check
dirty_memory_manager: drop memory_hard_limit::_name
dirty_memory_manager: simplify memory_hard_limit configuration
dirty_memory_manager: fold region_group_reclaimer into {memory_hard_limit,region_group}
dirty_memory_manager: stop inheriting from region_group_reclaimer
dirty_memory_manager: test: unwrap region_group_reclaimer
dirty_memory_manager: change region_group_reclaimer configuration to a struct
dirty_memory_manager: convert region_group_reclaimer to callbacks
dirty_memory_manager: consolidate region_group_reclaimer constructors
dirty_memory_manager: rename {memory_hard_limit,region_group}::notify_relief
dirty_memory_manager: drop unused parameter to memory_hard_limit constructor
dirty_memory_manager: drop memory_hard_limit::shutdown()
dirty_memory_manager: split region_group hierarchy into separate classes
dirty_memory_manager: extract code block from region_group::update
dirty_memory_manager: move more allocation_queue functions out of region_group
dirty_memory_manager: move some allocation queue related function definitions outside class scope
dirty_memory_manager: move region_group::allocating_function and related classes to new class allocation_queue
dirty_memory_manager: remove support for multiple subgroups

---
diff --git a/dirty_memory_manager.cc b/dirty_memory_manager.cc
--- a/dirty_memory_manager.cc
+++ b/dirty_memory_manager.cc
@@ -44,8 +44,6 @@ region_evictable_occupancy_ascending_less_comparator::operator()(size_tracked_re
return r1->evictable_occupancy().total_space() < r2->evictable_occupancy().total_space();
}

-region_group_reclaimer region_group::no_reclaimer;
-
uint64_t region_group::top_region_evictable_space() const noexcept {
return _regions.empty() ? 0 : _regions.top()->evictable_occupancy().total_space();
}
@@ -54,18 +52,6 @@ dirty_memory_manager_logalloc::size_tracked_region* region_group::get_largest_re
return _regions.empty() ? nullptr : _regions.top();
}

-void
-region_group::add(region_group* child) {
- _subgroups.push_back(child);
- update(child->_total_memory);
-}
-
-void
-region_group::del(region_group* child) {
- _subgroups.erase(std::find(_subgroups.begin(), _subgroups.end(), child));
- update(-child->_total_memory);
-}
-
void
region_group::add(region* child_r) {
auto child = static_cast<size_tracked_region*>(child_r);
@@ -103,9 +89,15 @@ region_group::moved(region* old_address, region* new_address) {

bool
region_group::execution_permitted() noexcept {
- return do_for_each_parent(this, [] (auto rg) noexcept {
- return rg->under_pressure() ? stop_iteration::yes : stop_iteration::no;
- }) == nullptr;
+ return !(this->under_pressure()
+ || (_under_hard_pressure));
+}
+
+void
+allocation_queue::execute_one() {
+ auto req = std::move(_blocked_requests.front());
+ _blocked_requests.pop_front();
+ req->allocate();
}

future<>
@@ -118,9 +110,7 @@ region_group::start_releaser(scheduling_group deferred_work_sg) {
}

if (!_blocked_requests.empty() && execution_permitted()) {
- auto req = std::move(_blocked_requests.front());
- _blocked_requests.pop_front();
- req->allocate();
+ _blocked_requests.execute_one();
return make_ready_future<stop_iteration>(stop_iteration::no);
} else {
// Block reclaiming to prevent signal() from being called by reclaimer inside wait()
@@ -135,59 +125,79 @@ region_group::start_releaser(scheduling_group deferred_work_sg) {
});
}

-region_group::region_group(sstring name, region_group *parent,
- region_group_reclaimer& reclaimer, scheduling_group deferred_work_sg)
- : _parent(parent)
- , _reclaimer(reclaimer)
+region_group::region_group(sstring name,
+ reclaim_config cfg, scheduling_group deferred_work_sg)
+ : _cfg(std::move(cfg))
, _blocked_requests(on_request_expiry{std::move(name)})
, _releaser(reclaimer_can_block() ? start_releaser(deferred_work_sg) : make_ready_future<>())
{
- if (_parent) {
- _parent->add(this);
- }
}

bool region_group::reclaimer_can_block() const {
- return _reclaimer.throttle_threshold() != std::numeric_limits<size_t>::max();
+ return throttle_threshold() != std::numeric_limits<size_t>::max();
}

-void region_group::notify_relief() {
+void region_group::notify_pressure_relieved() {
_relief.signal();
- for (region_group* child : _subgroups) {
- child->notify_relief();
+}
+
+bool region_group::do_update_hard_and_check_relief(ssize_t delta) {
+ _hard_total_memory += delta;
+
+ if (_hard_total_memory > hard_throttle_threshold()) {
+ _under_hard_pressure = true;
+ } else if (_under_hard_pressure) {
+ _under_hard_pressure = false;
+ return true;
+ }
+ return false;
+}
+
+void region_group::update_hard(ssize_t delta) {
+ if (do_update_hard_and_check_relief(delta)) {
+ notify_pressure_relieved();
}
}

void region_group::update(ssize_t delta) {
// Most-enclosing group which was relieved.
- region_group* top_relief = nullptr;
+ bool relief = false;

- do_for_each_parent(this, [&top_relief, delta] (region_group* rg) mutable {
- rg->_total_memory += delta;
+ _total_memory += delta;

- if (rg->_total_memory >= rg->_reclaimer.soft_limit_threshold()) {
- rg->_reclaimer.notify_soft_pressure();
- } else {
- rg->_reclaimer.notify_soft_relief();
- }
+ if (_total_memory > soft_limit_threshold()) {
+ notify_soft_pressure();
+ } else {
+ notify_soft_relief();
+ }

- if (rg->_total_memory > rg->_reclaimer.throttle_threshold()) {
- rg->_reclaimer.notify_pressure();
- } else if (rg->_reclaimer.under_pressure()) {
- rg->_reclaimer.notify_relief();
- top_relief = rg;
- }
+ if (_total_memory > throttle_threshold()) {
+ notify_pressure();
+ } else if (under_pressure()) {
+ notify_relief();
+ relief = true;
+ }

- return stop_iteration::no;
- });
+ relief |= do_update_hard_and_check_relief(delta);

- if (top_relief) {
- top_relief->notify_relief();
+ if (relief) {
+ notify_pressure_relieved();
}
}

-void region_group::on_request_expiry::operator()(std::unique_ptr<allocating_function>& func) noexcept {
+future<>
+region_group::shutdown() noexcept {
+ _shutdown_requested = true;
+ _relief.signal();
+ return std::move(_releaser);
+}
+
+void allocation_queue::on_request_expiry::operator()(std::unique_ptr<allocating_function>& func) noexcept {
func->fail(std::make_exception_ptr(blocked_requests_timed_out_error{_name}));
}

+allocation_queue::allocation_queue(allocation_queue::on_request_expiry on_expiry)
+ : _blocked_requests(std::move(on_expiry)) {
+}
+
}
diff --git a/dirty_memory_manager.hh b/dirty_memory_manager.hh
--- a/dirty_memory_manager.hh
+++ b/dirty_memory_manager.hh
@@ -39,96 +39,41 @@ public:
std::optional<region_heap::handle_type> _heap_handle;
};

+// Users of a region_group configure reclaim with a soft limit (where reclaim starts, but allocation
+// can still continue), a hard limit (where allocation cannot proceed until reclaim makes progress),
+// and callbacks that are called when reclaiming is required and no longer necessary.
//
-// Users of a region_group can pass an instance of the class region_group_reclaimer, and specialize
-// its methods start_reclaiming() and stop_reclaiming(). Those methods will be called when the LSA
+// These callbacks will be called when the LSA
// see relevant changes in the memory pressure conditions for this region_group. By specializing
// those methods - which are a nop by default - the callers can take action to aid the LSA in
// alleviating pressure.
-class region_group_reclaimer {
-protected:
- size_t _threshold;
- size_t _soft_limit;
- bool _under_pressure = false;
- bool _under_soft_pressure = false;
- // The following restrictions apply to implementations of start_reclaiming() and stop_reclaiming():
- //
- // - must not use any region or region_group objects, because they're invoked synchronously
- // with operations on those.
- //
- // - must be noexcept, because they're called on the free path.
- //
- // - the implementation may be called synchronously with any operation
- // which allocates memory, because these are called by memory reclaimer.
- // In particular, the implementation should not depend on memory allocation
- // because that may fail when in reclaiming context.
- //
- virtual void start_reclaiming() noexcept {}
- virtual void stop_reclaiming() noexcept {}
-public:
- bool under_pressure() const noexcept {
- return _under_pressure;
- }
-
- bool over_soft_limit() const noexcept {
- return _under_soft_pressure;
- }

- void notify_soft_pressure() noexcept {
- if (!_under_soft_pressure) {
- _under_soft_pressure = true;
- start_reclaiming();
- }
- }
-
- void notify_soft_relief() noexcept {
- if (_under_soft_pressure) {
- _under_soft_pressure = false;
- stop_reclaiming();
- }
- }
-
- void notify_pressure() noexcept {
- _under_pressure = true;
- }
-
- void notify_relief() noexcept {
- _under_pressure = false;
- }
-
- region_group_reclaimer() noexcept
- : _threshold(std::numeric_limits<size_t>::max()), _soft_limit(std::numeric_limits<size_t>::max()) {}
- region_group_reclaimer(size_t threshold) noexcept
- : _threshold(threshold), _soft_limit(threshold) {}
- region_group_reclaimer(size_t threshold, size_t soft) noexcept
- : _threshold(threshold), _soft_limit(soft) {
- assert(_soft_limit <= _threshold);
- }
+// The following restrictions apply to implementations of start_reclaiming() and stop_reclaiming():
+//
+// - must not use any region or region_group objects, because they're invoked synchronously
+// with operations on those.
+//
+// - must be noexcept, because they're called on the free path.
+//
+// - the implementation may be called synchronously with any operation
+// which allocates memory, because these are called by memory reclaimer.
+// In particular, the implementation should not depend on memory allocation
+// because that may fail when in reclaiming context.
+//

- virtual ~region_group_reclaimer() {}
+using reclaim_start_callback = noncopyable_function<void () noexcept>;
+using reclaim_stop_callback = noncopyable_function<void () noexcept>;

- size_t throttle_threshold() const noexcept {
- return _threshold;
- }
- size_t soft_limit_threshold() const noexcept {
- return _soft_limit;
- }
+struct reclaim_config {
+ size_t hard_limit = std::numeric_limits<size_t>::max();
+ size_t soft_limit = hard_limit;
+ size_t absolute_hard_limit = std::numeric_limits<size_t>::max();
+ reclaim_start_callback start_reclaiming = [] () noexcept {};
+ reclaim_stop_callback stop_reclaiming = [] () noexcept {};
};

-// Groups regions for the purpose of statistics. Can be nested.
-// Interfaces to regions via region_listener
-class region_group : public region_listener {
- static region_group_reclaimer no_reclaimer;
-
- using region_heap = dirty_memory_manager_logalloc::region_heap;
-
- region_group* _parent = nullptr;
- size_t _total_memory = 0;
- region_group_reclaimer& _reclaimer;
-
- std::vector<region_group*> _subgroups;
- region_heap _regions;
-
+class allocation_queue {
+public:
struct allocating_function {
virtual ~allocating_function() = default;
virtual void allocate() = 0;
@@ -169,7 +114,7 @@ class region_group : public region_listener {
explicit on_request_expiry(sstring name) : _name(std::move(name)) {}
void operator()(std::unique_ptr<allocating_function>&) noexcept;
};
-
+private:
// It is a more common idiom to just hold the promises in the circular buffer and make them
// ready. However, in the time between the promise being made ready and the function execution,
// it could be that our memory usage went up again. To protect against that, we have to recheck
@@ -184,13 +129,111 @@ class region_group : public region_listener {

uint64_t _blocked_requests_counter = 0;

+public:
+ explicit allocation_queue(on_request_expiry on_expiry);
+
+ void execute_one();
+
+ void push_back(std::unique_ptr<allocating_function>, db::timeout_clock::time_point timeout);
+
+ size_t blocked_requests() const noexcept;
+
+ uint64_t blocked_requests_counter() const noexcept;
+
+ size_t size() const noexcept { return _blocked_requests.size(); }
+
+ bool empty() const noexcept { return _blocked_requests.empty(); }
+};
+
+// Groups regions for the purpose of statistics. Can be nested.
+// Interfaces to regions via region_listener
+class region_group : public region_listener {
+ reclaim_config _cfg;
+
+ bool _under_pressure = false;
+ bool _under_soft_pressure = false;
+
+ region_group* _subgroup = nullptr;
+
+ size_t _hard_total_memory = 0;
+
+ bool _under_hard_pressure = false;
+
+ size_t hard_throttle_threshold() const noexcept {
+ return _cfg.absolute_hard_limit;
+ }
+public:
+ void update_hard(ssize_t delta);
+
+ size_t hard_memory_used() const noexcept {
+ return _hard_total_memory;
+ }
+
+private:
+ bool do_update_hard_and_check_relief(ssize_t delta);
+
+public:
+ bool under_pressure() const noexcept {
+ return _under_pressure;
+ }
+
+ bool over_soft_limit() const noexcept {
+ return _under_soft_pressure;
+ }
+
+ void notify_soft_pressure() noexcept {
+ if (!_under_soft_pressure) {
+ _under_soft_pressure = true;
+ _cfg.start_reclaiming();
+ }
+ }
+
+private:
+ void notify_soft_relief() noexcept {
+ if (_under_soft_pressure) {
+ _under_soft_pressure = false;
+ _cfg.stop_reclaiming();
+ }
+ }
+
+ void notify_pressure() noexcept {
+ _under_pressure = true;
+ }
+
+ void notify_relief() noexcept {
+ _under_pressure = false;
+ }
+
+public:
+ size_t throttle_threshold() const noexcept {
+ return _cfg.hard_limit;
+ }
+private:
+ size_t soft_limit_threshold() const noexcept {
+ return _cfg.soft_limit;
+ }
+ using region_heap = dirty_memory_manager_logalloc::region_heap;
+
+ size_t _total_memory = 0;
+
+ region_heap _regions;
+
+ using allocating_function = allocation_queue::allocating_function;
+
+ template <typename Func>
+ using concrete_allocating_function = allocation_queue::concrete_allocating_function<Func>;
+
+ using on_request_expiry = allocation_queue::on_request_expiry;
+
+ allocation_queue _blocked_requests;
+
condition_variable _relief;
future<> _releaser;
bool _shutdown_requested = false;

bool reclaimer_can_block() const;
future<> start_releaser(scheduling_group deferered_work_sg);
- void notify_relief();
+ void notify_pressure_relieved();
friend void region_group_binomial_group_sanity_check(const region_group::region_heap& bh);
private: // from region_listener
virtual void moved(region* old_address, region* new_address) override;
@@ -204,11 +247,7 @@ public:
// The deferred_work_sg parameter specifies a scheduling group in which to run allocations
// (given to run_when_memory_available()) when they must be deferred due to lack of memory
// at the time the call to run_when_memory_available() was made.
- region_group(sstring name = "(unnamed region_group)",
- region_group_reclaimer& reclaimer = no_reclaimer,
- scheduling_group deferred_work_sg = default_scheduling_group()) noexcept
- : region_group(std::move(name), nullptr, reclaimer, deferred_work_sg) {}
- region_group(sstring name, region_group* parent, region_group_reclaimer& reclaimer = no_reclaimer,
+ region_group(sstring name = "(unnamed region group)", reclaim_config cfg = {},
scheduling_group deferred_work_sg = default_scheduling_group());
region_group(region_group&& o) = delete;
region_group(const region_group&) = delete;
@@ -218,9 +257,6 @@ public:
if (reclaimer_can_block()) {
assert(_shutdown_requested);
}
- if (_parent) {
- _parent->del(this);
- }
}
region_group& operator=(const region_group&) = delete;
region_group& operator=(region_group&&) = delete;
@@ -273,22 +309,7 @@ public:
// We disallow future-returning functions here, because otherwise memory may be available
// when we start executing it, but no longer available in the middle of the execution.
requires (!is_future<std::invoke_result_t<Func>>::value)
- futurize_t<std::result_of_t<Func()>> run_when_memory_available(Func&& func, db::timeout_clock::time_point timeout) {
- auto blocked_at = do_for_each_parent(this, [] (auto rg) {
- return (rg->_blocked_requests.empty() && !rg->under_pressure()) ? stop_iteration::no : stop_iteration::yes;
- });
-
- if (!blocked_at) {
- return futurize_invoke(func);
- }
-
- auto fn = std::make_unique<concrete_allocating_function<Func>>(std::forward<Func>(func));
- auto fut = fn->get_future();
- _blocked_requests.push_back(std::move(fn), timeout);
- ++_blocked_requests_counter;
-
- return fut;
- }
+ futurize_t<std::result_of_t<Func()>> run_when_memory_available(Func&& func, db::timeout_clock::time_point timeout);

// returns a pointer to the largest region (in terms of memory usage) that sits below this
// region group. This includes the regions owned by this region group as well as all of its
@@ -297,55 +318,23 @@ public:

// Shutdown is mandatory for every user who has set a threshold
// Can be called at most once.
- future<> shutdown() noexcept {
- _shutdown_requested = true;
- _relief.signal();
- return std::move(_releaser);
- }
+ future<> shutdown() noexcept;

- size_t blocked_requests() const noexcept {
- return _blocked_requests.size();
- }
+ size_t blocked_requests() const noexcept;

- uint64_t blocked_requests_counter() const noexcept {
- return _blocked_requests_counter;
- }
+ uint64_t blocked_requests_counter() const noexcept;
private:
// Returns true if and only if constraints of this group are not violated.
// That's taking into account any constraints imposed by enclosing (parent) groups.
bool execution_permitted() noexcept;

- // Executes the function func for each region_group upwards in the hierarchy, starting with the
- // parameter node. The function func may return stop_iteration::no, in which case it proceeds to
- // the next ancestor in the hierarchy, or stop_iteration::yes, in which case it stops at this
- // level.
- //
- // This method returns a pointer to the region_group that was processed last, or nullptr if the
- // root was reached.
- template <typename Func>
- static region_group* do_for_each_parent(region_group *node, Func&& func) noexcept(noexcept(func(node))) {
- auto rg = node;
- while (rg) {
- if (func(rg) == stop_iteration::yes) {
- return rg;
- }
- rg = rg->_parent;
- }
- return nullptr;
- }
-
- inline bool under_pressure() const noexcept {
- return _reclaimer.under_pressure();
- }
-
uint64_t top_region_evictable_space() const noexcept;

- void add(region_group* child);
- void del(region_group* child);
virtual void add(region* child) override; // from region_listener
virtual void del(region* child) override; // from region_listener

friend class test_region_group;
+ friend class memory_hard_limit;
};

}
@@ -396,16 +385,12 @@ public:
future<flush_permit> reacquire_sstable_write_permit() &&;
};

-class dirty_memory_manager: public dirty_memory_manager_logalloc::region_group_reclaimer {
- dirty_memory_manager_logalloc::region_group_reclaimer _real_dirty_reclaimer;
+class dirty_memory_manager {
// We need a separate boolean, because from the LSA point of view, pressure may still be
// mounting, in which case the pressure flag could be set back on if we force it off.
bool _db_shutdown_requested = false;

replica::database* _db;
- // The _real_region_group protects against actual dirty memory usage hitting the maximum. Usage
- // for this group is the real dirty memory usage of the system.
- dirty_memory_manager_logalloc::region_group _real_region_group;
// The _virtual_region_group accounts for virtual memory usage. It is defined as the real dirty
// memory usage minus bytes that were already written to disk.
dirty_memory_manager_logalloc::region_group _virtual_region_group;
@@ -430,10 +415,10 @@ class dirty_memory_manager: public dirty_memory_manager_logalloc::region_group_r
future<> flush_when_needed();

future<> _waiting_flush;
- virtual void start_reclaiming() noexcept override;
+ void start_reclaiming() noexcept;

bool has_pressure() const noexcept {
- return over_soft_limit();
+ return _virtual_region_group.over_soft_limit();
}

unsigned _extraneous_flushes = 0;
@@ -477,10 +462,12 @@ public:
// We then set the soft limit to 80 % of the virtual dirty hard limit, which is equal to 40 % of
// the user-supplied threshold.
dirty_memory_manager(replica::database& db, size_t threshold, double soft_limit, scheduling_group deferred_work_sg);
- dirty_memory_manager() : dirty_memory_manager_logalloc::region_group_reclaimer()
- , _db(nullptr)
- , _real_region_group("memtable", _real_dirty_reclaimer)
- , _virtual_region_group("memtable (virtual)", &_real_region_group, *this)
+ dirty_memory_manager()
+ : _db(nullptr)
+ , _virtual_region_group("memtable (virtual)",
+ dirty_memory_manager_logalloc::reclaim_config{
+ .start_reclaiming = std::bind_front(&dirty_memory_manager::start_reclaiming, this),
+ })
, _flush_serializer(1)
, _waiting_flush(make_ready_future<>()) {}

@@ -497,33 +484,41 @@ public:
}

void revert_potentially_cleaned_up_memory(logalloc::region* from, int64_t delta) {
- _real_region_group.update(-delta);
+ _virtual_region_group.update_hard(-delta);
_virtual_region_group.update(delta);
_dirty_bytes_released_pre_accounted -= delta;
}

void account_potentially_cleaned_up_memory(logalloc::region* from, int64_t delta) {
- _real_region_group.update(delta);
+ _virtual_region_group.update_hard(delta);
_virtual_region_group.update(-delta);
_dirty_bytes_released_pre_accounted += delta;
}

void pin_real_dirty_memory(int64_t delta) {
- _real_region_group.update(delta);
+ _virtual_region_group.update_hard(delta);
}

void unpin_real_dirty_memory(int64_t delta) {
- _real_region_group.update(-delta);
+ _virtual_region_group.update_hard(-delta);
}

size_t real_dirty_memory() const noexcept {
- return _real_region_group.memory_used();
+ return _virtual_region_group.hard_memory_used();
}

size_t virtual_dirty_memory() const noexcept {
return _virtual_region_group.memory_used();
}

+ void notify_soft_pressure() {
+ _virtual_region_group.notify_soft_pressure();
+ }
+
+ size_t throttle_threshold() const {
+ return _virtual_region_group.throttle_threshold();
+ }
+
future<> flush_one(replica::memtable_list& cf, flush_permit&& permit) noexcept;

future<flush_permit> get_flush_permit() noexcept {
@@ -559,5 +554,58 @@ private:
friend class flush_permit;
};

+namespace dirty_memory_manager_logalloc {
+
+template <typename Func>
+// We disallow future-returning functions here, because otherwise memory may be available
+// when we start executing it, but no longer available in the middle of the execution.
+requires (!is_future<std::invoke_result_t<Func>>::value)
+futurize_t<std::result_of_t<Func()>>
+region_group::run_when_memory_available(Func&& func, db::timeout_clock::time_point timeout) {
+ auto rg = this;
+ bool blocked =
+ !(rg->_blocked_requests.empty() && !rg->under_pressure());
+ if (!blocked) {
+ blocked = _under_hard_pressure;
+ }
+
+ if (!blocked) {
+ return futurize_invoke(func);
+ }
+
+ auto fn = std::make_unique<concrete_allocating_function<Func>>(std::forward<Func>(func));
+ auto fut = fn->get_future();
+ _blocked_requests.push_back(std::move(fn), timeout);
+
+ return fut;
+}
+
+inline
+void
+allocation_queue::push_back(std::unique_ptr<allocation_queue::allocating_function> f, db::timeout_clock::time_point timeout) {
+ _blocked_requests.push_back(std::move(f));
+ ++_blocked_requests_counter;
+}
+
+inline
+size_t
+region_group::blocked_requests() const noexcept {
+ return _blocked_requests.size();
+}
+
+inline
+uint64_t
+allocation_queue::blocked_requests_counter() const noexcept {
+ return _blocked_requests_counter;
+}
+
+inline
+uint64_t
+region_group::blocked_requests_counter() const noexcept {
+ return _blocked_requests.blocked_requests_counter();
+}
+
+}
+
extern thread_local dirty_memory_manager default_dirty_memory_manager;

diff --git a/replica/database.cc b/replica/database.cc
--- a/replica/database.cc
+++ b/replica/database.cc
@@ -448,11 +448,13 @@ void backlog_controller::update_controller(float shares) {


dirty_memory_manager::dirty_memory_manager(replica::database& db, size_t threshold, double soft_limit, scheduling_group deferred_work_sg)
- : dirty_memory_manager_logalloc::region_group_reclaimer(threshold / 2, threshold * soft_limit / 2)
- , _real_dirty_reclaimer(threshold)
- , _db(&db)
- , _real_region_group("memtable", _real_dirty_reclaimer, deferred_work_sg)
- , _virtual_region_group("memtable (virtual)", &_real_region_group, *this, deferred_work_sg)
+ : _db(&db)
+ , _virtual_region_group("memtable (virtual)", dirty_memory_manager_logalloc::reclaim_config{
+ .hard_limit = threshold / 2,
+ .soft_limit = threshold * soft_limit / 2,
+ .absolute_hard_limit = threshold,
+ .start_reclaiming = std::bind_front(&dirty_memory_manager::start_reclaiming, this)
+ }, deferred_work_sg)
, _flush_serializer(1)
, _waiting_flush(flush_when_needed()) {}

@@ -1730,9 +1732,7 @@ future<> dirty_memory_manager::shutdown() {
_db_shutdown_requested = true;
_should_flush.signal();
return std::move(_waiting_flush).then([this] {
- return _virtual_region_group.shutdown().then([this] {
- return _real_region_group.shutdown();
- });
+ return _virtual_region_group.shutdown();
});
}

diff --git a/scylla-gdb.py b/scylla-gdb.py
--- a/scylla-gdb.py
+++ b/scylla-gdb.py
@@ -1731,7 +1731,7 @@ def __init__(self, ref):
self.ref = ref

def real_dirty(self):
- return int(self.ref['_real_region_group']['_total_memory'])
+ return int(self.ref['_virtual_region_group']['_hard_total_memory'])

def virt_dirty(self):
return int(self.ref['_virtual_region_group']['_total_memory'])
diff --git a/test/boost/dirty_memory_manager_test.cc b/test/boost/dirty_memory_manager_test.cc
--- a/test/boost/dirty_memory_manager_test.cc
+++ b/test/boost/dirty_memory_manager_test.cc
@@ -50,15 +50,13 @@ using namespace dirty_memory_manager_logalloc;
SEASTAR_TEST_CASE(test_region_groups) {
return seastar::async([] {
region_group just_four;
- region_group all;
- region_group one_and_two("one_and_two", &all);
+ region_group one_and_two("one_and_two");

auto one = std::make_unique<size_tracked_region>();
one->listen(&one_and_two);
auto two = std::make_unique<size_tracked_region>();
two->listen(&one_and_two);
auto three = std::make_unique<size_tracked_region>();
- three->listen(&all);
auto four = std::make_unique<size_tracked_region>();
four->listen(&just_four);
auto five = std::make_unique<size_tracked_region>();
@@ -75,7 +73,7 @@ SEASTAR_TEST_CASE(test_region_groups) {
BOOST_REQUIRE_GE(ssize_t(one->occupancy().used_space()), ssize_t(one_count * sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(one->occupancy().total_space()), ssize_t(one->occupancy().used_space()));
BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), one->occupancy().total_space());
- BOOST_REQUIRE_EQUAL(all.memory_used(), one->occupancy().total_space());
+ BOOST_REQUIRE_EQUAL(one_and_two.hard_memory_used(), one->occupancy().total_space());

constexpr size_t two_count = 8 * base_count;
std::vector<managed_ref<int>> two_objs;
@@ -87,7 +85,7 @@ SEASTAR_TEST_CASE(test_region_groups) {
BOOST_REQUIRE_GE(ssize_t(two->occupancy().used_space()), ssize_t(two_count * sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(two->occupancy().total_space()), ssize_t(two->occupancy().used_space()));
BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), one->occupancy().total_space() + two->occupancy().total_space());
- BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used());
+ BOOST_REQUIRE_EQUAL(one_and_two.hard_memory_used(), one_and_two.memory_used());

constexpr size_t three_count = 32 * base_count;
std::vector<managed_ref<int>> three_objs;
@@ -98,7 +96,7 @@ SEASTAR_TEST_CASE(test_region_groups) {
});
BOOST_REQUIRE_GE(ssize_t(three->occupancy().used_space()), ssize_t(three_count * sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(three->occupancy().total_space()), ssize_t(three->occupancy().used_space()));
- BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());
+ BOOST_REQUIRE_EQUAL(one_and_two.hard_memory_used(), one_and_two.memory_used());

constexpr size_t four_count = 4 * base_count;
std::vector<managed_ref<int>> four_objs;
@@ -122,27 +120,27 @@ SEASTAR_TEST_CASE(test_region_groups) {
three->merge(*four);
BOOST_REQUIRE_GE(ssize_t(three->occupancy().used_space()), ssize_t((three_count + four_count)* sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(three->occupancy().total_space()), ssize_t(three->occupancy().used_space()));
- BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());
+ BOOST_REQUIRE_EQUAL(one_and_two.hard_memory_used(), one_and_two.memory_used());
BOOST_REQUIRE_EQUAL(just_four.memory_used(), 0);

three->merge(*five);
BOOST_REQUIRE_GE(ssize_t(three->occupancy().used_space()), ssize_t((three_count + four_count)* sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(three->occupancy().total_space()), ssize_t(three->occupancy().used_space()));
- BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());
+ BOOST_REQUIRE_EQUAL(one_and_two.hard_memory_used(), one_and_two.memory_used());

with_allocator(two->allocator(), [&] {
two_objs.clear();
});
two.reset();
BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), one->occupancy().total_space());
- BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());
+ BOOST_REQUIRE_EQUAL(one_and_two.hard_memory_used(), one_and_two.memory_used());

with_allocator(one->allocator(), [&] {
one_objs.clear();
});
one.reset();
BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), 0);
- BOOST_REQUIRE_EQUAL(all.memory_used(), three->occupancy().total_space());
+ BOOST_REQUIRE_EQUAL(one_and_two.hard_memory_used(), 0);

with_allocator(three->allocator(), [&] {
three_objs.clear();
@@ -151,7 +149,7 @@ SEASTAR_TEST_CASE(test_region_groups) {
three.reset();
four.reset();
five.reset();
- BOOST_REQUIRE_EQUAL(all.memory_used(), 0);
+ BOOST_REQUIRE_EQUAL(one_and_two.hard_memory_used(), 0);
});
}

@@ -169,10 +167,8 @@ inline void quiesce(FutureType&& fut) {
// Simple RAII structure that wraps around a region_group
// Not using defer because we usually employ many region groups
struct test_region_group: public region_group {
- test_region_group(region_group* parent, region_group_reclaimer& reclaimer)
- : region_group("test_region_group", parent, reclaimer) {}
- test_region_group(region_group_reclaimer& reclaimer)
- : region_group("test_region_group", nullptr, reclaimer) {}
+ test_region_group(reclaim_config cfg)
+ : region_group("test_region_group", std::move(cfg)) {}

~test_region_group() {
shutdown().get();
@@ -212,10 +208,8 @@ struct test_region: public dirty_memory_manager_logalloc::size_tracked_region {

SEASTAR_TEST_CASE(test_region_groups_basic_throttling) {
return seastar::async([] {
- region_group_reclaimer simple_reclaimer(logalloc::segment_size);
-
// singleton hierarchy, only one segment allowed
- test_region_group simple(simple_reclaimer);
+ test_region_group simple({ .hard_limit = logalloc::segment_size });
auto simple_region = std::make_unique<test_region>();
simple_region->listen(&simple);

@@ -268,67 +262,10 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling) {
});
}

-SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_child_alloc) {
- return seastar::async([] {
- region_group_reclaimer parent_reclaimer(2 * logalloc::segment_size);
- region_group_reclaimer child_reclaimer(logalloc::segment_size);
-
- test_region_group parent(parent_reclaimer);
- test_region_group child(&parent, child_reclaimer);
-
- auto child_region = std::make_unique<test_region>();
- child_region->listen(&child);
- auto parent_region = std::make_unique<test_region>();
- parent_region->listen(&parent);
-
- child_region->alloc();
- BOOST_REQUIRE_GE(parent.memory_used(), logalloc::segment_size);
-
- auto fut = parent.run_when_memory_available([&parent_region] { parent_region->alloc_small(); }, db::no_timeout);
- BOOST_REQUIRE_EQUAL(fut.available(), true);
- BOOST_REQUIRE_GE(parent.memory_used(), 2 * logalloc::segment_size);
-
- // This time child will use all parent's memory. Note that because the child's memory limit
- // is lower than the parent's, for that to happen we need to allocate directly.
- child_region->alloc();
- BOOST_REQUIRE_GE(child.memory_used(), 2 * logalloc::segment_size);
-
- fut = parent.run_when_memory_available([&parent_region] { parent_region->alloc_small(); }, db::no_timeout);
- BOOST_REQUIRE_EQUAL(fut.available(), false);
- BOOST_REQUIRE_GE(parent.memory_used(), 2 * logalloc::segment_size);
-
- child_region.reset();
- quiesce(std::move(fut));
- });
-}
-
-SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_parent_alloc) {
- return seastar::async([] {
- region_group_reclaimer simple_reclaimer(logalloc::segment_size);
-
- test_region_group parent(simple_reclaimer);
- test_region_group child(&parent, simple_reclaimer);
-
- auto parent_region = std::make_unique<test_region>();
- parent_region->listen(&parent);
-
- parent_region->alloc();
- BOOST_REQUIRE_GE(parent.memory_used(), logalloc::segment_size);
-
- auto fut = child.run_when_memory_available([] {}, db::no_timeout);
- BOOST_REQUIRE_EQUAL(fut.available(), false);
-
- parent_region.reset();
- quiesce(std::move(fut));
- });
-}
-
SEASTAR_TEST_CASE(test_region_groups_fifo_order) {
// tests that requests that are queued for later execution execute in FIFO order
return seastar::async([] {
- region_group_reclaimer simple_reclaimer(logalloc::segment_size);
-
- test_region_group rg(simple_reclaimer);
+ test_region_group rg({.hard_limit = logalloc::segment_size});

auto region = std::make_unique<test_region>();
region->listen(&rg);
@@ -353,124 +290,6 @@ SEASTAR_TEST_CASE(test_region_groups_fifo_order) {
});
}

-SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_moving_restriction) {
- // Hierarchy here is A -> B -> C.
- // We will fill B causing an execution in C to fail. We then fill A and free B.
- //
- // C should still be blocked.
- return seastar::async([] {
- region_group_reclaimer simple_reclaimer(logalloc::segment_size);
-
- test_region_group root(simple_reclaimer);
- test_region_group inner(&root, simple_reclaimer);
- test_region_group child(&inner, simple_reclaimer);
-
- auto inner_region = std::make_unique<test_region>();
- inner_region->listen(&inner);
- auto root_region = std::make_unique<test_region>();
- root_region->listen(&root);
-
- // fill the inner node. Try allocating at child level. Should not be allowed.
- circular_buffer<managed_bytes> big_alloc;
- with_allocator(inner_region->allocator(), [&big_alloc] {
- big_alloc.push_back(managed_bytes(bytes(bytes::initialized_later(), logalloc::segment_size)));
- });
- BOOST_REQUIRE_GE(inner.memory_used(), logalloc::segment_size);
-
- auto fut = child.run_when_memory_available([] {}, db::no_timeout);
- BOOST_REQUIRE_EQUAL(fut.available(), false);
-
- // Now fill the root...
- with_allocator(root_region->allocator(), [&big_alloc] {
- big_alloc.push_back(managed_bytes(bytes(bytes::initialized_later(), logalloc::segment_size)));
- });
- BOOST_REQUIRE_GE(root.memory_used(), logalloc::segment_size);
-
- // And free the inner node. We will verify that
- // 1) the notifications that the inner node sent the child when it was freed won't
- // erroneously cause it to execute
- // 2) the child is still able to receive notifications from the root
- with_allocator(inner_region->allocator(), [&big_alloc] {
- big_alloc.pop_front();
- });
- inner_region.reset();
-
- // Verifying (1)
- // Can't quiesce because we don't want to wait on the futures.
- sleep(10ms).get();
- BOOST_REQUIRE_EQUAL(fut.available(), false);
-
- // Verifying (2)
- with_allocator(root_region->allocator(), [&big_alloc] {
- big_alloc.pop_front();
- });
- root_region.reset();
- quiesce(std::move(fut));
- });
-}
-
-SEASTAR_TEST_CASE(test_region_groups_tree_hierarchy_throttling_leaf_alloc) {
- return seastar::async([] {
- class leaf {
- region_group_reclaimer _leaf_reclaimer;
- test_region_group _rg;
- std::unique_ptr<test_region> _region;
- public:
- leaf(test_region_group& parent)
- : _leaf_reclaimer(logalloc::segment_size)
- , _rg(&parent, _leaf_reclaimer)
- , _region(std::make_unique<test_region>())
- {
- _region->listen(&_rg);
- }
-
- void alloc(size_t size) {
- _region->alloc(size);
- }
-
- future<> try_alloc(size_t size) {
- return _rg.run_when_memory_available([this, size] {
- alloc(size);
- }, db::no_timeout);
- }
- void reset() {
- _region.reset(new test_region());
- _region->listen(&_rg);
- }
- };
-
- region_group_reclaimer simple_reclaimer(logalloc::segment_size);
- test_region_group parent(simple_reclaimer);
-
- leaf first_leaf(parent);
- leaf second_leaf(parent);
- leaf third_leaf(parent);
-
- first_leaf.alloc(logalloc::segment_size);
- second_leaf.alloc(logalloc::segment_size);
- third_leaf.alloc(logalloc::segment_size);
-
- auto fut_1 = first_leaf.try_alloc(sizeof(uint64_t));
- auto fut_2 = second_leaf.try_alloc(sizeof(uint64_t));
- auto fut_3 = third_leaf.try_alloc(sizeof(uint64_t));
-
- BOOST_REQUIRE_EQUAL(fut_1.available() || fut_2.available() || fut_3.available(), false);
-
- // Total memory is still 2 * segment_size, can't proceed
- first_leaf.reset();
- // Can't quiesce because we don't want to wait on the futures.
- sleep(10ms).get();
-
- BOOST_REQUIRE_EQUAL(fut_1.available() || fut_2.available() || fut_3.available(), false);
-
- // Now all futures should resolve.
- first_leaf.reset();
- second_leaf.reset();
- third_leaf.reset();
- quiesce(when_all(std::move(fut_1), std::move(fut_2), std::move(fut_3)));
- });
-}
-
// Helper for all async reclaim tests.
class test_async_reclaim_region {
dirty_memory_manager_logalloc::size_tracked_region _region;
@@ -514,20 +333,20 @@ class test_async_reclaim_region {
}
};

-class test_reclaimer: public region_group_reclaimer {
+class test_reclaimer {
test_reclaimer *_result_accumulator;
region_group _rg;
std::vector<size_t> _reclaim_sizes;
shared_promise<> _unleash_reclaimer;
seastar::gate _reclaimers_done;
promise<> _unleashed;
public:
- virtual void start_reclaiming() noexcept override {
+ void start_reclaiming() noexcept {
// Future is waited on indirectly in `~test_reclaimer()` (via `_reclaimers_done`).
(void)with_gate(_reclaimers_done, [this] {
return _unleash_reclaimer.get_shared_future().then([this] {
_unleashed.set_value();
- while (this->under_pressure()) {
+ while (_rg.under_pressure()) {
size_t reclaimed = test_async_reclaim_region::from_region(_rg.get_largest_region()).evict();
_result_accumulator->_reclaim_sizes.push_back(reclaimed);
}
@@ -548,8 +367,12 @@ class test_reclaimer: public region_group_reclaimer {
return _rg;
}

- test_reclaimer(size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(this), _rg("test_reclaimer RG", *this) {}
- test_reclaimer(test_reclaimer& parent, size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(&parent), _rg("test_reclaimer RG", &parent._rg, *this) {}
+ test_reclaimer(size_t threshold)
+ : _result_accumulator(this)
+ , _rg("test_reclaimer RG", {
+ .hard_limit = threshold,
+ .start_reclaiming = std::bind_front(&test_reclaimer::start_reclaiming, this),
+ }) {}

future<> unleash(future<> after) {
// Result indirectly forwarded to _unleashed (returned below).
@@ -608,33 +431,6 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_worst_offen
});
}

-
-SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_ancestor_block) {
- return seastar::async([] {
- // allocate a parent region group (A) with a leaf region group (B)
- // Make sure that active reclaim still works when we block at an ancestor
- test_reclaimer root(logalloc::segment_size);
- test_reclaimer leaf(root, logalloc::segment_size);
-
- test_async_reclaim_region root_region(root.rg(), logalloc::segment_size);
- auto f = root.unleash(make_ready_future<>());
- // FIXME: discarded future.
- (void)leaf.unleash(std::move(f));
-
- // Can't run this function until we have reclaimed. Try at the leaf, and we'll make sure
- // that the root reclaims
- auto fut = leaf.rg().run_when_memory_available([&root] {
- BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
- }, db::no_timeout);
-
- // Initially not available
- BOOST_REQUIRE_EQUAL(fut.available(), false);
- quiesce(std::move(fut));
-
- BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], logalloc::segment_size);
- });
-}
-
// Reproduces issue #2021
SEASTAR_TEST_CASE(test_no_crash_when_a_lot_of_requests_released_which_change_region_group_size) {
return seastar::async([test_name = get_name()] {
@@ -643,8 +439,7 @@ SEASTAR_TEST_CASE(test_no_crash_when_a_lot_of_requests_released_which_change_reg

auto free_space = memory::stats().free_memory();
size_t threshold = size_t(0.75 * free_space);
- region_group_reclaimer recl(threshold, threshold);
- region_group gr(test_name, recl);
+ region_group gr(test_name, {.hard_limit = threshold, .soft_limit = threshold});
auto close_gr = defer([&gr] () noexcept { gr.shutdown().get(); });
size_tracked_region r;
r.listen(&gr);
@@ -663,7 +458,7 @@ SEASTAR_TEST_CASE(test_no_crash_when_a_lot_of_requests_released_which_change_reg
});

auto fill_to_pressure = [&] {
- while (!recl.under_pressure()) {
+ while (!gr.under_pressure()) {
objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), 1024));
}
};
@@ -682,7 +477,7 @@ SEASTAR_TEST_CASE(test_no_crash_when_a_lot_of_requests_released_which_change_reg
}

// Release
- while (recl.under_pressure()) {
+ while (gr.under_pressure()) {
objs.pop_back();
}
});
@@ -695,58 +490,46 @@ SEASTAR_TEST_CASE(test_reclaiming_runs_as_long_as_there_is_soft_pressure) {
size_t hard_threshold = logalloc::segment_size * 8;
size_t soft_threshold = hard_threshold / 2;

- class reclaimer : public region_group_reclaimer {
- bool _reclaim = false;
- protected:
- void start_reclaiming() noexcept override {
- _reclaim = true;
- }
-
- void stop_reclaiming() noexcept override {
- _reclaim = false;
- }
- public:
- reclaimer(size_t hard_threshold, size_t soft_threshold)
- : region_group_reclaimer(hard_threshold, soft_threshold)
- { }
- bool reclaiming() const { return _reclaim; };
- };
-
- reclaimer recl(hard_threshold, soft_threshold);
- region_group gr(test_name, recl);
+ bool reclaiming = false;
+ region_group gr(test_name, {
+ .hard_limit = hard_threshold,
+ .soft_limit = soft_threshold,
+ .start_reclaiming = [&] () noexcept { reclaiming = true; },
+ .stop_reclaiming = [&] () noexcept { reclaiming = false; },
+ });
auto close_gr = defer([&gr] () noexcept { gr.shutdown().get(); });
size_tracked_region r;
r.listen(&gr);

with_allocator(r.allocator(), [&] {
std::vector<managed_bytes> objs;

- BOOST_REQUIRE(!recl.reclaiming());
+ BOOST_REQUIRE(!reclaiming);

- while (!recl.over_soft_limit()) {
+ while (!gr.over_soft_limit()) {
objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), logalloc::segment_size));
}

- BOOST_REQUIRE(recl.reclaiming());
+ BOOST_REQUIRE(reclaiming);

- while (!recl.under_pressure()) {
+ while (!gr.under_pressure()) {
objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), logalloc::segment_size));
}

- BOOST_REQUIRE(recl.reclaiming());
+ BOOST_REQUIRE(reclaiming);

- while (recl.under_pressure()) {
+ while (gr.under_pressure()) {
objs.pop_back();
}

- BOOST_REQUIRE(recl.over_soft_limit());
- BOOST_REQUIRE(recl.reclaiming());
+ BOOST_REQUIRE(gr.over_soft_limit());
+ BOOST_REQUIRE(reclaiming);

- while (recl.over_soft_limit()) {
+ while (gr.over_soft_limit()) {
objs.pop_back();
}

- BOOST_REQUIRE(!recl.reclaiming());
+ BOOST_REQUIRE(!reclaiming);
});
});
}

Commit Bot

<bot@cloudius-systems.com>
Oct 3, 2022, 1:04:12 PM
to scylladb-dev@googlegroups.com, Botond Dénes
From: Botond Dénes <bde...@scylladb.com>
Committer: Botond Dénes <bde...@scylladb.com>
Branch: master